aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/stream.md4
-rw-r--r--lib/_stream_readable.js18
-rw-r--r--lib/_stream_writable.js18
-rw-r--r--test/parallel/test-stream-transform-split-highwatermark.js71
4 files changed, 107 insertions, 4 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 743bea7955..c26ab800fa 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1752,6 +1752,10 @@ constructor and implement *both* the `readable._read()` and
* `writableObjectMode` {boolean} Defaults to `false`. Sets `objectMode`
for writable side of the stream. Has no effect if `objectMode`
is `true`.
+ * `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side
+ of the stream. Has no effect if `highWaterMark` is provided.
+ * `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side
+ of the stream. Has no effect if `highWaterMark` is provided.
For example:
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 463ac3bfbc..1a0bc8902a 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -61,18 +61,32 @@ function prependListener(emitter, event, fn) {
function ReadableState(options, stream) {
options = options || {};
+ // Duplex streams are both readable and writable, but share
+ // the same options object.
+ // However, some cases require setting options to different
+ // values for the readable and the writable sides of the duplex stream.
+ // These options can be provided separately as readableXXX and writableXXX.
+ var isDuplex = stream instanceof Stream.Duplex;
+
// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;
- if (stream instanceof Stream.Duplex)
+ if (isDuplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;
// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
+ var readableHwm = options.readableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
+
+ if (hwm || hwm === 0)
+ this.highWaterMark = hwm;
+ else if (isDuplex && (readableHwm || readableHwm === 0))
+ this.highWaterMark = readableHwm;
+ else
+ this.highWaterMark = defaultHwm;
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 334f492ba6..6e0eaf45b5 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -41,19 +41,33 @@ function nop() {}
function WritableState(options, stream) {
options = options || {};
+ // Duplex streams are both readable and writable, but share
+ // the same options object.
+ // However, some cases require setting options to different
+ // values for the readable and the writable sides of the duplex stream.
+ // These options can be provided separately as readableXXX and writableXXX.
+ var isDuplex = stream instanceof Stream.Duplex;
+
// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;
- if (stream instanceof Stream.Duplex)
+ if (isDuplex)
this.objectMode = this.objectMode || !!options.writableObjectMode;
// the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
+ var writableHwm = options.writableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
+
+ if (hwm || hwm === 0)
+ this.highWaterMark = hwm;
+ else if (isDuplex && (writableHwm || writableHwm === 0))
+ this.highWaterMark = writableHwm;
+ else
+ this.highWaterMark = defaultHwm;
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
diff --git a/test/parallel/test-stream-transform-split-highwatermark.js b/test/parallel/test-stream-transform-split-highwatermark.js
new file mode 100644
index 0000000000..af2558ec6d
--- /dev/null
+++ b/test/parallel/test-stream-transform-split-highwatermark.js
@@ -0,0 +1,71 @@
+'use strict';
+require('../common');
+const assert = require('assert');
+
+const { Transform, Readable, Writable } = require('stream');
+
+const DEFAULT = 16 * 1024;
+
+function testTransform(expectedReadableHwm, expectedWritableHwm, options) {
+ const t = new Transform(options);
+ assert.strictEqual(t._readableState.highWaterMark, expectedReadableHwm);
+ assert.strictEqual(t._writableState.highWaterMark, expectedWritableHwm);
+}
+
+// test overriding defaultHwm
+testTransform(666, DEFAULT, { readableHighWaterMark: 666 });
+testTransform(DEFAULT, 777, { writableHighWaterMark: 777 });
+testTransform(666, 777, {
+ readableHighWaterMark: 666,
+ writableHighWaterMark: 777,
+});
+
+// test 0 overriding defaultHwm
+testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
+testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });
+
+// test highWaterMark overriding
+testTransform(555, 555, {
+ highWaterMark: 555,
+ readableHighWaterMark: 666,
+});
+testTransform(555, 555, {
+ highWaterMark: 555,
+ writableHighWaterMark: 777,
+});
+testTransform(555, 555, {
+ highWaterMark: 555,
+ readableHighWaterMark: 666,
+ writableHighWaterMark: 777,
+});
+
+// test highWaterMark = 0 overriding
+testTransform(0, 0, {
+ highWaterMark: 0,
+ readableHighWaterMark: 666,
+});
+testTransform(0, 0, {
+ highWaterMark: 0,
+ writableHighWaterMark: 777,
+});
+testTransform(0, 0, {
+ highWaterMark: 0,
+ readableHighWaterMark: 666,
+ writableHighWaterMark: 777,
+});
+
+// test undefined, null, NaN
+[undefined, null, NaN].forEach((v) => {
+ testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });
+ testTransform(DEFAULT, DEFAULT, { writableHighWaterMark: v });
+ testTransform(666, DEFAULT, { highWaterMark: v, readableHighWaterMark: 666 });
+ testTransform(DEFAULT, 777, { highWaterMark: v, writableHighWaterMark: 777 });
+});
+
+// test non Duplex streams ignore the options
+{
+ const r = new Readable({ readableHighWaterMark: 666 });
+ assert.strictEqual(r._readableState.highWaterMark, DEFAULT);
+ const w = new Writable({ writableHighWaterMark: 777 });
+ assert.strictEqual(w._writableState.highWaterMark, DEFAULT);
+}