summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-pipe-unpipe-streams.js
blob: 4cb8413af22f1842fd82ca902d29f450d0161dc7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
'use strict';
const common = require('../common');
const assert = require('assert');

const { Readable, Writable } = require('stream');

const source = Readable({ read: () => {} });
const dest1 = Writable({ write: () => {} });
const dest2 = Writable({ write: () => {} });

source.pipe(dest1);
source.pipe(dest2);

dest1.on('unpipe', common.mustCall());
dest2.on('unpipe', common.mustCall());

assert.strictEqual(source._readableState.pipes[0], dest1);
assert.strictEqual(source._readableState.pipes[1], dest2);
assert.strictEqual(source._readableState.pipes.length, 2);

// Should be able to unpipe them in the reverse order that they were piped.

source.unpipe(dest2);

assert.deepStrictEqual(source._readableState.pipes, [dest1]);
assert.notStrictEqual(source._readableState.pipes, dest2);

dest2.on('unpipe', common.mustNotCall());
source.unpipe(dest2);

source.unpipe(dest1);

assert.strictEqual(source._readableState.pipes.length, 0);

{
  // Test `cleanup()` if we unpipe all streams.
  const source = Readable({ read: () => {} });
  const dest1 = Writable({ write: () => {} });
  const dest2 = Writable({ write: () => {} });

  let destCount = 0;
  const srcCheckEventNames = ['end', 'data'];
  const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe'];

  const checkSrcCleanup = common.mustCall(() => {
    assert.strictEqual(source._readableState.pipes.length, 0);
    assert.strictEqual(source._readableState.flowing, false);

    srcCheckEventNames.forEach((eventName) => {
      assert.strictEqual(
        source.listenerCount(eventName), 0,
        `source's '${eventName}' event listeners not removed`
      );
    });
  });

  function checkDestCleanup(dest) {
    const currentDestId = ++destCount;
    source.pipe(dest);

    const unpipeChecker = common.mustCall(() => {
      assert.deepStrictEqual(
        dest.listeners('unpipe'), [unpipeChecker],
        `destination{${currentDestId}} should have a 'unpipe' event ` +
        'listener which is `unpipeChecker`'
      );
      dest.removeListener('unpipe', unpipeChecker);
      destCheckEventNames.forEach((eventName) => {
        assert.strictEqual(
          dest.listenerCount(eventName), 0,
          `destination{${currentDestId}}'s '${eventName}' event ` +
          'listeners not removed'
        );
      });

      if (--destCount === 0)
        checkSrcCleanup();
    });

    dest.on('unpipe', unpipeChecker);
  }

  checkDestCleanup(dest1);
  checkDestCleanup(dest2);
  source.unpipe();
}