diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/node_http2.cc | 650 | ||||
-rw-r--r-- | src/node_http2.h | 42 |
2 files changed, 487 insertions, 205 deletions
diff --git a/src/node_http2.cc b/src/node_http2.cc index 1ce3c5cb46..ac785de4cd 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -23,13 +23,30 @@ using v8::Undefined; namespace http2 { +// These configure the callbacks required by nghttp2 itself. There are +// two sets of callback functions, one that is used if a padding callback +// is set, and other that does not include the padding callback. const Http2Session::Callbacks Http2Session::callback_struct_saved[2] = { Callbacks(false), Callbacks(true)}; +// The Http2Scope object is used to queue a write to the i/o stream. It is +// used whenever any action is take on the underlying nghttp2 API that may +// push data into nghttp2 outbound data queue. +// +// For example: +// +// Http2Scope h2scope(session); +// nghttp2_submit_ping(**session, ... ); +// +// When the Http2Scope passes out of scope and is deconstructed, it will +// call Http2Session::MaybeScheduleWrite(). Http2Scope::Http2Scope(Http2Stream* stream) : Http2Scope(stream->session()) {} Http2Scope::Http2Scope(Http2Session* session) { + if (session == nullptr) + return; + if (session->flags_ & (SESSION_STATE_HAS_SCOPE | SESSION_STATE_WRITE_SCHEDULED)) { // There is another scope further below on the stack, or it is already @@ -48,9 +65,19 @@ Http2Scope::~Http2Scope() { session_->MaybeScheduleWrite(); } +// The Http2Options object is used during the construction of Http2Session +// instances to configure an appropriate nghttp2_options struct. The class +// uses a single TypedArray instance that is shared with the JavaScript side +// to more efficiently pass values back and forth. Http2Options::Http2Options(Environment* env) { nghttp2_option_new(&options_); + // We manually handle flow control within a session in order to + // implement backpressure -- that is, we only send WINDOW_UPDATE + // frames to the remote peer as data is actually consumed by user + // code. This ensures that the flow of data over the connection + // does not move too quickly and limits the amount of data we + // are required to buffer. nghttp2_option_set_no_auto_window_update(options_, 1); AliasedBuffer<uint32_t, v8::Uint32Array>& buffer = @@ -83,6 +110,10 @@ Http2Options::Http2Options(Environment* env) { buffer[IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS]); } + // The padding strategy sets the mechanism by which we determine how much + // additional frame padding to apply to DATA and HEADERS frames. Currently + // this is set on a per-session basis, but eventually we may switch to + // a per-stream setting, giving users greater control if (flags & (1 << IDX_OPTIONS_PADDING_STRATEGY)) { padding_strategy_type strategy = static_cast<padding_strategy_type>( @@ -90,16 +121,27 @@ Http2Options::Http2Options(Environment* env) { SetPaddingStrategy(strategy); } + // The max header list pairs option controls the maximum number of + // header pairs the session may accept. This is a hard limit.. that is, + // if the remote peer sends more than this amount, the stream will be + // automatically closed with an RST_STREAM. if (flags & (1 << IDX_OPTIONS_MAX_HEADER_LIST_PAIRS)) { SetMaxHeaderPairs(buffer[IDX_OPTIONS_MAX_HEADER_LIST_PAIRS]); } + // The HTTP2 specification places no limits on the number of HTTP2 + // PING frames that can be sent. In order to prevent PINGS from being + // abused as an attack vector, however, we place a strict upper limit + // on the number of unacknowledged PINGS that can be sent at any given + // time. if (flags & (1 << IDX_OPTIONS_MAX_OUTSTANDING_PINGS)) { SetMaxOutstandingPings(buffer[IDX_OPTIONS_MAX_OUTSTANDING_PINGS]); } } - +// The Http2Settings class is used to configure a SETTINGS frame that is +// to be sent to the connected peer. The settings are set using a TypedArray +// that is shared with the JavaScript side. Http2Settings::Http2Settings(Environment* env) : env_(env) { entries_.AllocateSufficientStorage(IDX_SETTINGS_COUNT); AliasedBuffer<uint32_t, v8::Uint32Array>& buffer = @@ -160,6 +202,9 @@ Http2Settings::Http2Settings(Environment* env) : env_(env) { } +// Generates a Buffer that contains the serialized payload of a SETTINGS +// frame. This can be used, for instance, to create the Base64-encoded +// content of an Http2-Settings header field. inline Local<Value> Http2Settings::Pack() { const size_t len = count_ * 6; Local<Value> buf = Buffer::New(env_, len).ToLocalChecked(); @@ -173,27 +218,28 @@ inline Local<Value> Http2Settings::Pack() { return Undefined(env_->isolate()); } - +// Updates the shared TypedArray with the current remote or local settings for +// the session. inline void Http2Settings::Update(Environment* env, Http2Session* session, get_setting fn) { AliasedBuffer<uint32_t, v8::Uint32Array>& buffer = env->http2_state()->settings_buffer; buffer[IDX_SETTINGS_HEADER_TABLE_SIZE] = - fn(session->session(), NGHTTP2_SETTINGS_HEADER_TABLE_SIZE); + fn(**session, NGHTTP2_SETTINGS_HEADER_TABLE_SIZE); buffer[IDX_SETTINGS_MAX_CONCURRENT_STREAMS] = - fn(session->session(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS); + fn(**session, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS); buffer[IDX_SETTINGS_INITIAL_WINDOW_SIZE] = - fn(session->session(), NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); + fn(**session, NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); buffer[IDX_SETTINGS_MAX_FRAME_SIZE] = - fn(session->session(), NGHTTP2_SETTINGS_MAX_FRAME_SIZE); + fn(**session, NGHTTP2_SETTINGS_MAX_FRAME_SIZE); buffer[IDX_SETTINGS_MAX_HEADER_LIST_SIZE] = - fn(session->session(), NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE); + fn(**session, NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE); buffer[IDX_SETTINGS_ENABLE_PUSH] = - fn(session->session(), NGHTTP2_SETTINGS_ENABLE_PUSH); + fn(**session, NGHTTP2_SETTINGS_ENABLE_PUSH); } - +// Initializes the shared TypedArray with the default settings values. inline void Http2Settings::RefreshDefaults(Environment* env) { AliasedBuffer<uint32_t, v8::Uint32Array>& buffer = env->http2_state()->settings_buffer; @@ -217,6 +263,9 @@ inline void Http2Settings::RefreshDefaults(Environment* env) { } +// The Http2Priority class initializes an appropriate nghttp2_priority_spec +// struct used when either creating a stream or updating its priority +// settings. Http2Priority::Http2Priority(Environment* env, Local<Value> parent, Local<Value> weight, @@ -241,7 +290,8 @@ inline const char* Http2Session::TypeName() { } } - +// The Headers class initializes a proper array of nghttp2_nv structs +// containing the header name value pairs. Headers::Headers(Isolate* isolate, Local<Context> context, Local<Array> headers) { @@ -299,8 +349,11 @@ Headers::Headers(Isolate* isolate, } +// Sets the various callback functions that nghttp2 will use to notify us +// about significant events while processing http2 stuff. Http2Session::Callbacks::Callbacks(bool kHasGetPaddingCallback) { CHECK_EQ(nghttp2_session_callbacks_new(&callbacks), 0); + nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, OnBeginHeadersCallback); nghttp2_session_callbacks_set_on_header_callback2( @@ -329,7 +382,6 @@ Http2Session::Callbacks::~Callbacks() { nghttp2_session_callbacks_del(callbacks); } - Http2Session::Http2Session(Environment* env, Local<Object> wrap, nghttp2_session_type type) @@ -337,6 +389,7 @@ Http2Session::Http2Session(Environment* env, session_type_(type) { MakeWeak<Http2Session>(this); + // Capture the configuration options for this session Http2Options opts(env); int32_t maxHeaderPairs = opts.GetMaxHeaderPairs(); @@ -368,37 +421,64 @@ Http2Session::Http2Session(Environment* env, CHECK_EQ(fn(&session_, callbacks, this, *opts), 0); } +void Http2Session::Unconsume() { + if (stream_ != nullptr) { + DEBUG_HTTP2SESSION(this, "unconsuming the i/o stream"); + stream_->set_destruct_cb({ nullptr, nullptr }); + stream_->set_alloc_cb({ nullptr, nullptr }); + stream_->set_read_cb({ nullptr, nullptr }); + stream_->Unconsume(); + stream_ = nullptr; + } +} Http2Session::~Http2Session() { + if (!object().IsEmpty()) + ClearWrap(object()); + persistent().Reset(); CHECK(persistent().IsEmpty()); - Close(); + Unconsume(); + DEBUG_HTTP2SESSION(this, "freeing nghttp2 session"); + nghttp2_session_del(session_); } - -void Http2Session::Close() { +// Closes the session and frees the associated resources +void Http2Session::Close(uint32_t code, bool socket_closed) { DEBUG_HTTP2SESSION(this, "closing session"); - if (!object().IsEmpty()) - ClearWrap(object()); - persistent().Reset(); - if (session_ == nullptr) + if (flags_ & SESSION_STATE_CLOSED) return; + flags_ |= SESSION_STATE_CLOSED; + + // Stop reading on the i/o stream + if (stream_ != nullptr) + stream_->ReadStop(); + + // If the socket is not closed, then attempt to send a closing GOAWAY + // frame. There is no guarantee that this GOAWAY will be received by + // the peer but the HTTP/2 spec recommends sendinng it anyway. We'll + // make a best effort. + if (!socket_closed) { + Http2Scope h2scope(this); + DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code); + CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0); + } else { + Unconsume(); + } - CHECK_EQ(nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR), 0); - nghttp2_session_del(session_); - session_ = nullptr; - + // If there are outstanding pings, those will need to be canceled, do + // so on the next iteration of the event loop to avoid calling out into + // javascript since this may be called during garbage collection. while (!outstanding_pings_.empty()) { Http2Session::Http2Ping* ping = PopPing(); - // Since this method may be called from GC, calling into JS directly - // is not allowed. env()->SetImmediate([](Environment* env, void* data) { static_cast<Http2Session::Http2Ping*>(data)->Done(false); }, static_cast<void*>(ping)); } } - +// Locates an existing known stream by ID. nghttp2 has a similar method +// but this is faster and does not fail if the stream is not found. inline Http2Stream* Http2Session::FindStream(int32_t id) { auto s = streams_.find(id); return s != streams_.end() ? s->second : nullptr; @@ -414,14 +494,18 @@ inline void Http2Session::RemoveStream(int32_t id) { streams_.erase(id); } - +// Used as one of the Padding Strategy functions. Uses the maximum amount +// of padding allowed for the current frame. inline ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen, size_t maxPayloadLen) { DEBUG_HTTP2SESSION2(this, "using max frame size padding: %d", maxPayloadLen); return maxPayloadLen; } - +// Used as one of the Padding Strategy functions. Uses a callback to JS land +// to determine the amount of padding for the current frame. This option is +// rather more expensive because of the JS boundary cross. It generally should +// not be the preferred option. inline ssize_t Http2Session::OnCallbackPadding(size_t frameLen, size_t maxPayloadLen) { DEBUG_HTTP2SESSION(this, "using callback to determine padding"); @@ -448,20 +532,10 @@ inline ssize_t Http2Session::OnCallbackPadding(size_t frameLen, } -// Submits a graceful shutdown notice to nghttp -// See: https://nghttp2.org/documentation/nghttp2_submit_shutdown_notice.html -inline void Http2Session::SubmitShutdownNotice() { - // Only an HTTP2 Server is permitted to send a shutdown notice - if (session_type_ == NGHTTP2_SESSION_CLIENT) - return; - DEBUG_HTTP2SESSION(this, "sending shutdown notice"); - // The only situation where this should fail is if the system is - // out of memory, which will cause other problems. Go ahead and crash - // in that case. - CHECK_EQ(nghttp2_submit_shutdown_notice(session_), 0); -} - - +// Sends a SETTINGS frame to the connected peer. This has the side effect of +// changing the settings state within the nghttp2_session, but those will +// only be considered active once the connected peer acknowledges the SETTINGS +// frame. // Note: This *must* send a SETTINGS frame even if niv == 0 inline void Http2Session::Settings(const nghttp2_settings_entry iv[], size_t niv) { @@ -475,25 +549,40 @@ inline void Http2Session::Settings(const nghttp2_settings_entry iv[], // Write data received from the i/o stream to the underlying nghttp2_session. +// On each call to nghttp2_session_mem_recv, nghttp2 will begin calling the +// various callback functions. Each of these will typically result in a call +// out to JavaScript so this particular function is rather hot and can be +// quite expensive. This is a potential performance optimization target later. inline ssize_t Http2Session::Write(const uv_buf_t* bufs, size_t nbufs) { size_t total = 0; // Note that nghttp2_session_mem_recv is a synchronous operation that // will trigger a number of other callbacks. Those will, in turn have // multiple side effects. for (size_t n = 0; n < nbufs; n++) { + DEBUG_HTTP2SESSION2(this, "receiving %d bytes [wants data? %d]", + bufs[n].len, + nghttp2_session_want_read(session_)); ssize_t ret = nghttp2_session_mem_recv(session_, reinterpret_cast<uint8_t*>(bufs[n].base), bufs[n].len); CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - if (ret < 0) + // If there is an error calling any of the callbacks, ret will be a + // negative number identifying the error code. This can happen, for + // instance, if the session is destroyed during any of the JS callbacks + // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef + // ssize_t to int. Cast here so that the < 0 check actually works on + // Windows. + if (static_cast<int>(ret) < 0) return ret; total += ret; } // Send any data that was queued up while processing the received data. - SendPendingData(); + if (!IsDestroyed()) { + SendPendingData(); + } return total; } @@ -506,6 +595,10 @@ inline int32_t GetFrameID(const nghttp2_frame* frame) { } +// Called by nghttp2 at the start of receiving a HEADERS frame. We use this +// callback to determine if a new stream is being created or if we are simply +// adding a new block of headers to an existing stream. The header pairs +// themselves are set in the OnHeaderCallback inline int Http2Session::OnBeginHeadersCallback(nghttp2_session* handle, const nghttp2_frame* frame, void* user_data) { @@ -517,12 +610,17 @@ inline int Http2Session::OnBeginHeadersCallback(nghttp2_session* handle, if (stream == nullptr) { new Http2Stream(session, id, frame->headers.cat); } else { + // If the stream has already been destroyed, ignore. + if (stream->IsDestroyed()) + return 0; stream->StartHeaders(frame->headers.cat); } return 0; } - +// Called by nghttp2 for each header name/value pair in a HEADERS block. +// This had to have been preceeded by a call to OnBeginHeadersCallback so +// the Http2Stream is guaranteed to already exist. inline int Http2Session::OnHeaderCallback(nghttp2_session* handle, const nghttp2_frame* frame, nghttp2_rcbuf* name, @@ -532,6 +630,10 @@ inline int Http2Session::OnHeaderCallback(nghttp2_session* handle, Http2Session* session = static_cast<Http2Session*>(user_data); int32_t id = GetFrameID(frame); Http2Stream* stream = session->FindStream(id); + CHECK_NE(stream, nullptr); + // If the stream has already been destroyed, ignore. + if (stream->IsDestroyed()) + return 0; if (!stream->AddHeader(name, value, flags)) { // This will only happen if the connected peer sends us more // than the allowed number of header items at any given time @@ -542,6 +644,8 @@ inline int Http2Session::OnHeaderCallback(nghttp2_session* handle, } +// Called by nghttp2 when a complete HTTP2 frame has been received. There are +// only a handful of frame types tha we care about handling here. inline int Http2Session::OnFrameReceive(nghttp2_session* handle, const nghttp2_frame* frame, void* user_data) { @@ -575,6 +679,12 @@ inline int Http2Session::OnFrameReceive(nghttp2_session* handle, } +// If nghttp2 is unable to send a queued up frame, it will call this callback +// to let us know. If the failure occurred because we are in the process of +// closing down the session or stream, we go ahead and ignore it. We don't +// really care about those and there's nothing we can reasonably do about it +// anyway. Other types of failures are reported up to JavaScript. This should +// be exceedingly rare. inline int Http2Session::OnFrameNotSent(nghttp2_session* handle, const nghttp2_frame* frame, int error_code, @@ -602,7 +712,7 @@ inline int Http2Session::OnFrameNotSent(nghttp2_session* handle, return 0; } - +// Called by nghttp2 when a stream closes. inline int Http2Session::OnStreamClose(nghttp2_session* handle, int32_t id, uint32_t code, @@ -615,17 +725,24 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle, Context::Scope context_scope(context); DEBUG_HTTP2SESSION2(session, "stream %d closed with code: %d", id, code); Http2Stream* stream = session->FindStream(id); - // Intentionally ignore the callback if the stream does not exist - if (stream != nullptr) { + // Intentionally ignore the callback if the stream does not exist or has + // already been destroyed + if (stream != nullptr && !stream->IsDestroyed()) { + stream->AddChunk(nullptr, 0); stream->Close(code); // It is possible for the stream close to occur before the stream is // ever passed on to the javascript side. If that happens, skip straight - // to destroying the stream + // to destroying the stream. We can check this by looking for the + // onstreamclose function. If it exists, then the stream has already + // been passed on to javascript. Local<Value> fn = stream->object()->Get(context, env->onstreamclose_string()) .ToLocalChecked(); if (fn->IsFunction()) { - Local<Value> argv[1] = { Integer::NewFromUnsigned(isolate, code) }; + Local<Value> argv[2] = { + Integer::NewFromUnsigned(isolate, code), + Boolean::New(isolate, stream->HasDataChunks(true)) + }; stream->MakeCallback(fn.As<Function>(), arraysize(argv), argv); } else { stream->Destroy(); @@ -634,7 +751,10 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle, return 0; } - +// Called by nghttp2 when an invalid header has been received. For now, we +// ignore these. If this callback was not provided, nghttp2 would handle +// invalid headers strictly and would shut down the stream. We are intentionally +// being more lenient here although we may want to revisit this choice later. inline int Http2Session::OnInvalidHeader(nghttp2_session* session, const nghttp2_frame* frame, nghttp2_rcbuf* name, @@ -645,7 +765,10 @@ inline int Http2Session::OnInvalidHeader(nghttp2_session* session, return 0; } - +// When nghttp2 receives a DATA frame, it will deliver the data payload to +// us in discrete chunks. We push these into a linked list stored in the +// Http2Sttream which is flushed out to JavaScript as quickly as possible. +// This can be a particularly hot path. inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, uint8_t flags, int32_t id, @@ -658,30 +781,39 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, // We should never actually get a 0-length chunk so this check is // only a precaution at this point. if (len > 0) { + // Notify nghttp2 that we've consumed a chunk of data on the connection + // so that it can send a WINDOW_UPDATE frame. This is a critical part of + // the flow control process in http2 CHECK_EQ(nghttp2_session_consume_connection(handle, len), 0); Http2Stream* stream = session->FindStream(id); + // If the stream has been destroyed, ignore this chunk + if (stream->IsDestroyed()) + return 0; stream->AddChunk(data, len); } return 0; } - -inline ssize_t Http2Session::OnSelectPadding(nghttp2_session* session, +// Called by nghttp2 when it needs to determine how much padding to use in +// a DATA or HEADERS frame. +inline ssize_t Http2Session::OnSelectPadding(nghttp2_session* handle, const nghttp2_frame* frame, size_t maxPayloadLen, void* user_data) { - Http2Session* handle = static_cast<Http2Session*>(user_data); + Http2Session* session = static_cast<Http2Session*>(user_data); ssize_t padding = frame->hd.length; - return handle->padding_strategy_ == PADDING_STRATEGY_MAX - ? handle->OnMaxFrameSizePadding(padding, maxPayloadLen) - : handle->OnCallbackPadding(padding, maxPayloadLen); + return session->padding_strategy_ == PADDING_STRATEGY_MAX + ? session->OnMaxFrameSizePadding(padding, maxPayloadLen) + : session->OnCallbackPadding(padding, maxPayloadLen); } #define BAD_PEER_MESSAGE "Remote peer returned unexpected data while we " \ "expected SETTINGS frame. Perhaps, peer does not " \ "support HTTP/2 properly." +// We use this currently to determine when an attempt is made to use the http2 +// protocol with a non-http2 peer. inline int Http2Session::OnNghttpError(nghttp2_session* handle, const char* message, size_t len, @@ -705,9 +837,11 @@ inline int Http2Session::OnNghttpError(nghttp2_session* handle, return 0; } - +// Once all of the DATA frames for a Stream have been sent, the GetTrailers +// method calls out to JavaScript to fetch the trailing headers that need +// to be sent. inline void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) { - if (stream->HasTrailers()) { + if (!stream->IsDestroyed() && stream->HasTrailers()) { Http2Stream::SubmitTrailers submit_trailers{this, stream, flags}; stream->OnTrailers(submit_trailers); } @@ -734,6 +868,9 @@ inline void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers, } +// Called by OnFrameReceived to notify JavaScript land that a complete +// HEADERS frame has been received and processed. This method converts the +// received headers into a JavaScript array and pushes those out to JS. inline void Http2Session::HandleHeadersFrame(const nghttp2_frame* frame) { Isolate* isolate = env()->isolate(); HandleScope scope(isolate); @@ -744,6 +881,10 @@ inline void Http2Session::HandleHeadersFrame(const nghttp2_frame* frame) { DEBUG_HTTP2SESSION2(this, "handle headers frame for stream %d", id); Http2Stream* stream = FindStream(id); + // If the stream has already been destroyed, ignore. + if (stream->IsDestroyed()) + return; + nghttp2_header* headers = stream->headers(); size_t count = stream->headers_count(); @@ -795,6 +936,10 @@ inline void Http2Session::HandleHeadersFrame(const nghttp2_frame* frame) { } +// Called by OnFrameReceived when a complete PRIORITY frame has been +// received. Notifies JS land about the priority change. Note that priorities +// are considered advisory only, so this has no real effect other than to +// simply let user code know that the priority has changed. inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) { Isolate* isolate = env()->isolate(); HandleScope scope(isolate); @@ -817,11 +962,19 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) { } +// Called by OnFrameReceived when a complete DATA frame has been received. +// If we know that this is the last DATA frame (because the END_STREAM flag +// is set), then we'll terminate the readable side of the StreamBase. If +// the StreamBase is flowing, we'll push the chunks of data out to JS land. inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) { int32_t id = GetFrameID(frame); DEBUG_HTTP2SESSION2(this, "handling data frame for stream %d", id); Http2Stream* stream = FindStream(id); + // If the stream has already been destroyed, do nothing + if (stream->IsDestroyed()) + return; + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { stream->AddChunk(nullptr, 0); } @@ -831,6 +984,7 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) { } +// Called by OnFrameReceived when a complete GOAWAY frame has been received. inline void Http2Session::HandleGoawayFrame(const nghttp2_frame* frame) { Isolate* isolate = env()->isolate(); HandleScope scope(isolate); @@ -856,6 +1010,7 @@ inline void Http2Session::HandleGoawayFrame(const nghttp2_frame* frame) { MakeCallback(env()->ongoawaydata_string(), arraysize(argv), argv); } +// Called by OnFrameReceived when a complete PING frame has been received. inline void Http2Session::HandlePingFrame(const nghttp2_frame* frame) { bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK; if (ack) { @@ -865,7 +1020,7 @@ inline void Http2Session::HandlePingFrame(const nghttp2_frame* frame) { } } - +// Called by OnFrameReceived when a complete SETTINGS frame has been received. inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { Isolate* isolate = env()->isolate(); HandleScope scope(isolate); @@ -878,6 +1033,10 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { MakeCallback(env()->onsettings_string(), arraysize(argv), argv); } +// If the underlying nghttp2_session struct has data pending in its outbound +// queue, MaybeScheduleWrite will schedule a SendPendingData() call to occcur +// on the next iteration of the Node.js event loop (using the SetImmediate +// queue), but only if a write has not already been scheduled. void Http2Session::MaybeScheduleWrite() { CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); if (session_ != nullptr && nghttp2_session_want_write(session_)) { @@ -900,16 +1059,24 @@ void Http2Session::MaybeScheduleWrite() { } } - +// Prompts nghttp2 to begin serializing it's pending data and pushes each +// chunk out to the i/o socket to be sent. This is a particularly hot method +// that will generally be called at least twice be event loop iteration. +// This is a potential performance optimization target later. void Http2Session::SendPendingData() { DEBUG_HTTP2SESSION(this, "sending pending data"); // Do not attempt to send data on the socket if the destroying flag has // been set. That means everything is shutting down and the socket // will not be usable. - if (IsDestroying()) + if (IsDestroyed()) return; flags_ &= ~SESSION_STATE_WRITE_SCHEDULED; + // SendPendingData should not be called recursively. + if (flags_ & SESSION_STATE_SENDING) + return; + flags_ |= SESSION_STATE_SENDING; + WriteWrap* req = nullptr; char* dest = nullptr; size_t destRemaining = 0; @@ -958,14 +1125,17 @@ void Http2Session::SendPendingData() { } } CHECK_NE(srcLength, NGHTTP2_ERR_NOMEM); - - if (destLength > 0) { + if (destLength > 0 && srcLength >= 0) { DEBUG_HTTP2SESSION2(this, "pushing %d bytes to the socket", destLength); Send(req, dest, destLength); } -} + DEBUG_HTTP2SESSION2(this, "wants data in return? %d", + nghttp2_session_want_read(session_)); + flags_ &= ~SESSION_STATE_SENDING; +} +// Creates a new Http2Stream and submits a new http2 request. inline Http2Stream* Http2Session::SubmitRequest( nghttp2_priority_spec* prispec, nghttp2_nv* nva, @@ -987,7 +1157,7 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) { chunks_sent_since_last_write_ = n; } - +// Allocates the data buffer used to pass outbound data to the i/o stream. WriteWrap* Http2Session::AllocateSend() { HandleScope scope(env()->isolate()); Local<Object> obj = @@ -1002,12 +1172,11 @@ WriteWrap* Http2Session::AllocateSend() { return WriteWrap::New(env(), obj, stream_, size + 9); } +// Pushes chunks of data to the i/o stream. void Http2Session::Send(WriteWrap* req, char* buf, size_t length) { - DEBUG_HTTP2SESSION(this, "attempting to send data"); - if (stream_ == nullptr || !stream_->IsAlive() || stream_->IsClosing()) { + DEBUG_HTTP2SESSION2(this, "attempting to send %d bytes", length); + if (stream_ == nullptr) return; - } - chunks_sent_since_last_write_++; uv_buf_t actual = uv_buf_init(buf, length); if (stream_->DoWrite(req, &actual, 1, nullptr)) { @@ -1015,7 +1184,7 @@ void Http2Session::Send(WriteWrap* req, char* buf, size_t length) { } } - +// Allocates the data buffer used to receive inbound data from the i/o stream void Http2Session::OnStreamAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { @@ -1024,13 +1193,14 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size, buf->len = kAllocBufferSize; } - +// Callback used to receive inbound data from the i/o stream void Http2Session::OnStreamReadImpl(ssize_t nread, const uv_buf_t* bufs, uv_handle_type pending, void* ctx) { Http2Session* session = static_cast<Http2Session*>(ctx); Http2Scope h2scope(session); + DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread); if (nread < 0) { uv_buf_t tmp_buf; tmp_buf.base = nullptr; @@ -1041,19 +1211,42 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, session->prev_read_cb_.ctx); return; } - if (nread > 0) { + if (bufs->len > 0) { // Only pass data on if nread > 0 uv_buf_t buf[] { uv_buf_init((*bufs).base, nread) }; ssize_t ret = session->Write(buf, 1); - if (ret < 0) { + + // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef + // ssize_t to int. Cast here so that the < 0 check actually works on + // Windows. + if (static_cast<int>(ret) < 0) { DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); - CHECK_EQ(nghttp2_session_terminate_session(session->session(), - NGHTTP2_PROTOCOL_ERROR), 0); + Environment* env = session->env(); + Isolate* isolate = env->isolate(); + HandleScope scope(isolate); + Local<Context> context = env->context(); + Context::Scope context_scope(context); + + Local<Value> argv[1] = { + Integer::New(isolate, ret), + }; + session->MakeCallback(env->error_string(), arraysize(argv), argv); + } else { + DEBUG_HTTP2SESSION2(session, "processed %d bytes. wants more? %d", ret, + nghttp2_session_want_read(**session)); } } } +void Http2Session::OnStreamDestructImpl(void* ctx) { + Http2Session* session = static_cast<Http2Session*>(ctx); + session->stream_ = nullptr; +} +// Every Http2Session session is tightly bound to a single i/o StreamBase +// (typically a net.Socket or tls.TLSSocket). The lifecycle of the two is +// tightly coupled with all data transfer between the two happening at the +// C++ layer via the StreamBase API. void Http2Session::Consume(Local<External> external) { StreamBase* stream = static_cast<StreamBase*>(external->Value()); stream->Consume(); @@ -1062,24 +1255,11 @@ void Http2Session::Consume(Local<External> external) { prev_read_cb_ = stream->read_cb(); stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this }); stream->set_read_cb({ Http2Session::OnStreamReadImpl, this }); + stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this }); DEBUG_HTTP2SESSION(this, "i/o stream consumed"); } -void Http2Session::Unconsume() { - if (prev_alloc_cb_.is_empty()) - return; - stream_->set_alloc_cb(prev_alloc_cb_); - stream_->set_read_cb(prev_read_cb_); - prev_alloc_cb_.clear(); - prev_read_cb_.clear(); - stream_ = nullptr; - DEBUG_HTTP2SESSION(this, "i/o stream unconsumed"); -} - - - - Http2Stream::Http2Stream( Http2Session* session, int32_t id, @@ -1119,26 +1299,37 @@ Http2Stream::Http2Stream( Http2Stream::~Http2Stream() { - CHECK(persistent().IsEmpty()); + if (session_ != nullptr) { + session_->RemoveStream(id_); + session_ = nullptr; + } + if (!object().IsEmpty()) ClearWrap(object()); persistent().Reset(); + CHECK(persistent().IsEmpty()); } +// Notify the Http2Stream that a new block of HEADERS is being processed. void Http2Stream::StartHeaders(nghttp2_headers_category category) { DEBUG_HTTP2STREAM2(this, "starting headers, category: %d", id_, category); + CHECK(!this->IsDestroyed()); current_headers_length_ = 0; current_headers_.clear(); current_headers_category_ = category; } + nghttp2_stream* Http2Stream::operator*() { return nghttp2_session_find_stream(**session_, id_); } +// Calls out to JavaScript land to fetch the actual trailer headers to send +// for this stream. void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) { DEBUG_HTTP2STREAM(this, "prompting for trailers"); + CHECK(!this->IsDestroyed()); Isolate* isolate = env()->isolate(); HandleScope scope(isolate); Local<Context> context = env()->context(); @@ -1146,7 +1337,7 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) { Local<Value> ret = MakeCallback(env()->ontrailers_string(), 0, nullptr).ToLocalChecked(); - if (!ret.IsEmpty()) { + if (!ret.IsEmpty() && !IsDestroyed()) { if (ret->IsArray()) { Local<Array> headers = ret.As<Array>(); if (headers->Length() > 0) { @@ -1157,21 +1348,36 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) { } } +inline bool Http2Stream::HasDataChunks(bool ignore_eos) { + return data_chunks_.size() > (ignore_eos ? 1 : 0); +} +// Appends a chunk of received DATA frame data to this Http2Streams internal +// queue. Note that we must memcpy each chunk because of the way that nghttp2 +// handles it's internal memory`. inline void Http2Stream::AddChunk(const uint8_t* data, size_t len) { + CHECK(!this->IsDestroyed()); + if (flags_ & NGHTTP2_STREAM_FLAG_EOS) + return; char* buf = nullptr; - if (len > 0) { + if (len > 0 && data != nullptr) { buf = Malloc<char>(len); memcpy(buf, data, len); + } else if (data == nullptr) { + flags_ |= NGHTTP2_STREAM_FLAG_EOS; } data_chunks_.emplace(uv_buf_init(buf, len)); } - +// The Http2Stream class is a subclass of StreamBase. The DoWrite method +// receives outbound chunks of data to send as outbound DATA frames. These +// are queued in an internal linked list of uv_buf_t structs that are sent +// when nghttp2 is ready to serialize the data frame. int Http2Stream::DoWrite(WriteWrap* req_wrap, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) { + CHECK(!this->IsDestroyed()); session_->SetChunksSinceLastWrite(); nghttp2_stream_write_t* req = new nghttp2_stream_write_t; @@ -1189,6 +1395,7 @@ int Http2Stream::DoWrite(WriteWrap* req_wrap, inline void Http2Stream::Close(int32_t code) { + CHECK(!this->IsDestroyed()); flags_ |= NGHTTP2_STREAM_FLAG_CLOSED; code_ = code; DEBUG_HTTP2STREAM2(this, "closed with code %d", code); @@ -1196,6 +1403,7 @@ inline void Http2Stream::Close(int32_t code) { inline void Http2Stream::Shutdown() { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); flags_ |= NGHTTP2_STREAM_FLAG_SHUT; CHECK_NE(nghttp2_session_resume_data(session_->session(), id_), @@ -1204,26 +1412,23 @@ inline void Http2Stream::Shutdown() { } int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { + CHECK(!this->IsDestroyed()); req_wrap->Dispatched(); Shutdown(); req_wrap->Done(0); return 0; } +// Destroy the Http2Stream and render it unusable. Actual resources for the +// Stream will not be freed until the next tick of the Node.js event loop +// using the SetImmediate queue. inline void Http2Stream::Destroy() { - Http2Scope h2scope(this); - DEBUG_HTTP2STREAM(this, "destroying stream"); // Do nothing if this stream instance is already destroyed if (IsDestroyed()) return; - flags_ |= NGHTTP2_STREAM_FLAG_DESTROYED; - Http2Session* session = this->session_; - if (session != nullptr) { - session_->RemoveStream(id_); - session_ = nullptr; - } + DEBUG_HTTP2STREAM(this, "destroying stream"); // Free any remaining incoming data chunks. while (!data_chunks_.empty()) { @@ -1232,24 +1437,30 @@ inline void Http2Stream::Destroy() { data_chunks_.pop(); } - // Free any remaining outgoing data chunks. - while (!queue_.empty()) { - nghttp2_stream_write* head = queue_.front(); - head->cb(head->req, UV_ECANCELED); - delete head; - queue_.pop(); - } - - if (!object().IsEmpty()) - ClearWrap(object()); - persistent().Reset(); + // Wait until the start of the next loop to delete because there + // may still be some pending operations queued for this stream. + env()->SetImmediate([](Environment* env, void* data) { + Http2Stream* stream = static_cast<Http2Stream*>(data); + + // Free any remaining outgoing data chunks here. This should be done + // here because it's possible for destroy to have been called while + // we still have qeueued outbound writes. + while (!stream->queue_.empty()) { + nghttp2_stream_write* head = stream->queue_.front(); + head->cb(head->req, UV_ECANCELED); + delete head; + stream->queue_.pop(); + } - delete this; + delete stream; + }, this, this->object()); } -void Http2Stream::OnDataChunk( - uv_buf_t* chunk) { +// Uses the StreamBase API to push a single chunk of queued inbound DATA +// to JS land. +void Http2Stream::OnDataChunk(uv_buf_t* chunk) { + CHECK(!this->IsDestroyed()); Isolate* isolate = env()->isolate(); HandleScope scope(isolate); ssize_t len = -1; @@ -1263,6 +1474,7 @@ void Http2Stream::OnDataChunk( inline void Http2Stream::FlushDataChunks() { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); if (!data_chunks_.empty()) { uv_buf_t buf = data_chunks_.front(); @@ -1278,9 +1490,12 @@ inline void Http2Stream::FlushDataChunks() { } +// Initiates a response on the Http2Stream using data provided via the +// StreamBase Streams API. inline int Http2Stream::SubmitResponse(nghttp2_nv* nva, size_t len, int options) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "submitting response"); if (options & STREAM_OPTION_GET_TRAILERS) @@ -1302,6 +1517,7 @@ inline int Http2Stream::SubmitFile(int fd, int64_t offset, int64_t length, int options) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "submitting file"); if (options & STREAM_OPTION_GET_TRAILERS) @@ -1319,6 +1535,7 @@ inline int Http2Stream::SubmitFile(int fd, // Submit informational headers for a stream. inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); DEBUG_HTTP2STREAM2(this, "sending %d informational headers", len); int ret = nghttp2_submit_headers(session_->session(), @@ -1329,9 +1546,10 @@ inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { return ret; } - +// Submit a PRIORITY frame to the connected peer. inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec, bool silent) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "sending priority spec"); int ret = silent ? @@ -1344,27 +1562,28 @@ inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec, return ret; } - -inline int Http2Stream::SubmitRstStream(const uint32_t code) { +// Closes the Http2Stream by submitting an RST_STREAM frame to the connected +// peer. +inline void Http2Stream::SubmitRstStream(const uint32_t code) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); - DEBUG_HTTP2STREAM2(this, "sending rst-stream with code %d", code); + // Force a purge of any currently pending data here to make sure + // it is sent before closing the stream. session_->SendPendingData(); - CHECK_EQ(nghttp2_submit_rst_stream(session_->session(), - NGHTTP2_FLAG_NONE, - id_, - code), 0); - return 0; + CHECK_EQ(nghttp2_submit_rst_stream(**session_, NGHTTP2_FLAG_NONE, + id_, code), 0); } -// Submit a push promise. +// Submit a push promise and create the associated Http2Stream if successful. inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva, size_t len, int32_t* ret, int options) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "sending push promise"); - *ret = nghttp2_submit_push_promise(session_->session(), NGHTTP2_FLAG_NONE, + *ret = nghttp2_submit_push_promise(**session_, NGHTTP2_FLAG_NONE, id_, nva, len, nullptr); CHECK_NE(*ret, NGHTTP2_ERR_NOMEM); Http2Stream* stream = nullptr; @@ -1374,7 +1593,10 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva, return stream; } +// Switch the StreamBase into flowing mode to begin pushing chunks of data +// out to JS land. inline int Http2Stream::ReadStart() { + CHECK(!this->IsDestroyed()); flags_ |= NGHTTP2_STREAM_FLAG_READ_START; flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED; @@ -1384,8 +1606,9 @@ inline int Http2Stream::ReadStart() { return 0; } - +// Switch the StreamBase into paused mode. inline int Http2Stream::ReadStop() { + CHECK(!this->IsDestroyed()); if (!IsReading()) return 0; flags_ |= NGHTTP2_STREAM_FLAG_READ_PAUSED; @@ -1402,6 +1625,7 @@ inline int Http2Stream::Write(nghttp2_stream_write_t* req, const uv_buf_t bufs[], unsigned int nbufs, nghttp2_stream_write_cb cb) { + CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); if (!IsWritable()) { if (cb != nullptr) @@ -1424,9 +1648,15 @@ inline size_t GetBufferLength(nghttp2_rcbuf* buf) { return nghttp2_rcbuf_get_buf(buf).len; } +// Ads a header to the Http2Stream. Note that the header name and value are +// provided using a buffer structure provided by nghttp2 that allows us to +// avoid unnecessary memcpy's. Those buffers are ref counted. The ref count +// is incremented here and are decremented when the header name and values +// are garbage collected later. inline bool Http2Stream::AddHeader(nghttp2_rcbuf* name, nghttp2_rcbuf* value, uint8_t flags) { + CHECK(!this->IsDestroyed()); size_t length = GetBufferLength(name) + GetBufferLength(value) + 32; if (current_headers_.size() == max_header_pairs_ || current_headers_length_ + length > max_header_length_) { @@ -1455,7 +1685,9 @@ Http2Stream* GetStream(Http2Session* session, return stream; } +// A Provider is the thing that provides outbound DATA frame data. Http2Stream::Provider::Provider(Http2Stream* stream, int options) { + CHECK(!stream->IsDestroyed()); provider_.source.ptr = stream; empty_ = options & STREAM_OPTION_EMPTY_PAYLOAD; } @@ -1469,8 +1701,12 @@ Http2Stream::Provider::~Provider() { provider_.source.ptr = nullptr; } +// The FD Provider pulls data from a file descriptor using libuv. All of the +// data transfer occurs in C++, without any chunks being passed through JS +// land. Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd) : Http2Stream::Provider(stream, options) { + CHECK(!stream->IsDestroyed()); provider_.source.fd = fd; provider_.read_callback = Http2Stream::Provider::FD::OnRead; } @@ -1490,6 +1726,7 @@ ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle, void* user_data) { Http2Session* session = static_cast<Http2Session*>(user_data); Http2Stream* stream = session->FindStream(id); + DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id); CHECK_EQ(id, stream->id()); @@ -1524,16 +1761,26 @@ ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle, stream->fd_offset_ += numchars; stream->fd_length_ -= numchars; + DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars); + // if numchars < length, assume that we are done. if (static_cast<size_t>(numchars) < length || length <= 0) { DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id); *flags |= NGHTTP2_DATA_FLAG_EOF; session->GetTrailers(stream, flags); + // If the stream or session gets destroyed during the GetTrailers + // callback, check that here and close down the stream + if (stream->IsDestroyed()) + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + if (session->IsDestroyed()) + return NGHTTP2_ERR_CALLBACK_FAILURE; } return numchars; } +// The Stream Provider pulls data from a linked list of uv_buf_t structs +// built via the StreamBase API and the Streams js API. Http2Stream::Provider::Stream::Stream(int options) : Http2Stream::Provider(options) { provider_.read_callback = Http2Stream::Provider::Stream::OnRead; @@ -1593,10 +1840,14 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, if (stream->queue_.empty() && !stream->IsWritable()) { DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id); *flags |= NGHTTP2_DATA_FLAG_EOF; - session->GetTrailers(stream, flags); + // If the stream or session gets destroyed during the GetTrailers + // callback, check that here and close down the stream + if (stream->IsDestroyed()) + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + if (session->IsDestroyed()) + return NGHTTP2_ERR_CALLBACK_FAILURE; } - return amount; } @@ -1604,6 +1855,8 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, // Implementation of the JavaScript API +// Fetches the string description of a nghttp2 error code and passes that +// back to JS land void HttpErrorString(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); uint32_t val = args[0]->Uint32Value(env->context()).ToChecked(); @@ -1624,13 +1877,15 @@ void PackSettings(const FunctionCallbackInfo<Value>& args) { args.GetReturnValue().Set(settings.Pack()); } - +// A TypedArray instance is shared between C++ and JS land to contain the +// default SETTINGS. RefreshDefaultSettings updates that TypedArray with the +// default values. void RefreshDefaultSettings(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Http2Settings::RefreshDefaults(env); } - +// Sets the next stream ID the Http2Session. If successful, returns true. void Http2Session::SetNextStreamID(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Http2Session* session; @@ -1644,7 +1899,9 @@ void Http2Session::SetNextStreamID(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2SESSION2(session, "set next stream id to %d", id); } - +// A TypedArray instance is shared between C++ and JS land to contain the +// SETTINGS (either remote or local). RefreshSettings updates the current +// values established for each of the settings so those can be read in JS land. template <get_setting fn> void Http2Session::RefreshSettings(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); @@ -1654,7 +1911,9 @@ void Http2Session::RefreshSettings(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2SESSION(session, "settings refreshed for session"); } - +// A TypedArray instance is shared between C++ and JS land to contain state +// information of the current Http2Session. This updates the values in the +// TypedRray so those can be read in JS land. void Http2Session::RefreshState(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Http2Session* session; @@ -1687,6 +1946,7 @@ void Http2Session::RefreshState(const FunctionCallbackInfo<Value>& args) { } +// Constructor for new Http2Session instances. void Http2Session::New(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); CHECK(args.IsConstructCall()); @@ -1698,6 +1958,7 @@ void Http2Session::New(const FunctionCallbackInfo<Value>& args) { } +// Binds the Http2Session with a StreamBase used for i/o void Http2Session::Consume(const FunctionCallbackInfo<Value>& args) { Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); @@ -1705,31 +1966,21 @@ void Http2Session::Consume(const FunctionCallbackInfo<Value>& args) { session->Consume(args[0].As<External>()); } - +// Destroys the Http2Session instance and renders it unusable void Http2Session::Destroy(const FunctionCallbackInfo<Value>& args) { Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); DEBUG_HTTP2SESSION(session, "destroying session"); - Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); - bool skipUnconsume = args[0]->BooleanValue(context).ToChecked(); - - if (!skipUnconsume) - session->Unconsume(); - session->Close(); -} - + uint32_t code = args[0]->Uint32Value(context).ToChecked(); + bool socketDestroyed = args[1]->BooleanValue(context).ToChecked(); -void Http2Session::Destroying(const FunctionCallbackInfo<Value>& args) { - Http2Session* session; - ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); - session->MarkDestroying(); - DEBUG_HTTP2SESSION(session, "preparing to destroy session"); + session->Close(code, socketDestroyed); } - +// Submits a SETTINGS frame for the Http2Session void Http2Session::Settings(const FunctionCallbackInfo<Value>& args) { Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); @@ -1740,7 +1991,8 @@ void Http2Session::Settings(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2SESSION(session, "settings submitted"); } - +// Submits a new request on the Http2Session and returns either an error code +// or the Http2Stream object. void Http2Session::Request(const FunctionCallbackInfo<Value>& args) { Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); @@ -1772,49 +2024,50 @@ void Http2Session::Request(const FunctionCallbackInfo<Value>& args) { args.GetReturnValue().Set(stream->object()); } +// Submits a GOAWAY frame to signal that the Http2Session is in the process +// of shutting down. Note that this function does not actually alter the +// state of the Http2Session, it's simply a notification. +void Http2Session::Goaway(uint32_t code, + int32_t lastStreamID, + uint8_t* data, + size_t len) { + if (IsDestroyed()) + return; -void Http2Session::ShutdownNotice(const FunctionCallbackInfo<Value>& args) { - Http2Session* session; - ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); - session->SubmitShutdownNotice(); - DEBUG_HTTP2SESSION(session, "shutdown notice sent"); + Http2Scope h2scope(this); + // the last proc stream id is the most recently created Http2Stream. + if (lastStreamID <= 0) + lastStreamID = nghttp2_session_get_last_proc_stream_id(session_); + DEBUG_HTTP2SESSION(this, "submitting goaway"); + nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, + lastStreamID, code, data, len); } - +// Submits a GOAWAY frame to signal that the Http2Session is in the process +// of shutting down. The opaque data argument is an optional TypedArray that +// can be used to send debugging data to the connected peer. void Http2Session::Goaway(const FunctionCallbackInfo<Value>& args) { - Http2Session* session; Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); + Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); - Http2Scope h2scope(session); - uint32_t errorCode = args[0]->Uint32Value(context).ToChecked(); + uint32_t code = args[0]->Uint32Value(context).ToChecked(); int32_t lastStreamID = args[1]->Int32Value(context).ToChecked(); Local<Value> opaqueData = args[2]; - uint8_t* data = nullptr; size_t length = 0; - if (opaqueData->BooleanValue(context).ToChecked()) { - THROW_AND_RETURN_UNLESS_BUFFER(env, opaqueData); - SPREAD_BUFFER_ARG(opaqueData, buf); - data = reinterpret_cast<uint8_t*>(buf_data); - length = buf_length; + if (Buffer::HasInstance(opaqueData)) { + data = reinterpret_cast<uint8_t*>(Buffer::Data(opaqueData)); + length = Buffer::Length(opaqueData); } - int status = nghttp2_submit_goaway(session->session(), - NGHTTP2_FLAG_NONE, - lastStreamID, - errorCode, - data, length); - CHECK_NE(status, NGHTTP2_ERR_NOMEM); - args.GetReturnValue().Set(status); - DEBUG_HTTP2SESSION2(session, "immediate shutdown initiated with " - "last stream id %d, code %d, and opaque-data length %d", - lastStreamID, errorCode, length); + session->Goaway(code, lastStreamID, data, length); } - +// Update accounting of data chunks. This is used primarily to manage timeout +// logic when using the FD Provider. void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Isolate* isolate = env->isolate(); @@ -1831,18 +2084,21 @@ void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) { args.GetReturnValue().Set(length); } - +// Submits an RST_STREAM frame effectively closing the Http2Stream. Note that +// this *WILL* alter the state of the stream, causing the OnStreamClose +// callback to the triggered. void Http2Stream::RstStream(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); Http2Stream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); uint32_t code = args[0]->Uint32Value(context).ToChecked(); - args.GetReturnValue().Set(stream->SubmitRstStream(code)); - DEBUG_HTTP2STREAM2(stream, "rst_stream code %d sent", code); + DEBUG_HTTP2STREAM2(stream, "sending rst_stream with code %d", code); + stream->SubmitRstStream(code); } - +// Initiates a response on the Http2Stream using the StreamBase API to provide +// outbound DATA frames. void Http2Stream::Respond(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); @@ -1860,7 +2116,8 @@ void Http2Stream::Respond(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2STREAM(stream, "response submitted"); } - +// Initiates a response on the Http2Stream using a file descriptor to provide +// outbound DATA frames. void Http2Stream::RespondFD(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); @@ -1883,7 +2140,7 @@ void Http2Stream::RespondFD(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd); } - +// Submits informational headers on the Http2Stream void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); @@ -1899,14 +2156,14 @@ void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) { headers->Length()); } - +// Grab the numeric id of the Http2Stream void Http2Stream::GetID(const FunctionCallbackInfo<Value>& args) { Http2Stream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); args.GetReturnValue().Set(stream->id()); } - +// Destroy the Http2Stream, rendering it no longer usable void Http2Stream::Destroy(const FunctionCallbackInfo<Value>& args) { Http2Stream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); @@ -1914,7 +2171,7 @@ void Http2Stream::Destroy(const FunctionCallbackInfo<Value>& args) { stream->Destroy(); } - +// Prompt the Http2Stream to begin sending data to the JS land. void Http2Stream::FlushData(const FunctionCallbackInfo<Value>& args) { Http2Stream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); @@ -1922,7 +2179,7 @@ void Http2Stream::FlushData(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2STREAM(stream, "data flushed to js"); } - +// Initiate a Push Promise and create the associated Http2Stream void Http2Stream::PushPromise(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); @@ -1948,7 +2205,7 @@ void Http2Stream::PushPromise(const FunctionCallbackInfo<Value>& args) { args.GetReturnValue().Set(stream->object()); } - +// Send a PRIORITY frame void Http2Stream::Priority(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = env->context(); @@ -1962,7 +2219,9 @@ void Http2Stream::Priority(const FunctionCallbackInfo<Value>& args) { DEBUG_HTTP2STREAM(stream, "priority submitted"); } - +// A TypedArray shared by C++ and JS land is used to communicate state +// information about the Http2Stream. This updates the values in that +// TypedArray so that the state can be read by JS. void Http2Stream::RefreshState(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Http2Stream* stream; @@ -1999,11 +2258,14 @@ void Http2Stream::RefreshState(const FunctionCallbackInfo<Value>& args) { } } +// Submits a PING frame to be sent to the connected peer. void Http2Session::Ping(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + // A PING frame may have exactly 8 bytes of payload data. If not provided, + // then the current hrtime will be used as the payload. uint8_t* payload = nullptr; if (Buffer::HasInstance(args[0])) { payload = reinterpret_cast<uint8_t*>(Buffer::Data(args[0])); @@ -2014,11 +2276,18 @@ void Http2Session::Ping(const FunctionCallbackInfo<Value>& args) { Local<Object> obj = ping->object(); obj->Set(env->context(), env->ondone_string(), args[1]).FromJust(); + // To prevent abuse, we strictly limit the number of unacknowledged PING + // frames that may be sent at any given time. This is configurable in the + // Options when creating a Http2Session. if (!session->AddPing(ping)) { ping->Done(false); return args.GetReturnValue().Set(false); } + // The Ping itself is an Async resource. When the acknowledgement is recieved, + // the callback will be invoked and a notification sent out to JS land. The + // notification will include the duration of the ping, allowing the round + // trip to be measured. ping->Send(payload); args.GetReturnValue().Set(true); } @@ -2039,6 +2308,7 @@ bool Http2Session::AddPing(Http2Session::Http2Ping* ping) { return true; } + Http2Session::Http2Ping::Http2Ping( Http2Session* session) : AsyncWrap(session->env(), @@ -2086,6 +2356,8 @@ void Http2Session::Http2Ping::Done(bool ack, const uint8_t* payload) { delete this; } + +// Set up the process.binding('http2') binding. void Initialize(Local<Object> target, Local<Value> unused, Local<Context> context, @@ -2164,8 +2436,6 @@ void Initialize(Local<Object> target, env->SetProtoMethod(session, "ping", Http2Session::Ping); env->SetProtoMethod(session, "consume", Http2Session::Consume); env->SetProtoMethod(session, "destroy", Http2Session::Destroy); - env->SetProtoMethod(session, "destroying", Http2Session::Destroying); - env->SetProtoMethod(session, "shutdownNotice", Http2Session::ShutdownNotice); env->SetProtoMethod(session, "goaway", Http2Session::Goaway); env->SetProtoMethod(session, "settings", Http2Session::Settings); env->SetProtoMethod(session, "request", Http2Session::Request); diff --git a/src/node_http2.h b/src/node_http2.h index 5a61e465a4..8318a8ee5b 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -70,7 +70,13 @@ void inline debug_vfprintf(const char* format, ...) { #define DEBUG_HTTP2STREAM2(...) do {} while (0) #endif +// We strictly limit the number of outstanding unacknowledged PINGS a user +// may send in order to prevent abuse. The current default cap is 10. The +// user may set a different limit using a per Http2Session configuration +// option. #define DEFAULT_MAX_PINGS 10 + +// These are the standard HTTP/2 defaults as specified by the RFC #define DEFAULT_SETTINGS_HEADER_TABLE_SIZE 4096 #define DEFAULT_SETTINGS_ENABLE_PUSH 1 #define DEFAULT_SETTINGS_INITIAL_WINDOW_SIZE 65535 @@ -83,10 +89,10 @@ void inline debug_vfprintf(const char* format, ...) { #define MAX_MAX_HEADER_LIST_SIZE 16777215u #define DEFAULT_MAX_HEADER_LIST_PAIRS 128u -struct nghttp2_stream_write_t; - #define MAX_BUFFER_COUNT 16 +struct nghttp2_stream_write_t; + enum nghttp2_session_type { NGHTTP2_SESSION_SERVER, NGHTTP2_SESSION_CLIENT @@ -109,11 +115,15 @@ enum nghttp2_stream_flags { // Stream is destroyed NGHTTP2_STREAM_FLAG_DESTROYED = 0x10, // Stream has trailers - NGHTTP2_STREAM_FLAG_TRAILERS = 0x20 + NGHTTP2_STREAM_FLAG_TRAILERS = 0x20, + // Stream has received all the data it can + NGHTTP2_STREAM_FLAG_EOS = 0x40 }; enum nghttp2_stream_options { + // Stream is not going to have any DATA frames STREAM_OPTION_EMPTY_PAYLOAD = 0x1, + // Stream might have trailing headers STREAM_OPTION_GET_TRAILERS = 0x2, }; @@ -136,7 +146,6 @@ struct nghttp2_header { }; - struct nghttp2_stream_write_t { void* data; int status; @@ -417,9 +426,10 @@ const char* nghttp2_errname(int rv) { enum session_state_flags { SESSION_STATE_NONE = 0x0, - SESSION_STATE_DESTROYING = 0x1, - SESSION_STATE_HAS_SCOPE = 0x2, - SESSION_STATE_WRITE_SCHEDULED = 0x4 + SESSION_STATE_HAS_SCOPE = 0x1, + SESSION_STATE_WRITE_SCHEDULED = 0x2, + SESSION_STATE_CLOSED = 0x4, + SESSION_STATE_SENDING = 0x8, }; // This allows for 4 default-sized frames with their frame headers @@ -555,6 +565,8 @@ class Http2Stream : public AsyncWrap, unsigned int nbufs, nghttp2_stream_write_cb cb); + inline bool HasDataChunks(bool ignore_eos = false); + inline void AddChunk(const uint8_t* data, size_t len); inline void FlushDataChunks(); @@ -592,7 +604,7 @@ class Http2Stream : public AsyncWrap, bool silent = false); // Submits an RST_STREAM frame using the given code - inline int SubmitRstStream(const uint32_t code); + inline void SubmitRstStream(const uint32_t code); // Submits a PUSH_PROMISE frame with this stream as the parent. inline Http2Stream* SubmitPushPromise( @@ -799,9 +811,11 @@ class Http2Session : public AsyncWrap { void Start(); void Stop(); - void Close(); + void Close(uint32_t code = NGHTTP2_NO_ERROR, + bool socket_closed = false); void Consume(Local<External> external); void Unconsume(); + void Goaway(uint32_t code, int32_t lastStreamID, uint8_t* data, size_t len); bool Ping(v8::Local<v8::Function> function); @@ -827,8 +841,9 @@ class Http2Session : public AsyncWrap { inline const char* TypeName(); - inline void MarkDestroying() { flags_ |= SESSION_STATE_DESTROYING; } - inline bool IsDestroying() { return flags_ & SESSION_STATE_DESTROYING; } + inline bool IsDestroyed() { + return (flags_ & SESSION_STATE_CLOSED) || session_ == nullptr; + } // Schedule a write if nghttp2 indicates it wants to write to the socket. void MaybeScheduleWrite(); @@ -842,9 +857,6 @@ class Http2Session : public AsyncWrap { // Removes a stream instance from this session inline void RemoveStream(int32_t id); - // Sends a notice to the connected peer that the session is shutting down. - inline void SubmitShutdownNotice(); - // Submits a SETTINGS frame to the connected peer. inline void Settings(const nghttp2_settings_entry iv[], size_t niv); @@ -868,6 +880,7 @@ class Http2Session : public AsyncWrap { const uv_buf_t* bufs, uv_handle_type pending, void* ctx); + static void OnStreamDestructImpl(void* ctx); // The JavaScript API static void New(const FunctionCallbackInfo<Value>& args); @@ -878,7 +891,6 @@ class Http2Session : public AsyncWrap { static void Settings(const FunctionCallbackInfo<Value>& args); static void Request(const FunctionCallbackInfo<Value>& args); static void SetNextStreamID(const FunctionCallbackInfo<Value>& args); - static void ShutdownNotice(const FunctionCallbackInfo<Value>& args); static void Goaway(const FunctionCallbackInfo<Value>& args); static void UpdateChunksSent(const FunctionCallbackInfo<Value>& args); static void RefreshState(const FunctionCallbackInfo<Value>& args); |