diff options
-rw-r--r-- | src/node_api.cc | 73 | ||||
-rw-r--r-- | src/node_crypto.cc | 99 | ||||
-rw-r--r-- | src/node_internals.h | 35 | ||||
-rw-r--r-- | src/node_zlib.cc | 29 |
4 files changed, 108 insertions, 128 deletions
diff --git a/src/node_api.cc b/src/node_api.cc index 91a47a12d9..b456ade2d4 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3338,7 +3338,7 @@ static napi_status ConvertUVErrorCode(int code) { } // Wrapper around uv_work_t which calls user-provided callbacks. -class Work : public node::AsyncResource { +class Work : public node::AsyncResource, public node::ThreadPoolWork { private: explicit Work(napi_env env, v8::Local<v8::Object> async_resource, @@ -3349,15 +3349,14 @@ class Work : public node::AsyncResource { : AsyncResource(env->isolate, async_resource, *v8::String::Utf8Value(env->isolate, async_resource_name)), - _env(env), - _data(data), - _execute(execute), - _complete(complete) { - memset(&_request, 0, sizeof(_request)); - _request.data = this; + ThreadPoolWork(node::Environment::GetCurrent(env->isolate)), + _env(env), + _data(data), + _execute(execute), + _complete(complete) { } - ~Work() { } + virtual ~Work() { } public: static Work* New(napi_env env, @@ -3374,47 +3373,36 @@ class Work : public node::AsyncResource { delete work; } - static void ExecuteCallback(uv_work_t* req) { - Work* work = static_cast<Work*>(req->data); - work->_execute(work->_env, work->_data); + void DoThreadPoolWork() override { + _execute(_env, _data); } - static void CompleteCallback(uv_work_t* req, int status) { - Work* work = static_cast<Work*>(req->data); + void AfterThreadPoolWork(int status) { + if (_complete == nullptr) + return; - if (work->_complete != nullptr) { - napi_env env = work->_env; + // Establish a handle scope here so that every callback doesn't have to. + // Also it is needed for the exception-handling below. + v8::HandleScope scope(_env->isolate); - // Establish a handle scope here so that every callback doesn't have to. - // Also it is needed for the exception-handling below. - v8::HandleScope scope(env->isolate); - node::Environment* env_ = node::Environment::GetCurrent(env->isolate); - env_->DecreaseWaitingRequestCounter(); + CallbackScope callback_scope(this); - CallbackScope callback_scope(work); + NAPI_CALL_INTO_MODULE(_env, + _complete(_env, ConvertUVErrorCode(status), _data), + [this] (v8::Local<v8::Value> local_err) { + // If there was an unhandled exception in the complete callback, + // report it as a fatal exception. (There is no JavaScript on the + // callstack that can possibly handle it.) + v8impl::trigger_fatal_exception(_env, local_err); + }); - NAPI_CALL_INTO_MODULE(env, - work->_complete(env, ConvertUVErrorCode(status), work->_data), - [env] (v8::Local<v8::Value> local_err) { - // If there was an unhandled exception in the complete callback, - // report it as a fatal exception. (There is no JavaScript on the - // callstack that can possibly handle it.) - v8impl::trigger_fatal_exception(env, local_err); - }); - - // Note: Don't access `work` after this point because it was - // likely deleted by the complete callback. - } - } - - uv_work_t* Request() { - return &_request; + // Note: Don't access `work` after this point because it was + // likely deleted by the complete callback. } private: napi_env _env; void* _data; - uv_work_t _request; napi_async_execute_callback _execute; napi_async_complete_callback _complete; }; @@ -3491,12 +3479,7 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) { uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); - node::Environment* env_ = node::Environment::GetCurrent(env->isolate); - env_->IncreaseWaitingRequestCounter(); - CALL_UV(env, uv_queue_work(event_loop, - w->Request(), - uvimpl::Work::ExecuteCallback, - uvimpl::Work::CompleteCallback)); + w->ScheduleWork(); return napi_clear_last_error(env); } @@ -3507,7 +3490,7 @@ napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); - CALL_UV(env, uv_cancel(reinterpret_cast<uv_req_t*>(w->Request()))); + CALL_UV(env, w->CancelWork()); return napi_clear_last_error(env); } diff --git a/src/node_crypto.cc b/src/node_crypto.cc index 10e4f59391..8235b8b01c 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -4556,7 +4556,7 @@ bool ECDH::IsKeyPairValid() { } -class PBKDF2Request : public AsyncWrap { +class PBKDF2Request : public AsyncWrap, public ThreadPoolWork { public: PBKDF2Request(Environment* env, Local<Object> object, @@ -4566,6 +4566,7 @@ class PBKDF2Request : public AsyncWrap { int keylen, int iteration_count) : AsyncWrap(env, object, AsyncWrap::PROVIDER_PBKDF2REQUEST), + ThreadPoolWork(env), digest_(digest), success_(false), pass_(std::move(pass)), @@ -4574,21 +4575,14 @@ class PBKDF2Request : public AsyncWrap { iteration_count_(iteration_count) { } - uv_work_t* work_req() { - return &work_req_; - } - size_t self_size() const override { return sizeof(*this); } - static void Work(uv_work_t* work_req); - void Work(); + void DoThreadPoolWork() override; + void AfterThreadPoolWork(int status) override; - static void After(uv_work_t* work_req, int status); void After(Local<Value> (*argv)[2]); - void After(); private: - uv_work_t work_req_; const EVP_MD* digest_; bool success_; MallocedBuffer<char> pass_; @@ -4598,7 +4592,7 @@ class PBKDF2Request : public AsyncWrap { }; -void PBKDF2Request::Work() { +void PBKDF2Request::DoThreadPoolWork() { success_ = PKCS5_PBKDF2_HMAC( pass_.data, pass_.size, @@ -4611,12 +4605,6 @@ void PBKDF2Request::Work() { } -void PBKDF2Request::Work(uv_work_t* work_req) { - PBKDF2Request* req = ContainerOf(&PBKDF2Request::work_req_, work_req); - req->Work(); -} - - void PBKDF2Request::After(Local<Value> (*argv)[2]) { if (success_) { (*argv)[0] = Null(env()->isolate()); @@ -4629,7 +4617,12 @@ void PBKDF2Request::After(Local<Value> (*argv)[2]) { } -void PBKDF2Request::After() { +void PBKDF2Request::AfterThreadPoolWork(int status) { + std::unique_ptr<PBKDF2Request> req(this); + if (status == UV_ECANCELED) + return; + CHECK_EQ(status, 0); + HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); Local<Value> argv[2]; @@ -4638,17 +4631,6 @@ void PBKDF2Request::After() { } -void PBKDF2Request::After(uv_work_t* work_req, int status) { - std::unique_ptr<PBKDF2Request> req( - ContainerOf(&PBKDF2Request::work_req_, work_req)); - req->env()->DecreaseWaitingRequestCounter(); - if (status == UV_ECANCELED) - return; - CHECK_EQ(status, 0); - req->After(); -} - - void PBKDF2(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); @@ -4695,14 +4677,10 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) { if (args[5]->IsFunction()) { obj->Set(env->context(), env->ondone_string(), args[5]).FromJust(); - env->IncreaseWaitingRequestCounter(); - uv_queue_work(env->event_loop(), - req.release()->work_req(), - PBKDF2Request::Work, - PBKDF2Request::After); + req.release()->ScheduleWork(); } else { env->PrintSyncTrace(); - req->Work(); + req->DoThreadPoolWork(); Local<Value> argv[2]; req->After(&argv); @@ -4715,7 +4693,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) { // Only instantiate within a valid HandleScope. -class RandomBytesRequest : public AsyncWrap { +class RandomBytesRequest : public AsyncWrap, public ThreadPoolWork { public: enum FreeMode { FREE_DATA, DONT_FREE_DATA }; @@ -4725,16 +4703,13 @@ class RandomBytesRequest : public AsyncWrap { char* data, FreeMode free_mode) : AsyncWrap(env, object, AsyncWrap::PROVIDER_RANDOMBYTESREQUEST), + ThreadPoolWork(env), error_(0), size_(size), data_(data), free_mode_(free_mode) { } - uv_work_t* work_req() { - return &work_req_; - } - inline size_t size() const { return size_; } @@ -4772,7 +4747,8 @@ class RandomBytesRequest : public AsyncWrap { size_t self_size() const override { return sizeof(*this); } - uv_work_t work_req_; + void DoThreadPoolWork() override; + void AfterThreadPoolWork(int status) override; private: unsigned long error_; // NOLINT(runtime/int) @@ -4782,21 +4758,17 @@ class RandomBytesRequest : public AsyncWrap { }; -void RandomBytesWork(uv_work_t* work_req) { - RandomBytesRequest* req = - ContainerOf(&RandomBytesRequest::work_req_, work_req); - +void RandomBytesRequest::DoThreadPoolWork() { // Ensure that OpenSSL's PRNG is properly seeded. CheckEntropy(); - const int r = RAND_bytes(reinterpret_cast<unsigned char*>(req->data()), - req->size()); + const int r = RAND_bytes(reinterpret_cast<unsigned char*>(data_), size_); // RAND_bytes() returns 0 on error. if (r == 0) { - req->set_error(ERR_get_error()); // NOLINT(runtime/int) + set_error(ERR_get_error()); // NOLINT(runtime/int) } else if (r == -1) { - req->set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int) + set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int) } } @@ -4834,19 +4806,16 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) { } -void RandomBytesAfter(uv_work_t* work_req, int status) { - std::unique_ptr<RandomBytesRequest> req( - ContainerOf(&RandomBytesRequest::work_req_, work_req)); - Environment* env = req->env(); - env->DecreaseWaitingRequestCounter(); +void RandomBytesRequest::AfterThreadPoolWork(int status) { + std::unique_ptr<RandomBytesRequest> req(this); if (status == UV_ECANCELED) return; CHECK_EQ(status, 0); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); Local<Value> argv[2]; - RandomBytesCheck(req.get(), &argv); - req->MakeCallback(env->ondone_string(), arraysize(argv), argv); + RandomBytesCheck(this, &argv); + MakeCallback(env()->ondone_string(), arraysize(argv), argv); } @@ -4854,7 +4823,7 @@ void RandomBytesProcessSync(Environment* env, std::unique_ptr<RandomBytesRequest> req, Local<Value> (*argv)[2]) { env->PrintSyncTrace(); - RandomBytesWork(req->work_req()); + req->DoThreadPoolWork(); RandomBytesCheck(req.get(), argv); if (!(*argv)[0]->IsNull()) @@ -4881,11 +4850,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) { if (args[1]->IsFunction()) { obj->Set(env->context(), env->ondone_string(), args[1]).FromJust(); - env->IncreaseWaitingRequestCounter(); - uv_queue_work(env->event_loop(), - req.release()->work_req(), - RandomBytesWork, - RandomBytesAfter); + req.release()->ScheduleWork(); args.GetReturnValue().Set(obj); } else { Local<Value> argv[2]; @@ -4921,11 +4886,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) { if (args[3]->IsFunction()) { obj->Set(env->context(), env->ondone_string(), args[3]).FromJust(); - env->IncreaseWaitingRequestCounter(); - uv_queue_work(env->event_loop(), - req.release()->work_req(), - RandomBytesWork, - RandomBytesAfter); + req.release()->ScheduleWork(); args.GetReturnValue().Set(obj); } else { Local<Value> argv[2]; diff --git a/src/node_internals.h b/src/node_internals.h index e15df78ffd..8aa4631880 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -503,6 +503,41 @@ class InternalCallbackScope { bool closed_ = false; }; +class ThreadPoolWork { + public: + explicit inline ThreadPoolWork(Environment* env) : env_(env) {} + inline void ScheduleWork(); + inline int CancelWork(); + + virtual void DoThreadPoolWork() = 0; + virtual void AfterThreadPoolWork(int status) = 0; + + private: + Environment* env_; + uv_work_t work_req_; +}; + +void ThreadPoolWork::ScheduleWork() { + env_->IncreaseWaitingRequestCounter(); + int status = uv_queue_work( + env_->event_loop(), + &work_req_, + [](uv_work_t* req) { + ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req); + self->DoThreadPoolWork(); + }, + [](uv_work_t* req, int status) { + ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req); + self->env_->DecreaseWaitingRequestCounter(); + self->AfterThreadPoolWork(status); + }); + CHECK_EQ(status, 0); +} + +int ThreadPoolWork::CancelWork() { + return uv_cancel(reinterpret_cast<uv_req_t*>(&work_req_)); +} + static inline const char *errno_string(int errorno) { #define ERRNO_CASE(e) case e: return #e; switch (errorno) { diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 3249905dfb..c77e6d3297 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -70,10 +70,11 @@ enum node_zlib_mode { /** * Deflate/Inflate */ -class ZCtx : public AsyncWrap { +class ZCtx : public AsyncWrap, public ThreadPoolWork { public: ZCtx(Environment* env, Local<Object> wrap, node_zlib_mode mode) : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB), + ThreadPoolWork(env), dictionary_(nullptr), dictionary_len_(0), err_(0), @@ -191,9 +192,6 @@ class ZCtx : public AsyncWrap { CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf))); out = reinterpret_cast<Bytef *>(Buffer::Data(out_buf) + out_off); - // build up the work request - uv_work_t* work_req = &(ctx->work_req_); - ctx->strm_.avail_in = in_len; ctx->strm_.next_in = in; ctx->strm_.avail_out = out_len; @@ -203,7 +201,7 @@ class ZCtx : public AsyncWrap { if (!async) { // sync version env->PrintSyncTrace(); - Process(work_req); + ctx->DoThreadPoolWork(); if (CheckError(ctx)) { ctx->write_result_[0] = ctx->strm_.avail_out; ctx->write_result_[1] = ctx->strm_.avail_in; @@ -214,18 +212,24 @@ class ZCtx : public AsyncWrap { } // async version - env->IncreaseWaitingRequestCounter(); - uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After); + ctx->ScheduleWork(); } + // TODO(addaleax): Make these methods non-static. It's a significant bunch + // of churn that's better left for a separate PR. + void DoThreadPoolWork() { + Process(this); + } + + void AfterThreadPoolWork(int status) { + After(this, status); + } // thread pool! // This function may be called multiple times on the uv_work pool // for a single write() call, until all of the input bytes have // been consumed. - static void Process(uv_work_t* work_req) { - ZCtx *ctx = ContainerOf(&ZCtx::work_req_, work_req); - + static void Process(ZCtx* ctx) { const Bytef* next_expected_header_byte = nullptr; // If the avail_out is left at 0, then it means that it ran out @@ -361,12 +365,10 @@ class ZCtx : public AsyncWrap { // v8 land! - static void After(uv_work_t* work_req, int status) { - ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req); + static void After(ZCtx* ctx, int status) { Environment* env = ctx->env(); ctx->write_in_progress_ = false; - env->DecreaseWaitingRequestCounter(); if (status == UV_ECANCELED) { ctx->Close(); return; @@ -685,7 +687,6 @@ class ZCtx : public AsyncWrap { int strategy_; z_stream strm_; int windowBits_; - uv_work_t work_req_; bool write_in_progress_; bool pending_close_; unsigned int refs_; |