summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2018-01-04 18:06:56 +0100
committerMatteo Collina <hello@matteocollina.com>2018-01-10 10:48:03 +0100
commit1e0f3315c77033ef0e01bb37c3d41c8e1d65e686 (patch)
treeb529e81c0e3fda479f2ba69996f484490fb098ca /test
parent800caac2362e602d80b5c61fe1cb288bbcdb316a (diff)
downloadandroid-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.gz
android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.bz2
android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.zip
stream: always defer 'readable' with nextTick
Emit 'readable' always in the next tick, resulting in a single call to _read() per microtick. This removes the need for the user to implement buffering if they wanted to call this.push() multiple times in an asynchronous fashion, as this.push() triggers this._read() call. PR-URL: https://github.com/nodejs/node/pull/17979 Fixes: https://github.com/nodejs/node/issues/3203 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Diffstat (limited to 'test')
-rw-r--r--test/parallel/test-net-end-close.js12
-rw-r--r--test/parallel/test-stream-pipe-await-drain-push-while-write.js32
-rw-r--r--test/parallel/test-stream-readable-emittedReadable.js17
-rw-r--r--test/parallel/test-stream-readable-needReadable.js23
-rw-r--r--test/parallel/test-stream-readable-object-multi-push-async.js183
-rw-r--r--test/parallel/test-stream-readable-reading-readingMore.js3
-rw-r--r--test/parallel/test-stream2-transform.js13
7 files changed, 234 insertions, 49 deletions
diff --git a/test/parallel/test-net-end-close.js b/test/parallel/test-net-end-close.js
index 44a539a3e8..31c150e09c 100644
--- a/test/parallel/test-net-end-close.js
+++ b/test/parallel/test-net-end-close.js
@@ -8,9 +8,9 @@ const uv = process.binding('uv');
const s = new net.Socket({
handle: {
readStart: function() {
- process.nextTick(() => this.onread(uv.UV_EOF, null));
+ setImmediate(() => this.onread(uv.UV_EOF, null));
},
- close: (cb) => process.nextTick(cb)
+ close: (cb) => setImmediate(cb)
},
writable: false
});
@@ -18,8 +18,12 @@ assert.strictEqual(s, s.resume());
const events = [];
-s.on('end', () => events.push('end'));
-s.on('close', () => events.push('close'));
+s.on('end', () => {
+ events.push('end');
+});
+s.on('close', () => {
+ events.push('close');
+});
process.on('exit', () => {
assert.deepStrictEqual(events, [ 'end', 'close' ]);
diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js
index adefad70ad..263e6b6801 100644
--- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js
+++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js
@@ -3,32 +3,24 @@ const common = require('../common');
const stream = require('stream');
const assert = require('assert');
-const awaitDrainStates = [
- 1, // after first chunk before callback
- 1, // after second chunk before callback
- 0 // resolving chunk pushed after first chunk, awaitDrain is decreased
-];
-
-// A writable stream which pushes data onto the stream which pipes into it,
-// but only the first time it's written to. Since it's not paused at this time,
-// a second write will occur. If the pipe increases awaitDrain twice, we'll
-// never get subsequent chunks because 'drain' is only emitted once.
const writable = new stream.Writable({
write: common.mustCall(function(chunk, encoding, cb) {
- if (chunk.length === 32 * 1024) { // first chunk
- const beforePush = readable._readableState.awaitDrain;
- readable.push(Buffer.alloc(34 * 1024)); // above hwm
- // We should check if awaitDrain counter is increased.
- const afterPush = readable._readableState.awaitDrain;
- assert.strictEqual(afterPush - beforePush, 1,
- 'Counter is not increased for awaitDrain');
- }
-
assert.strictEqual(
- awaitDrainStates.shift(),
readable._readableState.awaitDrain,
+ 0,
'State variable awaitDrain is not correct.'
);
+
+ if (chunk.length === 32 * 1024) { // first chunk
+ readable.push(Buffer.alloc(34 * 1024)); // above hwm
+      // We should check if the awaitDrain counter is increased in the next
+      // tick, because awaitDrain is incremented after this method finishes
+ process.nextTick(() => {
+ assert.strictEqual(readable._readableState.awaitDrain, 1,
+ 'Counter is not increased for awaitDrain');
+ });
+ }
+
cb();
}, 3)
});
diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js
index 65b6b5b15a..5b9affc59f 100644
--- a/test/parallel/test-stream-readable-emittedReadable.js
+++ b/test/parallel/test-stream-readable-emittedReadable.js
@@ -10,30 +10,33 @@ const readable = new Readable({
// Initialized to false.
assert.strictEqual(readable._readableState.emittedReadable, false);
+const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
readable.on('readable', common.mustCall(() => {
// emittedReadable should be true when the readable event is emitted
assert.strictEqual(readable._readableState.emittedReadable, true);
- readable.read();
+ assert.deepStrictEqual(readable.read(), expected.shift());
// emittedReadable is reset to false during read()
assert.strictEqual(readable._readableState.emittedReadable, false);
-}, 4));
+}, 3));
// When the first readable listener is just attached,
// emittedReadable should be false
assert.strictEqual(readable._readableState.emittedReadable, false);
-// Each one of these should trigger a readable event.
+// These trigger a single 'readable', as things are batched up
process.nextTick(common.mustCall(() => {
readable.push('foo');
}));
process.nextTick(common.mustCall(() => {
readable.push('bar');
}));
-process.nextTick(common.mustCall(() => {
+
+// These trigger two 'readable' events
+setImmediate(common.mustCall(() => {
readable.push('quo');
-}));
-process.nextTick(common.mustCall(() => {
- readable.push(null);
+ process.nextTick(common.mustCall(() => {
+ readable.push(null);
+ }));
}));
const noRead = new Readable({
diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js
index be397dc5dc..7058e123f0 100644
--- a/test/parallel/test-stream-readable-needReadable.js
+++ b/test/parallel/test-stream-readable-needReadable.js
@@ -38,7 +38,7 @@ asyncReadable.on('readable', common.mustCall(() => {
// then we need to notify the reader on future changes.
assert.strictEqual(asyncReadable._readableState.needReadable, true);
}
-}, 3));
+}, 2));
process.nextTick(common.mustCall(() => {
asyncReadable.push('foooo');
@@ -46,8 +46,9 @@ process.nextTick(common.mustCall(() => {
process.nextTick(common.mustCall(() => {
asyncReadable.push('bar');
}));
-process.nextTick(common.mustCall(() => {
+setImmediate(common.mustCall(() => {
asyncReadable.push(null);
+ assert.strictEqual(asyncReadable._readableState.needReadable, false);
}));
const flowing = new Readable({
@@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
-}));
-process.nextTick(common.mustCall(() => {
- slowProducer.push('foo');
-}));
-process.nextTick(common.mustCall(() => {
- slowProducer.push('foo');
-}));
-process.nextTick(common.mustCall(() => {
- slowProducer.push(null);
+ process.nextTick(common.mustCall(() => {
+ slowProducer.push('foo');
+ process.nextTick(common.mustCall(() => {
+ slowProducer.push('foo');
+ process.nextTick(common.mustCall(() => {
+ slowProducer.push(null);
+ }));
+ }));
+ }));
}));
diff --git a/test/parallel/test-stream-readable-object-multi-push-async.js b/test/parallel/test-stream-readable-object-multi-push-async.js
new file mode 100644
index 0000000000..4babfd12a2
--- /dev/null
+++ b/test/parallel/test-stream-readable-object-multi-push-async.js
@@ -0,0 +1,183 @@
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { Readable } = require('stream');
+
+const MAX = 42;
+const BATCH = 10;
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall(function() {
+ console.log('>> READ');
+ fetchData((err, data) => {
+ if (err) {
+ this.destroy(err);
+ return;
+ }
+
+ if (data.length === 0) {
+ console.log('pushing null');
+ this.push(null);
+ return;
+ }
+
+ console.log('pushing');
+ data.forEach((d) => this.push(d));
+ });
+ }, Math.floor(MAX / BATCH) + 2)
+ });
+
+ let i = 0;
+ function fetchData(cb) {
+ if (i > MAX) {
+ setTimeout(cb, 10, null, []);
+ } else {
+ const array = [];
+ const max = i + BATCH;
+ for (; i < max; i++) {
+ array.push(i);
+ }
+ setTimeout(cb, 10, null, array);
+ }
+ }
+
+ readable.on('readable', () => {
+ let data;
+ console.log('readable emitted');
+ while (data = readable.read()) {
+ console.log(data);
+ }
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall(function() {
+ console.log('>> READ');
+ fetchData((err, data) => {
+ if (err) {
+ this.destroy(err);
+ return;
+ }
+
+ if (data.length === 0) {
+ console.log('pushing null');
+ this.push(null);
+ return;
+ }
+
+ console.log('pushing');
+ data.forEach((d) => this.push(d));
+ });
+ }, Math.floor(MAX / BATCH) + 2)
+ });
+
+ let i = 0;
+ function fetchData(cb) {
+ if (i > MAX) {
+ setTimeout(cb, 10, null, []);
+ } else {
+ const array = [];
+ const max = i + BATCH;
+ for (; i < max; i++) {
+ array.push(i);
+ }
+ setTimeout(cb, 10, null, array);
+ }
+ }
+
+ readable.on('data', (data) => {
+ console.log('data emitted', data);
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall(function() {
+ console.log('>> READ');
+ fetchData((err, data) => {
+ if (err) {
+ this.destroy(err);
+ return;
+ }
+
+ console.log('pushing');
+ data.forEach((d) => this.push(d));
+
+ if (data[BATCH - 1] >= MAX) {
+ console.log('pushing null');
+ this.push(null);
+ }
+ });
+ }, Math.floor(MAX / BATCH) + 1)
+ });
+
+ let i = 0;
+ function fetchData(cb) {
+ const array = [];
+ const max = i + BATCH;
+ for (; i < max; i++) {
+ array.push(i);
+ }
+ setTimeout(cb, 10, null, array);
+ }
+
+ readable.on('data', (data) => {
+ console.log('data emitted', data);
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustNotCall()
+ });
+
+ readable.on('data', common.mustNotCall());
+
+ readable.push(null);
+
+ let nextTickPassed = false;
+ process.nextTick(() => {
+ nextTickPassed = true;
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(nextTickPassed, false);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall()
+ });
+
+ readable.on('data', (data) => {
+ console.log('data emitted', data);
+ });
+
+ readable.on('end', common.mustCall());
+
+ setImmediate(() => {
+ readable.push('aaa');
+ readable.push(null);
+ });
+}
diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js
index e31d2dd921..0af2eeb71f 100644
--- a/test/parallel/test-stream-readable-reading-readingMore.js
+++ b/test/parallel/test-stream-readable-reading-readingMore.js
@@ -37,7 +37,8 @@ readable.on('readable', common.mustCall(() => {
// if the stream has ended, we shouldn't be reading
assert.strictEqual(state.ended, !state.reading);
- if (readable.read() === null) // reached end of stream
+ const data = readable.read();
+ if (data === null) // reached end of stream
process.nextTick(common.mustCall(onStreamEnd, 1));
}, 2));
diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js
index d085926542..68c25141aa 100644
--- a/test/parallel/test-stream2-transform.js
+++ b/test/parallel/test-stream2-transform.js
@@ -306,25 +306,26 @@ const Transform = require('_stream_transform');
pt.write(Buffer.from('foog'));
pt.write(Buffer.from('bark'));
- assert.strictEqual(emits, 1);
+ assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'foogb');
assert.strictEqual(String(pt.read(5)), 'null');
+ assert.strictEqual(emits, 0);
pt.write(Buffer.from('bazy'));
pt.write(Buffer.from('kuel'));
- assert.strictEqual(emits, 2);
+ assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'arkba');
assert.strictEqual(pt.read(5).toString(), 'zykue');
assert.strictEqual(pt.read(5), null);
pt.end();
- assert.strictEqual(emits, 3);
+ assert.strictEqual(emits, 1);
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);
- assert.strictEqual(emits, 3);
+ assert.strictEqual(emits, 1);
}
{
@@ -338,7 +339,7 @@ const Transform = require('_stream_transform');
pt.write(Buffer.from('foog'));
pt.write(Buffer.from('bark'));
- assert.strictEqual(emits, 1);
+ assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'foogb');
assert.strictEqual(pt.read(5), null);
@@ -352,7 +353,7 @@ const Transform = require('_stream_transform');
pt.once('readable', common.mustCall(function() {
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);
- assert.strictEqual(emits, 4);
+ assert.strictEqual(emits, 3);
}));
pt.end();
}));