aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames M Snell <jasnell@gmail.com>2017-11-21 17:09:32 -0800
committerJames M Snell <jasnell@gmail.com>2017-11-24 12:20:05 -0800
commit9d87082d7df8b509d3e035b0222899d9cd8e58a3 (patch)
treeb45a62c0c3673d802d99efd4c95212ddd106420d
parent9ec78101fad90e6469677598b6bc3e8196d1b17f (diff)
downloadandroid-node-v8-9d87082d7df8b509d3e035b0222899d9cd8e58a3.tar.gz
android-node-v8-9d87082d7df8b509d3e035b0222899d9cd8e58a3.tar.bz2
android-node-v8-9d87082d7df8b509d3e035b0222899d9cd8e58a3.zip
http2: general cleanups in core.js
* fixup js debug messages * simplify and improve rstStream * improve and simplify _read * simplify and improve priority * simplify on ready a bit * simplify and improve respond/push * reduce duplication with _unrefActive * simplify stream close handling PR-URL: https://github.com/nodejs/node/pull/17209 Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
-rw-r--r--lib/internal/http2/core.js317
-rw-r--r--test/parallel/test-http2-priority-parent-self.js57
2 files changed, 125 insertions, 249 deletions
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index de20eb23f2..39f67c4823 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -79,6 +79,7 @@ const kServer = Symbol('server');
const kSession = Symbol('session');
const kState = Symbol('state');
const kType = Symbol('type');
+const kUpdateTimer = Symbol('update-timer');
const kDefaultSocketTimeout = 2 * 60 * 1000;
const kRenegTest = /TLS session renegotiation disabled for this socket/;
@@ -149,9 +150,9 @@ function emit(self, ...args) {
function onSessionHeaders(handle, id, cat, flags, headers) {
const owner = this[kOwner];
const type = owner[kType];
- _unrefActive(owner);
- debug(`[${sessionName(type)}] headers were received on ` +
- `stream ${id}: ${cat}`);
+ owner[kUpdateTimer]();
+ debug(`Http2Stream ${id} [Http2Session ` +
+ `${sessionName(type)}]: headers received`);
const streams = owner[kState].streams;
const endOfStream = !!(flags & NGHTTP2_FLAG_END_STREAM);
@@ -196,7 +197,8 @@ function onSessionHeaders(handle, id, cat, flags, headers) {
} else {
event = endOfStream ? 'trailers' : 'headers';
}
- debug(`[${sessionName(type)}] emitting stream '${event}' event`);
+ debug(`Http2Stream ${id} [Http2Session ` +
+ `${sessionName(type)}]: emitting stream '${event}' event`);
process.nextTick(emit, stream, event, obj, flags, headers);
}
if (endOfStream) {
@@ -228,8 +230,7 @@ function onStreamTrailers() {
// Readable and Writable sides of the Duplex.
function onStreamClose(code) {
const stream = this[kOwner];
- _unrefActive(stream);
- _unrefActive(stream[kSession]);
+ stream[kUpdateTimer]();
abort(stream);
const state = stream[kState];
state.rst = true;
@@ -248,7 +249,7 @@ function afterFDClose(err) {
// Called when an error event needs to be triggered
function onSessionError(error) {
const owner = this[kOwner];
- _unrefActive(owner);
+ owner[kUpdateTimer]();
process.nextTick(emit, owner, 'error', error);
}
@@ -256,8 +257,7 @@ function onSessionError(error) {
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf, handle) {
const stream = handle[kOwner];
- _unrefActive(stream);
- _unrefActive(stream[kSession]);
+ stream[kUpdateTimer]();
if (nread >= 0 && !stream.destroyed) {
if (!stream.push(buf)) {
handle.readStop();
@@ -272,8 +272,8 @@ function onStreamRead(nread, buf, handle) {
// Resets the cached settings.
function onSettings(ack) {
const owner = this[kOwner];
- debug(`[${sessionName(owner[kType])}] new settings received`);
- _unrefActive(owner);
+ debug(`Http2Session ${sessionName(owner[kType])}: new settings received`);
+ owner[kUpdateTimer]();
let event = 'remoteSettings';
if (ack) {
if (owner[kState].pendingAck > 0)
@@ -293,10 +293,10 @@ function onSettings(ack) {
// session (which may, in turn, forward it on to the server)
function onPriority(id, parent, weight, exclusive) {
const owner = this[kOwner];
- debug(`[${sessionName(owner[kType])}] priority advisement for stream ` +
- `${id}: \n parent: ${parent},\n weight: ${weight},\n` +
- ` exclusive: ${exclusive}`);
- _unrefActive(owner);
+ debug(`Http2Stream ${id} [Http2Session ` +
+ `${sessionName(owner[kType])}]: priority [parent: ${parent}, ` +
+ `weight: ${weight}, exclusive: ${exclusive}]`);
+ owner[kUpdateTimer]();
const streams = owner[kState].streams;
const stream = streams.get(id);
const emitter = stream === undefined ? owner : stream;
@@ -315,9 +315,9 @@ function emitFrameError(self, id, type, code) {
// frame. This should be exceedingly rare.
function onFrameError(id, type, code) {
const owner = this[kOwner];
- debug(`[${sessionName(owner[kType])}] error sending frame type ` +
+ debug(`Http2Session ${sessionName(owner[kType])}: error sending frame type ` +
`${type} on stream ${id}, code: ${code}`);
- _unrefActive(owner);
+ owner[kUpdateTimer]();
const streams = owner[kState].streams;
const stream = streams.get(id);
const emitter = stream !== undefined ? stream : owner;
@@ -340,7 +340,8 @@ function emitGoaway(self, code, lastStreamID, buf) {
// Called by the native layer when a goaway frame has been received
function onGoawayData(code, lastStreamID, buf) {
const owner = this[kOwner];
- debug(`[${sessionName(owner[kType])}] goaway data received`);
+ debug(`Http2Session ${sessionName(owner[kType])}: goaway ${code} received ` +
+ `[last stream id: ${lastStreamID}]`);
process.nextTick(emitGoaway, owner, code, lastStreamID, buf);
}
@@ -350,7 +351,6 @@ function onGoawayData(code, lastStreamID, buf) {
// frameLen <= n <= maxPayloadLen.
function onSelectPadding(fn) {
return function getPadding() {
- debug('fetching padding for frame');
const frameLen = paddingBuffer[PADDING_BUF_FRAME_LENGTH];
const maxFramePayloadLen = paddingBuffer[PADDING_BUF_MAX_PAYLOAD_LENGTH];
paddingBuffer[PADDING_BUF_RETURN_VALUE] =
@@ -366,7 +366,8 @@ function onSelectPadding(fn) {
// will be deferred until the socket is ready to go.
function requestOnConnect(headers, options) {
const session = this[kSession];
- debug(`[${sessionName(session[kType])}] connected.. initializing request`);
+ debug(`Http2Session ${sessionName(session[kType])}: connected, ` +
+ 'initializing request');
const streams = session[kState].streams;
validatePriorityOptions(options);
@@ -480,7 +481,7 @@ function onSessionInternalError(code) {
// of the socket. No other code should read from or write to the socket.
function setupHandle(session, socket, type, options) {
return function() {
- debug(`[${sessionName(type)}] setting up session handle`);
+ debug(`Http2Session ${sessionName(type)}: setting up session handle`);
session[kState].connecting = false;
updateOptionsBuffer(options);
@@ -515,20 +516,23 @@ function setupHandle(session, socket, type, options) {
// Submits a SETTINGS frame to be sent to the remote peer.
function submitSettings(settings) {
const type = this[kType];
- debug(`[${sessionName(type)}] submitting actual settings`);
- _unrefActive(this);
+ debug(`Http2Session ${sessionName(type)}: submitting settings`);
+ this[kUpdateTimer]();
this[kLocalSettings] = undefined;
updateSettingsBuffer(settings);
this[kHandle].settings();
- debug(`[${sessionName(type)}] settings complete`);
}
// Submits a PRIORITY frame to be sent to the remote peer
// Note: If the silent option is true, the change will be made
// locally with no PRIORITY frame sent.
function submitPriority(options) {
- _unrefActive(this);
- _unrefActive(this[kSession]);
+ this[kUpdateTimer]();
+
+ // If the parent is the id, do nothing because a
+ // stream cannot be made to depend on itself.
+ if (options.parent === this[kID])
+ return;
this[kHandle].priority(options.parent | 0,
options.weight | 0,
@@ -539,8 +543,13 @@ function submitPriority(options) {
// Submit an RST-STREAM frame to be sent to the remote peer.
// This will cause the Http2Stream to be closed.
function submitRstStream(code) {
- _unrefActive(this);
- _unrefActive(this[kSession]);
+ this[kUpdateTimer]();
+
+ const state = this[kState];
+ if (state.rst) return;
+ state.rst = true;
+ state.rstCode = code;
+
const ret = this[kHandle].rstStream(code);
if (ret < 0) {
const err = new NghttpError(ret);
@@ -561,19 +570,18 @@ function doShutdown(options) {
state.shuttingDown = false;
state.shutdown = true;
if (ret < 0) {
- debug(`[${sessionName(this[kType])}] shutdown failed! code: ${ret}`);
+ debug(`Http2Session ${sessionName(this[kType])}: shutdown failed`);
const err = new NghttpError(ret);
process.nextTick(emit, this, 'error', err);
return;
}
process.nextTick(emit, this, 'shutdown', options);
- debug(`[${sessionName(this[kType])}] shutdown is complete`);
}
// Submit a graceful or immediate shutdown request for the Http2Session.
function submitShutdown(options) {
const type = this[kType];
- debug(`[${sessionName(type)}] submitting actual shutdown request`);
+ debug(`Http2Session ${sessionName(type)}: submitting shutdown request`);
if (type === NGHTTP2_SESSION_SERVER && options.graceful === true) {
// first send a shutdown notice
this[kHandle].shutdownNotice();
@@ -712,7 +720,11 @@ class Http2Session extends EventEmitter {
// of concurrent streams (2^31-1 is the upper limit on the number
// of streams)
this.setMaxListeners(kMaxStreams);
- debug(`[${sessionName(type)}] http2session created`);
+ debug(`Http2Session ${sessionName(type)}: created`);
+ }
+
+ [kUpdateTimer]() {
+ _unrefActive(this);
}
setNextStreamID(id) {
@@ -854,12 +866,10 @@ class Http2Session extends EventEmitter {
throw new errors.Error('ERR_HTTP2_MAX_PENDING_SETTINGS_ACK',
this[kState].pendingAck);
}
- debug(`[${sessionName(this[kType])}] sending settings`);
+ debug(`Http2Session ${sessionName(this[kType])}: sending settings`);
state.pendingAck++;
if (state.connecting) {
- debug(`[${sessionName(this[kType])}] session still connecting, ` +
- 'queue settings');
this.once('connect', submitSettings.bind(this, settings));
return;
}
@@ -871,7 +881,7 @@ class Http2Session extends EventEmitter {
const state = this[kState];
if (state.destroyed || state.destroying)
return;
- debug(`[${sessionName(this[kType])}] destroying nghttp2session`);
+ debug(`Http2Session ${sessionName(this[kType])}: destroying`);
state.destroying = true;
state.destroyed = false;
@@ -942,7 +952,7 @@ class Http2Session extends EventEmitter {
options.lastStreamID);
}
- debug(`[${sessionName(type)}] initiating shutdown`);
+ debug(`Http2Session ${sessionName(type)}: initiating shutdown`);
state.shuttingDown = true;
if (callback) {
@@ -950,13 +960,11 @@ class Http2Session extends EventEmitter {
}
if (state.connecting) {
- debug(`[${sessionName(type)}] session still connecting, queue ` +
- 'shutdown');
this.once('connect', submitShutdown.bind(this, options));
return;
}
- debug(`[${sessionName(type)}] sending shutdown`);
+ debug(`Http2Session ${sessionName(type)}: sending shutdown`);
submitShutdown.call(this, options);
}
@@ -972,7 +980,7 @@ class Http2Session extends EventEmitter {
handle.chunksSentSinceLastWrite : null;
if (chunksSentSinceLastWrite !== null &&
chunksSentSinceLastWrite !== handle.updateChunksSent()) {
- _unrefActive(this);
+ this[kUpdateTimer]();
return;
}
}
@@ -995,7 +1003,6 @@ class ServerHttp2Session extends Http2Session {
class ClientHttp2Session extends Http2Session {
constructor(options, socket) {
super(NGHTTP2_SESSION_CLIENT, options, socket);
- debug(`[${sessionName(this[kType])}] clienthttp2session created`);
}
// Submits a new HTTP2 request to the connected peer. Returns the
@@ -1004,13 +1011,15 @@ class ClientHttp2Session extends Http2Session {
const state = this[kState];
if (state.destroyed || state.destroying)
throw new errors.Error('ERR_HTTP2_INVALID_SESSION');
- debug(`[${sessionName(this[kType])}] initiating request`);
- _unrefActive(this);
+ debug(`Http2Session ${sessionName(this[kType])}: initiating request`);
+
+ this[kUpdateTimer]();
+
assertIsObject(headers, 'headers');
assertIsObject(options, 'options');
headers = Object.assign(Object.create(null), headers);
- options = Object.assign(Object.create(null), options);
+ options = Object.assign({}, options);
if (headers[HTTP2_HEADER_METHOD] === undefined)
headers[HTTP2_HEADER_METHOD] = HTTP2_METHOD_GET;
@@ -1055,19 +1064,14 @@ class ClientHttp2Session extends Http2Session {
const stream = new ClientHttp2Stream(this, undefined, undefined, {});
- const onConnect = requestOnConnect.bind(stream, headers, options);
-
// Close the writable side of the stream if options.endStream is set.
if (options.endStream)
stream.end();
+ const onConnect = requestOnConnect.bind(stream, headers, options);
if (state.connecting) {
- debug(`[${sessionName(this[kType])}] session still connecting, queue ` +
- 'stream init');
stream.on('connect', onConnect);
} else {
- debug(`[${sessionName(this[kType])}] session connected, immediate ` +
- 'stream init');
onConnect();
}
return stream;
@@ -1107,15 +1111,13 @@ function afterDoStreamWrite(status, handle, req) {
const stream = handle[kOwner];
const session = stream[kSession];
- _unrefActive(stream);
+ stream[kUpdateTimer]();
const { bytes } = req;
stream[kState].writeQueueSize -= bytes;
- if (session !== undefined) {
- _unrefActive(session);
+ if (session !== undefined)
session[kState].writeQueueSize -= bytes;
- }
if (typeof req.callback === 'function')
req.callback();
@@ -1142,12 +1144,6 @@ function onSessionClose(hadError, code) {
this.end(); // Close the writable side
}
-function onStreamClosed(code) {
- abort(this);
- this.push(null); // Close the readable side
- this.end(); // Close the writable side
-}
-
function streamOnResume() {
if (this[kID] === undefined) {
this.once('ready', streamOnResume);
@@ -1166,18 +1162,11 @@ function handleFlushData(handle) {
function streamOnSessionConnect() {
const session = this[kSession];
- debug(`[${sessionName(session[kType])}] session connected. emiting stream ` +
- 'connect');
+ debug(`Http2Session ${sessionName(session[kType])}: session connected`);
this[kState].connecting = false;
process.nextTick(emit, this, 'connect');
}
-function streamOnceReady() {
- const session = this[kSession];
- debug(`[${sessionName(session[kType])}] stream ${this[kID]} is ready`);
- this.uncork();
-}
-
function abort(stream) {
if (!stream[kState].aborted &&
!(stream._writableState.ended || stream._writableState.ending)) {
@@ -1207,20 +1196,21 @@ class Http2Stream extends Duplex {
writeQueueSize: 0
};
- this.once('ready', streamOnceReady);
- this.once('streamClosed', onStreamClosed);
this.once('finish', onHandleFinish);
this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
session.once('close', state.closeHandler);
if (session[kState].connecting) {
- debug(`[${sessionName(session[kType])}] session is still connecting, ` +
- 'queuing stream init');
state.connecting = true;
session.once('connect', streamOnSessionConnect.bind(this));
}
- debug(`[${sessionName(session[kType])}] http2stream created`);
+ }
+
+ [kUpdateTimer]() {
+ _unrefActive(this);
+ if (this[kSession])
+ _unrefActive(this[kSession]);
}
[kInit](id, handle) {
@@ -1231,6 +1221,7 @@ class Http2Stream extends Duplex {
handle.ontrailers = onStreamTrailers;
handle.onstreamclose = onStreamClose;
handle.onread = onStreamRead;
+ this.uncork();
this.emit('ready');
}
@@ -1267,8 +1258,7 @@ class Http2Stream extends Duplex {
handle.chunksSentSinceLastWrite : null;
if (chunksSentSinceLastWrite !== null &&
chunksSentSinceLastWrite !== handle.updateChunksSent()) {
- _unrefActive(this);
- _unrefActive(this[kSession]);
+ this[kUpdateTimer]();
return;
}
}
@@ -1311,8 +1301,7 @@ class Http2Stream extends Duplex {
return;
}
- _unrefActive(this);
- _unrefActive(this[kSession]);
+ this[kUpdateTimer]();
if (!this[kState].headersSent)
this[kProceed]();
@@ -1336,8 +1325,7 @@ class Http2Stream extends Duplex {
return;
}
- _unrefActive(this);
- _unrefActive(this[kSession]);
+ this[kUpdateTimer]();
if (!this[kState].headersSent)
this[kProceed]();
@@ -1362,16 +1350,12 @@ class Http2Stream extends Duplex {
}
_read(nread) {
- if (this[kID] === undefined) {
- this.once('ready', this._read.bind(this, nread));
- return;
- }
if (this.destroyed) {
this.push(null);
return;
}
- _unrefActive(this);
- process.nextTick(handleFlushData, this[kHandle]);
+ if (this[kHandle] !== undefined)
+ process.nextTick(handleFlushData, this[kHandle]);
}
// Submits an RST-STREAM frame to shutdown this stream.
@@ -1382,30 +1366,15 @@ class Http2Stream extends Duplex {
rstStream(code = NGHTTP2_NO_ERROR) {
if (typeof code !== 'number')
throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'code', 'number');
+ if (code < 0 || code > 2 ** 32 - 1)
+ throw new errors.RangeError('ERR_OUT_OF_RANGE', 'code');
+ const fn = submitRstStream.bind(this, code);
if (this[kID] === undefined) {
- this.once('ready', this.rstStream.bind(this, code));
+ this.once('ready', fn);
return;
}
-
- const state = this[kState];
- if (state.rst) {
- // rst has already been set by self or peer, do not set again
- return;
- }
- state.rst = true;
- state.rstCode = code;
-
- _unrefActive(this);
- _unrefActive(this[kSession]);
-
- const id = this[kID];
-
- if (id === undefined) {
- this.once('ready', submitRstStream.bind(this, code));
- return;
- }
- submitRstStream.call(this, code);
+ fn();
}
rstWithNoError() {
@@ -1431,36 +1400,17 @@ class Http2Stream extends Duplex {
priority(options) {
if (this.destroyed)
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
- const session = this[kSession];
- if (this[kID] === undefined) {
- debug(`[${sessionName(session[kType])}] queuing priority for new stream`);
- this.once('ready', this.priority.bind(this, options));
- return;
- }
- debug(`[${sessionName(session[kType])}] sending priority for stream ` +
- `${this[kID]}`);
- _unrefActive(this);
assertIsObject(options, 'options');
- options = Object.assign(Object.create(null), options);
+ options = Object.assign({}, options);
validatePriorityOptions(options);
- const id = this[kID];
- debug(`[${sessionName(session[kType])}] sending priority for stream ` +
- `${id}`);
-
- // A stream cannot be made to depend on itself
- if (options.parent === id) {
- throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
- 'parent',
- options.parent);
- }
-
- if (id === undefined) {
- this.once('ready', submitPriority.bind(this, options));
+ const fn = submitPriority.bind(this, options);
+ if (this[kID] === undefined) {
+ this.once('ready', fn);
return;
}
- submitPriority.call(this, options);
+ fn();
}
// Called by this.destroy().
@@ -1472,11 +1422,13 @@ class Http2Stream extends Duplex {
_destroy(err, callback) {
const session = this[kSession];
if (this[kID] === undefined) {
- debug(`[${sessionName(session[kType])}] queuing destroy for new stream`);
this.once('ready', this._destroy.bind(this, err, callback));
return;
}
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${sessionName(session[kType])}]: destroying stream`);
+
const state = this[kState];
session[kState].writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;
@@ -1494,8 +1446,6 @@ function continueStreamDestroy(err, callback) {
const session = this[kSession];
const state = this[kState];
- debug(`[${sessionName(session[kType])}] destroying stream ${this[kID]}`);
-
// Submit RST-STREAM frame if one hasn't been sent already and the
// stream hasn't closed normally...
const rst = state.rst;
@@ -1520,8 +1470,10 @@ function continueStreamDestroy(err, callback) {
err = new errors.Error('ERR_HTTP2_STREAM_ERROR', code);
}
callback(err);
+ abort(this);
+ this.push(null); // Close the readable side
+ this.end(); // Close the writable side
process.nextTick(emit, this, 'streamClosed', code);
- debug(`[${sessionName(session[kType])}] stream ${this[kID]} destroyed`);
}
function finishStreamDestroy() {
@@ -1696,7 +1648,8 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
function streamOnError(err) {
// we swallow the error for parity with HTTP1
// all the errors that ends here are not critical for the project
- debug('ServerHttp2Stream errored, avoiding uncaughtException', err);
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${this[kSession][kType]}: error`, err);
}
@@ -1707,7 +1660,6 @@ class ServerHttp2Stream extends Http2Stream {
this[kProtocol] = headers[HTTP2_HEADER_SCHEME];
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY];
this.on('error', streamOnError);
- debug(`[${sessionName(session[kType])}] created serverhttp2stream`);
}
// true if the HEADERS frame has been sent
@@ -1730,12 +1682,10 @@ class ServerHttp2Stream extends Http2Stream {
if (!session.remoteSettings.enablePush)
throw new errors.Error('ERR_HTTP2_PUSH_DISABLED');
- debug(`[${sessionName(session[kType])}] initiating push stream for stream` +
- ` ${this[kID]}`);
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${sessionName(session[kType])}]: initiating push stream`);
- _unrefActive(this);
- const state = session[kState];
- const streams = state.streams;
+ this[kUpdateTimer]();
if (typeof options === 'function') {
callback = options;
@@ -1746,7 +1696,7 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.TypeError('ERR_INVALID_CALLBACK');
assertIsObject(options, 'options');
- options = Object.assign(Object.create(null), options);
+ options = Object.assign({}, options);
options.endStream = !!options.endStream;
assertIsObject(headers, 'headers');
@@ -1762,16 +1712,13 @@ class ServerHttp2Stream extends Http2Stream {
headers[HTTP2_HEADER_PATH] = '/';
let headRequest = false;
- if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
- headRequest = true;
- options.endStream = true;
- }
+ if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD)
+ headRequest = options.endStream = true;
+ options.readable = !options.endStream;
const headersList = mapToHeaders(headers);
- if (!Array.isArray(headersList)) {
- // An error occurred!
+ if (!Array.isArray(headersList))
throw headersList;
- }
const streamOptions = options.endStream ? STREAM_OPTION_EMPTY_PAYLOAD : 0;
@@ -1794,19 +1741,14 @@ class ServerHttp2Stream extends Http2Stream {
}
const id = ret.id();
- debug(`[${sessionName(session[kType])}] push stream ${id} created`);
- options.readable = !options.endStream;
-
const stream = new ServerHttp2Stream(session, ret, id, options, headers);
- streams.set(id, stream);
+ session[kState].streams.set(id, stream);
- // If the push stream is a head request, close the writable side of
- // the stream immediately as there won't be any data sent.
- if (headRequest) {
+ if (options.endStream)
stream.end();
- const state = stream[kState];
- state.headRequest = true;
- }
+
+ if (headRequest)
+ stream[kState].headRequest = true;
process.nextTick(callback, stream, headers, 0);
}
@@ -1816,16 +1758,16 @@ class ServerHttp2Stream extends Http2Stream {
const session = this[kSession];
if (this.destroyed)
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
- debug(`[${sessionName(session[kType])}] initiating response for stream ` +
- `${this[kID]}`);
- _unrefActive(this);
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${sessionName(session[kType])}]: initiating response`);
+ this[kUpdateTimer]();
const state = this[kState];
if (state.headersSent)
throw new errors.Error('ERR_HTTP2_HEADERS_SENT');
assertIsObject(options, 'options');
- options = Object.assign(Object.create(null), options);
+ options = Object.assign({}, options);
options.endStream = !!options.endStream;
let streamOptions = 0;
@@ -1856,10 +1798,9 @@ class ServerHttp2Stream extends Http2Stream {
}
const headersList = mapToHeaders(headers, assertValidPseudoHeaderResponse);
- if (!Array.isArray(headersList)) {
- // An error occurred!
+ if (!Array.isArray(headersList))
throw headersList;
- }
+
state.headersSent = true;
// Close the writable side if the endStream option is set
@@ -1883,9 +1824,9 @@ class ServerHttp2Stream extends Http2Stream {
const session = this[kSession];
if (this.destroyed)
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
- debug(`[${sessionName(session[kType])}] initiating response for stream ` +
- `${this[kID]}`);
- _unrefActive(this);
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${sessionName(session[kType])}]: initiating response`);
+ this[kUpdateTimer]();
const state = this[kState];
if (state.headersSent)
@@ -1966,9 +1907,9 @@ class ServerHttp2Stream extends Http2Stream {
const session = this[kSession];
if (this.destroyed)
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
- debug(`[${sessionName(session[kType])}] initiating response for stream ` +
- `${this[kID]}`);
- _unrefActive(this);
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${sessionName(session[kType])}]: initiating response`);
+ this[kUpdateTimer]();
const state = this[kState];
if (state.headersSent)
@@ -2033,7 +1974,8 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.Error('ERR_HTTP2_HEADERS_AFTER_RESPOND');
const session = this[kSession];
- debug(`[${sessionName(session[kType])}] sending additional headers`);
+ debug(`Http2Stream ${this[kID]} [Http2Session ` +
+ `${sessionName(session[kType])}]: sending additional headers`);
assertIsObject(headers, 'headers');
headers = Object.assign(Object.create(null), headers);
@@ -2047,7 +1989,7 @@ class ServerHttp2Stream extends Http2Stream {
}
}
- _unrefActive(this);
+ this[kUpdateTimer]();
const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse);
@@ -2072,7 +2014,6 @@ class ClientHttp2Stream extends Http2Stream {
if (id !== undefined)
this[kInit](id, handle);
this.on('headers', handleHeaderContinue);
- debug(`[${sessionName(session[kType])}] clienthttp2stream created`);
}
}
@@ -2101,7 +2042,7 @@ const setTimeout = {
}
} else {
enroll(this, msecs);
- _unrefActive(this);
+ this[kUpdateTimer]();
if (callback !== undefined) {
if (typeof callback !== 'function')
throw new errors.TypeError('ERR_INVALID_CALLBACK');
@@ -2121,13 +2062,12 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
function socketDestroy(error) {
const session = this[kSession];
const type = session[kType];
- debug(`[${sessionName(type)}] socket destroy called`);
+ debug(`Http2Session ${sessionName(type)}: socket destroy called`);
delete this[kServer];
// destroy the session first so that it will stop trying to
// send data while we close the socket.
session.destroy();
this.destroy = this[kDestroySocket];
- debug(`[${sessionName(type)}] destroying the socket`);
this.destroy(error);
}
@@ -2136,7 +2076,8 @@ function socketDestroy(error) {
// a sessionError; failing that, destroy, remove the error listener, and
// re-emit the error event
function sessionOnError(error) {
- debug(`[${sessionName(this[kType])}] server session error: ${error.message}`);
+ debug(`Http2Session ${sessionName(this[kType])}: session error: ` +
+ `${error.message}`);
if (this[kServer] !== undefined && this[kServer].emit('sessionError', error))
return;
if (this[kSocket] !== undefined && this[kSocket].emit('sessionError', error))
@@ -2151,7 +2092,7 @@ function sessionOnError(error) {
function socketOnError(error) {
const session = this[kSession];
const type = session && session[kType];
- debug(`[${sessionName(type)}] server socket error: ${error.message}`);
+ debug(`Http2Session ${sessionName(type)}: socket error: ${error.message}`);
if (kRenegTest.test(error.message))
return this.destroy();
if (session !== undefined &&
@@ -2164,12 +2105,11 @@ function socketOnError(error) {
// Handles the on('stream') event for a session and forwards
// it on to the server object.
function sessionOnStream(stream, headers, flags, rawHeaders) {
- debug(`[${sessionName(this[kType])}] emit server stream event`);
this[kServer].emit('stream', stream, headers, flags, rawHeaders);
}
function sessionOnPriority(stream, parent, weight, exclusive) {
- debug(`[${sessionName(this[kType])}] priority change received`);
+ debug(`Http2Session ${sessionName(this[kType])}: priority change received`);
this[kServer].emit('priority', stream, parent, weight, exclusive);
}
@@ -2180,7 +2120,6 @@ function sessionOnSocketError(error, socket) {
// When the session times out on the server, attempt a graceful shutdown
function sessionOnTimeout() {
- debug('session timeout');
process.nextTick(() => {
const state = this[kState];
// if destroyed or destryoing, do nothing
@@ -2204,7 +2143,7 @@ function sessionOnTimeout() {
}
function connectionListener(socket) {
- debug('[server] received a connection');
+ debug('Http2Session server: received a connection');
const options = this[kOptions] || {};
if (socket.alpnProtocol === false || socket.alpnProtocol === 'http/1.1') {
@@ -2273,7 +2212,6 @@ class Http2SecureServer extends TLSServer {
if (typeof requestListener === 'function')
this.on('request', requestListener);
this.on('tlsClientError', onErrorSecureServerSession);
- debug('http2secureserver created');
}
setTimeout(msecs, callback) {
@@ -2295,7 +2233,6 @@ class Http2Server extends NETServer {
this.on('newListener', setupCompat);
if (typeof requestListener === 'function')
this.on('request', requestListener);
- debug('http2server created');
}
setTimeout(msecs, callback) {
@@ -2311,7 +2248,6 @@ class Http2Server extends NETServer {
function setupCompat(ev) {
if (ev === 'request') {
- debug('setting up compatibility handler');
this.removeListener('newListener', setupCompat);
this.on('stream', onServerStream);
}
@@ -2329,7 +2265,8 @@ function socketOnClose() {
// If the session emits an error, forward it to the socket as a sessionError;
// failing that, destroy the session, remove the listener and re-emit the error
function clientSessionOnError(error) {
- debug(`[${sessionName(this[kType])}] session error: ${error.message}`);
+ debug(`Http2Session ${sessionName(this[kType])}]: session error: ` +
+ `${error.message}`);
if (this[kSocket] !== undefined && this[kSocket].emit('sessionError', error))
return;
this.destroy();
@@ -2351,8 +2288,6 @@ function connect(authority, options, listener) {
assertIsObject(authority, 'authority', ['string', 'Object', 'URL']);
- debug(`connecting to ${authority}`);
-
const protocol = authority.protocol || options.protocol || 'https:';
const port = '' + (authority.port !== '' ?
authority.port : (authority.protocol === 'http:' ? 80 : 443));
@@ -2406,7 +2341,6 @@ function createSecureServer(options, handler) {
'options',
'Object');
}
- debug('creating http2secureserver');
return new Http2SecureServer(options, handler);
}
@@ -2415,7 +2349,6 @@ function createServer(options, handler) {
handler = options;
options = Object.create(null);
}
- debug('creating htt2pserver');
return new Http2Server(options, handler);
}
diff --git a/test/parallel/test-http2-priority-parent-self.js b/test/parallel/test-http2-priority-parent-self.js
deleted file mode 100644
index 55a161bf17..0000000000
--- a/test/parallel/test-http2-priority-parent-self.js
+++ /dev/null
@@ -1,57 +0,0 @@
-'use strict';
-
-const common = require('../common');
-if (!common.hasCrypto)
- common.skip('missing crypto');
-const h2 = require('http2');
-
-const server = h2.createServer();
-const invalidOptValueError = (value) => ({
- type: TypeError,
- code: 'ERR_INVALID_OPT_VALUE',
- message: `The value "${value}" is invalid for option "parent"`
-});
-
-// we use the lower-level API here
-server.on('stream', common.mustCall((stream) => {
- common.expectsError(
- () => stream.priority({
- parent: stream.id,
- weight: 1,
- exclusive: false
- }),
- invalidOptValueError(stream.id)
- );
- stream.respond({
- 'content-type': 'text/html',
- ':status': 200
- });
- stream.end('hello world');
-}));
-
-server.listen(0, common.mustCall(() => {
-
- const client = h2.connect(`http://localhost:${server.address().port}`);
- const req = client.request({ ':path': '/' });
-
- req.on(
- 'ready',
- () => common.expectsError(
- () => req.priority({
- parent: req.id,
- weight: 1,
- exclusive: false
- }),
- invalidOptValueError(req.id)
- )
- );
-
- req.on('response', common.mustCall());
- req.resume();
- req.on('end', common.mustCall(() => {
- server.close();
- client.destroy();
- }));
- req.end();
-
-}));