diff options
author | Andreas Madsen <amwebdk@gmail.com> | 2012-03-10 16:30:06 +0100 |
---|---|---|
committer | isaacs <i@izs.me> | 2012-03-19 13:29:01 -0700 |
commit | d927fbc9ab01b8120d71dda0519c2ed2e82b030a (patch) | |
tree | 12556ddd3ebe60c77eb9a15c478edd9e21bb5fa9 /lib/cluster.js | |
parent | ab32e9e04346fe627e01b3b9a852f6c4663681f2 (diff) | |
download | android-node-v8-d927fbc9ab01b8120d71dda0519c2ed2e82b030a.tar.gz android-node-v8-d927fbc9ab01b8120d71dda0519c2ed2e82b030a.tar.bz2 android-node-v8-d927fbc9ab01b8120d71dda0519c2ed2e82b030a.zip |
cluster: add graceful disconnect support
This patch add a worker.disconnect() method there will stop the worker from accepting
new connections and then stop the IPC. This allow the worker to die graceful.
When the IPC has been disconnected a 'disconnect' event will emit.
The patch also add a cluster.disconnect() method, this will call worker.disconnect() on
all connected workers. When the workers are disconneted it will then close all server
handlers. This allow the cluster itself to self terminate in a graceful way.
Diffstat (limited to 'lib/cluster.js')
-rw-r--r-- | lib/cluster.js | 119 |
1 files changed, 107 insertions, 12 deletions
diff --git a/lib/cluster.js b/lib/cluster.js index cd90219bfd..977f1dd3be 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -77,6 +77,19 @@ function eachWorker(cb) { } } +// Extremely simple progress tracker +function ProgressTracker(missing, callback) { + this.missing = missing; + this.callback = callback; +} +ProgressTracker.prototype.done = function() { + this.missing -= 1; + this.check(); +}; +ProgressTracker.prototype.check = function() { + if (this.missing === 0) this.callback(); +}; + cluster.setupMaster = function(options) { // This can only be called from the master. assert(cluster.isMaster); @@ -239,7 +252,10 @@ if (cluster.isMaster) { // Messages to a worker will be handled using this methods else if (cluster.isWorker) { - // TODO: the disconnect step will use this + // Handle worker.disconnect from master + messageHandingObject.disconnect = function(message, worker) { + worker.disconnect(); + }; } function toDecInt(value) { @@ -293,9 +309,11 @@ function Worker(customEnv) { }); } - // handle internalMessage and exit event + // handle internalMessage, exit and disconnect event this.process.on('internalMessage', handleMessage.bind(null, this)); this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death')); + this.process.on('disconnect', + prepareDeath.bind(null, this, 'disconnected', 'disconnect')); // relay message and error this.process.on('message', this.emit.bind(this, 'message')); @@ -356,14 +374,6 @@ Worker.prototype.send = function() { this.process.send.apply(this.process, arguments); }; - -function closeWorkerChannel(worker, callback) { - //Apparently the .close method is async, but do not have a callback - worker.process._channel.close(); - worker.process._channel = null; - process.nextTick(callback); -} - // Kill the worker without restarting Worker.prototype.destroy = function() { var self = this; @@ -373,9 +383,14 @@ Worker.prototype.destroy = function() { if (cluster.isMaster) { // Disconnect IPC channel // this way the worker won't need to propagate suicide state to master - closeWorkerChannel(this, function() { + if (self.process.connected) { + self.process.once('disconnect', function() { + self.process.kill(); + }); + self.process.disconnect(); + } else { self.process.kill(); - }); + } } else { // Channel is open @@ -403,6 +418,59 @@ Worker.prototype.destroy = function() { } }; +// The .disconnect function will close all server and then disconnect +// the IPC channel. +if (cluster.isMaster) { + // Used in master + Worker.prototype.disconnect = function() { + this.suicide = true; + + sendInternalMessage(this, {cmd: 'disconnect'}); + }; + +} else { + // Used in workers + Worker.prototype.disconnect = function() { + var self = this; + + this.suicide = true; + + // keep track of open servers + var servers = Object.keys(serverLisenters).length; + var progress = new ProgressTracker(servers, function() { + // there are no more servers open so we will close the IPC channel. + // Closeing the IPC channel will emit emit a disconnect event + // in both master and worker on the process object. + // This event will be handled by prepearDeath. + self.process.disconnect(); + }); + + // depending on where this function was called from (master or worker) + // the suicide state has allready been set. + // But it dosn't really matter if we set it again. + sendInternalMessage(this, {cmd: 'suicide'}, function() { + // in case there are no servers + progress.check(); + + // closeing all servers graceful + var server; + for (var key in serverLisenters) { + server = serverLisenters[key]; + + // in case the server is closed we wont close it again + if (server._handle === null) { + progress.done(); + continue; + } + + server.on('close', progress.done.bind(progress)); + server.close(); + } + }); + + }; +} + // Fork a new worker cluster.fork = function(env) { // This can only be called from the master. @@ -414,6 +482,33 @@ cluster.fork = function(env) { return (new cluster.Worker(env)); }; +// execute .disconnect on all workers and close handlers when done +cluster.disconnect = function(callback) { + // This can only be called from the master. + assert(cluster.isMaster); + + // Close all TCP handlers when all workers are disconnected + var workers = Object.keys(cluster.workers).length; + var progress = new ProgressTracker(workers, function() { + for (var key in serverHandlers) { + serverHandlers[key].close(); + delete serverHandlers[key]; + } + + // call callback when done + if (callback) callback(); + }); + + // begin disconnecting all workers + eachWorker(function(worker) { + worker.once('disconnect', progress.done.bind(progress)); + worker.disconnect(); + }); + + // in case there wasn't any workers + progress.check(); +}; + // Sync way to quickly kill all cluster workers // However the workers may not die instantly function quickDestroyCluster() { |