summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/node_http2.cc128
-rw-r--r--src/node_http2.h12
-rw-r--r--test/parallel/test-http2-large-write-multiple-requests.js39
3 files changed, 136 insertions, 43 deletions
diff --git a/src/node_http2.cc b/src/node_http2.cc
index 0fec129373..7e32f7f14b 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -868,31 +868,51 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
// various callback functions. Each of these will typically result in a call
// out to JavaScript so this particular function is rather hot and can be
// quite expensive. This is a potential performance optimization target later.
-ssize_t Http2Session::Write(const uv_buf_t* bufs, size_t nbufs) {
- size_t total = 0;
- // Note that nghttp2_session_mem_recv is a synchronous operation that
- // will trigger a number of other callbacks. Those will, in turn have
+ssize_t Http2Session::ConsumeHTTP2Data() {
+ CHECK_NOT_NULL(stream_buf_.base);
+ CHECK_LT(stream_buf_offset_, stream_buf_.len);
+ size_t read_len = stream_buf_.len - stream_buf_offset_;
+
// multiple side effects.
- for (size_t n = 0; n < nbufs; n++) {
- Debug(this, "receiving %d bytes [wants data? %d]",
- bufs[n].len,
- nghttp2_session_want_read(session_));
- ssize_t ret =
- nghttp2_session_mem_recv(session_,
- reinterpret_cast<uint8_t*>(bufs[n].base),
- bufs[n].len);
- CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
-
- if (ret < 0)
- return ret;
+ Debug(this, "receiving %d bytes [wants data? %d]",
+ read_len,
+ nghttp2_session_want_read(session_));
+ flags_ &= ~SESSION_STATE_NGHTTP2_RECV_PAUSED;
+ ssize_t ret =
+ nghttp2_session_mem_recv(session_,
+ reinterpret_cast<uint8_t*>(stream_buf_.base) +
+ stream_buf_offset_,
+ read_len);
+ CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
- total += ret;
+ if (flags_ & SESSION_STATE_NGHTTP2_RECV_PAUSED) {
+ CHECK_NE(flags_ & SESSION_STATE_READING_STOPPED, 0);
+
+ CHECK_GT(ret, 0);
+ CHECK_LE(static_cast<size_t>(ret), read_len);
+
+ if (static_cast<size_t>(ret) < read_len) {
+ // Mark the remainder of the data as available for later consumption.
+ stream_buf_offset_ += ret;
+ return ret;
+ }
}
+
+ // We are done processing the current input chunk.
+ DecrementCurrentSessionMemory(stream_buf_.len);
+ stream_buf_offset_ = 0;
+ stream_buf_ab_.Reset();
+ stream_buf_allocation_.clear();
+ stream_buf_ = uv_buf_init(nullptr, 0);
+
+ if (ret < 0)
+ return ret;
+
// Send any data that was queued up while processing the received data.
if (!IsDestroyed()) {
SendPendingData();
}
- return total;
+ return ret;
}
@@ -1194,8 +1214,18 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
nghttp2_session_consume_stream(handle, id, avail);
else
stream->inbound_consumed_data_while_paused_ += avail;
+
+ // If we have a gathered a lot of data for output, try sending it now.
+ if (session->outgoing_length_ > 4096) session->SendPendingData();
} while (len != 0);
+ // If we are currently waiting for a write operation to finish, we should
+ // tell nghttp2 that we want to wait before we process more input data.
+ if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
+ session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
+ return NGHTTP2_ERR_PAUSE;
+ }
+
return 0;
}
@@ -1283,6 +1313,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
size_t offset = buf.base - session->stream_buf_.base;
// Verify that the data offset is inside the current read buffer.
+ CHECK_GE(offset, session->stream_buf_offset_);
CHECK_LE(offset, session->stream_buf_.len);
CHECK_LE(offset + buf.len, session->stream_buf_.len);
@@ -1554,6 +1585,11 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
stream_->ReadStart();
}
+ // If there is more incoming data queued up, consume it.
+ if (stream_buf_offset_ > 0) {
+ ConsumeHTTP2Data();
+ }
+
if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// Schedule a new write if nghttp2 wants to send data.
MaybeScheduleWrite();
@@ -1609,6 +1645,7 @@ void Http2Session::ClearOutgoing(int status) {
if (outgoing_buffers_.size() > 0) {
outgoing_storage_.clear();
+ outgoing_length_ = 0;
std::vector<nghttp2_stream_write> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
@@ -1639,6 +1676,11 @@ void Http2Session::ClearOutgoing(int status) {
}
}
+void Http2Session::PushOutgoingBuffer(nghttp2_stream_write&& write) {
+ outgoing_length_ += write.buf.len;
+ outgoing_buffers_.emplace_back(std::move(write));
+}
+
// Queue a given block of data for sending. This always creates a copy,
// so it is used for the cases in which nghttp2 requests sending of a
// small chunk of data.
@@ -1651,7 +1693,7 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) {
// of the outgoing_buffers_ vector may invalidate the pointer.
// The correct base pointers will be set later, before writing to the
// underlying socket.
- outgoing_buffers_.emplace_back(nghttp2_stream_write {
+ PushOutgoingBuffer(nghttp2_stream_write {
uv_buf_init(nullptr, src_length)
});
}
@@ -1774,13 +1816,13 @@ int Http2Session::OnSendData(
if (write.buf.len <= length) {
// This write does not suffice by itself, so we can consume it completely.
length -= write.buf.len;
- session->outgoing_buffers_.emplace_back(std::move(write));
+ session->PushOutgoingBuffer(std::move(write));
stream->queue_.pop();
continue;
}
// Slice off `length` bytes of the first write in the queue.
- session->outgoing_buffers_.emplace_back(nghttp2_stream_write {
+ session->PushOutgoingBuffer(nghttp2_stream_write {
uv_buf_init(write.buf.base, length)
});
write.buf.base += length;
@@ -1790,7 +1832,7 @@ int Http2Session::OnSendData(
if (frame->data.padlen > 0) {
// Send padding if that was requested.
- session->outgoing_buffers_.emplace_back(nghttp2_stream_write {
+ session->PushOutgoingBuffer(nghttp2_stream_write {
uv_buf_init(const_cast<char*>(zero_bytes_256), frame->data.padlen - 1)
});
}
@@ -1827,8 +1869,6 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
Http2Scope h2scope(this);
CHECK_NOT_NULL(stream_);
Debug(this, "receiving %d bytes", nread);
- CHECK_EQ(stream_buf_allocation_.size(), 0);
- CHECK(stream_buf_ab_.IsEmpty());
AllocatedBuffer buf(env(), buf_);
// Only pass data on if nread > 0
@@ -1839,24 +1879,31 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
return;
}
- // Shrink to the actual amount of used data.
- buf.Resize(nread);
+ statistics_.data_received += nread;
- IncrementCurrentSessionMemory(nread);
- OnScopeLeave on_scope_leave([&]() {
- // Once finished handling this write, reset the stream buffer.
- // The memory has either been free()d or was handed over to V8.
- // We use `nread` instead of `buf.size()` here, because the buffer is
- // cleared as part of the `.ToArrayBuffer()` call below.
- DecrementCurrentSessionMemory(nread);
+ if (UNLIKELY(stream_buf_offset_ > 0)) {
+ // This is a very unlikely case, and should only happen if the ReadStart()
+ // call in OnStreamAfterWrite() immediately provides data. If that does
+ // happen, we concatenate the data we received with the already-stored
+ // pending input data, slicing off the already processed part.
+ AllocatedBuffer new_buf = env()->AllocateManaged(
+ stream_buf_.len - stream_buf_offset_ + nread);
+ memcpy(new_buf.data(),
+ stream_buf_.base + stream_buf_offset_,
+ stream_buf_.len - stream_buf_offset_);
+ memcpy(new_buf.data() + stream_buf_.len - stream_buf_offset_,
+ buf.data(),
+ nread);
+ buf = std::move(new_buf);
+ nread = buf.size();
+ stream_buf_offset_ = 0;
stream_buf_ab_.Reset();
- stream_buf_allocation_.clear();
- stream_buf_ = uv_buf_init(nullptr, 0);
- });
+ DecrementCurrentSessionMemory(stream_buf_offset_);
+ }
- // Make sure that there was no read previously active.
- CHECK_NULL(stream_buf_.base);
- CHECK_EQ(stream_buf_.len, 0);
+ // Shrink to the actual amount of used data.
+ buf.Resize(nread);
+ IncrementCurrentSessionMemory(nread);
// Remember the current buffer, so that OnDataChunkReceived knows the
// offset of a DATA frame's data into the socket read buffer.
@@ -1869,8 +1916,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
// to copy memory.
stream_buf_allocation_ = std::move(buf);
- statistics_.data_received += nread;
- ssize_t ret = Write(&stream_buf_, 1);
+ ssize_t ret = ConsumeHTTP2Data();
if (UNLIKELY(ret < 0)) {
Debug(this, "fatal error receiving data: %d", ret);
diff --git a/src/node_http2.h b/src/node_http2.h
index 21e05157c1..9c032f02d1 100644
--- a/src/node_http2.h
+++ b/src/node_http2.h
@@ -337,6 +337,7 @@ enum session_state_flags {
SESSION_STATE_SENDING = 0x10,
SESSION_STATE_WRITE_IN_PROGRESS = 0x20,
SESSION_STATE_READING_STOPPED = 0x40,
+ SESSION_STATE_NGHTTP2_RECV_PAUSED = 0x80
};
typedef uint32_t(*get_setting)(nghttp2_session* session,
@@ -767,14 +768,15 @@ class Http2Session : public AsyncWrap, public StreamListener {
// Indicates whether there currently exist outgoing buffers for this stream.
bool HasWritesOnSocketForStream(Http2Stream* stream);
- // Write data to the session
- ssize_t Write(const uv_buf_t* bufs, size_t nbufs);
+ // Write data from stream_buf_ to the session
+ ssize_t ConsumeHTTP2Data();
void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("streams", streams_);
tracker->TrackField("outstanding_pings", outstanding_pings_);
tracker->TrackField("outstanding_settings", outstanding_settings_);
tracker->TrackField("outgoing_buffers", outgoing_buffers_);
+ tracker->TrackFieldWithSize("stream_buf", stream_buf_.len);
tracker->TrackFieldWithSize("outgoing_storage", outgoing_storage_.size());
tracker->TrackFieldWithSize("pending_rst_streams",
pending_rst_streams_.size() * sizeof(int32_t));
@@ -833,6 +835,7 @@ class Http2Session : public AsyncWrap, public StreamListener {
}
void DecrementCurrentSessionMemory(uint64_t amount) {
+ DCHECK_LE(amount, current_session_memory_);
current_session_memory_ -= amount;
}
@@ -995,8 +998,11 @@ class Http2Session : public AsyncWrap, public StreamListener {
uint32_t chunks_sent_since_last_write_ = 0;
uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0);
+ // When processing input data, either stream_buf_ab_ or stream_buf_allocation_
+ // will be set. stream_buf_ab_ is lazily created from stream_buf_allocation_.
v8::Global<v8::ArrayBuffer> stream_buf_ab_;
AllocatedBuffer stream_buf_allocation_;
+ size_t stream_buf_offset_ = 0;
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<std::unique_ptr<Http2Ping>> outstanding_pings_;
@@ -1006,6 +1012,7 @@ class Http2Session : public AsyncWrap, public StreamListener {
std::vector<nghttp2_stream_write> outgoing_buffers_;
std::vector<uint8_t> outgoing_storage_;
+ size_t outgoing_length_ = 0;
std::vector<int32_t> pending_rst_streams_;
// Count streams that have been rejected while being opened. Exceeding a fixed
// limit will result in the session being destroyed, as an indication of a
@@ -1015,6 +1022,7 @@ class Http2Session : public AsyncWrap, public StreamListener {
// Also use the invalid frame count as a measure for rejecting input frames.
int32_t invalid_frame_count_ = 0;
+ void PushOutgoingBuffer(nghttp2_stream_write&& write);
void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length);
void ClearOutgoing(int status);
diff --git a/test/parallel/test-http2-large-write-multiple-requests.js b/test/parallel/test-http2-large-write-multiple-requests.js
new file mode 100644
index 0000000000..0d65c3479b
--- /dev/null
+++ b/test/parallel/test-http2-large-write-multiple-requests.js
@@ -0,0 +1,39 @@
+'use strict';
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+
+const fixtures = require('../common/fixtures');
+const assert = require('assert');
+const http2 = require('http2');
+
+const content = fixtures.readSync('person-large.jpg');
+
+const server = http2.createServer({
+ maxSessionMemory: 1000
+});
+server.on('stream', (stream, headers) => {
+ stream.respond({
+ 'content-type': 'image/jpeg',
+ ':status': 200
+ });
+ stream.end(content);
+});
+server.unref();
+
+server.listen(0, common.mustCall(() => {
+ const client = http2.connect(`http://localhost:${server.address().port}/`);
+
+ let finished = 0;
+ for (let i = 0; i < 100; i++) {
+ const req = client.request({ ':path': '/' }).end();
+ const chunks = [];
+ req.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+ req.on('end', common.mustCall(() => {
+ assert.deepStrictEqual(Buffer.concat(chunks), content);
+ if (++finished === 100) client.close();
+ }));
+ }
+}));