summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2019-08-19 17:45:30 +0200
committerMatteo Collina <hello@matteocollina.com>2019-12-03 12:14:48 +0100
commite490d9b1539a7ec5f6b9d9b2f18753ce1c3459f7 (patch)
tree2d0f7e2431de3a96df24200d8c733896fdfb7176
parentae8f20ec5eee55f648823392c9c4e9491c958b60 (diff)
downloadandroid-node-v8-e490d9b1539a7ec5f6b9d9b2f18753ce1c3459f7.tar.gz
android-node-v8-e490d9b1539a7ec5f6b9d9b2f18753ce1c3459f7.tar.bz2
android-node-v8-e490d9b1539a7ec5f6b9d9b2f18753ce1c3459f7.zip
stream: add support for captureRejection option
PR-URL: https://github.com/nodejs/node/pull/27867 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Michaƫl Zasso <targos@protonmail.com>
-rw-r--r--lib/_stream_readable.js10
-rw-r--r--lib/_stream_writable.js7
-rw-r--r--lib/internal/streams/legacy.js4
-rw-r--r--test/parallel/test-stream-catch-rejections.js52
4 files changed, 69 insertions, 4 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 0d83ada205..8f5f66b391 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -190,7 +190,7 @@ function Readable(options) {
this._destroy = options.destroy;
}
- Stream.call(this);
+ Stream.call(this, options);
}
ObjectDefineProperty(Readable.prototype, 'destroyed', {
@@ -233,6 +233,14 @@ Readable.prototype._destroy = function(err, cb) {
cb(err);
};
+Readable.prototype[EE.captureRejectionSymbol] = function(err) {
+ // TODO(mcollina): remove the destroyed if once errorEmitted lands in
+ // Readable.
+ if (!this.destroyed) {
+ this.destroy(err);
+ }
+};
+
// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index ab168fafe5..a9f7164dc7 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -35,6 +35,7 @@ module.exports = Writable;
Writable.WritableState = WritableState;
const internalUtil = require('internal/util');
+const EE = require('events');
const Stream = require('stream');
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
@@ -254,7 +255,7 @@ function Writable(options) {
this._final = options.final;
}
- Stream.call(this);
+ Stream.call(this, options);
}
// Otherwise people can pipe Writable streams, which is just wrong.
@@ -831,3 +832,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
cb(err);
};
+
+Writable.prototype[EE.captureRejectionSymbol] = function(err) {
+ this.destroy(err);
+};
diff --git a/lib/internal/streams/legacy.js b/lib/internal/streams/legacy.js
index 702e3c56ba..2bc7a86aa0 100644
--- a/lib/internal/streams/legacy.js
+++ b/lib/internal/streams/legacy.js
@@ -6,8 +6,8 @@ const {
const EE = require('events');
-function Stream() {
- EE.call(this);
+function Stream(opts) {
+ EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
diff --git a/test/parallel/test-stream-catch-rejections.js b/test/parallel/test-stream-catch-rejections.js
new file mode 100644
index 0000000000..fb5f1fccc1
--- /dev/null
+++ b/test/parallel/test-stream-catch-rejections.js
@@ -0,0 +1,52 @@
+'use strict';
+
+const common = require('../common');
+const stream = require('stream');
+const assert = require('assert');
+
+{
+ const r = new stream.Readable({
+ captureRejections: true,
+ read() {
+ this.push('hello');
+ this.push('world');
+ this.push(null);
+ }
+ });
+
+ const err = new Error('kaboom');
+
+ r.on('error', common.mustCall((_err) => {
+ assert.strictEqual(err, _err);
+ assert.strictEqual(r.destroyed, true);
+ }));
+
+ r.on('data', async () => {
+ throw err;
+ });
+}
+
+{
+ const w = new stream.Writable({
+ captureRejections: true,
+ highWaterMark: 1,
+ write(chunk, enc, cb) {
+ cb();
+ }
+ });
+
+ const err = new Error('kaboom');
+
+ w.write('hello', () => {
+ w.write('world');
+ });
+
+ w.on('error', common.mustCall((_err) => {
+ assert.strictEqual(err, _err);
+ assert.strictEqual(w.destroyed, true);
+ }));
+
+ w.on('drain', common.mustCall(async () => {
+ throw err;
+ }, 2));
+}