summaryrefslogtreecommitdiff
path: root/test/parallel/test-readable-from-iterator-closing.js
blob: 02252ffe56854c2bfa6f240a6faf40faa518fe88 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
'use strict';

const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function asyncSupport() {
  const finallyMustCall = mustCall();
  const bodyMustCall = mustCall();

  async function* infiniteGenerate() {
    try {
      while (true) yield 'a';
    } finally {
      finallyMustCall();
    }
  }

  const stream = Readable.from(infiniteGenerate());

  for await (const chunk of stream) {
    bodyMustCall();
    strictEqual(chunk, 'a');
    break;
  }
}

async function syncSupport() {
  const finallyMustCall = mustCall();
  const bodyMustCall = mustCall();

  function* infiniteGenerate() {
    try {
      while (true) yield 'a';
    } finally {
      finallyMustCall();
    }
  }

  const stream = Readable.from(infiniteGenerate());

  for await (const chunk of stream) {
    bodyMustCall();
    strictEqual(chunk, 'a');
    break;
  }
}

async function syncPromiseSupport() {
  const returnMustBeAwaited = mustCall();
  const bodyMustCall = mustCall();

  function* infiniteGenerate() {
    try {
      while (true) yield Promise.resolve('a');
    } finally {
      // eslint-disable-next-line no-unsafe-finally
      return { then(cb) {
        returnMustBeAwaited();
        cb();
      } };
    }
  }

  const stream = Readable.from(infiniteGenerate());

  for await (const chunk of stream) {
    bodyMustCall();
    strictEqual(chunk, 'a');
    break;
  }
}

async function syncRejectedSupport() {
  const returnMustBeAwaited = mustCall();
  const bodyMustNotCall = mustNotCall();
  const catchMustCall = mustCall();
  const secondNextMustNotCall = mustNotCall();

  function* generate() {
    try {
      yield Promise.reject('a');
      secondNextMustNotCall();
    } finally {
      // eslint-disable-next-line no-unsafe-finally
      return { then(cb) {
        returnMustBeAwaited();
        cb();
      } };
    }
  }

  const stream = Readable.from(generate());

  try {
    for await (const chunk of stream) {
      bodyMustNotCall(chunk);
    }
  } catch {
    catchMustCall();
  }
}

async function noReturnAfterThrow() {
  const returnMustNotCall = mustNotCall();
  const bodyMustNotCall = mustNotCall();
  const catchMustCall = mustCall();
  const nextMustCall = mustCall();

  const stream = Readable.from({
    [Symbol.asyncIterator]() { return this; },
    async next() {
      nextMustCall();
      throw new Error('a');
    },
    async return() {
      returnMustNotCall();
      return { done: true };
    },
  });

  try {
    for await (const chunk of stream) {
      bodyMustNotCall(chunk);
    }
  } catch {
    catchMustCall();
  }
}

async function closeStreamWhileNextIsPending() {
  const finallyMustCall = mustCall();
  const dataMustCall = mustCall();

  let resolveDestroy;
  const destroyed =
    new Promise((resolve) => { resolveDestroy = mustCall(resolve); });
  let resolveYielded;
  const yielded =
    new Promise((resolve) => { resolveYielded = mustCall(resolve); });

  async function* infiniteGenerate() {
    try {
      while (true) {
        yield 'a';
        resolveYielded();
        await destroyed;
      }
    } finally {
      finallyMustCall();
    }
  }

  const stream = Readable.from(infiniteGenerate());

  stream.on('data', (data) => {
    dataMustCall();
    strictEqual(data, 'a');
  });

  yielded.then(() => {
    stream.destroy();
    resolveDestroy();
  });
}

async function closeAfterNullYielded() {
  const finallyMustCall = mustCall();
  const dataMustCall = mustCall(3);

  function* generate() {
    try {
      yield 'a';
      yield 'a';
      yield 'a';
    } finally {
      finallyMustCall();
    }
  }

  const stream = Readable.from(generate());

  stream.on('data', (chunk) => {
    dataMustCall();
    strictEqual(chunk, 'a');
  });
}

Promise.all([
  asyncSupport(),
  syncSupport(),
  syncPromiseSupport(),
  syncRejectedSupport(),
  noReturnAfterThrow(),
  closeStreamWhileNextIsPending(),
  closeAfterNullYielded(),
]).then(mustCall());