summaryrefslogtreecommitdiff
path: root/src/stream_base.cc
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-02-08 04:59:10 +0100
committerAnna Henningsen <anna@addaleax.net>2018-02-14 10:00:29 +0100
commit0e7b61229aa602e55c5fb034a63d7da97eecff3b (patch)
tree0e64305591fd94e1b609c5fd4ba1ae1bd19ea66a /src/stream_base.cc
parent0ed9ea861b847579478457b7f5aab430fb6d77cb (diff)
downloadandroid-node-v8-0e7b61229aa602e55c5fb034a63d7da97eecff3b.tar.gz
android-node-v8-0e7b61229aa602e55c5fb034a63d7da97eecff3b.tar.bz2
android-node-v8-0e7b61229aa602e55c5fb034a63d7da97eecff3b.zip
src: refactor WriteWrap and ShutdownWraps
Encapsulate stream requests more: - `WriteWrap` and `ShutdownWrap` classes are now tailored to the streams on which they are used. In particular, for most streams these are now plain `AsyncWrap`s and do not carry the overhead of unused libuv request data. - Provide generic `Write()` and `Shutdown()` methods that wrap around the actual implementations, and make *usage* of streams easier, rather than implementing; for example, wrap objects don’t need to be provided by callers anymore. - Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to call the corresponding JS handlers, rather than always trying to call them. This makes usage of streams by other C++ code easier and leaner. Also fix up some tests that were previously not actually testing asynchronicity when the comments indicated that they would. PR-URL: https://github.com/nodejs/node/pull/18676 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/stream_base.cc')
-rw-r--r--src/stream_base.cc300
1 files changed, 98 insertions, 202 deletions
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 8bdcebe88a..9ad9fd5bcb 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -34,6 +34,11 @@ template int StreamBase::WriteString<LATIN1>(
const FunctionCallbackInfo<Value>& args);
+struct Free {
+ void operator()(char* ptr) const { free(ptr); }
+};
+
+
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
return ReadStart();
}
@@ -45,45 +50,10 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
-
CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>();
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id());
- ShutdownWrap* req_wrap = new ShutdownWrap(env,
- req_wrap_obj,
- this);
-
- int err = DoShutdown(req_wrap);
- if (err)
- delete req_wrap;
- return err;
-}
-
-
-void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
- Environment* env = req_wrap->env();
-
- // The wrap and request objects should still be there.
- CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
-
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- Local<Object> req_wrap_obj = req_wrap->object();
- Local<Value> argv[3] = {
- Integer::New(env->isolate(), status),
- GetObject(),
- req_wrap_obj
- };
-
- if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
- req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
-
- delete req_wrap;
+ return Shutdown(req_wrap_obj);
}
@@ -104,19 +74,14 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
count = chunks->Length() >> 1;
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
- uv_buf_t* buf_list = *bufs;
size_t storage_size = 0;
uint32_t bytes = 0;
size_t offset;
- WriteWrap* req_wrap;
- int err;
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
- storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
-
Local<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
@@ -145,20 +110,11 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
}
-
- // Try writing immediately without allocation
- err = DoTryWrite(&buf_list, &count);
- if (err != 0 || count == 0)
- goto done;
}
- {
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
- wrap->get_async_id());
- req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
- }
+ std::unique_ptr<char[], Free> storage;
+ if (storage_size > 0)
+ storage = std::unique_ptr<char[], Free>(Malloc(storage_size));
offset = 0;
if (!all_buffers) {
@@ -174,9 +130,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}
// Write string
- offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size);
- char* str_storage = req_wrap->Extra(offset);
+ char* str_storage = storage.get() + offset;
size_t str_size = storage_size - offset;
Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
@@ -192,35 +147,17 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
offset += str_size;
bytes += str_size;
}
-
- err = DoTryWrite(&buf_list, &count);
- if (err != 0 || count == 0) {
- req_wrap->Dispatched();
- req_wrap->Dispose();
- goto done;
- }
}
- err = DoWrite(req_wrap, buf_list, count, nullptr);
- req_wrap_obj->Set(env->async(), True(env->isolate()));
-
- if (err)
- req_wrap->Dispose();
-
- done:
- const char* msg = Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- ClearError();
- }
+ StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
-
- return err;
+ if (res.wrap != nullptr && storage) {
+ res.wrap->SetAllocatedStorage(storage.release(), storage_size);
+ }
+ return res.err;
}
-
-
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
@@ -232,49 +169,20 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
}
Local<Object> req_wrap_obj = args[0].As<Object>();
- const char* data = Buffer::Data(args[1]);
- size_t length = Buffer::Length(args[1]);
- WriteWrap* req_wrap;
uv_buf_t buf;
- buf.base = const_cast<char*>(data);
- buf.len = length;
-
- // Try writing immediately without allocation
- uv_buf_t* bufs = &buf;
- size_t count = 1;
- int err = DoTryWrite(&bufs, &count);
- if (err != 0)
- goto done;
- if (count == 0)
- goto done;
- CHECK_EQ(count, 1);
-
- // Allocate, or write rest
- {
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
- wrap->get_async_id());
- req_wrap = WriteWrap::New(env, req_wrap_obj, this);
- }
+ buf.base = Buffer::Data(args[1]);
+ buf.len = Buffer::Length(args[1]);
- err = DoWrite(req_wrap, bufs, count, nullptr);
- req_wrap_obj->Set(env->async(), True(env->isolate()));
- req_wrap_obj->Set(env->buffer_string(), args[1]);
+ StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
- if (err)
- req_wrap->Dispose();
+ if (res.async)
+ req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
+ req_wrap_obj->Set(env->context(), env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), buf.len))
+ .FromJust();
- done:
- const char* msg = Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- ClearError();
- }
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), length));
- return err;
+ return res.err;
}
@@ -305,8 +213,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
return UV_ENOBUFS;
// Try writing immediately if write size isn't too big
- WriteWrap* req_wrap;
- char* data;
char stack_storage[16384]; // 16kb
size_t data_size;
uv_buf_t buf;
@@ -325,36 +231,33 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
size_t count = 1;
err = DoTryWrite(&bufs, &count);
- // Failure
- if (err != 0)
- goto done;
-
- // Success
- if (count == 0)
- goto done;
+ // Immediate failure or success
+ if (err != 0 || count == 0) {
+ req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
+ .FromJust();
+ req_wrap_obj->Set(env->context(),
+ env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), data_size))
+ .FromJust();
+ return err;
+ }
// Partial write
CHECK_EQ(count, 1);
}
- {
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
- wrap->get_async_id());
- req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
- }
-
- data = req_wrap->Extra();
+ std::unique_ptr<char[], Free> data;
if (try_write) {
// Copy partial data
- memcpy(data, buf.base, buf.len);
+ data = std::unique_ptr<char[], Free>(Malloc(buf.len));
+ memcpy(data.get(), buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
+ data = std::unique_ptr<char[], Free>(Malloc(storage_size));
data_size = StringBytes::Write(env->isolate(),
- data,
+ data.get(),
storage_size,
string,
enc);
@@ -362,78 +265,36 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
CHECK_LE(data_size, storage_size);
- buf = uv_buf_init(data, data_size);
-
- if (!IsIPCPipe()) {
- err = DoWrite(req_wrap, &buf, 1, nullptr);
- } else {
- uv_handle_t* send_handle = nullptr;
-
- if (!send_handle_obj.IsEmpty()) {
- HandleWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
- send_handle = wrap->GetHandle();
- // Reference LibuvStreamWrap instance to prevent it from being garbage
- // collected before `AfterWrite` is called.
- CHECK_EQ(false, req_wrap->persistent().IsEmpty());
- req_wrap_obj->Set(env->handle_string(), send_handle_obj);
- }
-
- err = DoWrite(
- req_wrap,
- &buf,
- 1,
- reinterpret_cast<uv_stream_t*>(send_handle));
+ buf = uv_buf_init(data.get(), data_size);
+
+ uv_stream_t* send_handle = nullptr;
+
+ if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
+ // TODO(addaleax): This relies on the fact that HandleWrap comes first
+ // as a superclass of each individual subclass.
+ // There are similar assumptions in other places in the code base.
+ // A better idea would be having all BaseObject's internal pointers
+ // refer to the BaseObject* itself; this would require refactoring
+ // throughout the code base but makes Node rely much less on C++ quirks.
+ HandleWrap* wrap;
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
+ send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
+ // Reference LibuvStreamWrap instance to prevent it from being garbage
+ // collected before `AfterWrite` is called.
+ req_wrap_obj->Set(env->handle_string(), send_handle_obj);
}
- req_wrap_obj->Set(env->async(), True(env->isolate()));
+ StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
- if (err)
- req_wrap->Dispose();
+ req_wrap_obj->Set(env->context(), env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), data_size))
+ .FromJust();
- done:
- const char* msg = Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- ClearError();
+ if (res.wrap != nullptr) {
+ res.wrap->SetAllocatedStorage(data.release(), data_size);
}
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), data_size));
- return err;
-}
-
-
-void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
- Environment* env = req_wrap->env();
-
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- // The wrap and request objects should still be there.
- CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
-
- // Unref handle property
- Local<Object> req_wrap_obj = req_wrap->object();
- req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
- EmitAfterWrite(req_wrap, status);
-
- Local<Value> argv[] = {
- Integer::New(env->isolate(), status),
- GetObject(),
- req_wrap_obj,
- Undefined(env->isolate())
- };
-
- const char* msg = Error();
- if (msg != nullptr) {
- argv[3] = OneByteString(env->isolate(), msg);
- ClearError();
- }
-
- if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
- req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
- req_wrap->Dispose();
+ return res.err;
}
@@ -510,4 +371,39 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
stream->CallJSOnreadMethod(nread, obj);
}
+
+void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
+ StreamReq* req_wrap, int status) {
+ StreamBase* stream = static_cast<StreamBase*>(stream_);
+ Environment* env = stream->stream_env();
+ AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
+ Local<Object> req_wrap_obj = async_wrap->object();
+
+ Local<Value> argv[] = {
+ Integer::New(env->isolate(), status),
+ stream->GetObject(),
+ Undefined(env->isolate())
+ };
+
+ const char* msg = stream->Error();
+ if (msg != nullptr) {
+ argv[2] = OneByteString(env->isolate(), msg);
+ stream->ClearError();
+ }
+
+ if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
+ async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
+}
+
+void ReportWritesToJSStreamListener::OnStreamAfterWrite(
+ WriteWrap* req_wrap, int status) {
+ OnStreamAfterReqFinished(req_wrap, status);
+}
+
+void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
+ ShutdownWrap* req_wrap, int status) {
+ OnStreamAfterReqFinished(req_wrap, status);
+}
+
+
} // namespace node