diff options
author | Andreas Madsen <amwebdk@gmail.com> | 2011-12-20 10:42:48 +0100 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2012-01-04 18:30:19 -0800 |
commit | 5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0 (patch) | |
tree | b0adda96165732daa489f77a4b4a6f0265460440 /lib/cluster.js | |
parent | e21643d618a923a484b1f692ceeabffa6855e6cf (diff) | |
download | android-node-v8-5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0.tar.gz android-node-v8-5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0.tar.bz2 android-node-v8-5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0.zip |
cluster improvements: Worker class and isolate internal messages
Fixes #2388
Diffstat (limited to 'lib/cluster.js')
-rw-r--r-- | lib/cluster.js | 459 |
1 files changed, 350 insertions, 109 deletions
diff --git a/lib/cluster.js b/lib/cluster.js index 5892481c06..90da833bcb 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -23,22 +23,24 @@ var assert = require('assert'); var fork = require('child_process').fork; var net = require('net'); var EventEmitter = require('events').EventEmitter; +var util = require('util'); function isObject(o) { return (typeof o === 'object' && o !== null); } function extendObject(origin, add) { + // Don't do anything if add isn't an object + if (!add) return origin; + var keys = Object.keys(add), i = keys.length; - while(i--) { + while (i--) { origin[keys[i]] = add[keys[i]]; } return origin; } -var cluster = module.exports = new EventEmitter(); - var debug; if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { debug = function(x) { @@ -50,23 +52,42 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { debug = function() { }; } +// cluster object: +function cluster() {} +util.inherits(cluster, EventEmitter); +var cluster = module.exports = new cluster(); // Used in the master: var masterStarted = false; var ids = 0; -var workers = []; -var servers = {}; +var serverHandlers = {}; var workerFilename; var workerArgs; // Used in the worker: -var workerId = 0; +var serverLisenters = {}; var queryIds = 0; var queryCallbacks = {}; -cluster.isWorker = 'NODE_WORKER_ID' in process.env; +// Define isWorker and isMaster +cluster.isWorker = 'NODE_UNIQUE_ID' in process.env; cluster.isMaster = ! cluster.isWorker; +// The worker object is only used in a worker +cluster.worker = cluster.isWorker ? {} : null; +// The workers array is oly used in the naster +cluster.workers = cluster.isMaster ? {} : null; + +// Simple function there call a function on each worker +function eachWorker(cb) { + // Go througe all workers + for (var id in cluster.workers) { + if (cluster.workers.hasOwnProperty(id)) { + cb(cluster.workers[id]); + } + } +} + // Call this from the master process. It will start child workers. // // options.workerFilename @@ -90,155 +111,375 @@ function startMaster() { workerArgs = process.argv.slice(2); process.on('uncaughtException', function(e) { - // Quickly try to kill all the workers. - // TODO: be session leader - will cause auto SIGHUP to the children. - eachWorker(function(worker) { - debug('kill worker ' + worker.pid); - worker.kill(); - }); - console.error('Exception in cluster master process: ' + e.message + '\n' + e.stack); + + quickDestroyCluster(); process.exit(1); }); } +// Check if a message is internal only +var INTERNAL_PREFIX = 'NODE_CLUTER_'; +function isInternalMessage(message) { + return (isObject(message) && + typeof message.cmd === 'string' && + message.cmd.indexOf(INTERNAL_PREFIX) === 0); +} -function handleWorkerMessage(worker, message) { - // This can only be called from the master. - assert(cluster.isMaster); +// Modyfi message object to be internal +function internalMessage(inMessage) { + var outMessage = extendObject({}, inMessage); + + // Add internal prefix to cmd + outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || ''); - debug('recv ' + JSON.stringify(message)); - - switch (message.cmd) { - case 'online': - debug('Worker ' + worker.pid + ' online'); - worker.online = true; - break; - - case 'queryServer': - var key = message.address + ':' + - message.port + ':' + - message.addressType; - var response = { _queryId: message._queryId }; - - if (!(key in servers)) { - // Create a new server. - debug('create new server ' + key); - servers[key] = net._createServerHandle(message.address, - message.port, - message.addressType); - } - worker.send(response, servers[key]); - break; - - default: - // Ignore. - break; + return outMessage; +} + +// Handle callback messges +function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) { + + // The message there will be send + var message = internalMessage(outMessage); + + // callback id - will be undefined if not set + message._queryEcho = inMessage._requestEcho; + + // Call callback if a query echo is received + if (inMessage.hasOwnProperty('_queryEcho')) { + queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle); + delete queryCallbacks[inMessage._queryEcho]; + } + + // Send if outWrap do contain something useful + if (!(outMessage === undefined && message._queryEcho === undefined)) { + sendInternalMessage(worker, message, outHandle); } } +// Handle messages from both master and workers +var messageHandingObject = {}; +function handleMessage(inMessage, inHandle, worker) { -function eachWorker(cb) { - // This can only be called from the master. - assert(cluster.isMaster); + //Remove internal prefix + var message = extendObject({}, inMessage); + message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length); - for (var id in workers) { - if (workers[id]) { - cb(workers[id]); - } + var respondUsed = false; + var respond = function(outMessage, outHandler) { + respondUsed = true; + handleResponse(outMessage, outHandler, inMessage, inHandle, worker); + }; + + // Run handler if it exist + if (messageHandingObject[message.cmd]) { + messageHandingObject[message.cmd](message, worker, respond); + } + + // Send respond if it wasn't done + if (respondUsed === false) { + respond(); } } +// Messages to the master will be handled using this methods +if (cluster.isMaster) { -cluster.fork = function(env) { - // This can only be called from the master. - assert(cluster.isMaster); + // Handle online messages from workers + messageHandingObject.online = function(message, worker) { + worker.state = 'online'; + debug('Worker ' + worker.process.pid + ' online'); + worker.emit('online', worker); + cluster.emit('online', worker); + }; - // Lazily start the master process stuff. - startMaster(); + // Handle queryServer messages form workers + messageHandingObject.queryServer = function(message, worker, send) { + + // This sequence of infomation is unique to the connection but not + // to the worker + var args = [message.address, message.port, message.addressType]; + var key = args.join(':'); + var handler; + + if (serverHandlers.hasOwnProperty(key)) { + handler = serverHandlers[key]; + } else { + handler = serverHandlers[key] = net._createServerHandle.apply(net, args); + } + + // echo callback with the fd handler associated with it + send({}, handler); + }; + + // Handle listening messages from workers + messageHandingObject.listening = function(message, worker) { + + worker.state = 'listening'; + + // Emit listining, now that we know the worker is listning + worker.emit('listening', worker, { + address: message.address, + port: message.port, + addressType: message.addressType + }); + cluster.emit('listening', worker, { + address: message.address, + port: message.port, + addressType: message.addressType + }); + }; + + // Handle suicide messages from workers + messageHandingObject.suicide = function(message, worker) { + worker.suicide = true; + }; + +} + +// Messages to a worker will be handled using this methods +else if (cluster.isWorker) { + + // TODO: the disconnect step will use this +} + +function toDecInt(value) { + value = parseInt(value, 10); + return isNaN(value) ? null : value; +} + +// Create a worker object, there works both for master and worker +function Worker(customEnv) { + if (!(this instanceof Worker)) return new Worker(); + + var self = this; + var env = process.env; + + // Assign uniqueID, default null + this.uniqueID = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID); + + // Assign state + this.state = 'none'; - var id = ++ids; + // Create or get process + if (cluster.isMaster) { - //Create env object - var envCopy = extendObject({}, process.env); - envCopy['NODE_WORKER_ID'] = id; - if (isObject(env)) { - envCopy = extendObject(envCopy, env); + // Create env object + // first: copy and add uniqueID + var envCopy = extendObject({}, env); + envCopy['NODE_UNIQUE_ID'] = this.uniqueID; + // second: extend envCopy with the env argument + if (isObject(customEnv)) { + envCopy = extendObject(envCopy, customEnv); + } + + // fork worker + this.process = fork(workerFilename, workerArgs, { + 'env': envCopy + }); + + } else { + this.process = process; } - //fork worker - var worker = fork(workerFilename, workerArgs, { - 'env': envCopy - }); + if (cluster.isMaster) { + // Save worker in the cluster.workers array + cluster.workers[this.uniqueID] = this; - workers[id] = worker; + // Emit a fork event, on next tick + // There is no worker.fork event since this has no real purpose + process.nextTick(function() { + cluster.emit('fork', self); + }); + } - worker.on('message', function(message) { - handleWorkerMessage(worker, message); - }); + // Internal message: handle message + this.process.on('inernalMessage', function(message, handle) { + debug('recived: ', message); - worker.on('exit', function() { - debug('worker id=' + id + ' died'); - delete workers[id]; - cluster.emit('death', worker); + // relay to handleMessage + handleMessage(message, handle, self); + return; }); - return worker; -}; + // Non-internal message: relay to Worker object + this.process.on('message', function(message, handle) { + self.emit('message', message, handle); + }); + // Handle exit + self.process.on('exit', function() { + debug('worker id=' + self.uniqueID + ' died'); -// Internal function. Called from src/node.js when worker process starts. -cluster._startWorker = function() { - assert(cluster.isWorker); - workerId = parseInt(process.env.NODE_WORKER_ID, 10); - - queryMaster({ cmd: 'online' }); - - // Make callbacks from queryMaster() - process.on('message', function(msg, handle) { - debug('recv ' + JSON.stringify(msg)); - if (msg._queryId && msg._queryId in queryCallbacks) { - var cb = queryCallbacks[msg._queryId]; - if (typeof cb == 'function') { - cb(msg, handle); - } - delete queryCallbacks[msg._queryId]; - } + // Prepare worker to die and emit events + prepareDeath(self, 'dead', 'death'); }); + +} +util.inherits(Worker, EventEmitter); +cluster.Worker = Worker; + +function prepareDeath(worker, state, eventName) { + + // set state to disconnect + worker.state = state; + + // Make suicide a boolean + worker.suicide = !!worker.suicide; + + // Remove from workers in the master + if (cluster.isMaster) { + delete cluster.workers[worker.uniqueID]; + } + + // Emit events + worker.emit(eventName, worker); + cluster.emit(eventName, worker); +} + +// Send internal message +function sendInternalMessage(worker, message/*, handler, callback*/) { + + // Exist callback + var callback = arguments[arguments.length - 1]; + if (typeof callback !== 'function') { + callback = undefined; + } + + // exist handler + var handler = arguments[2] !== callback ? arguments[2] : undefined; + + if (!isInternalMessage(message)) { + message = internalMessage(message); + } + + // Store callback for later + if (callback) { + message._requestEcho = worker.uniqueID + ':' + (++queryIds); + queryCallbacks[message._requestEcho] = callback; + } + + + worker.send(message, handler); +} + +// Send message to worker or master +Worker.prototype.send = function() { + + //You could also just use process.send in a worker + this.process.send.apply(this.process, arguments); }; -function queryMaster(msg, cb) { - assert(cluster.isWorker); +function closeWorkerChannel(worker, callback) { + //Apparently the .close method is async, but do not have a callback + worker.process._channel.close(); + worker.process._channel = null; + process.nextTick(callback); +} - debug('send ' + JSON.stringify(msg)); +// Kill the worker without restarting +Worker.prototype.destroy = function() { + var self = this; - // Grab some random queryId - msg._queryId = (++queryIds); - msg._workerId = workerId; + this.suicide = true; - // Store callback for later. Callback called in _startWorker. - if (cb) { - queryCallbacks[msg._queryId] = cb; + if (cluster.isMaster) { + // Stop channel + // this way the worker won't need to propagate suicide state to master + closeWorkerChannel(this, function() { + // Then kill worker + self.process.kill(); + }); + + } else { + // Channel is open + if (this.process._channel !== null) { + + // Inform master that is is suicide and then kill + sendInternalMessage(this, {cmd: 'suicide'}, function() { + // Kill worker + process.exit(0); + }); + + // When master do a quickDestroy the channel is not necesarily closed + // at the point this function runs. For that reason we need to keep + // checking that the channel is still open, until a actually callback + // from the master is resicved. Also we can't do a timeout and then + // just kill, since we don't know if the quickDestroy function was called. + setInterval(function() { + if (self.process._channel === null) { + process.exit(0); + } + }, 200); + + } else { + process.exit(0); + } } +}; + +// Fork a new worker +cluster.fork = function(env) { + // This can only be called from the master. + assert(cluster.isMaster); - // Send message to master. - process.send(msg); + // Make sure that the master has been initalized + startMaster(); + + return (new cluster.Worker(env)); +}; + +// Sync way to quickly kill all cluster workers +// However the workers may not die instantly +function quickDestroyCluster() { + eachWorker(function(worker) { + worker.process.kill(); + }); } +// Internal function. Called from src/node.js when worker process starts. +cluster._setupWorker = function() { + // Get worker class + var worker = cluster.worker = new Worker(); + + // Tell master that the worker is online + worker.state = 'online'; + sendInternalMessage(worker, { cmd: 'online' }); +}; -// Internal function. Called by lib/net.js when attempting to bind a -// server. -cluster._getServer = function(address, port, addressType, cb) { +// Internal function. Called by lib/net.js when attempting to bind a server. +cluster._getServer = function(tcpSelf, address, port, addressType, cb) { + // This can only be called from a worker. assert(cluster.isWorker); - queryMaster({ + // Store tcp instance for later use + var key = [address, port, addressType].join(':'); + serverLisenters[key] = tcpSelf; + + // Send a listening message to the master + tcpSelf.once('listening', function() { + cluster.worker.state = 'listening'; + sendInternalMessage(cluster.worker, { + cmd: 'listening', + address: address, + port: port, + addressType: addressType + }); + }); + + // Request the fd handler from the master process + var message = { cmd: 'queryServer', address: address, port: port, addressType: addressType - }, function(msg, handle) { + }; + + // The callback will be stored until the master has responed + sendInternalMessage(cluster.worker, message, function(msg, handle) { cb(handle); }); + }; |