summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/node_api.cc73
-rw-r--r--src/node_crypto.cc99
-rw-r--r--src/node_internals.h35
-rw-r--r--src/node_zlib.cc29
4 files changed, 108 insertions, 128 deletions
diff --git a/src/node_api.cc b/src/node_api.cc
index 91a47a12d9..b456ade2d4 100644
--- a/src/node_api.cc
+++ b/src/node_api.cc
@@ -3338,7 +3338,7 @@ static napi_status ConvertUVErrorCode(int code) {
}
// Wrapper around uv_work_t which calls user-provided callbacks.
-class Work : public node::AsyncResource {
+class Work : public node::AsyncResource, public node::ThreadPoolWork {
private:
explicit Work(napi_env env,
v8::Local<v8::Object> async_resource,
@@ -3349,15 +3349,14 @@ class Work : public node::AsyncResource {
: AsyncResource(env->isolate,
async_resource,
*v8::String::Utf8Value(env->isolate, async_resource_name)),
- _env(env),
- _data(data),
- _execute(execute),
- _complete(complete) {
- memset(&_request, 0, sizeof(_request));
- _request.data = this;
+ ThreadPoolWork(node::Environment::GetCurrent(env->isolate)),
+ _env(env),
+ _data(data),
+ _execute(execute),
+ _complete(complete) {
}
- ~Work() { }
+ virtual ~Work() { }
public:
static Work* New(napi_env env,
@@ -3374,47 +3373,36 @@ class Work : public node::AsyncResource {
delete work;
}
- static void ExecuteCallback(uv_work_t* req) {
- Work* work = static_cast<Work*>(req->data);
- work->_execute(work->_env, work->_data);
+ void DoThreadPoolWork() override {
+ _execute(_env, _data);
}
- static void CompleteCallback(uv_work_t* req, int status) {
- Work* work = static_cast<Work*>(req->data);
+ void AfterThreadPoolWork(int status) override {
+ if (_complete == nullptr)
+ return;
- if (work->_complete != nullptr) {
- napi_env env = work->_env;
+ // Establish a handle scope here so that every callback doesn't have to.
+ // Also it is needed for the exception-handling below.
+ v8::HandleScope scope(_env->isolate);
- // Establish a handle scope here so that every callback doesn't have to.
- // Also it is needed for the exception-handling below.
- v8::HandleScope scope(env->isolate);
- node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
- env_->DecreaseWaitingRequestCounter();
+ CallbackScope callback_scope(this);
- CallbackScope callback_scope(work);
+ NAPI_CALL_INTO_MODULE(_env,
+ _complete(_env, ConvertUVErrorCode(status), _data),
+ [this] (v8::Local<v8::Value> local_err) {
+ // If there was an unhandled exception in the complete callback,
+ // report it as a fatal exception. (There is no JavaScript on the
+ // callstack that can possibly handle it.)
+ v8impl::trigger_fatal_exception(_env, local_err);
+ });
- NAPI_CALL_INTO_MODULE(env,
- work->_complete(env, ConvertUVErrorCode(status), work->_data),
- [env] (v8::Local<v8::Value> local_err) {
- // If there was an unhandled exception in the complete callback,
- // report it as a fatal exception. (There is no JavaScript on the
- // callstack that can possibly handle it.)
- v8impl::trigger_fatal_exception(env, local_err);
- });
-
- // Note: Don't access `work` after this point because it was
- // likely deleted by the complete callback.
- }
- }
-
- uv_work_t* Request() {
- return &_request;
+ // Note: Don't access `work` after this point because it was
+ // likely deleted by the complete callback.
}
private:
napi_env _env;
void* _data;
- uv_work_t _request;
napi_async_execute_callback _execute;
napi_async_complete_callback _complete;
};
@@ -3491,12 +3479,7 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
- node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
- env_->IncreaseWaitingRequestCounter();
- CALL_UV(env, uv_queue_work(event_loop,
- w->Request(),
- uvimpl::Work::ExecuteCallback,
- uvimpl::Work::CompleteCallback));
+ w->ScheduleWork();
return napi_clear_last_error(env);
}
@@ -3507,7 +3490,7 @@ napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
- CALL_UV(env, uv_cancel(reinterpret_cast<uv_req_t*>(w->Request())));
+ CALL_UV(env, w->CancelWork());
return napi_clear_last_error(env);
}
diff --git a/src/node_crypto.cc b/src/node_crypto.cc
index 10e4f59391..8235b8b01c 100644
--- a/src/node_crypto.cc
+++ b/src/node_crypto.cc
@@ -4556,7 +4556,7 @@ bool ECDH::IsKeyPairValid() {
}
-class PBKDF2Request : public AsyncWrap {
+class PBKDF2Request : public AsyncWrap, public ThreadPoolWork {
public:
PBKDF2Request(Environment* env,
Local<Object> object,
@@ -4566,6 +4566,7 @@ class PBKDF2Request : public AsyncWrap {
int keylen,
int iteration_count)
: AsyncWrap(env, object, AsyncWrap::PROVIDER_PBKDF2REQUEST),
+ ThreadPoolWork(env),
digest_(digest),
success_(false),
pass_(std::move(pass)),
@@ -4574,21 +4575,14 @@ class PBKDF2Request : public AsyncWrap {
iteration_count_(iteration_count) {
}
- uv_work_t* work_req() {
- return &work_req_;
- }
-
size_t self_size() const override { return sizeof(*this); }
- static void Work(uv_work_t* work_req);
- void Work();
+ void DoThreadPoolWork() override;
+ void AfterThreadPoolWork(int status) override;
- static void After(uv_work_t* work_req, int status);
void After(Local<Value> (*argv)[2]);
- void After();
private:
- uv_work_t work_req_;
const EVP_MD* digest_;
bool success_;
MallocedBuffer<char> pass_;
@@ -4598,7 +4592,7 @@ class PBKDF2Request : public AsyncWrap {
};
-void PBKDF2Request::Work() {
+void PBKDF2Request::DoThreadPoolWork() {
success_ =
PKCS5_PBKDF2_HMAC(
pass_.data, pass_.size,
@@ -4611,12 +4605,6 @@ void PBKDF2Request::Work() {
}
-void PBKDF2Request::Work(uv_work_t* work_req) {
- PBKDF2Request* req = ContainerOf(&PBKDF2Request::work_req_, work_req);
- req->Work();
-}
-
-
void PBKDF2Request::After(Local<Value> (*argv)[2]) {
if (success_) {
(*argv)[0] = Null(env()->isolate());
@@ -4629,7 +4617,12 @@ void PBKDF2Request::After(Local<Value> (*argv)[2]) {
}
-void PBKDF2Request::After() {
+void PBKDF2Request::AfterThreadPoolWork(int status) {
+ std::unique_ptr<PBKDF2Request> req(this);
+ if (status == UV_ECANCELED)
+ return;
+ CHECK_EQ(status, 0);
+
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
Local<Value> argv[2];
@@ -4638,17 +4631,6 @@ void PBKDF2Request::After() {
}
-void PBKDF2Request::After(uv_work_t* work_req, int status) {
- std::unique_ptr<PBKDF2Request> req(
- ContainerOf(&PBKDF2Request::work_req_, work_req));
- req->env()->DecreaseWaitingRequestCounter();
- if (status == UV_ECANCELED)
- return;
- CHECK_EQ(status, 0);
- req->After();
-}
-
-
void PBKDF2(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
@@ -4695,14 +4677,10 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
if (args[5]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[5]).FromJust();
- env->IncreaseWaitingRequestCounter();
- uv_queue_work(env->event_loop(),
- req.release()->work_req(),
- PBKDF2Request::Work,
- PBKDF2Request::After);
+ req.release()->ScheduleWork();
} else {
env->PrintSyncTrace();
- req->Work();
+ req->DoThreadPoolWork();
Local<Value> argv[2];
req->After(&argv);
@@ -4715,7 +4693,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
// Only instantiate within a valid HandleScope.
-class RandomBytesRequest : public AsyncWrap {
+class RandomBytesRequest : public AsyncWrap, public ThreadPoolWork {
public:
enum FreeMode { FREE_DATA, DONT_FREE_DATA };
@@ -4725,16 +4703,13 @@ class RandomBytesRequest : public AsyncWrap {
char* data,
FreeMode free_mode)
: AsyncWrap(env, object, AsyncWrap::PROVIDER_RANDOMBYTESREQUEST),
+ ThreadPoolWork(env),
error_(0),
size_(size),
data_(data),
free_mode_(free_mode) {
}
- uv_work_t* work_req() {
- return &work_req_;
- }
-
inline size_t size() const {
return size_;
}
@@ -4772,7 +4747,8 @@ class RandomBytesRequest : public AsyncWrap {
size_t self_size() const override { return sizeof(*this); }
- uv_work_t work_req_;
+ void DoThreadPoolWork() override;
+ void AfterThreadPoolWork(int status) override;
private:
unsigned long error_; // NOLINT(runtime/int)
@@ -4782,21 +4758,17 @@ class RandomBytesRequest : public AsyncWrap {
};
-void RandomBytesWork(uv_work_t* work_req) {
- RandomBytesRequest* req =
- ContainerOf(&RandomBytesRequest::work_req_, work_req);
-
+void RandomBytesRequest::DoThreadPoolWork() {
// Ensure that OpenSSL's PRNG is properly seeded.
CheckEntropy();
- const int r = RAND_bytes(reinterpret_cast<unsigned char*>(req->data()),
- req->size());
+ const int r = RAND_bytes(reinterpret_cast<unsigned char*>(data_), size_);
// RAND_bytes() returns 0 on error.
if (r == 0) {
- req->set_error(ERR_get_error()); // NOLINT(runtime/int)
+ set_error(ERR_get_error()); // NOLINT(runtime/int)
} else if (r == -1) {
- req->set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
+ set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
}
}
@@ -4834,19 +4806,16 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {
}
-void RandomBytesAfter(uv_work_t* work_req, int status) {
- std::unique_ptr<RandomBytesRequest> req(
- ContainerOf(&RandomBytesRequest::work_req_, work_req));
- Environment* env = req->env();
- env->DecreaseWaitingRequestCounter();
+void RandomBytesRequest::AfterThreadPoolWork(int status) {
+ std::unique_ptr<RandomBytesRequest> req(this);
if (status == UV_ECANCELED)
return;
CHECK_EQ(status, 0);
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
Local<Value> argv[2];
- RandomBytesCheck(req.get(), &argv);
- req->MakeCallback(env->ondone_string(), arraysize(argv), argv);
+ RandomBytesCheck(this, &argv);
+ MakeCallback(env()->ondone_string(), arraysize(argv), argv);
}
@@ -4854,7 +4823,7 @@ void RandomBytesProcessSync(Environment* env,
std::unique_ptr<RandomBytesRequest> req,
Local<Value> (*argv)[2]) {
env->PrintSyncTrace();
- RandomBytesWork(req->work_req());
+ req->DoThreadPoolWork();
RandomBytesCheck(req.get(), argv);
if (!(*argv)[0]->IsNull())
@@ -4881,11 +4850,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
if (args[1]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[1]).FromJust();
- env->IncreaseWaitingRequestCounter();
- uv_queue_work(env->event_loop(),
- req.release()->work_req(),
- RandomBytesWork,
- RandomBytesAfter);
+ req.release()->ScheduleWork();
args.GetReturnValue().Set(obj);
} else {
Local<Value> argv[2];
@@ -4921,11 +4886,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
if (args[3]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();
- env->IncreaseWaitingRequestCounter();
- uv_queue_work(env->event_loop(),
- req.release()->work_req(),
- RandomBytesWork,
- RandomBytesAfter);
+ req.release()->ScheduleWork();
args.GetReturnValue().Set(obj);
} else {
Local<Value> argv[2];
diff --git a/src/node_internals.h b/src/node_internals.h
index e15df78ffd..8aa4631880 100644
--- a/src/node_internals.h
+++ b/src/node_internals.h
@@ -503,6 +503,42 @@ class InternalCallbackScope {
bool closed_ = false;
};
+class ThreadPoolWork {
+ public:
+ explicit inline ThreadPoolWork(Environment* env) : env_(env) {}
+ inline virtual ~ThreadPoolWork() = default;
+ inline void ScheduleWork();
+ inline int CancelWork();
+
+ virtual void DoThreadPoolWork() = 0;
+ virtual void AfterThreadPoolWork(int status) = 0;
+
+ private:
+ Environment* env_;
+ uv_work_t work_req_;
+};
+
+void ThreadPoolWork::ScheduleWork() {
+ env_->IncreaseWaitingRequestCounter();
+ int status = uv_queue_work(
+ env_->event_loop(),
+ &work_req_,
+ [](uv_work_t* req) {
+ ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
+ self->DoThreadPoolWork();
+ },
+ [](uv_work_t* req, int status) {
+ ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
+ self->env_->DecreaseWaitingRequestCounter();
+ self->AfterThreadPoolWork(status);
+ });
+ CHECK_EQ(status, 0);
+}
+
+int ThreadPoolWork::CancelWork() {
+ return uv_cancel(reinterpret_cast<uv_req_t*>(&work_req_));
+}
+
static inline const char *errno_string(int errorno) {
#define ERRNO_CASE(e) case e: return #e;
switch (errorno) {
diff --git a/src/node_zlib.cc b/src/node_zlib.cc
index 3249905dfb..c77e6d3297 100644
--- a/src/node_zlib.cc
+++ b/src/node_zlib.cc
@@ -70,10 +70,11 @@ enum node_zlib_mode {
/**
* Deflate/Inflate
*/
-class ZCtx : public AsyncWrap {
+class ZCtx : public AsyncWrap, public ThreadPoolWork {
public:
ZCtx(Environment* env, Local<Object> wrap, node_zlib_mode mode)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB),
+ ThreadPoolWork(env),
dictionary_(nullptr),
dictionary_len_(0),
err_(0),
@@ -191,9 +192,6 @@ class ZCtx : public AsyncWrap {
CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf)));
out = reinterpret_cast<Bytef *>(Buffer::Data(out_buf) + out_off);
- // build up the work request
- uv_work_t* work_req = &(ctx->work_req_);
-
ctx->strm_.avail_in = in_len;
ctx->strm_.next_in = in;
ctx->strm_.avail_out = out_len;
@@ -203,7 +201,7 @@ class ZCtx : public AsyncWrap {
if (!async) {
// sync version
env->PrintSyncTrace();
- Process(work_req);
+ ctx->DoThreadPoolWork();
if (CheckError(ctx)) {
ctx->write_result_[0] = ctx->strm_.avail_out;
ctx->write_result_[1] = ctx->strm_.avail_in;
@@ -214,18 +212,24 @@ class ZCtx : public AsyncWrap {
}
// async version
- env->IncreaseWaitingRequestCounter();
- uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After);
+ ctx->ScheduleWork();
}
+ // TODO(addaleax): Make these methods non-static. It's a significant bunch
+ // of churn that's better left for a separate PR.
+ void DoThreadPoolWork() override {
+ Process(this);
+ }
+
+ void AfterThreadPoolWork(int status) override {
+ After(this, status);
+ }
// thread pool!
// This function may be called multiple times on the uv_work pool
// for a single write() call, until all of the input bytes have
// been consumed.
- static void Process(uv_work_t* work_req) {
- ZCtx *ctx = ContainerOf(&ZCtx::work_req_, work_req);
-
+ static void Process(ZCtx* ctx) {
const Bytef* next_expected_header_byte = nullptr;
// If the avail_out is left at 0, then it means that it ran out
@@ -361,12 +365,10 @@ class ZCtx : public AsyncWrap {
// v8 land!
- static void After(uv_work_t* work_req, int status) {
- ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req);
+ static void After(ZCtx* ctx, int status) {
Environment* env = ctx->env();
ctx->write_in_progress_ = false;
- env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) {
ctx->Close();
return;
@@ -685,7 +687,6 @@ class ZCtx : public AsyncWrap {
int strategy_;
z_stream strm_;
int windowBits_;
- uv_work_t work_req_;
bool write_in_progress_;
bool pending_close_;
unsigned int refs_;