summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/stream.md6
-rw-r--r--lib/internal/streams/pipeline.js13
-rw-r--r--test/parallel/test-stream-pipeline.js19
3 files changed, 14 insertions, 24 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 014b2f68b3..09ff2c02b1 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1340,14 +1340,14 @@ run().catch(console.error);
rs.resume(); // drain the stream
```
-### stream.pipeline(...streams[, callback])
+### stream.pipeline(...streams, callback)
<!-- YAML
added: v10.0.0
-->
* `...streams` {Stream} Two or more streams to pipe between.
-* `callback` {Function} A callback function that takes an optional error
- argument.
+* `callback` {Function} Called when the pipeline is fully done.
+ * `err` {Error}
A module method to pipe between streams forwarding errors and properly cleaning
up and provide a callback when the pipeline is complete.
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 849b3d39db..caa4042339 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -6,6 +6,7 @@
let eos;
const {
+ ERR_INVALID_CALLBACK,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
@@ -19,11 +20,6 @@ function once(callback) {
};
}
-function noop(err) {
- // Rethrow the error if it exists to avoid swallowing it
- if (err) throw err;
-}
-
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
@@ -66,8 +62,11 @@ function pipe(from, to) {
}
function popCallback(streams) {
- if (!streams.length) return noop;
- if (typeof streams[streams.length - 1] !== 'function') return noop;
+ // Streams should never be an empty array. It should always contain at least
+ // a single stream. Therefore optimize for the average case instead of
+ // checking for length === 0 as well.
+ if (typeof streams[streams.length - 1] !== 'function')
+ throw new ERR_INVALID_CALLBACK();
return streams.pop();
}
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index 12733d88a7..f735054e88 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -60,7 +60,7 @@ common.crashOnUnhandledRejection();
}, /ERR_MISSING_ARGS/);
assert.throws(() => {
pipeline();
- }, /ERR_MISSING_ARGS/);
+ }, /ERR_INVALID_CALLBACK/);
}
{
@@ -493,17 +493,8 @@ common.crashOnUnhandledRejection();
}
});
- read.on('close', common.mustCall());
- transform.on('close', common.mustCall());
- write.on('close', common.mustCall());
-
- process.on('uncaughtException', common.mustCall((err) => {
- assert.deepStrictEqual(err, new Error('kaboom'));
- }));
-
- const dst = pipeline(read, transform, write);
-
- assert.strictEqual(dst, write);
-
- read.push('hello');
+ assert.throws(
+ () => pipeline(read, transform, write),
+ { code: 'ERR_INVALID_CALLBACK' }
+ );
}