Initial commit
[yaffs-website] / node_modules / websocket-extensions / lib / pipeline / cell.js
1 'use strict';
2
3 var Functor = require('./functor'),
4     Pledge  = require('./pledge');
5
6 var Cell = function(tuple) {
7   this._ext     = tuple[0];
8   this._session = tuple[1];
9
10   this._functors = {
11     incoming: new Functor(this._session, 'processIncomingMessage'),
12     outgoing: new Functor(this._session, 'processOutgoingMessage')
13   };
14 };
15
16 Cell.prototype.pending = function(direction) {
17   this._functors[direction].pending += 1;
18 };
19
20 Cell.prototype.incoming = function(error, message, callback, context) {
21   this._exec('incoming', error, message, callback, context);
22 };
23
24 Cell.prototype.outgoing = function(error, message, callback, context) {
25   this._exec('outgoing', error, message, callback, context);
26 };
27
28 Cell.prototype.close = function() {
29   this._closed = this._closed || new Pledge();
30   this._doClose();
31   return this._closed;
32 };
33
34 Cell.prototype._exec = function(direction, error, message, callback, context) {
35   this._functors[direction].call(error, message, function(err, msg) {
36     if (err) err.message = this._ext.name + ': ' + err.message;
37     callback.call(context, err, msg);
38     this._doClose();
39   }, this);
40 };
41
42 Cell.prototype._doClose = function() {
43   var fin  = this._functors.incoming,
44       fout = this._functors.outgoing;
45
46   if (!this._closed || fin.pending + fout.pending !== 0) return;
47   if (this._session) this._session.close();
48   this._session = null;
49   this._closed.done();
50 };
51
52 module.exports = Cell;