Initial commit
[yaffs-website] / node_modules / combined-stream / lib / combined_stream.js
1 var util = require('util');
2 var Stream = require('stream').Stream;
3 var DelayedStream = require('delayed-stream');
4
5 module.exports = CombinedStream;
/**
 * A readable stream that emits the contents of several appended sources
 * one after another.
 *
 * Instances start out readable and not writable, with an empty queue and
 * no active source. Every field set here can be overridden through
 * `CombinedStream.create(options)`.
 *
 * @constructor
 */
function CombinedStream() {
  var defaultMaxDataSize = 2 * 1024 * 1024;

  // Public stream flags and buffering limits.
  this.writable = false;
  this.readable = true;
  this.dataSize = 0;
  this.maxDataSize = defaultMaxDataSize;
  this.pauseStreams = true;

  // Internal bookkeeping: whether the queue has been started, the queued
  // items still to be emitted, and the item currently being emitted.
  this._released = false;
  this._streams = [];
  this._currentStream = null;
}
util.inherits(CombinedStream, Stream);
18
/**
 * Factory: builds an instance of whatever constructor this function is
 * invoked on and copies every enumerable property of `options` onto it.
 *
 * @param {Object} [options] Property overrides (for example `maxDataSize`
 *   or `pauseStreams`). A falsy value means "no overrides".
 * @returns {CombinedStream} The newly created instance.
 */
CombinedStream.create = function(options) {
  var instance = new this();
  var overrides = options || {};

  // for...in is intentional: inherited enumerable options are copied too.
  for (var key in overrides) {
    instance[key] = overrides[key];
  }

  return instance;
};
29
/**
 * Decides whether a queued item should be treated as a stream (piped)
 * rather than as a plain value (emitted directly as one 'data' event).
 *
 * Functions, strings, booleans, numbers and Buffers are plain values;
 * everything else is considered stream-like.
 *
 * @param {*} stream Candidate item.
 * @returns {boolean} `true` when the item is treated as a stream.
 */
CombinedStream.isStreamLike = function(stream) {
  switch (typeof stream) {
    case 'function':
    case 'string':
    case 'boolean':
    case 'number':
      return false;
    default:
      return !Buffer.isBuffer(stream);
  }
};
37
/**
 * Queues an item to be emitted after everything appended before it.
 *
 * Plain values (see `CombinedStream.isStreamLike`) are queued untouched.
 * Stream-like items that are not already a DelayedStream are wrapped in
 * one, and the original source gets a 'data' listener that re-checks the
 * combined buffered size. Every stream-like item has its errors forwarded
 * and is paused immediately when `pauseStreams` is enabled.
 *
 * @param {*} stream Stream, factory function, or plain value to queue.
 * @returns {CombinedStream} `this`, to allow chaining.
 */
CombinedStream.prototype.append = function(stream) {
  if (!CombinedStream.isStreamLike(stream)) {
    this._streams.push(stream);
    return this;
  }

  var queued = stream;
  if (!(stream instanceof DelayedStream)) {
    queued = DelayedStream.create(stream, {
      maxDataSize: Infinity,
      pauseStream: this.pauseStreams
    });
    // Listen on the raw source so the size check runs on every chunk.
    stream.on('data', this._checkDataSize.bind(this));
  }

  this._handleErrors(queued);

  if (this.pauseStreams) {
    queued.pause();
  }

  this._streams.push(queued);
  return this;
};
61
/**
 * Pipes the combined output into `dest` using the base Stream
 * implementation, then starts (or resumes) emission.
 *
 * @param {Stream} dest Destination stream.
 * @param {Object} [options] Options passed through to `Stream#pipe`.
 * @returns {Stream} `dest`, so pipes can be chained.
 */
CombinedStream.prototype.pipe = function(dest, options) {
  var basePipe = Stream.prototype.pipe;
  basePipe.call(this, dest, options);

  // Piping is what kicks off delivery of the queued items.
  this.resume();

  return dest;
};
67
/**
 * Advances to the next queued item.
 *
 * Plain values are delivered synchronously and `_pipeNext` calls back
 * into `_getNext` right after writing them. To keep a long run of such
 * items from growing the call stack one pair of frames per item, a
 * re-entrant call only records that another step is wanted; the
 * outermost call then performs the steps in a loop.
 *
 * The `_insideLoop` and `_pendingNext` flags are created lazily on the
 * instance; an unset flag is treated as `false`.
 */
CombinedStream.prototype._getNext = function() {
  this._currentStream = null;

  if (this._insideLoop) {
    // Called from within a step of the loop below: defer to that loop.
    this._pendingNext = true;
    return;
  }

  this._insideLoop = true;
  try {
    do {
      this._pendingNext = false;
      this._realGetNext();
    } while (this._pendingNext);
  } finally {
    // Always release the guard, even if a listener or factory throws.
    this._insideLoop = false;
  }
};

/**
 * Performs a single step: dequeues one item and hands it to `_pipeNext`,
 * or ends the combined stream when the queue is empty.
 *
 * A queued function is treated as a stream factory: it is called with a
 * callback that receives the actual item. Stream-like results get the
 * same data-size and error wiring that `append` applies.
 *
 * @private
 */
CombinedStream.prototype._realGetNext = function() {
  var stream = this._streams.shift();

  if (typeof stream === 'undefined') {
    this.end();
    return;
  }

  if (typeof stream !== 'function') {
    this._pipeNext(stream);
    return;
  }

  var getStream = stream;
  getStream(function(stream) {
    var isStreamLike = CombinedStream.isStreamLike(stream);
    if (isStreamLike) {
      stream.on('data', this._checkDataSize.bind(this));
      this._handleErrors(stream);
    }

    this._pipeNext(stream);
  }.bind(this));
};
94
/**
 * Makes `stream` the active item and starts delivering it.
 *
 * A plain value is written out as a single chunk and the queue advances
 * immediately. A stream-like item is piped into this stream without
 * ending it, and the queue advances when that item emits 'end'.
 *
 * @param {*} stream Item taken from the queue.
 */
CombinedStream.prototype._pipeNext = function(stream) {
  this._currentStream = stream;

  if (!CombinedStream.isStreamLike(stream)) {
    this.write(stream);
    this._getNext();
    return;
  }

  stream.on('end', this._getNext.bind(this));
  // end:false keeps the combined stream open for the items that follow.
  stream.pipe(this, {end: false});
};
109
/**
 * Forwards 'error' events from `stream` to this combined stream via
 * `_emitError`.
 *
 * @param {Stream} stream Source whose errors should be forwarded.
 */
CombinedStream.prototype._handleErrors = function(stream) {
  var combined = this;

  function forwardError(err) {
    combined._emitError(err);
  }

  stream.on('error', forwardError);
};
116
/**
 * Re-emits a chunk as a 'data' event on the combined stream.
 *
 * Has no explicit return value, so the result of a call is `undefined`.
 *
 * @param {*} data Chunk to emit.
 */
CombinedStream.prototype.write = function(data) {
  var chunk = data;
  this.emit('data', chunk);
};
120
/**
 * Pauses the active source (when it has a `pause` method) and emits
 * 'pause'.
 *
 * Does nothing at all, not even emitting the event, when `pauseStreams`
 * is disabled.
 */
CombinedStream.prototype.pause = function() {
  if (!this.pauseStreams) {
    return;
  }

  var active = this._currentStream;
  if (active && typeof active.pause === 'function') {
    active.pause();
  }

  this.emit('pause');
};
129
/**
 * Starts delivery on the first call, and on every call resumes the
 * active source (when `pauseStreams` is enabled and the source has a
 * `resume` method) before emitting 'resume'.
 */
CombinedStream.prototype.resume = function() {
  var firstRelease = !this._released;
  if (firstRelease) {
    this._released = true;
    this.writable = true;
    this._getNext();
  }

  // Read the active source only now: _getNext above may have just set it.
  if (this.pauseStreams) {
    var active = this._currentStream;
    if (active && typeof active.resume === 'function') {
      active.resume();
    }
  }

  this.emit('resume');
};
140
/**
 * Finishes the combined stream: clears internal state through `_reset`
 * first, then emits 'end'.
 */
CombinedStream.prototype.end = function() {
  var combined = this;
  combined._reset();
  combined.emit('end');
};
145
/**
 * Tears the combined stream down: clears internal state through `_reset`
 * first, then emits 'close'.
 */
CombinedStream.prototype.destroy = function() {
  var combined = this;
  combined._reset();
  combined.emit('close');
};
150
/**
 * Returns the instance to an idle state: not writable, with an empty
 * queue (a fresh array) and no active source. Other fields, such as
 * `_released` and `dataSize`, are left as they are.
 */
CombinedStream.prototype._reset = function() {
  var emptyQueue = [];

  this.writable = false;
  this._streams = emptyQueue;
  this._currentStream = null;
};
156
/**
 * Recomputes the total buffered size and reports an error through
 * `_emitError` when it is not within `maxDataSize`.
 */
CombinedStream.prototype._checkDataSize = function() {
  this._updateDataSize();

  var withinLimit = this.dataSize <= this.maxDataSize;
  if (withinLimit) {
    return;
  }

  this._emitError(new Error(
    'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'
  ));
};
167
/**
 * Recalculates `dataSize` as the sum of the `dataSize` of every queued
 * item plus that of the active item. Items with a missing or falsy
 * `dataSize` contribute nothing.
 */
CombinedStream.prototype._updateDataSize = function() {
  var queuedBytes = this._streams.reduce(function(sum, stream) {
    return stream.dataSize ? sum + stream.dataSize : sum;
  }, 0);

  var active = this._currentStream;
  var activeBytes = active && active.dataSize;

  this.dataSize = activeBytes ? queuedBytes + activeBytes : queuedBytes;
};
184
/**
 * Reports a failure: clears internal state through `_reset` first, then
 * emits 'error' with the given error.
 *
 * @param {Error} err Error to emit.
 */
CombinedStream.prototype._emitError = function(err) {
  var combined = this;
  combined._reset();
  combined.emit('error', err);
};