summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--node.gyp4
-rw-r--r--src/inspector_agent.cc15
-rw-r--r--src/inspector_agent.h3
-rw-r--r--src/node.cc60
-rw-r--r--src/node_platform.cc189
-rw-r--r--src/node_platform.h69
-rw-r--r--src/tracing/agent.cc1
-rw-r--r--src/tracing/agent.h1
-rw-r--r--src/tracing/trace_event.cc6
-rw-r--r--src/tracing/trace_event.h1
-rw-r--r--test/cctest/node_test_fixture.h15
-rw-r--r--test/cctest/test_environment.cc3
12 files changed, 322 insertions, 45 deletions
diff --git a/node.gyp b/node.gyp
index 5ef250d5dd..08b792ef30 100644
--- a/node.gyp
+++ b/node.gyp
@@ -190,6 +190,7 @@
'src/node_http_parser.cc',
'src/node_main.cc',
'src/node_os.cc',
+ 'src/node_platform.cc',
'src/node_revert.cc',
'src/node_serdes.cc',
'src/node_url.cc',
@@ -238,6 +239,7 @@
'src/node_internals.h',
'src/node_javascript.h',
'src/node_mutex.h',
+ 'src/node_platform.h',
'src/node_root_certs.h',
'src/node_version.h',
'src/node_watchdog.h',
@@ -656,6 +658,8 @@
'defines': [ 'NODE_WANT_INTERNALS=1' ],
'sources': [
+ 'src/node_platform.cc',
+ 'src/node_platform.h',
'test/cctest/test_base64.cc',
'test/cctest/test_environment.cc',
'test/cctest/test_util.cc',
diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc
index 2520fbdd53..828006ecf2 100644
--- a/src/inspector_agent.cc
+++ b/src/inspector_agent.cc
@@ -502,11 +502,9 @@ class InspectorTimerHandle {
class NodeInspectorClient : public V8InspectorClient {
public:
- NodeInspectorClient(node::Environment* env,
- v8::Platform* platform) : env_(env),
- platform_(platform),
- terminated_(false),
- running_nested_loop_(false) {
+ NodeInspectorClient(node::Environment* env, node::NodePlatform* platform)
+ : env_(env), platform_(platform), terminated_(false),
+ running_nested_loop_(false) {
client_ = V8Inspector::create(env->isolate(), this);
contextCreated(env->context(), "Node.js Main Context");
}
@@ -518,8 +516,7 @@ class NodeInspectorClient : public V8InspectorClient {
terminated_ = false;
running_nested_loop_ = true;
while (!terminated_ && channel_->waitForFrontendMessage()) {
- while (v8::platform::PumpMessageLoop(platform_, env_->isolate()))
- {}
+ platform_->FlushForegroundTasksInternal();
}
terminated_ = false;
running_nested_loop_ = false;
@@ -647,7 +644,7 @@ class NodeInspectorClient : public V8InspectorClient {
private:
node::Environment* env_;
- v8::Platform* platform_;
+ node::NodePlatform* platform_;
bool terminated_;
bool running_nested_loop_;
std::unique_ptr<V8Inspector> client_;
@@ -666,7 +663,7 @@ Agent::Agent(Environment* env) : parent_env_(env),
Agent::~Agent() {
}
-bool Agent::Start(v8::Platform* platform, const char* path,
+bool Agent::Start(node::NodePlatform* platform, const char* path,
const DebugOptions& options) {
path_ = path == nullptr ? "" : path;
debug_options_ = options;
diff --git a/src/inspector_agent.h b/src/inspector_agent.h
index 6ec1bc28dc..8195e001c2 100644
--- a/src/inspector_agent.h
+++ b/src/inspector_agent.h
@@ -14,6 +14,7 @@
// Forward declaration to break recursive dependency chain with src/env.h.
namespace node {
class Environment;
+class NodePlatform;
} // namespace node
#include "v8.h"
@@ -42,7 +43,7 @@ class Agent {
~Agent();
// Create client_, may create io_ if option enabled
- bool Start(v8::Platform* platform, const char* path,
+ bool Start(node::NodePlatform* platform, const char* path,
const DebugOptions& options);
// Stop and destroy io_
void Stop();
diff --git a/src/node.cc b/src/node.cc
index a37753e385..1ef5adce3b 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -23,6 +23,7 @@
#include "node_buffer.h"
#include "node_constants.h"
#include "node_javascript.h"
+#include "node_platform.h"
#include "node_version.h"
#include "node_internals.h"
#include "node_revert.h"
@@ -250,25 +251,26 @@ node::DebugOptions debug_options;
static struct {
#if NODE_USE_V8_PLATFORM
- void Initialize(int thread_pool_size) {
+ void Initialize(int thread_pool_size, uv_loop_t* loop) {
tracing_agent_ =
- trace_enabled ? new tracing::Agent() : nullptr;
- platform_ = v8::platform::CreateDefaultPlatform(
- thread_pool_size, v8::platform::IdleTaskSupport::kDisabled,
- v8::platform::InProcessStackDumping::kDisabled,
- trace_enabled ? tracing_agent_->GetTracingController() : nullptr);
+ trace_enabled ? new tracing::Agent() : nullptr;
+ platform_ = new NodePlatform(thread_pool_size, loop,
+ trace_enabled ? tracing_agent_->GetTracingController() : nullptr);
V8::InitializePlatform(platform_);
tracing::TraceEventHelper::SetTracingController(
- trace_enabled ? tracing_agent_->GetTracingController() : nullptr);
- }
-
- void PumpMessageLoop(Isolate* isolate) {
- v8::platform::PumpMessageLoop(platform_, isolate);
+ trace_enabled ? tracing_agent_->GetTracingController() : nullptr);
}
void Dispose() {
+ platform_->Shutdown();
delete platform_;
platform_ = nullptr;
+ delete tracing_agent_;
+ tracing_agent_ = nullptr;
+ }
+
+ void DrainVMTasks() {
+ platform_->DrainBackgroundTasks();
}
#if HAVE_INSPECTOR
@@ -293,12 +295,12 @@ static struct {
tracing_agent_->Stop();
}
- v8::Platform* platform_;
tracing::Agent* tracing_agent_;
+ NodePlatform* platform_;
#else // !NODE_USE_V8_PLATFORM
- void Initialize(int thread_pool_size) {}
- void PumpMessageLoop(Isolate* isolate) {}
+ void Initialize(int thread_pool_size, uv_loop_t* loop) {}
void Dispose() {}
+ void DrainVMTasks() {}
bool StartInspector(Environment *env, const char* script_path,
const node::DebugOptions& options) {
env->ThrowError("Node compiled with NODE_USE_V8_PLATFORM=0");
@@ -4555,19 +4557,14 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
SealHandleScope seal(isolate);
bool more;
do {
- v8_platform.PumpMessageLoop(isolate);
- more = uv_run(env.event_loop(), UV_RUN_ONCE);
-
- if (more == false) {
- v8_platform.PumpMessageLoop(isolate);
- EmitBeforeExit(&env);
-
- // Emit `beforeExit` if the loop became alive either after emitting
- // event, or after running some callbacks.
- more = uv_loop_alive(env.event_loop());
- if (uv_run(env.event_loop(), UV_RUN_NOWAIT) != 0)
- more = true;
- }
+ uv_run(env.event_loop(), UV_RUN_DEFAULT);
+
+ EmitBeforeExit(&env);
+
+ v8_platform.DrainVMTasks();
+ // Emit `beforeExit` if the loop became alive either after emitting
+ // event, or after running some callbacks.
+ more = uv_loop_alive(env.event_loop());
} while (more == true);
}
@@ -4577,6 +4574,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
RunAtExit(&env);
uv_key_delete(&thread_local_env);
+ v8_platform.DrainVMTasks();
WaitForInspectorDisconnect(&env);
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
@@ -4665,7 +4663,7 @@ int Start(int argc, char** argv) {
V8::SetEntropySource(crypto::EntropySource);
#endif // HAVE_OPENSSL
- v8_platform.Initialize(v8_thread_pool_size);
+ v8_platform.Initialize(v8_thread_pool_size, uv_default_loop());
// Enable tracing when argv has --trace-events-enabled.
if (trace_enabled) {
fprintf(stderr, "Warning: Trace event is an experimental feature "
@@ -4682,6 +4680,12 @@ int Start(int argc, char** argv) {
v8_initialized = false;
V8::Dispose();
+ // uv_run cannot be called from the time before the beforeExit callback
+ // runs until the program exits unless the event loop has any referenced
+ // handles after beforeExit terminates. This prevents unrefed timers
+ // that happen to terminate during shutdown from being run unsafely.
+ // Since uv_run cannot be called, uv_async handles held by the platform
+ // will never be fully cleaned up.
v8_platform.Dispose();
delete[] exec_argv;
diff --git a/src/node_platform.cc b/src/node_platform.cc
new file mode 100644
index 0000000000..3d023114ad
--- /dev/null
+++ b/src/node_platform.cc
@@ -0,0 +1,189 @@
+#include "node_platform.h"
+
+#include "util.h"
+
+namespace node {
+
+using v8::Isolate;
+using v8::Platform;
+using v8::Task;
+using v8::TracingController;
+
+static void FlushTasks(uv_async_t* handle) {
+ NodePlatform* platform = static_cast<NodePlatform*>(handle->data);
+ platform->FlushForegroundTasksInternal();
+}
+
+static void BackgroundRunner(void* data) {
+ TaskQueue<Task>* background_tasks = static_cast<TaskQueue<Task>*>(data);
+ while (Task* task = background_tasks->BlockingPop()) {
+ task->Run();
+ delete task;
+ background_tasks->NotifyOfCompletion();
+ }
+}
+
+NodePlatform::NodePlatform(int thread_pool_size, uv_loop_t* loop,
+ TracingController* tracing_controller)
+ : loop_(loop) {
+ CHECK_EQ(0, uv_async_init(loop, &flush_tasks_, FlushTasks));
+ flush_tasks_.data = static_cast<void*>(this);
+ uv_unref(reinterpret_cast<uv_handle_t*>(&flush_tasks_));
+ if (tracing_controller) {
+ tracing_controller_.reset(tracing_controller);
+ } else {
+ TracingController* controller = new TracingController();
+ tracing_controller_.reset(controller);
+ }
+ for (int i = 0; i < thread_pool_size; i++) {
+ uv_thread_t* t = new uv_thread_t();
+ if (uv_thread_create(t, BackgroundRunner, &background_tasks_) != 0) {
+ delete t;
+ break;
+ }
+ threads_.push_back(std::unique_ptr<uv_thread_t>(t));
+ }
+}
+
+void NodePlatform::Shutdown() {
+ background_tasks_.Stop();
+ for (size_t i = 0; i < threads_.size(); i++) {
+ CHECK_EQ(0, uv_thread_join(threads_[i].get()));
+ }
+ // uv_run cannot be called from the time before the beforeExit callback
+ // runs until the program exits unless the event loop has any referenced
+ // handles after beforeExit terminates. This prevents unrefed timers
+ // that happen to terminate during shutdown from being run unsafely.
+ // Since uv_run cannot be called, this handle will never be fully cleaned
+ // up.
+ uv_close(reinterpret_cast<uv_handle_t*>(&flush_tasks_), nullptr);
+}
+
+size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
+ return threads_.size();
+}
+
+static void RunForegroundTask(uv_timer_t* handle) {
+ Task* task = static_cast<Task*>(handle->data);
+ task->Run();
+ delete task;
+ uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
+ delete reinterpret_cast<uv_timer_t*>(handle);
+ });
+}
+
+void NodePlatform::DrainBackgroundTasks() {
+ FlushForegroundTasksInternal();
+ background_tasks_.BlockingDrain();
+}
+
+void NodePlatform::FlushForegroundTasksInternal() {
+ while (auto delayed = foreground_delayed_tasks_.Pop()) {
+ uint64_t delay_millis =
+ static_cast<uint64_t>(delayed->second + 0.5) * 1000;
+ uv_timer_t* handle = new uv_timer_t();
+ handle->data = static_cast<void*>(delayed->first);
+ uv_timer_init(loop_, handle);
+ // Timers may not guarantee queue ordering of events with the same delay if
+ // the delay is non-zero. This should not be a problem in practice.
+ uv_timer_start(handle, RunForegroundTask, delay_millis, 0);
+ uv_unref(reinterpret_cast<uv_handle_t*>(handle));
+ delete delayed;
+ }
+ while (Task* task = foreground_tasks_.Pop()) {
+ task->Run();
+ delete task;
+ }
+}
+
+void NodePlatform::CallOnBackgroundThread(Task* task,
+ ExpectedRuntime expected_runtime) {
+ background_tasks_.Push(task);
+}
+
+void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
+ foreground_tasks_.Push(task);
+ uv_async_send(&flush_tasks_);
+}
+
+void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
+ Task* task,
+ double delay_in_seconds) {
+ auto pair = new std::pair<Task*, double>(task, delay_in_seconds);
+ foreground_delayed_tasks_.Push(pair);
+ uv_async_send(&flush_tasks_);
+}
+
+bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
+
+double NodePlatform::MonotonicallyIncreasingTime() {
+ // Convert nanos to seconds.
+ return uv_hrtime() / 1e9;
+}
+
+TracingController* NodePlatform::GetTracingController() {
+ return tracing_controller_.get();
+}
+
+template <class T>
+TaskQueue<T>::TaskQueue()
+ : lock_(), tasks_available_(), tasks_drained_(),
+ outstanding_tasks_(0), stopped_(false), task_queue_() { }
+
+template <class T>
+void TaskQueue<T>::Push(T* task) {
+ Mutex::ScopedLock scoped_lock(lock_);
+ outstanding_tasks_++;
+ task_queue_.push(task);
+ tasks_available_.Signal(scoped_lock);
+}
+
+template <class T>
+T* TaskQueue<T>::Pop() {
+ Mutex::ScopedLock scoped_lock(lock_);
+ T* result = nullptr;
+ if (!task_queue_.empty()) {
+ result = task_queue_.front();
+ task_queue_.pop();
+ }
+ return result;
+}
+
+template <class T>
+T* TaskQueue<T>::BlockingPop() {
+ Mutex::ScopedLock scoped_lock(lock_);
+ while (task_queue_.empty() && !stopped_) {
+ tasks_available_.Wait(scoped_lock);
+ }
+ if (stopped_) {
+ return nullptr;
+ }
+ T* result = task_queue_.front();
+ task_queue_.pop();
+ return result;
+}
+
+template <class T>
+void TaskQueue<T>::NotifyOfCompletion() {
+ Mutex::ScopedLock scoped_lock(lock_);
+ if (--outstanding_tasks_ == 0) {
+ tasks_drained_.Broadcast(scoped_lock);
+ }
+}
+
+template <class T>
+void TaskQueue<T>::BlockingDrain() {
+ Mutex::ScopedLock scoped_lock(lock_);
+ while (outstanding_tasks_ > 0) {
+ tasks_drained_.Wait(scoped_lock);
+ }
+}
+
+template <class T>
+void TaskQueue<T>::Stop() {
+ Mutex::ScopedLock scoped_lock(lock_);
+ stopped_ = true;
+ tasks_available_.Broadcast(scoped_lock);
+}
+
+} // namespace node
diff --git a/src/node_platform.h b/src/node_platform.h
new file mode 100644
index 0000000000..668fcf28e4
--- /dev/null
+++ b/src/node_platform.h
@@ -0,0 +1,69 @@
+#ifndef SRC_NODE_PLATFORM_H_
+#define SRC_NODE_PLATFORM_H_
+
+#include <queue>
+#include <vector>
+
+#include "libplatform/libplatform.h"
+#include "node_mutex.h"
+#include "uv.h"
+
+namespace node {
+
+template <class T>
+class TaskQueue {
+ public:
+ TaskQueue();
+ ~TaskQueue() {}
+
+ void Push(T* task);
+ T* Pop();
+ T* BlockingPop();
+ void NotifyOfCompletion();
+ void BlockingDrain();
+ void Stop();
+
+ private:
+ Mutex lock_;
+ ConditionVariable tasks_available_;
+ ConditionVariable tasks_drained_;
+ int outstanding_tasks_;
+ bool stopped_;
+ std::queue<T*> task_queue_;
+};
+
+class NodePlatform : public v8::Platform {
+ public:
+ NodePlatform(int thread_pool_size, uv_loop_t* loop,
+ v8::TracingController* tracing_controller);
+ virtual ~NodePlatform() {}
+
+ void DrainBackgroundTasks();
+ void FlushForegroundTasksInternal();
+ void Shutdown();
+
+ // v8::Platform implementation.
+ size_t NumberOfAvailableBackgroundThreads() override;
+ void CallOnBackgroundThread(v8::Task* task,
+ ExpectedRuntime expected_runtime) override;
+ void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override;
+ void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task,
+ double delay_in_seconds) override;
+ bool IdleTasksEnabled(v8::Isolate* isolate) override;
+ double MonotonicallyIncreasingTime() override;
+ v8::TracingController* GetTracingController() override;
+
+ private:
+ uv_loop_t* const loop_;
+ uv_async_t flush_tasks_;
+ TaskQueue<v8::Task> foreground_tasks_;
+ TaskQueue<std::pair<v8::Task*, double>> foreground_delayed_tasks_;
+ TaskQueue<v8::Task> background_tasks_;
+ std::vector<std::unique_ptr<uv_thread_t>> threads_;
+
+ std::unique_ptr<v8::TracingController> tracing_controller_;
+};
+
+} // namespace node
+
+#endif // SRC_NODE_PLATFORM_H_
diff --git a/src/tracing/agent.cc b/src/tracing/agent.cc
index 38e651ebb2..1ac99bbb34 100644
--- a/src/tracing/agent.cc
+++ b/src/tracing/agent.cc
@@ -4,7 +4,6 @@
#include <string>
#include "env-inl.h"
-#include "libplatform/libplatform.h"
namespace node {
namespace tracing {
diff --git a/src/tracing/agent.h b/src/tracing/agent.h
index cc00c53144..e781281712 100644
--- a/src/tracing/agent.h
+++ b/src/tracing/agent.h
@@ -1,6 +1,7 @@
#ifndef SRC_TRACING_AGENT_H_
#define SRC_TRACING_AGENT_H_
+#include "node_platform.h"
#include "tracing/node_trace_buffer.h"
#include "tracing/node_trace_writer.h"
#include "uv.h"
diff --git a/src/tracing/trace_event.cc b/src/tracing/trace_event.cc
index 856b344e9d..f661dd5c69 100644
--- a/src/tracing/trace_event.cc
+++ b/src/tracing/trace_event.cc
@@ -3,14 +3,14 @@
namespace node {
namespace tracing {
-v8::TracingController* controller_ = nullptr;
+v8::TracingController* g_controller = nullptr;
void TraceEventHelper::SetTracingController(v8::TracingController* controller) {
- controller_ = controller;
+ g_controller = controller;
}
v8::TracingController* TraceEventHelper::GetTracingController() {
- return controller_;
+ return g_controller;
}
} // namespace tracing
diff --git a/src/tracing/trace_event.h b/src/tracing/trace_event.h
index 24806d375f..61808eb94f 100644
--- a/src/tracing/trace_event.h
+++ b/src/tracing/trace_event.h
@@ -7,6 +7,7 @@
#include <stddef.h>
+#include "node_platform.h"
#include "v8-platform.h"
#include "trace_event_common.h"
diff --git a/test/cctest/node_test_fixture.h b/test/cctest/node_test_fixture.h
index e52b1b5dfd..f30823a8fd 100644
--- a/test/cctest/node_test_fixture.h
+++ b/test/cctest/node_test_fixture.h
@@ -66,7 +66,12 @@ struct Argv {
int nr_args_;
};
+uv_loop_t current_loop;
+
class NodeTestFixture : public ::testing::Test {
+ public:
+ static uv_loop_t* CurrentLoop() { return &current_loop; }
+
protected:
v8::Isolate::CreateParams params_;
ArrayBufferAllocator allocator_;
@@ -77,7 +82,8 @@ class NodeTestFixture : public ::testing::Test {
}
virtual void SetUp() {
- platform_ = v8::platform::CreateDefaultPlatform();
+ CHECK_EQ(0, uv_loop_init(&current_loop));
+ platform_ = new node::NodePlatform(8, &current_loop, nullptr);
v8::V8::InitializePlatform(platform_);
v8::V8::Initialize();
params_.array_buffer_allocator = &allocator_;
@@ -86,13 +92,18 @@ class NodeTestFixture : public ::testing::Test {
virtual void TearDown() {
if (platform_ == nullptr) return;
+ platform_->Shutdown();
+ while (uv_loop_alive(&current_loop)) {
+ uv_run(&current_loop, UV_RUN_ONCE);
+ }
v8::V8::ShutdownPlatform();
delete platform_;
platform_ = nullptr;
+ CHECK_EQ(0, uv_loop_close(&current_loop));
}
private:
- v8::Platform* platform_ = nullptr;
+ node::NodePlatform* platform_ = nullptr;
};
#endif // TEST_CCTEST_NODE_TEST_FIXTURE_H_
diff --git a/test/cctest/test_environment.cc b/test/cctest/test_environment.cc
index aee8e795ec..4651e865a9 100644
--- a/test/cctest/test_environment.cc
+++ b/test/cctest/test_environment.cc
@@ -31,7 +31,8 @@ class EnvironmentTest : public NodeTestFixture {
const Argv& argv) {
context_ = v8::Context::New(isolate);
CHECK(!context_.IsEmpty());
- isolate_data_ = CreateIsolateData(isolate, uv_default_loop());
+ isolate_data_ = CreateIsolateData(isolate,
+ NodeTestFixture::CurrentLoop());
CHECK_NE(nullptr, isolate_data_);
environment_ = CreateEnvironment(isolate_data_,
context_,