summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-pipeline.js
diff options
context:
space:
mode:
authorMathias Buus <mathiasbuus@gmail.com>2018-04-04 16:52:19 +0200
committerMathias Buus <mathiasbuus@gmail.com>2018-04-16 16:02:12 +0200
commitf64bebf2059d35299da58cf9c5ca22d68035d617 (patch)
tree0c1861d2ff8ef1eff11c7a9ecf90302867b4084b /test/parallel/test-stream-pipeline.js
parent5cc948b77a1452cdd8b667978c3cc1188b433b1a (diff)
downloadandroid-node-v8-f64bebf2059d35299da58cf9c5ca22d68035d617.tar.gz
android-node-v8-f64bebf2059d35299da58cf9c5ca22d68035d617.tar.bz2
android-node-v8-f64bebf2059d35299da58cf9c5ca22d68035d617.zip
stream: add pipeline and finished
PR-URL: https://github.com/nodejs/node/pull/19828 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'test/parallel/test-stream-pipeline.js')
-rw-r--r--test/parallel/test-stream-pipeline.js483
1 files changed, 483 insertions, 0 deletions
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
new file mode 100644
index 0000000000..e63ee2ed11
--- /dev/null
+++ b/test/parallel/test-stream-pipeline.js
@@ -0,0 +1,483 @@
+'use strict';
+
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+const { Stream, Writable, Readable, Transform, pipeline } = require('stream');
+const assert = require('assert');
+const http = require('http');
+const http2 = require('http2');
+const { promisify } = require('util');
+
+common.crashOnUnhandledRejection();
+
+{
+ let finished = false;
+ const processed = [];
+ const expected = [
+ Buffer.from('a'),
+ Buffer.from('b'),
+ Buffer.from('c')
+ ];
+
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ processed.push(data);
+ cb();
+ }
+ });
+
+ write.on('finish', () => {
+ finished = true;
+ });
+
+ for (let i = 0; i < expected.length; i++) {
+ read.push(expected[i]);
+ }
+ read.push(null);
+
+ pipeline(read, write, common.mustCall((err) => {
+ assert.ok(!err, 'no error');
+ assert.ok(finished);
+ assert.deepStrictEqual(processed, expected);
+ }));
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ assert.throws(() => {
+ pipeline(read, () => {});
+ }, /ERR_MISSING_ARGS/);
+ assert.throws(() => {
+ pipeline(() => {});
+ }, /ERR_MISSING_ARGS/);
+ assert.throws(() => {
+ pipeline();
+ }, /ERR_MISSING_ARGS/);
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.push('data');
+ setImmediate(() => read.destroy());
+
+ pipeline(read, write, common.mustCall((err) => {
+ assert.ok(err, 'should have an error');
+ }));
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.push('data');
+ setImmediate(() => read.destroy(new Error('kaboom')));
+
+ const dst = pipeline(read, write, common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ }));
+
+ assert.strictEqual(dst, write);
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ const transform = new Transform({
+ transform(data, enc, cb) {
+ cb(new Error('kaboom'));
+ }
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.on('close', common.mustCall());
+ transform.on('close', common.mustCall());
+ write.on('close', common.mustCall());
+
+ const dst = pipeline(read, transform, write, common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ }));
+
+ assert.strictEqual(dst, write);
+
+ read.push('hello');
+}
+
+{
+ const server = http.createServer((req, res) => {
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ rs.push(null);
+ }
+ });
+
+ pipeline(rs, res);
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ req.end();
+ req.on('response', (res) => {
+ const buf = [];
+ res.on('data', (data) => buf.push(data));
+ res.on('end', common.mustCall(() => {
+ assert.deepStrictEqual(
+ Buffer.concat(buf),
+ Buffer.from('hello')
+ );
+ server.close();
+ }));
+ });
+ });
+}
+
+{
+ const server = http.createServer((req, res) => {
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ },
+ destroy: common.mustCall((err, cb) => {
+ // prevents fd leaks by destroying http pipelines
+ cb();
+ })
+ });
+
+ pipeline(rs, res);
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ req.end();
+ req.on('response', (res) => {
+ setImmediate(() => {
+ res.destroy();
+ server.close();
+ });
+ });
+ });
+}
+
+{
+ const server = http.createServer((req, res) => {
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ },
+ destroy: common.mustCall((err, cb) => {
+ cb();
+ })
+ });
+
+ pipeline(rs, res);
+ });
+
+ let cnt = 10;
+
+ const badSink = new Writable({
+ write(data, enc, cb) {
+ cnt--;
+ if (cnt === 0) cb(new Error('kaboom'));
+ else cb();
+ }
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ req.end();
+ req.on('response', (res) => {
+ pipeline(res, badSink, common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ server.close();
+ }));
+ });
+ });
+}
+
+{
+ const server = http.createServer((req, res) => {
+ pipeline(req, res, common.mustCall());
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ }
+ });
+
+ pipeline(rs, req, common.mustCall(() => {
+ server.close();
+ }));
+
+ req.on('response', (res) => {
+ let cnt = 10;
+ res.on('data', () => {
+ cnt--;
+ if (cnt === 0) rs.destroy();
+ });
+ });
+ });
+}
+
+{
+ const server = http2.createServer((req, res) => {
+ pipeline(req, res, common.mustCall());
+ });
+
+ server.listen(0, () => {
+ const url = `http://localhost:${server.address().port}`;
+ const client = http2.connect(url);
+ const req = client.request({ ':method': 'POST' });
+
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ }
+ });
+
+ pipeline(rs, req, common.mustCall((err) => {
+ // TODO: this is working around an http2 bug
+ // where the client keeps the event loop going
+ // (replacing the rs.destroy() with req.end()
+ // exits it so seems to be a destroy bug there
+ client.unref();
+
+ server.close();
+ client.close();
+ }));
+
+ let cnt = 10;
+ req.on('data', (data) => {
+ cnt--;
+ if (cnt === 0) rs.destroy();
+ });
+ });
+}
+
+{
+ const makeTransform = () => {
+ const tr = new Transform({
+ transform(data, enc, cb) {
+ cb(null, data);
+ }
+ });
+
+ tr.on('close', common.mustCall());
+ return tr;
+ };
+
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ }
+ });
+
+ let cnt = 10;
+
+ const ws = new Writable({
+ write(data, enc, cb) {
+ cnt--;
+ if (cnt === 0) return cb(new Error('kaboom'));
+ cb();
+ }
+ });
+
+ rs.on('close', common.mustCall());
+ ws.on('close', common.mustCall());
+
+ pipeline(
+ rs,
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ ws,
+ common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ })
+ );
+}
+
+{
+ const oldStream = new Stream();
+
+ oldStream.pause = oldStream.resume = () => {};
+ oldStream.write = (data) => {
+ oldStream.emit('data', data);
+ return true;
+ };
+ oldStream.end = () => {
+ oldStream.emit('end');
+ };
+
+ const expected = [
+ Buffer.from('hello'),
+ Buffer.from('world')
+ ];
+
+ const rs = new Readable({
+ read() {
+ for (let i = 0; i < expected.length; i++) {
+ rs.push(expected[i]);
+ }
+ rs.push(null);
+ }
+ });
+
+ const ws = new Writable({
+ write(data, enc, cb) {
+ assert.deepStrictEqual(data, expected.shift());
+ cb();
+ }
+ });
+
+ let finished = false;
+
+ ws.on('finish', () => {
+ finished = true;
+ });
+
+ pipeline(
+ rs,
+ oldStream,
+ ws,
+ common.mustCall((err) => {
+ assert(!err, 'no error');
+ assert(finished, 'last stream finished');
+ })
+ );
+}
+
+{
+ const oldStream = new Stream();
+
+ oldStream.pause = oldStream.resume = () => {};
+ oldStream.write = (data) => {
+ oldStream.emit('data', data);
+ return true;
+ };
+ oldStream.end = () => {
+ oldStream.emit('end');
+ };
+
+ const destroyableOldStream = new Stream();
+
+ destroyableOldStream.pause = destroyableOldStream.resume = () => {};
+ destroyableOldStream.destroy = common.mustCall(() => {
+ destroyableOldStream.emit('close');
+ });
+ destroyableOldStream.write = (data) => {
+ destroyableOldStream.emit('data', data);
+ return true;
+ };
+ destroyableOldStream.end = () => {
+ destroyableOldStream.emit('end');
+ };
+
+ const rs = new Readable({
+ read() {
+ rs.destroy(new Error('stop'));
+ }
+ });
+
+ const ws = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ let finished = false;
+
+ ws.on('finish', () => {
+ finished = true;
+ });
+
+ pipeline(
+ rs,
+ oldStream,
+ destroyableOldStream,
+ ws,
+ common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('stop'));
+ assert(!finished, 'should not finish');
+ })
+ );
+}
+
+{
+ const pipelinePromise = promisify(pipeline);
+
+ async function run() {
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.push('data');
+ read.push(null);
+
+ let finished = false;
+
+ write.on('finish', () => {
+ finished = true;
+ });
+
+ await pipelinePromise(read, write);
+
+ assert(finished);
+ }
+
+ run();
+}