'use strict'; const common = require('../common'); const { Stream, Writable, Readable, Transform, pipeline, PassThrough, Duplex, addAbortSignal, } = require('stream'); const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); const net = require('net'); { let finished = false; const processed = []; const expected = [ Buffer.from('a'), Buffer.from('b'), Buffer.from('c') ]; const read = new Readable({ read() {} }); const write = new Writable({ write(data, enc, cb) { processed.push(data); cb(); } }); write.on('finish', () => { finished = true; }); for (let i = 0; i < expected.length; i++) { read.push(expected[i]); } read.push(null); pipeline(read, write, common.mustSucceed(() => { assert.ok(finished); assert.deepStrictEqual(processed, expected); })); } { const read = new Readable({ read() {} }); assert.throws(() => { pipeline(read, () => {}); }, /ERR_MISSING_ARGS/); assert.throws(() => { pipeline(() => {}); }, /ERR_MISSING_ARGS/); assert.throws(() => { pipeline(); }, /ERR_INVALID_CALLBACK/); } { const read = new Readable({ read() {} }); const write = new Writable({ write(data, enc, cb) { cb(); } }); read.push('data'); setImmediate(() => read.destroy()); pipeline(read, write, common.mustCall((err) => { assert.ok(err, 'should have an error'); })); } { const read = new Readable({ read() {} }); const write = new Writable({ write(data, enc, cb) { cb(); } }); read.push('data'); setImmediate(() => read.destroy(new Error('kaboom'))); const dst = pipeline(read, write, common.mustCall((err) => { assert.deepStrictEqual(err, new Error('kaboom')); })); assert.strictEqual(dst, write); } { const read = new Readable({ read() {} }); const transform = new Transform({ transform(data, enc, cb) { cb(new Error('kaboom')); } }); const write = new Writable({ write(data, enc, cb) { cb(); } }); read.on('close', common.mustCall()); transform.on('close', common.mustCall()); write.on('close', common.mustCall()); [read, transform, write].forEach((stream) => { stream.on('error', common.mustCall((err) => { assert.deepStrictEqual(err, new Error('kaboom')); })); }); const dst = pipeline(read, transform, write, common.mustCall((err) => { assert.deepStrictEqual(err, new Error('kaboom')); })); assert.strictEqual(dst, write); read.push('hello'); } { const server = http.createServer((req, res) => { const rs = new Readable({ read() { rs.push('hello'); rs.push(null); } }); pipeline(rs, res, () => {}); }); server.listen(0, () => { const req = http.request({ port: server.address().port }); req.end(); req.on('response', (res) => { const buf = []; res.on('data', (data) => buf.push(data)); res.on('end', common.mustCall(() => { assert.deepStrictEqual( Buffer.concat(buf), Buffer.from('hello') ); server.close(); })); }); }); } { const server = http.createServer((req, res) => { let sent = false; const rs = new Readable({ read() { if (sent) { return; } sent = true; rs.push('hello'); }, destroy: common.mustCall((err, cb) => { // Prevents fd leaks by destroying http pipelines cb(); }) }); pipeline(rs, res, () => {}); }); server.listen(0, () => { const req = http.request({ port: server.address().port }); req.end(); req.on('response', (res) => { setImmediate(() => { res.destroy(); server.close(); }); }); }); } { const server = http.createServer((req, res) => { let sent = 0; const rs = new Readable({ read() { if (sent++ > 10) { return; } rs.push('hello'); }, destroy: common.mustCall((err, cb) => { cb(); }) }); pipeline(rs, res, () => {}); }); let cnt = 10; const badSink = new Writable({ write(data, enc, cb) { cnt--; if (cnt === 0) cb(new Error('kaboom')); else cb(); } }); server.listen(0, () => { const req = http.request({ port: server.address().port }); req.end(); req.on('response', (res) => { pipeline(res, badSink, common.mustCall((err) => { assert.deepStrictEqual(err, new Error('kaboom')); server.close(); })); }); }); } { const server = http.createServer((req, res) => { pipeline(req, res, common.mustSucceed()); }); server.listen(0, () => { const req = http.request({ port: server.address().port }); let sent = 0; const rs = new Readable({ read() { if (sent++ > 10) { return; } rs.push('hello'); } }); pipeline(rs, req, common.mustCall(() => { server.close(); })); req.on('response', (res) => { let cnt = 10; res.on('data', () => { cnt--; if (cnt === 0) rs.destroy(); }); }); }); } { const makeTransform = () => { const tr = new Transform({ transform(data, enc, cb) { cb(null, data); } }); tr.on('close', common.mustCall()); return tr; }; const rs = new Readable({ read() { rs.push('hello'); } }); let cnt = 10; const ws = new Writable({ write(data, enc, cb) { cnt--; if (cnt === 0) return cb(new Error('kaboom')); cb(); } }); rs.on('close', common.mustCall()); ws.on('close', common.mustCall()); pipeline( rs, makeTransform(), makeTransform(), makeTransform(), makeTransform(), makeTransform(), makeTransform(), ws, common.mustCall((err) => { assert.deepStrictEqual(err, new Error('kaboom')); }) ); } { const oldStream = new Stream(); oldStream.pause = oldStream.resume = () => {}; oldStream.write = (data) => { oldStream.emit('data', data); return true; }; oldStream.end = () => { oldStream.emit('end'); }; const expected = [ Buffer.from('hello'), Buffer.from('world') ]; const rs = new Readable({ read() { for (let i = 0; i < expected.length; i++) { rs.push(expected[i]); } rs.push(null); } }); const ws = new Writable({ write(data, enc, cb) { assert.deepStrictEqual(data, expected.shift()); cb(); } }); let finished = false; ws.on('finish', () => { finished = true; }); pipeline( rs, oldStream, ws, common.mustSucceed(() => { assert(finished, 'last stream finished'); }) ); } { const oldStream = new Stream(); oldStream.pause = oldStream.resume = () => {}; oldStream.write = (data) => { oldStream.emit('data', data); return true; }; oldStream.end = () => { oldStream.emit('end'); }; const destroyableOldStream = new Stream(); destroyableOldStream.pause = destroyableOldStream.resume = () => {}; destroyableOldStream.destroy = common.mustCall(() => { destroyableOldStream.emit('close'); }); destroyableOldStream.write = (data) => { destroyableOldStream.emit('data', data); return true; }; destroyableOldStream.end = () => { destroyableOldStream.emit('end'); }; const rs = new Readable({ read() { rs.destroy(new Error('stop')); } }); const ws = new Writable({ write(data, enc, cb) { cb(); } }); let finished = false; ws.on('finish', () => { finished = true; }); pipeline( rs, oldStream, destroyableOldStream, ws, common.mustCall((err) => { assert.deepStrictEqual(err, new Error('stop')); assert(!finished, 'should not finish'); }) ); } { const pipelinePromise = promisify(pipeline); async function run() { const read = new Readable({ read() {} }); const write = new Writable({ write(data, enc, cb) { cb(); } }); read.push('data'); read.push(null); let finished = false; write.on('finish', () => { finished = true; }); await pipelinePromise(read, write); assert(finished); } run(); } { const read = new Readable({ read() {} }); const transform = new Transform({ transform(data, enc, cb) { cb(new Error('kaboom')); } }); const write = new Writable({ write(data, enc, cb) { cb(); } }); assert.throws( () => pipeline(read, transform, write), { code: 'ERR_INVALID_CALLBACK' } ); } { const server = http.Server(function(req, res) { res.write('asd'); }); server.listen(0, function() { http.get({ port: this.address().port }, (res) => { const stream = new PassThrough(); stream.on('error', common.mustCall()); pipeline( res, stream, common.mustCall((err) => { assert.strictEqual(err.message, 'oh no'); server.close(); }) ); stream.destroy(new Error('oh no')); }).on('error', common.mustNotCall()); }); } { let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); } }); pipeline(function*() { yield 'hello'; yield 'world'; }(), w, common.mustSucceed(() => { assert.strictEqual(res, 'helloworld'); })); } { let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); } }); pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }(), w, common.mustSucceed(() => { assert.strictEqual(res, 'helloworld'); })); } { let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); } }); pipeline(function*() { yield 'hello'; yield 'world'; }, w, common.mustSucceed(() => { assert.strictEqual(res, 'helloworld'); })); } { let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); } }); pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, w, common.mustSucceed(() => { assert.strictEqual(res, 'helloworld'); })); } { let res = ''; pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, async function*(source) { for await (const chunk of source) { yield chunk.toUpperCase(); } }, async function(source) { for await (const chunk of source) { res += chunk; } }, common.mustSucceed(() => { assert.strictEqual(res, 'HELLOWORLD'); })); } { pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, async function*(source) { for await (const chunk of source) { yield chunk.toUpperCase(); } }, async function(source) { let ret = ''; for await (const chunk of source) { ret += chunk; } return ret; }, common.mustSucceed((val) => { assert.strictEqual(val, 'HELLOWORLD'); })); } { // AsyncIterable destination is returned and finalizes. const ret = pipeline(async function*() { await Promise.resolve(); yield 'hello'; }, async function*(source) { for await (const chunk of source) {} }, common.mustCall((err) => { assert.strictEqual(err, undefined); })); ret.resume(); assert.strictEqual(typeof ret.pipe, 'function'); } { // AsyncFunction destination is not returned and error is // propagated. const ret = pipeline(async function*() { await Promise.resolve(); throw new Error('kaboom'); }, async function*(source) { for await (const chunk of source) {} }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); ret.resume(); assert.strictEqual(typeof ret.pipe, 'function'); } { const s = new PassThrough(); pipeline(async function*() { throw new Error('kaboom'); }, s, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(s.destroyed, true); })); } { const s = new PassThrough(); pipeline(async function*() { throw new Error('kaboom'); }(), s, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(s.destroyed, true); })); } { const s = new PassThrough(); pipeline(function*() { throw new Error('kaboom'); }, s, common.mustCall((err, val) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(s.destroyed, true); })); } { const s = new PassThrough(); pipeline(function*() { throw new Error('kaboom'); }(), s, common.mustCall((err, val) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(s.destroyed, true); })); } { const s = new PassThrough(); pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, s, async function(source) { for await (const chunk of source) { throw new Error('kaboom'); } }, common.mustCall((err, val) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(s.destroyed, true); })); } { const s = new PassThrough(); const ret = pipeline(function() { return ['hello', 'world']; }, s, async function*(source) { for await (const chunk of source) { throw new Error('kaboom'); } }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(s.destroyed, true); })); ret.resume(); assert.strictEqual(typeof ret.pipe, 'function'); } { // Legacy streams without async iterator. const s = new PassThrough(); s.push('asd'); s.push(null); s[Symbol.asyncIterator] = null; let ret = ''; pipeline(s, async function(source) { for await (const chunk of source) { ret += chunk; } }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); })); } { // v1 streams without read(). const s = new Stream(); process.nextTick(() => { s.emit('data', 'asd'); s.emit('end'); }); // 'destroyer' can be called multiple times, // once from stream wrapper and // once from iterator wrapper. s.close = common.mustCallAtLeast(1); let ret = ''; pipeline(s, async function(source) { for await (const chunk of source) { ret += chunk; } }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); })); } { // v1 error streams without read(). const s = new Stream(); process.nextTick(() => { s.emit('error', new Error('kaboom')); }); s.destroy = common.mustCall(); pipeline(s, async function(source) { }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); } { const s = new PassThrough(); assert.throws(() => { pipeline(function(source) { }, s, () => {}); }, (err) => { assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); assert.strictEqual(s.destroyed, false); return true; }); } { const s = new PassThrough(); assert.throws(() => { pipeline(s, function(source) { }, s, () => {}); }, (err) => { assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); assert.strictEqual(s.destroyed, false); return true; }); } { const s = new PassThrough(); assert.throws(() => { pipeline(s, function(source) { }, () => {}); }, (err) => { assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); assert.strictEqual(s.destroyed, false); return true; }); } { const s = new PassThrough(); assert.throws(() => { pipeline(s, function*(source) { }, () => {}); }, (err) => { assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); assert.strictEqual(s.destroyed, false); return true; }); } { let res = ''; pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, new Transform({ transform(chunk, encoding, cb) { cb(new Error('kaboom')); } }), async function(source) { for await (const chunk of source) { res += chunk; } }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(res, ''); })); } { let res = ''; pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, new Transform({ transform(chunk, encoding, cb) { process.nextTick(cb, new Error('kaboom')); } }), async function(source) { for await (const chunk of source) { res += chunk; } }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(res, ''); })); } { let res = ''; pipeline(async function*() { await Promise.resolve(); yield 'hello'; yield 'world'; }, new Transform({ decodeStrings: false, transform(chunk, encoding, cb) { cb(null, chunk.toUpperCase()); } }), async function(source) { for await (const chunk of source) { res += chunk; } }, common.mustSucceed(() => { assert.strictEqual(res, 'HELLOWORLD'); })); } { // Ensure no unhandled rejection from async function. pipeline(async function*() { yield 'hello'; }, async function(source) { throw new Error('kaboom'); }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); } { const src = new PassThrough({ autoDestroy: false }); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { assert.strictEqual(src.destroyed, false); assert.strictEqual(dst.destroyed, false); })); src.end(); } { // Make sure 'close' before 'end' finishes without error // if readable has received eof. // Ref: https://github.com/nodejs/node/issues/29699 const r = new Readable(); const w = new Writable({ write(chunk, encoding, cb) { cb(); } }); pipeline(r, w, (err) => { assert.strictEqual(err, undefined); }); r.push('asd'); r.push(null); r.emit('close'); } { const server = http.createServer((req, res) => { }); server.listen(0, () => { const req = http.request({ port: server.address().port }); const body = new PassThrough(); pipeline( body, req, common.mustSucceed(() => { assert(!req.res); assert(!req.aborted); req.abort(); server.close(); }) ); body.end(); }); } { const src = new PassThrough(); const dst = new PassThrough(); pipeline(src, dst, common.mustSucceed(() => { assert.strictEqual(dst.destroyed, false); })); src.end(); } { const src = new PassThrough(); const dst = new PassThrough(); dst.readable = false; pipeline(src, dst, common.mustSucceed(() => { assert.strictEqual(dst.destroyed, false); })); src.end(); } { let res = ''; const rs = new Readable({ read() { setImmediate(() => { rs.push('hello'); }); } }); const ws = new Writable({ write: common.mustNotCall() }); pipeline(rs, async function*(stream) { /* eslint no-unused-vars: off */ for await (const chunk of stream) { throw new Error('kaboom'); } }, async function *(source) { for await (const chunk of source) { res += chunk; } }, ws, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(res, ''); })); } { const server = http.createServer((req, res) => { req.socket.on('error', common.mustNotCall()); pipeline(req, new PassThrough(), (err) => { assert.ifError(err); res.end(); server.close(); }); }); server.listen(0, () => { const req = http.request({ method: 'PUT', port: server.address().port }); req.end('asd123'); req.on('response', common.mustCall()); req.on('error', common.mustNotCall()); }); } { // Might still want to be able to use the writable side // of src. This is in the case where e.g. the Duplex input // is not directly connected to its output. Such a case could // happen when the Duplex is reading from a socket and then echos // the data back on the same socket. const src = new PassThrough(); assert.strictEqual(src.writable, true); const dst = new PassThrough(); pipeline(src, dst, common.mustCall((err) => { assert.strictEqual(src.writable, true); assert.strictEqual(src.destroyed, false); })); src.push(null); } { const src = new PassThrough(); const dst = pipeline( src, async function * (source) { for await (const chunk of source) { yield chunk; } }, common.mustCall((err) => { assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); }) ); src.push('asd'); dst.destroy(); } { pipeline(async function * () { yield 'asd'; }, async function * (source) { for await (const chunk of source) { yield { chunk }; } }, common.mustSucceed()); } { let closed = false; const src = new Readable({ read() {}, destroy(err, cb) { process.nextTick(cb); } }); const dst = new Writable({ write(chunk, encoding, callback) { callback(); } }); src.on('close', () => { closed = true; }); src.push(null); pipeline(src, dst, common.mustCall((err) => { assert.strictEqual(closed, true); })); } { let closed = false; const src = new Readable({ read() {}, destroy(err, cb) { process.nextTick(cb); } }); const dst = new Duplex({}); src.on('close', common.mustCall(() => { closed = true; })); src.push(null); pipeline(src, dst, common.mustCall((err) => { assert.strictEqual(closed, true); })); } { const server = net.createServer(common.mustCall((socket) => { // echo server pipeline(socket, socket, common.mustSucceed()); // 13 force destroys the socket before it has a chance to emit finish socket.on('finish', common.mustCall(() => { server.close(); })); })).listen(0, common.mustCall(() => { const socket = net.connect(server.address().port); socket.end(); })); } { const d = new Duplex({ autoDestroy: false, write: common.mustCall((data, enc, cb) => { d.push(data); cb(); }), read: common.mustCall(() => { d.push(null); }), final: common.mustCall((cb) => { setTimeout(() => { assert.strictEqual(d.destroyed, false); cb(); }, 1000); }), destroy: common.mustNotCall() }); const sink = new Writable({ write: common.mustCall((data, enc, cb) => { cb(); }) }); pipeline(d, sink, common.mustSucceed()); d.write('test'); d.end(); } { const server = net.createServer(common.mustCall((socket) => { // echo server pipeline(socket, socket, common.mustSucceed()); socket.on('finish', common.mustCall(() => { server.close(); })); })).listen(0, common.mustCall(() => { const socket = net.connect(server.address().port); socket.end(); })); } { const d = new Duplex({ autoDestroy: false, write: common.mustCall((data, enc, cb) => { d.push(data); cb(); }), read: common.mustCall(() => { d.push(null); }), final: common.mustCall((cb) => { setTimeout(() => { assert.strictEqual(d.destroyed, false); cb(); }, 1000); }), // `destroy()` won't be invoked by pipeline since // the writable side has not completed when // the pipeline has completed. destroy: common.mustNotCall() }); const sink = new Writable({ write: common.mustCall((data, enc, cb) => { cb(); }) }); pipeline(d, sink, common.mustSucceed()); d.write('test'); d.end(); } { const r = new Readable({ read() {} }); r.push('hello'); r.push('world'); r.push(null); let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); } }); pipeline([r, w], common.mustSucceed(() => { assert.strictEqual(res, 'helloworld'); })); } { let flushed = false; const makeStream = () => new Transform({ transform: (chunk, enc, cb) => cb(null, chunk), flush: (cb) => setTimeout(() => { flushed = true; cb(null); }, 1), }); const input = new Readable(); input.push(null); pipeline( input, makeStream(), common.mustCall(() => { assert.strictEqual(flushed, true); }), ); } { function createThenable() { let counter = 0; return { get then() { if (counter++) { throw new Error('Cannot access `then` more than once'); } return Function.prototype; }, }; } pipeline( function* () { yield 0; }, createThenable, () => common.mustNotCall(), ); } { const ac = new AbortController(); const r = Readable.from(async function* () { for (let i = 0; i < 10; i++) { await Promise.resolve(); yield String(i); if (i === 5) { ac.abort(); } } }()); let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); } }); const cb = common.mustCall((err) => { assert.strictEqual(err.name, 'AbortError'); assert.strictEqual(res, '012345'); assert.strictEqual(w.destroyed, true); assert.strictEqual(r.destroyed, true); assert.strictEqual(pipelined.destroyed, true); }); const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb)); } { pipeline([1, 2, 3], PassThrough({ objectMode: true }), common.mustSucceed(() => {})); let res = ''; const w = new Writable({ write(chunk, encoding, callback) { res += chunk; callback(); }, }); pipeline(['1', '2', '3'], w, common.mustSucceed(() => { assert.strictEqual(res, '123'); })); }