// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// A bit simpler than readable streams.
// Implement an async ._write(chunk, encoding, cb), and it'll handle all
// the drain event emission and buffering.
module.exports = Writable;

var Buffer = require('buffer').Buffer;

// Expose the state constructor alongside the stream constructor.
Writable.WritableState = WritableState;

var util = require('core-util-is');
util.inherits = require('inherits');

var Stream = require('stream');

// Writable extends the base Stream.
util.inherits(Writable, Stream);
// One buffered write request: the chunk, its encoding, and the callback
// that was supplied together with it.
function WriteReq(chunk, encoding, cb) {
  this.chunk = chunk;
  this.encoding = encoding;
  this.callback = cb;
}
// Bookkeeping for the writable side of a stream.
//
// options - optional settings object; the keys read here are highWaterMark,
//           objectMode, writableObjectMode (only honoured when `stream` is
//           a Duplex), decodeStrings and defaultEncoding.
// stream  - the stream this state belongs to; captured by `this.onwrite`.
function WritableState(options, stream) {
  var Duplex = require('./_stream_duplex');

  options = options || {};

  // the point at which write() starts returning false
  // Note: 0 is a valid value, means that we always return false if
  // the entire buffer is not flushed immediately on write()
  var hwm = options.highWaterMark;
  var defaultHwm = options.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // object stream flag to indicate whether or not this stream
  // contains buffers or objects.
  this.objectMode = !!options.objectMode;

  if (stream instanceof Duplex)
    this.objectMode = this.objectMode || !!options.writableObjectMode;

  // cast to ints.
  this.highWaterMark = ~~this.highWaterMark;

  this.needDrain = false;
  // at the start of calling end()
  this.ending = false;
  // when end() has been called, and returned
  this.ended = false;
  // when 'finish' is emitted
  this.finished = false;

  // should we decode strings into buffers before passing to _write?
  // this is here so that some node-core streams can optimize string
  // handling at a lower level.
  var noDecode = options.decodeStrings === false;
  this.decodeStrings = !noDecode;

  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // not an actual buffer we keep track of, but a measurement
  // of how much we're waiting to get pushed to some underlying
  // socket or file.
  this.length = 0;

  // a flag to see when we're in the middle of a write.
  this.writing = false;

  // when non-zero all writes will be buffered until a matching number of
  // .uncork() calls (cork() increments, uncork() decrements)
  this.corked = 0;

  // a flag to be able to tell if the onwrite cb is called immediately,
  // or on a later tick.  We set this to true at first, because any
  // actions that shouldn't happen until "later" should generally also
  // not happen before the first write call.
  this.sync = true;

  // a flag to know if we're processing previously buffered items, which
  // may call the _write() callback in the same tick, so that we don't
  // end up in an overlapped onwrite situation.
  this.bufferProcessing = false;

  // the callback that's passed to _write(chunk, encoding, cb)
  this.onwrite = function(er) {
    onwrite(stream, er);
  };

  // the callback that the user supplies to write(chunk,encoding,cb)
  this.writecb = null;

  // the amount that is being written when _write is called.
  this.writelen = 0;

  // queue of WriteReq entries waiting for the current write to complete
  this.buffer = [];

  // number of pending user-supplied write callbacks
  // this must be 0 before 'finish' can be emitted
  this.pendingcb = 0;

  // emit prefinish if the only thing we're waiting for is _write cbs
  // This is relevant for synchronous Transform streams
  this.prefinished = false;

  // True if the error was already emitted and should not be thrown again
  this.errorEmitted = false;
}
// Writable stream constructor. May be called without `new`, in which case
// it constructs and returns a fresh Writable.
//
// options - forwarded unchanged to WritableState.
function Writable(options) {
  var Duplex = require('./_stream_duplex');

  // Writable ctor is applied to Duplexes, though they're not
  // instanceof Writable, they're instanceof Readable.
  if (!(this instanceof Writable) && !(this instanceof Duplex))
    return new Writable(options);

  this._writableState = new WritableState(options, this);

  // legacy.
  this.writable = true;

  Stream.call(this);
}
// Otherwise people can pipe Writable streams, which is just wrong.
// Calling pipe() on a Writable emits an 'error' event and returns nothing.
Writable.prototype.pipe = function() {
  this.emit('error', new Error('Cannot pipe. Not readable.'));
};
// Report a write() that arrived after end(): 'error' is emitted on the
// stream synchronously, and the same Error is passed to `cb` on the next
// tick.
function writeAfterEnd(stream, state, cb) {
  var er = new Error('write after end');
  // TODO: defer error events consistently everywhere, not just the cb
  stream.emit('error', er);
  process.nextTick(function() {
    cb(er);
  });
}
// If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters.
//
// Returns true when the chunk is acceptable. Otherwise emits 'error' on the
// stream right away, hands the same TypeError to `cb` on the next tick, and
// returns false.
function validChunk(stream, state, chunk, cb) {
  var valid = true;
  if (!util.isBuffer(chunk) &&
      !util.isString(chunk) &&
      !util.isNullOrUndefined(chunk) &&
      !state.objectMode) {
    var er = new TypeError('Invalid non-string/buffer chunk');
    stream.emit('error', er);
    process.nextTick(function() {
      cb(er);
    });
    valid = false;
  }
  return valid;
}
// Queue or write one chunk.
//
// chunk    - the data to write.
// encoding - optional string encoding; may be omitted, in which case a
//            function in this position is treated as `cb`.
// cb       - optional callback; defaults to a no-op.
//
// Returns the value of writeOrBuffer() for an accepted chunk, or false when
// the write was rejected (stream already ended, or invalid chunk).
Writable.prototype.write = function(chunk, encoding, cb) {
  var state = this._writableState;
  var ret = false;

  if (util.isFunction(encoding)) {
    cb = encoding;
    encoding = null;
  }

  if (util.isBuffer(chunk))
    encoding = 'buffer';
  else if (!encoding)
    encoding = state.defaultEncoding;

  if (!util.isFunction(cb))
    cb = function() {};

  if (state.ended)
    writeAfterEnd(this, state, cb);
  else if (validChunk(this, state, chunk, cb)) {
    // one more user callback is now outstanding
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }

  return ret;
};
// Start buffering writes. Calls nest: each cork() bumps the counter that
// writeOrBuffer() checks before deciding to write immediately.
Writable.prototype.cork = function() {
  var state = this._writableState;

  state.corked++;
};
// Undo one cork(). When the last cork is released and nothing else is in
// flight, the buffered writes are flushed. A no-op when not corked.
Writable.prototype.uncork = function() {
  var state = this._writableState;

  if (state.corked) {
    state.corked--;

    if (!state.writing &&
        !state.corked &&
        !state.finished &&
        !state.bufferProcessing &&
        state.buffer.length)
      clearBuffer(this, state);
  }
};
// Convert a string chunk into a Buffer using `encoding`, unless the stream
// is in objectMode or string decoding was switched off. Any other chunk is
// returned unchanged.
function decodeChunk(state, chunk, encoding) {
  if (!state.objectMode &&
      state.decodeStrings !== false &&
      util.isString(chunk)) {
    // NOTE(review): `new Buffer(string, encoding)` is deprecated in current
    // Node.js; kept as-is so behaviour on old runtimes does not change.
    chunk = new Buffer(chunk, encoding);
  }
  return chunk;
}
// if we're already writing something, then just put this
// in the queue, and wait our turn.  Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
//
// Returns true while the pending length stays below the high water mark.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);
  if (util.isBuffer(chunk))
    encoding = 'buffer';
  // in objectMode every chunk counts as one unit
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  var ret = state.length < state.highWaterMark;
  // we must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked)
    state.buffer.push(new WriteReq(chunk, encoding, cb));
  else
    doWrite(stream, state, false, len, chunk, encoding, cb);

  return ret;
}
// Hand one chunk (or, when `writev` is true, a batch of queued requests) to
// the stream implementation. `state.sync` is true only for the duration of
// the _write/_writev call so that onwrite() can tell whether the
// implementation completed within that call.
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}
// Failure path of onwrite(): settle the user's write callback with the
// error (on the next tick when the write completed synchronously, right
// away otherwise), then emit 'error' on the stream.
function onwriteError(stream, state, sync, er, cb) {
  if (sync)
    process.nextTick(function() {
      state.pendingcb--;
      cb(er);
    });
  else {
    state.pendingcb--;
    cb(er);
  }

  stream._writableState.errorEmitted = true;
  stream.emit('error', er);
}
// Reset the per-write bookkeeping once a write has completed: the chunk's
// length no longer counts as pending and no write callback is outstanding.
function onwriteStateUpdate(state) {
  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;
}
// Completion handler passed (via state.onwrite) to _write()/_writev().
//
// stream - the stream whose write just completed.
// er     - truthy when the write failed.
function onwrite(stream, er) {
  var state = stream._writableState;
  // capture these before onwriteStateUpdate() resets the state
  var sync = state.sync;
  var cb = state.writecb;

  onwriteStateUpdate(state);

  if (er)
    onwriteError(stream, state, sync, er, cb);
  else {
    // Check if we're actually ready to finish, but don't emit yet
    var finished = needFinish(stream, state);

    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.buffer.length) {
      clearBuffer(stream, state);
    }

    // a write that completed inside the _write() call is reported on the
    // next tick instead of immediately
    if (sync) {
      process.nextTick(function() {
        afterWrite(stream, state, finished, cb);
      });
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}
// Success path after a write: possibly emit 'drain', settle the user's
// callback, then check whether the stream can finish.
function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  state.pendingcb--;
  cb();
  finishMaybe(stream, state);
}
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
//
// Emits 'drain' once nothing is pending any more and a drain was requested,
// clearing the request first.
function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }
}
// if there's something in the buffer waiting, then process it
//
// Flushes the queued WriteReq entries: in a single _writev() call when the
// stream implements it and more than one entry is queued, otherwise one
// _write() at a time until a write does not complete synchronously.
function clearBuffer(stream, state) {
  var c;
  state.bufferProcessing = true;

  if (stream._writev && state.buffer.length > 1) {
    // Fast case, write everything using _writev()
    var cbs = [];
    for (c = 0; c < state.buffer.length; c++)
      cbs.push(state.buffer[c].callback);

    // count the one we are adding, as well.
    // TODO(isaacs) clean this up
    state.pendingcb++;
    doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
      for (var i = 0; i < cbs.length; i++) {
        state.pendingcb--;
        cbs[i](err);
      }
    });

    // Clear buffer
    state.buffer = [];
  } else {
    // Slow case, write chunks one-by-one
    for (c = 0; c < state.buffer.length; c++) {
      var entry = state.buffer[c];
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;

      doWrite(stream, state, false, len, chunk, encoding, cb);

      // if we didn't call the onwrite immediately, then
      // it means that we need to wait until it does.
      // also, that means that the chunk and cb are currently
      // being processed, so move the buffer counter past them.
      if (state.writing) {
        c++;
        break;
      }
    }

    // keep only the entries that were not handed to _write()
    if (c < state.buffer.length)
      state.buffer = state.buffer.slice(c);
    else
      state.buffer.length = 0;
  }

  state.bufferProcessing = false;
}
// Default implementation: subclasses are expected to override _write().
// The base version reports failure through the callback.
Writable.prototype._write = function(chunk, encoding, cb) {
  cb(new Error('not implemented'));
};

// No batched write by default; clearBuffer() only uses _writev when a
// stream provides one.
Writable.prototype._writev = null;
// Signal that no more data will be written, optionally writing one final
// chunk first. Accepts end(), end(cb), end(chunk), end(chunk, cb) and
// end(chunk, encoding, cb).
Writable.prototype.end = function(chunk, encoding, cb) {
  var state = this._writableState;

  if (util.isFunction(chunk)) {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (util.isFunction(encoding)) {
    cb = encoding;
    encoding = null;
  }

  if (!util.isNullOrUndefined(chunk))
    this.write(chunk, encoding);

  // .end() fully uncorks
  if (state.corked) {
    // collapse nested corks so the single uncork() below releases them all
    state.corked = 1;
    this.uncork();
  }

  // ignore unnecessary end() calls.
  if (!state.ending && !state.finished)
    endWritable(this, state, cb);
};
// Truthy when end() has started, nothing is left to write, no write is in
// flight, and 'finish' has not been emitted yet.
function needFinish(stream, state) {
  return (state.ending &&
          state.length === 0 &&
          !state.finished &&
          !state.writing);
}
// Emit 'prefinish' at most once per stream.
function prefinish(stream, state) {
  if (!state.prefinished) {
    state.prefinished = true;
    stream.emit('prefinish');
  }
}
// Emit 'prefinish' (and 'finish', once every user callback has settled) if
// the stream is ready to finish. Returns the result of needFinish().
function finishMaybe(stream, state) {
  var need = needFinish(stream, state);
  if (need) {
    if (state.pendingcb === 0) {
      prefinish(stream, state);
      state.finished = true;
      stream.emit('finish');
    } else
      // callbacks are still outstanding: only announce prefinish for now
      prefinish(stream, state);
  }
  return need;
}
// Begin ending the stream. When `cb` is given it runs once the stream has
// finished: on the next tick if that already happened, otherwise as a
// one-shot 'finish' listener.
function endWritable(stream, state, cb) {
  state.ending = true;
  finishMaybe(stream, state);
  if (cb) {
    if (state.finished)
      process.nextTick(cb);
    else
      stream.once('finish', cb);
  }
  state.ended = true;
}