summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-readable-object-multi-push-async.js
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2018-01-04 18:06:56 +0100
committerMatteo Collina <hello@matteocollina.com>2018-01-10 10:48:03 +0100
commit1e0f3315c77033ef0e01bb37c3d41c8e1d65e686 (patch)
treeb529e81c0e3fda479f2ba69996f484490fb098ca /test/parallel/test-stream-readable-object-multi-push-async.js
parent800caac2362e602d80b5c61fe1cb288bbcdb316a (diff)
downloadandroid-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.gz
android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.bz2
android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.zip
stream: always defer 'readable' with nextTick
Emit 'readable' always in the next tick, resulting in a single call to _read() per microtick. This removes the need for the user to implement buffering if they wanted to call this.push() multiple times in an asynchronous fashion, as this.push() triggers this._read() call. PR-URL: https://github.com/nodejs/node/pull/17979 Fixes: https://github.com/nodejs/node/issues/3203 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Diffstat (limited to 'test/parallel/test-stream-readable-object-multi-push-async.js')
-rw-r--r--test/parallel/test-stream-readable-object-multi-push-async.js183
1 files changed, 183 insertions, 0 deletions
diff --git a/test/parallel/test-stream-readable-object-multi-push-async.js b/test/parallel/test-stream-readable-object-multi-push-async.js
new file mode 100644
index 0000000000..4babfd12a2
--- /dev/null
+++ b/test/parallel/test-stream-readable-object-multi-push-async.js
@@ -0,0 +1,183 @@
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { Readable } = require('stream');
+
+const MAX = 42;
+const BATCH = 10;
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall(function() {
+ console.log('>> READ');
+ fetchData((err, data) => {
+ if (err) {
+ this.destroy(err);
+ return;
+ }
+
+ if (data.length === 0) {
+ console.log('pushing null');
+ this.push(null);
+ return;
+ }
+
+ console.log('pushing');
+ data.forEach((d) => this.push(d));
+ });
+ }, Math.floor(MAX / BATCH) + 2)
+ });
+
+ let i = 0;
+ function fetchData(cb) {
+ if (i > MAX) {
+ setTimeout(cb, 10, null, []);
+ } else {
+ const array = [];
+ const max = i + BATCH;
+ for (; i < max; i++) {
+ array.push(i);
+ }
+ setTimeout(cb, 10, null, array);
+ }
+ }
+
+ readable.on('readable', () => {
+ let data;
+ console.log('readable emitted');
+ while (data = readable.read()) {
+ console.log(data);
+ }
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall(function() {
+ console.log('>> READ');
+ fetchData((err, data) => {
+ if (err) {
+ this.destroy(err);
+ return;
+ }
+
+ if (data.length === 0) {
+ console.log('pushing null');
+ this.push(null);
+ return;
+ }
+
+ console.log('pushing');
+ data.forEach((d) => this.push(d));
+ });
+ }, Math.floor(MAX / BATCH) + 2)
+ });
+
+ let i = 0;
+ function fetchData(cb) {
+ if (i > MAX) {
+ setTimeout(cb, 10, null, []);
+ } else {
+ const array = [];
+ const max = i + BATCH;
+ for (; i < max; i++) {
+ array.push(i);
+ }
+ setTimeout(cb, 10, null, array);
+ }
+ }
+
+ readable.on('data', (data) => {
+ console.log('data emitted', data);
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall(function() {
+ console.log('>> READ');
+ fetchData((err, data) => {
+ if (err) {
+ this.destroy(err);
+ return;
+ }
+
+ console.log('pushing');
+ data.forEach((d) => this.push(d));
+
+ if (data[BATCH - 1] >= MAX) {
+ console.log('pushing null');
+ this.push(null);
+ }
+ });
+ }, Math.floor(MAX / BATCH) + 1)
+ });
+
+ let i = 0;
+ function fetchData(cb) {
+ const array = [];
+ const max = i + BATCH;
+ for (; i < max; i++) {
+ array.push(i);
+ }
+ setTimeout(cb, 10, null, array);
+ }
+
+ readable.on('data', (data) => {
+ console.log('data emitted', data);
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustNotCall()
+ });
+
+ readable.on('data', common.mustNotCall());
+
+ readable.push(null);
+
+ let nextTickPassed = false;
+ process.nextTick(() => {
+ nextTickPassed = true;
+ });
+
+ readable.on('end', common.mustCall(() => {
+ assert.strictEqual(nextTickPassed, false);
+ }));
+}
+
+{
+ const readable = new Readable({
+ objectMode: true,
+ read: common.mustCall()
+ });
+
+ readable.on('data', (data) => {
+ console.log('data emitted', data);
+ });
+
+ readable.on('end', common.mustCall());
+
+ setImmediate(() => {
+ readable.push('aaa');
+ readable.push(null);
+ });
+}