From 9f873b3a659e82eb232785c9e7cfec6df8dd5277 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 26 Sep 2019 11:23:45 +0200 Subject: Revert "stream: make finished call the callback if the stream is closed" This reverts commit b03845b9376aec590b89f753a4b7c1b47729c5f8. PR-URL: https://github.com/nodejs/node/pull/29717 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Rich Trott Reviewed-By: Beth Griggs --- lib/internal/streams/async_iterator.js | 10 ++++++ lib/internal/streams/end-of-stream.js | 58 +++++++++++----------------------- 2 files changed, 29 insertions(+), 39 deletions(-) (limited to 'lib') diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 083befb89c..07f2191e71 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -112,6 +112,16 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ return() { return new Promise((resolve, reject) => { const stream = this[kStream]; + + // TODO(ronag): Remove this check once finished() handles + // already ended and/or destroyed streams. + const ended = stream.destroyed || stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); + if (ended) { + resolve(createIterResult(undefined, true)); + return; + } + finished(stream, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { reject(err); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 949ab63814..3f1c0f316c 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -13,18 +13,6 @@ function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } -function isReadable(stream) { - return typeof stream.readable === 'boolean' || - typeof stream.readableEnded === 'boolean' || - !!stream._readableState; -} - -function isWritable(stream) { - return typeof stream.writable === 'boolean' || - typeof stream.writableEnded === 'boolean' || - !!stream._writableState; -} - function eos(stream, opts, callback) { if (arguments.length === 2) { callback = opts; @@ -40,51 +28,43 @@ function eos(stream, opts, callback) { callback = once(callback); - const onerror = (err) => { - callback.call(stream, err); - }; - - let writableFinished = stream.writableFinished || - (stream._writableState && stream._writableState.finished); - let readableEnded = stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - - if (writableFinished || readableEnded || stream.destroyed || - stream.aborted) { - if (opts.error !== false) stream.on('error', onerror); - // A destroy(err) call emits error in nextTick. - process.nextTick(callback.bind(stream)); - return () => { - stream.removeListener('error', onerror); - }; - } - - let readable = opts.readable || - (opts.readable !== false && isReadable(stream)); - let writable = opts.writable || - (opts.writable !== false && isWritable(stream)); + let readable = opts.readable || (opts.readable !== false && stream.readable); + let writable = opts.writable || (opts.writable !== false && stream.writable); const onlegacyfinish = () => { if (!stream.writable) onfinish(); }; + var writableEnded = stream._writableState && stream._writableState.finished; const onfinish = () => { writable = false; - writableFinished = true; + writableEnded = true; if (!readable) callback.call(stream); }; + var readableEnded = stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); const onend = () => { readable = false; readableEnded = true; if (!writable) callback.call(stream); }; + const onerror = (err) => { + callback.call(stream, err); + }; + const onclose = () => { + let err; if (readable && !readableEnded) { - callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); - } else if (writable && !writableFinished) { - callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + if (!stream._readableState || !stream._readableState.ended) + err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); + } + if (writable && !writableEnded) { + if (!stream._writableState || !stream._writableState.ended) + err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); } }; -- cgit v1.2.3