summaryrefslogtreecommitdiff
path: root/deps/npm/node_modules/readable-stream/lib/_stream_writable.js
diff options
context:
space:
mode:
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_writable.js')
-rw-r--r--deps/npm/node_modules/readable-stream/lib/_stream_writable.js364
1 files changed, 180 insertions, 184 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_writable.js b/deps/npm/node_modules/readable-stream/lib/_stream_writable.js
index b3f4e85a2f..9abbad6bc2 100644
--- a/deps/npm/node_modules/readable-stream/lib/_stream_writable.js
+++ b/deps/npm/node_modules/readable-stream/lib/_stream_writable.js
@@ -18,35 +18,29 @@
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
// A bit simpler than readable streams.
// Implement an async ._write(chunk, encoding, cb), and it'll handle all
// the drain event emission and buffering.
-
'use strict';
-/*<replacement>*/
-
-var pna = require('process-nextick-args');
-/*</replacement>*/
-
module.exports = Writable;
-
/* <replacement> */
+
function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;
this.next = null;
-}
-
-// It seems a linked list but it is not
+} // It seems a linked list but it is not
// there will be only 2 of these for each stream
+
+
function CorkedRequest(state) {
var _this = this;
this.next = null;
this.entry = null;
+
this.finish = function () {
onCorkedFinish(_this, state);
};
@@ -54,266 +48,243 @@ function CorkedRequest(state) {
/* </replacement> */
/*<replacement>*/
-var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : pna.nextTick;
-/*</replacement>*/
-/*<replacement>*/
+
var Duplex;
/*</replacement>*/
Writable.WritableState = WritableState;
-
/*<replacement>*/
-var util = require('core-util-is');
-util.inherits = require('inherits');
-/*</replacement>*/
-/*<replacement>*/
var internalUtil = {
deprecate: require('util-deprecate')
};
/*</replacement>*/
/*<replacement>*/
+
var Stream = require('./internal/streams/stream');
/*</replacement>*/
-/*<replacement>*/
-var Buffer = require('safe-buffer').Buffer;
+var Buffer = require('buffer').Buffer;
+
var OurUint8Array = global.Uint8Array || function () {};
+
function _uint8ArrayToBuffer(chunk) {
return Buffer.from(chunk);
}
+
function _isUint8Array(obj) {
return Buffer.isBuffer(obj) || obj instanceof OurUint8Array;
}
-/*</replacement>*/
-
var destroyImpl = require('./internal/streams/destroy');
-util.inherits(Writable, Stream);
+var _require = require('./internal/streams/state'),
+ getHighWaterMark = _require.getHighWaterMark;
-function nop() {}
+var _require$codes = require('../errors').codes,
+ ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
+ ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
+ ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK,
+ ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE,
+ ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED,
+ ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES,
+ ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END,
+ ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING;
-function WritableState(options, stream) {
- Duplex = Duplex || require('./_stream_duplex');
+require('inherits')(Writable, Stream);
- options = options || {};
+function nop() {}
- // Duplex streams are both readable and writable, but share
+function WritableState(options, stream, isDuplex) {
+ Duplex = Duplex || require('./_stream_duplex');
+ options = options || {}; // Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
- var isDuplex = stream instanceof Duplex;
- // object stream flag to indicate whether or not this stream
+ if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag to indicate whether or not this stream
// contains buffers or objects.
- this.objectMode = !!options.objectMode;
- if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
-
- // the point at which write() starts returning false
+ this.objectMode = !!options.objectMode;
+ if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; // the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
- var hwm = options.highWaterMark;
- var writableHwm = options.writableHighWaterMark;
- var defaultHwm = this.objectMode ? 16 : 16 * 1024;
- if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (writableHwm || writableHwm === 0)) this.highWaterMark = writableHwm;else this.highWaterMark = defaultHwm;
+ this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex); // if _final has been called
- // cast to ints.
- this.highWaterMark = Math.floor(this.highWaterMark);
+ this.finalCalled = false; // drain event flag.
- // if _final has been called
- this.finalCalled = false;
+ this.needDrain = false; // at the start of calling end()
- // drain event flag.
- this.needDrain = false;
- // at the start of calling end()
- this.ending = false;
- // when end() has been called, and returned
- this.ended = false;
- // when 'finish' is emitted
- this.finished = false;
+ this.ending = false; // when end() has been called, and returned
- // has it been destroyed
- this.destroyed = false;
+ this.ended = false; // when 'finish' is emitted
- // should we decode strings into buffers before passing to _write?
+ this.finished = false; // has it been destroyed
+
+ this.destroyed = false; // should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
- var noDecode = options.decodeStrings === false;
- this.decodeStrings = !noDecode;
- // Crypto is kind of old and crusty. Historically, its default string
+ var noDecode = options.decodeStrings === false;
+ this.decodeStrings = !noDecode; // Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = options.defaultEncoding || 'utf8';
- // not an actual buffer we keep track of, but a measurement
+ this.defaultEncoding = options.defaultEncoding || 'utf8'; // not an actual buffer we keep track of, but a measurement
// of how much we're waiting to get pushed to some underlying
// socket or file.
- this.length = 0;
- // a flag to see when we're in the middle of a write.
- this.writing = false;
+ this.length = 0; // a flag to see when we're in the middle of a write.
- // when true all writes will be buffered until .uncork() call
- this.corked = 0;
+ this.writing = false; // when true all writes will be buffered until .uncork() call
- // a flag to be able to tell if the onwrite cb is called immediately,
+ this.corked = 0; // a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick. We set this to true at first, because any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
- this.sync = true;
- // a flag to know if we're processing previously buffered items, which
+ this.sync = true; // a flag to know if we're processing previously buffered items, which
// may call the _write() callback in the same tick, so that we don't
// end up in an overlapped onwrite situation.
- this.bufferProcessing = false;
- // the callback that's passed to _write(chunk,cb)
+ this.bufferProcessing = false; // the callback that's passed to _write(chunk,cb)
+
this.onwrite = function (er) {
onwrite(stream, er);
- };
+ }; // the callback that the user supplies to write(chunk,encoding,cb)
- // the callback that the user supplies to write(chunk,encoding,cb)
- this.writecb = null;
- // the amount that is being written when _write is called.
- this.writelen = 0;
+ this.writecb = null; // the amount that is being written when _write is called.
+ this.writelen = 0;
this.bufferedRequest = null;
- this.lastBufferedRequest = null;
-
- // number of pending user-supplied write callbacks
+ this.lastBufferedRequest = null; // number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
- this.pendingcb = 0;
- // emit prefinish if the only thing we're waiting for is _write cbs
+ this.pendingcb = 0; // emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams
- this.prefinished = false;
- // True if the error was already emitted and should not be thrown again
- this.errorEmitted = false;
+ this.prefinished = false; // True if the error was already emitted and should not be thrown again
+
+ this.errorEmitted = false; // Should close be emitted on destroy. Defaults to true.
- // count buffered requests
- this.bufferedRequestCount = 0;
+ this.emitClose = options.emitClose !== false; // count buffered requests
- // allocate the first CorkedRequest, there is always
+ this.bufferedRequestCount = 0; // allocate the first CorkedRequest, there is always
// one allocated and free to use, and we maintain at most two
+
this.corkedRequestsFree = new CorkedRequest(this);
}
WritableState.prototype.getBuffer = function getBuffer() {
var current = this.bufferedRequest;
var out = [];
+
while (current) {
out.push(current);
current = current.next;
}
+
return out;
};
(function () {
try {
Object.defineProperty(WritableState.prototype, 'buffer', {
- get: internalUtil.deprecate(function () {
+ get: internalUtil.deprecate(function writableStateBufferGetter() {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.', 'DEP0003')
});
} catch (_) {}
-})();
-
-// Test _writableState for inheritance to account for Duplex streams,
+})(); // Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
+
+
var realHasInstance;
+
if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
realHasInstance = Function.prototype[Symbol.hasInstance];
Object.defineProperty(Writable, Symbol.hasInstance, {
- value: function (object) {
+ value: function value(object) {
if (realHasInstance.call(this, object)) return true;
if (this !== Writable) return false;
-
return object && object._writableState instanceof WritableState;
}
});
} else {
- realHasInstance = function (object) {
+ realHasInstance = function realHasInstance(object) {
return object instanceof this;
};
}
function Writable(options) {
- Duplex = Duplex || require('./_stream_duplex');
-
- // Writable ctor is applied to Duplexes, too.
+ Duplex = Duplex || require('./_stream_duplex'); // Writable ctor is applied to Duplexes, too.
// `realHasInstance` is necessary because using plain `instanceof`
// would return false, as no `_writableState` property is attached.
-
// Trying to use the custom `instanceof` for Writable here will also break the
// Node.js LazyTransform implementation, which has a non-trivial getter for
// `_writableState` that would lead to infinite recursion.
- if (!realHasInstance.call(Writable, this) && !(this instanceof Duplex)) {
- return new Writable(options);
- }
+ // Checking for a Stream.Duplex instance is faster here instead of inside
+ // the WritableState constructor, at least with V8 6.5
- this._writableState = new WritableState(options, this);
+ var isDuplex = this instanceof Duplex;
+ if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options);
+ this._writableState = new WritableState(options, this, isDuplex); // legacy.
- // legacy.
this.writable = true;
if (options) {
if (typeof options.write === 'function') this._write = options.write;
-
if (typeof options.writev === 'function') this._writev = options.writev;
-
if (typeof options.destroy === 'function') this._destroy = options.destroy;
-
if (typeof options.final === 'function') this._final = options.final;
}
Stream.call(this);
-}
+} // Otherwise people can pipe Writable streams, which is just wrong.
+
-// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function () {
- this.emit('error', new Error('Cannot pipe, not readable'));
+ this.emit('error', new ERR_STREAM_CANNOT_PIPE());
};
function writeAfterEnd(stream, cb) {
- var er = new Error('write after end');
- // TODO: defer error events consistently everywhere, not just the cb
- stream.emit('error', er);
- pna.nextTick(cb, er);
-}
+ var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb
-// Checks that a user-supplied chunk is valid, especially for the particular
+ stream.emit('error', er);
+ process.nextTick(cb, er);
+} // Checks that a user-supplied chunk is valid, especially for the particular
// mode the stream is in. Currently this means that `null` is never accepted
// and undefined/non-string values are only allowed in object mode.
+
+
function validChunk(stream, state, chunk, cb) {
- var valid = true;
- var er = false;
+ var er;
if (chunk === null) {
- er = new TypeError('May not write null values to stream');
- } else if (typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
- er = new TypeError('Invalid non-string/buffer chunk');
+ er = new ERR_STREAM_NULL_VALUES();
+ } else if (typeof chunk !== 'string' && !state.objectMode) {
+ er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
+
if (er) {
stream.emit('error', er);
- pna.nextTick(cb, er);
- valid = false;
+ process.nextTick(cb, er);
+ return false;
}
- return valid;
+
+ return true;
}
Writable.prototype.write = function (chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
+
var isBuf = !state.objectMode && _isUint8Array(chunk);
if (isBuf && !Buffer.isBuffer(chunk)) {
@@ -326,21 +297,16 @@ Writable.prototype.write = function (chunk, encoding, cb) {
}
if (isBuf) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
-
if (typeof cb !== 'function') cb = nop;
-
- if (state.ended) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) {
+ if (state.ending) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
}
-
return ret;
};
Writable.prototype.cork = function () {
- var state = this._writableState;
-
- state.corked++;
+ this._writableState.corked++;
};
Writable.prototype.uncork = function () {
@@ -348,23 +314,33 @@ Writable.prototype.uncork = function () {
if (state.corked) {
state.corked--;
-
- if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
+ if (!state.writing && !state.corked && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
}
};
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
// node::ParseEncoding() requires lower case.
if (typeof encoding === 'string') encoding = encoding.toLowerCase();
- if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
+ if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new ERR_UNKNOWN_ENCODING(encoding);
this._writableState.defaultEncoding = encoding;
return this;
};
+Object.defineProperty(Writable.prototype, 'writableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+});
+
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
}
+
return chunk;
}
@@ -373,29 +349,28 @@ Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
- get: function () {
+ get: function get() {
return this._writableState.highWaterMark;
}
-});
-
-// if we're already writing something, then just put this
+}); // if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
+
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
var newChunk = decodeChunk(state, chunk, encoding);
+
if (chunk !== newChunk) {
isBuf = true;
encoding = 'buffer';
chunk = newChunk;
}
}
- var len = state.objectMode ? 1 : chunk.length;
+ var len = state.objectMode ? 1 : chunk.length;
state.length += len;
+ var ret = state.length < state.highWaterMark; // we must ensure that previous needDrain will not be reset to false.
- var ret = state.length < state.highWaterMark;
- // we must ensure that previous needDrain will not be reset to false.
if (!ret) state.needDrain = true;
if (state.writing || state.corked) {
@@ -407,11 +382,13 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
callback: cb,
next: null
};
+
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
+
state.bufferedRequestCount += 1;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
@@ -425,7 +402,7 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
- if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
+ if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'));else if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
@@ -435,10 +412,10 @@ function onwriteError(stream, state, sync, er, cb) {
if (sync) {
// defer the callback if we are being called synchronously
// to avoid piling up things on the stack
- pna.nextTick(cb, er);
- // this can emit finish, and it will always happen
+ process.nextTick(cb, er); // this can emit finish, and it will always happen
// after error
- pna.nextTick(finishMaybe, stream, state);
+
+ process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
} else {
@@ -446,9 +423,9 @@ function onwriteError(stream, state, sync, er, cb) {
// it is async
cb(er);
stream._writableState.errorEmitted = true;
- stream.emit('error', er);
- // this can emit finish, but finish must
+ stream.emit('error', er); // this can emit finish, but finish must
// always follow error
+
finishMaybe(stream, state);
}
}
@@ -464,9 +441,8 @@ function onwrite(stream, er) {
var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;
-
+ if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK();
onwriteStateUpdate(state);
-
if (er) onwriteError(stream, state, sync, er, cb);else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state);
@@ -476,9 +452,7 @@ function onwrite(stream, er) {
}
if (sync) {
- /*<replacement>*/
- asyncWrite(afterWrite, stream, state, finished, cb);
- /*</replacement>*/
+ process.nextTick(afterWrite, stream, state, finished, cb);
} else {
afterWrite(stream, state, finished, cb);
}
@@ -490,19 +464,19 @@ function afterWrite(stream, state, finished, cb) {
state.pendingcb--;
cb();
finishMaybe(stream, state);
-}
-
-// Must force callback to be called on nextTick, so that we don't
+} // Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
+
+
function onwriteDrain(stream, state) {
if (state.length === 0 && state.needDrain) {
state.needDrain = false;
stream.emit('drain');
}
-}
+} // if there's something in the buffer waiting, then process it
+
-// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;
@@ -513,29 +487,30 @@ function clearBuffer(stream, state) {
var buffer = new Array(l);
var holder = state.corkedRequestsFree;
holder.entry = entry;
-
var count = 0;
var allBuffers = true;
+
while (entry) {
buffer[count] = entry;
if (!entry.isBuf) allBuffers = false;
entry = entry.next;
count += 1;
}
- buffer.allBuffers = allBuffers;
-
- doWrite(stream, state, true, state.length, buffer, '', holder.finish);
- // doWrite is almost always async, defer these to save a bit of time
+ buffer.allBuffers = allBuffers;
+ doWrite(stream, state, true, state.length, buffer, '', holder.finish); // doWrite is almost always async, defer these to save a bit of time
// as the hot path ends with doWrite
+
state.pendingcb++;
state.lastBufferedRequest = null;
+
if (holder.next) {
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {
state.corkedRequestsFree = new CorkedRequest(state);
}
+
state.bufferedRequestCount = 0;
} else {
// Slow case, write chunks one-by-one
@@ -544,14 +519,13 @@ function clearBuffer(stream, state) {
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
-
doWrite(stream, state, false, len, chunk, encoding, cb);
entry = entry.next;
- state.bufferedRequestCount--;
- // if we didn't call the onwrite immediately, then
+ state.bufferedRequestCount--; // if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
+
if (state.writing) {
break;
}
@@ -565,7 +539,7 @@ function clearBuffer(stream, state) {
}
Writable.prototype._write = function (chunk, encoding, cb) {
- cb(new Error('_write() is not implemented'));
+ cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
};
Writable.prototype._writev = null;
@@ -582,38 +556,52 @@ Writable.prototype.end = function (chunk, encoding, cb) {
encoding = null;
}
- if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
+ if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); // .end() fully uncorks
- // .end() fully uncorks
if (state.corked) {
state.corked = 1;
this.uncork();
- }
+ } // ignore unnecessary end() calls.
+
- // ignore unnecessary end() calls.
- if (!state.ending && !state.finished) endWritable(this, state, cb);
+ if (!state.ending) endWritable(this, state, cb);
+ return this;
};
+Object.defineProperty(Writable.prototype, 'writableLength', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
+ return this._writableState.length;
+ }
+});
+
function needFinish(state) {
return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
}
+
function callFinal(stream, state) {
stream._final(function (err) {
state.pendingcb--;
+
if (err) {
stream.emit('error', err);
}
+
state.prefinished = true;
stream.emit('prefinish');
finishMaybe(stream, state);
});
}
+
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === 'function') {
+ if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
- pna.nextTick(callFinal, stream, state);
+ process.nextTick(callFinal, stream, state);
} else {
state.prefinished = true;
stream.emit('prefinish');
@@ -623,22 +611,27 @@ function prefinish(stream, state) {
function finishMaybe(stream, state) {
var need = needFinish(state);
+
if (need) {
prefinish(stream, state);
+
if (state.pendingcb === 0) {
state.finished = true;
stream.emit('finish');
}
}
+
return need;
}
function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state);
+
if (cb) {
- if (state.finished) pna.nextTick(cb);else stream.once('finish', cb);
+ if (state.finished) process.nextTick(cb);else stream.once('finish', cb);
}
+
state.ended = true;
stream.writable = false;
}
@@ -646,42 +639,45 @@ function endWritable(stream, state, cb) {
function onCorkedFinish(corkReq, state, err) {
var entry = corkReq.entry;
corkReq.entry = null;
+
while (entry) {
var cb = entry.callback;
state.pendingcb--;
cb(err);
entry = entry.next;
- }
- if (state.corkedRequestsFree) {
- state.corkedRequestsFree.next = corkReq;
- } else {
- state.corkedRequestsFree = corkReq;
- }
+ } // reuse the free corkReq.
+
+
+ state.corkedRequestsFree.next = corkReq;
}
Object.defineProperty(Writable.prototype, 'destroyed', {
- get: function () {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
if (this._writableState === undefined) {
return false;
}
+
return this._writableState.destroyed;
},
- set: function (value) {
+ set: function set(value) {
// we ignore the value if the stream
// has not been initialized yet
if (!this._writableState) {
return;
- }
-
- // backward compatibility, the user is explicitly
+ } // backward compatibility, the user is explicitly
// managing destroyed
+
+
this._writableState.destroyed = value;
}
});
-
Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
+
Writable.prototype._destroy = function (err, cb) {
- this.end();
cb(err);
}; \ No newline at end of file