Initial commit
[yaffs-website] / node_modules / readable-stream / lib / _stream_writable.js
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 // A bit simpler than readable streams.
23 // Implement an async ._write(chunk, cb), and it'll handle all
24 // the drain event emission and buffering.
25
26 module.exports = Writable;
27
28 /*<replacement>*/
29 var Buffer = require('buffer').Buffer;
30 /*</replacement>*/
31
32 Writable.WritableState = WritableState;
33
34
35 /*<replacement>*/
36 var util = require('core-util-is');
37 util.inherits = require('inherits');
38 /*</replacement>*/
39
40 var Stream = require('stream');
41
42 util.inherits(Writable, Stream);
43
44 function WriteReq(chunk, encoding, cb) {
45   this.chunk = chunk;
46   this.encoding = encoding;
47   this.callback = cb;
48 }
49
50 function WritableState(options, stream) {
51   var Duplex = require('./_stream_duplex');
52
53   options = options || {};
54
55   // the point at which write() starts returning false
56   // Note: 0 is a valid value, means that we always return false if
57   // the entire buffer is not flushed immediately on write()
58   var hwm = options.highWaterMark;
59   var defaultHwm = options.objectMode ? 16 : 16 * 1024;
60   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
61
62   // object stream flag to indicate whether or not this stream
63   // contains buffers or objects.
64   this.objectMode = !!options.objectMode;
65
66   if (stream instanceof Duplex)
67     this.objectMode = this.objectMode || !!options.writableObjectMode;
68
69   // cast to ints.
70   this.highWaterMark = ~~this.highWaterMark;
71
72   this.needDrain = false;
73   // at the start of calling end()
74   this.ending = false;
75   // when end() has been called, and returned
76   this.ended = false;
77   // when 'finish' is emitted
78   this.finished = false;
79
80   // should we decode strings into buffers before passing to _write?
81   // this is here so that some node-core streams can optimize string
82   // handling at a lower level.
83   var noDecode = options.decodeStrings === false;
84   this.decodeStrings = !noDecode;
85
86   // Crypto is kind of old and crusty.  Historically, its default string
87   // encoding is 'binary' so we have to make this configurable.
88   // Everything else in the universe uses 'utf8', though.
89   this.defaultEncoding = options.defaultEncoding || 'utf8';
90
91   // not an actual buffer we keep track of, but a measurement
92   // of how much we're waiting to get pushed to some underlying
93   // socket or file.
94   this.length = 0;
95
96   // a flag to see when we're in the middle of a write.
97   this.writing = false;
98
99   // when true all writes will be buffered until .uncork() call
100   this.corked = 0;
101
102   // a flag to be able to tell if the onwrite cb is called immediately,
103   // or on a later tick.  We set this to true at first, because any
104   // actions that shouldn't happen until "later" should generally also
105   // not happen before the first write call.
106   this.sync = true;
107
108   // a flag to know if we're processing previously buffered items, which
109   // may call the _write() callback in the same tick, so that we don't
110   // end up in an overlapped onwrite situation.
111   this.bufferProcessing = false;
112
113   // the callback that's passed to _write(chunk,cb)
114   this.onwrite = function(er) {
115     onwrite(stream, er);
116   };
117
118   // the callback that the user supplies to write(chunk,encoding,cb)
119   this.writecb = null;
120
121   // the amount that is being written when _write is called.
122   this.writelen = 0;
123
124   this.buffer = [];
125
126   // number of pending user-supplied write callbacks
127   // this must be 0 before 'finish' can be emitted
128   this.pendingcb = 0;
129
130   // emit prefinish if the only thing we're waiting for is _write cbs
131   // This is relevant for synchronous Transform streams
132   this.prefinished = false;
133
134   // True if the error was already emitted and should not be thrown again
135   this.errorEmitted = false;
136 }
137
138 function Writable(options) {
139   var Duplex = require('./_stream_duplex');
140
141   // Writable ctor is applied to Duplexes, though they're not
142   // instanceof Writable, they're instanceof Readable.
143   if (!(this instanceof Writable) && !(this instanceof Duplex))
144     return new Writable(options);
145
146   this._writableState = new WritableState(options, this);
147
148   // legacy.
149   this.writable = true;
150
151   Stream.call(this);
152 }
153
154 // Otherwise people can pipe Writable streams, which is just wrong.
155 Writable.prototype.pipe = function() {
156   this.emit('error', new Error('Cannot pipe. Not readable.'));
157 };
158
159
160 function writeAfterEnd(stream, state, cb) {
161   var er = new Error('write after end');
162   // TODO: defer error events consistently everywhere, not just the cb
163   stream.emit('error', er);
164   process.nextTick(function() {
165     cb(er);
166   });
167 }
168
169 // If we get something that is not a buffer, string, null, or undefined,
170 // and we're not in objectMode, then that's an error.
171 // Otherwise stream chunks are all considered to be of length=1, and the
172 // watermarks determine how many objects to keep in the buffer, rather than
173 // how many bytes or characters.
174 function validChunk(stream, state, chunk, cb) {
175   var valid = true;
176   if (!util.isBuffer(chunk) &&
177       !util.isString(chunk) &&
178       !util.isNullOrUndefined(chunk) &&
179       !state.objectMode) {
180     var er = new TypeError('Invalid non-string/buffer chunk');
181     stream.emit('error', er);
182     process.nextTick(function() {
183       cb(er);
184     });
185     valid = false;
186   }
187   return valid;
188 }
189
190 Writable.prototype.write = function(chunk, encoding, cb) {
191   var state = this._writableState;
192   var ret = false;
193
194   if (util.isFunction(encoding)) {
195     cb = encoding;
196     encoding = null;
197   }
198
199   if (util.isBuffer(chunk))
200     encoding = 'buffer';
201   else if (!encoding)
202     encoding = state.defaultEncoding;
203
204   if (!util.isFunction(cb))
205     cb = function() {};
206
207   if (state.ended)
208     writeAfterEnd(this, state, cb);
209   else if (validChunk(this, state, chunk, cb)) {
210     state.pendingcb++;
211     ret = writeOrBuffer(this, state, chunk, encoding, cb);
212   }
213
214   return ret;
215 };
216
217 Writable.prototype.cork = function() {
218   var state = this._writableState;
219
220   state.corked++;
221 };
222
223 Writable.prototype.uncork = function() {
224   var state = this._writableState;
225
226   if (state.corked) {
227     state.corked--;
228
229     if (!state.writing &&
230         !state.corked &&
231         !state.finished &&
232         !state.bufferProcessing &&
233         state.buffer.length)
234       clearBuffer(this, state);
235   }
236 };
237
238 function decodeChunk(state, chunk, encoding) {
239   if (!state.objectMode &&
240       state.decodeStrings !== false &&
241       util.isString(chunk)) {
242     chunk = new Buffer(chunk, encoding);
243   }
244   return chunk;
245 }
246
247 // if we're already writing something, then just put this
248 // in the queue, and wait our turn.  Otherwise, call _write
249 // If we return false, then we need a drain event, so set that flag.
250 function writeOrBuffer(stream, state, chunk, encoding, cb) {
251   chunk = decodeChunk(state, chunk, encoding);
252   if (util.isBuffer(chunk))
253     encoding = 'buffer';
254   var len = state.objectMode ? 1 : chunk.length;
255
256   state.length += len;
257
258   var ret = state.length < state.highWaterMark;
259   // we must ensure that previous needDrain will not be reset to false.
260   if (!ret)
261     state.needDrain = true;
262
263   if (state.writing || state.corked)
264     state.buffer.push(new WriteReq(chunk, encoding, cb));
265   else
266     doWrite(stream, state, false, len, chunk, encoding, cb);
267
268   return ret;
269 }
270
271 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
272   state.writelen = len;
273   state.writecb = cb;
274   state.writing = true;
275   state.sync = true;
276   if (writev)
277     stream._writev(chunk, state.onwrite);
278   else
279     stream._write(chunk, encoding, state.onwrite);
280   state.sync = false;
281 }
282
283 function onwriteError(stream, state, sync, er, cb) {
284   if (sync)
285     process.nextTick(function() {
286       state.pendingcb--;
287       cb(er);
288     });
289   else {
290     state.pendingcb--;
291     cb(er);
292   }
293
294   stream._writableState.errorEmitted = true;
295   stream.emit('error', er);
296 }
297
298 function onwriteStateUpdate(state) {
299   state.writing = false;
300   state.writecb = null;
301   state.length -= state.writelen;
302   state.writelen = 0;
303 }
304
305 function onwrite(stream, er) {
306   var state = stream._writableState;
307   var sync = state.sync;
308   var cb = state.writecb;
309
310   onwriteStateUpdate(state);
311
312   if (er)
313     onwriteError(stream, state, sync, er, cb);
314   else {
315     // Check if we're actually ready to finish, but don't emit yet
316     var finished = needFinish(stream, state);
317
318     if (!finished &&
319         !state.corked &&
320         !state.bufferProcessing &&
321         state.buffer.length) {
322       clearBuffer(stream, state);
323     }
324
325     if (sync) {
326       process.nextTick(function() {
327         afterWrite(stream, state, finished, cb);
328       });
329     } else {
330       afterWrite(stream, state, finished, cb);
331     }
332   }
333 }
334
335 function afterWrite(stream, state, finished, cb) {
336   if (!finished)
337     onwriteDrain(stream, state);
338   state.pendingcb--;
339   cb();
340   finishMaybe(stream, state);
341 }
342
343 // Must force callback to be called on nextTick, so that we don't
344 // emit 'drain' before the write() consumer gets the 'false' return
345 // value, and has a chance to attach a 'drain' listener.
346 function onwriteDrain(stream, state) {
347   if (state.length === 0 && state.needDrain) {
348     state.needDrain = false;
349     stream.emit('drain');
350   }
351 }
352
353
354 // if there's something in the buffer waiting, then process it
355 function clearBuffer(stream, state) {
356   state.bufferProcessing = true;
357
358   if (stream._writev && state.buffer.length > 1) {
359     // Fast case, write everything using _writev()
360     var cbs = [];
361     for (var c = 0; c < state.buffer.length; c++)
362       cbs.push(state.buffer[c].callback);
363
364     // count the one we are adding, as well.
365     // TODO(isaacs) clean this up
366     state.pendingcb++;
367     doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
368       for (var i = 0; i < cbs.length; i++) {
369         state.pendingcb--;
370         cbs[i](err);
371       }
372     });
373
374     // Clear buffer
375     state.buffer = [];
376   } else {
377     // Slow case, write chunks one-by-one
378     for (var c = 0; c < state.buffer.length; c++) {
379       var entry = state.buffer[c];
380       var chunk = entry.chunk;
381       var encoding = entry.encoding;
382       var cb = entry.callback;
383       var len = state.objectMode ? 1 : chunk.length;
384
385       doWrite(stream, state, false, len, chunk, encoding, cb);
386
387       // if we didn't call the onwrite immediately, then
388       // it means that we need to wait until it does.
389       // also, that means that the chunk and cb are currently
390       // being processed, so move the buffer counter past them.
391       if (state.writing) {
392         c++;
393         break;
394       }
395     }
396
397     if (c < state.buffer.length)
398       state.buffer = state.buffer.slice(c);
399     else
400       state.buffer.length = 0;
401   }
402
403   state.bufferProcessing = false;
404 }
405
406 Writable.prototype._write = function(chunk, encoding, cb) {
407   cb(new Error('not implemented'));
408
409 };
410
411 Writable.prototype._writev = null;
412
413 Writable.prototype.end = function(chunk, encoding, cb) {
414   var state = this._writableState;
415
416   if (util.isFunction(chunk)) {
417     cb = chunk;
418     chunk = null;
419     encoding = null;
420   } else if (util.isFunction(encoding)) {
421     cb = encoding;
422     encoding = null;
423   }
424
425   if (!util.isNullOrUndefined(chunk))
426     this.write(chunk, encoding);
427
428   // .end() fully uncorks
429   if (state.corked) {
430     state.corked = 1;
431     this.uncork();
432   }
433
434   // ignore unnecessary end() calls.
435   if (!state.ending && !state.finished)
436     endWritable(this, state, cb);
437 };
438
439
440 function needFinish(stream, state) {
441   return (state.ending &&
442           state.length === 0 &&
443           !state.finished &&
444           !state.writing);
445 }
446
447 function prefinish(stream, state) {
448   if (!state.prefinished) {
449     state.prefinished = true;
450     stream.emit('prefinish');
451   }
452 }
453
454 function finishMaybe(stream, state) {
455   var need = needFinish(stream, state);
456   if (need) {
457     if (state.pendingcb === 0) {
458       prefinish(stream, state);
459       state.finished = true;
460       stream.emit('finish');
461     } else
462       prefinish(stream, state);
463   }
464   return need;
465 }
466
467 function endWritable(stream, state, cb) {
468   state.ending = true;
469   finishMaybe(stream, state);
470   if (cb) {
471     if (state.finished)
472       process.nextTick(cb);
473     else
474       stream.once('finish', cb);
475   }
476   state.ended = true;
477 }