Initial commit
[yaffs-website] / node_modules / websocket-extensions / lib / pipeline / functor.js
1 'use strict';
2
3 var RingBuffer = require('./ring_buffer');
4
5 var Functor = function(session, method) {
6   this._session = session;
7   this._method  = method;
8   this._queue   = new RingBuffer(Functor.QUEUE_SIZE);
9   this._stopped = false;
10   this.pending  = 0;
11 };
12
13 Functor.QUEUE_SIZE = 8;
14
15 Functor.prototype.call = function(error, message, callback, context) {
16   if (this._stopped) return;
17
18   var record = {error: error, message: message, callback: callback, context: context, done: false},
19       called = false,
20       self   = this;
21
22   this._queue.push(record);
23
24   if (record.error) {
25     record.done = true;
26     this._stop();
27     return this._flushQueue();
28   }
29
30   this._session[this._method](message, function(err, msg) {
31     if (!(called ^ (called = true))) return;
32
33     if (err) {
34       self._stop();
35       record.error   = err;
36       record.message = null;
37     } else {
38       record.message = msg;
39     }
40
41     record.done = true;
42     self._flushQueue();
43   });
44 };
45
46 Functor.prototype._stop = function() {
47   this.pending  = this._queue.length;
48   this._stopped = true;
49 };
50
51 Functor.prototype._flushQueue = function() {
52   var queue = this._queue, record;
53
54   while (queue.length > 0 && queue.peek().done) {
55     this.pending -= 1;
56     record = queue.shift();
57     record.callback.call(record.context, record.error, record.message);
58   }
59 };
60
61 module.exports = Functor;