summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2018-10-20 15:04:57 +0200
committerMatteo Collina <hello@matteocollina.com>2018-10-24 15:23:07 +0200
commit3ec8cec6483549a91d1c89bed3c15856b71850c3 (patch)
treeff498f4353a156afcaef9dafa7332766241b7da6
parent12c530ccd5890e75515918e0456daf42260ba5b6 (diff)
downloadandroid-node-v8-3ec8cec6483549a91d1c89bed3c15856b71850c3.tar.gz
android-node-v8-3ec8cec6483549a91d1c89bed3c15856b71850c3.tar.bz2
android-node-v8-3ec8cec6483549a91d1c89bed3c15856b71850c3.zip
stream: async iteration should work with destroyed stream
Fixes https://github.com/nodejs/node/issues/23730. PR-URL: https://github.com/nodejs/node/pull/23785 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Matheus Marchini <mat@mmarchini.me>
-rw-r--r--lib/internal/streams/async_iterator.js69
-rw-r--r--test/parallel/test-stream-readable-async-iterators.js40
2 files changed, 82 insertions, 27 deletions
diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js
index 91c473ee9d..25b393d21f 100644
--- a/lib/internal/streams/async_iterator.js
+++ b/lib/internal/streams/async_iterator.js
@@ -1,5 +1,7 @@
'use strict';
+const finished = require('internal/streams/end-of-stream');
+
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
@@ -34,30 +36,6 @@ function onReadable(iter) {
process.nextTick(readAndResolve, iter);
}
-function onEnd(iter) {
- const resolve = iter[kLastResolve];
- if (resolve !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- resolve(createIterResult(null, true));
- }
- iter[kEnded] = true;
-}
-
-function onError(iter, err) {
- const reject = iter[kLastReject];
- // reject if we are waiting for data in the Promise
- // returned by next() and store the error
- if (reject !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- reject(err);
- }
- iter[kError] = err;
-}
-
function wrapForNext(lastPromise, iter) {
return function(resolve, reject) {
lastPromise.then(function() {
@@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return Promise.resolve(createIterResult(null, true));
}
+ if (this[kStream].destroyed) {
+ // We need to defer via nextTick because if .destroy(err) is
+ // called, the error will be emitted via nextTick, and
+ // we cannot guarantee that there is no error lingering around
+ // waiting to be emitted.
+ return new Promise((resolve, reject) => {
+ process.nextTick(() => {
+ if (this[kError]) {
+ reject(this[kError]);
+ } else {
+ resolve(createIterResult(null, true));
+ }
+ });
+ });
+ }
+
// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
@@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => {
},
});
+ finished(stream, (err) => {
+ if (err) {
+ const reject = iterator[kLastReject];
+ // reject if we are waiting for data in the Promise
+ // returned by next() and store the error
+ if (reject !== null) {
+ iterator[kLastPromise] = null;
+ iterator[kLastResolve] = null;
+ iterator[kLastReject] = null;
+ reject(err);
+ }
+ iterator[kError] = err;
+ return;
+ }
+
+ const resolve = iterator[kLastResolve];
+ if (resolve !== null) {
+ iterator[kLastPromise] = null;
+ iterator[kLastResolve] = null;
+ iterator[kLastReject] = null;
+ resolve(createIterResult(null, true));
+ }
+ iterator[kEnded] = true;
+ });
+
stream.on('readable', onReadable.bind(null, iterator));
- stream.on('end', onEnd.bind(null, iterator));
- stream.on('error', onError.bind(null, iterator));
return iterator;
};
diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js
index fb3c55846c..ec558955c6 100644
--- a/test/parallel/test-stream-readable-async-iterators.js
+++ b/test/parallel/test-stream-readable-async-iterators.js
@@ -1,7 +1,7 @@
'use strict';
const common = require('../common');
-const { Readable } = require('stream');
+const { Readable, PassThrough, pipeline } = require('stream');
const assert = require('assert');
async function tests() {
@@ -324,6 +324,44 @@ async function tests() {
assert.strictEqual(data, expected);
})();
+
+ await (async function() {
+ console.log('.next() on destroyed stream');
+ const readable = new Readable({
+ read() {
+ // no-op
+ }
+ });
+
+ readable.destroy();
+
+ try {
+ await readable[Symbol.asyncIterator]().next();
+ } catch (e) {
+ assert.strictEqual(e.code, 'ERR_STREAM_PREMATURE_CLOSE');
+ }
+ })();
+
+ await (async function() {
+ console.log('.next() on pipelined stream');
+ const readable = new Readable({
+ read() {
+ // no-op
+ }
+ });
+
+ const passthrough = new PassThrough();
+ const err = new Error('kaboom');
+ pipeline(readable, passthrough, common.mustCall((e) => {
+ assert.strictEqual(e, err);
+ }));
+ readable.destroy(err);
+ try {
+ await readable[Symbol.asyncIterator]().next();
+ } catch (e) {
+ assert.strictEqual(e, err);
+ }
+ })();
}
// to avoid missing some tests if a promise does not resolve