Version 1
[yaffs-website] / node_modules / concat-stream / node_modules / readable-stream / lib / _stream_readable.js
1 'use strict';
2
3 module.exports = Readable;
4
5 /*<replacement>*/
6 var processNextTick = require('process-nextick-args');
7 /*</replacement>*/
8
9 /*<replacement>*/
10 var isArray = require('isarray');
11 /*</replacement>*/
12
13 /*<replacement>*/
14 var Buffer = require('buffer').Buffer;
15 /*</replacement>*/
16
17 Readable.ReadableState = ReadableState;
18
19 var EE = require('events');
20
21 /*<replacement>*/
22 var EElistenerCount = function (emitter, type) {
23   return emitter.listeners(type).length;
24 };
25 /*</replacement>*/
26
27 /*<replacement>*/
28 var Stream;
29 (function () {
30   try {
31     Stream = require('st' + 'ream');
32   } catch (_) {} finally {
33     if (!Stream) Stream = require('events').EventEmitter;
34   }
35 })();
36 /*</replacement>*/
37
38 var Buffer = require('buffer').Buffer;
39
40 /*<replacement>*/
41 var util = require('core-util-is');
42 util.inherits = require('inherits');
43 /*</replacement>*/
44
45 /*<replacement>*/
46 var debugUtil = require('util');
47 var debug = undefined;
48 if (debugUtil && debugUtil.debuglog) {
49   debug = debugUtil.debuglog('stream');
50 } else {
51   debug = function () {};
52 }
53 /*</replacement>*/
54
55 var StringDecoder;
56
57 util.inherits(Readable, Stream);
58
59 var Duplex;
60 function ReadableState(options, stream) {
61   Duplex = Duplex || require('./_stream_duplex');
62
63   options = options || {};
64
65   // object stream flag. Used to make read(n) ignore n and to
66   // make all the buffer merging and length checks go away
67   this.objectMode = !!options.objectMode;
68
69   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode;
70
71   // the point at which it stops calling _read() to fill the buffer
72   // Note: 0 is a valid value, means "don't call _read preemptively ever"
73   var hwm = options.highWaterMark;
74   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
75   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
76
77   // cast to ints.
78   this.highWaterMark = ~ ~this.highWaterMark;
79
80   this.buffer = [];
81   this.length = 0;
82   this.pipes = null;
83   this.pipesCount = 0;
84   this.flowing = null;
85   this.ended = false;
86   this.endEmitted = false;
87   this.reading = false;
88
89   // a flag to be able to tell if the onwrite cb is called immediately,
90   // or on a later tick.  We set this to true at first, because any
91   // actions that shouldn't happen until "later" should generally also
92   // not happen before the first write call.
93   this.sync = true;
94
95   // whenever we return null, then we set a flag to say
96   // that we're awaiting a 'readable' event emission.
97   this.needReadable = false;
98   this.emittedReadable = false;
99   this.readableListening = false;
100   this.resumeScheduled = false;
101
102   // Crypto is kind of old and crusty.  Historically, its default string
103   // encoding is 'binary' so we have to make this configurable.
104   // Everything else in the universe uses 'utf8', though.
105   this.defaultEncoding = options.defaultEncoding || 'utf8';
106
107   // when piping, we only care about 'readable' events that happen
108   // after read()ing all the bytes and not getting any pushback.
109   this.ranOut = false;
110
111   // the number of writers that are awaiting a drain event in .pipe()s
112   this.awaitDrain = 0;
113
114   // if true, a maybeReadMore has been scheduled
115   this.readingMore = false;
116
117   this.decoder = null;
118   this.encoding = null;
119   if (options.encoding) {
120     if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
121     this.decoder = new StringDecoder(options.encoding);
122     this.encoding = options.encoding;
123   }
124 }
125
126 var Duplex;
127 function Readable(options) {
128   Duplex = Duplex || require('./_stream_duplex');
129
130   if (!(this instanceof Readable)) return new Readable(options);
131
132   this._readableState = new ReadableState(options, this);
133
134   // legacy
135   this.readable = true;
136
137   if (options && typeof options.read === 'function') this._read = options.read;
138
139   Stream.call(this);
140 }
141
142 // Manually shove something into the read() buffer.
143 // This returns true if the highWaterMark has not been hit yet,
144 // similar to how Writable.write() returns true if you should
145 // write() some more.
146 Readable.prototype.push = function (chunk, encoding) {
147   var state = this._readableState;
148
149   if (!state.objectMode && typeof chunk === 'string') {
150     encoding = encoding || state.defaultEncoding;
151     if (encoding !== state.encoding) {
152       chunk = new Buffer(chunk, encoding);
153       encoding = '';
154     }
155   }
156
157   return readableAddChunk(this, state, chunk, encoding, false);
158 };
159
160 // Unshift should *always* be something directly out of read()
161 Readable.prototype.unshift = function (chunk) {
162   var state = this._readableState;
163   return readableAddChunk(this, state, chunk, '', true);
164 };
165
166 Readable.prototype.isPaused = function () {
167   return this._readableState.flowing === false;
168 };
169
170 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
171   var er = chunkInvalid(state, chunk);
172   if (er) {
173     stream.emit('error', er);
174   } else if (chunk === null) {
175     state.reading = false;
176     onEofChunk(stream, state);
177   } else if (state.objectMode || chunk && chunk.length > 0) {
178     if (state.ended && !addToFront) {
179       var e = new Error('stream.push() after EOF');
180       stream.emit('error', e);
181     } else if (state.endEmitted && addToFront) {
182       var e = new Error('stream.unshift() after end event');
183       stream.emit('error', e);
184     } else {
185       var skipAdd;
186       if (state.decoder && !addToFront && !encoding) {
187         chunk = state.decoder.write(chunk);
188         skipAdd = !state.objectMode && chunk.length === 0;
189       }
190
191       if (!addToFront) state.reading = false;
192
193       // Don't add to the buffer if we've decoded to an empty string chunk and
194       // we're not in object mode
195       if (!skipAdd) {
196         // if we want the data now, just emit it.
197         if (state.flowing && state.length === 0 && !state.sync) {
198           stream.emit('data', chunk);
199           stream.read(0);
200         } else {
201           // update the buffer info.
202           state.length += state.objectMode ? 1 : chunk.length;
203           if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
204
205           if (state.needReadable) emitReadable(stream);
206         }
207       }
208
209       maybeReadMore(stream, state);
210     }
211   } else if (!addToFront) {
212     state.reading = false;
213   }
214
215   return needMoreData(state);
216 }
217
218 // if it's past the high water mark, we can push in some more.
219 // Also, if we have no data yet, we can stand some
220 // more bytes.  This is to work around cases where hwm=0,
221 // such as the repl.  Also, if the push() triggered a
222 // readable event, and the user called read(largeNumber) such that
223 // needReadable was set, then we ought to push more, so that another
224 // 'readable' event will be triggered.
225 function needMoreData(state) {
226   return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
227 }
228
229 // backwards compatibility.
230 Readable.prototype.setEncoding = function (enc) {
231   if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
232   this._readableState.decoder = new StringDecoder(enc);
233   this._readableState.encoding = enc;
234   return this;
235 };
236
237 // Don't raise the hwm > 8MB
238 var MAX_HWM = 0x800000;
239 function computeNewHighWaterMark(n) {
240   if (n >= MAX_HWM) {
241     n = MAX_HWM;
242   } else {
243     // Get the next highest power of 2
244     n--;
245     n |= n >>> 1;
246     n |= n >>> 2;
247     n |= n >>> 4;
248     n |= n >>> 8;
249     n |= n >>> 16;
250     n++;
251   }
252   return n;
253 }
254
255 function howMuchToRead(n, state) {
256   if (state.length === 0 && state.ended) return 0;
257
258   if (state.objectMode) return n === 0 ? 0 : 1;
259
260   if (n === null || isNaN(n)) {
261     // only flow one buffer at a time
262     if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length;
263   }
264
265   if (n <= 0) return 0;
266
267   // If we're asking for more than the target buffer level,
268   // then raise the water mark.  Bump up to the next highest
269   // power of 2, to prevent increasing it excessively in tiny
270   // amounts.
271   if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
272
273   // don't have that much.  return null, unless we've ended.
274   if (n > state.length) {
275     if (!state.ended) {
276       state.needReadable = true;
277       return 0;
278     } else {
279       return state.length;
280     }
281   }
282
283   return n;
284 }
285
286 // you can override either this method, or the async _read(n) below.
287 Readable.prototype.read = function (n) {
288   debug('read', n);
289   var state = this._readableState;
290   var nOrig = n;
291
292   if (typeof n !== 'number' || n > 0) state.emittedReadable = false;
293
294   // if we're doing read(0) to trigger a readable event, but we
295   // already have a bunch of data in the buffer, then just trigger
296   // the 'readable' event and move on.
297   if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
298     debug('read: emitReadable', state.length, state.ended);
299     if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
300     return null;
301   }
302
303   n = howMuchToRead(n, state);
304
305   // if we've ended, and we're now clear, then finish it up.
306   if (n === 0 && state.ended) {
307     if (state.length === 0) endReadable(this);
308     return null;
309   }
310
311   // All the actual chunk generation logic needs to be
312   // *below* the call to _read.  The reason is that in certain
313   // synthetic stream cases, such as passthrough streams, _read
314   // may be a completely synchronous operation which may change
315   // the state of the read buffer, providing enough data when
316   // before there was *not* enough.
317   //
318   // So, the steps are:
319   // 1. Figure out what the state of things will be after we do
320   // a read from the buffer.
321   //
322   // 2. If that resulting state will trigger a _read, then call _read.
323   // Note that this may be asynchronous, or synchronous.  Yes, it is
324   // deeply ugly to write APIs this way, but that still doesn't mean
325   // that the Readable class should behave improperly, as streams are
326   // designed to be sync/async agnostic.
327   // Take note if the _read call is sync or async (ie, if the read call
328   // has returned yet), so that we know whether or not it's safe to emit
329   // 'readable' etc.
330   //
331   // 3. Actually pull the requested chunks out of the buffer and return.
332
333   // if we need a readable event, then we need to do some reading.
334   var doRead = state.needReadable;
335   debug('need readable', doRead);
336
337   // if we currently have less than the highWaterMark, then also read some
338   if (state.length === 0 || state.length - n < state.highWaterMark) {
339     doRead = true;
340     debug('length less than watermark', doRead);
341   }
342
343   // however, if we've ended, then there's no point, and if we're already
344   // reading, then it's unnecessary.
345   if (state.ended || state.reading) {
346     doRead = false;
347     debug('reading or ended', doRead);
348   }
349
350   if (doRead) {
351     debug('do read');
352     state.reading = true;
353     state.sync = true;
354     // if the length is currently zero, then we *need* a readable event.
355     if (state.length === 0) state.needReadable = true;
356     // call internal read method
357     this._read(state.highWaterMark);
358     state.sync = false;
359   }
360
361   // If _read pushed data synchronously, then `reading` will be false,
362   // and we need to re-evaluate how much data we can return to the user.
363   if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
364
365   var ret;
366   if (n > 0) ret = fromList(n, state);else ret = null;
367
368   if (ret === null) {
369     state.needReadable = true;
370     n = 0;
371   }
372
373   state.length -= n;
374
375   // If we have nothing in the buffer, then we want to know
376   // as soon as we *do* get something into the buffer.
377   if (state.length === 0 && !state.ended) state.needReadable = true;
378
379   // If we tried to read() past the EOF, then emit end on the next tick.
380   if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
381
382   if (ret !== null) this.emit('data', ret);
383
384   return ret;
385 };
386
387 function chunkInvalid(state, chunk) {
388   var er = null;
389   if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
390     er = new TypeError('Invalid non-string/buffer chunk');
391   }
392   return er;
393 }
394
395 function onEofChunk(stream, state) {
396   if (state.ended) return;
397   if (state.decoder) {
398     var chunk = state.decoder.end();
399     if (chunk && chunk.length) {
400       state.buffer.push(chunk);
401       state.length += state.objectMode ? 1 : chunk.length;
402     }
403   }
404   state.ended = true;
405
406   // emit 'readable' now to make sure it gets picked up.
407   emitReadable(stream);
408 }
409
410 // Don't emit readable right away in sync mode, because this can trigger
411 // another read() call => stack overflow.  This way, it might trigger
412 // a nextTick recursion warning, but that's not so bad.
413 function emitReadable(stream) {
414   var state = stream._readableState;
415   state.needReadable = false;
416   if (!state.emittedReadable) {
417     debug('emitReadable', state.flowing);
418     state.emittedReadable = true;
419     if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream);
420   }
421 }
422
423 function emitReadable_(stream) {
424   debug('emit readable');
425   stream.emit('readable');
426   flow(stream);
427 }
428
429 // at this point, the user has presumably seen the 'readable' event,
430 // and called read() to consume some data.  that may have triggered
431 // in turn another _read(n) call, in which case reading = true if
432 // it's in progress.
433 // However, if we're not ended, or reading, and the length < hwm,
434 // then go ahead and try to read some more preemptively.
435 function maybeReadMore(stream, state) {
436   if (!state.readingMore) {
437     state.readingMore = true;
438     processNextTick(maybeReadMore_, stream, state);
439   }
440 }
441
442 function maybeReadMore_(stream, state) {
443   var len = state.length;
444   while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
445     debug('maybeReadMore read 0');
446     stream.read(0);
447     if (len === state.length)
448       // didn't get any data, stop spinning.
449       break;else len = state.length;
450   }
451   state.readingMore = false;
452 }
453
454 // abstract method.  to be overridden in specific implementation classes.
455 // call cb(er, data) where data is <= n in length.
456 // for virtual (non-string, non-buffer) streams, "length" is somewhat
457 // arbitrary, and perhaps not very meaningful.
458 Readable.prototype._read = function (n) {
459   this.emit('error', new Error('not implemented'));
460 };
461
462 Readable.prototype.pipe = function (dest, pipeOpts) {
463   var src = this;
464   var state = this._readableState;
465
466   switch (state.pipesCount) {
467     case 0:
468       state.pipes = dest;
469       break;
470     case 1:
471       state.pipes = [state.pipes, dest];
472       break;
473     default:
474       state.pipes.push(dest);
475       break;
476   }
477   state.pipesCount += 1;
478   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
479
480   var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
481
482   var endFn = doEnd ? onend : cleanup;
483   if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn);
484
485   dest.on('unpipe', onunpipe);
486   function onunpipe(readable) {
487     debug('onunpipe');
488     if (readable === src) {
489       cleanup();
490     }
491   }
492
493   function onend() {
494     debug('onend');
495     dest.end();
496   }
497
498   // when the dest drains, it reduces the awaitDrain counter
499   // on the source.  This would be more elegant with a .once()
500   // handler in flow(), but adding and removing repeatedly is
501   // too slow.
502   var ondrain = pipeOnDrain(src);
503   dest.on('drain', ondrain);
504
505   var cleanedUp = false;
506   function cleanup() {
507     debug('cleanup');
508     // cleanup event handlers once the pipe is broken
509     dest.removeListener('close', onclose);
510     dest.removeListener('finish', onfinish);
511     dest.removeListener('drain', ondrain);
512     dest.removeListener('error', onerror);
513     dest.removeListener('unpipe', onunpipe);
514     src.removeListener('end', onend);
515     src.removeListener('end', cleanup);
516     src.removeListener('data', ondata);
517
518     cleanedUp = true;
519
520     // if the reader is waiting for a drain event from this
521     // specific writer, then it would cause it to never start
522     // flowing again.
523     // So, if this is awaiting a drain, then we just call it now.
524     // If we don't know, then assume that we are waiting for one.
525     if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
526   }
527
528   src.on('data', ondata);
529   function ondata(chunk) {
530     debug('ondata');
531     var ret = dest.write(chunk);
532     if (false === ret) {
533       // If the user unpiped during `dest.write()`, it is possible
534       // to get stuck in a permanently paused state if that write
535       // also returned false.
536       if (state.pipesCount === 1 && state.pipes[0] === dest && src.listenerCount('data') === 1 && !cleanedUp) {
537         debug('false write response, pause', src._readableState.awaitDrain);
538         src._readableState.awaitDrain++;
539       }
540       src.pause();
541     }
542   }
543
544   // if the dest has an error, then stop piping into it.
545   // however, don't suppress the throwing behavior for this.
546   function onerror(er) {
547     debug('onerror', er);
548     unpipe();
549     dest.removeListener('error', onerror);
550     if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
551   }
552   // This is a brutally ugly hack to make sure that our error handler
553   // is attached before any userland ones.  NEVER DO THIS.
554   if (!dest._events || !dest._events.error) dest.on('error', onerror);else if (isArray(dest._events.error)) dest._events.error.unshift(onerror);else dest._events.error = [onerror, dest._events.error];
555
556   // Both close and finish should trigger unpipe, but only once.
557   function onclose() {
558     dest.removeListener('finish', onfinish);
559     unpipe();
560   }
561   dest.once('close', onclose);
562   function onfinish() {
563     debug('onfinish');
564     dest.removeListener('close', onclose);
565     unpipe();
566   }
567   dest.once('finish', onfinish);
568
569   function unpipe() {
570     debug('unpipe');
571     src.unpipe(dest);
572   }
573
574   // tell the dest that it's being piped to
575   dest.emit('pipe', src);
576
577   // start the flow if it hasn't been started already.
578   if (!state.flowing) {
579     debug('pipe resume');
580     src.resume();
581   }
582
583   return dest;
584 };
585
586 function pipeOnDrain(src) {
587   return function () {
588     var state = src._readableState;
589     debug('pipeOnDrain', state.awaitDrain);
590     if (state.awaitDrain) state.awaitDrain--;
591     if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
592       state.flowing = true;
593       flow(src);
594     }
595   };
596 }
597
598 Readable.prototype.unpipe = function (dest) {
599   var state = this._readableState;
600
601   // if we're not piping anywhere, then do nothing.
602   if (state.pipesCount === 0) return this;
603
604   // just one destination.  most common case.
605   if (state.pipesCount === 1) {
606     // passed in one, but it's not the right one.
607     if (dest && dest !== state.pipes) return this;
608
609     if (!dest) dest = state.pipes;
610
611     // got a match.
612     state.pipes = null;
613     state.pipesCount = 0;
614     state.flowing = false;
615     if (dest) dest.emit('unpipe', this);
616     return this;
617   }
618
619   // slow case. multiple pipe destinations.
620
621   if (!dest) {
622     // remove all.
623     var dests = state.pipes;
624     var len = state.pipesCount;
625     state.pipes = null;
626     state.pipesCount = 0;
627     state.flowing = false;
628
629     for (var _i = 0; _i < len; _i++) {
630       dests[_i].emit('unpipe', this);
631     }return this;
632   }
633
634   // try to find the right one.
635   var i = indexOf(state.pipes, dest);
636   if (i === -1) return this;
637
638   state.pipes.splice(i, 1);
639   state.pipesCount -= 1;
640   if (state.pipesCount === 1) state.pipes = state.pipes[0];
641
642   dest.emit('unpipe', this);
643
644   return this;
645 };
646
647 // set up data events if they are asked for
648 // Ensure readable listeners eventually get something
649 Readable.prototype.on = function (ev, fn) {
650   var res = Stream.prototype.on.call(this, ev, fn);
651
652   // If listening to data, and it has not explicitly been paused,
653   // then call resume to start the flow of data on the next tick.
654   if (ev === 'data' && false !== this._readableState.flowing) {
655     this.resume();
656   }
657
658   if (ev === 'readable' && !this._readableState.endEmitted) {
659     var state = this._readableState;
660     if (!state.readableListening) {
661       state.readableListening = true;
662       state.emittedReadable = false;
663       state.needReadable = true;
664       if (!state.reading) {
665         processNextTick(nReadingNextTick, this);
666       } else if (state.length) {
667         emitReadable(this, state);
668       }
669     }
670   }
671
672   return res;
673 };
674 Readable.prototype.addListener = Readable.prototype.on;
675
676 function nReadingNextTick(self) {
677   debug('readable nexttick read 0');
678   self.read(0);
679 }
680
681 // pause() and resume() are remnants of the legacy readable stream API
682 // If the user uses them, then switch into old mode.
683 Readable.prototype.resume = function () {
684   var state = this._readableState;
685   if (!state.flowing) {
686     debug('resume');
687     state.flowing = true;
688     resume(this, state);
689   }
690   return this;
691 };
692
693 function resume(stream, state) {
694   if (!state.resumeScheduled) {
695     state.resumeScheduled = true;
696     processNextTick(resume_, stream, state);
697   }
698 }
699
700 function resume_(stream, state) {
701   if (!state.reading) {
702     debug('resume read 0');
703     stream.read(0);
704   }
705
706   state.resumeScheduled = false;
707   stream.emit('resume');
708   flow(stream);
709   if (state.flowing && !state.reading) stream.read(0);
710 }
711
712 Readable.prototype.pause = function () {
713   debug('call pause flowing=%j', this._readableState.flowing);
714   if (false !== this._readableState.flowing) {
715     debug('pause');
716     this._readableState.flowing = false;
717     this.emit('pause');
718   }
719   return this;
720 };
721
722 function flow(stream) {
723   var state = stream._readableState;
724   debug('flow', state.flowing);
725   if (state.flowing) {
726     do {
727       var chunk = stream.read();
728     } while (null !== chunk && state.flowing);
729   }
730 }
731
732 // wrap an old-style stream as the async data source.
733 // This is *not* part of the readable stream interface.
734 // It is an ugly unfortunate mess of history.
735 Readable.prototype.wrap = function (stream) {
736   var state = this._readableState;
737   var paused = false;
738
739   var self = this;
740   stream.on('end', function () {
741     debug('wrapped end');
742     if (state.decoder && !state.ended) {
743       var chunk = state.decoder.end();
744       if (chunk && chunk.length) self.push(chunk);
745     }
746
747     self.push(null);
748   });
749
750   stream.on('data', function (chunk) {
751     debug('wrapped data');
752     if (state.decoder) chunk = state.decoder.write(chunk);
753
754     // don't skip over falsy values in objectMode
755     if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;
756
757     var ret = self.push(chunk);
758     if (!ret) {
759       paused = true;
760       stream.pause();
761     }
762   });
763
764   // proxy all the other methods.
765   // important when wrapping filters and duplexes.
766   for (var i in stream) {
767     if (this[i] === undefined && typeof stream[i] === 'function') {
768       this[i] = function (method) {
769         return function () {
770           return stream[method].apply(stream, arguments);
771         };
772       }(i);
773     }
774   }
775
776   // proxy certain important events.
777   var events = ['error', 'close', 'destroy', 'pause', 'resume'];
778   forEach(events, function (ev) {
779     stream.on(ev, self.emit.bind(self, ev));
780   });
781
782   // when we try to consume some more bytes, simply unpause the
783   // underlying stream.
784   self._read = function (n) {
785     debug('wrapped _read', n);
786     if (paused) {
787       paused = false;
788       stream.resume();
789     }
790   };
791
792   return self;
793 };
794
795 // exposed for testing purposes only.
796 Readable._fromList = fromList;
797
798 // Pluck off n bytes from an array of buffers.
799 // Length is the combined lengths of all the buffers in the list.
800 function fromList(n, state) {
801   var list = state.buffer;
802   var length = state.length;
803   var stringMode = !!state.decoder;
804   var objectMode = !!state.objectMode;
805   var ret;
806
807   // nothing in the list, definitely empty.
808   if (list.length === 0) return null;
809
810   if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) {
811     // read it all, truncate the array.
812     if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length);
813     list.length = 0;
814   } else {
815     // read just some of it.
816     if (n < list[0].length) {
817       // just take a part of the first list item.
818       // slice is the same for buffers and strings.
819       var buf = list[0];
820       ret = buf.slice(0, n);
821       list[0] = buf.slice(n);
822     } else if (n === list[0].length) {
823       // first list is a perfect match
824       ret = list.shift();
825     } else {
826       // complex case.
827       // we have enough to cover it, but it spans past the first buffer.
828       if (stringMode) ret = '';else ret = new Buffer(n);
829
830       var c = 0;
831       for (var i = 0, l = list.length; i < l && c < n; i++) {
832         var buf = list[0];
833         var cpy = Math.min(n - c, buf.length);
834
835         if (stringMode) ret += buf.slice(0, cpy);else buf.copy(ret, c, 0, cpy);
836
837         if (cpy < buf.length) list[0] = buf.slice(cpy);else list.shift();
838
839         c += cpy;
840       }
841     }
842   }
843
844   return ret;
845 }
846
847 function endReadable(stream) {
848   var state = stream._readableState;
849
850   // If we get here before consuming all the bytes, then that is a
851   // bug in node.  Should never happen.
852   if (state.length > 0) throw new Error('endReadable called on non-empty stream');
853
854   if (!state.endEmitted) {
855     state.ended = true;
856     processNextTick(endReadableNT, state, stream);
857   }
858 }
859
860 function endReadableNT(state, stream) {
861   // Check that we didn't get one last unshift.
862   if (!state.endEmitted && state.length === 0) {
863     state.endEmitted = true;
864     stream.readable = false;
865     stream.emit('end');
866   }
867 }
868
869 function forEach(xs, f) {
870   for (var i = 0, l = xs.length; i < l; i++) {
871     f(xs[i], i);
872   }
873 }
874
875 function indexOf(xs, x) {
876   for (var i = 0, l = xs.length; i < l; i++) {
877     if (xs[i] === x) return i;
878   }
879   return -1;
880 }