diff options
Diffstat (limited to 'lib/internal')
-rw-r--r-- | lib/internal/bootstrap/pre_execution.js | 6 | ||||
-rw-r--r-- | lib/internal/child_process.js | 59 | ||||
-rw-r--r-- | lib/internal/child_process/serialization.js | 119 | ||||
-rw-r--r-- | lib/internal/cluster/master.js | 1 |
4 files changed, 152 insertions, 33 deletions
diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js index 80ac97ee45..e58293e616 100644 --- a/lib/internal/bootstrap/pre_execution.js +++ b/lib/internal/bootstrap/pre_execution.js @@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() { // Make sure it's not accidentally inherited by child processes. delete process.env.NODE_CHANNEL_FD; - require('child_process')._forkChild(fd); + const serializationMode = + process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json'; + delete process.env.NODE_CHANNEL_SERIALIZATION_MODE; + + require('child_process')._forkChild(fd, serializationMode); assert(process.send); } } diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 22f7da92ce..9e13650fa3 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -1,6 +1,6 @@ 'use strict'; -const { JSON, Object } = primordials; +const { Object } = primordials; const { errnoException, @@ -55,8 +55,6 @@ const { const { SocketListSend, SocketListReceive } = SocketList; -// Lazy loaded for startup performance. -let StringDecoder; // Lazy loaded for startup performance and to allow monkey patching of // internalBinding('http_parser').HTTPParser. let freeParser; @@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) { const ipcFd = stdio.ipcFd; stdio = options.stdio = stdio.stdio; + if (options.serialization !== undefined && + options.serialization !== 'json' && + options.serialization !== 'advanced') { + throw new ERR_INVALID_OPT_VALUE('options.serialization', + options.serialization); + } + + const serialization = options.serialization || 'json'; + if (ipc !== undefined) { // Let child process know about opened IPC channel if (options.envPairs === undefined) @@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) { options.envPairs); } - options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd); + options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`); + options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`); } validateString(options.file, 'options.file'); @@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) { this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket); // Add .send() method and start listening for IPC data - if (ipc !== undefined) setupChannel(this, ipc); + if (ipc !== undefined) setupChannel(this, ipc, serialization); return err; }; @@ -516,7 +524,8 @@ class Control extends EventEmitter { const channelDeprecationMsg = '_channel is deprecated. ' + 'Use ChildProcess.channel instead.'; -function setupChannel(target, channel) { +let serialization; +function setupChannel(target, channel, serializationMode) { target.channel = channel; Object.defineProperty(target, '_channel', { @@ -535,12 +544,16 @@ function setupChannel(target, channel) { const control = new Control(channel); - if (StringDecoder === undefined) - StringDecoder = require('string_decoder').StringDecoder; - const decoder = new StringDecoder('utf8'); - var jsonBuffer = ''; - var pendingHandle = null; - channel.buffering = false; + if (serialization === undefined) + serialization = require('internal/child_process/serialization'); + const { + initMessageChannel, + parseChannelMessages, + writeChannelMessage + } = serialization[serializationMode]; + + let pendingHandle = null; + initMessageChannel(channel); channel.pendingHandle = null; channel.onread = function(arrayBuffer) { const recvHandle = channel.pendingHandle; @@ -552,21 +565,7 @@ function setupChannel(target, channel) { if (recvHandle) pendingHandle = recvHandle; - // Linebreak is used as a message end sign - var chunks = decoder.write(pool).split('\n'); - var numCompleteChunks = chunks.length - 1; - // Last line does not have trailing linebreak - var incompleteChunk = chunks[numCompleteChunks]; - if (numCompleteChunks === 0) { - jsonBuffer += incompleteChunk; - this.buffering = jsonBuffer.length !== 0; - return; - } - chunks[0] = jsonBuffer + chunks[0]; - - for (var i = 0; i < numCompleteChunks; i++) { - var message = JSON.parse(chunks[i]); - + for (const message of parseChannelMessages(channel, pool)) { // There will be at most one NODE_HANDLE message in every chunk we // read because SCM_RIGHTS messages don't get coalesced. Make sure // that we deliver the handle with the right message however. @@ -581,9 +580,6 @@ function setupChannel(target, channel) { handleMessage(message, undefined, false); } } - jsonBuffer = incompleteChunk; - this.buffering = jsonBuffer.length !== 0; - } else { this.buffering = false; target.disconnect(); @@ -782,8 +778,7 @@ function setupChannel(target, channel) { const req = new WriteWrap(); - const string = JSON.stringify(message) + '\n'; - const err = channel.writeUtf8String(req, string, handle); + const err = writeChannelMessage(channel, req, message, handle); const wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; if (err === 0) { diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js new file mode 100644 index 0000000000..1381f29926 --- /dev/null +++ b/lib/internal/child_process/serialization.js @@ -0,0 +1,119 @@ +'use strict'; + +const { JSON } = primordials; +const { Buffer } = require('buffer'); +const { StringDecoder } = require('string_decoder'); +const v8 = require('v8'); +const { isArrayBufferView } = require('internal/util/types'); +const assert = require('internal/assert'); + +const kMessageBuffer = Symbol('kMessageBuffer'); +const kJSONBuffer = Symbol('kJSONBuffer'); +const kStringDecoder = Symbol('kStringDecoder'); + +// Extend V8's serializer APIs to give more JSON-like behaviour in +// some cases; in particular, for native objects this serializes them the same +// way that JSON does rather than throwing an exception. +const kArrayBufferViewTag = 0; +const kNotArrayBufferViewTag = 1; +class ChildProcessSerializer extends v8.DefaultSerializer { + _writeHostObject(object) { + if (isArrayBufferView(object)) { + this.writeUint32(kArrayBufferViewTag); + return super._writeHostObject(object); + } else { + this.writeUint32(kNotArrayBufferViewTag); + this.writeValue({ ...object }); + } + } +} + +class ChildProcessDeserializer extends v8.DefaultDeserializer { + _readHostObject() { + const tag = this.readUint32(); + if (tag === kArrayBufferViewTag) + return super._readHostObject(); + + assert(tag === kNotArrayBufferViewTag); + return this.readValue(); + } +} + +// Messages are parsed in either of the following formats: +// - Newline-delimited JSON, or +// - V8-serialized buffers, prefixed with their length as a big endian uint32 +// (aka 'advanced') +const advanced = { + initMessageChannel(channel) { + channel[kMessageBuffer] = Buffer.alloc(0); + channel.buffering = false; + }, + + *parseChannelMessages(channel, readData) { + if (readData.length === 0) return; + + let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]); + while (messageBuffer.length > 4) { + const size = messageBuffer.readUInt32BE(); + if (messageBuffer.length < 4 + size) { + break; + } + + const deserializer = new ChildProcessDeserializer( + messageBuffer.subarray(4, 4 + size)); + messageBuffer = messageBuffer.subarray(4 + size); + + deserializer.readHeader(); + yield deserializer.readValue(); + } + channel[kMessageBuffer] = messageBuffer; + channel.buffering = messageBuffer.length > 0; + }, + + writeChannelMessage(channel, req, message, handle) { + const ser = new ChildProcessSerializer(); + ser.writeHeader(); + ser.writeValue(message); + const serializedMessage = ser.releaseBuffer(); + const sizeBuffer = Buffer.allocUnsafe(4); + sizeBuffer.writeUInt32BE(serializedMessage.length); + return channel.writeBuffer(req, Buffer.concat([ + sizeBuffer, + serializedMessage + ]), handle); + }, +}; + +const json = { + initMessageChannel(channel) { + channel[kJSONBuffer] = ''; + channel[kStringDecoder] = undefined; + }, + + *parseChannelMessages(channel, readData) { + if (readData.length === 0) return; + + if (channel[kStringDecoder] === undefined) + channel[kStringDecoder] = new StringDecoder('utf8'); + const chunks = channel[kStringDecoder].write(readData).split('\n'); + const numCompleteChunks = chunks.length - 1; + // Last line does not have trailing linebreak + const incompleteChunk = chunks[numCompleteChunks]; + if (numCompleteChunks === 0) { + channel[kJSONBuffer] += incompleteChunk; + } else { + chunks[0] = channel[kJSONBuffer] + chunks[0]; + for (let i = 0; i < numCompleteChunks; i++) + yield JSON.parse(chunks[i]); + channel[kJSONBuffer] = incompleteChunk; + } + channel.buffering = channel[kJSONBuffer].length !== 0; + }, + + writeChannelMessage(channel, req, message, handle) { + const string = JSON.stringify(message) + '\n'; + return channel.writeUtf8String(req, string, handle); + }, +}; + +module.exports = { advanced, json }; diff --git a/lib/internal/cluster/master.js b/lib/internal/cluster/master.js index a881021c5e..005de8aa1b 100644 --- a/lib/internal/cluster/master.js +++ b/lib/internal/cluster/master.js @@ -130,6 +130,7 @@ function createWorkerProcess(id, env) { return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, + serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, |