summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-01-08 01:14:06 +0100
committerRuben Bridgewater <ruben@bridgewater.de>2018-02-01 10:53:26 +0100
commit7c4b09b24bbe7d6a8cbad256f47b30a101a909ea (patch)
tree1aef41b1fd1cc0aad300b178e0a19e6da29615c8 /src
parent1b6cb947611de5865641d1a6780ee6930a4e1d69 (diff)
downloadandroid-node-v8-7c4b09b24bbe7d6a8cbad256f47b30a101a909ea.tar.gz
android-node-v8-7c4b09b24bbe7d6a8cbad256f47b30a101a909ea.tar.bz2
android-node-v8-7c4b09b24bbe7d6a8cbad256f47b30a101a909ea.zip
src: refactor stream callbacks and ownership
Instead of setting individual callbacks on streams and tracking stream ownership through a boolean `consume_` flag, always have one specific listener object in charge of a stream, and call methods on that object rather than generic C-style callbacks. PR-URL: https://github.com/nodejs/node/pull/18334 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/connection_wrap.cc1
-rw-r--r--src/js_stream.cc55
-rw-r--r--src/node_http2.cc208
-rw-r--r--src/node_http2.h33
-rw-r--r--src/node_http_parser.cc75
-rw-r--r--src/pipe_wrap.cc1
-rw-r--r--src/process_wrap.cc1
-rw-r--r--src/stream_base-inl.h85
-rw-r--r--src/stream_base.cc49
-rw-r--r--src/stream_base.h201
-rw-r--r--src/stream_wrap.cc62
-rw-r--r--src/stream_wrap.h22
-rw-r--r--src/tcp_wrap.cc1
-rw-r--r--src/tls_wrap.cc115
-rw-r--r--src/tls_wrap.h24
-rw-r--r--src/tty_wrap.cc1
16 files changed, 461 insertions, 473 deletions
diff --git a/src/connection_wrap.cc b/src/connection_wrap.cc
index 8de77f361d..a6cf67ceee 100644
--- a/src/connection_wrap.cc
+++ b/src/connection_wrap.cc
@@ -3,6 +3,7 @@
#include "connect_wrap.h"
#include "env-inl.h"
#include "pipe_wrap.h"
+#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "tcp_wrap.h"
#include "util-inl.h"
diff --git a/src/js_stream.cc b/src/js_stream.cc
index 7d1115f12a..9e67a2094d 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -25,9 +25,6 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
StreamBase(env) {
node::Wrap(obj, this);
MakeWeak<JSStream>(this);
-
- set_alloc_cb({ OnAllocImpl, this });
- set_read_cb({ OnReadImpl, this });
}
@@ -35,45 +32,6 @@ JSStream::~JSStream() {
}
-void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
- buf->base = Malloc(size);
- buf->len = size;
-}
-
-
-void JSStream::OnReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx) {
- JSStream* wrap = static_cast<JSStream*>(ctx);
- CHECK_NE(wrap, nullptr);
- Environment* env = wrap->env();
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- if (nread < 0) {
- if (buf != nullptr && buf->base != nullptr)
- free(buf->base);
- wrap->EmitData(nread, Local<Object>(), Local<Object>());
- return;
- }
-
- if (nread == 0) {
- if (buf->base != nullptr)
- free(buf->base);
- return;
- }
-
- CHECK_LE(static_cast<size_t>(nread), buf->len);
- char* base = node::Realloc(buf->base, nread);
-
- CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
-
- Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
- wrap->EmitData(nread, obj, Local<Object>());
-}
-
-
AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
@@ -212,18 +170,19 @@ void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
char* data = Buffer::Data(args[0]);
int len = Buffer::Length(args[0]);
- do {
- uv_buf_t buf;
+ // Repeatedly ask the stream's owner for memory, copy the data that we
+ // just read from JS into those buffers and emit them as reads.
+ while (len != 0) {
+ uv_buf_t buf = wrap->EmitAlloc(len);
ssize_t avail = len;
- wrap->EmitAlloc(len, &buf);
if (static_cast<ssize_t>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, data, avail);
data += avail;
len -= avail;
- wrap->EmitRead(avail, &buf);
- } while (len != 0);
+ wrap->EmitRead(avail, buf);
+ }
}
@@ -231,7 +190,7 @@ void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
- wrap->EmitRead(UV_EOF, nullptr);
+ wrap->EmitRead(UV_EOF);
}
diff --git a/src/node_http2.cc b/src/node_http2.cc
index bd7eeee865..bd2e93a13c 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -531,24 +531,12 @@ Http2Session::Http2Session(Environment* env,
outgoing_buffers_.reserve(32);
}
-void Http2Session::Unconsume() {
- if (stream_ != nullptr) {
- DEBUG_HTTP2SESSION(this, "unconsuming the i/o stream");
- stream_->set_destruct_cb({ nullptr, nullptr });
- stream_->set_alloc_cb({ nullptr, nullptr });
- stream_->set_read_cb({ nullptr, nullptr });
- stream_->Unconsume();
- stream_ = nullptr;
- }
-}
-
Http2Session::~Http2Session() {
CHECK_EQ(flags_ & SESSION_STATE_HAS_SCOPE, 0);
if (!object().IsEmpty())
ClearWrap(object());
persistent().Reset();
CHECK(persistent().IsEmpty());
- Unconsume();
DEBUG_HTTP2SESSION(this, "freeing nghttp2 session");
nghttp2_session_del(session_);
}
@@ -646,7 +634,8 @@ void Http2Session::Close(uint32_t code, bool socket_closed) {
DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code);
CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0);
} else {
- Unconsume();
+ if (stream_ != nullptr)
+ stream_->RemoveStreamListener(this);
}
// If there are outstanding pings, those will need to be canceled, do
@@ -1044,22 +1033,38 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
stream->statistics_.received_bytes += len;
- // There is a single large array buffer for the entire data read from the
- // network; create a slice of that array buffer and emit it as the
- // received data buffer.
- CHECK(!session->stream_buf_ab_.IsEmpty());
- size_t offset = reinterpret_cast<const char*>(data) - session->stream_buf_;
- // Verify that the data offset is inside the current read buffer.
- CHECK_LE(offset, session->stream_buf_size_);
-
- Local<Object> buf =
- Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked();
-
- stream->EmitData(len, buf, Local<Object>());
- if (!stream->IsReading())
- stream->inbound_consumed_data_while_paused_ += len;
- else
- nghttp2_session_consume_stream(handle, id, len);
+ // Repeatedly ask the stream's owner for memory, and copy the read data
+ // into those buffers.
+ // The typical case is actually the exception here; Http2StreamListeners
+ // know about the HTTP2 session associated with this stream, so they know
+ // about the larger from-socket read buffer, so they do not require copying.
+ do {
+ uv_buf_t buf = stream->EmitAlloc(len);
+ ssize_t avail = len;
+ if (static_cast<ssize_t>(buf.len) < avail)
+ avail = buf.len;
+
+ // `buf.base == nullptr` is the default Http2StreamListener's way
+ // of saying that it wants a pointer to the raw original.
+ // Since it has access to the original socket buffer from which the data
+ // was read in the first place, it can use that to minizime ArrayBuffer
+ // allocations.
+ if (LIKELY(buf.base == nullptr))
+ buf.base = reinterpret_cast<char*>(const_cast<uint8_t*>(data));
+ else
+ memcpy(buf.base, data, avail);
+ data += avail;
+ len -= avail;
+ stream->EmitRead(avail, buf);
+
+ // If the stream owner (e.g. the JS Http2Stream) wants more data, just
+ // tell nghttp2 that all data has been consumed. Otherwise, defer until
+ // more data is being requested.
+ if (stream->IsReading())
+ nghttp2_session_consume_stream(handle, id, avail);
+ else
+ stream->inbound_consumed_data_while_paused_ += avail;
+ } while (len != 0);
}
return 0;
}
@@ -1129,6 +1134,38 @@ inline void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) {
}
}
+uv_buf_t Http2StreamListener::OnStreamAlloc(size_t size) {
+ // See the comments in Http2Session::OnDataChunkReceived
+ // (which is the only possible call site for this method).
+ return uv_buf_init(nullptr, size);
+}
+
+void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
+ Http2Stream* stream = static_cast<Http2Stream*>(stream_);
+ Http2Session* session = stream->session();
+ Environment* env = stream->env();
+
+ if (nread < 0) {
+ PassReadErrorToPreviousListener(nread);
+ return;
+ }
+
+ CHECK(!session->stream_buf_ab_.IsEmpty());
+
+ // There is a single large array buffer for the entire data read from the
+ // network; create a slice of that array buffer and emit it as the
+ // received data buffer.
+ size_t offset = buf.base - session->stream_buf_.base;
+
+ // Verify that the data offset is inside the current read buffer.
+ CHECK_LE(offset, session->stream_buf_.len);
+ CHECK_LE(offset + buf.len, session->stream_buf_.len);
+
+ Local<Object> buffer =
+ Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked();
+
+ stream->CallJSOnreadMethod(nread, buffer);
+}
Http2Stream::SubmitTrailers::SubmitTrailers(
Http2Session* session,
@@ -1257,7 +1294,7 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
return;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
- stream->EmitData(UV_EOF, Local<Object>(), Local<Object>());
+ stream->EmitRead(UV_EOF);
}
}
@@ -1378,16 +1415,15 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
}
// Callback used when data has been written to the stream.
-void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
- Http2Session* session = static_cast<Http2Session*>(ctx);
- DEBUG_HTTP2SESSION2(session, "write finished with status %d", status);
+void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
+ DEBUG_HTTP2SESSION2(this, "write finished with status %d", status);
// Inform all pending writes about their completion.
- session->ClearOutgoing(status);
+ ClearOutgoing(status);
- if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
+ if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// Schedule a new write if nghttp2 wants to send data.
- session->MaybeScheduleWrite();
+ MaybeScheduleWrite();
}
}
@@ -1625,97 +1661,76 @@ WriteWrap* Http2Session::AllocateSend() {
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
- return WriteWrap::New(env(), obj, stream_);
-}
-
-// Allocates the data buffer used to receive inbound data from the i/o stream
-void Http2Session::OnStreamAllocImpl(size_t suggested_size,
- uv_buf_t* buf,
- void* ctx) {
- Http2Session* session = static_cast<Http2Session*>(ctx);
- CHECK_EQ(session->stream_buf_, nullptr);
- CHECK_EQ(session->stream_buf_size_, 0);
- buf->base = session->stream_buf_ = Malloc(suggested_size);
- buf->len = session->stream_buf_size_ = suggested_size;
- session->IncrementCurrentSessionMemory(suggested_size);
+ return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
}
// Callback used to receive inbound data from the i/o stream
-void Http2Session::OnStreamReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx) {
- Http2Session* session = static_cast<Http2Session*>(ctx);
- Http2Scope h2scope(session);
- CHECK_NE(session->stream_, nullptr);
- DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
+void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
+ Http2Scope h2scope(this);
+ CHECK_NE(stream_, nullptr);
+ DEBUG_HTTP2SESSION2(this, "receiving %d bytes", nread);
+ IncrementCurrentSessionMemory(buf.len);
+ CHECK(stream_buf_ab_.IsEmpty());
+
if (nread <= 0) {
- free(session->stream_buf_);
+ free(buf.base);
if (nread < 0) {
- uv_buf_t tmp_buf = uv_buf_init(nullptr, 0);
- session->prev_read_cb_.fn(nread,
- &tmp_buf,
- pending,
- session->prev_read_cb_.ctx);
+ PassReadErrorToPreviousListener(nread);
}
} else {
// Only pass data on if nread > 0
+ // Makre sure that there was no read previously active.
+ CHECK_EQ(stream_buf_.base, nullptr);
+ CHECK_EQ(stream_buf_.len, 0);
+
+ // Remember the current buffer, so that OnDataChunkReceived knows the
+ // offset of a DATA frame's data into the socket read buffer.
+ stream_buf_ = uv_buf_init(buf.base, nread);
+
// Verify that currently: There is memory allocated into which
// the data has been read, and that memory buffer is at least as large
// as the amount of data we have read, but we have not yet made an
// ArrayBuffer out of it.
- CHECK_NE(session->stream_buf_, nullptr);
- CHECK_EQ(session->stream_buf_, buf->base);
- CHECK_EQ(session->stream_buf_size_, buf->len);
- CHECK_GE(session->stream_buf_size_, static_cast<size_t>(nread));
- CHECK(session->stream_buf_ab_.IsEmpty());
+ CHECK_LE(static_cast<size_t>(nread), stream_buf_.len);
- Environment* env = session->env();
- Isolate* isolate = env->isolate();
+ Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
- Local<Context> context = env->context();
- Context::Scope context_scope(context);
+ Context::Scope context_scope(env()->context());
// Create an array buffer for the read data. DATA frames will be emitted
// as slices of this array buffer to avoid having to copy memory.
- session->stream_buf_ab_ =
+ stream_buf_ab_ =
ArrayBuffer::New(isolate,
- session->stream_buf_,
- session->stream_buf_size_,
+ buf.base,
+ nread,
v8::ArrayBufferCreationMode::kInternalized);
- uv_buf_t buf_ = uv_buf_init(buf->base, nread);
- session->statistics_.data_received += nread;
- ssize_t ret = session->Write(&buf_, 1);
+ statistics_.data_received += nread;
+ ssize_t ret = Write(&stream_buf_, 1);
// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
// ssize_t to int. Cast here so that the < 0 check actually works on
// Windows.
if (static_cast<int>(ret) < 0) {
- DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
+ DEBUG_HTTP2SESSION2(this, "fatal error receiving data: %d", ret);
- Local<Value> argv[1] = {
+ Local<Value> argv[] = {
Integer::New(isolate, ret),
};
- session->MakeCallback(env->error_string(), arraysize(argv), argv);
+ MakeCallback(env()->error_string(), arraysize(argv), argv);
} else {
- DEBUG_HTTP2SESSION2(session, "processed %d bytes. wants more? %d", ret,
- nghttp2_session_want_read(**session));
+ DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret,
+ nghttp2_session_want_read(session_));
}
}
// Since we are finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
- session->DecrementCurrentSessionMemory(session->stream_buf_size_);
- session->stream_buf_ = nullptr;
- session->stream_buf_size_ = 0;
- session->stream_buf_ab_ = Local<ArrayBuffer>();
-}
+ DecrementCurrentSessionMemory(buf.len);
-void Http2Session::OnStreamDestructImpl(void* ctx) {
- Http2Session* session = static_cast<Http2Session*>(ctx);
- session->stream_ = nullptr;
+ stream_buf_ab_ = Local<ArrayBuffer>();
+ stream_buf_ = uv_buf_init(nullptr, 0);
}
// Every Http2Session session is tightly bound to a single i/o StreamBase
@@ -1724,14 +1739,7 @@ void Http2Session::OnStreamDestructImpl(void* ctx) {
// C++ layer via the StreamBase API.
void Http2Session::Consume(Local<External> external) {
StreamBase* stream = static_cast<StreamBase*>(external->Value());
- stream->Consume();
- stream_ = stream;
- prev_alloc_cb_ = stream->alloc_cb();
- prev_read_cb_ = stream->read_cb();
- stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this });
- stream->set_read_cb({ Http2Session::OnStreamReadImpl, this });
- stream->set_after_write_cb({ Http2Session::OnStreamAfterWriteImpl, this });
- stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this });
+ stream->PushStreamListener(this);
DEBUG_HTTP2SESSION(this, "i/o stream consumed");
}
@@ -1769,6 +1777,8 @@ Http2Stream::Http2Stream(
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
+ PushStreamListener(&stream_listener_);
+
if (options & STREAM_OPTION_EMPTY_PAYLOAD)
Shutdown();
session->AddStream(this);
diff --git a/src/node_http2.h b/src/node_http2.h
index 9027ed7feb..bf41d74ed4 100644
--- a/src/node_http2.h
+++ b/src/node_http2.h
@@ -535,6 +535,12 @@ class Http2Priority {
nghttp2_priority_spec spec;
};
+class Http2StreamListener : public StreamListener {
+ public:
+ uv_buf_t OnStreamAlloc(size_t suggested_size) override;
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
+};
+
class Http2Stream : public AsyncWrap,
public StreamBase {
public:
@@ -747,6 +753,8 @@ class Http2Stream : public AsyncWrap,
int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;
+ Http2StreamListener stream_listener_;
+
friend class Http2Session;
};
@@ -798,7 +806,7 @@ class Http2Stream::Provider::Stream : public Http2Stream::Provider {
};
-class Http2Session : public AsyncWrap {
+class Http2Session : public AsyncWrap, public StreamListener {
public:
Http2Session(Environment* env,
Local<Object> wrap,
@@ -872,21 +880,11 @@ class Http2Session : public AsyncWrap {
size_t self_size() const override { return sizeof(*this); }
- char* stream_alloc() {
- return stream_buf_;
- }
-
inline void GetTrailers(Http2Stream* stream, uint32_t* flags);
- static void OnStreamAllocImpl(size_t suggested_size,
- uv_buf_t* buf,
- void* ctx);
- static void OnStreamReadImpl(ssize_t nread,
- const uv_buf_t* bufs,
- uv_handle_type pending,
- void* ctx);
- static void OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx);
- static void OnStreamDestructImpl(void* ctx);
+ // Handle reads/writes from the underlying network transport.
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
+ void OnStreamAfterWrite(WriteWrap* w, int status) override;
// The JavaScript API
static void New(const FunctionCallbackInfo<Value>& args);
@@ -1074,16 +1072,12 @@ class Http2Session : public AsyncWrap {
int flags_ = SESSION_STATE_NONE;
// The StreamBase instance being used for i/o
- StreamBase* stream_;
- StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
- StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;
// use this to allow timeout tracking during long-lasting writes
uint32_t chunks_sent_since_last_write_ = 0;
- char* stream_buf_ = nullptr;
- size_t stream_buf_size_ = 0;
+ uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0);
v8::Local<v8::ArrayBuffer> stream_buf_ab_;
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
@@ -1099,6 +1093,7 @@ class Http2Session : public AsyncWrap {
void ClearOutgoing(int status);
friend class Http2Scope;
+ friend class Http2StreamListener;
};
class Http2SessionPerformanceEntry : public PerformanceEntry {
diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc
index 9debb8a205..d4044f8bbe 100644
--- a/src/node_http_parser.cc
+++ b/src/node_http_parser.cc
@@ -144,7 +144,7 @@ struct StringPtr {
};
-class Parser : public AsyncWrap {
+class Parser : public AsyncWrap, public StreamListener {
public:
Parser(Environment* env, Local<Object> wrap, enum http_parser_type type)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER),
@@ -494,14 +494,7 @@ class Parser : public AsyncWrap {
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
-
- stream->Consume();
-
- parser->prev_alloc_cb_ = stream->alloc_cb();
- parser->prev_read_cb_ = stream->read_cb();
-
- stream->set_alloc_cb({ OnAllocImpl, parser });
- stream->set_read_cb({ OnReadImpl, parser });
+ stream->PushStreamListener(parser);
}
@@ -510,22 +503,10 @@ class Parser : public AsyncWrap {
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// Already unconsumed
- if (parser->prev_alloc_cb_.is_empty())
+ if (parser->stream_ == nullptr)
return;
- // Restore stream's callbacks
- if (args.Length() == 1 && args[0]->IsExternal()) {
- Local<External> stream_obj = args[0].As<External>();
- StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
- CHECK_NE(stream, nullptr);
-
- stream->set_alloc_cb(parser->prev_alloc_cb_);
- stream->set_read_cb(parser->prev_read_cb_);
- stream->Unconsume();
- }
-
- parser->prev_alloc_cb_.clear();
- parser->prev_read_cb_.clear();
+ parser->stream_->RemoveStreamListener(parser);
}
@@ -544,33 +525,19 @@ class Parser : public AsyncWrap {
protected:
static const size_t kAllocBufferSize = 64 * 1024;
- static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
- Parser* parser = static_cast<Parser*>(ctx);
- Environment* env = parser->env();
+ uv_buf_t OnStreamAlloc(size_t suggested_size) override {
+ if (env()->http_parser_buffer() == nullptr)
+ env()->set_http_parser_buffer(new char[kAllocBufferSize]);
- if (env->http_parser_buffer() == nullptr)
- env->set_http_parser_buffer(new char[kAllocBufferSize]);
-
- buf->base = env->http_parser_buffer();
- buf->len = kAllocBufferSize;
+ return uv_buf_init(env()->http_parser_buffer(), kAllocBufferSize);
}
- static void OnReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx) {
- Parser* parser = static_cast<Parser*>(ctx);
- HandleScope scope(parser->env()->isolate());
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
+ HandleScope scope(env()->isolate());
if (nread < 0) {
- uv_buf_t tmp_buf;
- tmp_buf.base = nullptr;
- tmp_buf.len = 0;
- parser->prev_read_cb_.fn(nread,
- &tmp_buf,
- pending,
- parser->prev_read_cb_.ctx);
+ PassReadErrorToPreviousListener(nread);
return;
}
@@ -578,27 +545,27 @@ class Parser : public AsyncWrap {
if (nread == 0)
return;
- parser->current_buffer_.Clear();
- Local<Value> ret = parser->Execute(buf->base, nread);
+ current_buffer_.Clear();
+ Local<Value> ret = Execute(buf.base, nread);
// Exception
if (ret.IsEmpty())
return;
- Local<Object> obj = parser->object();
- Local<Value> cb = obj->Get(kOnExecute);
+ Local<Value> cb =
+ object()->Get(env()->context(), kOnExecute).ToLocalChecked();
if (!cb->IsFunction())
return;
// Hooks for GetCurrentBuffer
- parser->current_buffer_len_ = nread;
- parser->current_buffer_data_ = buf->base;
+ current_buffer_len_ = nread;
+ current_buffer_data_ = buf.base;
- parser->MakeCallback(cb.As<Function>(), 1, &ret);
+ MakeCallback(cb.As<Function>(), 1, &ret);
- parser->current_buffer_len_ = 0;
- parser->current_buffer_data_ = nullptr;
+ current_buffer_len_ = 0;
+ current_buffer_data_ = nullptr;
}
@@ -713,8 +680,6 @@ class Parser : public AsyncWrap {
Local<Object> current_buffer_;
size_t current_buffer_len_;
char* current_buffer_data_;
- StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
- StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
// These are helper functions for filling `http_parser_settings`, which turn
// a member function of Parser into a C-style HTTP parser callback.
diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc
index c5958a2271..016ce480b6 100644
--- a/src/pipe_wrap.cc
+++ b/src/pipe_wrap.cc
@@ -29,6 +29,7 @@
#include "node_buffer.h"
#include "node_wrap.h"
#include "connect_wrap.h"
+#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "util-inl.h"
diff --git a/src/process_wrap.cc b/src/process_wrap.cc
index b01ef56270..314131e1dd 100644
--- a/src/process_wrap.cc
+++ b/src/process_wrap.cc
@@ -22,6 +22,7 @@
#include "env-inl.h"
#include "handle_wrap.h"
#include "node_wrap.h"
+#include "stream_base-inl.h"
#include "util-inl.h"
#include <string.h>
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h
index cdcff67cc5..287978a870 100644
--- a/src/stream_base-inl.h
+++ b/src/stream_base-inl.h
@@ -25,6 +25,87 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;
+
+inline StreamListener::~StreamListener() {
+ if (stream_ != nullptr)
+ stream_->RemoveStreamListener(this);
+}
+
+inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamRead(nread,
+ uv_buf_init(nullptr, 0),
+ UV_UNKNOWN_HANDLE);
+}
+
+
+inline StreamResource::~StreamResource() {
+ while (listener_ != nullptr) {
+ listener_->OnStreamDestroy();
+ RemoveStreamListener(listener_);
+ }
+}
+
+inline void StreamResource::PushStreamListener(StreamListener* listener) {
+ CHECK_NE(listener, nullptr);
+ CHECK_EQ(listener->stream_, nullptr);
+
+ listener->previous_listener_ = listener_;
+ listener->stream_ = this;
+
+ listener_ = listener;
+}
+
+inline void StreamResource::RemoveStreamListener(StreamListener* listener) {
+ CHECK_NE(listener, nullptr);
+
+ StreamListener* previous;
+ StreamListener* current;
+
+ // Remove from the linked list.
+ for (current = listener_, previous = nullptr;
+ /* No loop condition because we want a crash if listener is not found */
+ ; previous = current, current = current->previous_listener_) {
+ CHECK_NE(current, nullptr);
+ if (current == listener) {
+ if (previous != nullptr)
+ previous->previous_listener_ = current->previous_listener_;
+ else
+ listener_ = listener->previous_listener_;
+ break;
+ }
+ }
+
+ listener->stream_ = nullptr;
+ listener->previous_listener_ = nullptr;
+}
+
+
+inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
+ return listener_->OnStreamAlloc(suggested_size);
+}
+
+inline void StreamResource::EmitRead(ssize_t nread,
+ const uv_buf_t& buf,
+ uv_handle_type pending) {
+ if (nread > 0)
+ bytes_read_ += static_cast<uint64_t>(nread);
+ listener_->OnStreamRead(nread, buf, pending);
+}
+
+inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
+ listener_->OnStreamAfterWrite(w, status);
+}
+
+
+inline StreamBase::StreamBase(Environment* env) : env_(env) {
+ PushStreamListener(&default_listener_);
+}
+
+inline Environment* StreamBase::stream_env() const {
+ return env_;
+}
+
template <class Base>
void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate> t,
@@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate>(),
attributes);
- env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
- env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
+ env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
+ env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
if ((flags & kFlagNoShutdown) == 0)
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
if ((flags & kFlagHasWritev) != 0)
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 0fb801ddd5..9acf2273ab 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -34,12 +34,12 @@ template int StreamBase::WriteString<LATIN1>(
const FunctionCallbackInfo<Value>& args);
-int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
+int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
return ReadStart();
}
-int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
+int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
return ReadStop();
}
@@ -437,9 +437,9 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
}
-void StreamBase::EmitData(ssize_t nread,
- Local<Object> buf,
- Local<Object> handle) {
+void StreamBase::CallJSOnreadMethod(ssize_t nread,
+ Local<Object> buf,
+ Local<Object> handle) {
Environment* env = env_;
Local<Value> argv[] = {
@@ -490,4 +490,43 @@ void StreamResource::ClearError() {
// No-op
}
+
+uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
+ return uv_buf_init(Malloc(suggested_size), suggested_size);
+}
+
+void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
+ // This cannot be virtual because it is just as valid to override the other
+ // OnStreamRead() callback.
+ CHECK(0 && "OnStreamRead() needs to be implemented");
+}
+
+void StreamListener::OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf,
+ uv_handle_type pending) {
+ CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
+ OnStreamRead(nread, buf);
+}
+
+
+void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
+ CHECK_NE(stream_, nullptr);
+ StreamBase* stream = static_cast<StreamBase*>(stream_);
+ Environment* env = stream->stream_env();
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+
+ if (nread <= 0) {
+ free(buf.base);
+ if (nread < 0)
+ stream->CallJSOnreadMethod(nread, Local<Object>());
+ return;
+ }
+
+ CHECK_LE(static_cast<size_t>(nread), buf.len);
+
+ Local<Object> obj = Buffer::New(env, buf.base, nread).ToLocalChecked();
+ stream->CallJSOnreadMethod(nread, obj);
+}
+
} // namespace node
diff --git a/src/stream_base.h b/src/stream_base.h
index d063176b04..0b176d1181 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -15,6 +15,7 @@ namespace node {
// Forward declarations
class StreamBase;
+class StreamResource;
template<typename Base>
class StreamReq {
@@ -123,38 +124,78 @@ class WriteWrap : public ReqWrap<uv_write_t>,
const size_t storage_size_;
};
-class StreamResource {
+
+// This is the generic interface for objects that control Node.js' C++ streams.
+// For example, the default `EmitToJSStreamListener` emits a stream's data
+// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
+class StreamListener {
public:
- template <class T>
- struct Callback {
- Callback() : fn(nullptr), ctx(nullptr) {}
- Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
- Callback(const Callback&) = default;
-
- inline bool is_empty() { return fn == nullptr; }
- inline void clear() {
- fn = nullptr;
- ctx = nullptr;
- }
+ virtual ~StreamListener();
+
+ // This is called when a stream wants to allocate memory immediately before
+ // reading data into the freshly allocated buffer (i.e. it is always followed
+ // by a `OnStreamRead()` call).
+ // This memory may be statically or dynamically allocated; for example,
+ // a protocol parser may want to read data into a static buffer if it knows
+ // that all data is going to be fully handled during the next
+ // `OnStreamRead()` call.
+ // The returned buffer does not need to contain `suggested_size` bytes.
+ // The default implementation of this method returns a buffer that has exactly
+ // the suggested size and is allocated using malloc().
+ virtual uv_buf_t OnStreamAlloc(size_t suggested_size);
+
+ // `OnStreamRead()` is called when data is available on the socket and has
+ // been read into the buffer provided by `OnStreamAlloc()`.
+ // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
+ // with base nullpptr in case of an error.
+ // `nread` is the number of read bytes (which is at most the buffer length),
+ // or, if negative, a libuv error code.
+ // The variant with a `uv_handle_type` argument is used by libuv-backed
+ // streams for handle transfers (e.g. passing net.Socket instances between
+ // cluster workers). For all other streams, overriding the simple variant
+ // should be sufficient.
+ // By default, the second variant crashes if `pending` is set and otherwise
+ // calls the simple variant.
+ virtual void OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf) = 0;
+ virtual void OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf,
+ uv_handle_type pending);
+
+ // This is called once a Write has finished. `status` may be 0 or,
+ // if negative, a libuv error code.
+ virtual void OnStreamAfterWrite(WriteWrap* w, int status) {}
+
+ // This is called immediately before the stream is destroyed.
+ virtual void OnStreamDestroy() {}
- T fn;
- void* ctx;
- };
+ protected:
+ // Pass along a read error to the `StreamListener` instance that was active
+ // before this one. For example, a protocol parser does not care about read
+ // errors and may instead want to let the original handler
+ // (e.g. the JS handler) take care of the situation.
+ void PassReadErrorToPreviousListener(ssize_t nread);
- typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
- typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
- typedef void (*ReadCb)(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx);
- typedef void (*DestructCb)(void* ctx);
+ StreamResource* stream_ = nullptr;
+ StreamListener* previous_listener_ = nullptr;
- StreamResource() : bytes_read_(0) {
- }
- virtual ~StreamResource() {
- if (!destruct_cb_.is_empty())
- destruct_cb_.fn(destruct_cb_.ctx);
- }
+ friend class StreamResource;
+};
+
+
+// A default emitter that just pushes data chunks as Buffer instances to
+// JS land via the handle’s .ondata method.
+class EmitToJSStreamListener : public StreamListener {
+ public:
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
+};
+
+
+// A generic stream, comparable to JS land’s `Duplex` streams.
+// A stream is always controlled through one `StreamListener` instance.
+class StreamResource {
+ public:
+ virtual ~StreamResource();
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
@@ -162,50 +203,45 @@ class StreamResource {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
- virtual const char* Error() const;
- virtual void ClearError();
-
- // Events
- inline void EmitAfterWrite(WriteWrap* w, int status) {
- if (!after_write_cb_.is_empty())
- after_write_cb_.fn(w, status, after_write_cb_.ctx);
- }
- inline void EmitAlloc(size_t size, uv_buf_t* buf) {
- if (!alloc_cb_.is_empty())
- alloc_cb_.fn(size, buf, alloc_cb_.ctx);
- }
-
- inline void EmitRead(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending = UV_UNKNOWN_HANDLE) {
- if (nread > 0)
- bytes_read_ += static_cast<uint64_t>(nread);
- if (!read_cb_.is_empty())
- read_cb_.fn(nread, buf, pending, read_cb_.ctx);
- }
-
- inline void set_after_write_cb(Callback<AfterWriteCb> c) {
- after_write_cb_ = c;
- }
+ // Start reading from the underlying resource. This is called by the consumer
+ // when more data is desired.
+ virtual int ReadStart() = 0;
+ // Stop reading from the underlying resource. This is called by the
+ // consumer when its buffers are full and no more data can be handled.
+ virtual int ReadStop() = 0;
- inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
- inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
- inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }
+ // Optionally, this may provide an error message to be used for
+ // failing writes.
+ virtual const char* Error() const;
+ // Clear the current error (i.e. that would be returned by Error()).
+ virtual void ClearError();
- inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
- inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
- inline Callback<ReadCb> read_cb() { return read_cb_; }
- inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
+ // Transfer ownership of this tream to `listener`. The previous listener
+ // will not receive any more callbacks while the new listener was active.
+ void PushStreamListener(StreamListener* listener);
+ // Remove a listener, and, if this was the currently active one,
+ // transfer ownership back to the previous listener.
+ void RemoveStreamListener(StreamListener* listener);
protected:
- Callback<AfterWriteCb> after_write_cb_;
- Callback<AllocCb> alloc_cb_;
- Callback<ReadCb> read_cb_;
- Callback<DestructCb> destruct_cb_;
- uint64_t bytes_read_;
+ // Call the current listener's OnStreamAlloc() method.
+ uv_buf_t EmitAlloc(size_t suggested_size);
+ // Call the current listener's OnStreamRead() method and update the
+ // stream's read byte counter.
+ void EmitRead(ssize_t nread,
+ const uv_buf_t& buf = uv_buf_init(nullptr, 0),
+ uv_handle_type pending = UV_UNKNOWN_HANDLE);
+ // Call the current listener's OnStreamAfterWrite() method.
+ void EmitAfterWrite(WriteWrap* w, int status);
+
+ StreamListener* listener_ = nullptr;
+ uint64_t bytes_read_ = 0;
+
+ friend class StreamListener;
};
+
class StreamBase : public StreamResource {
public:
enum Flags {
@@ -224,40 +260,29 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe();
virtual int GetFD();
- virtual int ReadStart() = 0;
- virtual int ReadStop() = 0;
-
- inline void Consume() {
- CHECK_EQ(consumed_, false);
- consumed_ = true;
- }
-
- inline void Unconsume() {
- CHECK_EQ(consumed_, true);
- consumed_ = false;
- }
-
- void EmitData(ssize_t nread,
- v8::Local<v8::Object> buf,
- v8::Local<v8::Object> handle);
+ void CallJSOnreadMethod(
+ ssize_t nread,
+ v8::Local<v8::Object> buf,
+ v8::Local<v8::Object> handle = v8::Local<v8::Object>());
// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);
- protected:
- explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
- }
+ // This is named `stream_env` to avoid name clashes, because a lot of
+ // subclasses are also `BaseObject`s.
+ Environment* stream_env() const;
- virtual ~StreamBase() = default;
+ protected:
+ explicit StreamBase(Environment* env);
// One of these must be implemented
virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject();
// JS Methods
- int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
- int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -280,7 +305,7 @@ class StreamBase : public StreamResource {
private:
Environment* env_;
- bool consumed_;
+ EmitToJSStreamListener default_listener_;
};
} // namespace node
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index b639d94500..0be73f9114 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -93,8 +93,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider),
StreamBase(env),
stream_(stream) {
- set_alloc_cb({ OnAllocImpl, this });
- set_read_cb({ OnReadImpl, this });
+ PushStreamListener(this);
}
@@ -157,23 +156,18 @@ int LibuvStreamWrap::ReadStop() {
void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
+ size_t suggested_size,
+ uv_buf_t* buf) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
HandleScope scope(wrap->env()->isolate());
Context::Scope context_scope(wrap->env()->context());
CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
- return wrap->EmitAlloc(suggested_size, buf);
+ *buf = wrap->EmitAlloc(suggested_size);
}
-void LibuvStreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
- buf->base = node::Malloc(size);
- buf->len = size;
-}
-
template <class WrapType, class UVType>
static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
@@ -196,51 +190,41 @@ static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
}
-void LibuvStreamWrap::OnReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx) {
- LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx);
- Environment* env = wrap->env();
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- Local<Object> pending_obj;
+void LibuvStreamWrap::OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf,
+ uv_handle_type pending) {
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
- if (nread < 0) {
- if (buf->base != nullptr)
- free(buf->base);
- wrap->EmitData(nread, Local<Object>(), pending_obj);
+ if (nread <= 0) {
+ free(buf.base);
+ if (nread < 0)
+ CallJSOnreadMethod(nread, Local<Object>());
return;
}
- if (nread == 0) {
- if (buf->base != nullptr)
- free(buf->base);
- return;
- }
+ CHECK_LE(static_cast<size_t>(nread), buf.len);
- CHECK_LE(static_cast<size_t>(nread), buf->len);
- char* base = node::Realloc(buf->base, nread);
+ Local<Object> pending_obj;
if (pending == UV_TCP) {
- pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
} else if (pending == UV_NAMED_PIPE) {
- pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
} else if (pending == UV_UDP) {
- pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
} else {
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
}
- Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
- wrap->EmitData(nread, obj, pending_obj);
+ Local<Object> obj = Buffer::New(env(), buf.base, nread).ToLocalChecked();
+ CallJSOnreadMethod(nread, obj, pending_obj);
}
void LibuvStreamWrap::OnRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf) {
+ ssize_t nread,
+ const uv_buf_t* buf) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
HandleScope scope(wrap->env()->isolate());
Context::Scope context_scope(wrap->env()->context());
@@ -263,7 +247,7 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}
}
- wrap->EmitRead(nread, buf, type);
+ wrap->EmitRead(nread, *buf, type);
}
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index 0146d41c6e..129006b160 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -33,7 +33,9 @@
namespace node {
-class LibuvStreamWrap : public HandleWrap, public StreamBase {
+class LibuvStreamWrap : public HandleWrap,
+ public StreamListener,
+ public StreamBase {
public:
static void Initialize(v8::Local<v8::Object> target,
v8::Local<v8::Value> unused,
@@ -79,9 +81,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
uv_stream_t* stream,
AsyncWrap::ProviderType provider);
- ~LibuvStreamWrap() {
- }
-
AsyncWrap* GetAsyncWrap() override;
static void AddMethods(Environment* env,
@@ -105,11 +104,16 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
static void AfterUvShutdown(uv_shutdown_t* req, int status);
// Resource interface implementation
- static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
- static void OnReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx);
+ void OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf) override {
+ CHECK(0 && "must not be called");
+ }
+ void OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf,
+ uv_handle_type pending) override;
+ void OnStreamAfterWrite(WriteWrap* w, int status) override {
+ previous_listener_->OnStreamAfterWrite(w, status);
+ }
void AfterWrite(WriteWrap* req_wrap, int status) override;
diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc
index 3a0a3f295e..a0a58fb1b5 100644
--- a/src/tcp_wrap.cc
+++ b/src/tcp_wrap.cc
@@ -27,6 +27,7 @@
#include "node_buffer.h"
#include "node_wrap.h"
#include "connect_wrap.h"
+#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "util-inl.h"
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index 18b3cf01f4..971dbb857f 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -59,7 +59,6 @@ TLSWrap::TLSWrap(Environment* env,
SSLWrap<TLSWrap>(env, sc, kind),
StreamBase(env),
sc_(sc),
- stream_(stream),
enc_in_(nullptr),
enc_out_(nullptr),
write_size_(0),
@@ -78,14 +77,7 @@ TLSWrap::TLSWrap(Environment* env,
SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSWrap>::GetSessionCallback);
SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSWrap>::NewSessionCallback);
- stream_->Consume();
- stream_->set_after_write_cb({ OnAfterWriteImpl, this });
- stream_->set_alloc_cb({ OnAllocImpl, this });
- stream_->set_read_cb({ OnReadImpl, this });
- stream_->set_destruct_cb({ OnDestructImpl, this });
-
- set_alloc_cb({ OnAllocSelf, this });
- set_read_cb({ OnReadSelf, this });
+ stream->PushStreamListener(this);
InitSSL();
}
@@ -100,19 +92,6 @@ TLSWrap::~TLSWrap() {
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
sni_context_.Reset();
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
-
- // See test/parallel/test-tls-transport-destroy-after-own-gc.js:
- // If this TLSWrap is garbage collected, we cannot allow callbacks to be
- // called on this stream.
-
- if (stream_ == nullptr)
- return;
- stream_->set_destruct_cb({ nullptr, nullptr });
- stream_->set_after_write_cb({ nullptr, nullptr });
- stream_->set_alloc_cb({ nullptr, nullptr });
- stream_->set_read_cb({ nullptr, nullptr });
- stream_->set_destruct_cb({ nullptr, nullptr });
- stream_->Unconsume();
}
@@ -208,15 +187,13 @@ void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
char* data = Buffer::Data(args[0]);
size_t len = Buffer::Length(args[0]);
- uv_buf_t buf;
-
// Copy given buffer entirely or partiall if handle becomes closed
while (len > 0 && wrap->IsAlive() && !wrap->IsClosing()) {
- wrap->stream_->EmitAlloc(len, &buf);
+ uv_buf_t buf = wrap->OnStreamAlloc(len);
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
- wrap->stream_->EmitRead(buf.len, &buf);
+ wrap->OnStreamRead(copy, buf);
data += copy;
len -= copy;
@@ -307,7 +284,7 @@ void TLSWrap::EncOut() {
->NewInstance(env()->context()).ToLocalChecked();
WriteWrap* write_req = WriteWrap::New(env(),
req_wrap_obj,
- stream_);
+ static_cast<StreamBase*>(stream_));
uv_buf_t buf[arraysize(data)];
for (size_t i = 0; i < count; i++)
@@ -324,7 +301,7 @@ void TLSWrap::EncOut() {
}
-void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) {
+void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
// We should not be getting here after `DestroySSL`, because all queued writes
// must be invoked with UV_ECANCELED
CHECK_NE(ssl_, nullptr);
@@ -421,12 +398,11 @@ void TLSWrap::ClearOut() {
while (read > 0) {
int avail = read;
- uv_buf_t buf;
- EmitAlloc(avail, &buf);
+ uv_buf_t buf = EmitAlloc(avail);
if (static_cast<int>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, current, avail);
- EmitRead(avail, &buf);
+ EmitRead(avail, buf);
// Caveat emptor: OnRead() calls into JS land which can result in
// the SSL context object being destroyed. We have to carefully
@@ -442,7 +418,7 @@ void TLSWrap::ClearOut() {
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
- EmitRead(UV_EOF, nullptr);
+ EmitRead(UV_EOF);
}
// We need to check whether an error occurred or the connection was
@@ -524,22 +500,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
bool TLSWrap::IsIPCPipe() {
- return stream_->IsIPCPipe();
+ return static_cast<StreamBase*>(stream_)->IsIPCPipe();
}
int TLSWrap::GetFD() {
- return stream_->GetFD();
+ return static_cast<StreamBase*>(stream_)->GetFD();
}
bool TLSWrap::IsAlive() {
- return ssl_ != nullptr && stream_ != nullptr && stream_->IsAlive();
+ return ssl_ != nullptr &&
+ stream_ != nullptr &&
+ static_cast<StreamBase*>(stream_)->IsAlive();
}
bool TLSWrap::IsClosing() {
- return stream_->IsClosing();
+ return static_cast<StreamBase*>(stream_)->IsClosing();
}
@@ -638,62 +616,16 @@ int TLSWrap::DoWrite(WriteWrap* w,
}
-void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
- TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
- wrap->EncOutAfterWrite(w, status);
-}
-
-
-void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
- TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
-
- if (wrap->ssl_ == nullptr) {
- *buf = uv_buf_init(nullptr, 0);
- return;
- }
-
- size_t size = 0;
- buf->base = crypto::NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size);
- buf->len = size;
-}
-
-
-void TLSWrap::OnReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx) {
- TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
- wrap->DoRead(nread, buf, pending);
-}
-
-
-void TLSWrap::OnDestructImpl(void* ctx) {
- TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
- wrap->clear_stream();
-}
-
-
-void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) {
- buf->base = node::Malloc(suggested_size);
- buf->len = suggested_size;
-}
-
+uv_buf_t TLSWrap::OnStreamAlloc(size_t suggested_size) {
+ CHECK_NE(ssl_, nullptr);
-void TLSWrap::OnReadSelf(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx) {
- TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
- Local<Object> buf_obj;
- if (buf != nullptr)
- buf_obj = Buffer::New(wrap->env(), buf->base, buf->len).ToLocalChecked();
- wrap->EmitData(nread, buf_obj, Local<Object>());
+ size_t size = suggested_size;
+ char* base = crypto::NodeBIO::FromBIO(enc_in_)->PeekWritable(&size);
+ return uv_buf_init(base, size);
}
-void TLSWrap::DoRead(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending) {
+void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
if (nread < 0) {
// Error should be emitted only after all data was read
ClearOut();
@@ -705,13 +637,13 @@ void TLSWrap::DoRead(ssize_t nread,
eof_ = true;
}
- EmitRead(nread, nullptr);
+ EmitRead(nread);
return;
}
// Only client connections can receive data
if (ssl_ == nullptr) {
- EmitRead(UV_EPROTO, nullptr);
+ EmitRead(UV_EPROTO);
return;
}
@@ -800,6 +732,9 @@ void TLSWrap::DestroySSL(const FunctionCallbackInfo<Value>& args) {
// Destroy the SSL structure and friends
wrap->SSLWrap<TLSWrap>::DestroySSL();
+
+ if (wrap->stream_ != nullptr)
+ wrap->stream_->RemoveStreamListener(wrap);
}
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index ae83c82c32..a1f0b99e86 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -48,7 +48,8 @@ class NodeBIO;
class TLSWrap : public AsyncWrap,
public crypto::SSLWrap<TLSWrap>,
- public StreamBase {
+ public StreamBase,
+ public StreamListener {
public:
~TLSWrap() override;
@@ -76,8 +77,6 @@ class TLSWrap : public AsyncWrap,
size_t self_size() const override { return sizeof(*this); }
- void clear_stream() { stream_ = nullptr; }
-
protected:
static const int kClearOutChunkSize = 16384;
@@ -98,7 +97,6 @@ class TLSWrap : public AsyncWrap,
static void SSLInfoCallback(const SSL* ssl_, int where, int ret);
void InitSSL();
void EncOut();
- void EncOutAfterWrite(WriteWrap* req_wrap, int status);
bool ClearIn();
void ClearOut();
bool InvokeQueued(int status, const char* error_str = nullptr);
@@ -119,20 +117,9 @@ class TLSWrap : public AsyncWrap,
bool IsIPCPipe() override;
// Resource implementation
- static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
- static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
- static void OnReadImpl(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx);
- static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx);
- static void OnReadSelf(ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending,
- void* ctx);
- static void OnDestructImpl(void* ctx);
-
- void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending);
+ void OnStreamAfterWrite(WriteWrap* w, int status) override;
+ uv_buf_t OnStreamAlloc(size_t size) override;
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
v8::Local<v8::Value> GetSSLError(int status, int* err, std::string* msg);
@@ -154,7 +141,6 @@ class TLSWrap : public AsyncWrap,
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
crypto::SecureContext* sc_;
- StreamBase* stream_;
BIO* enc_in_;
BIO* enc_out_;
std::vector<uv_buf_t> pending_cleartext_input_;
diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc
index fae39158ef..9977738afc 100644
--- a/src/tty_wrap.cc
+++ b/src/tty_wrap.cc
@@ -26,6 +26,7 @@
#include "node_buffer.h"
#include "node_wrap.h"
#include "req_wrap-inl.h"
+#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "util-inl.h"