summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-01-25 00:43:06 +0100
committerRuben Bridgewater <ruben@bridgewater.de>2018-02-01 10:53:53 +0100
commit5898dc3d0c071a8e5135ba0a80565dd19d49cb32 (patch)
tree8bf1fd4236b183dfd8509b8d5bc43ec7f659f14a /src
parent7c4b09b24bbe7d6a8cbad256f47b30a101a909ea (diff)
downloadandroid-node-v8-5898dc3d0c071a8e5135ba0a80565dd19d49cb32.tar.gz
android-node-v8-5898dc3d0c071a8e5135ba0a80565dd19d49cb32.tar.bz2
android-node-v8-5898dc3d0c071a8e5135ba0a80565dd19d49cb32.zip
src: simplify handles for libuv streams
Instead of passing along the handle object, just set it as a property on the stream handle object and let the read handler grab it from there. PR-URL: https://github.com/nodejs/node/pull/18334 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/env.h1
-rw-r--r--src/stream_base-inl.h10
-rw-r--r--src/stream_base.cc23
-rw-r--r--src/stream_base.h18
-rw-r--r--src/stream_wrap.cc98
-rw-r--r--src/stream_wrap.h26
6 files changed, 49 insertions, 127 deletions
diff --git a/src/env.h b/src/env.h
index d73be8156e..7342613595 100644
--- a/src/env.h
+++ b/src/env.h
@@ -210,6 +210,7 @@ class ModuleWrap;
V(owner_string, "owner") \
V(parse_error_string, "Parse Error") \
V(path_string, "path") \
+ V(pending_handle_string, "pendingHandle") \
V(pbkdf2_error_string, "PBKDF2 Error") \
V(pid_string, "pid") \
V(pipe_string, "pipe") \
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h
index 287978a870..76922c1d8a 100644
--- a/src/stream_base-inl.h
+++ b/src/stream_base-inl.h
@@ -33,9 +33,7 @@ inline StreamListener::~StreamListener() {
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
CHECK_NE(previous_listener_, nullptr);
- previous_listener_->OnStreamRead(nread,
- uv_buf_init(nullptr, 0),
- UV_UNKNOWN_HANDLE);
+ previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
}
@@ -85,12 +83,10 @@ inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
return listener_->OnStreamAlloc(suggested_size);
}
-inline void StreamResource::EmitRead(ssize_t nread,
- const uv_buf_t& buf,
- uv_handle_type pending) {
+inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
if (nread > 0)
bytes_read_ += static_cast<uint64_t>(nread);
- listener_->OnStreamRead(nread, buf, pending);
+ listener_->OnStreamRead(nread, buf);
}
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 9acf2273ab..8bdcebe88a 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -437,23 +437,17 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
}
-void StreamBase::CallJSOnreadMethod(ssize_t nread,
- Local<Object> buf,
- Local<Object> handle) {
+void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
Environment* env = env_;
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
- buf,
- handle
+ buf
};
if (argv[1].IsEmpty())
argv[1] = Undefined(env->isolate());
- if (argv[2].IsEmpty())
- argv[2] = Undefined(env->isolate());
-
AsyncWrap* wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
@@ -495,19 +489,6 @@ uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
return uv_buf_init(Malloc(suggested_size), suggested_size);
}
-void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
- // This cannot be virtual because it is just as valid to override the other
- // OnStreamRead() callback.
- CHECK(0 && "OnStreamRead() needs to be implemented");
-}
-
-void StreamListener::OnStreamRead(ssize_t nread,
- const uv_buf_t& buf,
- uv_handle_type pending) {
- CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
- OnStreamRead(nread, buf);
-}
-
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
CHECK_NE(stream_, nullptr);
diff --git a/src/stream_base.h b/src/stream_base.h
index 0b176d1181..f18b6bda0a 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -150,17 +150,8 @@ class StreamListener {
// with base nullpptr in case of an error.
// `nread` is the number of read bytes (which is at most the buffer length),
// or, if negative, a libuv error code.
- // The variant with a `uv_handle_type` argument is used by libuv-backed
- // streams for handle transfers (e.g. passing net.Socket instances between
- // cluster workers). For all other streams, overriding the simple variant
- // should be sufficient.
- // By default, the second variant crashes if `pending` is set and otherwise
- // calls the simple variant.
virtual void OnStreamRead(ssize_t nread,
const uv_buf_t& buf) = 0;
- virtual void OnStreamRead(ssize_t nread,
- const uv_buf_t& buf,
- uv_handle_type pending);
// This is called once a Write has finished. `status` may be 0 or,
// if negative, a libuv error code.
@@ -229,9 +220,7 @@ class StreamResource {
uv_buf_t EmitAlloc(size_t suggested_size);
// Call the current listener's OnStreamRead() method and update the
// stream's read byte counter.
- void EmitRead(ssize_t nread,
- const uv_buf_t& buf = uv_buf_init(nullptr, 0),
- uv_handle_type pending = UV_UNKNOWN_HANDLE);
+ void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
// Call the current listener's OnStreamAfterWrite() method.
void EmitAfterWrite(WriteWrap* w, int status);
@@ -260,10 +249,7 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe();
virtual int GetFD();
- void CallJSOnreadMethod(
- ssize_t nread,
- v8::Local<v8::Object> buf,
- v8::Local<v8::Object> handle = v8::Local<v8::Object>());
+ void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 0be73f9114..bc10cf80e8 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -93,7 +93,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider),
StreamBase(env),
stream_(stream) {
- PushStreamListener(this);
}
@@ -146,7 +145,13 @@ bool LibuvStreamWrap::IsIPCPipe() {
int LibuvStreamWrap::ReadStart() {
- return uv_read_start(stream(), OnAlloc, OnRead);
+ return uv_read_start(stream(), [](uv_handle_t* handle,
+ size_t suggested_size,
+ uv_buf_t* buf) {
+ static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
+ }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
+ static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
+ });
}
@@ -155,16 +160,11 @@ int LibuvStreamWrap::ReadStop() {
}
-void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
- LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
- HandleScope scope(wrap->env()->isolate());
- Context::Scope context_scope(wrap->env()->context());
-
- CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
+void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
+ HandleScope scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
- *buf = wrap->EmitAlloc(suggested_size);
+ *buf = EmitAlloc(suggested_size);
}
@@ -190,64 +190,47 @@ static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
}
-void LibuvStreamWrap::OnStreamRead(ssize_t nread,
- const uv_buf_t& buf,
- uv_handle_type pending) {
- HandleScope handle_scope(env()->isolate());
+void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
+ HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
-
- if (nread <= 0) {
- free(buf.base);
- if (nread < 0)
- CallJSOnreadMethod(nread, Local<Object>());
- return;
- }
-
- CHECK_LE(static_cast<size_t>(nread), buf.len);
-
- Local<Object> pending_obj;
-
- if (pending == UV_TCP) {
- pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
- } else if (pending == UV_NAMED_PIPE) {
- pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
- } else if (pending == UV_UDP) {
- pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
- } else {
- CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
- }
-
- Local<Object> obj = Buffer::New(env(), buf.base, nread).ToLocalChecked();
- CallJSOnreadMethod(nread, obj, pending_obj);
-}
-
-
-void LibuvStreamWrap::OnRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf) {
- LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
- HandleScope scope(wrap->env()->isolate());
- Context::Scope context_scope(wrap->env()->context());
uv_handle_type type = UV_UNKNOWN_HANDLE;
- if (wrap->is_named_pipe_ipc() &&
- uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
- type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
+ if (is_named_pipe_ipc() &&
+ uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
+ type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
}
// We should not be getting this callback if someone as already called
// uv_close() on the handle.
- CHECK_EQ(wrap->persistent().IsEmpty(), false);
+ CHECK_EQ(persistent().IsEmpty(), false);
if (nread > 0) {
- if (wrap->is_tcp()) {
+ if (is_tcp()) {
NODE_COUNT_NET_BYTES_RECV(nread);
- } else if (wrap->is_named_pipe()) {
+ } else if (is_named_pipe()) {
NODE_COUNT_PIPE_BYTES_RECV(nread);
}
+
+ Local<Object> pending_obj;
+
+ if (type == UV_TCP) {
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
+ } else if (type == UV_NAMED_PIPE) {
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
+ } else if (type == UV_UDP) {
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
+ } else {
+ CHECK_EQ(type, UV_UNKNOWN_HANDLE);
+ }
+
+ if (!pending_obj.IsEmpty()) {
+ object()->Set(env()->context(),
+ env()->pending_handle_string(),
+ pending_obj).FromJust();
+ }
}
- wrap->EmitRead(nread, *buf, type);
+ EmitRead(nread, *buf);
}
@@ -373,11 +356,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
req_wrap->Done(status);
}
-
-void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
- StreamBase::AfterWrite(w, status);
-}
-
} // namespace node
NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap,
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index 129006b160..e5ad25b91e 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -33,9 +33,7 @@
namespace node {
-class LibuvStreamWrap : public HandleWrap,
- public StreamListener,
- public StreamBase {
+class LibuvStreamWrap : public HandleWrap, public StreamBase {
public:
static void Initialize(v8::Local<v8::Object> target,
v8::Local<v8::Value> unused,
@@ -93,30 +91,12 @@ class LibuvStreamWrap : public HandleWrap,
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
// Callbacks for libuv
- static void OnAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf);
+ void OnUvAlloc(size_t suggested_size, uv_buf_t* buf);
+ void OnUvRead(ssize_t nread, const uv_buf_t* buf);
- static void OnRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf);
static void AfterUvWrite(uv_write_t* req, int status);
static void AfterUvShutdown(uv_shutdown_t* req, int status);
- // Resource interface implementation
- void OnStreamRead(ssize_t nread,
- const uv_buf_t& buf) override {
- CHECK(0 && "must not be called");
- }
- void OnStreamRead(ssize_t nread,
- const uv_buf_t& buf,
- uv_handle_type pending) override;
- void OnStreamAfterWrite(WriteWrap* w, int status) override {
- previous_listener_->OnStreamAfterWrite(w, status);
- }
-
- void AfterWrite(WriteWrap* req_wrap, int status) override;
-
uv_stream_t* const stream_;
};