summaryrefslogtreecommitdiff
path: root/lib/internal/streams/from.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/streams/from.js')
-rw-r--r--lib/internal/streams/from.js46
1 files changed, 46 insertions, 0 deletions
diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js
new file mode 100644
index 0000000000..e809f2658d
--- /dev/null
+++ b/lib/internal/streams/from.js
@@ -0,0 +1,46 @@
+'use strict';
+
+const {
+ ERR_INVALID_ARG_TYPE
+} = require('internal/errors').codes;
+
+function from(Readable, iterable, opts) {
+ let iterator;
+ if (iterable && iterable[Symbol.asyncIterator])
+ iterator = iterable[Symbol.asyncIterator]();
+ else if (iterable && iterable[Symbol.iterator])
+ iterator = iterable[Symbol.iterator]();
+ else
+ throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
+
+ const readable = new Readable({
+ objectMode: true,
+ ...opts
+ });
+ // Reading boolean to protect against _read
+ // being called before last iteration completion.
+ let reading = false;
+ readable._read = function() {
+ if (!reading) {
+ reading = true;
+ next();
+ }
+ };
+ async function next() {
+ try {
+ const { value, done } = await iterator.next();
+ if (done) {
+ readable.push(null);
+ } else if (readable.push(await value)) {
+ next();
+ } else {
+ reading = false;
+ }
+ } catch (err) {
+ readable.destroy(err);
+ }
+ }
+ return readable;
+}
+
+module.exports = from;