summaryrefslogtreecommitdiff
path: root/src/node_worker.cc
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2017-09-01 17:03:41 +0200
committerAnna Henningsen <anna@addaleax.net>2018-06-06 19:43:52 +0200
commit0df031acadcc6490379d72676203a980c8d60592 (patch)
tree3f49864e72b0193ea9af937874f62c6316877ec4 /src/node_worker.cc
parent8939f36630bd718fc0b0b8557cf7f2ed9ecab312 (diff)
downloadandroid-node-v8-0df031acadcc6490379d72676203a980c8d60592.tar.gz
android-node-v8-0df031acadcc6490379d72676203a980c8d60592.tar.bz2
android-node-v8-0df031acadcc6490379d72676203a980c8d60592.zip
worker: initial implementation
Implement multi-threading support for most of the API. Thanks to Stephen Belanger for reviewing this change in its original form, to Olivia Hugger for reviewing the documentation and some of the tests coming along with it, and to Alexey Orlenko and Timothy Gu for reviewing other parts of the tests. Refs: https://github.com/ayojs/ayo/pull/110 Refs: https://github.com/ayojs/ayo/pull/114 Refs: https://github.com/ayojs/ayo/pull/117 PR-URL: https://github.com/nodejs/node/pull/20876 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Shingo Inoue <leko.noor@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: John-David Dalton <john.david.dalton@gmail.com> Reviewed-By: Gus Caplan <me@gus.host>
Diffstat (limited to 'src/node_worker.cc')
-rw-r--r--src/node_worker.cc428
1 files changed, 428 insertions, 0 deletions
diff --git a/src/node_worker.cc b/src/node_worker.cc
new file mode 100644
index 0000000000..366dca353d
--- /dev/null
+++ b/src/node_worker.cc
@@ -0,0 +1,428 @@
+#include "node_worker.h"
+#include "node_errors.h"
+#include "node_internals.h"
+#include "node_buffer.h"
+#include "node_perf.h"
+#include "util.h"
+#include "util-inl.h"
+#include "async_wrap.h"
+#include "async_wrap-inl.h"
+
+using v8::ArrayBuffer;
+using v8::Context;
+using v8::Function;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Isolate;
+using v8::Local;
+using v8::Locker;
+using v8::Number;
+using v8::Object;
+using v8::SealHandleScope;
+using v8::String;
+using v8::Value;
+
+namespace node {
+namespace worker {
+
+namespace {
+
+double next_thread_id = 1;
+Mutex next_thread_id_mutex;
+
+} // anonymous namespace
+
+Worker::Worker(Environment* env, Local<Object> wrap)
+ : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) {
+ // Generate a new thread id.
+ {
+ Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
+ thread_id_ = next_thread_id++;
+ }
+ wrap->Set(env->context(),
+ env->thread_id_string(),
+ Number::New(env->isolate(), thread_id_)).FromJust();
+
+ // Set up everything that needs to be set up in the parent environment.
+ parent_port_ = MessagePort::New(env, env->context());
+ if (parent_port_ == nullptr) {
+ // This can happen e.g. because execution is terminating.
+ return;
+ }
+
+ child_port_data_.reset(new MessagePortData(nullptr));
+ MessagePort::Entangle(parent_port_, child_port_data_.get());
+
+ object()->Set(env->context(),
+ env->message_port_string(),
+ parent_port_->object()).FromJust();
+
+ array_buffer_allocator_.reset(CreateArrayBufferAllocator());
+
+ isolate_ = NewIsolate(array_buffer_allocator_.get());
+ CHECK_NE(isolate_, nullptr);
+ CHECK_EQ(uv_loop_init(&loop_), 0);
+
+ thread_exit_async_.reset(new uv_async_t);
+ thread_exit_async_->data = this;
+ CHECK_EQ(uv_async_init(env->event_loop(),
+ thread_exit_async_.get(),
+ [](uv_async_t* handle) {
+ static_cast<Worker*>(handle->data)->OnThreadStopped();
+ }), 0);
+
+ {
+ // Enter an environment capable of executing code in the child Isolate
+ // (and only in it).
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ HandleScope handle_scope(isolate_);
+
+ isolate_data_.reset(CreateIsolateData(isolate_,
+ &loop_,
+ env->isolate_data()->platform(),
+ array_buffer_allocator_.get()));
+ CHECK(isolate_data_);
+
+ Local<Context> context = NewContext(isolate_);
+ Context::Scope context_scope(context);
+
+ // TODO(addaleax): Use CreateEnvironment(), or generally another public API.
+ env_.reset(new Environment(isolate_data_.get(),
+ context,
+ nullptr));
+ CHECK_NE(env_, nullptr);
+ env_->set_abort_on_uncaught_exception(false);
+ env_->set_worker_context(this);
+ env_->set_thread_id(thread_id_);
+
+ env_->Start(0, nullptr, 0, nullptr, env->profiler_idle_notifier_started());
+ }
+
+ // The new isolate won't be bothered on this thread again.
+ isolate_->DiscardThreadSpecificMetadata();
+}
+
+bool Worker::is_stopped() const {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ return stopped_;
+}
+
+void Worker::Run() {
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ CHECK_NE(platform, nullptr);
+
+ {
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ SealHandleScope outer_seal(isolate_);
+
+ {
+ Context::Scope context_scope(env_->context());
+ HandleScope handle_scope(isolate_);
+
+ {
+ HandleScope handle_scope(isolate_);
+ Mutex::ScopedLock lock(mutex_);
+ // Set up the message channel for receiving messages in the child.
+ child_port_ = MessagePort::New(env_.get(),
+ env_->context(),
+ std::move(child_port_data_));
+ // MessagePort::New() may return nullptr if execution is terminated
+ // within it.
+ if (child_port_ != nullptr)
+ env_->set_message_port(child_port_->object(isolate_));
+ }
+
+ if (!is_stopped()) {
+ HandleScope handle_scope(isolate_);
+ Environment::AsyncCallbackScope callback_scope(env_.get());
+ env_->async_hooks()->push_async_ids(1, 0);
+ // This loads the Node bootstrapping code.
+ LoadEnvironment(env_.get());
+ env_->async_hooks()->pop_async_id(1);
+ }
+
+ {
+ SealHandleScope seal(isolate_);
+ bool more;
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
+ do {
+ if (is_stopped()) break;
+ uv_run(&loop_, UV_RUN_DEFAULT);
+ if (is_stopped()) break;
+
+ platform->DrainBackgroundTasks(isolate_);
+
+ more = uv_loop_alive(&loop_);
+ if (more && !is_stopped())
+ continue;
+
+ EmitBeforeExit(env_.get());
+
+ // Emit `beforeExit` if the loop became alive either after emitting
+ // event, or after running some callbacks.
+ more = uv_loop_alive(&loop_);
+ } while (more == true);
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
+ }
+ }
+
+ {
+ int exit_code;
+ bool stopped = is_stopped();
+ if (!stopped)
+ exit_code = EmitExit(env_.get());
+ Mutex::ScopedLock lock(mutex_);
+ if (exit_code_ == 0 && !stopped)
+ exit_code_ = exit_code;
+ }
+
+ env_->set_can_call_into_js(false);
+ Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
+ Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
+
+ // Grab the parent-to-child channel and render is unusable.
+ MessagePort* child_port;
+ {
+ Mutex::ScopedLock lock(mutex_);
+ child_port = child_port_;
+ child_port_ = nullptr;
+ }
+
+ {
+ Context::Scope context_scope(env_->context());
+ child_port->Close();
+ env_->stop_sub_worker_contexts();
+ env_->RunCleanup();
+ RunAtExit(env_.get());
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ stopped_ = true;
+ }
+
+ env_->RunCleanup();
+
+ // This call needs to be made while the `Environment` is still alive
+ // because we assume that it is available for async tracking in the
+ // NodePlatform implementation.
+ platform->DrainBackgroundTasks(isolate_);
+ }
+
+ env_.reset();
+ }
+
+ DisposeIsolate();
+
+ // Need to run the loop one more time to close the platform's uv_async_t
+ uv_run(&loop_, UV_RUN_ONCE);
+
+ {
+ Mutex::ScopedLock lock(mutex_);
+ CHECK(thread_exit_async_);
+ scheduled_on_thread_stopped_ = true;
+ uv_async_send(thread_exit_async_.get());
+ }
+}
+
+void Worker::DisposeIsolate() {
+ if (isolate_ == nullptr)
+ return;
+
+ CHECK(isolate_data_);
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ platform->CancelPendingDelayedTasks(isolate_);
+
+ isolate_data_.reset();
+
+ isolate_->Dispose();
+ isolate_ = nullptr;
+}
+
+void Worker::JoinThread() {
+ if (thread_joined_)
+ return;
+ CHECK_EQ(uv_thread_join(&tid_), 0);
+ thread_joined_ = true;
+
+ env()->remove_sub_worker_context(this);
+
+ if (thread_exit_async_) {
+ env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
+ delete async;
+ });
+
+ if (scheduled_on_thread_stopped_)
+ OnThreadStopped();
+ }
+}
+
+void Worker::OnThreadStopped() {
+ Mutex::ScopedLock lock(mutex_);
+ scheduled_on_thread_stopped_ = false;
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ CHECK(stopped_);
+ }
+
+ CHECK_EQ(child_port_, nullptr);
+ parent_port_ = nullptr;
+
+ // It's okay to join the thread while holding the mutex because
+ // OnThreadStopped means it's no longer doing any work that might grab it
+ // and really just silently exiting.
+ JoinThread();
+
+ {
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
+
+ // Reset the parent port as we're closing it now anyway.
+ object()->Set(env()->context(),
+ env()->message_port_string(),
+ Undefined(env()->isolate())).FromJust();
+
+ Local<Value> code = Integer::New(env()->isolate(), exit_code_);
+ MakeCallback(env()->onexit_string(), 1, &code);
+ }
+
+ // JoinThread() cleared all libuv handles bound to this Worker,
+ // the C++ object is no longer needed for anything now.
+ MakeWeak();
+}
+
+Worker::~Worker() {
+ Mutex::ScopedLock lock(mutex_);
+ JoinThread();
+
+ CHECK(stopped_);
+ CHECK(thread_joined_);
+ CHECK_EQ(child_port_, nullptr);
+ CHECK_EQ(uv_loop_close(&loop_), 0);
+
+ // This has most likely already happened within the worker thread -- this
+ // is just in case Worker creation failed early.
+ DisposeIsolate();
+}
+
+void Worker::New(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+
+ CHECK(args.IsConstructCall());
+
+ if (env->isolate_data()->platform() == nullptr) {
+ THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
+ return;
+ }
+
+ new Worker(env, args.This());
+}
+
+void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ Mutex::ScopedLock lock(w->mutex_);
+
+ w->env()->add_sub_worker_context(w);
+ w->stopped_ = false;
+ CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
+ static_cast<Worker*>(arg)->Run();
+ }, static_cast<void*>(w)), 0);
+ w->thread_joined_ = false;
+}
+
+void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+
+ w->Exit(1);
+ w->JoinThread();
+}
+
+void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Exit(int code) {
+ Mutex::ScopedLock lock(mutex_);
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ if (!stopped_) {
+ CHECK_NE(env_, nullptr);
+ stopped_ = true;
+ exit_code_ = code;
+ if (child_port_ != nullptr)
+ child_port_->StopEventLoop();
+ isolate_->TerminateExecution();
+ }
+}
+
+size_t Worker::self_size() const {
+ return sizeof(*this);
+}
+
+namespace {
+
+// Return the MessagePort that is global for this Environment and communicates
+// with the internal [kPort] port of the JS Worker class in the parent thread.
+void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+ Local<Object> port = env->message_port();
+ if (!port.IsEmpty()) {
+ CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
+ args.GetReturnValue().Set(port);
+ }
+}
+
+void InitWorker(Local<Object> target,
+ Local<Value> unused,
+ Local<Context> context,
+ void* priv) {
+ Environment* env = Environment::GetCurrent(context);
+
+ {
+ Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
+
+ w->InstanceTemplate()->SetInternalFieldCount(1);
+
+ AsyncWrap::AddWrapMethods(env, w);
+ env->SetProtoMethod(w, "startThread", Worker::StartThread);
+ env->SetProtoMethod(w, "stopThread", Worker::StopThread);
+ env->SetProtoMethod(w, "ref", Worker::Ref);
+ env->SetProtoMethod(w, "unref", Worker::Unref);
+
+ Local<String> workerString =
+ FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
+ w->SetClassName(workerString);
+ target->Set(workerString, w->GetFunction());
+ }
+
+ env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
+
+ auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId");
+ target->Set(env->context(),
+ thread_id_string,
+ Number::New(env->isolate(), env->thread_id())).FromJust();
+}
+
+} // anonymous namespace
+
+} // namespace worker
+} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)