summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/errors.md11
-rw-r--r--doc/api/stream.md19
-rw-r--r--lib/_stream_duplex.js7
-rw-r--r--lib/_stream_readable.js6
-rw-r--r--lib/_stream_transform.js3
-rw-r--r--lib/_stream_writable.js10
-rw-r--r--lib/fs.js6
-rw-r--r--lib/internal/errors.js2
-rw-r--r--lib/internal/http2/core.js1
-rw-r--r--lib/internal/streams/destroy.js9
-rw-r--r--lib/net.js5
-rw-r--r--lib/zlib.js9
-rw-r--r--test/parallel/test-net-socket-destroy-send.js8
-rw-r--r--test/parallel/test-stream-duplex-destroy.js14
-rw-r--r--test/parallel/test-stream-readable-destroy.js19
-rw-r--r--test/parallel/test-stream-transform-destroy.js16
-rw-r--r--test/parallel/test-stream-writable-destroy.js13
-rw-r--r--test/parallel/test-zlib-write-after-close.js4
18 files changed, 107 insertions, 55 deletions
diff --git a/doc/api/errors.md b/doc/api/errors.md
index b9c52ad0d9..70ac01de61 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js
process. The error object will have an `err.info` object property with
additional details.
+<a id="ERR_STREAM_DESTROYED"></a>
+### ERR_STREAM_DESTROYED
+
+A stream method was called that cannot complete because the stream was
+destroyed using `stream.destroy()`.
+
<a id="ERR_TLS_CERT_ALTNAME_INVALID"></a>
### ERR_TLS_CERT_ALTNAME_INVALID
@@ -1615,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object.
The current module's status does not allow for this operation. The specific
meaning of the error depends on the specific function.
-<a id="ERR_ZLIB_BINDING_CLOSED"></a>
-### ERR_ZLIB_BINDING_CLOSED
-
-An attempt was made to use a `zlib` object after it has already been closed.
-
<a id="ERR_ZLIB_INITIALIZATION_FAILED"></a>
### ERR_ZLIB_INITIALIZATION_FAILED
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 5db990d4d2..32e368f05f 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -543,8 +543,10 @@ added: v8.0.0
* Returns: {this}
-Destroy the stream, and emit the passed error. After this call, the
-writable stream has ended. Implementors should not override this method,
+Destroy the stream, and emit the passed `error` and a `close` event.
+After this call, the writable stream has ended and subsequent calls
+to `write` / `end` will give an `ERR_STREAM_DESTROYED` error.
+Implementors should not override this method,
but instead implement [`writable._destroy`][writable-_destroy].
### Readable Streams
@@ -1167,8 +1169,9 @@ myReader.on('readable', () => {
added: v8.0.0
-->
-Destroy the stream, and emit `'error'`. After this call, the
-readable stream will release any internal resources.
+Destroy the stream, and emit `'error'` and `close`. After this call, the
+readable stream will release any internal resources and subsequent calls
+to `push` will be ignored.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
@@ -1382,6 +1385,12 @@ constructor and implement the `writable._write()` method. The
`writable._writev()` method *may* also be implemented.
#### Constructor: new stream.Writable([options])
+<!-- YAML
+changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/18438
+ description: Add `emitClose` option to specify if `close` is emitted on destroy
+-->
* `options` {Object}
* `highWaterMark` {number} Buffer level when
@@ -1395,6 +1404,8 @@ constructor and implement the `writable._write()` method. The
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
Defaults to `false`
+ * `emitClose` {boolean} Whether or not the stream should emit `close`
+ after it has been destroyed. Defaults to `true`
* `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 59ce832927..1ccb931260 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', {
this._writableState.destroyed = value;
}
});
-
-Duplex.prototype._destroy = function(err, cb) {
- this.push(null);
- this.end();
-
- process.nextTick(cb, err);
-};
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ba231ccda9..5781dfd471 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -106,6 +106,9 @@ function ReadableState(options, stream) {
this.readableListening = false;
this.resumeScheduled = false;
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = options.emitClose !== false;
+
// has it been destroyed
this.destroyed = false;
@@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
- this.push(null);
cb(err);
};
@@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
+ } else if (state.destroyed) {
+ return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index a9fcddda2d..b82114ecae 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -132,7 +132,7 @@ function Transform(options) {
}
function prefinish() {
- if (typeof this._flush === 'function') {
+ if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush((er, data) => {
done(this, er, data);
});
@@ -194,7 +194,6 @@ Transform.prototype._read = function(n) {
Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
- this.emit('close');
});
};
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 2b76588135..d5cfe07f17 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -134,6 +134,9 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = options.emitClose !== false;
+
// count buffered requests
this.bufferedRequestCount = 0;
@@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
- if (writev)
+ if (state.destroyed)
+ state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write'));
+ else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
@@ -604,7 +609,7 @@ function callFinal(stream, state) {
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === 'function') {
+ if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
@@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', {
Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
- this.end();
cb(err);
};
diff --git a/lib/fs.js b/lib/fs.js
index 3771efad10..917c3eb3a9 100644
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1929,6 +1929,9 @@ function ReadStream(path, options) {
if (options.highWaterMark === undefined)
options.highWaterMark = 64 * 1024;
+ // for backwards compat do not emit close on destroy.
+ options.emitClose = false;
+
Readable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
@@ -2084,6 +2087,9 @@ function WriteStream(path, options) {
options = copyObject(getOptions(options, {}));
+ // for backwards compat do not emit close on destroy.
+ options.emitClose = false;
+
Writable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index a4a79d671e..11f32ccdc1 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error);
E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error);
E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
+E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed');
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error);
@@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
-E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
function sysError(code, syscall, path, dest,
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 71bb55ee23..f60c6388af 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex {
constructor(session, options) {
options.allowHalfOpen = true;
options.decodeStrings = false;
+ options.emitClose = false;
super(options);
this[async_id_symbol] = -1;
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 985332ac46..5d29e18204 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -30,6 +30,7 @@ function destroy(err, cb) {
}
this._destroy(err || null, (err) => {
+ process.nextTick(emitCloseNT, this);
if (!cb && err) {
process.nextTick(emitErrorNT, this, err);
if (this._writableState) {
@@ -43,6 +44,14 @@ function destroy(err, cb) {
return this;
}
+function emitCloseNT(self) {
+ if (self._writableState && !self._writableState.emitClose)
+ return;
+ if (self._readableState && !self._readableState.emitClose)
+ return;
+ self.emit('close');
+}
+
function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
diff --git a/lib/net.js b/lib/net.js
index 7583fcb27d..f2cb423f30 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -232,6 +232,11 @@ function Socket(options) {
options = { fd: options }; // Legacy interface.
else if (options === undefined)
options = {};
+ else
+ options = util._extend({}, options);
+
+ // For backwards compat do not emit close on destroy.
+ options.emitClose = false;
stream.Duplex.call(this, options);
diff --git a/lib/zlib.js b/lib/zlib.js
index 93f878712a..4adfd1ffa2 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -25,7 +25,6 @@ const {
ERR_BUFFER_TOO_LARGE,
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
- ERR_ZLIB_BINDING_CLOSED,
ERR_ZLIB_INITIALIZATION_FAILED
} = require('internal/errors').codes;
const Transform = require('_stream_transform');
@@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) {
Zlib.prototype.close = function close(callback) {
_close(this, callback);
- process.nextTick(emitCloseNT, this);
+ this.destroy();
};
Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
@@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) {
function processChunk(self, chunk, flushFlag, cb) {
var handle = self._handle;
if (!handle)
- return cb(new ERR_ZLIB_BINDING_CLOSED());
+ assert(false, 'zlib binding closed');
handle.buffer = chunk;
handle.cb = cb;
@@ -603,10 +602,6 @@ function _close(engine, callback) {
engine._handle = null;
}
-function emitCloseNT(self) {
- self.emit('close');
-}
-
// generic zlib
// minimal 2-byte header
function Deflate(opts) {
diff --git a/test/parallel/test-net-socket-destroy-send.js b/test/parallel/test-net-socket-destroy-send.js
index a602b89253..aa587fc2e1 100644
--- a/test/parallel/test-net-socket-destroy-send.js
+++ b/test/parallel/test-net-socket-destroy-send.js
@@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() {
// Test destroy returns this, even on multiple calls when it short-circuits.
assert.strictEqual(conn, conn.destroy().destroy());
conn.on('error', common.expectsError({
- code: 'ERR_SOCKET_CLOSED',
- message: 'Socket is closed',
+ code: 'ERR_STREAM_DESTROYED',
+ message: 'Cannot call write after a stream was destroyed',
type: Error
}));
conn.write(Buffer.from('kaboom'), common.expectsError({
- code: 'ERR_SOCKET_CLOSED',
- message: 'Socket is closed',
+ code: 'ERR_STREAM_DESTROYED',
+ message: 'Cannot call write after a stream was destroyed',
type: Error
}));
server.close();
diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js
index 00e334d64b..854d29ffc1 100644
--- a/test/parallel/test-stream-duplex-destroy.js
+++ b/test/parallel/test-stream-duplex-destroy.js
@@ -13,8 +13,9 @@ const { inherits } = require('util');
duplex.resume();
- duplex.on('end', common.mustCall());
- duplex.on('finish', common.mustCall());
+ duplex.on('end', common.mustNotCall());
+ duplex.on('finish', common.mustNotCall());
+ duplex.on('close', common.mustCall());
duplex.destroy();
assert.strictEqual(duplex.destroyed, true);
@@ -29,8 +30,8 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
- duplex.on('end', common.mustCall());
- duplex.on('finish', common.mustCall());
+ duplex.on('end', common.mustNotCall());
+ duplex.on('finish', common.mustNotCall());
duplex.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@@ -78,6 +79,7 @@ const { inherits } = require('util');
// error is swallowed by the custom _destroy
duplex.on('error', common.mustNotCall('no error event'));
+ duplex.on('close', common.mustCall());
duplex.destroy(expected);
assert.strictEqual(duplex.destroyed, true);
@@ -159,8 +161,8 @@ const { inherits } = require('util');
});
duplex.resume();
- duplex.on('finish', common.mustCall());
- duplex.on('end', common.mustCall());
+ duplex.on('finish', common.mustNotCall());
+ duplex.on('end', common.mustNotCall());
duplex.destroy();
assert.strictEqual(duplex.destroyed, true);
diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js
index def20d26c3..026aa8ca16 100644
--- a/test/parallel/test-stream-readable-destroy.js
+++ b/test/parallel/test-stream-readable-destroy.js
@@ -11,7 +11,7 @@ const { inherits } = require('util');
});
read.resume();
- read.on('end', common.mustCall());
+ read.on('close', common.mustCall());
read.destroy();
assert.strictEqual(read.destroyed, true);
@@ -25,7 +25,8 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
- read.on('end', common.mustCall());
+ read.on('end', common.mustNotCall('no end event'));
+ read.on('close', common.mustCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@@ -47,6 +48,7 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
read.on('end', common.mustNotCall('no end event'));
+ read.on('close', common.mustCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@@ -70,6 +72,7 @@ const { inherits } = require('util');
// error is swallowed by the custom _destroy
read.on('error', common.mustNotCall('no error event'));
+ read.on('close', common.mustCall());
read.destroy(expected);
assert.strictEqual(read.destroyed, true);
@@ -106,6 +109,7 @@ const { inherits } = require('util');
const fail = common.mustNotCall('no end event');
read.on('end', fail);
+ read.on('close', common.mustCall());
read.destroy();
@@ -170,7 +174,18 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
+ read.on('close', common.mustCall());
read.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(expected, err);
}));
}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ read.destroy();
+ read.push('hi');
+ read.on('data', common.mustNotCall());
+}
diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js
index c42fe1d6f9..47cce87264 100644
--- a/test/parallel/test-stream-transform-destroy.js
+++ b/test/parallel/test-stream-transform-destroy.js
@@ -11,9 +11,9 @@ const assert = require('assert');
transform.resume();
- transform.on('end', common.mustCall());
+ transform.on('end', common.mustNotCall());
transform.on('close', common.mustCall());
- transform.on('finish', common.mustCall());
+ transform.on('finish', common.mustNotCall());
transform.destroy();
}
@@ -26,8 +26,8 @@ const assert = require('assert');
const expected = new Error('kaboom');
- transform.on('end', common.mustCall());
- transform.on('finish', common.mustCall());
+ transform.on('end', common.mustNotCall());
+ transform.on('finish', common.mustNotCall());
transform.on('close', common.mustCall());
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
@@ -49,7 +49,7 @@ const assert = require('assert');
const expected = new Error('kaboom');
transform.on('finish', common.mustNotCall('no finish event'));
- transform.on('close', common.mustNotCall('no close event'));
+ transform.on('close', common.mustCall());
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@@ -69,7 +69,7 @@ const assert = require('assert');
transform.resume();
transform.on('end', common.mustNotCall('no end event'));
- transform.on('close', common.mustNotCall('no close event'));
+ transform.on('close', common.mustCall());
transform.on('finish', common.mustNotCall('no finish event'));
// error is swallowed by the custom _destroy
@@ -110,7 +110,7 @@ const assert = require('assert');
transform.on('finish', fail);
transform.on('end', fail);
- transform.on('close', fail);
+ transform.on('close', common.mustCall());
transform.destroy();
@@ -132,7 +132,7 @@ const assert = require('assert');
cb(expected);
}, 1);
- transform.on('close', common.mustNotCall('no close event'));
+ transform.on('close', common.mustCall());
transform.on('finish', common.mustNotCall('no finish event'));
transform.on('end', common.mustNotCall('no end event'));
transform.on('error', common.mustCall((err) => {
diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js
index 46c4851117..565a5564e2 100644
--- a/test/parallel/test-stream-writable-destroy.js
+++ b/test/parallel/test-stream-writable-destroy.js
@@ -10,7 +10,8 @@ const { inherits } = require('util');
write(chunk, enc, cb) { cb(); }
});
- write.on('finish', common.mustCall());
+ write.on('finish', common.mustNotCall());
+ write.on('close', common.mustCall());
write.destroy();
assert.strictEqual(write.destroyed, true);
@@ -23,7 +24,8 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
- write.on('finish', common.mustCall());
+ write.on('finish', common.mustNotCall());
+ write.on('close', common.mustCall());
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@@ -45,6 +47,7 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
write.on('finish', common.mustNotCall('no finish event'));
+ write.on('close', common.mustCall());
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
@@ -65,6 +68,7 @@ const { inherits } = require('util');
const expected = new Error('kaboom');
write.on('finish', common.mustNotCall('no finish event'));
+ write.on('close', common.mustCall());
// error is swallowed by the custom _destroy
write.on('error', common.mustNotCall('no error event'));
@@ -103,6 +107,7 @@ const { inherits } = require('util');
const fail = common.mustNotCall('no finish event');
write.on('finish', fail);
+ write.on('close', common.mustCall());
write.destroy();
@@ -123,6 +128,7 @@ const { inherits } = require('util');
cb(expected);
});
+ write.on('close', common.mustCall());
write.on('finish', common.mustNotCall('no finish event'));
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
@@ -138,6 +144,7 @@ const { inherits } = require('util');
write(chunk, enc, cb) { cb(); }
});
+ write.on('close', common.mustCall());
write.on('error', common.mustCall());
write.destroy(new Error('kaboom 1'));
@@ -155,7 +162,7 @@ const { inherits } = require('util');
assert.strictEqual(write.destroyed, true);
// the internal destroy() mechanism should not be triggered
- write.on('finish', common.mustNotCall());
+ write.on('close', common.mustNotCall());
write.destroy();
}
diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js
index 88d6643da8..160971b16b 100644
--- a/test/parallel/test-zlib-write-after-close.js
+++ b/test/parallel/test-zlib-write-after-close.js
@@ -29,9 +29,9 @@ zlib.gzip('hello', common.mustCall(function(err, out) {
common.expectsError(
() => unzip.write(out),
{
- code: 'ERR_ZLIB_BINDING_CLOSED',
+ code: 'ERR_STREAM_DESTROYED',
type: Error,
- message: 'zlib binding closed'
+ message: 'Cannot call write after a stream was destroyed'
}
);
}));