From 26ab769940e59646386bd15bbafb9378b8b56aed Mon Sep 17 00:00:00 2001 From: Sam Roberts Date: Tue, 30 May 2017 16:34:59 -0700 Subject: inspector: refactor to rename and comment methods Pure refactor, makes no functional changes but the renaming helped me see more clearly what the relationship was between methods and variables. * Renamed methods to reduce number of slightly different names for the same thing ("thread" vs "io thread", etc.). * Added comments where it was useful to me. PR-URL: https://github.com/nodejs/node/pull/13321 Reviewed-By: Eugene Ostroukhov Reviewed-By: Anna Henningsen Reviewed-By: James M Snell --- src/inspector_agent.cc | 125 ++++++++++++++++++++--------------------- src/inspector_agent.h | 43 +++++++++----- src/inspector_io.cc | 110 ++++++++++++++++++++---------------- src/inspector_io.h | 56 ++++++++++++------ src/inspector_socket.cc | 60 ++++++++++---------- src/inspector_socket.h | 5 +- src/inspector_socket_server.cc | 25 ++++----- src/inspector_socket_server.h | 15 ++++- src/node.cc | 6 +- 9 files changed, 255 insertions(+), 190 deletions(-) (limited to 'src') diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 5daef2e1ba..a2296bebb4 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -40,8 +40,8 @@ using v8_inspector::StringBuffer; using v8_inspector::StringView; using v8_inspector::V8Inspector; -static uv_sem_t inspector_io_thread_semaphore; -static uv_async_t start_inspector_thread_async; +static uv_sem_t start_io_thread_semaphore; +static uv_async_t start_io_thread_async; class StartIoTask : public v8::Task { public: @@ -61,36 +61,36 @@ std::unique_ptr ToProtocolString(Isolate* isolate, return StringBuffer::create(StringView(*buffer, buffer.length())); } -// Called from the main thread. -void StartInspectorIoThreadAsyncCallback(uv_async_t* handle) { +// Called on the main thread. +void StartIoThreadAsyncCallback(uv_async_t* handle) { static_cast(handle->data)->StartIoThread(false); } -void StartIoCallback(Isolate* isolate, void* agent) { +void StartIoInterrupt(Isolate* isolate, void* agent) { static_cast(agent)->StartIoThread(false); } #ifdef __POSIX__ -static void EnableInspectorIOThreadSignalHandler(int signo) { - uv_sem_post(&inspector_io_thread_semaphore); +static void StartIoThreadWakeup(int signo) { + uv_sem_post(&start_io_thread_semaphore); } -inline void* InspectorIoThreadSignalThreadMain(void* unused) { +inline void* StartIoThreadMain(void* unused) { for (;;) { - uv_sem_wait(&inspector_io_thread_semaphore); - Agent* agent = static_cast(start_inspector_thread_async.data); + uv_sem_wait(&start_io_thread_semaphore); + Agent* agent = static_cast(start_io_thread_async.data); if (agent != nullptr) - agent->RequestIoStart(); + agent->RequestIoThreadStart(); } return nullptr; } -static int RegisterDebugSignalHandler() { +static int StartDebugSignalHandler() { // Start a watchdog thread for calling v8::Debug::DebugBreak() because // it's not safe to call directly from the signal handler, it can // deadlock with the thread it interrupts. - CHECK_EQ(0, uv_sem_init(&inspector_io_thread_semaphore, 0)); + CHECK_EQ(0, uv_sem_init(&start_io_thread_semaphore, 0)); pthread_attr_t attr; CHECK_EQ(0, pthread_attr_init(&attr)); // Don't shrink the thread's stack on FreeBSD. Said platform decided to @@ -101,11 +101,13 @@ static int RegisterDebugSignalHandler() { #endif // __FreeBSD__ CHECK_EQ(0, pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); sigset_t sigmask; + // Mask all signals. sigfillset(&sigmask); CHECK_EQ(0, pthread_sigmask(SIG_SETMASK, &sigmask, &sigmask)); pthread_t thread; const int err = pthread_create(&thread, &attr, - InspectorIoThreadSignalThreadMain, nullptr); + StartIoThreadMain, nullptr); + // Restore original mask CHECK_EQ(0, pthread_sigmask(SIG_SETMASK, &sigmask, nullptr)); CHECK_EQ(0, pthread_attr_destroy(&attr)); if (err != 0) { @@ -115,7 +117,7 @@ static int RegisterDebugSignalHandler() { // receiving the signal would terminate the process. return -err; } - RegisterSignalHandler(SIGUSR1, EnableInspectorIOThreadSignalHandler); + RegisterSignalHandler(SIGUSR1, StartIoThreadWakeup); // Unblock SIGUSR1. A pending SIGUSR1 signal will now be delivered. sigemptyset(&sigmask); sigaddset(&sigmask, SIGUSR1); @@ -126,10 +128,10 @@ static int RegisterDebugSignalHandler() { #ifdef _WIN32 -DWORD WINAPI EnableDebugThreadProc(void* arg) { - Agent* agent = static_cast(start_inspector_thread_async.data); +DWORD WINAPI StartIoThreadProc(void* arg) { + Agent* agent = static_cast(start_io_thread_async.data); if (agent != nullptr) - agent->RequestIoStart(); + agent->RequestIoThreadStart(); return 0; } @@ -138,7 +140,7 @@ static int GetDebugSignalHandlerMappingName(DWORD pid, wchar_t* buf, return _snwprintf(buf, buf_len, L"node-debug-handler-%u", pid); } -static int RegisterDebugSignalHandler() { +static int StartDebugSignalHandler() { wchar_t mapping_name[32]; HANDLE mapping_handle; DWORD pid; @@ -173,7 +175,7 @@ static int RegisterDebugSignalHandler() { return -1; } - *handler = EnableDebugThreadProc; + *handler = StartIoThreadProc; UnmapViewOfFile(static_cast(handler)); @@ -205,7 +207,7 @@ class JsBindingsSessionDelegate : public InspectorSessionDelegate { return false; } - void OnMessage(const v8_inspector::StringView& message) override { + void SendMessageToFrontend(const v8_inspector::StringView& message) override { Isolate* isolate = env_->isolate(); v8::HandleScope handle_scope(isolate); Context::Scope context_scope(env_->context()); @@ -418,7 +420,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel { void flushProtocolNotifications() override { } void sendMessageToFrontend(const StringView& message) { - delegate_->OnMessage(message); + delegate_->SendMessageToFrontend(message); } InspectorSessionDelegate* const delegate_; @@ -434,7 +436,7 @@ class NodeInspectorClient : public v8_inspector::V8InspectorClient { platform_(platform), terminated_(false), running_nested_loop_(false) { - inspector_ = V8Inspector::create(env->isolate(), this); + client_ = V8Inspector::create(env->isolate(), this); } void runMessageLoopOnPause(int context_group_id) override { @@ -459,11 +461,11 @@ class NodeInspectorClient : public v8_inspector::V8InspectorClient { std::unique_ptr name_buffer = Utf8ToStringView(name); v8_inspector::V8ContextInfo info(context, CONTEXT_GROUP_ID, name_buffer->string()); - inspector_->contextCreated(info); + client_->contextCreated(info); } void contextDestroyed(Local context) { - inspector_->contextDestroyed(context); + client_->contextDestroyed(context); } void quitMessageLoopOnPause() override { @@ -473,7 +475,7 @@ class NodeInspectorClient : public v8_inspector::V8InspectorClient { void connectFrontend(InspectorSessionDelegate* delegate) { CHECK_EQ(channel_, nullptr); channel_ = std::unique_ptr( - new ChannelImpl(inspector_.get(), delegate)); + new ChannelImpl(client_.get(), delegate)); } void disconnectFrontend() { @@ -507,7 +509,7 @@ class NodeInspectorClient : public v8_inspector::V8InspectorClient { Isolate* isolate = context->GetIsolate(); - inspector_->exceptionThrown( + client_->exceptionThrown( context, StringView(DETAILS, sizeof(DETAILS) - 1), error, @@ -515,7 +517,7 @@ class NodeInspectorClient : public v8_inspector::V8InspectorClient { ToProtocolString(isolate, message->GetScriptResourceName())->string(), message->GetLineNumber(context).FromMaybe(0), message->GetStartColumn(context).FromMaybe(0), - inspector_->createStackTrace(stack_trace), + client_->createStackTrace(stack_trace), script_id); } @@ -528,12 +530,12 @@ class NodeInspectorClient : public v8_inspector::V8InspectorClient { v8::Platform* platform_; bool terminated_; bool running_nested_loop_; - std::unique_ptr inspector_; + std::unique_ptr client_; std::unique_ptr channel_; }; Agent::Agent(Environment* env) : parent_env_(env), - inspector_(nullptr), + client_(nullptr), platform_(nullptr), enabled_(false) {} @@ -546,18 +548,19 @@ bool Agent::Start(v8::Platform* platform, const char* path, const DebugOptions& options) { path_ = path == nullptr ? "" : path; debug_options_ = options; - inspector_ = + client_ = std::unique_ptr( new NodeInspectorClient(parent_env_, platform)); - inspector_->contextCreated(parent_env_->context(), "Node.js Main Context"); + client_->contextCreated(parent_env_->context(), "Node.js Main Context"); platform_ = platform; CHECK_EQ(0, uv_async_init(uv_default_loop(), - &start_inspector_thread_async, - StartInspectorIoThreadAsyncCallback)); - start_inspector_thread_async.data = this; - uv_unref(reinterpret_cast(&start_inspector_thread_async)); + &start_io_thread_async, + StartIoThreadAsyncCallback)); + start_io_thread_async.data = this; + uv_unref(reinterpret_cast(&start_io_thread_async)); - RegisterDebugSignalHandler(); + // Ignore failure, SIGUSR1 won't work, but that should not block node start. + StartDebugSignalHandler(); if (options.inspector_enabled()) { return StartIoThread(options.wait_for_connect()); } @@ -568,14 +571,14 @@ bool Agent::StartIoThread(bool wait_for_connect) { if (io_ != nullptr) return true; - CHECK_NE(inspector_, nullptr); + CHECK_NE(client_, nullptr); enabled_ = true; io_ = std::unique_ptr( new InspectorIo(parent_env_, platform_, path_, debug_options_, wait_for_connect)); if (!io_->Start()) { - inspector_.reset(); + client_.reset(); return false; } @@ -612,20 +615,16 @@ void Agent::Stop() { void Agent::Connect(InspectorSessionDelegate* delegate) { enabled_ = true; - inspector_->connectFrontend(delegate); + client_->connectFrontend(delegate); } bool Agent::IsConnected() { return io_ && io_->IsConnected(); } -bool Agent::IsStarted() { - return !!inspector_; -} - void Agent::WaitForDisconnect() { - CHECK_NE(inspector_, nullptr); - inspector_->contextDestroyed(parent_env_->context()); + CHECK_NE(client_, nullptr); + client_->contextDestroyed(parent_env_->context()); if (io_ != nullptr) { io_->WaitForDisconnect(); } @@ -634,42 +633,42 @@ void Agent::WaitForDisconnect() { void Agent::FatalException(Local error, Local message) { if (!IsStarted()) return; - inspector_->FatalException(error, message); + client_->FatalException(error, message); WaitForDisconnect(); } void Agent::Dispatch(const StringView& message) { - CHECK_NE(inspector_, nullptr); - inspector_->dispatchMessageFromFrontend(message); + CHECK_NE(client_, nullptr); + client_->dispatchMessageFromFrontend(message); } void Agent::Disconnect() { - CHECK_NE(inspector_, nullptr); - inspector_->disconnectFrontend(); + CHECK_NE(client_, nullptr); + client_->disconnectFrontend(); } void Agent::RunMessageLoop() { - CHECK_NE(inspector_, nullptr); - inspector_->runMessageLoopOnPause(CONTEXT_GROUP_ID); + CHECK_NE(client_, nullptr); + client_->runMessageLoopOnPause(CONTEXT_GROUP_ID); } InspectorSessionDelegate* Agent::delegate() { - CHECK_NE(inspector_, nullptr); - ChannelImpl* channel = inspector_->channel(); + CHECK_NE(client_, nullptr); + ChannelImpl* channel = client_->channel(); if (channel == nullptr) return nullptr; return channel->delegate(); } void Agent::PauseOnNextJavascriptStatement(const std::string& reason) { - ChannelImpl* channel = inspector_->channel(); + ChannelImpl* channel = client_->channel(); if (channel != nullptr) channel->schedulePauseOnNextStatement(reason); } // static -void Agent::InitJSBindings(Local target, Local unused, - Local context, void* priv) { +void Agent::InitInspector(Local target, Local unused, + Local context, void* priv) { Environment* env = Environment::GetCurrent(context); Agent* agent = env->inspector_agent(); env->SetMethod(target, "consoleCall", InspectorConsoleCall); @@ -678,19 +677,19 @@ void Agent::InitJSBindings(Local target, Local unused, env->SetMethod(target, "connect", ConnectJSBindingsSession); } -void Agent::RequestIoStart() { +void Agent::RequestIoThreadStart() { // We need to attempt to interrupt V8 flow (in case Node is running // continuous JS code) and to wake up libuv thread (in case Node is wating // for IO events) - uv_async_send(&start_inspector_thread_async); + uv_async_send(&start_io_thread_async); v8::Isolate* isolate = parent_env_->isolate(); platform_->CallOnForegroundThread(isolate, new StartIoTask(this)); - isolate->RequestInterrupt(StartIoCallback, this); - uv_async_send(&start_inspector_thread_async); + isolate->RequestInterrupt(StartIoInterrupt, this); + uv_async_send(&start_io_thread_async); } } // namespace inspector } // namespace node NODE_MODULE_CONTEXT_AWARE_BUILTIN(inspector, - node::inspector::Agent::InitJSBindings); + node::inspector::Agent::InitInspector); diff --git a/src/inspector_agent.h b/src/inspector_agent.h index 2bc65f7664..9966c02adf 100644 --- a/src/inspector_agent.h +++ b/src/inspector_agent.h @@ -39,7 +39,8 @@ class InspectorSessionDelegate { public: virtual ~InspectorSessionDelegate() = default; virtual bool WaitForFrontendMessage() = 0; - virtual void OnMessage(const v8_inspector::StringView& message) = 0; + virtual void SendMessageToFrontend(const v8_inspector::StringView& message) + = 0; }; class InspectorIo; @@ -50,39 +51,53 @@ class Agent { explicit Agent(node::Environment* env); ~Agent(); + // Create client_, may create io_ if option enabled bool Start(v8::Platform* platform, const char* path, const DebugOptions& options); + // Stop and destroy io_ void Stop(); - bool IsStarted(); + bool IsStarted() { return !!client_; } + + // IO thread started, and client connected bool IsConnected(); + + void WaitForDisconnect(); void FatalException(v8::Local error, v8::Local message); + + // These methods are called by the WS protocol and JS binding to create + // inspector sessions. The inspector responds by using the delegate to send + // messages back. void Connect(InspectorSessionDelegate* delegate); - InspectorSessionDelegate* delegate(); void Disconnect(); void Dispatch(const v8_inspector::StringView& message); + InspectorSessionDelegate* delegate(); + void RunMessageLoop(); - bool enabled() { - return enabled_; - } + bool enabled() { return enabled_; } void PauseOnNextJavascriptStatement(const std::string& reason); - static void InitJSBindings(v8::Local target, - v8::Local unused, - v8::Local context, - void* priv); - bool StartIoThread(bool wait_for_connect); + // Initialize 'inspector' module bindings + static void InitInspector(v8::Local target, + v8::Local unused, + v8::Local context, + void* priv); + InspectorIo* io() { return io_.get(); } - // Can be called from any thread - void RequestIoStart(); + + // Can only be called from the the main thread. + bool StartIoThread(bool wait_for_connect); + + // Calls StartIoThread() from off the main thread. + void RequestIoThreadStart(); private: node::Environment* parent_env_; - std::unique_ptr inspector_; + std::unique_ptr client_; std::unique_ptr io_; v8::Platform* platform_; bool enabled_; diff --git a/src/inspector_io.cc b/src/inspector_io.cc index 489b1552df..9766faa39a 100644 --- a/src/inspector_io.cc +++ b/src/inspector_io.cc @@ -28,16 +28,32 @@ template using TransportAndIo = std::pair; std::string GetProcessTitle() { - // uv_get_process_title will trim the title if it is too long. char title[2048]; int err = uv_get_process_title(title, sizeof(title)); if (err == 0) { return title; } else { + // Title is too long, or could not be retrieved. return "Node.js"; } } +std::string ScriptPath(uv_loop_t* loop, const std::string& script_name) { + std::string script_path; + + if (!script_name.empty()) { + uv_fs_t req; + req.ptr = nullptr; + if (0 == uv_fs_realpath(loop, &req, script_name.c_str(), nullptr)) { + CHECK_NE(req.ptr, nullptr); + script_path = std::string(static_cast(req.ptr)); + } + uv_fs_req_cleanup(&req); + } + + return script_path; +} + // UUID RFC: https://www.ietf.org/rfc/rfc4122.txt // Used ver 4 - with numbers std::string GenerateID() { @@ -97,6 +113,7 @@ int CloseAsyncAndLoop(uv_async_t* async) { return uv_loop_close(async->loop); } +// Delete main_thread_req_ on async handle close void ReleasePairOnAsyncClose(uv_handle_t* async) { AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first, reinterpret_cast(async)); @@ -118,18 +135,25 @@ class IoSessionDelegate : public InspectorSessionDelegate { public: explicit IoSessionDelegate(InspectorIo* io) : io_(io) { } bool WaitForFrontendMessage() override; - void OnMessage(const v8_inspector::StringView& message) override; + void SendMessageToFrontend(const v8_inspector::StringView& message) override; private: InspectorIo* io_; }; +// Passed to InspectorSocketServer to handle WS inspector protocol events, +// mostly session start, message received, and session end. class InspectorIoDelegate: public node::inspector::SocketServerDelegate { public: InspectorIoDelegate(InspectorIo* io, const std::string& script_path, const std::string& script_name, bool wait); + // Calls PostIncomingMessage() with appropriate InspectorAction: + // kStartSession bool StartSession(int session_id, const std::string& target_id) override; + // kSendMessage void MessageReceived(int session_id, const std::string& message) override; + // kEndSession void EndSession(int session_id) override; + std::vector GetTargetIds() override; std::string GetTargetTitle(const std::string& id) override; std::string GetTargetUrl(const std::string& id) override; @@ -137,6 +161,7 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate { void ServerDone() override { io_->ServerDone(); } + private: InspectorIo* io_; bool connected_; @@ -172,27 +197,27 @@ InspectorIo::InspectorIo(Environment* env, v8::Platform* platform, bool wait_for_connect) : options_(options), thread_(), delegate_(nullptr), state_(State::kNew), parent_env_(env), - io_thread_req_(), platform_(platform), + thread_req_(), platform_(platform), dispatching_messages_(false), session_id_(0), script_name_(path), wait_for_connect_(wait_for_connect), port_(-1) { main_thread_req_ = new AsyncAndAgent({uv_async_t(), env->inspector_agent()}); CHECK_EQ(0, uv_async_init(env->event_loop(), &main_thread_req_->first, - InspectorIo::MainThreadAsyncCb)); + InspectorIo::MainThreadReqAsyncCb)); uv_unref(reinterpret_cast(&main_thread_req_->first)); - CHECK_EQ(0, uv_sem_init(&start_sem_, 0)); + CHECK_EQ(0, uv_sem_init(&thread_start_sem_, 0)); } InspectorIo::~InspectorIo() { - uv_sem_destroy(&start_sem_); + uv_sem_destroy(&thread_start_sem_); uv_close(reinterpret_cast(&main_thread_req_->first), ReleasePairOnAsyncClose); } bool InspectorIo::Start() { CHECK_EQ(state_, State::kNew); - CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadCbIO, this), 0); - uv_sem_wait(&start_sem_); + CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0); + uv_sem_wait(&thread_start_sem_); if (state_ == State::kError) { return false; @@ -234,76 +259,66 @@ void InspectorIo::WaitForDisconnect() { } // static -void InspectorIo::ThreadCbIO(void* io) { - static_cast(io)->WorkerRunIO(); +void InspectorIo::ThreadMain(void* io) { + static_cast(io)->ThreadMain(); } // static template -void InspectorIo::WriteCbIO(uv_async_t* async) { - TransportAndIo* io_and_transport = +void InspectorIo::IoThreadAsyncCb(uv_async_t* async) { + TransportAndIo* transport_and_io = static_cast*>(async->data); - if (io_and_transport == nullptr) { + if (transport_and_io == nullptr) { return; } - MessageQueue outgoing_messages; - InspectorIo* io = io_and_transport->second; - io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_messages); - for (const auto& outgoing : outgoing_messages) { + Transport* transport = transport_and_io->first; + InspectorIo* io = transport_and_io->second; + MessageQueue outgoing_message_queue; + io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_message_queue); + for (const auto& outgoing : outgoing_message_queue) { switch (std::get<0>(outgoing)) { case TransportAction::kKill: - io_and_transport->first->TerminateConnections(); + transport->TerminateConnections(); // Fallthrough case TransportAction::kStop: - io_and_transport->first->Stop(nullptr); + transport->Stop(nullptr); break; case TransportAction::kSendMessage: std::string message = StringViewToUtf8(std::get<2>(outgoing)->string()); - io_and_transport->first->Send(std::get<1>(outgoing), message); + transport->Send(std::get<1>(outgoing), message); break; } } } template -void InspectorIo::WorkerRunIO() { +void InspectorIo::ThreadMain() { uv_loop_t loop; loop.data = nullptr; int err = uv_loop_init(&loop); CHECK_EQ(err, 0); - io_thread_req_.data = nullptr; - err = uv_async_init(&loop, &io_thread_req_, WriteCbIO); + thread_req_.data = nullptr; + err = uv_async_init(&loop, &thread_req_, IoThreadAsyncCb); CHECK_EQ(err, 0); - std::string script_path; - if (!script_name_.empty()) { - uv_fs_t req; - req.ptr = nullptr; - if (0 == uv_fs_realpath(&loop, &req, script_name_.c_str(), nullptr)) { - CHECK_NE(req.ptr, nullptr); - script_path = std::string(static_cast(req.ptr)); - } - uv_fs_req_cleanup(&req); - } + std::string script_path = ScriptPath(&loop, script_name_); InspectorIoDelegate delegate(this, script_path, script_name_, wait_for_connect_); delegate_ = &delegate; - InspectorSocketServer server(&delegate, - options_.host_name(), - options_.port()); + Transport server(&delegate, &loop, options_.host_name(), options_.port()); TransportAndIo queue_transport(&server, this); - io_thread_req_.data = &queue_transport; - if (!server.Start(&loop)) { + thread_req_.data = &queue_transport; + if (!server.Start()) { state_ = State::kError; // Safe, main thread is waiting on semaphore - CHECK_EQ(0, CloseAsyncAndLoop(&io_thread_req_)); - uv_sem_post(&start_sem_); + CHECK_EQ(0, CloseAsyncAndLoop(&thread_req_)); + uv_sem_post(&thread_start_sem_); return; } port_ = server.port(); // Safe, main thread is waiting on semaphore. if (!wait_for_connect_) { - uv_sem_post(&start_sem_); + uv_sem_post(&thread_start_sem_); } uv_run(&loop, UV_RUN_DEFAULT); - io_thread_req_.data = nullptr; + thread_req_.data = nullptr; CHECK_EQ(uv_loop_close(&loop), 0); delegate_ = nullptr; } @@ -339,7 +354,7 @@ void InspectorIo::PostIncomingMessage(InspectorAction action, int session_id, NotifyMessageReceived(); } -void InspectorIo::WaitForFrontendMessage() { +void InspectorIo::WaitForIncomingMessage() { Mutex::ScopedLock scoped_lock(state_lock_); if (incoming_message_queue_.empty()) incoming_message_cond_.Wait(scoped_lock); @@ -394,7 +409,7 @@ void InspectorIo::DispatchMessages() { } // static -void InspectorIo::MainThreadAsyncCb(uv_async_t* req) { +void InspectorIo::MainThreadReqAsyncCb(uv_async_t* req) { AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first, req); // Note that this may be called after io was closed or even after a new // one was created and ran. @@ -409,7 +424,7 @@ void InspectorIo::Write(TransportAction action, int session_id, return; AppendMessage(&outgoing_message_queue_, action, session_id, StringBuffer::create(inspector_message)); - int err = uv_async_send(&io_thread_req_); + int err = uv_async_send(&thread_req_); CHECK_EQ(0, err); } @@ -471,11 +486,12 @@ std::string InspectorIoDelegate::GetTargetUrl(const std::string& id) { } bool IoSessionDelegate::WaitForFrontendMessage() { - io_->WaitForFrontendMessage(); + io_->WaitForIncomingMessage(); return true; } -void IoSessionDelegate::OnMessage(const v8_inspector::StringView& message) { +void IoSessionDelegate::SendMessageToFrontend( + const v8_inspector::StringView& message) { io_->Write(TransportAction::kSendMessage, io_->session_id_, message); } diff --git a/src/inspector_io.h b/src/inspector_io.h index b323db4504..6c9c2d2664 100644 --- a/src/inspector_io.h +++ b/src/inspector_io.h @@ -50,22 +50,25 @@ class InspectorIo { bool wait_for_connect); ~InspectorIo(); - // Start the inspector agent thread + // Start the inspector agent thread, waiting for it to initialize, + // and waiting as well for a connection if wait_for_connect. bool Start(); - // Stop the inspector agent + // Stop the inspector agent thread. void Stop(); bool IsStarted(); bool IsConnected(); - void WaitForDisconnect(); + void WaitForDisconnect(); + // Called from thread to queue an incoming message and trigger + // DispatchMessages() on the main thread. void PostIncomingMessage(InspectorAction action, int session_id, const std::string& message); void ResumeStartup() { - uv_sem_post(&start_sem_); + uv_sem_post(&thread_start_sem_); } void ServerDone() { - uv_close(reinterpret_cast(&io_thread_req_), nullptr); + uv_close(reinterpret_cast(&thread_req_), nullptr); } int port() const { return port_; } @@ -84,46 +87,65 @@ class InspectorIo { kShutDown }; - static void ThreadCbIO(void* agent); - static void MainThreadAsyncCb(uv_async_t* req); + // Callback for main_thread_req_'s uv_async_t + static void MainThreadReqAsyncCb(uv_async_t* req); + + // Wrapper for agent->ThreadMain() + static void ThreadMain(void* agent); + + // Runs a uv_loop_t + template void ThreadMain(); + // Called by ThreadMain's loop when triggered by thread_req_, writes + // messages from outgoing_message_queue to the InspectorSockerServer + template static void IoThreadAsyncCb(uv_async_t* async); - template static void WriteCbIO(uv_async_t* async); - template void WorkerRunIO(); void SetConnected(bool connected); void DispatchMessages(); + // Write action to outgoing_message_queue, and wake the thread void Write(TransportAction action, int session_id, const v8_inspector::StringView& message); + // Thread-safe append of message to a queue. Return true if the queue + // used to be empty. template bool AppendMessage(MessageQueue* vector, ActionType action, int session_id, std::unique_ptr buffer); + // Used as equivalent of a thread-safe "pop" of an entire queue's content. template void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2); - void WaitForFrontendMessage(); + // Wait on incoming_message_cond_ + void WaitForIncomingMessage(); + // Broadcast incoming_message_cond_ void NotifyMessageReceived(); - bool StartThread(bool wait); - - // Message queues - ConditionVariable incoming_message_cond_; const DebugOptions options_; - uv_sem_t start_sem_; - Mutex state_lock_; + + // The IO thread runs its own uv_loop to implement the TCP server off + // the main thread. uv_thread_t thread_; + // Used by Start() to wait for thread to initialize, or for it to initialize + // and receive a connection if wait_for_connect was requested. + uv_sem_t thread_start_sem_; InspectorIoDelegate* delegate_; State state_; node::Environment* parent_env_; - uv_async_t io_thread_req_; + // Attached to the uv_loop in ThreadMain() + uv_async_t thread_req_; // Note that this will live while the async is being closed - likely, past // the parent object lifespan std::pair* main_thread_req_; std::unique_ptr session_delegate_; v8::Platform* platform_; + + // Message queues + ConditionVariable incoming_message_cond_; + Mutex state_lock_; // Locked before mutating either queue. MessageQueue incoming_message_queue_; MessageQueue outgoing_message_queue_; + bool dispatching_messages_; int session_id_; diff --git a/src/inspector_socket.cc b/src/inspector_socket.cc index 495cb3b3aa..85984b7fa1 100644 --- a/src/inspector_socket.cc +++ b/src/inspector_socket.cc @@ -68,7 +68,7 @@ static void dispose_inspector(uv_handle_t* handle) { } static void close_connection(InspectorSocket* inspector) { - uv_handle_t* socket = reinterpret_cast(&inspector->client); + uv_handle_t* socket = reinterpret_cast(&inspector->tcp); if (!uv_is_closing(socket)) { uv_read_stop(reinterpret_cast(socket)); uv_close(socket, dispose_inspector); @@ -107,7 +107,7 @@ static int write_to_client(InspectorSocket* inspector, // Freed in write_request_cleanup WriteRequest* wr = new WriteRequest(inspector, msg, len); - uv_stream_t* stream = reinterpret_cast(&inspector->client); + uv_stream_t* stream = reinterpret_cast(&inspector->tcp); return uv_write(&wr->req, stream, &wr->buf, 1, write_cb) < 0; } @@ -253,7 +253,7 @@ static void invoke_read_callback(InspectorSocket* inspector, int status, const uv_buf_t* buf) { if (inspector->ws_state->read_cb) { inspector->ws_state->read_cb( - reinterpret_cast(&inspector->client), status, buf); + reinterpret_cast(&inspector->tcp), status, buf); } } @@ -304,7 +304,7 @@ static int parse_ws_frames(InspectorSocket* inspector) { uv_buf_t buffer; size_t len = output.size(); inspector->ws_state->alloc_cb( - reinterpret_cast(&inspector->client), + reinterpret_cast(&inspector->tcp), len, &buffer); CHECK_GE(buffer.len, len); memcpy(buffer.base, &output[0], len); @@ -360,14 +360,14 @@ static void websockets_data_cb(uv_stream_t* stream, ssize_t nread, } int inspector_read_start(InspectorSocket* inspector, - uv_alloc_cb alloc_cb, uv_read_cb read_cb) { + uv_alloc_cb alloc_cb, uv_read_cb read_cb) { ASSERT(inspector->ws_mode); ASSERT(!inspector->shutting_down || read_cb == nullptr); inspector->ws_state->close_sent = false; inspector->ws_state->alloc_cb = alloc_cb; inspector->ws_state->read_cb = read_cb; int err = - uv_read_start(reinterpret_cast(&inspector->client), + uv_read_start(reinterpret_cast(&inspector->tcp), prepare_buffer, websockets_data_cb); if (err < 0) { @@ -377,7 +377,7 @@ int inspector_read_start(InspectorSocket* inspector, } void inspector_read_stop(InspectorSocket* inspector) { - uv_read_stop(reinterpret_cast(&inspector->client)); + uv_read_stop(reinterpret_cast(&inspector->tcp)); inspector->ws_state->alloc_cb = nullptr; inspector->ws_state->read_cb = nullptr; } @@ -426,7 +426,7 @@ static int path_cb(http_parser* parser, const char* at, size_t length) { } static void handshake_complete(InspectorSocket* inspector) { - uv_read_stop(reinterpret_cast(&inspector->client)); + uv_read_stop(reinterpret_cast(&inspector->tcp)); handshake_cb callback = inspector->http_parsing_state->callback; inspector->ws_state = new ws_state_s(); inspector->ws_mode = true; @@ -448,7 +448,7 @@ static void report_handshake_failure_cb(uv_handle_t* handle) { } static void close_and_report_handshake_failure(InspectorSocket* inspector) { - uv_handle_t* socket = reinterpret_cast(&inspector->client); + uv_handle_t* socket = reinterpret_cast(&inspector->tcp); if (uv_is_closing(socket)) { report_handshake_failure_cb(socket); } else { @@ -474,7 +474,7 @@ static void handshake_failed(InspectorSocket* inspector) { } // init_handshake references message_complete_cb -static void init_handshake(InspectorSocket* inspector); +static void init_handshake(InspectorSocket* socket); static int message_complete_cb(http_parser* parser) { InspectorSocket* inspector = static_cast(parser->data); @@ -513,7 +513,7 @@ static int message_complete_cb(http_parser* parser) { return 0; } -static void data_received_cb(uv_stream_s* client, ssize_t nread, +static void data_received_cb(uv_stream_s* tcp, ssize_t nread, const uv_buf_t* buf) { #if DUMP_READS if (nread >= 0) { @@ -523,7 +523,7 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread, printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread)); } #endif - InspectorSocket* inspector = inspector_from_stream(client); + InspectorSocket* inspector = inspector_from_stream(tcp); reclaim_uv_buf(inspector, buf, nread); if (nread < 0 || nread == UV_EOF) { close_and_report_handshake_failure(inspector); @@ -542,15 +542,15 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread, } } -static void init_handshake(InspectorSocket* inspector) { - http_parsing_state_s* state = inspector->http_parsing_state; +static void init_handshake(InspectorSocket* socket) { + http_parsing_state_s* state = socket->http_parsing_state; CHECK_NE(state, nullptr); state->current_header.clear(); state->ws_key.clear(); state->path.clear(); state->done = false; http_parser_init(&state->parser, HTTP_REQUEST); - state->parser.data = inspector; + state->parser.data = socket; http_parser_settings* settings = &state->parser_settings; http_parser_settings_init(settings); settings->on_header_field = header_field_cb; @@ -559,26 +559,26 @@ static void init_handshake(InspectorSocket* inspector) { settings->on_url = path_cb; } -int inspector_accept(uv_stream_t* server, InspectorSocket* inspector, +int inspector_accept(uv_stream_t* server, InspectorSocket* socket, handshake_cb callback) { ASSERT_NE(callback, nullptr); - CHECK_EQ(inspector->http_parsing_state, nullptr); + CHECK_EQ(socket->http_parsing_state, nullptr); - inspector->http_parsing_state = new http_parsing_state_s(); - uv_stream_t* client = reinterpret_cast(&inspector->client); - int err = uv_tcp_init(server->loop, &inspector->client); + socket->http_parsing_state = new http_parsing_state_s(); + uv_stream_t* tcp = reinterpret_cast(&socket->tcp); + int err = uv_tcp_init(server->loop, &socket->tcp); if (err == 0) { - err = uv_accept(server, client); + err = uv_accept(server, tcp); } if (err == 0) { - init_handshake(inspector); - inspector->http_parsing_state->callback = callback; - err = uv_read_start(client, prepare_buffer, + init_handshake(socket); + socket->http_parsing_state->callback = callback; + err = uv_read_start(tcp, prepare_buffer, data_received_cb); } if (err != 0) { - uv_close(reinterpret_cast(client), NULL); + uv_close(reinterpret_cast(tcp), NULL); } return err; } @@ -594,10 +594,10 @@ void inspector_write(InspectorSocket* inspector, const char* data, } void inspector_close(InspectorSocket* inspector, - inspector_cb callback) { + inspector_cb callback) { // libuv throws assertions when closing stream that's already closed - we // need to do the same. - ASSERT(!uv_is_closing(reinterpret_cast(&inspector->client))); + ASSERT(!uv_is_closing(reinterpret_cast(&inspector->tcp))); ASSERT(!inspector->shutting_down); inspector->shutting_down = true; inspector->ws_state->close_cb = callback; @@ -612,9 +612,9 @@ void inspector_close(InspectorSocket* inspector, } bool inspector_is_active(const InspectorSocket* inspector) { - const uv_handle_t* client = - reinterpret_cast(&inspector->client); - return !inspector->shutting_down && !uv_is_closing(client); + const uv_handle_t* tcp = + reinterpret_cast(&inspector->tcp); + return !inspector->shutting_down && !uv_is_closing(tcp); } void InspectorSocket::reinit() { diff --git a/src/inspector_socket.h b/src/inspector_socket.h index 558d87bcb7..7cd8254fb3 100644 --- a/src/inspector_socket.h +++ b/src/inspector_socket.h @@ -48,6 +48,7 @@ struct ws_state_s { bool received_close; }; +// HTTP Wrapper around a uv_tcp_t class InspectorSocket { public: InspectorSocket() : data(nullptr), http_parsing_state(nullptr), @@ -58,7 +59,7 @@ class InspectorSocket { struct http_parsing_state_s* http_parsing_state; struct ws_state_s* ws_state; std::vector buffer; - uv_tcp_t client; + uv_tcp_t tcp; bool ws_mode; bool shutting_down; bool connection_eof; @@ -82,7 +83,7 @@ void inspector_write(InspectorSocket* inspector, bool inspector_is_active(const InspectorSocket* inspector); inline InspectorSocket* inspector_from_stream(uv_tcp_t* stream) { - return node::ContainerOf(&InspectorSocket::client, stream); + return node::ContainerOf(&InspectorSocket::tcp, stream); } inline InspectorSocket* inspector_from_stream(uv_stream_t* stream) { diff --git a/src/inspector_socket_server.cc b/src/inspector_socket_server.cc index 1e1d0ff567..f3e56b6ceb 100644 --- a/src/inspector_socket_server.cc +++ b/src/inspector_socket_server.cc @@ -230,9 +230,9 @@ class SocketSession { private: enum class State { kHttp, kWebSocket, kClosing, kEOF, kDeclined }; - static void CloseCallback_(InspectorSocket* socket, int code); - static void ReadCallback_(uv_stream_t* stream, ssize_t read, - const uv_buf_t* buf); + static void CloseCallback(InspectorSocket* socket, int code); + static void ReadCallback(uv_stream_t* stream, ssize_t read, + const uv_buf_t* buf); void OnRemoteDataIO(ssize_t read, const uv_buf_t* buf); const int id_; InspectorSocket socket_; @@ -242,9 +242,10 @@ class SocketSession { }; InspectorSocketServer::InspectorSocketServer(SocketServerDelegate* delegate, + uv_loop_t* loop, const std::string& host, int port, - FILE* out) : loop_(nullptr), + FILE* out) : loop_(loop), delegate_(delegate), host_(host), port_(port), @@ -255,7 +256,6 @@ InspectorSocketServer::InspectorSocketServer(SocketServerDelegate* delegate, state_ = ServerState::kNew; } - // static bool InspectorSocketServer::HandshakeCallback(InspectorSocket* socket, inspector_handshake_event event, @@ -361,7 +361,7 @@ void InspectorSocketServer::SendListResponse(InspectorSocket* socket) { } if (!connected) { std::string host; - GetSocketHost(&socket->client, &host); + GetSocketHost(&socket->tcp, &host); std::string address = GetWsUrl(host, port_, id); std::ostringstream frontend_url; frontend_url << "chrome-devtools://devtools/bundled"; @@ -374,9 +374,8 @@ void InspectorSocketServer::SendListResponse(InspectorSocket* socket) { SendHttpResponse(socket, MapsToString(response)); } -bool InspectorSocketServer::Start(uv_loop_t* loop) { +bool InspectorSocketServer::Start() { CHECK_EQ(state_, ServerState::kNew); - loop_ = loop; sockaddr_in addr; uv_tcp_init(loop_, &server_); uv_ip4_addr(host_.c_str(), port_, &addr); @@ -470,11 +469,11 @@ SocketSession::SocketSession(InspectorSocketServer* server, int id) void SocketSession::Close() { CHECK_NE(state_, State::kClosing); state_ = State::kClosing; - inspector_close(&socket_, CloseCallback_); + inspector_close(&socket_, CloseCallback); } // static -void SocketSession::CloseCallback_(InspectorSocket* socket, int code) { +void SocketSession::CloseCallback(InspectorSocket* socket, int code) { SocketSession* session = SocketSession::From(socket); CHECK_EQ(State::kClosing, session->state_); session->server_->SessionTerminated(session); @@ -483,12 +482,12 @@ void SocketSession::CloseCallback_(InspectorSocket* socket, int code) { void SocketSession::FrontendConnected() { CHECK_EQ(State::kHttp, state_); state_ = State::kWebSocket; - inspector_read_start(&socket_, OnBufferAlloc, ReadCallback_); + inspector_read_start(&socket_, OnBufferAlloc, ReadCallback); } // static -void SocketSession::ReadCallback_(uv_stream_t* stream, ssize_t read, - const uv_buf_t* buf) { +void SocketSession::ReadCallback(uv_stream_t* stream, ssize_t read, + const uv_buf_t* buf) { InspectorSocket* socket = inspector_from_stream(stream); SocketSession::From(socket)->OnRemoteDataIO(read, buf); } diff --git a/src/inspector_socket_server.h b/src/inspector_socket_server.h index 8c8e2aaade..4c606ee77a 100644 --- a/src/inspector_socket_server.h +++ b/src/inspector_socket_server.h @@ -30,17 +30,30 @@ class SocketServerDelegate { virtual void ServerDone() = 0; }; +// HTTP Server, writes messages requested as TransportActions, and responds +// to HTTP requests and WS upgrades. + + + class InspectorSocketServer { public: using ServerCallback = void (*)(InspectorSocketServer*); InspectorSocketServer(SocketServerDelegate* delegate, + uv_loop_t* loop, const std::string& host, int port, FILE* out = stderr); - bool Start(uv_loop_t* loop); + // Start listening on host/port + bool Start(); + + // Called by the TransportAction sent with InspectorIo::Write(): + // kKill and kStop void Stop(ServerCallback callback); + // kSendMessage void Send(int session_id, const std::string& message); + // kKill void TerminateConnections(); + int port() { return port_; } diff --git a/src/node.cc b/src/node.cc index b29df6449f..37bfa96f9e 100644 --- a/src/node.cc +++ b/src/node.cc @@ -4010,8 +4010,8 @@ static void ParseArgs(int* argc, } -static void StartDebug(Environment* env, const char* path, - DebugOptions debug_options) { +static void StartInspector(Environment* env, const char* path, + DebugOptions debug_options) { #if HAVE_INSPECTOR CHECK(!env->inspector_agent()->IsStarted()); v8_platform.StartInspector(env, path, debug_options); @@ -4503,7 +4503,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data, env.Start(argc, argv, exec_argc, exec_argv, v8_is_profiling); const char* path = argc > 1 ? argv[1] : nullptr; - StartDebug(&env, path, debug_options); + StartInspector(&env, path, debug_options); if (debug_options.inspector_enabled() && !v8_platform.InspectorStarted(&env)) return 12; // Signal internal error. -- cgit v1.2.3