summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/callback.cc2
-rw-r--r--src/api/environment.cc2
-rw-r--r--src/env-inl.h8
-rw-r--r--src/env.cc62
-rw-r--r--src/env.h29
-rw-r--r--src/module_wrap.cc2
-rw-r--r--src/node.cc10
-rw-r--r--src/node.h10
-rw-r--r--src/node_contextify.cc2
-rw-r--r--src/node_worker.cc102
-rw-r--r--src/node_worker.h34
11 files changed, 154 insertions, 109 deletions
diff --git a/src/api/callback.cc b/src/api/callback.cc
index 6c6aec4573..b90f5ca92d 100644
--- a/src/api/callback.cc
+++ b/src/api/callback.cc
@@ -82,7 +82,7 @@ void InternalCallbackScope::Close() {
HandleScope handle_scope(env_->isolate());
if (!env_->can_call_into_js()) return;
- if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
+ if (failed_ && !env_->is_main_thread() && env_->is_stopping()) {
env_->async_hooks()->clear_async_id_stack();
}
diff --git a/src/api/environment.cc b/src/api/environment.cc
index 6e3c61fac1..1597a6cec5 100644
--- a/src/api/environment.cc
+++ b/src/api/environment.cc
@@ -40,7 +40,7 @@ static bool ShouldAbortOnUncaughtException(Isolate* isolate) {
DebugSealHandleScope scope(isolate);
Environment* env = Environment::GetCurrent(isolate);
return env != nullptr &&
- (env->is_main_thread() || !env->is_stopping_worker()) &&
+ (env->is_main_thread() || !env->is_stopping()) &&
env->should_abort_on_uncaught_toggle()[0] &&
!env->inside_should_not_abort_on_uncaught_scope();
}
diff --git a/src/env-inl.h b/src/env-inl.h
index 8c37b393dc..aff12e57bf 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -32,7 +32,6 @@
#include "v8.h"
#include "node_perf_common.h"
#include "node_context_data.h"
-#include "node_worker.h"
#include <cstddef>
#include <cstdint>
@@ -661,7 +660,7 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}
inline bool Environment::can_call_into_js() const {
- return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
+ return can_call_into_js_ && !is_stopping();
}
inline void Environment::set_can_call_into_js(bool can_call_into_js) {
@@ -709,9 +708,8 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
sub_worker_contexts_.erase(context);
}
-inline bool Environment::is_stopping_worker() const {
- CHECK(!is_main_thread());
- return worker_context_->is_stopped();
+inline bool Environment::is_stopping() const {
+ return thread_stopper_.IsStopped();
}
inline performance::performance_state* Environment::performance_state() {
diff --git a/src/env.cc b/src/env.cc
index 2448754db9..768e14d796 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -340,6 +340,14 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
+ GetAsyncRequest()->Install(
+ this, static_cast<void*>(this), [](uv_async_t* handle) {
+ Environment* env = static_cast<Environment*>(handle->data);
+ uv_stop(env->event_loop());
+ });
+ GetAsyncRequest()->SetStopped(false);
+ uv_unref(reinterpret_cast<uv_handle_t*>(GetAsyncRequest()->GetHandle()));
+
// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
// the one environment per process setup, but will be called in
@@ -355,6 +363,12 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_key_set(&thread_local_env, this);
}
+void Environment::ExitEnv() {
+ set_can_call_into_js(false);
+ GetAsyncRequest()->Stop();
+ isolate_->TerminateExecution();
+}
+
MaybeLocal<Object> Environment::ProcessCliArgs(
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args) {
@@ -519,6 +533,7 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
+ GetAsyncRequest()->Uninstall();
CleanupHandles();
while (!cleanup_hooks_.empty()) {
@@ -932,6 +947,53 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) {
return new_data;
}
+void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
+ Mutex::ScopedLock lock(mutex_);
+ env_ = env;
+ async_ = new uv_async_t;
+ async_->data = data;
+ CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
+}
+
+void AsyncRequest::Uninstall() {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) {
+ env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
+ async_ = nullptr;
+ }
+}
+
+void AsyncRequest::Stop() {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = true;
+ if (async_ != nullptr) uv_async_send(async_);
+}
+
+void AsyncRequest::SetStopped(bool flag) {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = flag;
+}
+
+bool AsyncRequest::IsStopped() const {
+ Mutex::ScopedLock lock(mutex_);
+ return stop_;
+}
+
+uv_async_t* AsyncRequest::GetHandle() {
+ Mutex::ScopedLock lock(mutex_);
+ return async_;
+}
+
+void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) tracker->TrackField("async_request", *async_);
+}
+
+AsyncRequest::~AsyncRequest() {
+ Mutex::ScopedLock lock(mutex_);
+ CHECK_NULL(async_);
+}
+
// Not really any better place than env.cc at this moment.
void BaseObject::DeleteMe(void* data) {
BaseObject* self = static_cast<BaseObject*>(data);
diff --git a/src/env.h b/src/env.h
index 2c61082c5f..5be13df1a9 100644
--- a/src/env.h
+++ b/src/env.h
@@ -511,6 +511,27 @@ struct AllocatedBuffer {
friend class Environment;
};
+class AsyncRequest : public MemoryRetainer {
+ public:
+ AsyncRequest() {}
+ ~AsyncRequest();
+ void Install(Environment* env, void* data, uv_async_cb target);
+ void Uninstall();
+ void Stop();
+ void SetStopped(bool flag);
+ bool IsStopped() const;
+ uv_async_t* GetHandle();
+ void MemoryInfo(MemoryTracker* tracker) const override;
+ SET_MEMORY_INFO_NAME(AsyncRequest)
+ SET_SELF_SIZE(AsyncRequest)
+
+ private:
+ Environment* env_;
+ uv_async_t* async_ = nullptr;
+ mutable Mutex mutex_;
+ bool stop_ = true;
+};
+
class Environment {
public:
class AsyncHooks {
@@ -695,6 +716,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
void Exit(int code);
+ void ExitEnv();
// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
@@ -844,7 +866,7 @@ class Environment {
inline void add_sub_worker_context(worker::Worker* context);
inline void remove_sub_worker_context(worker::Worker* context);
void stop_sub_worker_contexts();
- inline bool is_stopping_worker() const;
+ inline bool is_stopping() const;
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
@@ -1018,6 +1040,7 @@ class Environment {
inline ExecutionMode execution_mode() { return execution_mode_; }
inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; }
+ inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; }
private:
inline void CreateImmediate(native_immediate_callback cb,
@@ -1174,6 +1197,10 @@ class Environment {
uint64_t cleanup_hook_counter_ = 0;
bool started_cleanup_ = false;
+ // A custom async abstraction (a pair of async handle and a state variable)
+ // Used by embedders to shutdown running Node instance.
+ AsyncRequest thread_stopper_;
+
static void EnvPromiseHook(v8::PromiseHookType type,
v8::Local<v8::Promise> promise,
v8::Local<v8::Value> parent);
diff --git a/src/module_wrap.cc b/src/module_wrap.cc
index d311f7caca..ac5d28fb23 100644
--- a/src/module_wrap.cc
+++ b/src/module_wrap.cc
@@ -302,7 +302,7 @@ void ModuleWrap::Evaluate(const FunctionCallbackInfo<Value>& args) {
// Convert the termination exception into a regular exception.
if (timed_out || received_signal) {
- if (!env->is_main_thread() && env->is_stopping_worker())
+ if (!env->is_main_thread() && env->is_stopping())
return;
env->isolate()->CancelTerminateExecution();
// It is possible that execution was terminated by another timeout in
diff --git a/src/node.cc b/src/node.cc
index adcd6f6cad..783962d192 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -832,15 +832,14 @@ inline int StartNodeWithIsolate(Isolate* isolate,
per_process::v8_platform.DrainVMTasks(isolate);
more = uv_loop_alive(env.event_loop());
- if (more)
- continue;
+ if (more && !env.GetAsyncRequest()->IsStopped()) continue;
RunBeforeExit(&env);
// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(env.event_loop());
- } while (more == true);
+ } while (more == true && !env.GetAsyncRequest()->IsStopped());
env.performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
@@ -977,6 +976,11 @@ int Start(int argc, char** argv) {
return exit_code;
}
+int Stop(Environment* env) {
+ env->ExitEnv();
+ return 0;
+}
+
} // namespace node
#if !HAVE_INSPECTOR
diff --git a/src/node.h b/src/node.h
index 17eae72e9c..6568085335 100644
--- a/src/node.h
+++ b/src/node.h
@@ -201,10 +201,17 @@ typedef intptr_t ssize_t;
namespace node {
+class IsolateData;
+class Environment;
+
// TODO(addaleax): Officially deprecate this and replace it with something
// better suited for a public embedder API.
NODE_EXTERN int Start(int argc, char* argv[]);
+// Tear down Node.js while it is running (there are active handles
+// in the loop and / or actively executing JavaScript code).
+NODE_EXTERN int Stop(Environment* env);
+
// TODO(addaleax): Officially deprecate this and replace it with something
// better suited for a public embedder API.
NODE_EXTERN void Init(int* argc,
@@ -239,9 +246,6 @@ class NODE_EXTERN ArrayBufferAllocator : public v8::ArrayBuffer::Allocator {
NODE_EXTERN ArrayBufferAllocator* CreateArrayBufferAllocator();
NODE_EXTERN void FreeArrayBufferAllocator(ArrayBufferAllocator* allocator);
-class IsolateData;
-class Environment;
-
class NODE_EXTERN MultiIsolatePlatform : public v8::Platform {
public:
~MultiIsolatePlatform() override { }
diff --git a/src/node_contextify.cc b/src/node_contextify.cc
index b9962f091d..621fa7eb16 100644
--- a/src/node_contextify.cc
+++ b/src/node_contextify.cc
@@ -924,7 +924,7 @@ bool ContextifyScript::EvalMachine(Environment* env,
// Convert the termination exception into a regular exception.
if (timed_out || received_signal) {
- if (!env->is_main_thread() && env->is_stopping_worker())
+ if (!env->is_main_thread() && env->is_stopping())
return false;
env->isolate()->CancelTerminateExecution();
// It is possible that execution was terminated by another timeout in
diff --git a/src/node_worker.cc b/src/node_worker.cc
index 42d407e654..d9f7311c5d 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -57,46 +57,6 @@ void WaitForWorkerInspectorToStop(Environment* child) {
} // anonymous namespace
-void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
- Mutex::ScopedLock lock(mutex_);
- env_ = env;
- async_ = new uv_async_t;
- if (data != nullptr) async_->data = data;
- CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
-}
-
-void AsyncRequest::Uninstall() {
- Mutex::ScopedLock lock(mutex_);
- if (async_ != nullptr)
- env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
-}
-
-void AsyncRequest::Stop() {
- Mutex::ScopedLock lock(mutex_);
- stop_ = true;
- if (async_ != nullptr) uv_async_send(async_);
-}
-
-void AsyncRequest::SetStopped(bool flag) {
- Mutex::ScopedLock lock(mutex_);
- stop_ = flag;
-}
-
-bool AsyncRequest::IsStopped() const {
- Mutex::ScopedLock lock(mutex_);
- return stop_;
-}
-
-uv_async_t* AsyncRequest::GetHandle() {
- Mutex::ScopedLock lock(mutex_);
- return async_;
-}
-
-void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
- Mutex::ScopedLock lock(mutex_);
- if (async_ != nullptr) tracker->TrackField("async_request", *async_);
-}
-
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
@@ -142,7 +102,10 @@ Worker::Worker(Environment* env,
}
bool Worker::is_stopped() const {
- return thread_stopper_.IsStopped();
+ Mutex::ScopedLock lock(mutex_);
+ if (env_ != nullptr)
+ return env_->GetAsyncRequest()->IsStopped();
+ return stopped_;
}
// This class contains data that is only relevant to the child thread itself,
@@ -254,8 +217,12 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
- thread_stopper_.Uninstall();
- thread_stopper_.SetStopped(true);
+ {
+ Mutex::ScopedLock lock(mutex_);
+ stopped_ = true;
+ this->env_ = nullptr;
+ }
+ env_->GetAsyncRequest()->SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -271,12 +238,12 @@ void Worker::Run() {
}
});
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
@@ -294,18 +261,13 @@ void Worker::Run() {
env_->ProcessCliArgs(std::vector<std::string>{},
std::move(exec_argv_));
}
-
+ {
+ Mutex::ScopedLock lock(mutex_);
+ if (stopped_) return;
+ this->env_ = env_.get();
+ }
Debug(this, "Created Environment for worker with id %llu", thread_id_);
-
if (is_stopped()) return;
- thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
- Environment* env_ = static_cast<Environment*>(handle->data);
- uv_stop(env_->event_loop());
- });
- uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
-
- Debug(this, "Created Environment for worker with id %llu", thread_id_);
- if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@@ -321,7 +283,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
@@ -342,28 +304,28 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
- if (thread_stopper_.IsStopped()) break;
+ if (is_stopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
- if (thread_stopper_.IsStopped()) break;
+ if (is_stopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
- if (more && !thread_stopper_.IsStopped()) continue;
+ if (more && !is_stopped()) continue;
EmitBeforeExit(env_.get());
// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(&data.loop_);
- } while (more == true);
+ } while (more == true && !is_stopped());
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
@@ -371,7 +333,7 @@ void Worker::Run() {
{
int exit_code;
- bool stopped = thread_stopper_.IsStopped();
+ bool stopped = is_stopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
@@ -419,7 +381,7 @@ void Worker::OnThreadStopped() {
Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
- CHECK(thread_stopper_.IsStopped());
+ CHECK(stopped_ || env_ == nullptr || env_->GetAsyncRequest()->IsStopped());
CHECK(thread_joined_);
Debug(this, "Worker %llu destroyed", thread_id_);
@@ -516,12 +478,12 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->ClearWeak();
w->env()->add_sub_worker_context(w);
+ w->stopped_ = false;
w->thread_joined_ = false;
- w->thread_stopper_.SetStopped(false);
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
Worker* w_ = static_cast<Worker*>(handle->data);
- CHECK(w_->thread_stopper_.IsStopped());
+ CHECK(w_->is_stopped());
w_->parent_port_ = nullptr;
w_->JoinThread();
delete w_;
@@ -569,14 +531,12 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
-
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
- if (!thread_stopper_.IsStopped()) {
+ if (env_ != nullptr) {
exit_code_ = code;
- Debug(this, "Received StopEventLoop request");
- thread_stopper_.Stop();
- if (isolate_ != nullptr)
- isolate_->TerminateExecution();
+ Stop(env_);
+ } else {
+ stopped_ = true;
}
}
diff --git a/src/node_worker.h b/src/node_worker.h
index 442056eaac..adc755426d 100644
--- a/src/node_worker.h
+++ b/src/node_worker.h
@@ -12,26 +12,6 @@ namespace worker {
class WorkerThreadData;
-class AsyncRequest : public MemoryRetainer {
- public:
- AsyncRequest() {}
- void Install(Environment* env, void* data, uv_async_cb target);
- void Uninstall();
- void Stop();
- void SetStopped(bool flag);
- bool IsStopped() const;
- uv_async_t* GetHandle();
- void MemoryInfo(MemoryTracker* tracker) const override;
- SET_MEMORY_INFO_NAME(AsyncRequest)
- SET_SELF_SIZE(AsyncRequest)
-
- private:
- Environment* env_;
- uv_async_t* async_ = nullptr;
- mutable Mutex mutex_;
- bool stop_ = true;
-};
-
// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
@@ -54,7 +34,6 @@ class Worker : public AsyncWrap {
void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("parent_port", parent_port_);
- tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
}
@@ -107,9 +86,20 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;
- AsyncRequest thread_stopper_;
AsyncRequest on_thread_finished_;
+ // A raw flag that is used by creator and worker threads to
+ // sync up on pre-mature termination of worker - while in the
+ // warmup phase. Once the worker is fully warmed up, use the
+ // async handle of the worker's Environment for the same purpose.
+ bool stopped_ = true;
+
+ // The real Environment of the worker object. It has a lesser
+ // lifespan than the worker object itself - comes to life
+ // when the worker thread creates a new Environment, and gets
+ // destroyed alongwith the worker thread.
+ Environment* env_ = nullptr;
+
friend class WorkerThreadData;
};