summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-01-08 03:50:51 +0100
committerAnna Henningsen <anna@addaleax.net>2018-01-18 22:42:31 +0100
commit0625627d824fb539cf02e797c6aab2b17b6df99f (patch)
treeb9c2334b87c684e844ceadc6d0106b839b06674c /src
parentda3078835a78e1348d5f30c8c8f1c2301bc1ccc4 (diff)
downloadandroid-node-v8-0625627d824fb539cf02e797c6aab2b17b6df99f.tar.gz
android-node-v8-0625627d824fb539cf02e797c6aab2b17b6df99f.tar.bz2
android-node-v8-0625627d824fb539cf02e797c6aab2b17b6df99f.zip
http2: refactor read mechanism
Refactor the read mechanism to completely avoid copying. Instead of copying individual `DATA` frame contents into buffers, create `ArrayBuffer` instances for all socket reads and emit slices of those `ArrayBuffer`s to JS. PR-URL: https://github.com/nodejs/node/pull/18030 Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/node_http2.cc184
-rw-r--r--src/node_http2.h18
2 files changed, 93 insertions, 109 deletions
diff --git a/src/node_http2.cc b/src/node_http2.cc
index cf346efba4..cd28f57ffe 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -9,6 +9,7 @@
namespace node {
+using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::Float64Array;
@@ -978,7 +979,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
// Intentionally ignore the callback if the stream does not exist or has
// already been destroyed
if (stream != nullptr && !stream->IsDestroyed()) {
- stream->AddChunk(nullptr, 0);
stream->Close(code);
// It is possible for the stream close to occur before the stream is
// ever passed on to the javascript side. If that happens, skip straight
@@ -989,9 +989,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
stream->object()->Get(context, env->onstreamclose_string())
.ToLocalChecked();
if (fn->IsFunction()) {
- Local<Value> argv[2] = {
- Integer::NewFromUnsigned(isolate, code),
- Boolean::New(isolate, stream->HasDataChunks(true))
+ Local<Value> argv[] = {
+ Integer::NewFromUnsigned(isolate, code)
};
stream->MakeCallback(fn.As<Function>(), arraysize(argv), argv);
} else {
@@ -1028,6 +1027,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
Http2Session* session = static_cast<Http2Session*>(user_data);
DEBUG_HTTP2SESSION2(session, "buffering data chunk for stream %d, size: "
"%d, flags: %d", id, len, flags);
+ Environment* env = session->env();
+ HandleScope scope(env->isolate());
// We should never actually get a 0-length chunk so this check is
// only a precaution at this point.
if (len > 0) {
@@ -1039,8 +1040,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
// If the stream has been destroyed, ignore this chunk
if (stream->IsDestroyed())
return 0;
+
stream->statistics_.received_bytes += len;
- stream->AddChunk(data, len);
+
+ // There is a single large array buffer for the entire data read from the
+ // network; create a slice of that array buffer and emit it as the
+ // received data buffer.
+ CHECK(!session->stream_buf_ab_.IsEmpty());
+ size_t offset = reinterpret_cast<const char*>(data) - session->stream_buf_;
+ // Verify that the data offset is inside the current read buffer.
+ CHECK_LE(offset, session->stream_buf_size_);
+
+ Local<Object> buf =
+ Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked();
+
+ stream->EmitData(len, buf, Local<Object>());
+ if (!stream->IsReading())
+ stream->inbound_consumed_data_while_paused_ += len;
+ else
+ nghttp2_session_consume_stream(handle, id, len);
}
return 0;
}
@@ -1226,9 +1244,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
// Called by OnFrameReceived when a complete DATA frame has been received.
-// If we know that this is the last DATA frame (because the END_STREAM flag
-// is set), then we'll terminate the readable side of the StreamBase. If
-// the StreamBase is flowing, we'll push the chunks of data out to JS land.
+// If we know that this was the last DATA frame (because the END_STREAM flag
+// is set), then we'll terminate the readable side of the StreamBase.
inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
int32_t id = GetFrameID(frame);
DEBUG_HTTP2SESSION2(this, "handling data frame for stream %d", id);
@@ -1239,11 +1256,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
return;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
- stream->AddChunk(nullptr, 0);
+ stream->EmitData(UV_EOF, Local<Object>(), Local<Object>());
}
-
- if (stream->IsReading())
- stream->FlushDataChunks();
}
@@ -1618,45 +1632,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
- buf->base = session->stream_alloc();
- buf->len = kAllocBufferSize;
+ CHECK_EQ(session->stream_buf_, nullptr);
+ CHECK_EQ(session->stream_buf_size_, 0);
+ buf->base = session->stream_buf_ = Malloc(suggested_size);
+ buf->len = session->stream_buf_size_ = suggested_size;
+ session->IncrementCurrentSessionMemory(suggested_size);
}
// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamReadImpl(ssize_t nread,
- const uv_buf_t* bufs,
+ const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
Http2Scope h2scope(session);
CHECK_NE(session->stream_, nullptr);
DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
- if (nread < 0) {
- uv_buf_t tmp_buf;
- tmp_buf.base = nullptr;
- tmp_buf.len = 0;
- session->prev_read_cb_.fn(nread,
- &tmp_buf,
- pending,
- session->prev_read_cb_.ctx);
- return;
- }
- if (bufs->len > 0) {
+ if (nread <= 0) {
+ free(session->stream_buf_);
+ if (nread < 0) {
+ uv_buf_t tmp_buf = uv_buf_init(nullptr, 0);
+ session->prev_read_cb_.fn(nread,
+ &tmp_buf,
+ pending,
+ session->prev_read_cb_.ctx);
+ }
+ } else {
// Only pass data on if nread > 0
- uv_buf_t buf[] { uv_buf_init((*bufs).base, nread) };
+
+ // Verify that currently: There is memory allocated into which
+ // the data has been read, and that memory buffer is at least as large
+ // as the amount of data we have read, but we have not yet made an
+ // ArrayBuffer out of it.
+ CHECK_NE(session->stream_buf_, nullptr);
+ CHECK_EQ(session->stream_buf_, buf->base);
+ CHECK_EQ(session->stream_buf_size_, buf->len);
+ CHECK_GE(session->stream_buf_size_, static_cast<size_t>(nread));
+ CHECK(session->stream_buf_ab_.IsEmpty());
+
+ Environment* env = session->env();
+ Isolate* isolate = env->isolate();
+ HandleScope scope(isolate);
+ Local<Context> context = env->context();
+ Context::Scope context_scope(context);
+
+ // Create an array buffer for the read data. DATA frames will be emitted
+ // as slices of this array buffer to avoid having to copy memory.
+ session->stream_buf_ab_ =
+ ArrayBuffer::New(isolate,
+ session->stream_buf_,
+ session->stream_buf_size_,
+ v8::ArrayBufferCreationMode::kInternalized);
+
+ uv_buf_t buf_ = uv_buf_init(buf->base, nread);
session->statistics_.data_received += nread;
- ssize_t ret = session->Write(buf, 1);
+ ssize_t ret = session->Write(&buf_, 1);
// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
// ssize_t to int. Cast here so that the < 0 check actually works on
// Windows.
if (static_cast<int>(ret) < 0) {
DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
- Environment* env = session->env();
- Isolate* isolate = env->isolate();
- HandleScope scope(isolate);
- Local<Context> context = env->context();
- Context::Scope context_scope(context);
Local<Value> argv[1] = {
Integer::New(isolate, ret),
@@ -1667,6 +1703,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
nghttp2_session_want_read(**session));
}
}
+
+ // Since we are finished handling this write, reset the stream buffer.
+ // The memory has either been free()d or was handed over to V8.
+ session->DecrementCurrentSessionMemory(session->stream_buf_size_);
+ session->stream_buf_ = nullptr;
+ session->stream_buf_size_ = 0;
+ session->stream_buf_ab_ = Local<ArrayBuffer>();
}
void Http2Session::OnStreamDestructImpl(void* ctx) {
@@ -1781,30 +1824,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
}
}
-inline bool Http2Stream::HasDataChunks(bool ignore_eos) {
- return data_chunks_.size() > (ignore_eos ? 1 : 0);
-}
-
-// Appends a chunk of received DATA frame data to this Http2Streams internal
-// queue. Note that we must memcpy each chunk because of the way that nghttp2
-// handles it's internal memory`.
-inline void Http2Stream::AddChunk(const uint8_t* data, size_t len) {
- CHECK(!this->IsDestroyed());
- if (this->statistics_.first_byte == 0)
- this->statistics_.first_byte = uv_hrtime();
- if (flags_ & NGHTTP2_STREAM_FLAG_EOS)
- return;
- char* buf = nullptr;
- if (len > 0 && data != nullptr) {
- buf = Malloc<char>(len);
- memcpy(buf, data, len);
- } else if (data == nullptr) {
- flags_ |= NGHTTP2_STREAM_FLAG_EOS;
- }
- data_chunks_.emplace(uv_buf_init(buf, len));
-}
-
-
inline void Http2Stream::Close(int32_t code) {
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
@@ -1841,13 +1860,6 @@ inline void Http2Stream::Destroy() {
DEBUG_HTTP2STREAM(this, "destroying stream");
- // Free any remaining incoming data chunks.
- while (!data_chunks_.empty()) {
- uv_buf_t buf = data_chunks_.front();
- free(buf.base);
- data_chunks_.pop();
- }
-
// Wait until the start of the next loop to delete because there
// may still be some pending operations queued for this stream.
env()->SetImmediate([](Environment* env, void* data) {
@@ -1873,39 +1885,6 @@ inline void Http2Stream::Destroy() {
}
-// Uses the StreamBase API to push a single chunk of queued inbound DATA
-// to JS land.
-void Http2Stream::OnDataChunk(uv_buf_t* chunk) {
- CHECK(!this->IsDestroyed());
- Isolate* isolate = env()->isolate();
- HandleScope scope(isolate);
- ssize_t len = -1;
- Local<Object> buf;
- if (chunk != nullptr) {
- len = chunk->len;
- buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked();
- }
- EmitData(len, buf, this->object());
-}
-
-
-inline void Http2Stream::FlushDataChunks() {
- CHECK(!this->IsDestroyed());
- Http2Scope h2scope(this);
- if (!data_chunks_.empty()) {
- uv_buf_t buf = data_chunks_.front();
- data_chunks_.pop();
- if (buf.len > 0) {
- CHECK_EQ(nghttp2_session_consume_stream(session_->session(),
- id_, buf.len), 0);
- OnDataChunk(&buf);
- } else {
- OnDataChunk(nullptr);
- }
- }
-}
-
-
// Initiates a response on the Http2Stream using data provided via the
// StreamBase Streams API.
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
@@ -2012,13 +1991,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
// Switch the StreamBase into flowing mode to begin pushing chunks of data
// out to JS land.
inline int Http2Stream::ReadStart() {
+ Http2Scope h2scope(this);
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
- // Flush any queued data chunks immediately out to the JS layer
- FlushDataChunks();
DEBUG_HTTP2STREAM(this, "reading starting");
+
+ // Tell nghttp2 about our consumption of the data that was handed
+ // off to JS land.
+ nghttp2_session_consume_stream(session_->session(),
+ id_,
+ inbound_consumed_data_while_paused_);
+ inbound_consumed_data_while_paused_ = 0;
+
return 0;
}
diff --git a/src/node_http2.h b/src/node_http2.h
index 4ed06c9597..9027ed7feb 100644
--- a/src/node_http2.h
+++ b/src/node_http2.h
@@ -550,12 +550,6 @@ class Http2Stream : public AsyncWrap,
inline void EmitStatistics();
- inline bool HasDataChunks(bool ignore_eos = false);
-
- inline void AddChunk(const uint8_t* data, size_t len);
-
- inline void FlushDataChunks();
-
// Process a Data Chunk
void OnDataChunk(uv_buf_t* chunk);
@@ -740,8 +734,11 @@ class Http2Stream : public AsyncWrap,
uint32_t current_headers_length_ = 0; // total number of octets
std::vector<nghttp2_header> current_headers_;
- // Inbound Data... This is the data received via DATA frames for this stream.
- std::queue<uv_buf_t> data_chunks_;
+ // This keeps track of the amount of data read from the socket while the
+ // socket was in paused mode. When `ReadStart()` is called (and not before
+ // then), we tell nghttp2 that we consumed that data to get proper
+ // backpressure handling.
+ size_t inbound_consumed_data_while_paused_ = 0;
// Outbound Data... This is the data written by the JS layer that is
// waiting to be written out to the socket.
@@ -1085,8 +1082,9 @@ class Http2Session : public AsyncWrap {
// use this to allow timeout tracking during long-lasting writes
uint32_t chunks_sent_since_last_write_ = 0;
- uv_prepare_t* prep_ = nullptr;
- char stream_buf_[kAllocBufferSize];
+ char* stream_buf_ = nullptr;
+ size_t stream_buf_size_ = 0;
+ v8::Local<v8::ArrayBuffer> stream_buf_ab_;
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<Http2Ping*> outstanding_pings_;