diff options
author | Anatoli Papirovski <apapirovski@mac.com> | 2017-10-25 19:04:41 -0400 |
---|---|---|
committer | James M Snell <jasnell@gmail.com> | 2017-10-29 10:01:16 -0700 |
commit | ca82e3088dfe204ab36baa492a3e399d46827453 (patch) | |
tree | 0d7f7d31ea9731b4300ba65e8477ed616a33ef2c /lib | |
parent | 8a9be4175b17b574459bccb99d8b6e6be6db4070 (diff) | |
download | android-node-v8-ca82e3088dfe204ab36baa492a3e399d46827453.tar.gz android-node-v8-ca82e3088dfe204ab36baa492a3e399d46827453.tar.bz2 android-node-v8-ca82e3088dfe204ab36baa492a3e399d46827453.zip |
http2: fix several timeout related issues
* correctly reset write timers: currently reset timers on
both session & stream when write starts and when it ends.
* prevent large writes from timing out: when writing a large
chunk of data in http2, once the data is handed off to C++,
the JS session & stream lose all track of the write and will
timeout if the write doesn't complete within the timeout window
Fix this issue by tracking whether a write request is ongoing and
also tracking how many chunks have been sent since the most recent
write started. (Since each write call resets the timer.)
PR-URL: https://github.com/nodejs/node/pull/16525
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/internal/http2/core.js | 78 |
1 files changed, 72 insertions, 6 deletions
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 727ca51798..a667a2e760 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -746,7 +746,8 @@ class Http2Session extends EventEmitter { shutdown: false, shuttingDown: false, pendingAck: 0, - maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10) + maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10), + writeQueueSize: 0 }; this[kType] = type; @@ -1080,6 +1081,22 @@ class Http2Session extends EventEmitter { } _onTimeout() { + // This checks whether a write is currently in progress and also whether + // that write is actually sending data across the write. The kHandle + // stored `chunksSentSinceLastWrite` is only updated when a timeout event + // happens, meaning that if a write is ongoing it should never equal the + // newly fetched, updated value. + if (this[kState].writeQueueSize > 0) { + const handle = this[kHandle]; + const chunksSentSinceLastWrite = handle !== undefined ? + handle.chunksSentSinceLastWrite : null; + if (chunksSentSinceLastWrite !== null && + chunksSentSinceLastWrite !== handle.updateChunksSent()) { + _unrefActive(this); + return; + } + } + process.nextTick(emit, this, 'timeout'); } } @@ -1199,8 +1216,27 @@ function createWriteReq(req, handle, data, encoding) { } } +function trackWriteState(stream, bytes) { + const session = stream[kSession]; + stream[kState].writeQueueSize += bytes; + session[kState].writeQueueSize += bytes; + session[kHandle].chunksSentSinceLastWrite = 0; +} + function afterDoStreamWrite(status, handle, req) { - _unrefActive(handle[kOwner]); + const session = handle[kOwner]; + _unrefActive(session); + + const state = session[kState]; + const { bytes } = req; + state.writeQueueSize -= bytes; + + const stream = state.streams.get(req.stream); + if (stream !== undefined) { + _unrefActive(stream); + stream[kState].writeQueueSize -= bytes; + } + if (typeof req.callback === 'function') req.callback(); this.handle = undefined; @@ -1312,7 +1348,8 @@ class Http2Stream extends Duplex { headersSent: false, headRequest: false, aborted: false, - closeHandler: onSessionClose.bind(this) + closeHandler: onSessionClose.bind(this), + writeQueueSize: 0 }; this.once('ready', streamOnceReady); @@ -1359,6 +1396,23 @@ class Http2Stream extends Duplex { } _onTimeout() { + // This checks whether a write is currently in progress and also whether + // that write is actually sending data across the write. The kHandle + // stored `chunksSentSinceLastWrite` is only updated when a timeout event + // happens, meaning that if a write is ongoing it should never equal the + // newly fetched, updated value. + if (this[kState].writeQueueSize > 0) { + const handle = this[kSession][kHandle]; + const chunksSentSinceLastWrite = handle !== undefined ? + handle.chunksSentSinceLastWrite : null; + if (chunksSentSinceLastWrite !== null && + chunksSentSinceLastWrite !== handle.updateChunksSent()) { + _unrefActive(this); + _unrefActive(this[kSession]); + return; + } + } + process.nextTick(emit, this, 'timeout'); } @@ -1396,10 +1450,11 @@ class Http2Stream extends Duplex { this.once('ready', this._write.bind(this, data, encoding, cb)); return; } - _unrefActive(this); if (!this[kState].headersSent) this[kProceed](); const session = this[kSession]; + _unrefActive(this); + _unrefActive(session); const handle = session[kHandle]; const req = new WriteWrap(); req.stream = this[kID]; @@ -1410,7 +1465,7 @@ class Http2Stream extends Duplex { const err = createWriteReq(req, handle, data, encoding); if (err) throw util._errnoException(err, 'write', req.error); - this._bytesDispatched += req.bytes; + trackWriteState(this, req.bytes); } _writev(data, cb) { @@ -1418,10 +1473,11 @@ class Http2Stream extends Duplex { this.once('ready', this._writev.bind(this, data, cb)); return; } - _unrefActive(this); if (!this[kState].headersSent) this[kProceed](); const session = this[kSession]; + _unrefActive(this); + _unrefActive(session); const handle = session[kHandle]; const req = new WriteWrap(); req.stream = this[kID]; @@ -1438,6 +1494,7 @@ class Http2Stream extends Duplex { const err = handle.writev(req, chunks); if (err) throw util._errnoException(err, 'write', req.error); + trackWriteState(this, req.bytes); } _read(nread) { @@ -1531,6 +1588,10 @@ class Http2Stream extends Duplex { return; } + const state = this[kState]; + session[kState].writeQueueSize -= state.writeQueueSize; + state.writeQueueSize = 0; + const server = session[kServer]; if (server !== undefined && err) { server.emit('streamError', err, this); @@ -1625,7 +1686,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1, if (ret < 0) { err = new NghttpError(ret); process.nextTick(emit, this, 'error', err); + break; } + // exact length of the file doesn't matter here, since the + // stream is closing anyway — just use 1 to signify that + // a write does exist + trackWriteState(this, 1); } } |