Initial commit
[yaffs-website] / node_modules / websocket-extensions / lib / pipeline / index.js
1 'use strict';
2
3 var Cell   = require('./cell'),
4     Pledge = require('./pledge');
5
6 var Pipeline = function(sessions) {
7   this._cells   = sessions.map(function(session) { return new Cell(session) });
8   this._stopped = {incoming: false, outgoing: false};
9 };
10
11 Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
12   if (this._stopped.incoming) return;
13   this._loop('incoming', this._cells.length - 1, -1, -1, message, callback, context);
14 };
15
16 Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
17   if (this._stopped.outgoing) return;
18   this._loop('outgoing', 0, this._cells.length, 1, message, callback, context);
19 };
20
21 Pipeline.prototype.close = function(callback, context) {
22   this._stopped = {incoming: true, outgoing: true};
23
24   var closed = this._cells.map(function(a) { return a.close() });
25   if (callback)
26     Pledge.all(closed).then(function() { callback.call(context) });
27 };
28
29 Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
30   var cells = this._cells,
31       n     = cells.length,
32       self  = this;
33
34   while (n--) cells[n].pending(direction);
35
36   var pipe = function(index, error, msg) {
37     if (index === end) return callback.call(context, error, msg);
38
39     cells[index][direction](error, msg, function(err, m) {
40       if (err) self._stopped[direction] = true;
41       pipe(index + step, err, m);
42     });
43   };
44   pipe(start, null, message);
45 };
46
47 module.exports = Pipeline;