summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-pipe-flow.js
blob: 1f2e8f54cec409f8a708a9de31349363255c2370 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, PassThrough } = require('stream');

{
  // An object-mode source that delivers its first 17 chunks asynchronously is
  // piped into an object-mode sink with a highWaterMark of 0 whose writes are
  // acknowledged via setImmediate. The pipe must still run to completion: the
  // source has to emit 'end' and the sink has to emit 'finish'.
  let asyncPushesLeft = 17;

  const source = new Readable({
    objectMode: true,
    read() {
      const pushLater = asyncPushesLeft > 0;
      asyncPushesLeft -= 1;
      if (pushLater) {
        // Supply the chunk from a nextTick callback instead of inside read().
        process.nextTick(() => source.push({}));
        return;
      }
      // Async budget used up: push one last chunk and then EOF, synchronously.
      source.push({});
      source.push(null);
    }
  });

  const sink = new Writable({
    highWaterMark: 0,
    objectMode: true,
    write(chunk, encoding, done) {
      // Complete the write from a setImmediate callback, not synchronously.
      setImmediate(done);
    }
  });

  // common.mustCall() presumably fails the test if the wrapped listener is
  // never invoked -- see ../common for the exact contract.
  source.on('end', common.mustCall());
  sink.on('finish', common.mustCall());
  source.pipe(sink);
}

{
  // Eight objects followed by EOF flow through two chained object-mode
  // PassThrough streams (highWaterMark 2 each). A separate Readable, `wrapper`,
  // pulls from the tail of that chain by hand using read() and 'readable', and
  // must eventually emit 'end'.
  let chunksLeft = 8;

  const source = new Readable({
    objectMode: true,
    read() {
      // Hand out a fresh object while the counter is non-zero, then EOF.
      const hasChunk = chunksLeft !== 0;
      chunksLeft -= 1;
      source.push(hasChunk ? {} : null);
    }
  });

  const firstHop = new PassThrough({ objectMode: true, highWaterMark: 2 });
  const tail = new PassThrough({ objectMode: true, highWaterMark: 2 });
  source.pipe(firstHop).pipe(tail);

  const wrapper = new Readable({
    objectMode: true,
    read() {
      // Defer the pull to the next tick.
      process.nextTick(() => {
        const chunk = tail.read();
        if (chunk !== null) {
          wrapper.push(chunk);
          return;
        }
        // Nothing buffered yet: retry once on the next 'readable' event and
        // forward the chunk only if that read actually produced one.
        tail.once('readable', () => {
          const retried = tail.read();
          if (retried !== null) wrapper.push(retried);
        });
      });
    }
  });

  // When the chain ends, signal EOF on the wrapper as well.
  tail.on('end', () => {
    wrapper.push(null);
  });

  wrapper.resume();
  // common.mustCall() presumably fails the test if 'end' is never emitted --
  // see ../common for the exact contract.
  wrapper.on('end', common.mustCall());
}

{
  // Only register drain if there is backpressure.
  //
  // Two chunks and then EOF are pushed into a source piped to an object-mode
  // PassThrough (highWaterMark 2). At every checkpoint the PassThrough must
  // have no 'drain' listeners attached.
  const source = new Readable({ read() {} });
  const passThrough = new PassThrough({ objectMode: true, highWaterMark: 2 });

  // Shared checkpoint: asserts that no 'drain' listener is registered.
  const expectNoDrainListener = () => {
    assert.strictEqual(passThrough.listenerCount('drain'), 0);
  };

  source.pipe(passThrough);
  expectNoDrainListener();

  // NOTE(review): this listener is not wrapped in common.mustCall, so the
  // check below is silently skipped if 'finish' never fires -- confirm that
  // is intentional.
  passThrough.on('finish', () => {
    expectNoDrainListener();
  });

  source.push('asd');
  expectNoDrainListener();

  // Second chunk and EOF are pushed from a nextTick callback, with a
  // checkpoint after each push.
  process.nextTick(() => {
    source.push('asd');
    expectNoDrainListener();
    source.push(null);
    expectNoDrainListener();
  });
}