summaryrefslogtreecommitdiff
path: root/lib/cluster.js
diff options
context:
space:
mode:
authorAndreas Madsen <amwebdk@gmail.com>2011-12-20 10:42:48 +0100
committerRyan Dahl <ry@tinyclouds.org>2012-01-04 18:30:19 -0800
commit5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0 (patch)
treeb0adda96165732daa489f77a4b4a6f0265460440 /lib/cluster.js
parente21643d618a923a484b1f692ceeabffa6855e6cf (diff)
downloadandroid-node-v8-5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0.tar.gz
android-node-v8-5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0.tar.bz2
android-node-v8-5f08c3cfa1ee3ece6032d1ffeda6f403e54dd5c0.zip
cluster improvements: Worker class and isolate internal messages
Fixes #2388
Diffstat (limited to 'lib/cluster.js')
-rw-r--r--lib/cluster.js459
1 files changed, 350 insertions, 109 deletions
diff --git a/lib/cluster.js b/lib/cluster.js
index 5892481c06..90da833bcb 100644
--- a/lib/cluster.js
+++ b/lib/cluster.js
@@ -23,22 +23,24 @@ var assert = require('assert');
var fork = require('child_process').fork;
var net = require('net');
var EventEmitter = require('events').EventEmitter;
+var util = require('util');
function isObject(o) {
return (typeof o === 'object' && o !== null);
}
function extendObject(origin, add) {
+ // Don't do anything if add isn't an object
+ if (!add) return origin;
+
var keys = Object.keys(add),
i = keys.length;
- while(i--) {
+ while (i--) {
origin[keys[i]] = add[keys[i]];
}
return origin;
}
-var cluster = module.exports = new EventEmitter();
-
var debug;
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function(x) {
@@ -50,23 +52,42 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function() { };
}
+// cluster object:
+function cluster() {}
+util.inherits(cluster, EventEmitter);
+var cluster = module.exports = new cluster();
// Used in the master:
var masterStarted = false;
var ids = 0;
-var workers = [];
-var servers = {};
+var serverHandlers = {};
var workerFilename;
var workerArgs;
// Used in the worker:
-var workerId = 0;
+var serverLisenters = {};
var queryIds = 0;
var queryCallbacks = {};
-cluster.isWorker = 'NODE_WORKER_ID' in process.env;
+// Define isWorker and isMaster
+cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
cluster.isMaster = ! cluster.isWorker;
+// The worker object is only used in a worker
+cluster.worker = cluster.isWorker ? {} : null;
+// The workers array is oly used in the naster
+cluster.workers = cluster.isMaster ? {} : null;
+
+// Simple function there call a function on each worker
+function eachWorker(cb) {
+ // Go througe all workers
+ for (var id in cluster.workers) {
+ if (cluster.workers.hasOwnProperty(id)) {
+ cb(cluster.workers[id]);
+ }
+ }
+}
+
// Call this from the master process. It will start child workers.
//
// options.workerFilename
@@ -90,155 +111,375 @@ function startMaster() {
workerArgs = process.argv.slice(2);
process.on('uncaughtException', function(e) {
- // Quickly try to kill all the workers.
- // TODO: be session leader - will cause auto SIGHUP to the children.
- eachWorker(function(worker) {
- debug('kill worker ' + worker.pid);
- worker.kill();
- });
-
console.error('Exception in cluster master process: ' +
e.message + '\n' + e.stack);
+
+ quickDestroyCluster();
process.exit(1);
});
}
+// Check if a message is internal only
+var INTERNAL_PREFIX = 'NODE_CLUTER_';
+function isInternalMessage(message) {
+ return (isObject(message) &&
+ typeof message.cmd === 'string' &&
+ message.cmd.indexOf(INTERNAL_PREFIX) === 0);
+}
-function handleWorkerMessage(worker, message) {
- // This can only be called from the master.
- assert(cluster.isMaster);
+// Modyfi message object to be internal
+function internalMessage(inMessage) {
+ var outMessage = extendObject({}, inMessage);
+
+ // Add internal prefix to cmd
+ outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || '');
- debug('recv ' + JSON.stringify(message));
-
- switch (message.cmd) {
- case 'online':
- debug('Worker ' + worker.pid + ' online');
- worker.online = true;
- break;
-
- case 'queryServer':
- var key = message.address + ':' +
- message.port + ':' +
- message.addressType;
- var response = { _queryId: message._queryId };
-
- if (!(key in servers)) {
- // Create a new server.
- debug('create new server ' + key);
- servers[key] = net._createServerHandle(message.address,
- message.port,
- message.addressType);
- }
- worker.send(response, servers[key]);
- break;
-
- default:
- // Ignore.
- break;
+ return outMessage;
+}
+
+// Handle callback messges
+function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {
+
+ // The message there will be send
+ var message = internalMessage(outMessage);
+
+ // callback id - will be undefined if not set
+ message._queryEcho = inMessage._requestEcho;
+
+ // Call callback if a query echo is received
+ if (inMessage.hasOwnProperty('_queryEcho')) {
+ queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle);
+ delete queryCallbacks[inMessage._queryEcho];
+ }
+
+ // Send if outWrap do contain something useful
+ if (!(outMessage === undefined && message._queryEcho === undefined)) {
+ sendInternalMessage(worker, message, outHandle);
}
}
+// Handle messages from both master and workers
+var messageHandingObject = {};
+function handleMessage(inMessage, inHandle, worker) {
-function eachWorker(cb) {
- // This can only be called from the master.
- assert(cluster.isMaster);
+ //Remove internal prefix
+ var message = extendObject({}, inMessage);
+ message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length);
- for (var id in workers) {
- if (workers[id]) {
- cb(workers[id]);
- }
+ var respondUsed = false;
+ var respond = function(outMessage, outHandler) {
+ respondUsed = true;
+ handleResponse(outMessage, outHandler, inMessage, inHandle, worker);
+ };
+
+ // Run handler if it exist
+ if (messageHandingObject[message.cmd]) {
+ messageHandingObject[message.cmd](message, worker, respond);
+ }
+
+ // Send respond if it wasn't done
+ if (respondUsed === false) {
+ respond();
}
}
+// Messages to the master will be handled using this methods
+if (cluster.isMaster) {
-cluster.fork = function(env) {
- // This can only be called from the master.
- assert(cluster.isMaster);
+ // Handle online messages from workers
+ messageHandingObject.online = function(message, worker) {
+ worker.state = 'online';
+ debug('Worker ' + worker.process.pid + ' online');
+ worker.emit('online', worker);
+ cluster.emit('online', worker);
+ };
- // Lazily start the master process stuff.
- startMaster();
+ // Handle queryServer messages form workers
+ messageHandingObject.queryServer = function(message, worker, send) {
+
+ // This sequence of infomation is unique to the connection but not
+ // to the worker
+ var args = [message.address, message.port, message.addressType];
+ var key = args.join(':');
+ var handler;
+
+ if (serverHandlers.hasOwnProperty(key)) {
+ handler = serverHandlers[key];
+ } else {
+ handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
+ }
+
+ // echo callback with the fd handler associated with it
+ send({}, handler);
+ };
+
+ // Handle listening messages from workers
+ messageHandingObject.listening = function(message, worker) {
+
+ worker.state = 'listening';
+
+ // Emit listining, now that we know the worker is listning
+ worker.emit('listening', worker, {
+ address: message.address,
+ port: message.port,
+ addressType: message.addressType
+ });
+ cluster.emit('listening', worker, {
+ address: message.address,
+ port: message.port,
+ addressType: message.addressType
+ });
+ };
+
+ // Handle suicide messages from workers
+ messageHandingObject.suicide = function(message, worker) {
+ worker.suicide = true;
+ };
+
+}
+
+// Messages to a worker will be handled using this methods
+else if (cluster.isWorker) {
+
+ // TODO: the disconnect step will use this
+}
+
+function toDecInt(value) {
+ value = parseInt(value, 10);
+ return isNaN(value) ? null : value;
+}
+
+// Create a worker object, there works both for master and worker
+function Worker(customEnv) {
+ if (!(this instanceof Worker)) return new Worker();
+
+ var self = this;
+ var env = process.env;
+
+ // Assign uniqueID, default null
+ this.uniqueID = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);
+
+ // Assign state
+ this.state = 'none';
- var id = ++ids;
+ // Create or get process
+ if (cluster.isMaster) {
- //Create env object
- var envCopy = extendObject({}, process.env);
- envCopy['NODE_WORKER_ID'] = id;
- if (isObject(env)) {
- envCopy = extendObject(envCopy, env);
+ // Create env object
+ // first: copy and add uniqueID
+ var envCopy = extendObject({}, env);
+ envCopy['NODE_UNIQUE_ID'] = this.uniqueID;
+ // second: extend envCopy with the env argument
+ if (isObject(customEnv)) {
+ envCopy = extendObject(envCopy, customEnv);
+ }
+
+ // fork worker
+ this.process = fork(workerFilename, workerArgs, {
+ 'env': envCopy
+ });
+
+ } else {
+ this.process = process;
}
- //fork worker
- var worker = fork(workerFilename, workerArgs, {
- 'env': envCopy
- });
+ if (cluster.isMaster) {
+ // Save worker in the cluster.workers array
+ cluster.workers[this.uniqueID] = this;
- workers[id] = worker;
+ // Emit a fork event, on next tick
+ // There is no worker.fork event since this has no real purpose
+ process.nextTick(function() {
+ cluster.emit('fork', self);
+ });
+ }
- worker.on('message', function(message) {
- handleWorkerMessage(worker, message);
- });
+ // Internal message: handle message
+ this.process.on('inernalMessage', function(message, handle) {
+ debug('recived: ', message);
- worker.on('exit', function() {
- debug('worker id=' + id + ' died');
- delete workers[id];
- cluster.emit('death', worker);
+ // relay to handleMessage
+ handleMessage(message, handle, self);
+ return;
});
- return worker;
-};
+ // Non-internal message: relay to Worker object
+ this.process.on('message', function(message, handle) {
+ self.emit('message', message, handle);
+ });
+ // Handle exit
+ self.process.on('exit', function() {
+ debug('worker id=' + self.uniqueID + ' died');
-// Internal function. Called from src/node.js when worker process starts.
-cluster._startWorker = function() {
- assert(cluster.isWorker);
- workerId = parseInt(process.env.NODE_WORKER_ID, 10);
-
- queryMaster({ cmd: 'online' });
-
- // Make callbacks from queryMaster()
- process.on('message', function(msg, handle) {
- debug('recv ' + JSON.stringify(msg));
- if (msg._queryId && msg._queryId in queryCallbacks) {
- var cb = queryCallbacks[msg._queryId];
- if (typeof cb == 'function') {
- cb(msg, handle);
- }
- delete queryCallbacks[msg._queryId];
- }
+ // Prepare worker to die and emit events
+ prepareDeath(self, 'dead', 'death');
});
+
+}
+util.inherits(Worker, EventEmitter);
+cluster.Worker = Worker;
+
+function prepareDeath(worker, state, eventName) {
+
+ // set state to disconnect
+ worker.state = state;
+
+ // Make suicide a boolean
+ worker.suicide = !!worker.suicide;
+
+ // Remove from workers in the master
+ if (cluster.isMaster) {
+ delete cluster.workers[worker.uniqueID];
+ }
+
+ // Emit events
+ worker.emit(eventName, worker);
+ cluster.emit(eventName, worker);
+}
+
+// Send internal message
+function sendInternalMessage(worker, message/*, handler, callback*/) {
+
+ // Exist callback
+ var callback = arguments[arguments.length - 1];
+ if (typeof callback !== 'function') {
+ callback = undefined;
+ }
+
+ // exist handler
+ var handler = arguments[2] !== callback ? arguments[2] : undefined;
+
+ if (!isInternalMessage(message)) {
+ message = internalMessage(message);
+ }
+
+ // Store callback for later
+ if (callback) {
+ message._requestEcho = worker.uniqueID + ':' + (++queryIds);
+ queryCallbacks[message._requestEcho] = callback;
+ }
+
+
+ worker.send(message, handler);
+}
+
+// Send message to worker or master
+Worker.prototype.send = function() {
+
+ //You could also just use process.send in a worker
+ this.process.send.apply(this.process, arguments);
};
-function queryMaster(msg, cb) {
- assert(cluster.isWorker);
+function closeWorkerChannel(worker, callback) {
+ //Apparently the .close method is async, but do not have a callback
+ worker.process._channel.close();
+ worker.process._channel = null;
+ process.nextTick(callback);
+}
- debug('send ' + JSON.stringify(msg));
+// Kill the worker without restarting
+Worker.prototype.destroy = function() {
+ var self = this;
- // Grab some random queryId
- msg._queryId = (++queryIds);
- msg._workerId = workerId;
+ this.suicide = true;
- // Store callback for later. Callback called in _startWorker.
- if (cb) {
- queryCallbacks[msg._queryId] = cb;
+ if (cluster.isMaster) {
+ // Stop channel
+ // this way the worker won't need to propagate suicide state to master
+ closeWorkerChannel(this, function() {
+ // Then kill worker
+ self.process.kill();
+ });
+
+ } else {
+ // Channel is open
+ if (this.process._channel !== null) {
+
+ // Inform master that is is suicide and then kill
+ sendInternalMessage(this, {cmd: 'suicide'}, function() {
+ // Kill worker
+ process.exit(0);
+ });
+
+ // When master do a quickDestroy the channel is not necesarily closed
+ // at the point this function runs. For that reason we need to keep
+ // checking that the channel is still open, until a actually callback
+ // from the master is resicved. Also we can't do a timeout and then
+ // just kill, since we don't know if the quickDestroy function was called.
+ setInterval(function() {
+ if (self.process._channel === null) {
+ process.exit(0);
+ }
+ }, 200);
+
+ } else {
+ process.exit(0);
+ }
}
+};
+
+// Fork a new worker
+cluster.fork = function(env) {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
- // Send message to master.
- process.send(msg);
+ // Make sure that the master has been initalized
+ startMaster();
+
+ return (new cluster.Worker(env));
+};
+
+// Sync way to quickly kill all cluster workers
+// However the workers may not die instantly
+function quickDestroyCluster() {
+ eachWorker(function(worker) {
+ worker.process.kill();
+ });
}
+// Internal function. Called from src/node.js when worker process starts.
+cluster._setupWorker = function() {
+ // Get worker class
+ var worker = cluster.worker = new Worker();
+
+ // Tell master that the worker is online
+ worker.state = 'online';
+ sendInternalMessage(worker, { cmd: 'online' });
+};
-// Internal function. Called by lib/net.js when attempting to bind a
-// server.
-cluster._getServer = function(address, port, addressType, cb) {
+// Internal function. Called by lib/net.js when attempting to bind a server.
+cluster._getServer = function(tcpSelf, address, port, addressType, cb) {
+ // This can only be called from a worker.
assert(cluster.isWorker);
- queryMaster({
+ // Store tcp instance for later use
+ var key = [address, port, addressType].join(':');
+ serverLisenters[key] = tcpSelf;
+
+ // Send a listening message to the master
+ tcpSelf.once('listening', function() {
+ cluster.worker.state = 'listening';
+ sendInternalMessage(cluster.worker, {
+ cmd: 'listening',
+ address: address,
+ port: port,
+ addressType: addressType
+ });
+ });
+
+ // Request the fd handler from the master process
+ var message = {
cmd: 'queryServer',
address: address,
port: port,
addressType: addressType
- }, function(msg, handle) {
+ };
+
+ // The callback will be stored until the master has responed
+ sendInternalMessage(cluster.worker, message, function(msg, handle) {
cb(handle);
});
+
};