diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_duplex.js | 7 | ||||
-rw-r--r-- | lib/_stream_readable.js | 6 | ||||
-rw-r--r-- | lib/_stream_transform.js | 3 | ||||
-rw-r--r-- | lib/_stream_writable.js | 10 | ||||
-rw-r--r-- | lib/fs.js | 6 | ||||
-rw-r--r-- | lib/internal/errors.js | 2 | ||||
-rw-r--r-- | lib/internal/http2/core.js | 1 | ||||
-rw-r--r-- | lib/internal/streams/destroy.js | 9 | ||||
-rw-r--r-- | lib/net.js | 5 | ||||
-rw-r--r-- | lib/zlib.js | 9 |
10 files changed, 37 insertions, 21 deletions
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 59ce832927..1ccb931260 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', { this._writableState.destroyed = value; } }); - -Duplex.prototype._destroy = function(err, cb) { - this.push(null); - this.end(); - - process.nextTick(cb, err); -}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ba231ccda9..5781dfd471 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -106,6 +106,9 @@ function ReadableState(options, stream) { this.readableListening = false; this.resumeScheduled = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + // has it been destroyed this.destroyed = false; @@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', { Readable.prototype.destroy = destroyImpl.destroy; Readable.prototype._undestroy = destroyImpl.undestroy; Readable.prototype._destroy = function(err, cb) { - this.push(null); cb(err); }; @@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { addChunk(stream, state, chunk, true); } else if (state.ended) { stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF')); + } else if (state.destroyed) { + return false; } else { state.reading = false; if (state.decoder && !encoding) { diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a9fcddda2d..b82114ecae 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -132,7 +132,7 @@ function Transform(options) { } function prefinish() { - if (typeof this._flush === 'function') { + if (typeof this._flush === 'function' && !this._readableState.destroyed) { this._flush((er, data) => { done(this, er, data); }); @@ -194,7 +194,6 @@ Transform.prototype._read = function(n) { Transform.prototype._destroy = function(err, cb) { Duplex.prototype._destroy.call(this, err, (err2) => { cb(err2); - this.emit('close'); }); }; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2b76588135..d5cfe07f17 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -134,6 +134,9 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + // count buffered requests this.bufferedRequestCount = 0; @@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - if (writev) + if (state.destroyed) + state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write')); + else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); @@ -604,7 +609,7 @@ function callFinal(stream, state) { } function prefinish(stream, state) { if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === 'function') { + if (typeof stream._final === 'function' && !state.destroyed) { state.pendingcb++; state.finalCalled = true; process.nextTick(callFinal, stream, state); @@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', { Writable.prototype.destroy = destroyImpl.destroy; Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { - this.end(); cb(err); }; @@ -1929,6 +1929,9 @@ function ReadStream(path, options) { if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Readable.call(this, options); // path will be ignored when fd is specified, so it can be falsy @@ -2084,6 +2087,9 @@ function WriteStream(path, options) { options = copyObject(getOptions(options, {})); + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Writable.call(this, options); // path will be ignored when fd is specified, so it can be falsy diff --git a/lib/internal/errors.js b/lib/internal/errors.js index a4a79d671e..11f32ccdc1 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error); E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error); E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); +E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed'); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error); @@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED', E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); -E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error); E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error); function sysError(code, syscall, path, dest, diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 71bb55ee23..f60c6388af 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex { constructor(session, options) { options.allowHalfOpen = true; options.decodeStrings = false; + options.emitClose = false; super(options); this[async_id_symbol] = -1; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 985332ac46..5d29e18204 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -30,6 +30,7 @@ function destroy(err, cb) { } this._destroy(err || null, (err) => { + process.nextTick(emitCloseNT, this); if (!cb && err) { process.nextTick(emitErrorNT, this, err); if (this._writableState) { @@ -43,6 +44,14 @@ function destroy(err, cb) { return this; } +function emitCloseNT(self) { + if (self._writableState && !self._writableState.emitClose) + return; + if (self._readableState && !self._readableState.emitClose) + return; + self.emit('close'); +} + function undestroy() { if (this._readableState) { this._readableState.destroyed = false; diff --git a/lib/net.js b/lib/net.js index 7583fcb27d..f2cb423f30 100644 --- a/lib/net.js +++ b/lib/net.js @@ -232,6 +232,11 @@ function Socket(options) { options = { fd: options }; // Legacy interface. else if (options === undefined) options = {}; + else + options = util._extend({}, options); + + // For backwards compat do not emit close on destroy. + options.emitClose = false; stream.Duplex.call(this, options); diff --git a/lib/zlib.js b/lib/zlib.js index 93f878712a..4adfd1ffa2 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -25,7 +25,6 @@ const { ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, - ERR_ZLIB_BINDING_CLOSED, ERR_ZLIB_INITIALIZATION_FAILED } = require('internal/errors').codes; const Transform = require('_stream_transform'); @@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) { Zlib.prototype.close = function close(callback) { _close(this, callback); - process.nextTick(emitCloseNT, this); + this.destroy(); }; Zlib.prototype._transform = function _transform(chunk, encoding, cb) { @@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) { function processChunk(self, chunk, flushFlag, cb) { var handle = self._handle; if (!handle) - return cb(new ERR_ZLIB_BINDING_CLOSED()); + assert(false, 'zlib binding closed'); handle.buffer = chunk; handle.cb = cb; @@ -603,10 +602,6 @@ function _close(engine, callback) { engine._handle = null; } -function emitCloseNT(self) { - self.emit('close'); -} - // generic zlib // minimal 2-byte header function Deflate(opts) { |