diff options
author | Matteo Collina <hello@matteocollina.com> | 2018-10-20 15:04:57 +0200 |
---|---|---|
committer | Matteo Collina <hello@matteocollina.com> | 2018-10-24 15:23:07 +0200 |
commit | 3ec8cec6483549a91d1c89bed3c15856b71850c3 (patch) | |
tree | ff498f4353a156afcaef9dafa7332766241b7da6 /lib | |
parent | 12c530ccd5890e75515918e0456daf42260ba5b6 (diff) | |
download | android-node-v8-3ec8cec6483549a91d1c89bed3c15856b71850c3.tar.gz android-node-v8-3ec8cec6483549a91d1c89bed3c15856b71850c3.tar.bz2 android-node-v8-3ec8cec6483549a91d1c89bed3c15856b71850c3.zip |
stream: async iteration should work with destroyed stream
Fixes https://github.com/nodejs/node/issues/23730.
PR-URL: https://github.com/nodejs/node/pull/23785
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matheus Marchini <mat@mmarchini.me>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/internal/streams/async_iterator.js | 69 |
1 files changed, 43 insertions, 26 deletions
diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 91c473ee9d..25b393d21f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -1,5 +1,7 @@ 'use strict'; +const finished = require('internal/streams/end-of-stream'); + const kLastResolve = Symbol('lastResolve'); const kLastReject = Symbol('lastReject'); const kError = Symbol('error'); @@ -34,30 +36,6 @@ function onReadable(iter) { process.nextTick(readAndResolve, iter); } -function onEnd(iter) { - const resolve = iter[kLastResolve]; - if (resolve !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - resolve(createIterResult(null, true)); - } - iter[kEnded] = true; -} - -function onError(iter, err) { - const reject = iter[kLastReject]; - // reject if we are waiting for data in the Promise - // returned by next() and store the error - if (reject !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - reject(err); - } - iter[kError] = err; -} - function wrapForNext(lastPromise, iter) { return function(resolve, reject) { lastPromise.then(function() { @@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ return Promise.resolve(createIterResult(null, true)); } + if (this[kStream].destroyed) { + // We need to defer via nextTick because if .destroy(err) is + // called, the error will be emitted via nextTick, and + // we cannot guarantee that there is no error lingering around + // waiting to be emitted. + return new Promise((resolve, reject) => { + process.nextTick(() => { + if (this[kError]) { + reject(this[kError]); + } else { + resolve(createIterResult(null, true)); + } + }); + }); + } + // if we have multiple next() calls // we will wait for the previous Promise to finish // this logic is optimized to support for await loops, @@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => { }, }); + finished(stream, (err) => { + if (err) { + const reject = iterator[kLastReject]; + // reject if we are waiting for data in the Promise + // returned by next() and store the error + if (reject !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + reject(err); + } + iterator[kError] = err; + return; + } + + const resolve = iterator[kLastResolve]; + if (resolve !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(null, true)); + } + iterator[kEnded] = true; + }); + stream.on('readable', onReadable.bind(null, iterator)); - stream.on('end', onEnd.bind(null, iterator)); - stream.on('error', onError.bind(null, iterator)); return iterator; }; |