diff options
-rw-r--r-- | doc/api/errors.md | 5 | ||||
-rw-r--r-- | doc/api/worker_threads.md | 49 | ||||
-rw-r--r-- | lib/internal/errors.js | 2 | ||||
-rw-r--r-- | lib/internal/worker.js | 52 | ||||
-rw-r--r-- | lib/worker_threads.js | 2 | ||||
-rw-r--r-- | src/node_worker.cc | 157 | ||||
-rw-r--r-- | src/node_worker.h | 17 | ||||
-rw-r--r-- | test/parallel/test-worker-resource-limits.js | 58 |
8 files changed, 316 insertions, 26 deletions
diff --git a/doc/api/errors.md b/doc/api/errors.md index 69b3745c78..f2aace2c5e 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1998,6 +1998,11 @@ meaning of the error depends on the specific function. The `execArgv` option passed to the `Worker` constructor contains invalid flags. +<a id="ERR_WORKER_OUT_OF_MEMORY"></a> +### ERR_WORKER_OUT_OF_MEMORY + +The `Worker` instance terminated because it reached its memory limit. + <a id="ERR_WORKER_PATH"></a> ### ERR_WORKER_PATH diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index b5b7499637..ac7a020d70 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -157,6 +157,22 @@ console.log(receiveMessageOnPort(port2)); When this function is used, no `'message'` event will be emitted and the `onmessage` listener will not be invoked. +### worker.resourceLimits +<!-- YAML +added: REPLACEME +--> + +* {Object|undefined} + * `maxYoungGenerationSizeMb` {number} + * `maxOldGenerationSizeMb` {number} + * `codeRangeSizeMb` {number} + +Provides the set of JS engine resource constraints inside this Worker thread. +If the `resourceLimits` option was passed to the [`Worker`][] constructor, +this matches its values. + +If this is used in the main thread, its value is an empty object. + ## worker.SHARE_ENV <!-- YAML added: v11.14.0 @@ -488,6 +504,13 @@ if (isMainThread) { ``` ### new Worker(filename\[, options\]) +<!-- YAML +added: v10.5.0 +changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/26628 + description: The `resourceLimits` option was introduced. +--> * `filename` {string} The path to the Worker’s main script. Must be either an absolute path or a relative path (i.e. relative to the @@ -519,6 +542,16 @@ if (isMainThread) { occur as described in the [HTML structured clone algorithm][], and an error will be thrown if the object cannot be cloned (e.g. because it contains `function`s). 
+ * `resourceLimits` {Object} An optional set of resource limits for the new + JS engine instance. Reaching these limits will lead to termination of the + `Worker` instance. These limits only affect the JS engine, and no external + data, including no `ArrayBuffer`s. Even if these limits are set, the process + may still abort if it encounters a global out-of-memory situation. + * `maxOldGenerationSizeMb` {number} The maximum size of the main heap in MB. + * `maxYoungGenerationSizeMb` {number} The maximum size of a heap space for + recently created objects. + * `codeRangeSizeMb` {number} The size of a pre-allocated memory range + used for generated code. ### Event: 'error' <!-- YAML @@ -583,6 +616,22 @@ Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will behavior). If the worker is `ref()`ed, calling `ref()` again will have no effect. +### worker.resourceLimits +<!-- YAML +added: REPLACEME +--> + +* {Object} + * `maxYoungGenerationSizeMb` {number} + * `maxOldGenerationSizeMb` {number} + * `codeRangeSizeMb` {number} + +Provides the set of JS engine resource constraints for this Worker thread. +If the `resourceLimits` option was passed to the [`Worker`][] constructor, +this matches its values. + +If the worker has stopped, the return value is an empty object. + ### worker.stderr <!-- YAML added: v10.5.0 diff --git a/lib/internal/errors.js b/lib/internal/errors.js index cd3c162183..2684931a77 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1226,6 +1226,8 @@ E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); E('ERR_WORKER_INVALID_EXEC_ARGV', (errors) => `Initiated Worker with invalid execArgv flags: ${errors.join(', ')}`, Error); +E('ERR_WORKER_OUT_OF_MEMORY', 'Worker terminated due to reaching memory limit', + Error); E('ERR_WORKER_PATH', 'The worker script filename must be an absolute path or a relative ' + 'path starting with \'./\' or \'../\'. 
Received "%s"', diff --git a/lib/internal/worker.js b/lib/internal/worker.js index fc6588d527..614f930105 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -2,19 +2,20 @@ /* global SharedArrayBuffer */ -const { Object } = primordials; +const { Math, Object } = primordials; const EventEmitter = require('events'); const assert = require('internal/assert'); const path = require('path'); +const errorCodes = require('internal/errors').codes; const { ERR_WORKER_PATH, ERR_WORKER_UNSERIALIZABLE_ERROR, ERR_WORKER_UNSUPPORTED_EXTENSION, ERR_WORKER_INVALID_EXEC_ARGV, ERR_INVALID_ARG_TYPE, -} = require('internal/errors').codes; +} = errorCodes; const { validateString } = require('internal/validators'); const { getOptionValue } = require('internal/options'); @@ -37,8 +38,13 @@ const { pathToFileURL } = require('url'); const { ownsProcessState, isMainThread, + resourceLimits: resourceLimitsRaw, threadId, Worker: WorkerImpl, + kMaxYoungGenerationSizeMb, + kMaxOldGenerationSizeMb, + kCodeRangeSizeMb, + kTotalResourceLimitCount } = internalBinding('worker'); const kHandle = Symbol('kHandle'); @@ -102,7 +108,8 @@ class Worker extends EventEmitter { const url = options.eval ? null : pathToFileURL(filename); // Set up the C++ handle for the worker, as well as some internal wiring. 
- this[kHandle] = new WorkerImpl(url, options.execArgv); + this[kHandle] = new WorkerImpl(url, options.execArgv, + parseResourceLimits(options.resourceLimits)); if (this[kHandle].invalidExecArgv) { throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv); } @@ -113,7 +120,7 @@ class Worker extends EventEmitter { } else if (env !== undefined) { this[kHandle].setEnvVars(env); } - this[kHandle].onexit = (code) => this[kOnExit](code); + this[kHandle].onexit = (code, customErr) => this[kOnExit](code, customErr); this[kPort] = this[kHandle].messagePort; this[kPort].on('message', (data) => this[kOnMessage](data)); this[kPort].start(); @@ -157,11 +164,15 @@ class Worker extends EventEmitter { this[kHandle].startThread(); } - [kOnExit](code) { + [kOnExit](code, customErr) { debug(`[${threadId}] hears end event for Worker ${this.threadId}`); drainMessagePort(this[kPublicPort]); drainMessagePort(this[kPort]); this[kDispose](); + if (customErr) { + debug(`[${threadId}] failing with custom error ${customErr}`); + this.emit('error', new errorCodes[customErr]()); + } this.emit('exit', code); this.removeAllListeners(); } @@ -280,6 +291,12 @@ class Worker extends EventEmitter { get stderr() { return this[kParentSideStdio].stderr; } + + get resourceLimits() { + if (this[kHandle] === null) return {}; + + return makeResourceLimits(this[kHandle].getResourceLimits()); + } } function pipeWithoutWarning(source, dest) { @@ -294,10 +311,35 @@ function pipeWithoutWarning(source, dest) { dest._maxListeners = destMaxListeners; } +const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount); +function parseResourceLimits(obj) { + const ret = resourceLimitsArray; + ret.fill(-1); + if (typeof obj !== 'object' || obj === null) return ret; + + if (typeof obj.maxOldGenerationSizeMb === 'number') + ret[kMaxOldGenerationSizeMb] = Math.max(obj.maxOldGenerationSizeMb, 2); + if (typeof obj.maxYoungGenerationSizeMb === 'number') + ret[kMaxYoungGenerationSizeMb] = 
obj.maxYoungGenerationSizeMb; + if (typeof obj.codeRangeSizeMb === 'number') + ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb; + return ret; +} + +function makeResourceLimits(float64arr) { + return { + maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb], + maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb], + codeRangeSizeMb: float64arr[kCodeRangeSizeMb] + }; +} + module.exports = { ownsProcessState, isMainThread, SHARE_ENV, + resourceLimits: + !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, threadId, Worker, }; diff --git a/lib/worker_threads.js b/lib/worker_threads.js index bd455edea2..4b72bf2711 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -3,6 +3,7 @@ const { isMainThread, SHARE_ENV, + resourceLimits, threadId, Worker } = require('internal/worker'); @@ -20,6 +21,7 @@ module.exports = { MessageChannel, moveMessagePortToContext, receiveMessageOnPort, + resourceLimits, threadId, SHARE_ENV, Worker, diff --git a/src/node_worker.cc b/src/node_worker.cc index af79540631..c79968bad9 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -18,8 +18,10 @@ using node::options_parser::kDisallowedInEnvironment; using v8::Array; +using v8::ArrayBuffer; using v8::Boolean; using v8::Context; +using v8::Float64Array; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; @@ -28,10 +30,13 @@ using v8::Isolate; using v8::Local; using v8::Locker; using v8::MaybeLocal; +using v8::Null; using v8::Number; using v8::Object; +using v8::ResourceConstraints; using v8::SealHandleScope; using v8::String; +using v8::TryCatch; using v8::Value; namespace node { @@ -105,6 +110,36 @@ std::shared_ptr<ArrayBufferAllocator> Worker::array_buffer_allocator() { return array_buffer_allocator_; } +void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) { + constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_)); + + constexpr double kMB = 1024 * 1024; + + if 
(resource_limits_[kMaxYoungGenerationSizeMb] > 0) { + constraints->set_max_young_generation_size_in_bytes( + resource_limits_[kMaxYoungGenerationSizeMb] * kMB); + } else { + resource_limits_[kMaxYoungGenerationSizeMb] = + constraints->max_young_generation_size_in_bytes() / kMB; + } + + if (resource_limits_[kMaxOldGenerationSizeMb] > 0) { + constraints->set_max_old_generation_size_in_bytes( + resource_limits_[kMaxOldGenerationSizeMb] * kMB); + } else { + resource_limits_[kMaxOldGenerationSizeMb] = + constraints->max_old_generation_size_in_bytes() / kMB; + } + + if (resource_limits_[kCodeRangeSizeMb] > 0) { + constraints->set_code_range_size_in_bytes( + resource_limits_[kCodeRangeSizeMb] * kMB); + } else { + resource_limits_[kCodeRangeSizeMb] = + constraints->code_range_size_in_bytes() / kMB; + } +} + // This class contains data that is only relevant to the child thread itself, // and only while it is running. // (Eventually, the Environment instance should probably also be moved here.) @@ -114,16 +149,27 @@ class WorkerThreadData { : w_(w) { CHECK_EQ(uv_loop_init(&loop_), 0); - Isolate* isolate = NewIsolate( - w->array_buffer_allocator_.get(), - &loop_, - w->platform_); - CHECK_NOT_NULL(isolate); + Isolate::CreateParams params; + SetIsolateCreateParamsForNode(&params); + params.array_buffer_allocator = w->array_buffer_allocator_.get(); + + w->UpdateResourceConstraints(&params.constraints); + + Isolate* isolate = Isolate::Allocate(); + if (isolate == nullptr) { + w->custom_error_ = "ERR_WORKER_OUT_OF_MEMORY"; + return; + } + + w->platform_->RegisterIsolate(isolate, &loop_); + Isolate::Initialize(isolate, params); + SetIsolateUpForNode(isolate); + + isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w); { Locker locker(isolate); Isolate::Scope isolate_scope(isolate); - isolate->SetStackLimit(w_->stack_base_); HandleScope handle_scope(isolate); isolate_data_.reset(CreateIsolateData(isolate, @@ -148,20 +194,22 @@ class WorkerThreadData { w_->isolate_ = nullptr; } - bool 
platform_finished = false; + if (isolate != nullptr) { + bool platform_finished = false; - isolate_data_.reset(); + isolate_data_.reset(); - w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) { - *static_cast<bool*>(data) = true; - }, &platform_finished); + w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) { + *static_cast<bool*>(data) = true; + }, &platform_finished); - isolate->Dispose(); - w_->platform_->UnregisterIsolate(isolate); + isolate->Dispose(); + w_->platform_->UnregisterIsolate(isolate); - // Wait until the platform has cleaned up all relevant resources. - while (!platform_finished) - uv_run(&loop_, UV_RUN_ONCE); + // Wait until the platform has cleaned up all relevant resources. + while (!platform_finished) + uv_run(&loop_, UV_RUN_ONCE); + } CheckedUvLoopClose(&loop_); } @@ -174,6 +222,17 @@ class WorkerThreadData { friend class Worker; }; +size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit, + size_t initial_heap_limit) { + Worker* worker = static_cast<Worker*>(data); + worker->custom_error_ = "ERR_WORKER_OUT_OF_MEMORY"; + worker->Exit(1); + // Give the current GC some extra leeway to let it finish rather than + // crash hard. We are not going to perform further allocations anyway. 
+ constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024; + return current_heap_limit + kExtraHeapAllowance; +} + void Worker::Run() { std::string name = "WorkerThread "; name += std::to_string(thread_id_); @@ -185,6 +244,7 @@ void Worker::Run() { Debug(this, "Creating isolate for worker with id %llu", thread_id_); WorkerThreadData data(this); + if (isolate_ == nullptr) return; Debug(this, "Starting worker with id %llu", thread_id_); { @@ -238,7 +298,21 @@ void Worker::Run() { if (is_stopped()) return; { HandleScope handle_scope(isolate_); - Local<Context> context = NewContext(isolate_); + Local<Context> context; + { + // We create the Context object before we have an Environment* in place + // that we could use for error handling. If creation fails due to + // resource constraints, we need something in place to handle it, + // though. + TryCatch try_catch(isolate_); + context = NewContext(isolate_); + if (context.IsEmpty()) { + // TODO(addaleax): Inform the target about the actual underlying + // failure. + custom_error_ = "ERR_WORKER_OUT_OF_MEMORY"; + return; + } + } if (is_stopped()) return; CHECK(!context.IsEmpty()); @@ -365,8 +439,14 @@ void Worker::JoinThread() { env()->message_port_string(), Undefined(env()->isolate())).Check(); - Local<Value> code = Integer::New(env()->isolate(), exit_code_); - MakeCallback(env()->onexit_string(), 1, &code); + Local<Value> args[] = { + Integer::New(env()->isolate(), exit_code_), + custom_error_ != nullptr ? 
+ OneByteString(env()->isolate(), custom_error_).As<Value>() : + Null(env()->isolate()).As<Value>(), + }; + + MakeCallback(env()->onexit_string(), arraysize(args), args); } // We cleared all libuv handles bound to this Worker above, @@ -400,7 +480,7 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) { std::vector<std::string> exec_argv_out; bool has_explicit_exec_argv = false; - CHECK_EQ(args.Length(), 2); + CHECK_EQ(args.Length(), 3); // Argument might be a string or URL if (!args[0]->IsNullOrUndefined()) { Utf8Value value( @@ -466,7 +546,16 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) { } if (!has_explicit_exec_argv) exec_argv_out = env->exec_argv(); - new Worker(env, args.This(), url, per_isolate_opts, std::move(exec_argv_out)); + + Worker* worker = + new Worker(env, args.This(), url, per_isolate_opts, + std::move(exec_argv_out)); + + CHECK(args[2]->IsFloat64Array()); + Local<Float64Array> limit_info = args[2].As<Float64Array>(); + CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount); + limit_info->CopyContents(worker->resource_limits_, + sizeof(worker->resource_limits_)); } void Worker::CloneParentEnvVars(const FunctionCallbackInfo<Value>& args) { @@ -547,6 +636,18 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) { uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle())); } +void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate())); +} + +Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const { + Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_)); + memcpy(ab->GetContents().Data(), resource_limits_, sizeof(resource_limits_)); + return Float64Array::New(ab, 0, kTotalResourceLimitCount); +} + void Worker::Exit(int code) { Mutex::ScopedLock lock(mutex_); Debug(this, "Worker %llu called Exit(%d)", thread_id_, code); @@ 
-589,6 +690,7 @@ void InitWorker(Local<Object> target, env->SetProtoMethod(w, "stopThread", Worker::StopThread); env->SetProtoMethod(w, "ref", Worker::Ref); env->SetProtoMethod(w, "unref", Worker::Unref); + env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits); Local<String> workerString = FIXED_ONE_BYTE_STRING(env->isolate(), "Worker"); @@ -617,6 +719,19 @@ void InitWorker(Local<Object> target, FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"), Boolean::New(env->isolate(), env->owns_process_state())) .Check(); + + if (!env->is_main_thread()) { + target + ->Set(env->context(), + FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"), + env->worker_context()->GetResourceLimits(env->isolate())) + .Check(); + } + + NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb); + NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb); + NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb); + NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount); } } // anonymous namespace diff --git a/src/node_worker.h b/src/node_worker.h index 77f68801e7..46eab70a49 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -12,6 +12,13 @@ namespace worker { class WorkerThreadData; +enum ResourceLimits { + kMaxYoungGenerationSizeMb, + kMaxOldGenerationSizeMb, + kCodeRangeSizeMb, + kTotalResourceLimitCount +}; + // A worker thread, as represented in its parent thread. 
class Worker : public AsyncWrap { public: @@ -51,9 +58,14 @@ class Worker : public AsyncWrap { static void StopThread(const v8::FunctionCallbackInfo<v8::Value>& args); static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args); static void Unref(const v8::FunctionCallbackInfo<v8::Value>& args); + static void GetResourceLimits( + const v8::FunctionCallbackInfo<v8::Value>& args); + v8::Local<v8::Float64Array> GetResourceLimits(v8::Isolate* isolate) const; private: void CreateEnvMessagePort(Environment* env); + static size_t NearHeapLimit(void* data, size_t current_heap_limit, + size_t initial_heap_limit); std::shared_ptr<PerIsolateOptions> per_isolate_opts_; std::vector<std::string> exec_argv_; @@ -73,10 +85,15 @@ class Worker : public AsyncWrap { mutable Mutex mutex_; bool thread_joined_ = true; + const char* custom_error_ = nullptr; int exit_code_ = 0; uint64_t thread_id_ = -1; uintptr_t stack_base_ = 0; + // Custom resource constraints: + double resource_limits_[kTotalResourceLimitCount]; + void UpdateResourceConstraints(v8::ResourceConstraints* constraints); + // Full size of the thread's stack. static constexpr size_t kStackSize = 4 * 1024 * 1024; // Stack buffer size that is not available to the JS engine. diff --git a/test/parallel/test-worker-resource-limits.js b/test/parallel/test-worker-resource-limits.js new file mode 100644 index 0000000000..2d4ebbc0ce --- /dev/null +++ b/test/parallel/test-worker-resource-limits.js @@ -0,0 +1,58 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const v8 = require('v8'); +const { Worker, resourceLimits, isMainThread } = require('worker_threads'); + +if (isMainThread) { + assert.deepStrictEqual(resourceLimits, {}); +} + +const testResourceLimits = { + maxOldGenerationSizeMb: 16, + maxYoungGenerationSizeMb: 4, + codeRangeSizeMb: 16, +}; + +// Do not use isMainThread so that this test itself can be run inside a Worker. 
+if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + const w = new Worker(__filename, { resourceLimits: testResourceLimits }); + assert.deepStrictEqual(w.resourceLimits, testResourceLimits); + w.on('exit', common.mustCall((code) => { + assert.strictEqual(code, 1); + assert.deepStrictEqual(w.resourceLimits, {}); + })); + w.on('error', common.expectsError({ + code: 'ERR_WORKER_OUT_OF_MEMORY', + message: 'Worker terminated due to reaching memory limit' + })); + return; +} + +assert.deepStrictEqual(resourceLimits, testResourceLimits); +const array = []; +while (true) { + // Leave 10 % wiggle room here. + const usedMB = v8.getHeapStatistics().used_heap_size / 1024 / 1024; + assert(usedMB < resourceLimits.maxOldGenerationSizeMb * 1.1); + + let seenSpaces = 0; + for (const { space_name, space_size } of v8.getHeapSpaceStatistics()) { + if (space_name === 'new_space') { + seenSpaces++; + assert( + space_size / 1024 / 1024 < resourceLimits.maxYoungGenerationSizeMb * 2); + } else if (space_name === 'old_space') { + seenSpaces++; + assert(space_size / 1024 / 1024 < resourceLimits.maxOldGenerationSizeMb); + } else if (space_name === 'code_space') { + seenSpaces++; + assert(space_size / 1024 / 1024 < resourceLimits.codeRangeSizeMb); + } + } + assert.strictEqual(seenSpaces, 3); + + for (let i = 0; i < 100; i++) + array.push([array]); +} |