Initial commit
[yaffs-website] / node_modules / websocket-extensions / lib / pipeline / README.md
1 # Extension pipelining
2
3 `websocket-extensions` models the extension negotiation and processing pipeline
4 of the WebSocket protocol. Between the driver parsing messages from the TCP
5 stream and handing those messages off to the application, there may exist a
6 stack of extensions that transform the message somehow.
7
8 In the parlance of this framework, a *session* refers to a single instance of an
9 extension, acting on a particular socket on either the server or the client
10 side. A session may transform messages both incoming to the application and
11 outgoing from the application, for example the `permessage-deflate` extension
12 compresses outgoing messages and decompresses incoming messages. Message streams
13 in either direction are independent; that is, incoming and outgoing messages
14 cannot be assumed to 'pair up' as in a request-response protocol.
15
16 Asynchronous processing of messages poses a number of problems that this
17 pipeline construction is intended to solve.
18
19
20 ## Overview
21
22 Logically, we have the following:
23
24
25     +-------------+  out  +---+     +---+     +---+       +--------+
26     |             |------>|   |---->|   |---->|   |------>|        |
27     | Application |       | A |     | B |     | C |       | Driver |
28     |             |<------|   |<----|   |<----|   |<------|        |
29     +-------------+  in   +---+     +---+     +---+       +--------+
30
31                           \                       /
32                            +----------o----------+
33                                       |
34                                    sessions
35
36
37 For outgoing messages, the driver receives the result of
38
39         C.outgoing(B.outgoing(A.outgoing(message)))
40
41     or, [A, B, C].reduce(((m, ext) => ext.outgoing(m)), message)
42
43 For incoming messages, the application receives the result of
44
45         A.incoming(B.incoming(C.incoming(message)))
46
47     or, [C, B, A].reduce(((m, ext) => ext.incoming(m)), message)
48
49 A session is of the following type, to borrow notation from pseudo-Haskell:
50
51     type Session = {
52       incoming :: Message -> Message
53       outgoing :: Message -> Message
54       close    :: () -> ()
55     }
56
57 (That `() -> ()` syntax is intended to mean that `close()` is a nullary void
58 method; I apologise to any Haskell readers for not using the right monad.)
59
60 The `incoming()` and `outgoing()` methods perform message transformation in the
61 respective directions; `close()` is called when a socket closes so the session
62 can release any resources it's holding, for example a DEFLATE de/compression
63 context.
64
65 However because this is JavaScript, the `incoming()` and `outgoing()` methods
66 may be asynchronous (indeed, `permessage-deflate` is based on `zlib`, whose API
67 is stream-based). So their interface is strictly:
68
69     type Session = {
70       incoming :: Message -> Callback -> ()
71       outgoing :: Message -> Callback -> ()
72       close    :: () -> ()
73     }
74
75     type Callback = Either Error Message -> ()
76
77 This means a message *m2* can be pushed into a session while it's still
78 processing the preceding message *m1*. The messages can be processed
79 concurrently but they *must* be given to the next session in line (or to the
80 application) in the same order they came in. Applications will expect to receive
81 messages in the order they arrived over the wire, and sessions require this too.
82 So ordering of messages must be preserved throughout the pipeline.
83
84 Consider the following highly simplified extension that deflates messages on the
85 wire. `message` is a value conforming the type:
86
87     type Message = {
88       rsv1   :: Boolean
89       rsv2   :: Boolean
90       rsv3   :: Boolean
91       opcode :: Number
92       data   :: Buffer
93     }
94
95 Here's the extension:
96
97 ```js
98 var zlib = require('zlib');
99
100 var deflate = {
101   outgoing: function(message, callback) {
102     zlib.deflateRaw(message.data, function(error, result) {
103       message.rsv1 = true;
104       message.data = result;
105       callback(error, message);
106     });
107   },
108
109   incoming: function(message, callback) {
110     // decompress inbound messages (elided)
111   },
112
113   close: function() {
114     // no state to clean up
115   }
116 };
117 ```
118
119 We can call it with a large message followed by a small one, and the small one
120 will be returned first:
121
122 ```js
123 var crypto = require('crypto'),
124     large  = crypto.randomBytes(1 << 14),
125     small  = new Buffer('hi');
126
127 deflate.outgoing({data: large}, function() {
128   console.log(1, 'large');
129 });
130
131 deflate.outgoing({data: small}, function() {
132   console.log(2, 'small');
133 });
134
135 /* prints:  2 'small'
136             1 'large' */
137 ```
138
139 So a session that processes messages asynchronously may fail to preserve message
140 ordering.
141
142 Now, this extension is stateless, so it can process messages in any order and
143 still produce the same output. But some extensions are stateful and require
144 message order to be preserved.
145
146 For example, when using `permessage-deflate` without `no_context_takeover` set,
147 the session retains a DEFLATE de/compression context between messages, which
148 accumulates state as it consumes data (later messages can refer to sections of
149 previous ones to improve compression). Reordering parts of the DEFLATE stream
150 will result in a failed decompression. Messages must be decompressed in the same
151 order they were compressed by the peer in order for the DEFLATE protocol to
152 work.
153
154 Finally, there is the problem of closing a socket. When a WebSocket is closed by
155 the application, or receives a closing request from the other peer, there may be
156 messages outgoing from the application and incoming from the peer in the
157 pipeline. If we close the socket and pipeline immediately, two problems arise:
158
159 * We may send our own closing frame to the peer before all prior messages we
160   sent have been written to the socket, and before we have finished processing
161   all prior messages from the peer
162 * The session may be instructed to close its resources (e.g. its de/compression
163   context) while it's in the middle of processing a message, or before it has
164   received messages that are upstream of it in the pipeline
165
166 Essentially, we must defer closing the sessions and sending a closing frame
167 until after all prior messages have exited the pipeline.
168
169
170 ## Design goals
171
172 * Message order must be preserved between the protocol driver, the extension
173   sessions, and the application
174 * Messages should be handed off to sessions and endpoints as soon as possible,
175   to maximise throughput of stateless sessions
176 * The closing procedure should block any further messages from entering the
177   pipeline, and should allow all existing messages to drain
178 * Sessions should be closed as soon as possible to prevent them holding memory
179   and other resources when they have no more messages to handle
180 * The closing API should allow the caller to detect when the pipeline is empty
181   and it is safe to continue the WebSocket closing procedure
182 * Individual extensions should remain as simple as possible to facilitate
183   modularity and independent authorship
184
185 The final point about modularity is an important one: this framework is designed
186 to facilitate extensions existing as plugins, by decoupling the protocol driver,
187 extensions, and application. In an ideal world, plugins should only need to
188 contain code for their specific functionality, and not solve these problems that
189 apply to all sessions. Also, solving some of these problems requires
190 consideration of all active sessions collectively, which an individual session
191 is incapable of doing.
192
193 For example, it is entirely possible to take the simple `deflate` extension
194 above and wrap its `incoming()` and `outgoing()` methods in two `Transform`
195 streams, producing this type:
196
197     type Session = {
198       incoming :: TransformStream
199       outtoing :: TransformStream
200       close    :: () -> ()
201     }
202
203 The `Transform` class makes it easy to wrap an async function such that message
204 order is preserved:
205
206 ```js
207 var stream  = require('stream'),
208     session = new stream.Transform({objectMode: true});
209
210 session._transform = function(message, _, callback) {
211   var self = this;
212   deflate.outgoing(message, function(error, result) {
213     self.push(result);
214     callback();
215   });
216 };
217 ```
218
219 However, this has a negative impact on throughput: it works by deferring
220 `callback()` until the async function has 'returned', which blocks `Transform`
221 from passing further input into the `_transform()` method until the current
222 message is dealt with completely. This would prevent sessions from processing
223 messages concurrently, and would unnecessarily reduce the throughput of
224 stateless extensions.
225
226 So, input should be handed off to sessions as soon as possible, and all we need
227 is a mechanism to reorder the output so that message order is preserved for the
228 next session in line.
229
230
231 ## Solution
232
233 We now describe the model implemented here and how it meets the above design
234 goals. The above diagram where a stack of extensions sit between the driver and
235 application describes the data flow, but not the object graph. That looks like
236 this:
237
238
239             +--------+
240             | Driver |
241             +---o----+
242                 |
243                 V
244           +------------+      +----------+
245           | Extensions o----->| Pipeline |
246           +------------+      +-----o----+
247                                     |
248                     +---------------+---------------+
249                     |               |               |
250               +-----o----+    +-----o----+    +-----o----+
251               | Cell [A] |    | Cell [B] |    | Cell [C] |
252               +----------+    +----------+    +----------+
253
254
255 A driver using this framework holds an instance of the `Extensions` class, which
256 it uses to register extension plugins, negotiate headers and transform messages.
257 The `Extensions` instance itself holds a `Pipeline`, which contains an array of
258 `Cell` objects, each of which wraps one of the sessions.
259
260
261 ### Message processing
262
263 Both the `Pipeline` and `Cell` classes have `incoming()` and `outgoing()`
264 methods; the `Pipeline` interface pushes messages into the pipe, delegates the
265 message to each `Cell` in turn, then returns it back to the driver. Outgoing
266 messages pass through `A` then `B` then `C`, and incoming messages in the
267 reverse order.
268
269 Internally, a `Cell` contains two `Functor` objects. A `Functor` wraps an async
270 function and makes sure its output messages maintain the order of its input
271 messages. This name is due to [@fronx](https://github.com/fronx), on the basis
272 that, by preserving message order, the abstraction preserves the *mapping*
273 between input and output messages. To use our simple `deflate` extension from
274 above:
275
276 ```js
277 var functor = new Functor(deflate, 'outgoing');
278
279 functor.call({data: large}, function() {
280   console.log(1, 'large');
281 });
282
283 functor.call({data: small}, function() {
284   console.log(2, 'small');
285 });
286
287 /*  ->  1 'large'
288         2 'small' */
289 ```
290
291 A `Cell` contains two of these, one for each direction:
292
293
294                             +-----------------------+
295                       +---->| Functor [A, incoming] |
296     +----------+      |     +-----------------------+
297     | Cell [A] o------+
298     +----------+      |     +-----------------------+
299                       +---->| Functor [A, outgoing] |
300                             +-----------------------+
301
302
303 This satisfies the message transformation requirements: the `Pipeline` simply
304 loops over the cells in the appropriate direction to transform each message.
305 Because each `Cell` will preserve message order, we can pass a message to the
306 next `Cell` in line as soon as the current `Cell` returns it. This gives each
307 `Cell` all the messages in order while maximising throughput.
308
309
310 ### Session closing
311
312 We want to close each session as soon as possible, after all existing messages
313 have drained. To do this, each `Cell` begins with a pending message counter in
314 each direction, labelled `in` and `out` below.
315
316
317                               +----------+
318                               | Pipeline |
319                               +-----o----+
320                                     |
321                     +---------------+---------------+
322                     |               |               |
323               +-----o----+    +-----o----+    +-----o----+
324               | Cell [A] |    | Cell [B] |    | Cell [C] |
325               +----------+    +----------+    +----------+
326                  in: 0           in: 0           in: 0
327                 out: 0          out: 0          out: 0
328
329
330 When a message *m1* enters the pipeline, say in the `outgoing` direction, we
331 increment the `pending.out` counter on all cells immediately.
332
333
334                               +----------+
335                         m1 => | Pipeline |
336                               +-----o----+
337                                     |
338                     +---------------+---------------+
339                     |               |               |
340               +-----o----+    +-----o----+    +-----o----+
341               | Cell [A] |    | Cell [B] |    | Cell [C] |
342               +----------+    +----------+    +----------+
343                  in: 0           in: 0           in: 0
344                 out: 1          out: 1          out: 1
345
346
347 *m1* is handed off to `A`, meanwhile a second message `m2` arrives in the same
348 direction. All `pending.out` counters are again incremented.
349
350
351                               +----------+
352                         m2 => | Pipeline |
353                               +-----o----+
354                                     |
355                     +---------------+---------------+
356                 m1  |               |               |
357               +-----o----+    +-----o----+    +-----o----+
358               | Cell [A] |    | Cell [B] |    | Cell [C] |
359               +----------+    +----------+    +----------+
360                  in: 0           in: 0           in: 0
361                 out: 2          out: 2          out: 2
362
363
364 When the first cell's `A.outgoing` functor finishes processing *m1*, the first
365 `pending.out` counter is decremented and *m1* is handed off to cell `B`.
366
367
368                               +----------+
369                               | Pipeline |
370                               +-----o----+
371                                     |
372                     +---------------+---------------+
373                 m2  |           m1  |               |
374               +-----o----+    +-----o----+    +-----o----+
375               | Cell [A] |    | Cell [B] |    | Cell [C] |
376               +----------+    +----------+    +----------+
377                  in: 0           in: 0           in: 0
378                 out: 1          out: 2          out: 2
379
380
381
382 As `B` finishes with *m1*, and as `A` finishes with *m2*, the `pending.out`
383 counters continue to decrement.
384
385
386                               +----------+
387                               | Pipeline |
388                               +-----o----+
389                                     |
390                     +---------------+---------------+
391                     |           m2  |           m1  |
392               +-----o----+    +-----o----+    +-----o----+
393               | Cell [A] |    | Cell [B] |    | Cell [C] |
394               +----------+    +----------+    +----------+
395                  in: 0           in: 0           in: 0
396                 out: 0          out: 1          out: 2
397
398
399
400 Say `C` is a little slow, and begins processing *m2* while still processing
401 *m1*. That's fine, the `Functor` mechanism will keep *m1* ahead of *m2* in the
402 output.
403
404
405                               +----------+
406                               | Pipeline |
407                               +-----o----+
408                                     |
409                     +---------------+---------------+
410                     |               |           m2  | m1
411               +-----o----+    +-----o----+    +-----o----+
412               | Cell [A] |    | Cell [B] |    | Cell [C] |
413               +----------+    +----------+    +----------+
414                  in: 0           in: 0           in: 0
415                 out: 0          out: 0          out: 2
416
417
418 Once all messages are dealt with, the counters return to `0`.
419
420
421                               +----------+
422                               | Pipeline |
423                               +-----o----+
424                                     |
425                     +---------------+---------------+
426                     |               |               |
427               +-----o----+    +-----o----+    +-----o----+
428               | Cell [A] |    | Cell [B] |    | Cell [C] |
429               +----------+    +----------+    +----------+
430                  in: 0           in: 0           in: 0
431                 out: 0          out: 0          out: 0
432
433
434 The same process applies in the `incoming` direction, the only difference being
435 that messages are passed to `C` first.
436
437 This makes closing the sessions quite simple. When the driver wants to close the
438 socket, it calls `Pipeline.close()`. This *immediately* calls `close()` on all
439 the cells. If a cell has `in == out == 0`, then it immediately calls
440 `session.close()`. Otherwise, it stores the closing call and defers it until
441 `in` and `out` have both ticked down to zero. The pipeline will not accept new
442 messages after `close()` has been called, so we know the pending counts will not
443 increase after this point.
444
445 This means each session is closed as soon as possible: `A` can close while the
446 slow `C` session is still working, because it knows there are no more messages
447 on the way. Similarly, `C` will defer closing if `close()` is called while *m1*
448 is still in `B`, and *m2* in `A`, because its pending count means it knows it
449 has work yet to do, even if it's not received those messages yet. This concern
450 cannot be addressed by extensions acting only on their own local state, unless
451 we pollute individual extensions by making them all implement this same
452 mechanism.
453
454 The actual closing API at each level is slightly different:
455
456     type Session = {
457       close :: () -> ()
458     }
459
460     type Cell = {
461       close :: () -> Promise ()
462     }
463
464     type Pipeline = {
465       close :: Callback -> ()
466     }
467
468 This might appear inconsistent so it's worth explaining. Remember that a
469 `Pipeline` holds a list of `Cell` objects, each wrapping a `Session`. The driver
470 talks (via the `Extensions` API) to the `Pipeline` interface, and it wants
471 `Pipeline.close()` to do two things: close all the sessions, and tell me when
472 it's safe to start the closing procedure (i.e. when all messages have drained
473 from the pipe and been handed off to the application or socket). A callback API
474 works well for that.
475
476 At the other end of the stack, `Session.close()` is a nullary void method with
477 no callback or promise API because we don't care what it does, and whatever it
478 does do will not block the WebSocket protocol; we're not going to hold off
479 processing messages while a session closes its de/compression context. We just
480 tell it to close itself, and don't want to wait while it does that.
481
482 In the middle, `Cell.close()` returns a promise rather than using a callback.
483 This is for two reasons. First, `Cell.close()` might not do anything
484 immediately, it might have to defer its effect while messages drain. So, if
485 given a callback, it would have to store it in a queue for later execution.
486 Callbacks work fine if your method does something and can then invoke the
487 callback itself, but if you need to store callbacks somewhere so another method
488 can execute them, a promise is a better fit. Second, it better serves the
489 purposes of `Pipeline.close()`: it wants to call `close()` on each of a list of
490 cells, and wait for all of them to finish. This is simple and idiomatic using
491 promises:
492
493 ```js
494 var closed = cells.map((cell) => cell.close());
495 Promise.all(closed).then(callback);
496 ```
497
498 (We don't actually use a full *Promises/A+* compatible promise here, we use a
499 much simplified construction that acts as a callback aggregater and resolves
500 synchronously and does not support chaining, but the principle is the same.)
501
502
503 ### Error handling
504
505 We've not mentioned error handling so far but it bears some explanation. The
506 above counter system still applies, but behaves slightly differently in the
507 presence of errors.
508
509 Say we push three messages into the pipe in the outgoing direction:
510
511
512                               +----------+
513                 m3, m2, m1 => | Pipeline |
514                               +-----o----+
515                                     |
516                     +---------------+---------------+
517                     |               |               |
518               +-----o----+    +-----o----+    +-----o----+
519               | Cell [A] |    | Cell [B] |    | Cell [C] |
520               +----------+    +----------+    +----------+
521                  in: 0           in: 0           in: 0
522                 out: 3          out: 3          out: 3
523
524
525 They pass through the cells successfully up to this point:
526
527
528                               +----------+
529                               | Pipeline |
530                               +-----o----+
531                                     |
532                     +---------------+---------------+
533                 m3  |           m2  |           m1  |
534               +-----o----+    +-----o----+    +-----o----+
535               | Cell [A] |    | Cell [B] |    | Cell [C] |
536               +----------+    +----------+    +----------+
537                  in: 0           in: 0           in: 0
538                 out: 1          out: 2          out: 3
539
540
541 At this point, session `B` produces an error while processing *m2*, that is *m2*
542 becomes *e2*. *m1* is still in the pipeline, and *m3* is queued behind *m2*.
543 What ought to happen is that *m1* is handed off to the socket, then *m2* is
544 released to the driver, which will detect the error and begin closing the
545 socket. No further processing should be done on *m3* and it should not be
546 released to the driver after the error is emitted.
547
548 To handle this, we allow errors to pass down the pipeline just like messages do,
549 to maintain ordering. But, once a cell sees its session produce an error, or it
550 receives an error from upstream, it should refuse to accept any further
551 messages. Session `B` might have begun processing *m3* by the time it produces
552 the error *e2*, but `C` will have been given *e2* before it receives *m3*, and
553 can simply drop *m3*.
554
555 Now, say *e2* reaches the slow session `C` while *m1* is still present,
556 meanwhile *m3* has been dropped. `C` will never receive *m3* since it will have
557 been dropped upstream. Under the present model, its `out` counter will be `3`
558 but it is only going to emit two more values: *m1* and *e2*. In order for
559 closing to work, we need to decrement `out` to reflect this. The situation
560 should look like this:
561
562
563                               +----------+
564                               | Pipeline |
565                               +-----o----+
566                                     |
567                     +---------------+---------------+
568                     |               |           e2  | m1
569               +-----o----+    +-----o----+    +-----o----+
570               | Cell [A] |    | Cell [B] |    | Cell [C] |
571               +----------+    +----------+    +----------+
572                  in: 0           in: 0           in: 0
573                 out: 0          out: 0          out: 2
574
575
576 When a cell sees its session emit an error, or when it receives an error from
577 upstream, it sets its pending count in the appropriate direction to equal the
578 number of messages it is *currently* processing. It will not accept any messages
579 after it sees the error, so this will allow the counter to reach zero.
580
581 Note that while *e2* is in the pipeline, `Pipeline` should drop any further
582 messages in the outgoing direction, but should continue to accept incoming
583 messages. Until *e2* makes it out of the pipe to the driver, behind previous
584 successful messages, the driver does not know an error has happened, and a
585 message may arrive over the socket and make it all the way through the incoming
586 pipe in the meantime. We only halt processing in the affected direction to avoid
587 doing unnecessary work since messages arriving after an error should not be
588 processed.
589
590 Some unnecessary work may happen, for example any messages already in the
591 pipeline following *m2* will be processed by `A`, since it's upstream of the
592 error. Those messages will be dropped by `B`.
593
594
595 ## Alternative ideas
596
597 I am considering implementing `Functor` as an object-mode transform stream
598 rather than what is essentially an async function. Being object-mode, a stream
599 would preserve message boundaries and would also possibly help address
600 back-pressure. I'm not sure whether this would require external API changes so
601 that such streams could be connected to the downstream driver's streams.
602
603
604 ## Acknowledgements
605
606 Credit is due to [@mnowster](https://github.com/mnowster) for helping with the
607 design and to [@fronx](https://github.com/fronx) for helping name things.