diff options
author | Gus Caplan <me@gus.host> | 2018-09-23 15:10:12 -0500 |
---|---|---|
committer | Gus Caplan <me@gus.host> | 2018-10-08 22:17:29 -0500 |
commit | 8bce9e8f190ad2e10eb31bdceeaab4158d5ee8b4 (patch) | |
tree | 5803076ffdfa8e8dc713e69e21e6003107fef229 /lib/internal/streams/async_iterator.js | |
parent | e688fe6b7e60d3dd50f0aa922a1ab74d85cb39b5 (diff) | |
download | android-node-v8-8bce9e8f190ad2e10eb31bdceeaab4158d5ee8b4.tar.gz android-node-v8-8bce9e8f190ad2e10eb31bdceeaab4158d5ee8b4.tar.bz2 android-node-v8-8bce9e8f190ad2e10eb31bdceeaab4158d5ee8b4.zip |
streams: refactor ReadableStream asyncIterator creation and a few fixes
Closes: https://github.com/nodejs/node/issues/23041
- Rewrite `ReadableAsyncIterator` class into
`ReadableStreamAsyncIteratorPrototype` which contains no constructor and
inherits from `%AsyncIteratorPrototype%`.
- Rewrite `AsyncIteratorRecord` into dumb function.
PR-URL: https://github.com/nodejs/node/pull/23042
Fixes: https://github.com/nodejs/node/issues/23041
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Diffstat (limited to 'lib/internal/streams/async_iterator.js')
-rw-r--r-- | lib/internal/streams/async_iterator.js | 94 |
1 files changed, 50 insertions, 44 deletions
diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 0e34573d87..91c473ee9d 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -8,12 +8,9 @@ const kLastPromise = Symbol('lastPromise'); const kHandlePromise = Symbol('handlePromise'); const kStream = Symbol('stream'); -const AsyncIteratorRecord = class AsyncIteratorRecord { - constructor(value, done) { - this.done = done; - this.value = value; - } -}; +function createIterResult(value, done) { + return { value, done }; +} function readAndResolve(iter) { const resolve = iter[kLastResolve]; @@ -26,7 +23,7 @@ function readAndResolve(iter) { iter[kLastPromise] = null; iter[kLastResolve] = null; iter[kLastReject] = null; - resolve(new AsyncIteratorRecord(data, false)); + resolve(createIterResult(data, false)); } } } @@ -43,7 +40,7 @@ function onEnd(iter) { iter[kLastPromise] = null; iter[kLastResolve] = null; iter[kLastReject] = null; - resolve(new AsyncIteratorRecord(null, true)); + resolve(createIterResult(null, true)); } iter[kEnded] = true; } @@ -69,39 +66,13 @@ function wrapForNext(lastPromise, iter) { }; } -const ReadableAsyncIterator = class ReadableAsyncIterator { - constructor(stream) { - this[kStream] = stream; - this[kLastResolve] = null; - this[kLastReject] = null; - this[kError] = null; - this[kEnded] = false; - this[kLastPromise] = null; - - stream.on('readable', onReadable.bind(null, this)); - stream.on('end', onEnd.bind(null, this)); - stream.on('error', onError.bind(null, this)); - - // the function passed to new Promise - // is cached so we avoid allocating a new - // closure at every run - this[kHandlePromise] = (resolve, reject) => { - const data = this[kStream].read(); - if (data) { - this[kLastPromise] = null; - this[kLastResolve] = null; - this[kLastReject] = null; - resolve(new AsyncIteratorRecord(data, false)); - } else { - this[kLastResolve] = resolve; - this[kLastReject] = reject; - } - }; - } +const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(async function* () {}).prototype); +const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ get stream() { return this[kStream]; - } + }, next() { // if we have detected an error in the meanwhile @@ -112,7 +83,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { } if (this[kEnded]) { - return Promise.resolve(new AsyncIteratorRecord(null, true)); + return Promise.resolve(createIterResult(null, true)); } // if we have multiple next() calls @@ -129,7 +100,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { // without triggering the next() queue const data = this[kStream].read(); if (data !== null) { - return Promise.resolve(new AsyncIteratorRecord(data, false)); + return Promise.resolve(createIterResult(data, false)); } promise = new Promise(this[kHandlePromise]); @@ -138,7 +109,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { this[kLastPromise] = promise; return promise; - } + }, return() { // destroy(err, cb) is a private API @@ -150,10 +121,45 @@ const ReadableAsyncIterator = class ReadableAsyncIterator { reject(err); return; } - resolve(new AsyncIteratorRecord(null, true)); + resolve(createIterResult(null, true)); }); }); - } + }, +}, AsyncIteratorPrototype); + +const createReadableStreamAsyncIterator = (stream) => { + const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, { + [kStream]: { value: stream, writable: true }, + [kLastResolve]: { value: null, writable: true }, + [kLastReject]: { value: null, writable: true }, + [kError]: { value: null, writable: true }, + [kEnded]: { value: false, writable: true }, + [kLastPromise]: { value: null, writable: true }, + // the function passed to new Promise + // is cached so we avoid allocating a new + // closure at every run + [kHandlePromise]: { + value: (resolve, reject) => { + const data = iterator[kStream].read(); + if (data) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(data, false)); + } else { + iterator[kLastResolve] = resolve; + iterator[kLastReject] = reject; + } + }, + writable: true, + }, + }); + + stream.on('readable', onReadable.bind(null, iterator)); + stream.on('end', onEnd.bind(null, iterator)); + stream.on('error', onError.bind(null, iterator)); + + return iterator; }; -module.exports = ReadableAsyncIterator; +module.exports = createReadableStreamAsyncIterator; |