Initial commit
[yaffs-website] / node_modules / glob-stream / node_modules / readable-stream / lib / _stream_writable.js
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 // A bit simpler than readable streams.
23 // Implement an async ._write(chunk, cb), and it'll handle all
24 // the drain event emission and buffering.
25
26 module.exports = Writable;
27
28 /*<replacement>*/
29 var Buffer = require('buffer').Buffer;
30 /*</replacement>*/
31
32 Writable.WritableState = WritableState;
33
34
35 /*<replacement>*/
36 var util = require('core-util-is');
37 util.inherits = require('inherits');
38 /*</replacement>*/
39
40 var Stream = require('stream');
41
42 util.inherits(Writable, Stream);
43
44 function WriteReq(chunk, encoding, cb) {
45   this.chunk = chunk;
46   this.encoding = encoding;
47   this.callback = cb;
48 }
49
50 function WritableState(options, stream) {
51   options = options || {};
52
53   // the point at which write() starts returning false
54   // Note: 0 is a valid value, means that we always return false if
55   // the entire buffer is not flushed immediately on write()
56   var hwm = options.highWaterMark;
57   this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
58
59   // object stream flag to indicate whether or not this stream
60   // contains buffers or objects.
61   this.objectMode = !!options.objectMode;
62
63   // cast to ints.
64   this.highWaterMark = ~~this.highWaterMark;
65
66   this.needDrain = false;
67   // at the start of calling end()
68   this.ending = false;
69   // when end() has been called, and returned
70   this.ended = false;
71   // when 'finish' is emitted
72   this.finished = false;
73
74   // should we decode strings into buffers before passing to _write?
75   // this is here so that some node-core streams can optimize string
76   // handling at a lower level.
77   var noDecode = options.decodeStrings === false;
78   this.decodeStrings = !noDecode;
79
80   // Crypto is kind of old and crusty.  Historically, its default string
81   // encoding is 'binary' so we have to make this configurable.
82   // Everything else in the universe uses 'utf8', though.
83   this.defaultEncoding = options.defaultEncoding || 'utf8';
84
85   // not an actual buffer we keep track of, but a measurement
86   // of how much we're waiting to get pushed to some underlying
87   // socket or file.
88   this.length = 0;
89
90   // a flag to see when we're in the middle of a write.
91   this.writing = false;
92
93   // a flag to be able to tell if the onwrite cb is called immediately,
94   // or on a later tick.  We set this to true at first, becuase any
95   // actions that shouldn't happen until "later" should generally also
96   // not happen before the first write call.
97   this.sync = true;
98
99   // a flag to know if we're processing previously buffered items, which
100   // may call the _write() callback in the same tick, so that we don't
101   // end up in an overlapped onwrite situation.
102   this.bufferProcessing = false;
103
104   // the callback that's passed to _write(chunk,cb)
105   this.onwrite = function(er) {
106     onwrite(stream, er);
107   };
108
109   // the callback that the user supplies to write(chunk,encoding,cb)
110   this.writecb = null;
111
112   // the amount that is being written when _write is called.
113   this.writelen = 0;
114
115   this.buffer = [];
116
117   // True if the error was already emitted and should not be thrown again
118   this.errorEmitted = false;
119 }
120
121 function Writable(options) {
122   var Duplex = require('./_stream_duplex');
123
124   // Writable ctor is applied to Duplexes, though they're not
125   // instanceof Writable, they're instanceof Readable.
126   if (!(this instanceof Writable) && !(this instanceof Duplex))
127     return new Writable(options);
128
129   this._writableState = new WritableState(options, this);
130
131   // legacy.
132   this.writable = true;
133
134   Stream.call(this);
135 }
136
137 // Otherwise people can pipe Writable streams, which is just wrong.
138 Writable.prototype.pipe = function() {
139   this.emit('error', new Error('Cannot pipe. Not readable.'));
140 };
141
142
143 function writeAfterEnd(stream, state, cb) {
144   var er = new Error('write after end');
145   // TODO: defer error events consistently everywhere, not just the cb
146   stream.emit('error', er);
147   process.nextTick(function() {
148     cb(er);
149   });
150 }
151
152 // If we get something that is not a buffer, string, null, or undefined,
153 // and we're not in objectMode, then that's an error.
154 // Otherwise stream chunks are all considered to be of length=1, and the
155 // watermarks determine how many objects to keep in the buffer, rather than
156 // how many bytes or characters.
157 function validChunk(stream, state, chunk, cb) {
158   var valid = true;
159   if (!Buffer.isBuffer(chunk) &&
160       'string' !== typeof chunk &&
161       chunk !== null &&
162       chunk !== undefined &&
163       !state.objectMode) {
164     var er = new TypeError('Invalid non-string/buffer chunk');
165     stream.emit('error', er);
166     process.nextTick(function() {
167       cb(er);
168     });
169     valid = false;
170   }
171   return valid;
172 }
173
174 Writable.prototype.write = function(chunk, encoding, cb) {
175   var state = this._writableState;
176   var ret = false;
177
178   if (typeof encoding === 'function') {
179     cb = encoding;
180     encoding = null;
181   }
182
183   if (Buffer.isBuffer(chunk))
184     encoding = 'buffer';
185   else if (!encoding)
186     encoding = state.defaultEncoding;
187
188   if (typeof cb !== 'function')
189     cb = function() {};
190
191   if (state.ended)
192     writeAfterEnd(this, state, cb);
193   else if (validChunk(this, state, chunk, cb))
194     ret = writeOrBuffer(this, state, chunk, encoding, cb);
195
196   return ret;
197 };
198
199 function decodeChunk(state, chunk, encoding) {
200   if (!state.objectMode &&
201       state.decodeStrings !== false &&
202       typeof chunk === 'string') {
203     chunk = new Buffer(chunk, encoding);
204   }
205   return chunk;
206 }
207
208 // if we're already writing something, then just put this
209 // in the queue, and wait our turn.  Otherwise, call _write
210 // If we return false, then we need a drain event, so set that flag.
211 function writeOrBuffer(stream, state, chunk, encoding, cb) {
212   chunk = decodeChunk(state, chunk, encoding);
213   if (Buffer.isBuffer(chunk))
214     encoding = 'buffer';
215   var len = state.objectMode ? 1 : chunk.length;
216
217   state.length += len;
218
219   var ret = state.length < state.highWaterMark;
220   // we must ensure that previous needDrain will not be reset to false.
221   if (!ret)
222     state.needDrain = true;
223
224   if (state.writing)
225     state.buffer.push(new WriteReq(chunk, encoding, cb));
226   else
227     doWrite(stream, state, len, chunk, encoding, cb);
228
229   return ret;
230 }
231
232 function doWrite(stream, state, len, chunk, encoding, cb) {
233   state.writelen = len;
234   state.writecb = cb;
235   state.writing = true;
236   state.sync = true;
237   stream._write(chunk, encoding, state.onwrite);
238   state.sync = false;
239 }
240
241 function onwriteError(stream, state, sync, er, cb) {
242   if (sync)
243     process.nextTick(function() {
244       cb(er);
245     });
246   else
247     cb(er);
248
249   stream._writableState.errorEmitted = true;
250   stream.emit('error', er);
251 }
252
253 function onwriteStateUpdate(state) {
254   state.writing = false;
255   state.writecb = null;
256   state.length -= state.writelen;
257   state.writelen = 0;
258 }
259
260 function onwrite(stream, er) {
261   var state = stream._writableState;
262   var sync = state.sync;
263   var cb = state.writecb;
264
265   onwriteStateUpdate(state);
266
267   if (er)
268     onwriteError(stream, state, sync, er, cb);
269   else {
270     // Check if we're actually ready to finish, but don't emit yet
271     var finished = needFinish(stream, state);
272
273     if (!finished && !state.bufferProcessing && state.buffer.length)
274       clearBuffer(stream, state);
275
276     if (sync) {
277       process.nextTick(function() {
278         afterWrite(stream, state, finished, cb);
279       });
280     } else {
281       afterWrite(stream, state, finished, cb);
282     }
283   }
284 }
285
286 function afterWrite(stream, state, finished, cb) {
287   if (!finished)
288     onwriteDrain(stream, state);
289   cb();
290   if (finished)
291     finishMaybe(stream, state);
292 }
293
294 // Must force callback to be called on nextTick, so that we don't
295 // emit 'drain' before the write() consumer gets the 'false' return
296 // value, and has a chance to attach a 'drain' listener.
297 function onwriteDrain(stream, state) {
298   if (state.length === 0 && state.needDrain) {
299     state.needDrain = false;
300     stream.emit('drain');
301   }
302 }
303
304
305 // if there's something in the buffer waiting, then process it
306 function clearBuffer(stream, state) {
307   state.bufferProcessing = true;
308
309   for (var c = 0; c < state.buffer.length; c++) {
310     var entry = state.buffer[c];
311     var chunk = entry.chunk;
312     var encoding = entry.encoding;
313     var cb = entry.callback;
314     var len = state.objectMode ? 1 : chunk.length;
315
316     doWrite(stream, state, len, chunk, encoding, cb);
317
318     // if we didn't call the onwrite immediately, then
319     // it means that we need to wait until it does.
320     // also, that means that the chunk and cb are currently
321     // being processed, so move the buffer counter past them.
322     if (state.writing) {
323       c++;
324       break;
325     }
326   }
327
328   state.bufferProcessing = false;
329   if (c < state.buffer.length)
330     state.buffer = state.buffer.slice(c);
331   else
332     state.buffer.length = 0;
333 }
334
335 Writable.prototype._write = function(chunk, encoding, cb) {
336   cb(new Error('not implemented'));
337 };
338
339 Writable.prototype.end = function(chunk, encoding, cb) {
340   var state = this._writableState;
341
342   if (typeof chunk === 'function') {
343     cb = chunk;
344     chunk = null;
345     encoding = null;
346   } else if (typeof encoding === 'function') {
347     cb = encoding;
348     encoding = null;
349   }
350
351   if (typeof chunk !== 'undefined' && chunk !== null)
352     this.write(chunk, encoding);
353
354   // ignore unnecessary end() calls.
355   if (!state.ending && !state.finished)
356     endWritable(this, state, cb);
357 };
358
359
360 function needFinish(stream, state) {
361   return (state.ending &&
362           state.length === 0 &&
363           !state.finished &&
364           !state.writing);
365 }
366
367 function finishMaybe(stream, state) {
368   var need = needFinish(stream, state);
369   if (need) {
370     state.finished = true;
371     stream.emit('finish');
372   }
373   return need;
374 }
375
376 function endWritable(stream, state, cb) {
377   state.ending = true;
378   finishMaybe(stream, state);
379   if (cb) {
380     if (state.finished)
381       process.nextTick(cb);
382     else
383       stream.once('finish', cb);
384   }
385   state.ended = true;
386 }