summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_duplex.js7
-rw-r--r--lib/_stream_readable.js6
-rw-r--r--lib/_stream_transform.js3
-rw-r--r--lib/_stream_writable.js10
-rw-r--r--lib/fs.js6
-rw-r--r--lib/internal/errors.js2
-rw-r--r--lib/internal/http2/core.js1
-rw-r--r--lib/internal/streams/destroy.js9
-rw-r--r--lib/net.js5
-rw-r--r--lib/zlib.js9
10 files changed, 37 insertions, 21 deletions
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 59ce832927..1ccb931260 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', {
this._writableState.destroyed = value;
}
});
-
-Duplex.prototype._destroy = function(err, cb) {
- this.push(null);
- this.end();
-
- process.nextTick(cb, err);
-};
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ba231ccda9..5781dfd471 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -106,6 +106,9 @@ function ReadableState(options, stream) {
this.readableListening = false;
this.resumeScheduled = false;
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = options.emitClose !== false;
+
// has it been destroyed
this.destroyed = false;
@@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
- this.push(null);
cb(err);
};
@@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
+ } else if (state.destroyed) {
+ return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index a9fcddda2d..b82114ecae 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -132,7 +132,7 @@ function Transform(options) {
}
function prefinish() {
- if (typeof this._flush === 'function') {
+ if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush((er, data) => {
done(this, er, data);
});
@@ -194,7 +194,6 @@ Transform.prototype._read = function(n) {
Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
- this.emit('close');
});
};
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 2b76588135..d5cfe07f17 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -134,6 +134,9 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = options.emitClose !== false;
+
// count buffered requests
this.bufferedRequestCount = 0;
@@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
- if (writev)
+ if (state.destroyed)
+ state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write'));
+ else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
@@ -604,7 +609,7 @@ function callFinal(stream, state) {
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === 'function') {
+ if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
@@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', {
Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
- this.end();
cb(err);
};
diff --git a/lib/fs.js b/lib/fs.js
index 3771efad10..917c3eb3a9 100644
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1929,6 +1929,9 @@ function ReadStream(path, options) {
if (options.highWaterMark === undefined)
options.highWaterMark = 64 * 1024;
+ // for backwards compat do not emit close on destroy.
+ options.emitClose = false;
+
Readable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
@@ -2084,6 +2087,9 @@ function WriteStream(path, options) {
options = copyObject(getOptions(options, {}));
+ // for backwards compat do not emit close on destroy.
+ options.emitClose = false;
+
Writable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index a4a79d671e..11f32ccdc1 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error);
E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error);
E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
+E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed');
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error);
@@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
-E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
function sysError(code, syscall, path, dest,
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 71bb55ee23..f60c6388af 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex {
constructor(session, options) {
options.allowHalfOpen = true;
options.decodeStrings = false;
+ options.emitClose = false;
super(options);
this[async_id_symbol] = -1;
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 985332ac46..5d29e18204 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -30,6 +30,7 @@ function destroy(err, cb) {
}
this._destroy(err || null, (err) => {
+ process.nextTick(emitCloseNT, this);
if (!cb && err) {
process.nextTick(emitErrorNT, this, err);
if (this._writableState) {
@@ -43,6 +44,14 @@ function destroy(err, cb) {
return this;
}
+function emitCloseNT(self) {
+ if (self._writableState && !self._writableState.emitClose)
+ return;
+ if (self._readableState && !self._readableState.emitClose)
+ return;
+ self.emit('close');
+}
+
function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
diff --git a/lib/net.js b/lib/net.js
index 7583fcb27d..f2cb423f30 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -232,6 +232,11 @@ function Socket(options) {
options = { fd: options }; // Legacy interface.
else if (options === undefined)
options = {};
+ else
+ options = util._extend({}, options);
+
+ // For backwards compat do not emit close on destroy.
+ options.emitClose = false;
stream.Duplex.call(this, options);
diff --git a/lib/zlib.js b/lib/zlib.js
index 93f878712a..4adfd1ffa2 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -25,7 +25,6 @@ const {
ERR_BUFFER_TOO_LARGE,
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
- ERR_ZLIB_BINDING_CLOSED,
ERR_ZLIB_INITIALIZATION_FAILED
} = require('internal/errors').codes;
const Transform = require('_stream_transform');
@@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) {
Zlib.prototype.close = function close(callback) {
_close(this, callback);
- process.nextTick(emitCloseNT, this);
+ this.destroy();
};
Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
@@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) {
function processChunk(self, chunk, flushFlag, cb) {
var handle = self._handle;
if (!handle)
- return cb(new ERR_ZLIB_BINDING_CLOSED());
+ assert(false, 'zlib binding closed');
handle.buffer = chunk;
handle.cb = cb;
@@ -603,10 +602,6 @@ function _close(engine, callback) {
engine._handle = null;
}
-function emitCloseNT(self) {
- self.emit('close');
-}
-
// generic zlib
// minimal 2-byte header
function Deflate(opts) {