Initial commit
[yaffs-website] / node_modules / stdout-stream / node_modules / readable-stream / lib / _stream_writable.js
1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
4
5 'use strict';
6
7 module.exports = Writable;
8
9 /*<replacement>*/
10 var processNextTick = require('process-nextick-args');
11 /*</replacement>*/
12
13 /*<replacement>*/
14 var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
15 /*</replacement>*/
16
17 /*<replacement>*/
18 var Duplex;
19 /*</replacement>*/
20
21 Writable.WritableState = WritableState;
22
23 /*<replacement>*/
24 var util = require('core-util-is');
25 util.inherits = require('inherits');
26 /*</replacement>*/
27
28 /*<replacement>*/
29 var internalUtil = {
30   deprecate: require('util-deprecate')
31 };
32 /*</replacement>*/
33
34 /*<replacement>*/
35 var Stream;
36 (function () {
37   try {
38     Stream = require('st' + 'ream');
39   } catch (_) {} finally {
40     if (!Stream) Stream = require('events').EventEmitter;
41   }
42 })();
43 /*</replacement>*/
44
45 var Buffer = require('buffer').Buffer;
46 /*<replacement>*/
47 var bufferShim = require('buffer-shims');
48 /*</replacement>*/
49
50 util.inherits(Writable, Stream);
51
52 function nop() {}
53
54 function WriteReq(chunk, encoding, cb) {
55   this.chunk = chunk;
56   this.encoding = encoding;
57   this.callback = cb;
58   this.next = null;
59 }
60
61 function WritableState(options, stream) {
62   Duplex = Duplex || require('./_stream_duplex');
63
64   options = options || {};
65
66   // object stream flag to indicate whether or not this stream
67   // contains buffers or objects.
68   this.objectMode = !!options.objectMode;
69
70   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
71
72   // the point at which write() starts returning false
73   // Note: 0 is a valid value, means that we always return false if
74   // the entire buffer is not flushed immediately on write()
75   var hwm = options.highWaterMark;
76   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
77   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
78
79   // cast to ints.
80   this.highWaterMark = ~ ~this.highWaterMark;
81
82   // drain event flag.
83   this.needDrain = false;
84   // at the start of calling end()
85   this.ending = false;
86   // when end() has been called, and returned
87   this.ended = false;
88   // when 'finish' is emitted
89   this.finished = false;
90
91   // should we decode strings into buffers before passing to _write?
92   // this is here so that some node-core streams can optimize string
93   // handling at a lower level.
94   var noDecode = options.decodeStrings === false;
95   this.decodeStrings = !noDecode;
96
97   // Crypto is kind of old and crusty.  Historically, its default string
98   // encoding is 'binary' so we have to make this configurable.
99   // Everything else in the universe uses 'utf8', though.
100   this.defaultEncoding = options.defaultEncoding || 'utf8';
101
102   // not an actual buffer we keep track of, but a measurement
103   // of how much we're waiting to get pushed to some underlying
104   // socket or file.
105   this.length = 0;
106
107   // a flag to see when we're in the middle of a write.
108   this.writing = false;
109
110   // when true all writes will be buffered until .uncork() call
111   this.corked = 0;
112
113   // a flag to be able to tell if the onwrite cb is called immediately,
114   // or on a later tick.  We set this to true at first, because any
115   // actions that shouldn't happen until "later" should generally also
116   // not happen before the first write call.
117   this.sync = true;
118
119   // a flag to know if we're processing previously buffered items, which
120   // may call the _write() callback in the same tick, so that we don't
121   // end up in an overlapped onwrite situation.
122   this.bufferProcessing = false;
123
124   // the callback that's passed to _write(chunk,cb)
125   this.onwrite = function (er) {
126     onwrite(stream, er);
127   };
128
129   // the callback that the user supplies to write(chunk,encoding,cb)
130   this.writecb = null;
131
132   // the amount that is being written when _write is called.
133   this.writelen = 0;
134
135   this.bufferedRequest = null;
136   this.lastBufferedRequest = null;
137
138   // number of pending user-supplied write callbacks
139   // this must be 0 before 'finish' can be emitted
140   this.pendingcb = 0;
141
142   // emit prefinish if the only thing we're waiting for is _write cbs
143   // This is relevant for synchronous Transform streams
144   this.prefinished = false;
145
146   // True if the error was already emitted and should not be thrown again
147   this.errorEmitted = false;
148
149   // count buffered requests
150   this.bufferedRequestCount = 0;
151
152   // allocate the first CorkedRequest, there is always
153   // one allocated and free to use, and we maintain at most two
154   this.corkedRequestsFree = new CorkedRequest(this);
155 }
156
157 WritableState.prototype.getBuffer = function getBuffer() {
158   var current = this.bufferedRequest;
159   var out = [];
160   while (current) {
161     out.push(current);
162     current = current.next;
163   }
164   return out;
165 };
166
167 (function () {
168   try {
169     Object.defineProperty(WritableState.prototype, 'buffer', {
170       get: internalUtil.deprecate(function () {
171         return this.getBuffer();
172       }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
173     });
174   } catch (_) {}
175 })();
176
177 // Test _writableState for inheritance to account for Duplex streams,
178 // whose prototype chain only points to Readable.
179 var realHasInstance;
180 if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
181   realHasInstance = Function.prototype[Symbol.hasInstance];
182   Object.defineProperty(Writable, Symbol.hasInstance, {
183     value: function (object) {
184       if (realHasInstance.call(this, object)) return true;
185
186       return object && object._writableState instanceof WritableState;
187     }
188   });
189 } else {
190   realHasInstance = function (object) {
191     return object instanceof this;
192   };
193 }
194
195 function Writable(options) {
196   Duplex = Duplex || require('./_stream_duplex');
197
198   // Writable ctor is applied to Duplexes, too.
199   // `realHasInstance` is necessary because using plain `instanceof`
200   // would return false, as no `_writableState` property is attached.
201
202   // Trying to use the custom `instanceof` for Writable here will also break the
203   // Node.js LazyTransform implementation, which has a non-trivial getter for
204   // `_writableState` that would lead to infinite recursion.
205   if (!realHasInstance.call(Writable, this) && !(this instanceof Duplex)) {
206     return new Writable(options);
207   }
208
209   this._writableState = new WritableState(options, this);
210
211   // legacy.
212   this.writable = true;
213
214   if (options) {
215     if (typeof options.write === 'function') this._write = options.write;
216
217     if (typeof options.writev === 'function') this._writev = options.writev;
218   }
219
220   Stream.call(this);
221 }
222
223 // Otherwise people can pipe Writable streams, which is just wrong.
224 Writable.prototype.pipe = function () {
225   this.emit('error', new Error('Cannot pipe, not readable'));
226 };
227
228 function writeAfterEnd(stream, cb) {
229   var er = new Error('write after end');
230   // TODO: defer error events consistently everywhere, not just the cb
231   stream.emit('error', er);
232   processNextTick(cb, er);
233 }
234
235 // If we get something that is not a buffer, string, null, or undefined,
236 // and we're not in objectMode, then that's an error.
237 // Otherwise stream chunks are all considered to be of length=1, and the
238 // watermarks determine how many objects to keep in the buffer, rather than
239 // how many bytes or characters.
240 function validChunk(stream, state, chunk, cb) {
241   var valid = true;
242   var er = false;
243   // Always throw error if a null is written
244   // if we are not in object mode then throw
245   // if it is not a buffer, string, or undefined.
246   if (chunk === null) {
247     er = new TypeError('May not write null values to stream');
248   } else if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
249     er = new TypeError('Invalid non-string/buffer chunk');
250   }
251   if (er) {
252     stream.emit('error', er);
253     processNextTick(cb, er);
254     valid = false;
255   }
256   return valid;
257 }
258
259 Writable.prototype.write = function (chunk, encoding, cb) {
260   var state = this._writableState;
261   var ret = false;
262
263   if (typeof encoding === 'function') {
264     cb = encoding;
265     encoding = null;
266   }
267
268   if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
269
270   if (typeof cb !== 'function') cb = nop;
271
272   if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
273     state.pendingcb++;
274     ret = writeOrBuffer(this, state, chunk, encoding, cb);
275   }
276
277   return ret;
278 };
279
280 Writable.prototype.cork = function () {
281   var state = this._writableState;
282
283   state.corked++;
284 };
285
286 Writable.prototype.uncork = function () {
287   var state = this._writableState;
288
289   if (state.corked) {
290     state.corked--;
291
292     if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
293   }
294 };
295
296 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
297   // node::ParseEncoding() requires lower case.
298   if (typeof encoding === 'string') encoding = encoding.toLowerCase();
299   if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
300   this._writableState.defaultEncoding = encoding;
301   return this;
302 };
303
304 function decodeChunk(state, chunk, encoding) {
305   if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
306     chunk = bufferShim.from(chunk, encoding);
307   }
308   return chunk;
309 }
310
311 // if we're already writing something, then just put this
312 // in the queue, and wait our turn.  Otherwise, call _write
313 // If we return false, then we need a drain event, so set that flag.
314 function writeOrBuffer(stream, state, chunk, encoding, cb) {
315   chunk = decodeChunk(state, chunk, encoding);
316
317   if (Buffer.isBuffer(chunk)) encoding = 'buffer';
318   var len = state.objectMode ? 1 : chunk.length;
319
320   state.length += len;
321
322   var ret = state.length < state.highWaterMark;
323   // we must ensure that previous needDrain will not be reset to false.
324   if (!ret) state.needDrain = true;
325
326   if (state.writing || state.corked) {
327     var last = state.lastBufferedRequest;
328     state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
329     if (last) {
330       last.next = state.lastBufferedRequest;
331     } else {
332       state.bufferedRequest = state.lastBufferedRequest;
333     }
334     state.bufferedRequestCount += 1;
335   } else {
336     doWrite(stream, state, false, len, chunk, encoding, cb);
337   }
338
339   return ret;
340 }
341
342 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
343   state.writelen = len;
344   state.writecb = cb;
345   state.writing = true;
346   state.sync = true;
347   if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
348   state.sync = false;
349 }
350
351 function onwriteError(stream, state, sync, er, cb) {
352   --state.pendingcb;
353   if (sync) processNextTick(cb, er);else cb(er);
354
355   stream._writableState.errorEmitted = true;
356   stream.emit('error', er);
357 }
358
359 function onwriteStateUpdate(state) {
360   state.writing = false;
361   state.writecb = null;
362   state.length -= state.writelen;
363   state.writelen = 0;
364 }
365
366 function onwrite(stream, er) {
367   var state = stream._writableState;
368   var sync = state.sync;
369   var cb = state.writecb;
370
371   onwriteStateUpdate(state);
372
373   if (er) onwriteError(stream, state, sync, er, cb);else {
374     // Check if we're actually ready to finish, but don't emit yet
375     var finished = needFinish(state);
376
377     if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
378       clearBuffer(stream, state);
379     }
380
381     if (sync) {
382       /*<replacement>*/
383       asyncWrite(afterWrite, stream, state, finished, cb);
384       /*</replacement>*/
385     } else {
386         afterWrite(stream, state, finished, cb);
387       }
388   }
389 }
390
391 function afterWrite(stream, state, finished, cb) {
392   if (!finished) onwriteDrain(stream, state);
393   state.pendingcb--;
394   cb();
395   finishMaybe(stream, state);
396 }
397
398 // Must force callback to be called on nextTick, so that we don't
399 // emit 'drain' before the write() consumer gets the 'false' return
400 // value, and has a chance to attach a 'drain' listener.
401 function onwriteDrain(stream, state) {
402   if (state.length === 0 && state.needDrain) {
403     state.needDrain = false;
404     stream.emit('drain');
405   }
406 }
407
408 // if there's something in the buffer waiting, then process it
409 function clearBuffer(stream, state) {
410   state.bufferProcessing = true;
411   var entry = state.bufferedRequest;
412
413   if (stream._writev && entry && entry.next) {
414     // Fast case, write everything using _writev()
415     var l = state.bufferedRequestCount;
416     var buffer = new Array(l);
417     var holder = state.corkedRequestsFree;
418     holder.entry = entry;
419
420     var count = 0;
421     while (entry) {
422       buffer[count] = entry;
423       entry = entry.next;
424       count += 1;
425     }
426
427     doWrite(stream, state, true, state.length, buffer, '', holder.finish);
428
429     // doWrite is almost always async, defer these to save a bit of time
430     // as the hot path ends with doWrite
431     state.pendingcb++;
432     state.lastBufferedRequest = null;
433     if (holder.next) {
434       state.corkedRequestsFree = holder.next;
435       holder.next = null;
436     } else {
437       state.corkedRequestsFree = new CorkedRequest(state);
438     }
439   } else {
440     // Slow case, write chunks one-by-one
441     while (entry) {
442       var chunk = entry.chunk;
443       var encoding = entry.encoding;
444       var cb = entry.callback;
445       var len = state.objectMode ? 1 : chunk.length;
446
447       doWrite(stream, state, false, len, chunk, encoding, cb);
448       entry = entry.next;
449       // if we didn't call the onwrite immediately, then
450       // it means that we need to wait until it does.
451       // also, that means that the chunk and cb are currently
452       // being processed, so move the buffer counter past them.
453       if (state.writing) {
454         break;
455       }
456     }
457
458     if (entry === null) state.lastBufferedRequest = null;
459   }
460
461   state.bufferedRequestCount = 0;
462   state.bufferedRequest = entry;
463   state.bufferProcessing = false;
464 }
465
466 Writable.prototype._write = function (chunk, encoding, cb) {
467   cb(new Error('_write() is not implemented'));
468 };
469
470 Writable.prototype._writev = null;
471
472 Writable.prototype.end = function (chunk, encoding, cb) {
473   var state = this._writableState;
474
475   if (typeof chunk === 'function') {
476     cb = chunk;
477     chunk = null;
478     encoding = null;
479   } else if (typeof encoding === 'function') {
480     cb = encoding;
481     encoding = null;
482   }
483
484   if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
485
486   // .end() fully uncorks
487   if (state.corked) {
488     state.corked = 1;
489     this.uncork();
490   }
491
492   // ignore unnecessary end() calls.
493   if (!state.ending && !state.finished) endWritable(this, state, cb);
494 };
495
496 function needFinish(state) {
497   return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
498 }
499
500 function prefinish(stream, state) {
501   if (!state.prefinished) {
502     state.prefinished = true;
503     stream.emit('prefinish');
504   }
505 }
506
507 function finishMaybe(stream, state) {
508   var need = needFinish(state);
509   if (need) {
510     if (state.pendingcb === 0) {
511       prefinish(stream, state);
512       state.finished = true;
513       stream.emit('finish');
514     } else {
515       prefinish(stream, state);
516     }
517   }
518   return need;
519 }
520
521 function endWritable(stream, state, cb) {
522   state.ending = true;
523   finishMaybe(stream, state);
524   if (cb) {
525     if (state.finished) processNextTick(cb);else stream.once('finish', cb);
526   }
527   state.ended = true;
528   stream.writable = false;
529 }
530
531 // It seems a linked list but it is not
532 // there will be only 2 of these for each stream
533 function CorkedRequest(state) {
534   var _this = this;
535
536   this.next = null;
537   this.entry = null;
538
539   this.finish = function (err) {
540     var entry = _this.entry;
541     _this.entry = null;
542     while (entry) {
543       var cb = entry.callback;
544       state.pendingcb--;
545       cb(err);
546       entry = entry.next;
547     }
548     if (state.corkedRequestsFree) {
549       state.corkedRequestsFree.next = _this;
550     } else {
551       state.corkedRequestsFree = _this;
552     }
553   };
554 }