summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/async_wrap.h1
-rw-r--r--src/base_object-inl.h8
-rw-r--r--src/base_object.h4
-rw-r--r--src/bootstrapper.cc14
-rw-r--r--src/callback_scope.cc5
-rw-r--r--src/env-inl.h31
-rw-r--r--src/env.cc27
-rw-r--r--src/env.h29
-rw-r--r--src/js_stream.cc15
-rw-r--r--src/node.cc116
-rw-r--r--src/node_errors.h4
-rw-r--r--src/node_internals.h5
-rw-r--r--src/node_messaging.cc29
-rw-r--r--src/node_messaging.h5
-rw-r--r--src/node_worker.cc428
-rw-r--r--src/node_worker.h83
16 files changed, 746 insertions, 58 deletions
diff --git a/src/async_wrap.h b/src/async_wrap.h
index cf269a4c1f..b2f96477b4 100644
--- a/src/async_wrap.h
+++ b/src/async_wrap.h
@@ -67,6 +67,7 @@ namespace node {
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
+ V(WORKER) \
V(WRITEWRAP) \
V(ZLIB)
diff --git a/src/base_object-inl.h b/src/base_object-inl.h
index 3bd854639b..06a2922397 100644
--- a/src/base_object-inl.h
+++ b/src/base_object-inl.h
@@ -65,6 +65,14 @@ v8::Local<v8::Object> BaseObject::object() {
return PersistentToLocal(env_->isolate(), persistent_handle_);
}
+v8::Local<v8::Object> BaseObject::object(v8::Isolate* isolate) {
+ v8::Local<v8::Object> handle = object();
+#ifdef DEBUG
+ CHECK_EQ(handle->CreationContext()->GetIsolate(), isolate);
+ CHECK_EQ(env_->isolate(), isolate);
+#endif
+ return handle;
+}
Environment* BaseObject::env() const {
return env_;
diff --git a/src/base_object.h b/src/base_object.h
index e0b6084340..38291d598f 100644
--- a/src/base_object.h
+++ b/src/base_object.h
@@ -43,6 +43,10 @@ class BaseObject {
// persistent.IsEmpty() is true.
inline v8::Local<v8::Object> object();
+ // Same as the above, except it additionally verifies that this object
+ // is associated with the passed Isolate in debug mode.
+ inline v8::Local<v8::Object> object(v8::Isolate* isolate);
+
inline Persistent<v8::Object>& persistent();
inline Environment* env() const;
diff --git a/src/bootstrapper.cc b/src/bootstrapper.cc
index 35c7c4dc69..f9db02562d 100644
--- a/src/bootstrapper.cc
+++ b/src/bootstrapper.cc
@@ -114,12 +114,14 @@ void SetupBootstrapObject(Environment* env,
BOOTSTRAP_METHOD(_umask, Umask);
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
- BOOTSTRAP_METHOD(_initgroups, InitGroups);
- BOOTSTRAP_METHOD(_setegid, SetEGid);
- BOOTSTRAP_METHOD(_seteuid, SetEUid);
- BOOTSTRAP_METHOD(_setgid, SetGid);
- BOOTSTRAP_METHOD(_setuid, SetUid);
- BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ if (env->is_main_thread()) {
+ BOOTSTRAP_METHOD(_initgroups, InitGroups);
+ BOOTSTRAP_METHOD(_setegid, SetEGid);
+ BOOTSTRAP_METHOD(_seteuid, SetEUid);
+ BOOTSTRAP_METHOD(_setgid, SetGid);
+ BOOTSTRAP_METHOD(_setuid, SetUid);
+ BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ }
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
Local<String> should_abort_on_uncaught_toggle =
diff --git a/src/callback_scope.cc b/src/callback_scope.cc
index 9eac7beb03..23e6d5b063 100644
--- a/src/callback_scope.cc
+++ b/src/callback_scope.cc
@@ -79,6 +79,11 @@ void InternalCallbackScope::Close() {
closed_ = true;
HandleScope handle_scope(env_->isolate());
+ if (!env_->can_call_into_js()) return;
+ if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
+ env_->async_hooks()->clear_async_id_stack();
+ }
+
if (pushed_ids_)
env_->async_hooks()->pop_async_id(async_context_.async_id);
diff --git a/src/env-inl.h b/src/env-inl.h
index 50328bd77c..eeb419b4a0 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -582,13 +582,42 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}
inline bool Environment::can_call_into_js() const {
- return can_call_into_js_;
+ return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
}
inline void Environment::set_can_call_into_js(bool can_call_into_js) {
can_call_into_js_ = can_call_into_js;
}
+inline bool Environment::is_main_thread() const {
+ return thread_id_ == 0;
+}
+
+inline double Environment::thread_id() const {
+ return thread_id_;
+}
+
+inline void Environment::set_thread_id(double id) {
+ thread_id_ = id;
+}
+
+inline worker::Worker* Environment::worker_context() const {
+ return worker_context_;
+}
+
+inline void Environment::set_worker_context(worker::Worker* context) {
+ CHECK_EQ(worker_context_, nullptr); // Should be set only once.
+ worker_context_ = context;
+}
+
+inline void Environment::add_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.insert(context);
+}
+
+inline void Environment::remove_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.erase(context);
+}
+
inline performance::performance_state* Environment::performance_state() {
return performance_state_.get();
}
diff --git a/src/env.cc b/src/env.cc
index 090b43968b..8df59d1546 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -4,6 +4,7 @@
#include "node_buffer.h"
#include "node_platform.h"
#include "node_file.h"
+#include "node_worker.h"
#include "tracing/agent.h"
#include <stdio.h>
@@ -25,6 +26,7 @@ using v8::StackTrace;
using v8::String;
using v8::Symbol;
using v8::Value;
+using worker::Worker;
IsolateData::IsolateData(Isolate* isolate,
uv_loop_t* event_loop,
@@ -444,7 +446,9 @@ void Environment::RunAndClearNativeImmediates() {
if (it->refed_)
ref_count++;
if (UNLIKELY(try_catch.HasCaught())) {
- FatalException(isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(isolate(), try_catch);
+
// Bail out, remove the already executed callbacks from list
// and set up a new TryCatch for the other pending callbacks.
std::move_backward(it, list.end(), list.begin() + (list.end() - it));
@@ -632,4 +636,25 @@ void Environment::AsyncHooks::grow_async_ids_stack() {
uv_key_t Environment::thread_local_env = {};
+void Environment::Exit(int exit_code) {
+ if (is_main_thread())
+ exit(exit_code);
+ else
+ worker_context_->Exit(exit_code);
+}
+
+void Environment::stop_sub_worker_contexts() {
+ while (!sub_worker_contexts_.empty()) {
+ Worker* w = *sub_worker_contexts_.begin();
+ remove_sub_worker_context(w);
+ w->Exit(1);
+ w->JoinThread();
+ }
+}
+
+bool Environment::is_stopping_worker() const {
+ CHECK(!is_main_thread());
+ return worker_context_->is_stopped();
+}
+
} // namespace node
diff --git a/src/env.h b/src/env.h
index cdb592732a..cf6873e5fe 100644
--- a/src/env.h
+++ b/src/env.h
@@ -55,6 +55,10 @@ namespace performance {
class performance_state;
}
+namespace worker {
+class Worker;
+}
+
namespace loader {
class ModuleWrap;
@@ -193,7 +197,10 @@ struct PackageConfig {
V(mac_string, "mac") \
V(main_string, "main") \
V(max_buffer_string, "maxBuffer") \
+ V(max_semi_space_size_string, "maxSemiSpaceSize") \
+ V(max_old_space_size_string, "maxOldSpaceSize") \
V(message_string, "message") \
+ V(message_port_string, "messagePort") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(modulus_string, "modulus") \
@@ -280,6 +287,7 @@ struct PackageConfig {
V(subject_string, "subject") \
V(subjectaltname_string, "subjectaltname") \
V(syscall_string, "syscall") \
+ V(thread_id_string, "threadId") \
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
@@ -328,6 +336,7 @@ struct PackageConfig {
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(immediate_callback_function, v8::Function) \
V(inspector_console_api_object, v8::Object) \
+ V(message_port, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
@@ -601,6 +610,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
+ void Exit(int code);
// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
@@ -714,6 +724,18 @@ class Environment {
inline bool can_call_into_js() const;
inline void set_can_call_into_js(bool can_call_into_js);
+ // TODO(addaleax): This should be inline.
+ bool is_stopping_worker() const;
+
+ inline bool is_main_thread() const;
+ inline double thread_id() const;
+ inline void set_thread_id(double id);
+ inline worker::Worker* worker_context() const;
+ inline void set_worker_context(worker::Worker* context);
+ inline void add_sub_worker_context(worker::Worker* context);
+ inline void remove_sub_worker_context(worker::Worker* context);
+ void stop_sub_worker_contexts();
+
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
@@ -855,12 +877,15 @@ class Environment {
std::vector<double> destroy_async_id_list_;
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
-
int should_not_abort_scope_counter_ = 0;
std::unique_ptr<performance::performance_state> performance_state_;
std::unordered_map<std::string, uint64_t> performance_marks_;
+
bool can_call_into_js_ = true;
+ double thread_id_ = 0;
+ std::unordered_set<worker::Worker*> sub_worker_contexts_;
+
#if HAVE_INSPECTOR
std::unique_ptr<inspector::Agent> inspector_agent_;
@@ -893,6 +918,8 @@ class Environment {
std::vector<std::unique_ptr<fs::FileHandleReadWrap>>
file_handle_read_wrap_freelist_;
+ worker::Worker* worker_context_ = nullptr;
+
struct ExitCallback {
void (*cb_)(void* arg);
void* arg_;
diff --git a/src/js_stream.cc b/src/js_stream.cc
index c766c322e3..e562a62f3d 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -44,7 +44,8 @@ bool JSStream::IsClosing() {
TryCatch try_catch(env()->isolate());
Local<Value> value;
if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
return true;
}
return value->IsTrue();
@@ -59,7 +60,8 @@ int JSStream::ReadStart() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -73,7 +75,8 @@ int JSStream::ReadStop() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -94,7 +97,8 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -128,7 +132,8 @@ int JSStream::DoWrite(WriteWrap* w,
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
diff --git a/src/node.cc b/src/node.cc
index baa97281b0..663e4a222e 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -1021,9 +1021,9 @@ void AppendExceptionLine(Environment* env,
}
-static void ReportException(Environment* env,
- Local<Value> er,
- Local<Message> message) {
+void ReportException(Environment* env,
+ Local<Value> er,
+ Local<Message> message) {
CHECK(!er.IsEmpty());
HandleScope scope(env->isolate());
@@ -1110,9 +1110,9 @@ static void ReportException(Environment* env, const TryCatch& try_catch) {
// Executes a str within the current v8 context.
-static Local<Value> ExecuteString(Environment* env,
- Local<String> source,
- Local<String> filename) {
+static MaybeLocal<Value> ExecuteString(Environment* env,
+ Local<String> source,
+ Local<String> filename) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -1125,13 +1125,19 @@ static Local<Value> ExecuteString(Environment* env,
v8::Script::Compile(env->context(), source, &origin);
if (script.IsEmpty()) {
ReportException(env, try_catch);
- exit(3);
+ env->Exit(3);
+ return MaybeLocal<Value>();
}
MaybeLocal<Value> result = script.ToLocalChecked()->Run(env->context());
if (result.IsEmpty()) {
+ if (try_catch.HasTerminated()) {
+ env->isolate()->CancelTerminateExecution();
+ return MaybeLocal<Value>();
+ }
ReportException(env, try_catch);
- exit(4);
+ env->Exit(4);
+ return MaybeLocal<Value>();
}
return scope.Escape(result.ToLocalChecked());
@@ -1230,6 +1236,7 @@ static void Abort(const FunctionCallbackInfo<Value>& args) {
void Chdir(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsString());
@@ -1411,6 +1418,7 @@ static void GetEGid(const FunctionCallbackInfo<Value>& args) {
void SetGid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1430,6 +1438,7 @@ void SetGid(const FunctionCallbackInfo<Value>& args) {
void SetEGid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1449,6 +1458,7 @@ void SetEGid(const FunctionCallbackInfo<Value>& args) {
void SetUid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1468,6 +1478,7 @@ void SetUid(const FunctionCallbackInfo<Value>& args) {
void SetEUid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1629,9 +1640,10 @@ static void WaitForInspectorDisconnect(Environment* env) {
static void Exit(const FunctionCallbackInfo<Value>& args) {
- WaitForInspectorDisconnect(Environment::GetCurrent(args));
+ Environment* env = Environment::GetCurrent(args);
+ WaitForInspectorDisconnect(env);
v8_platform.StopTracingAgent();
- exit(args[0]->Int32Value());
+ env->Exit(args[0]->Int32Value());
}
@@ -2040,6 +2052,9 @@ void FatalException(Isolate* isolate,
Local<Value> caught =
fatal_exception_function->Call(process_object, 1, &error);
+ if (fatal_try_catch.HasTerminated())
+ return;
+
if (fatal_try_catch.HasCaught()) {
// The fatal exception function threw, so we must exit
ReportException(env, fatal_try_catch);
@@ -2053,6 +2068,12 @@ void FatalException(Isolate* isolate,
void FatalException(Isolate* isolate, const TryCatch& try_catch) {
+ // If we try to print out a termination exception, we'd just get 'null',
+ // so just crashing here with that information seems like a better idea,
+ // and in particular it seems like we should handle terminations at the call
+ // site for this function rather than by printing them out somewhere.
+ CHECK(!try_catch.HasTerminated());
+
HandleScope scope(isolate);
if (!try_catch.IsVerbose()) {
FatalException(isolate, try_catch.Exception(), try_catch.Message());
@@ -2574,11 +2595,12 @@ void SetupProcessObject(Environment* env,
Local<Object> process = env->process_object();
auto title_string = FIXED_ONE_BYTE_STRING(env->isolate(), "title");
- CHECK(process->SetAccessor(env->context(),
- title_string,
- ProcessTitleGetter,
- ProcessTitleSetter,
- env->as_external()).FromJust());
+ CHECK(process->SetAccessor(
+ env->context(),
+ title_string,
+ ProcessTitleGetter,
+ env->is_main_thread() ? ProcessTitleSetter : nullptr,
+ env->as_external()).FromJust());
// process.version
READONLY_PROPERTY(process,
@@ -2862,25 +2884,27 @@ void SetupProcessObject(Environment* env,
CHECK(process->SetAccessor(env->context(),
debug_port_string,
DebugPortGetter,
- DebugPortSetter,
+ env->is_main_thread() ? DebugPortSetter : nullptr,
env->as_external()).FromJust());
// define various internal methods
- env->SetMethod(process,
- "_startProfilerIdleNotifier",
- StartProfilerIdleNotifier);
- env->SetMethod(process,
- "_stopProfilerIdleNotifier",
- StopProfilerIdleNotifier);
+ if (env->is_main_thread()) {
+ env->SetMethod(process,
+ "_startProfilerIdleNotifier",
+ StartProfilerIdleNotifier);
+ env->SetMethod(process,
+ "_stopProfilerIdleNotifier",
+ StopProfilerIdleNotifier);
+ env->SetMethod(process, "abort", Abort);
+ env->SetMethod(process, "chdir", Chdir);
+ env->SetMethod(process, "umask", Umask);
+ }
+
env->SetMethod(process, "_getActiveRequests", GetActiveRequests);
env->SetMethod(process, "_getActiveHandles", GetActiveHandles);
env->SetMethod(process, "reallyExit", Exit);
- env->SetMethod(process, "abort", Abort);
- env->SetMethod(process, "chdir", Chdir);
env->SetMethod(process, "cwd", Cwd);
- env->SetMethod(process, "umask", Umask);
-
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
env->SetMethod(process, "getuid", GetUid);
env->SetMethod(process, "geteuid", GetEUid);
@@ -2890,16 +2914,17 @@ void SetupProcessObject(Environment* env,
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
env->SetMethod(process, "_kill", Kill);
+ env->SetMethod(process, "dlopen", DLOpen);
- env->SetMethod(process, "_debugProcess", DebugProcess);
- env->SetMethod(process, "_debugEnd", DebugEnd);
+ if (env->is_main_thread()) {
+ env->SetMethod(process, "_debugProcess", DebugProcess);
+ env->SetMethod(process, "_debugEnd", DebugEnd);
+ }
env->SetMethod(process, "hrtime", Hrtime);
env->SetMethod(process, "cpuUsage", CPUUsage);
- env->SetMethod(process, "dlopen", DLOpen);
-
env->SetMethod(process, "uptime", Uptime);
env->SetMethod(process, "memoryUsage", MemoryUsage);
}
@@ -2935,8 +2960,10 @@ void RawDebug(const FunctionCallbackInfo<Value>& args) {
}
-static Local<Function> GetBootstrapper(Environment* env, Local<String> source,
- Local<String> script_name) {
+static MaybeLocal<Function> GetBootstrapper(
+ Environment* env,
+ Local<String> source,
+ Local<String> script_name) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -2947,16 +2974,17 @@ static Local<Function> GetBootstrapper(Environment* env, Local<String> source,
try_catch.SetVerbose(false);
// Execute the bootstrapper javascript file
- Local<Value> bootstrapper_v = ExecuteString(env, source, script_name);
+ MaybeLocal<Value> bootstrapper_v = ExecuteString(env, source, script_name);
+ if (bootstrapper_v.IsEmpty()) // This happens when execution was interrupted.
+ return MaybeLocal<Function>();
+
if (try_catch.HasCaught()) {
ReportException(env, try_catch);
exit(10);
}
- CHECK(bootstrapper_v->IsFunction());
- Local<Function> bootstrapper = Local<Function>::Cast(bootstrapper_v);
-
- return scope.Escape(bootstrapper);
+ CHECK(bootstrapper_v.ToLocalChecked()->IsFunction());
+ return scope.Escape(bootstrapper_v.ToLocalChecked().As<Function>());
}
static bool ExecuteBootstrapper(Environment* env, Local<Function> bootstrapper,
@@ -2995,13 +3023,18 @@ void LoadEnvironment(Environment* env) {
// node_js2c.
Local<String> loaders_name =
FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/loaders.js");
- Local<Function> loaders_bootstrapper =
+ MaybeLocal<Function> loaders_bootstrapper =
GetBootstrapper(env, LoadersBootstrapperSource(env), loaders_name);
Local<String> node_name =
FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/node.js");
- Local<Function> node_bootstrapper =
+ MaybeLocal<Function> node_bootstrapper =
GetBootstrapper(env, NodeBootstrapperSource(env), node_name);
+ if (loaders_bootstrapper.IsEmpty() || node_bootstrapper.IsEmpty()) {
+ // Execution was interrupted.
+ return;
+ }
+
// Add a reference to the global object
Local<Object> global = env->context()->Global();
@@ -3049,7 +3082,7 @@ void LoadEnvironment(Environment* env) {
// Bootstrap internal loaders
Local<Value> bootstrapped_loaders;
- if (!ExecuteBootstrapper(env, loaders_bootstrapper,
+ if (!ExecuteBootstrapper(env, loaders_bootstrapper.ToLocalChecked(),
arraysize(loaders_bootstrapper_args),
loaders_bootstrapper_args,
&bootstrapped_loaders)) {
@@ -3065,7 +3098,7 @@ void LoadEnvironment(Environment* env) {
bootstrapper,
bootstrapped_loaders
};
- if (!ExecuteBootstrapper(env, node_bootstrapper,
+ if (!ExecuteBootstrapper(env, node_bootstrapper.ToLocalChecked(),
arraysize(node_bootstrapper_args),
node_bootstrapper_args,
&bootstrapped_node)) {
@@ -4279,6 +4312,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
WaitForInspectorDisconnect(&env);
env.set_can_call_into_js(false);
+ env.stop_sub_worker_contexts();
env.RunCleanup();
RunAtExit(&env);
diff --git a/src/node_errors.h b/src/node_errors.h
index 931ce7b8fd..2c97088cc5 100644
--- a/src/node_errors.h
+++ b/src/node_errors.h
@@ -34,6 +34,7 @@ namespace node {
V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_MODULE, Error) \
+ V(ERR_MISSING_PLATFORM_FOR_WORKER, Error) \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \
V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \
V(ERR_STRING_TOO_LONG, Error) \
@@ -68,6 +69,9 @@ namespace node {
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"MessagePort was found in message but not listed in transferList") \
+ V(ERR_MISSING_PLATFORM_FOR_WORKER, \
+ "The V8 platform used by this instance of Node does not support " \
+ "creating Workers") \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \
"Script execution was interrupted by `SIGINT`") \
V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, \
diff --git a/src/node_internals.h b/src/node_internals.h
index a5d8ed0e5d..7760eb26c6 100644
--- a/src/node_internals.h
+++ b/src/node_internals.h
@@ -137,6 +137,7 @@ struct sockaddr;
V(util) \
V(uv) \
V(v8) \
+ V(worker) \
V(zlib)
#define NODE_BUILTIN_MODULES(V) \
@@ -314,6 +315,10 @@ class FatalTryCatch : public v8::TryCatch {
Environment* env_;
};
+void ReportException(Environment* env,
+ v8::Local<v8::Value> er,
+ v8::Local<v8::Message> message);
+
v8::Maybe<bool> ProcessEmitWarning(Environment* env, const char* fmt, ...);
v8::Maybe<bool> ProcessEmitDeprecationWarning(Environment* env,
const char* warning,
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b56cef2d77..352749ea48 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -57,7 +57,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
- return message_ports_[id]->object();
+ return message_ports_[id]->object(isolate);
};
MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
@@ -436,7 +436,7 @@ MessagePort* MessagePort::New(
void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
- Local<Context> context = object()->CreationContext();
+ Local<Context> context = object(env()->isolate())->CreationContext();
// data_ can only ever be modified by the owner thread, so no need to lock.
// However, the message port may be transferred while it is processing
@@ -447,6 +447,13 @@ void MessagePort::OnMessage() {
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
+
+ if (stop_event_loop_) {
+ CHECK(!data_->receiving_messages_);
+ uv_stop(env()->event_loop());
+ break;
+ }
+
if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
@@ -514,8 +521,9 @@ void MessagePort::Send(Message&& message) {
void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ Local<Context> context = object(env->isolate())->CreationContext();
Message msg;
- if (msg.Serialize(env, object()->CreationContext(), args[0], args[1])
+ if (msg.Serialize(env, context, args[0], args[1])
.IsNothing()) {
return;
}
@@ -548,6 +556,14 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}
+void MessagePort::StopEventLoop() {
+ Mutex::ScopedLock lock(data_->mutex_);
+ data_->receiving_messages_ = false;
+ stop_event_loop_ = true;
+
+ TriggerAsync();
+}
+
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
@@ -570,6 +586,12 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
port->Stop();
}
+void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
+ MessagePort* port;
+ ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
+ port->OnMessage();
+}
+
size_t MessagePort::self_size() const {
Mutex::ScopedLock lock(data_->mutex_);
size_t sz = sizeof(*this) + sizeof(*data_);
@@ -604,6 +626,7 @@ MaybeLocal<Function> GetMessagePortConstructor(
env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
env->SetProtoMethod(m, "start", MessagePort::Start);
env->SetProtoMethod(m, "stop", MessagePort::Stop);
+ env->SetProtoMethod(m, "drain", MessagePort::Drain);
env->SetProtoMethod(m, "close", HandleWrap::Close);
env->SetProtoMethod(m, "unref", HandleWrap::Unref);
env->SetProtoMethod(m, "ref", HandleWrap::Ref);
diff --git a/src/node_messaging.h b/src/node_messaging.h
index ff8fcc7243..9a13437d19 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -133,11 +133,15 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
+ // Stop processing messages on this port as a receiving end,
+ // and stop the event loop that this port is associated with.
+ void StopEventLoop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
@@ -160,6 +164,7 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
+ bool stop_event_loop_ = false;
friend class MessagePortData;
};
diff --git a/src/node_worker.cc b/src/node_worker.cc
new file mode 100644
index 0000000000..366dca353d
--- /dev/null
+++ b/src/node_worker.cc
@@ -0,0 +1,428 @@
+#include "node_worker.h"
+#include "node_errors.h"
+#include "node_internals.h"
+#include "node_buffer.h"
+#include "node_perf.h"
+#include "util.h"
+#include "util-inl.h"
+#include "async_wrap.h"
+#include "async_wrap-inl.h"
+
+using v8::ArrayBuffer;
+using v8::Context;
+using v8::Function;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Isolate;
+using v8::Local;
+using v8::Locker;
+using v8::Number;
+using v8::Object;
+using v8::SealHandleScope;
+using v8::String;
+using v8::Value;
+
+namespace node {
+namespace worker {
+
+namespace {
+
+double next_thread_id = 1;
+Mutex next_thread_id_mutex;
+
+} // anonymous namespace
+
+Worker::Worker(Environment* env, Local<Object> wrap)
+ : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) {
+ // Generate a new thread id.
+ {
+ Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
+ thread_id_ = next_thread_id++;
+ }
+ wrap->Set(env->context(),
+ env->thread_id_string(),
+ Number::New(env->isolate(), thread_id_)).FromJust();
+
+ // Set up everything that needs to be set up in the parent environment.
+ parent_port_ = MessagePort::New(env, env->context());
+ if (parent_port_ == nullptr) {
+ // This can happen e.g. because execution is terminating.
+ return;
+ }
+
+ child_port_data_.reset(new MessagePortData(nullptr));
+ MessagePort::Entangle(parent_port_, child_port_data_.get());
+
+ object()->Set(env->context(),
+ env->message_port_string(),
+ parent_port_->object()).FromJust();
+
+ array_buffer_allocator_.reset(CreateArrayBufferAllocator());
+
+ isolate_ = NewIsolate(array_buffer_allocator_.get());
+ CHECK_NE(isolate_, nullptr);
+ CHECK_EQ(uv_loop_init(&loop_), 0);
+
+ thread_exit_async_.reset(new uv_async_t);
+ thread_exit_async_->data = this;
+ CHECK_EQ(uv_async_init(env->event_loop(),
+ thread_exit_async_.get(),
+ [](uv_async_t* handle) {
+ static_cast<Worker*>(handle->data)->OnThreadStopped();
+ }), 0);
+
+ {
+ // Enter an environment capable of executing code in the child Isolate
+ // (and only in it).
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ HandleScope handle_scope(isolate_);
+
+ isolate_data_.reset(CreateIsolateData(isolate_,
+ &loop_,
+ env->isolate_data()->platform(),
+ array_buffer_allocator_.get()));
+ CHECK(isolate_data_);
+
+ Local<Context> context = NewContext(isolate_);
+ Context::Scope context_scope(context);
+
+ // TODO(addaleax): Use CreateEnvironment(), or generally another public API.
+ env_.reset(new Environment(isolate_data_.get(),
+ context,
+ nullptr));
+ CHECK_NE(env_, nullptr);
+ env_->set_abort_on_uncaught_exception(false);
+ env_->set_worker_context(this);
+ env_->set_thread_id(thread_id_);
+
+ env_->Start(0, nullptr, 0, nullptr, env->profiler_idle_notifier_started());
+ }
+
+ // The new isolate won't be bothered on this thread again.
+ isolate_->DiscardThreadSpecificMetadata();
+}
+
+bool Worker::is_stopped() const {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ return stopped_;
+}
+
+void Worker::Run() {
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ CHECK_NE(platform, nullptr);
+
+ {
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ SealHandleScope outer_seal(isolate_);
+
+ {
+ Context::Scope context_scope(env_->context());
+ HandleScope handle_scope(isolate_);
+
+ {
+ HandleScope handle_scope(isolate_);
+ Mutex::ScopedLock lock(mutex_);
+ // Set up the message channel for receiving messages in the child.
+ child_port_ = MessagePort::New(env_.get(),
+ env_->context(),
+ std::move(child_port_data_));
+ // MessagePort::New() may return nullptr if execution is terminated
+ // within it.
+ if (child_port_ != nullptr)
+ env_->set_message_port(child_port_->object(isolate_));
+ }
+
+ if (!is_stopped()) {
+ HandleScope handle_scope(isolate_);
+ Environment::AsyncCallbackScope callback_scope(env_.get());
+ env_->async_hooks()->push_async_ids(1, 0);
+ // This loads the Node bootstrapping code.
+ LoadEnvironment(env_.get());
+ env_->async_hooks()->pop_async_id(1);
+ }
+
+ {
+ SealHandleScope seal(isolate_);
+ bool more;
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
+ do {
+ if (is_stopped()) break;
+ uv_run(&loop_, UV_RUN_DEFAULT);
+ if (is_stopped()) break;
+
+ platform->DrainBackgroundTasks(isolate_);
+
+ more = uv_loop_alive(&loop_);
+ if (more && !is_stopped())
+ continue;
+
+ EmitBeforeExit(env_.get());
+
+ // Emit `beforeExit` if the loop became alive either after emitting
+ // event, or after running some callbacks.
+ more = uv_loop_alive(&loop_);
+ } while (more == true);
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
+ }
+ }
+
+ {
+ int exit_code;
+ bool stopped = is_stopped();
+ if (!stopped)
+ exit_code = EmitExit(env_.get());
+ Mutex::ScopedLock lock(mutex_);
+ if (exit_code_ == 0 && !stopped)
+ exit_code_ = exit_code;
+ }
+
+ env_->set_can_call_into_js(false);
+ Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
+ Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
+
+ // Grab the parent-to-child channel and render is unusable.
+ MessagePort* child_port;
+ {
+ Mutex::ScopedLock lock(mutex_);
+ child_port = child_port_;
+ child_port_ = nullptr;
+ }
+
+ {
+ Context::Scope context_scope(env_->context());
+ child_port->Close();
+ env_->stop_sub_worker_contexts();
+ env_->RunCleanup();
+ RunAtExit(env_.get());
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ stopped_ = true;
+ }
+
+ env_->RunCleanup();
+
+ // This call needs to be made while the `Environment` is still alive
+ // because we assume that it is available for async tracking in the
+ // NodePlatform implementation.
+ platform->DrainBackgroundTasks(isolate_);
+ }
+
+ env_.reset();
+ }
+
+ DisposeIsolate();
+
+ // Need to run the loop one more time to close the platform's uv_async_t
+ uv_run(&loop_, UV_RUN_ONCE);
+
+ {
+ Mutex::ScopedLock lock(mutex_);
+ CHECK(thread_exit_async_);
+ scheduled_on_thread_stopped_ = true;
+ uv_async_send(thread_exit_async_.get());
+ }
+}
+
+void Worker::DisposeIsolate() {
+ if (isolate_ == nullptr)
+ return;
+
+ CHECK(isolate_data_);
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ platform->CancelPendingDelayedTasks(isolate_);
+
+ isolate_data_.reset();
+
+ isolate_->Dispose();
+ isolate_ = nullptr;
+}
+
+void Worker::JoinThread() {
+ if (thread_joined_)
+ return;
+ CHECK_EQ(uv_thread_join(&tid_), 0);
+ thread_joined_ = true;
+
+ env()->remove_sub_worker_context(this);
+
+ if (thread_exit_async_) {
+ env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
+ delete async;
+ });
+
+ if (scheduled_on_thread_stopped_)
+ OnThreadStopped();
+ }
+}
+
+void Worker::OnThreadStopped() {
+ Mutex::ScopedLock lock(mutex_);
+ scheduled_on_thread_stopped_ = false;
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ CHECK(stopped_);
+ }
+
+ CHECK_EQ(child_port_, nullptr);
+ parent_port_ = nullptr;
+
+ // It's okay to join the thread while holding the mutex because
+ // OnThreadStopped means it's no longer doing any work that might grab it
+ // and really just silently exiting.
+ JoinThread();
+
+ {
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
+
+ // Reset the parent port as we're closing it now anyway.
+ object()->Set(env()->context(),
+ env()->message_port_string(),
+ Undefined(env()->isolate())).FromJust();
+
+ Local<Value> code = Integer::New(env()->isolate(), exit_code_);
+ MakeCallback(env()->onexit_string(), 1, &code);
+ }
+
+ // JoinThread() cleared all libuv handles bound to this Worker,
+ // the C++ object is no longer needed for anything now.
+ MakeWeak();
+}
+
+Worker::~Worker() {
+ Mutex::ScopedLock lock(mutex_);
+ JoinThread();
+
+ CHECK(stopped_);
+ CHECK(thread_joined_);
+ CHECK_EQ(child_port_, nullptr);
+ CHECK_EQ(uv_loop_close(&loop_), 0);
+
+ // This has most likely already happened within the worker thread -- this
+ // is just in case Worker creation failed early.
+ DisposeIsolate();
+}
+
+void Worker::New(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+
+ CHECK(args.IsConstructCall());
+
+ if (env->isolate_data()->platform() == nullptr) {
+ THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
+ return;
+ }
+
+ new Worker(env, args.This());
+}
+
+void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ Mutex::ScopedLock lock(w->mutex_);
+
+ w->env()->add_sub_worker_context(w);
+ w->stopped_ = false;
+ CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
+ static_cast<Worker*>(arg)->Run();
+ }, static_cast<void*>(w)), 0);
+ w->thread_joined_ = false;
+}
+
+void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+
+ w->Exit(1);
+ w->JoinThread();
+}
+
+void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Exit(int code) {
+ Mutex::ScopedLock lock(mutex_);
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ if (!stopped_) {
+ CHECK_NE(env_, nullptr);
+ stopped_ = true;
+ exit_code_ = code;
+ if (child_port_ != nullptr)
+ child_port_->StopEventLoop();
+ isolate_->TerminateExecution();
+ }
+}
+
+size_t Worker::self_size() const {
+ return sizeof(*this);
+}
+
+namespace {
+
+// Return the MessagePort that is global for this Environment and communicates
+// with the internal [kPort] port of the JS Worker class in the parent thread.
+void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+ Local<Object> port = env->message_port();
+ if (!port.IsEmpty()) {
+ CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
+ args.GetReturnValue().Set(port);
+ }
+}
+
+void InitWorker(Local<Object> target,
+ Local<Value> unused,
+ Local<Context> context,
+ void* priv) {
+ Environment* env = Environment::GetCurrent(context);
+
+ {
+ Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
+
+ w->InstanceTemplate()->SetInternalFieldCount(1);
+
+ AsyncWrap::AddWrapMethods(env, w);
+ env->SetProtoMethod(w, "startThread", Worker::StartThread);
+ env->SetProtoMethod(w, "stopThread", Worker::StopThread);
+ env->SetProtoMethod(w, "ref", Worker::Ref);
+ env->SetProtoMethod(w, "unref", Worker::Unref);
+
+ Local<String> workerString =
+ FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
+ w->SetClassName(workerString);
+ target->Set(workerString, w->GetFunction());
+ }
+
+ env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
+
+ auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId");
+ target->Set(env->context(),
+ thread_id_string,
+ Number::New(env->isolate(), env->thread_id())).FromJust();
+}
+
+} // anonymous namespace
+
+} // namespace worker
+} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
diff --git a/src/node_worker.h b/src/node_worker.h
new file mode 100644
index 0000000000..0a98d2f11e
--- /dev/null
+++ b/src/node_worker.h
@@ -0,0 +1,83 @@
+#ifndef SRC_NODE_WORKER_H_
+#define SRC_NODE_WORKER_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "node_messaging.h"
+#include <unordered_map>
+
+namespace node {
+namespace worker {
+
+// A worker thread, as represented in its parent thread.
+class Worker : public AsyncWrap {
+ public:
+ Worker(Environment* env, v8::Local<v8::Object> wrap);
+ ~Worker();
+
+ // Run the worker. This is only called from the worker thread.
+ void Run();
+
+ // Forcibly exit the thread with a specified exit code. This may be called
+ // from any thread.
+ void Exit(int code);
+
+ // Wait for the worker thread to stop (in a blocking manner).
+ void JoinThread();
+
+ size_t self_size() const override;
+ bool is_stopped() const;
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void StartThread(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void StopThread(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void GetMessagePort(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Unref(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ private:
+ void OnThreadStopped();
+ void DisposeIsolate();
+
+ uv_loop_t loop_;
+ DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
+ DeleteFnPtr<Environment, FreeEnvironment> env_;
+ v8::Isolate* isolate_ = nullptr;
+ DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
+ array_buffer_allocator_;
+ uv_thread_t tid_;
+
+ // This mutex protects access to all variables listed below it.
+ mutable Mutex mutex_;
+
+ // Currently only used for telling the parent thread that the child
+ // thread exited.
+ std::unique_ptr<uv_async_t> thread_exit_async_;
+ bool scheduled_on_thread_stopped_ = false;
+
+ // This mutex only protects stopped_. If both locks are acquired, this needs
+ // to be the latter one.
+ mutable Mutex stopped_mutex_;
+ bool stopped_ = true;
+
+ bool thread_joined_ = true;
+ int exit_code_ = 0;
+ double thread_id_ = -1;
+
+ std::unique_ptr<MessagePortData> child_port_data_;
+
+ // The child port is always kept alive by the child Environment's persistent
+ // handle to it.
+ MessagePort* child_port_ = nullptr;
+ // This is always kept alive because the JS object associated with the Worker
+ // instance refers to it via its [kPort] property.
+ MessagePort* parent_port_ = nullptr;
+};
+
+} // namespace worker
+} // namespace node
+
+#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+
+#endif // SRC_NODE_WORKER_H_