diff options
author | Robert Nagy <ronagy@icloud.com> | 2019-07-16 00:03:23 +0200 |
---|---|---|
committer | Rich Trott <rtrott@gmail.com> | 2019-08-16 21:33:53 -0700 |
commit | 4a2bd69db99c1bb8692e1f653edcb225fbc23032 (patch) | |
tree | b90971ab2b513dcf3f69758113d72661ea017e16 | |
parent | a890771cd0a31bda055fc71741ace7822bc678dd (diff) | |
download | android-node-v8-4a2bd69db99c1bb8692e1f653edcb225fbc23032.tar.gz android-node-v8-4a2bd69db99c1bb8692e1f653edcb225fbc23032.tar.bz2 android-node-v8-4a2bd69db99c1bb8692e1f653edcb225fbc23032.zip |
stream: fix destroy() behavior
Ensure errorEmitted is always set. Only emit 'error' once.
PR-URL: https://github.com/nodejs/node/pull/29058
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
-rw-r--r-- | doc/api/stream.md | 3 | ||||
-rw-r--r-- | lib/_stream_readable.js | 3 | ||||
-rw-r--r-- | lib/_stream_writable.js | 2 | ||||
-rw-r--r-- | lib/internal/streams/destroy.js | 107 | ||||
-rw-r--r-- | test/parallel/test-net-connect-buffer.js | 6 | ||||
-rw-r--r-- | test/parallel/test-stream-error-once.js | 19 | ||||
-rw-r--r-- | test/parallel/test-stream-readable-invalid-chunk.js | 33 | ||||
-rw-r--r-- | test/parallel/test-stream-readable-unshift.js | 17 | ||||
-rw-r--r-- | test/parallel/test-stream-unshift-read-race.js | 8 | ||||
-rw-r--r-- | test/parallel/test-stream2-writable.js | 39 |
10 files changed, 153 insertions, 84 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md index cb6d23e525..07755e05dc 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -281,6 +281,9 @@ The stream is not closed when the `'error'` event is emitted unless the [`autoDestroy`][writable-new] option was set to `true` when creating the stream. +After `'error'`, no further events other than `'close'` *should* be emitted +(including `'error'` events). + ##### Event: 'finish' <!-- YAML added: v0.9.4 diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 01f95104b8..bb9024e637 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) { this.resumeScheduled = false; this.paused = true; + // True if the error was already emitted and should not be thrown again + this.errorEmitted = false; + // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 5d52b0d2a4..315360ca6b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -429,13 +429,11 @@ function onwriteError(stream, state, sync, er, cb) { // This can emit finish, and it will always happen // after error process.nextTick(finishMaybe, stream, state); - stream._writableState.errorEmitted = true; errorOrDestroy(stream, er); } else { // The caller expect this to happen before if // it is async cb(er); - stream._writableState.errorEmitted = true; errorOrDestroy(stream, er); // This can emit finish, but finish must // always follow error diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 200c75459a..0d3eb6327b 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -1,22 +1,37 @@ 'use strict'; +function needError(stream, err) { + if (!err) { + return false; + } + + const r = stream._readableState; + const w = stream._writableState; + + if ((w && w.errorEmitted) || (r && r.errorEmitted)) { + return false; + } + + if (w) { + w.errorEmitted = true; + } + if (r) { + r.errorEmitted = true; + } + + return true; +} + // Undocumented cb() API, needed for core, not for public API function destroy(err, cb) { - const readableDestroyed = this._readableState && - this._readableState.destroyed; - const writableDestroyed = this._writableState && - this._writableState.destroyed; + const r = this._readableState; + const w = this._writableState; - if (readableDestroyed || writableDestroyed) { + if ((w && w.destroyed) || (r && r.destroyed)) { if (cb) { cb(err); - } else if (err) { - if (!this._writableState) { - process.nextTick(emitErrorNT, this, err); - } else if (!this._writableState.errorEmitted) { - this._writableState.errorEmitted = true; - process.nextTick(emitErrorNT, this, err); - } + } else if (needError(this, err)) { + process.nextTick(emitErrorNT, this, err); } return this; @@ -25,28 +40,19 @@ function destroy(err, cb) { // We set destroyed to true before firing error callbacks in order // to make it re-entrance safe in case destroy() is called within callbacks - if (this._readableState) { - this._readableState.destroyed = true; + if (w) { + w.destroyed = true; } - - // If this is a duplex stream mark the writable part as destroyed as well - if (this._writableState) { - this._writableState.destroyed = true; + if (r) { + r.destroyed = true; } this._destroy(err || null, (err) => { - if (!cb && err) { - if (!this._writableState) { - process.nextTick(emitErrorAndCloseNT, this, err); - } else if (!this._writableState.errorEmitted) { - this._writableState.errorEmitted = true; - process.nextTick(emitErrorAndCloseNT, this, err); - } else { - process.nextTick(emitCloseNT, this); - } - } else if (cb) { + if (cb) { process.nextTick(emitCloseNT, this); cb(err); + } else if (needError(this, err)) { + process.nextTick(emitErrorAndCloseNT, this, err); } else { process.nextTick(emitCloseNT, this); } @@ -61,29 +67,36 @@ function emitErrorAndCloseNT(self, err) { } function emitCloseNT(self) { - if (self._writableState && !self._writableState.emitClose) + const r = self._readableState; + const w = self._writableState; + + if (w && !w.emitClose) return; - if (self._readableState && !self._readableState.emitClose) + if (r && !r.emitClose) return; self.emit('close'); } function undestroy() { - if (this._readableState) { - this._readableState.destroyed = false; - this._readableState.reading = false; - this._readableState.ended = false; - this._readableState.endEmitted = false; + const r = this._readableState; + const w = this._writableState; + + if (r) { + r.destroyed = false; + r.reading = false; + r.ended = false; + r.endEmitted = false; + r.errorEmitted = false; } - if (this._writableState) { - this._writableState.destroyed = false; - this._writableState.ended = false; - this._writableState.ending = false; - this._writableState.finalCalled = false; - this._writableState.prefinished = false; - this._writableState.finished = false; - this._writableState.errorEmitted = false; + if (w) { + w.destroyed = false; + w.ended = false; + w.ending = false; + w.finalCalled = false; + w.prefinished = false; + w.finished = false; + w.errorEmitted = false; } } @@ -98,12 +111,12 @@ function errorOrDestroy(stream, err) { // the error to be emitted nextTick. In a future // semver major update we should change the default to this. - const rState = stream._readableState; - const wState = stream._writableState; + const r = stream._readableState; + const w = stream._writableState; - if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) + if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); - else + else if (needError(stream, err)) stream.emit('error', err); } diff --git a/test/parallel/test-net-connect-buffer.js b/test/parallel/test-net-connect-buffer.js index 8c95400fb6..41df6a0666 100644 --- a/test/parallel/test-net-connect-buffer.js +++ b/test/parallel/test-net-connect-buffer.js @@ -69,12 +69,14 @@ tcp.listen(0, common.mustCall(function() { [], {} ].forEach((value) => { - common.expectsError(() => socket.write(value), { + // We need to check the callback since 'error' will only + // be emitted once per instance. + socket.write(value, common.expectsError({ code: 'ERR_INVALID_ARG_TYPE', type: TypeError, message: 'The "chunk" argument must be one of type string or Buffer. ' + `Received type ${typeof value}` - }); + })); }); // Write a string that contains a multi-byte character sequence to test that diff --git a/test/parallel/test-stream-error-once.js b/test/parallel/test-stream-error-once.js new file mode 100644 index 0000000000..71f268cfa4 --- /dev/null +++ b/test/parallel/test-stream-error-once.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common'); +const { Writable, Readable } = require('stream'); + +{ + const writable = new Writable(); + writable.on('error', common.mustCall()); + writable.end(); + writable.write('h'); + writable.write('h'); +} + +{ + const readable = new Readable(); + readable.on('error', common.mustCall()); + readable.push(null); + readable.push('h'); + readable.push('h'); +} diff --git a/test/parallel/test-stream-readable-invalid-chunk.js b/test/parallel/test-stream-readable-invalid-chunk.js index fcd7414bb6..3e147c065f 100644 --- a/test/parallel/test-stream-readable-invalid-chunk.js +++ b/test/parallel/test-stream-readable-invalid-chunk.js @@ -3,17 +3,32 @@ const common = require('../common'); const stream = require('stream'); -const readable = new stream.Readable({ - read: () => {} -}); - -function checkError(fn) { - common.expectsError(fn, { +function testPushArg(val) { + const readable = new stream.Readable({ + read: () => {} + }); + readable.on('error', common.expectsError({ code: 'ERR_INVALID_ARG_TYPE', type: TypeError + })); + readable.push(val); +} + +testPushArg([]); +testPushArg({}); +testPushArg(0); + +function testUnshiftArg(val) { + const readable = new stream.Readable({ + read: () => {} }); + readable.on('error', common.expectsError({ + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError + })); + readable.unshift(val); } -checkError(() => readable.push([])); -checkError(() => readable.push({})); -checkError(() => readable.push(0)); +testUnshiftArg([]); +testUnshiftArg({}); +testUnshiftArg(0); diff --git a/test/parallel/test-stream-readable-unshift.js b/test/parallel/test-stream-readable-unshift.js index d574b7d046..6eefb55d73 100644 --- a/test/parallel/test-stream-readable-unshift.js +++ b/test/parallel/test-stream-readable-unshift.js @@ -113,23 +113,6 @@ const { Readable } = require('stream'); } { - // Check that error is thrown for invalid chunks - - const readable = new Readable({ read() {} }); - function checkError(fn) { - common.expectsError(fn, { - code: 'ERR_INVALID_ARG_TYPE', - type: TypeError - }); - } - - checkError(() => readable.unshift([])); - checkError(() => readable.unshift({})); - checkError(() => readable.unshift(0)); - -} - -{ // Check that ObjectMode works const readable = new Readable({ objectMode: true, read() {} }); diff --git a/test/parallel/test-stream-unshift-read-race.js b/test/parallel/test-stream-unshift-read-race.js index f7b633338c..562e10776e 100644 --- a/test/parallel/test-stream-unshift-read-race.js +++ b/test/parallel/test-stream-unshift-read-race.js @@ -86,13 +86,7 @@ w._write = function(chunk, encoding, cb) { }; r.on('end', common.mustCall(function() { - common.expectsError(function() { - r.unshift(Buffer.allocUnsafe(1)); - }, { - code: 'ERR_STREAM_UNSHIFT_AFTER_END_EVENT', - type: Error, - message: 'stream.unshift() after end event' - }); + r.unshift(Buffer.allocUnsafe(1)); w.end(); })); diff --git a/test/parallel/test-stream2-writable.js b/test/parallel/test-stream2-writable.js index 262606d906..b20f5d3f18 100644 --- a/test/parallel/test-stream2-writable.js +++ b/test/parallel/test-stream2-writable.js @@ -402,3 +402,42 @@ const helloWorldBuffer = Buffer.from('hello world'); w.write(Buffer.allocUnsafe(1)); w.end(Buffer.allocUnsafe(0)); } + +{ + // Verify that error is only emitted once when failing in _finish. + const w = new W(); + + w._final = common.mustCall(function(cb) { + cb(new Error('test')); + }); + w.on('error', common.mustCall((err) => { + assert.strictEqual(w._writableState.errorEmitted, true); + assert.strictEqual(err.message, 'test'); + w.on('error', common.mustNotCall()); + w.destroy(new Error()); + })); + w.end(); +} + +{ + // Verify that error is only emitted once when failing in write. + const w = new W(); + w.on('error', common.mustCall((err) => { + assert.strictEqual(w._writableState.errorEmitted, true); + assert.strictEqual(err.code, 'ERR_STREAM_NULL_VALUES'); + })); + w.write(null); + w.destroy(new Error()); +} + +{ + // Verify that error is only emitted once when failing in write after end. + const w = new W(); + w.on('error', common.mustCall((err) => { + assert.strictEqual(w._writableState.errorEmitted, true); + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + })); + w.end(); + w.write('hello'); + w.destroy(new Error()); +} |