diff options
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_writable.js')
-rw-r--r-- | deps/npm/node_modules/readable-stream/lib/_stream_writable.js | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_writable.js b/deps/npm/node_modules/readable-stream/lib/_stream_writable.js index d20da88c75..0e1c28df51 100644 --- a/deps/npm/node_modules/readable-stream/lib/_stream_writable.js +++ b/deps/npm/node_modules/readable-stream/lib/_stream_writable.js @@ -27,7 +27,7 @@ /*<replacement>*/ -var processNextTick = require('process-nextick-args'); +var pna = require('process-nextick-args'); /*</replacement>*/ module.exports = Writable; @@ -54,7 +54,7 @@ function CorkedRequest(state) { /* </replacement> */ /*<replacement>*/ -var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick; +var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : pna.nextTick; /*</replacement>*/ /*<replacement>*/ @@ -79,6 +79,7 @@ var Stream = require('./internal/streams/stream'); /*</replacement>*/ /*<replacement>*/ + var Buffer = require('safe-buffer').Buffer; var OurUint8Array = global.Uint8Array || function () {}; function _uint8ArrayToBuffer(chunk) { @@ -87,6 +88,7 @@ function _uint8ArrayToBuffer(chunk) { function _isUint8Array(obj) { return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; } + /*</replacement>*/ var destroyImpl = require('./internal/streams/destroy'); @@ -100,18 +102,27 @@ function WritableState(options, stream) { options = options || {}; + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + var isDuplex = stream instanceof Duplex; + // object stream flag to indicate whether or not this stream // contains buffers or objects. this.objectMode = !!options.objectMode; - if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode; + if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; // the point at which write() starts returning false // Note: 0 is a valid value, means that we always return false if // the entire buffer is not flushed immediately on write() var hwm = options.highWaterMark; + var writableHwm = options.writableHighWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; - this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm; + + if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (writableHwm || writableHwm === 0)) this.highWaterMark = writableHwm;else this.highWaterMark = defaultHwm; // cast to ints. this.highWaterMark = Math.floor(this.highWaterMark); @@ -225,6 +236,7 @@ if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.protot Object.defineProperty(Writable, Symbol.hasInstance, { value: function (object) { if (realHasInstance.call(this, object)) return true; + if (this !== Writable) return false; return object && object._writableState instanceof WritableState; } @@ -276,7 +288,7 @@ function writeAfterEnd(stream, cb) { var er = new Error('write after end'); // TODO: defer error events consistently everywhere, not just the cb stream.emit('error', er); - processNextTick(cb, er); + pna.nextTick(cb, er); } // Checks that a user-supplied chunk is valid, especially for the particular @@ -293,7 +305,7 @@ function validChunk(stream, state, chunk, cb) { } if (er) { stream.emit('error', er); - processNextTick(cb, er); + pna.nextTick(cb, er); valid = false; } return valid; @@ -302,7 +314,7 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function (chunk, encoding, cb) { var state = this._writableState; var ret = false; - var isBuf = _isUint8Array(chunk) && !state.objectMode; + var isBuf = !state.objectMode && _isUint8Array(chunk); if (isBuf && !Buffer.isBuffer(chunk)) { chunk = _uint8ArrayToBuffer(chunk); @@ -413,10 +425,10 @@ function onwriteError(stream, state, sync, er, cb) { if (sync) { // defer the callback if we are being called synchronously // to avoid piling up things on the stack - processNextTick(cb, er); + pna.nextTick(cb, er); // this can emit finish, and it will always happen // after error - processNextTick(finishMaybe, stream, state); + pna.nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; stream.emit('error', er); } else { @@ -514,6 +526,7 @@ function clearBuffer(stream, state) { } else { state.corkedRequestsFree = new CorkedRequest(state); } + state.bufferedRequestCount = 0; } else { // Slow case, write chunks one-by-one while (entry) { @@ -524,6 +537,7 @@ function clearBuffer(stream, state) { doWrite(stream, state, false, len, chunk, encoding, cb); entry = entry.next; + state.bufferedRequestCount--; // if we didn't call the onwrite immediately, then // it means that we need to wait until it does. // also, that means that the chunk and cb are currently @@ -536,7 +550,6 @@ function clearBuffer(stream, state) { if (entry === null) state.lastBufferedRequest = null; } - state.bufferedRequestCount = 0; state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -590,7 +603,7 @@ function prefinish(stream, state) { if (typeof stream._final === 'function') { state.pendingcb++; state.finalCalled = true; - processNextTick(callFinal, stream, state); + pna.nextTick(callFinal, stream, state); } else { state.prefinished = true; stream.emit('prefinish'); @@ -614,7 +627,7 @@ function endWritable(stream, state, cb) { state.ending = true; finishMaybe(stream, state); if (cb) { - if (state.finished) processNextTick(cb);else stream.once('finish', cb); + if (state.finished) pna.nextTick(cb);else stream.once('finish', cb); } state.ended = true; stream.writable = false; |