aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2019-07-16 00:03:23 +0200
committerRich Trott <rtrott@gmail.com>2019-08-16 21:33:53 -0700
commit4a2bd69db99c1bb8692e1f653edcb225fbc23032 (patch)
treeb90971ab2b513dcf3f69758113d72661ea017e16
parenta890771cd0a31bda055fc71741ace7822bc678dd (diff)
downloadandroid-node-v8-4a2bd69db99c1bb8692e1f653edcb225fbc23032.tar.gz
android-node-v8-4a2bd69db99c1bb8692e1f653edcb225fbc23032.tar.bz2
android-node-v8-4a2bd69db99c1bb8692e1f653edcb225fbc23032.zip
stream: fix destroy() behavior
Ensure errorEmitted is always set. Only emit 'error' once. PR-URL: https://github.com/nodejs/node/pull/29058 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
-rw-r--r--doc/api/stream.md3
-rw-r--r--lib/_stream_readable.js3
-rw-r--r--lib/_stream_writable.js2
-rw-r--r--lib/internal/streams/destroy.js107
-rw-r--r--test/parallel/test-net-connect-buffer.js6
-rw-r--r--test/parallel/test-stream-error-once.js19
-rw-r--r--test/parallel/test-stream-readable-invalid-chunk.js33
-rw-r--r--test/parallel/test-stream-readable-unshift.js17
-rw-r--r--test/parallel/test-stream-unshift-read-race.js8
-rw-r--r--test/parallel/test-stream2-writable.js39
10 files changed, 153 insertions, 84 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index cb6d23e525..07755e05dc 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -281,6 +281,9 @@ The stream is not closed when the `'error'` event is emitted unless the
[`autoDestroy`][writable-new] option was set to `true` when creating the
stream.
+After `'error'`, no further events other than `'close'` *should* be emitted
+(including `'error'` events).
+
##### Event: 'finish'
<!-- YAML
added: v0.9.4
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 01f95104b8..bb9024e637 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false;
this.paused = true;
+ // True if the error was already emitted and should not be thrown again
+ this.errorEmitted = false;
+
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 5d52b0d2a4..315360ca6b 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -429,13 +429,11 @@ function onwriteError(stream, state, sync, er, cb) {
// This can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);
- stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
- stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
// This can emit finish, but finish must
// always follow error
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 200c75459a..0d3eb6327b 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -1,22 +1,37 @@
'use strict';
+function needError(stream, err) {
+ if (!err) {
+ return false;
+ }
+
+ const r = stream._readableState;
+ const w = stream._writableState;
+
+ if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
+ return false;
+ }
+
+ if (w) {
+ w.errorEmitted = true;
+ }
+ if (r) {
+ r.errorEmitted = true;
+ }
+
+ return true;
+}
+
// Undocumented cb() API, needed for core, not for public API
function destroy(err, cb) {
- const readableDestroyed = this._readableState &&
- this._readableState.destroyed;
- const writableDestroyed = this._writableState &&
- this._writableState.destroyed;
+ const r = this._readableState;
+ const w = this._writableState;
- if (readableDestroyed || writableDestroyed) {
+ if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
- } else if (err) {
- if (!this._writableState) {
- process.nextTick(emitErrorNT, this, err);
- } else if (!this._writableState.errorEmitted) {
- this._writableState.errorEmitted = true;
- process.nextTick(emitErrorNT, this, err);
- }
+ } else if (needError(this, err)) {
+ process.nextTick(emitErrorNT, this, err);
}
return this;
@@ -25,28 +40,19 @@ function destroy(err, cb) {
// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks
- if (this._readableState) {
- this._readableState.destroyed = true;
+ if (w) {
+ w.destroyed = true;
}
-
- // If this is a duplex stream mark the writable part as destroyed as well
- if (this._writableState) {
- this._writableState.destroyed = true;
+ if (r) {
+ r.destroyed = true;
}
this._destroy(err || null, (err) => {
- if (!cb && err) {
- if (!this._writableState) {
- process.nextTick(emitErrorAndCloseNT, this, err);
- } else if (!this._writableState.errorEmitted) {
- this._writableState.errorEmitted = true;
- process.nextTick(emitErrorAndCloseNT, this, err);
- } else {
- process.nextTick(emitCloseNT, this);
- }
- } else if (cb) {
+ if (cb) {
process.nextTick(emitCloseNT, this);
cb(err);
+ } else if (needError(this, err)) {
+ process.nextTick(emitErrorAndCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
@@ -61,29 +67,36 @@ function emitErrorAndCloseNT(self, err) {
}
function emitCloseNT(self) {
- if (self._writableState && !self._writableState.emitClose)
+ const r = self._readableState;
+ const w = self._writableState;
+
+ if (w && !w.emitClose)
return;
- if (self._readableState && !self._readableState.emitClose)
+ if (r && !r.emitClose)
return;
self.emit('close');
}
function undestroy() {
- if (this._readableState) {
- this._readableState.destroyed = false;
- this._readableState.reading = false;
- this._readableState.ended = false;
- this._readableState.endEmitted = false;
+ const r = this._readableState;
+ const w = this._writableState;
+
+ if (r) {
+ r.destroyed = false;
+ r.reading = false;
+ r.ended = false;
+ r.endEmitted = false;
+ r.errorEmitted = false;
}
- if (this._writableState) {
- this._writableState.destroyed = false;
- this._writableState.ended = false;
- this._writableState.ending = false;
- this._writableState.finalCalled = false;
- this._writableState.prefinished = false;
- this._writableState.finished = false;
- this._writableState.errorEmitted = false;
+ if (w) {
+ w.destroyed = false;
+ w.ended = false;
+ w.ending = false;
+ w.finalCalled = false;
+ w.prefinished = false;
+ w.finished = false;
+ w.errorEmitted = false;
}
}
@@ -98,12 +111,12 @@ function errorOrDestroy(stream, err) {
// the error to be emitted nextTick. In a future
// semver major update we should change the default to this.
- const rState = stream._readableState;
- const wState = stream._writableState;
+ const r = stream._readableState;
+ const w = stream._writableState;
- if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
+ if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
- else
+ else if (needError(stream, err))
stream.emit('error', err);
}
diff --git a/test/parallel/test-net-connect-buffer.js b/test/parallel/test-net-connect-buffer.js
index 8c95400fb6..41df6a0666 100644
--- a/test/parallel/test-net-connect-buffer.js
+++ b/test/parallel/test-net-connect-buffer.js
@@ -69,12 +69,14 @@ tcp.listen(0, common.mustCall(function() {
[],
{}
].forEach((value) => {
- common.expectsError(() => socket.write(value), {
+ // We need to check the callback since 'error' will only
+ // be emitted once per instance.
+ socket.write(value, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError,
message: 'The "chunk" argument must be one of type string or Buffer. ' +
`Received type ${typeof value}`
- });
+ }));
});
// Write a string that contains a multi-byte character sequence to test that
diff --git a/test/parallel/test-stream-error-once.js b/test/parallel/test-stream-error-once.js
new file mode 100644
index 0000000000..71f268cfa4
--- /dev/null
+++ b/test/parallel/test-stream-error-once.js
@@ -0,0 +1,19 @@
+'use strict';
+const common = require('../common');
+const { Writable, Readable } = require('stream');
+
+{
+ const writable = new Writable();
+ writable.on('error', common.mustCall());
+ writable.end();
+ writable.write('h');
+ writable.write('h');
+}
+
+{
+ const readable = new Readable();
+ readable.on('error', common.mustCall());
+ readable.push(null);
+ readable.push('h');
+ readable.push('h');
+}
diff --git a/test/parallel/test-stream-readable-invalid-chunk.js b/test/parallel/test-stream-readable-invalid-chunk.js
index fcd7414bb6..3e147c065f 100644
--- a/test/parallel/test-stream-readable-invalid-chunk.js
+++ b/test/parallel/test-stream-readable-invalid-chunk.js
@@ -3,17 +3,32 @@
const common = require('../common');
const stream = require('stream');
-const readable = new stream.Readable({
- read: () => {}
-});
-
-function checkError(fn) {
- common.expectsError(fn, {
+function testPushArg(val) {
+ const readable = new stream.Readable({
+ read: () => {}
+ });
+ readable.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
+ }));
+ readable.push(val);
+}
+
+testPushArg([]);
+testPushArg({});
+testPushArg(0);
+
+function testUnshiftArg(val) {
+ const readable = new stream.Readable({
+ read: () => {}
});
+ readable.on('error', common.expectsError({
+ code: 'ERR_INVALID_ARG_TYPE',
+ type: TypeError
+ }));
+ readable.unshift(val);
}
-checkError(() => readable.push([]));
-checkError(() => readable.push({}));
-checkError(() => readable.push(0));
+testUnshiftArg([]);
+testUnshiftArg({});
+testUnshiftArg(0);
diff --git a/test/parallel/test-stream-readable-unshift.js b/test/parallel/test-stream-readable-unshift.js
index d574b7d046..6eefb55d73 100644
--- a/test/parallel/test-stream-readable-unshift.js
+++ b/test/parallel/test-stream-readable-unshift.js
@@ -113,23 +113,6 @@ const { Readable } = require('stream');
}
{
- // Check that error is thrown for invalid chunks
-
- const readable = new Readable({ read() {} });
- function checkError(fn) {
- common.expectsError(fn, {
- code: 'ERR_INVALID_ARG_TYPE',
- type: TypeError
- });
- }
-
- checkError(() => readable.unshift([]));
- checkError(() => readable.unshift({}));
- checkError(() => readable.unshift(0));
-
-}
-
-{
// Check that ObjectMode works
const readable = new Readable({ objectMode: true, read() {} });
diff --git a/test/parallel/test-stream-unshift-read-race.js b/test/parallel/test-stream-unshift-read-race.js
index f7b633338c..562e10776e 100644
--- a/test/parallel/test-stream-unshift-read-race.js
+++ b/test/parallel/test-stream-unshift-read-race.js
@@ -86,13 +86,7 @@ w._write = function(chunk, encoding, cb) {
};
r.on('end', common.mustCall(function() {
- common.expectsError(function() {
- r.unshift(Buffer.allocUnsafe(1));
- }, {
- code: 'ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
- type: Error,
- message: 'stream.unshift() after end event'
- });
+ r.unshift(Buffer.allocUnsafe(1));
w.end();
}));
diff --git a/test/parallel/test-stream2-writable.js b/test/parallel/test-stream2-writable.js
index 262606d906..b20f5d3f18 100644
--- a/test/parallel/test-stream2-writable.js
+++ b/test/parallel/test-stream2-writable.js
@@ -402,3 +402,42 @@ const helloWorldBuffer = Buffer.from('hello world');
w.write(Buffer.allocUnsafe(1));
w.end(Buffer.allocUnsafe(0));
}
+
+{
+ // Verify that error is only emitted once when failing in _finish.
+ const w = new W();
+
+ w._final = common.mustCall(function(cb) {
+ cb(new Error('test'));
+ });
+ w.on('error', common.mustCall((err) => {
+ assert.strictEqual(w._writableState.errorEmitted, true);
+ assert.strictEqual(err.message, 'test');
+ w.on('error', common.mustNotCall());
+ w.destroy(new Error());
+ }));
+ w.end();
+}
+
+{
+ // Verify that error is only emitted once when failing in write.
+ const w = new W();
+ w.on('error', common.mustCall((err) => {
+ assert.strictEqual(w._writableState.errorEmitted, true);
+ assert.strictEqual(err.code, 'ERR_STREAM_NULL_VALUES');
+ }));
+ w.write(null);
+ w.destroy(new Error());
+}
+
+{
+ // Verify that error is only emitted once when failing in write after end.
+ const w = new W();
+ w.on('error', common.mustCall((err) => {
+ assert.strictEqual(w._writableState.errorEmitted, true);
+ assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
+ }));
+ w.end();
+ w.write('hello');
+ w.destroy(new Error());
+}