diff options
Diffstat (limited to 'lib/internal/http2/core.js')
-rw-r--r-- | lib/internal/http2/core.js | 110 |
1 files changed, 66 insertions, 44 deletions
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index fbccbf0d84..39d456fb75 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -341,20 +341,25 @@ function onStreamClose(code) { stream[kState].fd = -1; // Defer destroy we actually emit end. - if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { + if (!stream.readable || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. - stream[kMaybeDestroy](null, code); + stream[kMaybeDestroy](code); } else { // Wait for end to destroy. stream.on('end', stream[kMaybeDestroy]); // Push a null so the stream can end whenever the client consumes // it completely. stream.push(null); - // If the client hasn't tried to consume the stream and there is no - // resume scheduled (which would indicate they would consume in the future), - // then just dump the incoming data so that the stream can be destroyed. - if (!stream[kState].didRead && !stream._readableState.resumeScheduled) + + // If the user hasn't tried to consume the stream (and this is a server + // session) then just dump the incoming data so that the stream can + // be destroyed. + if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER && + !stream[kState].didRead && + stream.readableFlowing === null) stream.resume(); + else + stream.read(0); } } @@ -379,7 +384,7 @@ function onStreamRead(nread, buf) { `${sessionName(stream[kSession][kType])}]: ending readable.`); // defer this until we actually emit end - if (stream._readableState.endEmitted) { + if (!stream.readable) { stream[kMaybeDestroy](); } else { stream.on('end', stream[kMaybeDestroy]); @@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) { // goaway using NGHTTP2_NO_ERROR because there was no error // condition on this side of the session that caused the // shutdown. - session.destroy(new ERR_HTTP2_SESSION_ERROR(code), - { errorCode: NGHTTP2_NO_ERROR }); + session.destroy(new ERR_HTTP2_SESSION_ERROR(code), NGHTTP2_NO_ERROR); } } @@ -813,6 +817,21 @@ function emitClose(self, error) { self.emit('close'); } +function finishSessionDestroy(session, error) { + const socket = session[kSocket]; + if (!socket.destroyed) + socket.destroy(error); + + session[kProxySocket] = undefined; + session[kSocket] = undefined; + session[kHandle] = undefined; + socket[kSession] = undefined; + socket[kServer] = undefined; + + // Finally, emit the close and error events (if necessary) on next tick. + process.nextTick(emitClose, session, error); +} + // Upon creation, the Http2Session takes ownership of the socket. The session // may not be ready to use immediately if the socket is not yet fully connected. // In that case, the Http2Session will wait for the socket to connect. Once @@ -869,6 +888,8 @@ class Http2Session extends EventEmitter { this[kState] = { flags: SESSION_FLAGS_PENDING, + goawayCode: null, + goawayLastStreamID: null, streams: new Map(), pendingStreams: new Set(), pendingAck: 0, @@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter { if (handle !== undefined) handle.destroy(code, socket.destroyed); - // If there is no error, use setImmediate to destroy the socket on the + // If the socket is alive, use setImmediate to destroy the session on the // next iteration of the event loop in order to give data time to transmit. // Otherwise, destroy immediately. - if (!socket.destroyed) { - if (!error) { - setImmediate(socket.destroy.bind(socket)); - } else { - socket.destroy(error); - } - } - - this[kProxySocket] = undefined; - this[kSocket] = undefined; - this[kHandle] = undefined; - socket[kSession] = undefined; - socket[kServer] = undefined; - - // Finally, emit the close and error events (if necessary) on next tick. - process.nextTick(emitClose, this, error); + if (!socket.destroyed) + setImmediate(finishSessionDestroy, this, error); + else + finishSessionDestroy(this, error); } // Closing the session will: @@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) { } function streamOnResume() { - if (!this.destroyed && !this.pending) { - if (!this[kState].didRead) - this[kState].didRead = true; + if (!this.destroyed) this[kHandle].readStart(); - } } function streamOnPause() { @@ -1460,6 +1466,16 @@ function afterShutdown() { stream[kMaybeDestroy](); } +function finishSendTrailers(stream, headersList) { + stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; + + const ret = stream[kHandle].trailers(headersList); + if (ret < 0) + stream.destroy(new NghttpError(ret)); + else + stream[kMaybeDestroy](); +} + function closeStream(stream, code, shouldSubmitRstStream = true) { const state = stream[kState]; state.flags |= STREAM_FLAGS_CLOSED; @@ -1521,6 +1537,10 @@ class Http2Stream extends Duplex { this[kSession] = session; session[kState].pendingStreams.add(this); + // Allow our logic for determining whether any reads have happened to + // work in all situations. This is similar to what we do in _http_incoming. + this._readableState.readingMore = true; + this[kTimeout] = null; this[kState] = { @@ -1531,7 +1551,6 @@ class Http2Stream extends Duplex { trailersReady: false }; - this.on('resume', streamOnResume); this.on('pause', streamOnPause); } @@ -1725,6 +1744,10 @@ class Http2Stream extends Duplex { this.push(null); return; } + if (!this[kState].didRead) { + this._readableState.readingMore = false; + this[kState].didRead = true; + } if (!this.pending) { streamOnResume.call(this); } else { @@ -1773,13 +1796,8 @@ class Http2Stream extends Duplex { throw headersList; this[kSentTrailers] = headers; - this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; - - const ret = this[kHandle].trailers(headersList); - if (ret < 0) - this.destroy(new NghttpError(ret)); - else - this[kMaybeDestroy](); + // Send the trailers in setImmediate so we don't do it on nghttp2 stack. + setImmediate(finishSendTrailers, this, headersList); } get closed() { @@ -1866,15 +1884,15 @@ class Http2Stream extends Duplex { } // The Http2Stream can be destroyed if it has closed and if the readable // side has received the final chunk. - [kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) { - if (error || code !== NGHTTP2_NO_ERROR) { - this.destroy(error); + [kMaybeDestroy](code = NGHTTP2_NO_ERROR) { + if (code !== NGHTTP2_NO_ERROR) { + this.destroy(); return; } // TODO(mcollina): remove usage of _*State properties - if (this._writableState.ended && this._writableState.pendingcb === 0) { - if (this._readableState.ended && this.closed) { + if (!this.writable) { + if (!this.readable && this.closed) { this.destroy(); return; } @@ -1887,7 +1905,7 @@ class Http2Stream extends Duplex { this[kSession][kType] === NGHTTP2_SESSION_SERVER && !(state.flags & STREAM_FLAGS_HAS_TRAILERS) && !state.didRead && - !this._readableState.resumeScheduled) { + this.readableFlowing === null) { this.close(); } } @@ -2477,6 +2495,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout); function socketOnError(error) { const session = this[kSession]; if (session !== undefined) { + // We can ignore ECONNRESET after GOAWAY was received as there's nothing + // we can do and the other side is fully within its rights to do so. + if (error.code === 'ECONNRESET' && session[kState].goawayCode !== null) + return session.destroy(); debug(`Http2Session ${sessionName(session[kType])}: socket error [` + `${error.message}]`); session.destroy(error); |