summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/stream.md3
-rw-r--r--lib/_stream_writable.js37
-rw-r--r--lib/internal/streams/destroy.js13
-rw-r--r--test/parallel/test-http2-reset-flood.js5
-rw-r--r--test/parallel/test-stream-writable-destroy.js14
-rw-r--r--test/parallel/test-stream-writable-write-cb-error.js58
-rw-r--r--test/parallel/test-wrap-js-stream-exceptions.js6
-rw-r--r--test/parallel/test-zlib-write-after-close.js14
8 files changed, 28 insertions, 122 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 48b478a58e..d4dbe54dbc 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -571,8 +571,7 @@ The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` *may or may not* be called with the error as its
first argument. To reliably detect write errors, add a listener for the
-`'error'` event. If `callback` is called with an error, it will be called
-before the `'error'` event is emitted.
+`'error'` event.
The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`.
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 9b75b672cb..c6d895ff5d 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -158,11 +158,6 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!(options && options.autoDestroy);
- // Indicates whether the stream has errored. When true all write() calls
- // should return false. This is needed since when autoDestroy
- // is disabled we need a way to tell whether the stream has failed.
- this.errored = false;
-
// Count buffered requests
this.bufferedRequestCount = 0;
@@ -406,7 +401,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;
- if (state.writing || state.corked || state.errored) {
+ if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
@@ -425,9 +420,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
- // Return false if errored or destroyed in order to break
- // any synchronous while(stream.write(data)) loops.
- return ret && !state.errored && !state.destroyed;
+ return ret;
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@@ -444,11 +437,18 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.sync = false;
}
-function onwriteError(stream, state, er, cb) {
+function onwriteError(stream, state, sync, er, cb) {
--state.pendingcb;
- cb(er);
- // This can emit error, but error must always follow cb.
+ if (sync) {
+ // Defer the callback if we are being called synchronously
+ // to avoid piling up things on the stack
+ process.nextTick(cb, er);
+ } else {
+ // The caller expect this to happen before if
+ // it is async
+ cb(er);
+ }
errorOrDestroy(stream, er);
}
@@ -465,14 +465,9 @@ function onwrite(stream, er) {
state.length -= state.writelen;
state.writelen = 0;
- if (er) {
- state.errored = true;
- if (sync) {
- process.nextTick(onwriteError, stream, state, er, cb);
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
+ if (er)
+ onwriteError(stream, state, sync, er, cb);
+ else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;
@@ -627,7 +622,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
function needFinish(state) {
return (state.ending &&
state.length === 0 &&
- !state.errored &&
+ !state.errorEmitted &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 8708ca022c..27985482ce 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -27,10 +27,6 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;
- if (w && err) {
- w.errored = true;
- }
-
if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
@@ -54,12 +50,10 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) {
- // Invoke callback before scheduling emitClose so that callback
- // can schedule before.
- cb(err);
if (emitClose) {
process.nextTick(emitCloseNT, this);
}
+ cb(err);
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
@@ -97,7 +91,6 @@ function undestroy() {
if (w) {
w.destroyed = false;
- w.errored = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
@@ -117,10 +110,6 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState;
const w = stream._writableState;
- if (w & err) {
- w.errored = true;
- }
-
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (needError(stream, err))
diff --git a/test/parallel/test-http2-reset-flood.js b/test/parallel/test-http2-reset-flood.js
index 9977bfd1a3..a6553401fb 100644
--- a/test/parallel/test-http2-reset-flood.js
+++ b/test/parallel/test-http2-reset-flood.js
@@ -67,10 +67,7 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
h2header.writeIntBE(1, 0, 3); // Length: 1
h2header.writeIntBE(i, 5, 4); // Stream ID
// 0x88 = :status: 200
- if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
- process.nextTick(writeRequests);
- break;
- }
+ conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
}
}
diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js
index c4a96788ab..ac107ecbb7 100644
--- a/test/parallel/test-stream-writable-destroy.js
+++ b/test/parallel/test-stream-writable-destroy.js
@@ -18,20 +18,6 @@ const assert = require('assert');
{
const write = new Writable({
- write(chunk, enc, cb) {
- this.destroy(new Error('asd'));
- cb();
- }
- });
-
- write.on('error', common.mustCall());
- write.on('finish', common.mustNotCall());
- write.end('asd');
- assert.strictEqual(write.destroyed, true);
-}
-
-{
- const write = new Writable({
write(chunk, enc, cb) { cb(); }
});
diff --git a/test/parallel/test-stream-writable-write-cb-error.js b/test/parallel/test-stream-writable-write-cb-error.js
deleted file mode 100644
index 72db1b7e3f..0000000000
--- a/test/parallel/test-stream-writable-write-cb-error.js
+++ /dev/null
@@ -1,58 +0,0 @@
-'use strict';
-const common = require('../common');
-const { Writable } = require('stream');
-const assert = require('assert');
-
-// Ensure callback is always invoked before
-// error is emitted. Regardless if error was
-// sync or async.
-
-{
- let callbackCalled = false;
- // Sync Error
- const writable = new Writable({
- write: common.mustCall((buf, enc, cb) => {
- cb(new Error());
- })
- });
- writable.on('error', common.mustCall(() => {
- assert.strictEqual(callbackCalled, true);
- }));
- writable.write('hi', common.mustCall(() => {
- callbackCalled = true;
- }));
-}
-
-{
- let callbackCalled = false;
- // Async Error
- const writable = new Writable({
- write: common.mustCall((buf, enc, cb) => {
- process.nextTick(cb, new Error());
- })
- });
- writable.on('error', common.mustCall(() => {
- assert.strictEqual(callbackCalled, true);
- }));
- writable.write('hi', common.mustCall(() => {
- callbackCalled = true;
- }));
-}
-
-{
- // Sync Error
- const writable = new Writable({
- write: common.mustCall((buf, enc, cb) => {
- cb(new Error());
- })
- });
-
- writable.on('error', common.mustCall());
-
- let cnt = 0;
- // Ensure we don't live lock on sync error
- while (writable.write('a'))
- cnt++;
-
- assert.strictEqual(cnt, 0);
-}
diff --git a/test/parallel/test-wrap-js-stream-exceptions.js b/test/parallel/test-wrap-js-stream-exceptions.js
index 2cc592a760..eeab26f525 100644
--- a/test/parallel/test-wrap-js-stream-exceptions.js
+++ b/test/parallel/test-wrap-js-stream-exceptions.js
@@ -16,8 +16,4 @@ const socket = new JSStreamWrap(new Duplex({
})
}));
-socket.end('foo');
-socket.on('error', common.expectsError({
- type: Error,
- message: 'write EPROTO'
-}));
+assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js
index 24d1e9b990..160971b16b 100644
--- a/test/parallel/test-zlib-write-after-close.js
+++ b/test/parallel/test-zlib-write-after-close.js
@@ -26,10 +26,12 @@ const zlib = require('zlib');
zlib.gzip('hello', common.mustCall(function(err, out) {
const unzip = zlib.createGunzip();
unzip.close(common.mustCall());
-
- unzip.write(out);
- unzip.on('error', common.expectsError({
- code: 'ERR_STREAM_DESTROYED',
- type: Error
- }));
+ common.expectsError(
+ () => unzip.write(out),
+ {
+ code: 'ERR_STREAM_DESTROYED',
+ type: Error,
+ message: 'Cannot call write after a stream was destroyed'
+ }
+ );
}));