summaryrefslogtreecommitdiff
path: root/lib/internal/http2/core.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/http2/core.js')
-rw-r--r--lib/internal/http2/core.js110
1 files changed, 66 insertions, 44 deletions
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index fbccbf0d84..39d456fb75 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -341,20 +341,25 @@ function onStreamClose(code) {
stream[kState].fd = -1;
// Defer destroy we actually emit end.
- if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
+ if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
- stream[kMaybeDestroy](null, code);
+ stream[kMaybeDestroy](code);
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
// Push a null so the stream can end whenever the client consumes
// it completely.
stream.push(null);
- // If the client hasn't tried to consume the stream and there is no
- // resume scheduled (which would indicate they would consume in the future),
- // then just dump the incoming data so that the stream can be destroyed.
- if (!stream[kState].didRead && !stream._readableState.resumeScheduled)
+
+ // If the user hasn't tried to consume the stream (and this is a server
+ // session) then just dump the incoming data so that the stream can
+ // be destroyed.
+ if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER &&
+ !stream[kState].didRead &&
+ stream.readableFlowing === null)
stream.resume();
+ else
+ stream.read(0);
}
}
@@ -379,7 +384,7 @@ function onStreamRead(nread, buf) {
`${sessionName(stream[kSession][kType])}]: ending readable.`);
// defer this until we actually emit end
- if (stream._readableState.endEmitted) {
+ if (!stream.readable) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
@@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) {
// goaway using NGHTTP2_NO_ERROR because there was no error
// condition on this side of the session that caused the
// shutdown.
- session.destroy(new ERR_HTTP2_SESSION_ERROR(code),
- { errorCode: NGHTTP2_NO_ERROR });
+ session.destroy(new ERR_HTTP2_SESSION_ERROR(code), NGHTTP2_NO_ERROR);
}
}
@@ -813,6 +817,21 @@ function emitClose(self, error) {
self.emit('close');
}
+function finishSessionDestroy(session, error) {
+ const socket = session[kSocket];
+ if (!socket.destroyed)
+ socket.destroy(error);
+
+ session[kProxySocket] = undefined;
+ session[kSocket] = undefined;
+ session[kHandle] = undefined;
+ socket[kSession] = undefined;
+ socket[kServer] = undefined;
+
+ // Finally, emit the close and error events (if necessary) on next tick.
+ process.nextTick(emitClose, session, error);
+}
+
// Upon creation, the Http2Session takes ownership of the socket. The session
// may not be ready to use immediately if the socket is not yet fully connected.
// In that case, the Http2Session will wait for the socket to connect. Once
@@ -869,6 +888,8 @@ class Http2Session extends EventEmitter {
this[kState] = {
flags: SESSION_FLAGS_PENDING,
+ goawayCode: null,
+ goawayLastStreamID: null,
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
@@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter {
if (handle !== undefined)
handle.destroy(code, socket.destroyed);
- // If there is no error, use setImmediate to destroy the socket on the
+ // If the socket is alive, use setImmediate to destroy the session on the
// next iteration of the event loop in order to give data time to transmit.
// Otherwise, destroy immediately.
- if (!socket.destroyed) {
- if (!error) {
- setImmediate(socket.destroy.bind(socket));
- } else {
- socket.destroy(error);
- }
- }
-
- this[kProxySocket] = undefined;
- this[kSocket] = undefined;
- this[kHandle] = undefined;
- socket[kSession] = undefined;
- socket[kServer] = undefined;
-
- // Finally, emit the close and error events (if necessary) on next tick.
- process.nextTick(emitClose, this, error);
+ if (!socket.destroyed)
+ setImmediate(finishSessionDestroy, this, error);
+ else
+ finishSessionDestroy(this, error);
}
// Closing the session will:
@@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) {
}
function streamOnResume() {
- if (!this.destroyed && !this.pending) {
- if (!this[kState].didRead)
- this[kState].didRead = true;
+ if (!this.destroyed)
this[kHandle].readStart();
- }
}
function streamOnPause() {
@@ -1460,6 +1466,16 @@ function afterShutdown() {
stream[kMaybeDestroy]();
}
+function finishSendTrailers(stream, headersList) {
+ stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;
+
+ const ret = stream[kHandle].trailers(headersList);
+ if (ret < 0)
+ stream.destroy(new NghttpError(ret));
+ else
+ stream[kMaybeDestroy]();
+}
+
function closeStream(stream, code, shouldSubmitRstStream = true) {
const state = stream[kState];
state.flags |= STREAM_FLAGS_CLOSED;
@@ -1521,6 +1537,10 @@ class Http2Stream extends Duplex {
this[kSession] = session;
session[kState].pendingStreams.add(this);
+ // Allow our logic for determining whether any reads have happened to
+ // work in all situations. This is similar to what we do in _http_incoming.
+ this._readableState.readingMore = true;
+
this[kTimeout] = null;
this[kState] = {
@@ -1531,7 +1551,6 @@ class Http2Stream extends Duplex {
trailersReady: false
};
- this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
}
@@ -1725,6 +1744,10 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
+ if (!this[kState].didRead) {
+ this._readableState.readingMore = false;
+ this[kState].didRead = true;
+ }
if (!this.pending) {
streamOnResume.call(this);
} else {
@@ -1773,13 +1796,8 @@ class Http2Stream extends Duplex {
throw headersList;
this[kSentTrailers] = headers;
- this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;
-
- const ret = this[kHandle].trailers(headersList);
- if (ret < 0)
- this.destroy(new NghttpError(ret));
- else
- this[kMaybeDestroy]();
+ // Send the trailers in setImmediate so we don't do it on nghttp2 stack.
+ setImmediate(finishSendTrailers, this, headersList);
}
get closed() {
@@ -1866,15 +1884,15 @@ class Http2Stream extends Duplex {
}
// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
- [kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
- if (error || code !== NGHTTP2_NO_ERROR) {
- this.destroy(error);
+ [kMaybeDestroy](code = NGHTTP2_NO_ERROR) {
+ if (code !== NGHTTP2_NO_ERROR) {
+ this.destroy();
return;
}
// TODO(mcollina): remove usage of _*State properties
- if (this._writableState.ended && this._writableState.pendingcb === 0) {
- if (this._readableState.ended && this.closed) {
+ if (!this.writable) {
+ if (!this.readable && this.closed) {
this.destroy();
return;
}
@@ -1887,7 +1905,7 @@ class Http2Stream extends Duplex {
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
- !this._readableState.resumeScheduled) {
+ this.readableFlowing === null) {
this.close();
}
}
@@ -2477,6 +2495,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
function socketOnError(error) {
const session = this[kSession];
if (session !== undefined) {
+ // We can ignore ECONNRESET after GOAWAY was received as there's nothing
+ // we can do and the other side is fully within its rights to do so.
+ if (error.code === 'ECONNRESET' && session[kState].goawayCode !== null)
+ return session.destroy();
debug(`Http2Session ${sessionName(session[kType])}: socket error [` +
`${error.message}]`);
session.destroy(error);