summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/internal/http2/core.js25
-rw-r--r--lib/internal/stream_base_commons.js28
-rw-r--r--lib/net.js40
-rw-r--r--src/node_http2.cc8
4 files changed, 47 insertions, 54 deletions
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 9c17f8077b..ecf243f845 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -108,6 +108,7 @@ const {
writeGeneric,
writevGeneric,
onStreamRead,
+ kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer
} = require('internal/stream_base_commons');
@@ -1515,21 +1516,6 @@ function trackWriteState(stream, bytes) {
session[kHandle].chunksSentSinceLastWrite = 0;
}
-function afterDoStreamWrite(status, handle) {
- const stream = handle[kOwner];
- const session = stream[kSession];
-
- stream[kUpdateTimer]();
-
- const { bytes } = this;
- stream[kState].writeQueueSize -= bytes;
-
- if (session !== undefined)
- session[kState].writeQueueSize -= bytes;
- if (typeof this.callback === 'function')
- this.callback(null);
-}
-
function streamOnResume() {
if (!this.destroyed)
this[kHandle].readStart();
@@ -1782,6 +1768,13 @@ class Http2Stream extends Duplex {
'bug in Node.js');
}
+ [kAfterAsyncWrite]({ bytes }) {
+ this[kState].writeQueueSize -= bytes;
+
+ if (this.session !== undefined)
+ this.session[kState].writeQueueSize -= bytes;
+ }
+
[kWriteGeneric](writev, data, encoding, cb) {
// When the Http2Stream is first created, it is corked until the
// handle and the stream ID is assigned. However, if the user calls
@@ -1808,7 +1801,7 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();
- const req = createWriteWrap(this[kHandle], afterDoStreamWrite);
+ const req = createWriteWrap(this[kHandle]);
req.stream = this[kID];
if (writev)
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 709395fa91..31291e751d 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -16,6 +16,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
+const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
function handleWriteReq(req, data, encoding) {
const { handle } = req;
@@ -52,11 +53,33 @@ function handleWriteReq(req, data, encoding) {
}
}
-function createWriteWrap(handle, oncomplete) {
+function onWriteComplete(status) {
+ const stream = this.handle[owner_symbol];
+
+ if (stream.destroyed) {
+ if (typeof this.callback === 'function')
+ this.callback(null);
+ return;
+ }
+
+ if (status < 0) {
+ const ex = errnoException(status, 'write', this.error);
+ stream.destroy(ex, this.callback);
+ return;
+ }
+
+ stream[kUpdateTimer]();
+ stream[kAfterAsyncWrite](this);
+
+ if (typeof this.callback === 'function')
+ this.callback(null);
+}
+
+function createWriteWrap(handle) {
var req = new WriteWrap();
req.handle = handle;
- req.oncomplete = oncomplete;
+ req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
@@ -160,6 +183,7 @@ module.exports = {
writevGeneric,
writeGeneric,
onStreamRead,
+ kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer,
};
diff --git a/lib/net.js b/lib/net.js
index 08d08888de..01cfed98e8 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -62,6 +62,7 @@ const {
writevGeneric,
writeGeneric,
onStreamRead,
+ kAfterAsyncWrite,
kUpdateTimer
} = require('internal/stream_base_commons');
const {
@@ -685,6 +686,10 @@ protoGetter('localPort', function localPort() {
});
+Socket.prototype[kAfterAsyncWrite] = function() {
+ this[kLastWriteQueueSize] = 0;
+};
+
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
@@ -707,7 +712,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
this._unrefTimer();
- var req = createWriteWrap(this._handle, afterWrite);
+ var req = createWriteWrap(this._handle);
if (writev)
writevGeneric(this, req, data, cb);
else
@@ -771,39 +776,6 @@ protoGetter('bytesWritten', function bytesWritten() {
});
-function afterWrite(status, handle, err) {
- var self = handle[owner_symbol];
- if (self !== process.stderr && self !== process.stdout)
- debug('afterWrite', status);
-
- if (this.async)
- self[kLastWriteQueueSize] = 0;
-
- // callback may come after call to destroy.
- if (self.destroyed) {
- debug('afterWrite destroyed');
- if (this.callback)
- this.callback(null);
- return;
- }
-
- if (status < 0) {
- var ex = errnoException(status, 'write', this.error);
- debug('write failure', ex);
- self.destroy(ex, this.callback);
- return;
- }
-
- self._unrefTimer();
-
- if (self !== process.stderr && self !== process.stdout)
- debug('afterWrite call cb');
-
- if (this.callback)
- this.callback.call(undefined);
-}
-
-
function checkBindError(err, port, handle) {
// EADDRINUSE may not be reported until we call listen() or connect().
// To complicate matters, a failed bind() followed by listen() or connect()
diff --git a/src/node_http2.cc b/src/node_http2.cc
index f92d39f655..20cfcf3745 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -1571,8 +1571,12 @@ void Http2Session::ClearOutgoing(int status) {
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
- if (wrap != nullptr)
- wrap->Done(status);
+ if (wrap != nullptr) {
+ // TODO(addaleax): Pass `status` instead of 0, so that we actually error
+ // out with the error from the write to the underlying protocol,
+ // if one occurred.
+ wrap->Done(0);
+ }
}
}