`websocket-extensions` models the extension negotiation and processing pipeline
of the WebSocket protocol. Between the driver parsing messages from the TCP
stream and handing those messages off to the application, there may exist a
stack of extensions that transform the message somehow.

In the parlance of this framework, a *session* refers to a single instance of
an extension, acting on a particular socket on either the server or the client
side. A session may transform messages both incoming to the application and
outgoing from the application; for example, the `permessage-deflate` extension
compresses outgoing messages and decompresses incoming messages. Message
streams in either direction are independent; that is, incoming and outgoing
messages cannot be assumed to 'pair up' as in a request-response protocol.

Asynchronous processing of messages poses a number of problems that this
pipeline construction is intended to solve.

Logically, we have the following:

```
+-------------+   out   +---+   +---+   +---+      +--------+
|             |-------->|   |-->|   |-->|   |----->|        |
| Application |         | A |   | B |   | C |      | Driver |
|             |<--------|   |<--|   |<--|   |<-----|        |
+-------------+    in   +---+   +---+   +---+      +--------+
```

For outgoing messages, the driver receives the result of

```js
C.outgoing(B.outgoing(A.outgoing(message)))
```

or, equivalently:

```js
[A, B, C].reduce(((m, ext) => ext.outgoing(m)), message)
```

For incoming messages, the application receives the result of

```js
A.incoming(B.incoming(C.incoming(message)))
```

or, equivalently:

```js
[C, B, A].reduce(((m, ext) => ext.incoming(m)), message)
```

A session is of the following type, to borrow notation from pseudo-Haskell:

```
incoming :: Message -> Message
outgoing :: Message -> Message
close    :: () -> ()
```

(That `() -> ()` syntax is intended to mean that `close()` is a nullary void
method; I apologise to any Haskell readers for not using the right monad.)

The `incoming()` and `outgoing()` methods perform message transformation in the
respective directions; `close()` is called when a socket closes so the session
can release any resources it's holding, for example a DEFLATE de/compression
context.

However, because this is JavaScript, the `incoming()` and `outgoing()` methods
may be asynchronous (indeed, `permessage-deflate` is based on `zlib`, whose API
is stream-based). So, strictly, their interface is:

```
incoming :: Message -> Callback -> ()
outgoing :: Message -> Callback -> ()
close    :: () -> ()
```

where

```
type Callback = Either Error Message -> ()
```

This means a message *m2* can be pushed into a session while it's still
processing the preceding message *m1*. The messages can be processed
concurrently but they *must* be given to the next session in line (or to the
application) in the same order they came in. Applications will expect to
receive messages in the order they arrived over the wire, and sessions require
this too. So ordering of messages must be preserved throughout the pipeline.

Consider the following highly simplified extension that deflates messages on
the wire. `message` is a value conforming to the framework's message type; for
our purposes the relevant part is its `data` property, which carries the
message payload:

```js
var zlib = require('zlib');

var deflate = {
  outgoing: function(message, callback) {
    zlib.deflateRaw(message.data, function(error, result) {
      message.data = result;
      callback(error, message);
    });
  },

  incoming: function(message, callback) {
    // decompress inbound messages (elided)
  },

  close: function() {
    // no state to clean up
  }
};
```

We can call it with a large message followed by a small one, and the small one
will be returned first:

```js
var crypto = require('crypto'),
    large  = crypto.randomBytes(1 << 14),
    small  = Buffer.from('hi');

deflate.outgoing({data: large}, function() {
  console.log(1, 'large');
});

deflate.outgoing({data: small}, function() {
  console.log(2, 'small');
});

/* prints:  2 'small'
            1 'large' */
```

So a session that processes messages asynchronously may fail to preserve
message order in its output.

Now, this extension is stateless, so it can process messages in any order and
still produce the same output. But some extensions are stateful and require
message order to be preserved.

For example, when using `permessage-deflate` without `no_context_takeover` set,
the session retains a DEFLATE de/compression context between messages, which
accumulates state as it consumes data (later messages can refer to sections of
previous ones to improve compression). Reordering parts of the DEFLATE stream
will result in a failed decompression. Messages must be decompressed in the
same order they were compressed by the peer for the DEFLATE protocol to work.

Finally, there is the problem of closing a socket. When a WebSocket is closed
by the application, or receives a closing request from the other peer, there
may be messages outgoing from the application and incoming from the peer in
the pipeline. If we close the socket and pipeline immediately, two problems
arise:

* We may send our own closing frame to the peer before all prior messages we
  sent have been written to the socket, and before we have finished processing
  all prior messages from the peer
* The session may be instructed to close its resources (e.g. its de/compression
  context) while it's in the middle of processing a message, or before it has
  received messages that are upstream of it in the pipeline

Essentially, we must defer closing the sessions and sending a closing frame
until after all prior messages have exited the pipeline.

To recap, the design goals are as follows:

* Message order must be preserved between the protocol driver, the extension
  sessions, and the application
* Messages should be handed off to sessions and endpoints as soon as possible,
  to maximise throughput of stateless sessions
* The closing procedure should block any further messages from entering the
  pipeline, and should allow all existing messages to drain
* Sessions should be closed as soon as possible to prevent them holding memory
  and other resources when they have no more messages to handle
* The closing API should allow the caller to detect when the pipeline is empty
  and it is safe to continue the WebSocket closing procedure
* Individual extensions should remain as simple as possible to facilitate
  modularity and independent authorship

The final point about modularity is an important one: this framework is
designed to facilitate extensions existing as plugins, by decoupling the
protocol driver, extensions, and application. In an ideal world, plugins should
only need to contain code for their specific functionality, and not solve these
problems that apply to all sessions. Also, solving some of these problems
requires consideration of all active sessions collectively, which an individual
session is incapable of doing.

For example, it is entirely possible to take the simple `deflate` extension
above and wrap its `incoming()` and `outgoing()` methods in two `Transform`
streams, producing this type:

```
incoming :: TransformStream
outgoing :: TransformStream
```

The `Transform` class makes it easy to wrap an async function such that message
order is preserved:

```js
var stream  = require('stream'),
    session = new stream.Transform({objectMode: true});

session._transform = function(message, _, callback) {
  var self = this;
  deflate.outgoing(message, function(error, result) {
    self.push(result);
    callback(error);
  });
};
```

However, this has a negative impact on throughput: it works by deferring
`callback()` until the async function has 'returned', which blocks `Transform`
from passing further input into the `_transform()` method until the current
message is dealt with completely. This would prevent sessions from processing
messages concurrently, and would unnecessarily reduce the throughput of
stateless extensions.

So, input should be handed off to sessions as soon as possible, and all we need
is a mechanism to reorder the output so that message order is preserved for the
next session in line.

We now describe the model implemented here and how it meets the above design
goals. The diagram above, where a stack of extensions sits between the driver
and the application, describes the data flow, but not the object graph. That
looks like this:

```
+------------+      +----------+
| Extensions o----->| Pipeline |
+------------+      +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
```

A driver using this framework holds an instance of the `Extensions` class,
which it uses to register extension plugins, negotiate headers and transform
messages. The `Extensions` instance itself holds a `Pipeline`, which contains
an array of `Cell` objects, each of which wraps one of the sessions.

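To make this concrete, here is a rough sketch of how a driver might use such
an object around the handshake and during message processing. The method names
(`add`, `generateResponse`, `processIncomingMessage`, `processOutgoingMessage`,
`close`) and the surrounding helpers (`request`, `deliverToApplication`,
`writeToSocket`, `failConnection`, `finishCloseHandshake`) are illustrative
assumptions rather than definitions made by this document:

```js
var Extensions = require('websocket-extensions'),
    deflate    = require('permessage-deflate');

var exts = new Extensions();
exts.add(deflate);

// During the server-side handshake: negotiate a response header from the
// client's offer, which selects and activates the agreed sessions. The driver
// would include this header in its handshake response.
var header = exts.generateResponse(request.headers['sec-websocket-extensions']);

// For every message the driver parses off the wire:
exts.processIncomingMessage(message, function(error, msg) {
  if (error) return failConnection(error);
  deliverToApplication(msg);
});

// For every message the application sends:
exts.processOutgoingMessage(message, function(error, msg) {
  if (error) return failConnection(error);
  writeToSocket(msg);
});

// When the socket is closing: wait for the pipeline to drain before
// completing the closing handshake.
exts.close(function() {
  finishCloseHandshake();
});
```
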
### Message processing

Both the `Pipeline` and `Cell` classes have `incoming()` and `outgoing()`
methods; the `Pipeline` interface pushes messages into the pipe, delegates each
message to each `Cell` in turn, then returns it back to the driver. Outgoing
messages pass through `A` then `B` then `C`, and incoming messages in the
reverse order.

270 function and makes sure its output messages maintain the order of its input
271 messages. This name is due to [@fronx](https://github.com/fronx), on the basis
272 that, by preserving message order, the abstraction preserves the *mapping*
273 between input and output messages. To use our simple `deflate` extension from
```js
var functor = new Functor(deflate, 'outgoing');

functor.call({data: large}, function() {
  console.log(1, 'large');
});

functor.call({data: small}, function() {
  console.log(2, 'small');
});

/* prints:  1 'large'
            2 'small' */
```

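Conceptually, a `Functor` is a small queue that records the order in which
messages arrive and only releases results from the front of that queue. A
minimal sketch of the idea (an illustration of the technique, not the library's
actual implementation):

```js
function Functor(session, method) {
  this._session = session;   // e.g. the deflate object above
  this._method  = method;    // 'incoming' or 'outgoing'
  this._queue   = [];
}

Functor.prototype.call = function(message, callback) {
  var record = {done: false, error: null, message: null, callback: callback},
      self   = this;

  // Record the input position before starting any work
  this._queue.push(record);

  this._session[this._method](message, function(error, result) {
    record.done    = true;
    record.error   = error;
    record.message = result;
    self._flush();
  });
};

// Release completed results from the front of the queue, stopping at the
// first message whose result has not arrived yet
Functor.prototype._flush = function() {
  while (this._queue.length > 0 && this._queue[0].done) {
    var record = this._queue.shift();
    record.callback(record.error, record.message);
  }
};
```

With this in place, the second `functor.call()` above is not answered until the
large message has been deflated and released, even though the session was
handed both messages immediately.
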
A `Cell` contains two of these, one for each direction:

```
                   +-----------------------+
             +---->| Functor [A, incoming] |
+----------+ |     +-----------------------+
| Cell [A] o-+
+----------+ |     +-----------------------+
             +---->| Functor [A, outgoing] |
                   +-----------------------+
```

This satisfies the message transformation requirements: the `Pipeline` simply
loops over the cells in the appropriate direction to transform each message.
Because each `Cell` will preserve message order, we can pass a message to the
next `Cell` in line as soon as the current `Cell` returns it. This gives each
`Cell` all the messages in order while maximising throughput.

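As an illustration, the per-message delegation amounts to something like the
following sketch (an assumed shape, not the library's code), where each cell
exposes order-preserving `incoming()` and `outgoing()` methods backed by its
two functors:

```js
// Push one message through the cells, handing it to the next cell as soon as
// the previous one emits it. Outgoing messages visit A, B, C; incoming
// messages visit the cells in reverse.
function dispatch(cells, direction, message, callback) {
  var queue = (direction === 'outgoing') ? cells.slice() : cells.slice().reverse();

  function step(error, msg) {
    if (error || queue.length === 0) return callback(error, msg);
    queue.shift()[direction](msg, step);
  }

  step(null, message);
}
```

The pipeline can call `dispatch()` for a new message without waiting for the
previous one to finish; it is each cell's functor that keeps that cell's output
in order.
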
### Session closing

We want to close each session as soon as possible, after all existing messages
have drained. To do this, each `Cell` begins with a pending message counter in
each direction, labelled `in` and `out` below.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   0 |    | out:   0 |    | out:   0 |
    +----------+    +----------+    +----------+
```

When a message *m1* enters the pipeline, say in the `outgoing` direction, we
increment the `pending.out` counter on all cells immediately.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   1 |    | out:   1 |    | out:   1 |
    +----------+    +----------+    +----------+
```

*m1* is handed off to `A`, meanwhile a second message *m2* arrives in the same
direction. All `pending.out` counters are again incremented.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   2 |    | out:   2 |    | out:   2 |
    +----------+    +----------+    +----------+
```

When the first cell's `A.outgoing` functor finishes processing *m1*, the first
`pending.out` counter is decremented and *m1* is handed off to cell `B`.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   1 |    | out:   2 |    | out:   2 |
    +----------+    +----------+    +----------+
```

As `B` finishes with *m1*, and as `A` finishes with *m2*, the `pending.out`
counters continue to decrement.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   0 |    | out:   1 |    | out:   2 |
    +----------+    +----------+    +----------+
```

Say `C` is a little slow, and begins processing *m2* while still processing
*m1*. That's fine; the `Functor` mechanism will keep *m1* ahead of *m2* in the
output.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   0 |    | out:   0 |    | out:   2 |
    +----------+    +----------+    +----------+
```

Once all messages are dealt with, the counters return to `0`.

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   0 |    | out:   0 |    | out:   0 |
    +----------+    +----------+    +----------+
```

The same process applies in the `incoming` direction, the only difference being
that messages are passed to `C` first.

This makes closing the sessions quite simple. When the driver wants to close
the socket, it calls `Pipeline.close()`. This *immediately* calls `close()` on
all the cells. If a cell has `in == out == 0`, then it immediately calls
`session.close()`. Otherwise, it stores the closing call and defers it until
`in` and `out` have both ticked down to zero. The pipeline will not accept new
messages after `close()` has been called, so we know the pending counts will
not increase after this point.

This means each session is closed as soon as possible: `A` can close while the
slow `C` session is still working, because it knows there are no more messages
on the way. Similarly, `C` will defer closing if `close()` is called while *m1*
is still in `B`, and *m2* in `A`, because its pending count means it knows it
still has work to do, even if it hasn't received those messages yet. This
concern cannot be addressed by extensions acting only on their own local state,
unless we pollute individual extensions by making them all implement this same
behaviour.

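The counters and the deferred close can be sketched as follows, with the
order-preserving functors omitted for brevity (again an assumed shape, not the
library's code):

```js
function Cell(session) {
  this._session = session;
  this._pending = {incoming: 0, outgoing: 0};
  this._closing = false;
}

// Called by the pipeline on every cell as soon as a message enters the
// pipeline in the given direction, before the message reaches this cell
Cell.prototype.pending = function(direction) {
  this._pending[direction] += 1;
};

Cell.prototype.incoming = function(message, callback) {
  this._exec('incoming', message, callback);
};

Cell.prototype.outgoing = function(message, callback) {
  this._exec('outgoing', message, callback);
};

// Takes effect immediately if nothing is pending, otherwise it is deferred
// until both counters have ticked down to zero
Cell.prototype.close = function() {
  this._closing = true;
  this._closeIfDone();
};

Cell.prototype._exec = function(direction, message, callback) {
  var self = this;
  this._session[direction](message, function(error, result) {
    self._pending[direction] -= 1;
    self._closeIfDone();
    callback(error, result);
  });
};

Cell.prototype._closeIfDone = function() {
  if (this._closing && this._pending.incoming === 0 && this._pending.outgoing === 0) {
    this._closing = false;
    this._session.close();
  }
};
```
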
The actual closing API at each level is slightly different:

```
Session.close  :: () -> ()
Cell.close     :: () -> Promise ()
Pipeline.close :: Callback -> ()
```

This might appear inconsistent so it's worth explaining. Remember that a
`Pipeline` holds a list of `Cell` objects, each wrapping a `Session`. The
driver talks (via the `Extensions` API) to the `Pipeline` interface, and it
wants `Pipeline.close()` to do two things: close all the sessions, and tell it
when it is safe to start the closing procedure (i.e. when all messages have
drained from the pipe and been handed off to the application or socket). A
callback API serves this purpose well.

At the other end of the stack, `Session.close()` is a nullary void method with
no callback or promise API because we don't care what it does, and whatever it
does do will not block the WebSocket protocol; we're not going to hold off
processing messages while a session closes its de/compression context. We just
tell it to close itself, and don't want to wait while it does that.

In the middle, `Cell.close()` returns a promise rather than using a callback.
This is for two reasons. First, `Cell.close()` might not do anything
immediately; it might have to defer its effect while messages drain. So, if
given a callback, it would have to store it in a queue for later execution.
Callbacks work fine if your method does something and can then invoke the
callback itself, but if you need to store callbacks somewhere so another method
can execute them, a promise is a better fit. Second, it better serves the
purposes of `Pipeline.close()`: it wants to call `close()` on each of a list of
cells, and wait for all of them to finish. This is simple and idiomatic using
promises:

```js
var closed = cells.map((cell) => cell.close());
Promise.all(closed).then(callback);
```

(We don't actually use a full *Promises/A+* compatible promise here; we use a
much simplified construction that acts as a callback aggregator, resolves
synchronously, and does not support chaining, but the principle is the same.)

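For a sense of how small that construction can be, here is a sketch of such a
callback aggregator (the name and the details are hypothetical, not the
library's code):

```js
function Pledge() {
  this._complete  = false;
  this._callbacks = [];
}

// Either fires immediately (if already resolved) or stores the callback until
// done() is called; no chaining, no values, no asynchronous resolution
Pledge.prototype.then = function(callback) {
  if (this._complete) callback();
  else this._callbacks.push(callback);
};

Pledge.prototype.done = function() {
  if (this._complete) return;
  this._complete = true;

  var callbacks = this._callbacks;
  this._callbacks = [];
  callbacks.forEach(function(cb) { cb(); });
};

// The equivalent of Promise.all() for these objects
Pledge.all = function(list) {
  var all     = new Pledge(),
      pending = list.length;

  if (pending === 0) {
    all.done();
    return all;
  }

  list.forEach(function(pledge) {
    pledge.then(function() {
      pending -= 1;
      if (pending === 0) all.done();
    });
  });
  return all;
};
```
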
### Error handling

We've not mentioned error handling so far but it bears some explanation. The
above counter system still applies, but behaves slightly differently in the
presence of errors.

Say we push three messages into the pipe in the outgoing direction:

```
                    +----------+
    m3, m2, m1  =>  | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   3 |    | out:   3 |    | out:   3 |
    +----------+    +----------+    +----------+
```

They pass through the cells successfully up to this point:

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   0 |    | out:   2 |    | out:   3 |
    +----------+    +----------+    +----------+
```

At this point, session `B` produces an error while processing *m2*, that is,
*m2* becomes *e2*. *m1* is still in the pipeline, and *m3* is queued behind
*m2*. What ought to happen is that *m1* is handed off to the socket, then *e2*
is released to the driver, which will detect the error and begin closing the
socket. No further processing should be done on *m3* and it should not be
released to the driver after the error is emitted.

To handle this, we allow errors to pass down the pipeline just like messages
do, to maintain ordering. But, once a cell sees its session produce an error,
or it receives an error from upstream, it should refuse to accept any further
messages. Session `B` might have begun processing *m3* by the time it produces
the error *e2*, but `C` will have been given *e2* before it receives *m3*, and
can simply drop *m3*.

Now, say *e2* reaches the slow session `C` while *m1* is still present;
meanwhile *m3* has been dropped upstream, so `C` will never receive it. Under
the present model, its `out` counter will be `3` but it is only going to emit
two more values: *m1* and *e2*. In order for closing to work, we need to
decrement `out` to reflect this. The situation should look like this:

```
                    +----------+
                    | Pipeline |
                    +-----o----+
                          |
          +---------------+---------------+
          |               |               |
    +-----o----+    +-----o----+    +-----o----+
    | Cell [A] |    | Cell [B] |    | Cell [C] |
    +----------+    +----------+    +----------+
    | in:    0 |    | in:    0 |    | in:    0 |
    | out:   0 |    | out:   0 |    | out:   2 |
    +----------+    +----------+    +----------+
```

When a cell sees its session emit an error, or when it receives an error from
upstream, it sets its pending count in the appropriate direction to equal the
number of messages it is *currently* processing. It will not accept any
messages after it sees the error, so this will allow the counter to reach zero.

Note that while *e2* is in the pipeline, the `Pipeline` should drop any further
messages in the outgoing direction, but should continue to accept incoming
messages. Until *e2* makes it out of the pipe to the driver, behind previous
successful messages, the driver does not know an error has happened, and a
message may arrive over the socket and make it all the way through the incoming
pipe in the meantime. We only halt processing in the affected direction to
avoid doing unnecessary work, since messages arriving after an error should not
be processed anyway.

Some unnecessary work may happen; for example, any messages already in the
pipeline following *m2* will be processed by `A`, since it's upstream of the
error. Those messages will be dropped by `B`.

### Alternative ideas

I am considering implementing `Functor` as an object-mode transform stream
rather than what is essentially an async function. Being object-mode, a stream
would preserve message boundaries and would also possibly help address
back-pressure. I'm not sure whether this would require external API changes so
that such streams could be connected to the downstream driver's streams.

### Acknowledgements

Credit is due to [@mnowster](https://github.com/mnowster) for helping with the
design and to [@fronx](https://github.com/fronx) for helping name things.