Version 1
[yaffs-website] / node_modules / concat-stream / node_modules / readable-stream / lib / _stream_writable.js
1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
4
5 'use strict';
6
7 module.exports = Writable;
8
9 /*<replacement>*/
10 var processNextTick = require('process-nextick-args');
11 /*</replacement>*/
12
13 /*<replacement>*/
14 var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
15 /*</replacement>*/
16
17 /*<replacement>*/
18 var Buffer = require('buffer').Buffer;
19 /*</replacement>*/
20
21 Writable.WritableState = WritableState;
22
23 /*<replacement>*/
24 var util = require('core-util-is');
25 util.inherits = require('inherits');
26 /*</replacement>*/
27
28 /*<replacement>*/
29 var internalUtil = {
30   deprecate: require('util-deprecate')
31 };
32 /*</replacement>*/
33
34 /*<replacement>*/
35 var Stream;
36 (function () {
37   try {
38     Stream = require('st' + 'ream');
39   } catch (_) {} finally {
40     if (!Stream) Stream = require('events').EventEmitter;
41   }
42 })();
43 /*</replacement>*/
44
45 var Buffer = require('buffer').Buffer;
46
47 util.inherits(Writable, Stream);
48
49 function nop() {}
50
51 function WriteReq(chunk, encoding, cb) {
52   this.chunk = chunk;
53   this.encoding = encoding;
54   this.callback = cb;
55   this.next = null;
56 }
57
58 var Duplex;
59 function WritableState(options, stream) {
60   Duplex = Duplex || require('./_stream_duplex');
61
62   options = options || {};
63
64   // object stream flag to indicate whether or not this stream
65   // contains buffers or objects.
66   this.objectMode = !!options.objectMode;
67
68   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
69
70   // the point at which write() starts returning false
71   // Note: 0 is a valid value, means that we always return false if
72   // the entire buffer is not flushed immediately on write()
73   var hwm = options.highWaterMark;
74   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
75   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
76
77   // cast to ints.
78   this.highWaterMark = ~ ~this.highWaterMark;
79
80   this.needDrain = false;
81   // at the start of calling end()
82   this.ending = false;
83   // when end() has been called, and returned
84   this.ended = false;
85   // when 'finish' is emitted
86   this.finished = false;
87
88   // should we decode strings into buffers before passing to _write?
89   // this is here so that some node-core streams can optimize string
90   // handling at a lower level.
91   var noDecode = options.decodeStrings === false;
92   this.decodeStrings = !noDecode;
93
94   // Crypto is kind of old and crusty.  Historically, its default string
95   // encoding is 'binary' so we have to make this configurable.
96   // Everything else in the universe uses 'utf8', though.
97   this.defaultEncoding = options.defaultEncoding || 'utf8';
98
99   // not an actual buffer we keep track of, but a measurement
100   // of how much we're waiting to get pushed to some underlying
101   // socket or file.
102   this.length = 0;
103
104   // a flag to see when we're in the middle of a write.
105   this.writing = false;
106
107   // when true all writes will be buffered until .uncork() call
108   this.corked = 0;
109
110   // a flag to be able to tell if the onwrite cb is called immediately,
111   // or on a later tick.  We set this to true at first, because any
112   // actions that shouldn't happen until "later" should generally also
113   // not happen before the first write call.
114   this.sync = true;
115
116   // a flag to know if we're processing previously buffered items, which
117   // may call the _write() callback in the same tick, so that we don't
118   // end up in an overlapped onwrite situation.
119   this.bufferProcessing = false;
120
121   // the callback that's passed to _write(chunk,cb)
122   this.onwrite = function (er) {
123     onwrite(stream, er);
124   };
125
126   // the callback that the user supplies to write(chunk,encoding,cb)
127   this.writecb = null;
128
129   // the amount that is being written when _write is called.
130   this.writelen = 0;
131
132   this.bufferedRequest = null;
133   this.lastBufferedRequest = null;
134
135   // number of pending user-supplied write callbacks
136   // this must be 0 before 'finish' can be emitted
137   this.pendingcb = 0;
138
139   // emit prefinish if the only thing we're waiting for is _write cbs
140   // This is relevant for synchronous Transform streams
141   this.prefinished = false;
142
143   // True if the error was already emitted and should not be thrown again
144   this.errorEmitted = false;
145
146   // count buffered requests
147   this.bufferedRequestCount = 0;
148
149   // create the two objects needed to store the corked requests
150   // they are not a linked list, as no new elements are inserted in there
151   this.corkedRequestsFree = new CorkedRequest(this);
152   this.corkedRequestsFree.next = new CorkedRequest(this);
153 }
154
155 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
156   var current = this.bufferedRequest;
157   var out = [];
158   while (current) {
159     out.push(current);
160     current = current.next;
161   }
162   return out;
163 };
164
165 (function () {
166   try {
167     Object.defineProperty(WritableState.prototype, 'buffer', {
168       get: internalUtil.deprecate(function () {
169         return this.getBuffer();
170       }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
171     });
172   } catch (_) {}
173 })();
174
175 var Duplex;
176 function Writable(options) {
177   Duplex = Duplex || require('./_stream_duplex');
178
179   // Writable ctor is applied to Duplexes, though they're not
180   // instanceof Writable, they're instanceof Readable.
181   if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);
182
183   this._writableState = new WritableState(options, this);
184
185   // legacy.
186   this.writable = true;
187
188   if (options) {
189     if (typeof options.write === 'function') this._write = options.write;
190
191     if (typeof options.writev === 'function') this._writev = options.writev;
192   }
193
194   Stream.call(this);
195 }
196
197 // Otherwise people can pipe Writable streams, which is just wrong.
198 Writable.prototype.pipe = function () {
199   this.emit('error', new Error('Cannot pipe. Not readable.'));
200 };
201
202 function writeAfterEnd(stream, cb) {
203   var er = new Error('write after end');
204   // TODO: defer error events consistently everywhere, not just the cb
205   stream.emit('error', er);
206   processNextTick(cb, er);
207 }
208
209 // If we get something that is not a buffer, string, null, or undefined,
210 // and we're not in objectMode, then that's an error.
211 // Otherwise stream chunks are all considered to be of length=1, and the
212 // watermarks determine how many objects to keep in the buffer, rather than
213 // how many bytes or characters.
214 function validChunk(stream, state, chunk, cb) {
215   var valid = true;
216
217   if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
218     var er = new TypeError('Invalid non-string/buffer chunk');
219     stream.emit('error', er);
220     processNextTick(cb, er);
221     valid = false;
222   }
223   return valid;
224 }
225
226 Writable.prototype.write = function (chunk, encoding, cb) {
227   var state = this._writableState;
228   var ret = false;
229
230   if (typeof encoding === 'function') {
231     cb = encoding;
232     encoding = null;
233   }
234
235   if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
236
237   if (typeof cb !== 'function') cb = nop;
238
239   if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
240     state.pendingcb++;
241     ret = writeOrBuffer(this, state, chunk, encoding, cb);
242   }
243
244   return ret;
245 };
246
247 Writable.prototype.cork = function () {
248   var state = this._writableState;
249
250   state.corked++;
251 };
252
253 Writable.prototype.uncork = function () {
254   var state = this._writableState;
255
256   if (state.corked) {
257     state.corked--;
258
259     if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
260   }
261 };
262
263 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
264   // node::ParseEncoding() requires lower case.
265   if (typeof encoding === 'string') encoding = encoding.toLowerCase();
266   if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
267   this._writableState.defaultEncoding = encoding;
268 };
269
270 function decodeChunk(state, chunk, encoding) {
271   if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
272     chunk = new Buffer(chunk, encoding);
273   }
274   return chunk;
275 }
276
277 // if we're already writing something, then just put this
278 // in the queue, and wait our turn.  Otherwise, call _write
279 // If we return false, then we need a drain event, so set that flag.
280 function writeOrBuffer(stream, state, chunk, encoding, cb) {
281   chunk = decodeChunk(state, chunk, encoding);
282
283   if (Buffer.isBuffer(chunk)) encoding = 'buffer';
284   var len = state.objectMode ? 1 : chunk.length;
285
286   state.length += len;
287
288   var ret = state.length < state.highWaterMark;
289   // we must ensure that previous needDrain will not be reset to false.
290   if (!ret) state.needDrain = true;
291
292   if (state.writing || state.corked) {
293     var last = state.lastBufferedRequest;
294     state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
295     if (last) {
296       last.next = state.lastBufferedRequest;
297     } else {
298       state.bufferedRequest = state.lastBufferedRequest;
299     }
300     state.bufferedRequestCount += 1;
301   } else {
302     doWrite(stream, state, false, len, chunk, encoding, cb);
303   }
304
305   return ret;
306 }
307
308 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
309   state.writelen = len;
310   state.writecb = cb;
311   state.writing = true;
312   state.sync = true;
313   if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
314   state.sync = false;
315 }
316
317 function onwriteError(stream, state, sync, er, cb) {
318   --state.pendingcb;
319   if (sync) processNextTick(cb, er);else cb(er);
320
321   stream._writableState.errorEmitted = true;
322   stream.emit('error', er);
323 }
324
325 function onwriteStateUpdate(state) {
326   state.writing = false;
327   state.writecb = null;
328   state.length -= state.writelen;
329   state.writelen = 0;
330 }
331
332 function onwrite(stream, er) {
333   var state = stream._writableState;
334   var sync = state.sync;
335   var cb = state.writecb;
336
337   onwriteStateUpdate(state);
338
339   if (er) onwriteError(stream, state, sync, er, cb);else {
340     // Check if we're actually ready to finish, but don't emit yet
341     var finished = needFinish(state);
342
343     if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
344       clearBuffer(stream, state);
345     }
346
347     if (sync) {
348       /*<replacement>*/
349       asyncWrite(afterWrite, stream, state, finished, cb);
350       /*</replacement>*/
351     } else {
352         afterWrite(stream, state, finished, cb);
353       }
354   }
355 }
356
357 function afterWrite(stream, state, finished, cb) {
358   if (!finished) onwriteDrain(stream, state);
359   state.pendingcb--;
360   cb();
361   finishMaybe(stream, state);
362 }
363
364 // Must force callback to be called on nextTick, so that we don't
365 // emit 'drain' before the write() consumer gets the 'false' return
366 // value, and has a chance to attach a 'drain' listener.
367 function onwriteDrain(stream, state) {
368   if (state.length === 0 && state.needDrain) {
369     state.needDrain = false;
370     stream.emit('drain');
371   }
372 }
373
374 // if there's something in the buffer waiting, then process it
375 function clearBuffer(stream, state) {
376   state.bufferProcessing = true;
377   var entry = state.bufferedRequest;
378
379   if (stream._writev && entry && entry.next) {
380     // Fast case, write everything using _writev()
381     var l = state.bufferedRequestCount;
382     var buffer = new Array(l);
383     var holder = state.corkedRequestsFree;
384     holder.entry = entry;
385
386     var count = 0;
387     while (entry) {
388       buffer[count] = entry;
389       entry = entry.next;
390       count += 1;
391     }
392
393     doWrite(stream, state, true, state.length, buffer, '', holder.finish);
394
395     // doWrite is always async, defer these to save a bit of time
396     // as the hot path ends with doWrite
397     state.pendingcb++;
398     state.lastBufferedRequest = null;
399     state.corkedRequestsFree = holder.next;
400     holder.next = null;
401   } else {
402     // Slow case, write chunks one-by-one
403     while (entry) {
404       var chunk = entry.chunk;
405       var encoding = entry.encoding;
406       var cb = entry.callback;
407       var len = state.objectMode ? 1 : chunk.length;
408
409       doWrite(stream, state, false, len, chunk, encoding, cb);
410       entry = entry.next;
411       // if we didn't call the onwrite immediately, then
412       // it means that we need to wait until it does.
413       // also, that means that the chunk and cb are currently
414       // being processed, so move the buffer counter past them.
415       if (state.writing) {
416         break;
417       }
418     }
419
420     if (entry === null) state.lastBufferedRequest = null;
421   }
422
423   state.bufferedRequestCount = 0;
424   state.bufferedRequest = entry;
425   state.bufferProcessing = false;
426 }
427
428 Writable.prototype._write = function (chunk, encoding, cb) {
429   cb(new Error('not implemented'));
430 };
431
432 Writable.prototype._writev = null;
433
434 Writable.prototype.end = function (chunk, encoding, cb) {
435   var state = this._writableState;
436
437   if (typeof chunk === 'function') {
438     cb = chunk;
439     chunk = null;
440     encoding = null;
441   } else if (typeof encoding === 'function') {
442     cb = encoding;
443     encoding = null;
444   }
445
446   if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
447
448   // .end() fully uncorks
449   if (state.corked) {
450     state.corked = 1;
451     this.uncork();
452   }
453
454   // ignore unnecessary end() calls.
455   if (!state.ending && !state.finished) endWritable(this, state, cb);
456 };
457
458 function needFinish(state) {
459   return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
460 }
461
462 function prefinish(stream, state) {
463   if (!state.prefinished) {
464     state.prefinished = true;
465     stream.emit('prefinish');
466   }
467 }
468
469 function finishMaybe(stream, state) {
470   var need = needFinish(state);
471   if (need) {
472     if (state.pendingcb === 0) {
473       prefinish(stream, state);
474       state.finished = true;
475       stream.emit('finish');
476     } else {
477       prefinish(stream, state);
478     }
479   }
480   return need;
481 }
482
483 function endWritable(stream, state, cb) {
484   state.ending = true;
485   finishMaybe(stream, state);
486   if (cb) {
487     if (state.finished) processNextTick(cb);else stream.once('finish', cb);
488   }
489   state.ended = true;
490   stream.writable = false;
491 }
492
493 // It seems a linked list but it is not
494 // there will be only 2 of these for each stream
495 function CorkedRequest(state) {
496   var _this = this;
497
498   this.next = null;
499   this.entry = null;
500
501   this.finish = function (err) {
502     var entry = _this.entry;
503     _this.entry = null;
504     while (entry) {
505       var cb = entry.callback;
506       state.pendingcb--;
507       cb(err);
508       entry = entry.next;
509     }
510     if (state.corkedRequestsFree) {
511       state.corkedRequestsFree.next = _this;
512     } else {
513       state.corkedRequestsFree = _this;
514     }
515   };
516 }