summaryrefslogtreecommitdiff
path: root/lib/cluster.js
diff options
context:
space:
mode:
authorAndreas Madsen <amwebdk@gmail.com>2012-03-10 16:30:06 +0100
committerisaacs <i@izs.me>2012-03-19 13:29:01 -0700
commitd927fbc9ab01b8120d71dda0519c2ed2e82b030a (patch)
tree12556ddd3ebe60c77eb9a15c478edd9e21bb5fa9 /lib/cluster.js
parentab32e9e04346fe627e01b3b9a852f6c4663681f2 (diff)
downloadandroid-node-v8-d927fbc9ab01b8120d71dda0519c2ed2e82b030a.tar.gz
android-node-v8-d927fbc9ab01b8120d71dda0519c2ed2e82b030a.tar.bz2
android-node-v8-d927fbc9ab01b8120d71dda0519c2ed2e82b030a.zip
cluster: add graceful disconnect support
This patch add a worker.disconnect() method there will stop the worker from accepting new connections and then stop the IPC. This allow the worker to die graceful. When the IPC has been disconnected a 'disconnect' event will emit. The patch also add a cluster.disconnect() method, this will call worker.disconnect() on all connected workers. When the workers are disconneted it will then close all server handlers. This allow the cluster itself to self terminate in a graceful way.
Diffstat (limited to 'lib/cluster.js')
-rw-r--r--lib/cluster.js119
1 files changed, 107 insertions, 12 deletions
diff --git a/lib/cluster.js b/lib/cluster.js
index cd90219bfd..977f1dd3be 100644
--- a/lib/cluster.js
+++ b/lib/cluster.js
@@ -77,6 +77,19 @@ function eachWorker(cb) {
}
}
+// Extremely simple progress tracker
+function ProgressTracker(missing, callback) {
+ this.missing = missing;
+ this.callback = callback;
+}
+ProgressTracker.prototype.done = function() {
+ this.missing -= 1;
+ this.check();
+};
+ProgressTracker.prototype.check = function() {
+ if (this.missing === 0) this.callback();
+};
+
cluster.setupMaster = function(options) {
// This can only be called from the master.
assert(cluster.isMaster);
@@ -239,7 +252,10 @@ if (cluster.isMaster) {
// Messages to a worker will be handled using this methods
else if (cluster.isWorker) {
- // TODO: the disconnect step will use this
+ // Handle worker.disconnect from master
+ messageHandingObject.disconnect = function(message, worker) {
+ worker.disconnect();
+ };
}
function toDecInt(value) {
@@ -293,9 +309,11 @@ function Worker(customEnv) {
});
}
- // handle internalMessage and exit event
+ // handle internalMessage, exit and disconnect event
this.process.on('internalMessage', handleMessage.bind(null, this));
this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death'));
+ this.process.on('disconnect',
+ prepareDeath.bind(null, this, 'disconnected', 'disconnect'));
// relay message and error
this.process.on('message', this.emit.bind(this, 'message'));
@@ -356,14 +374,6 @@ Worker.prototype.send = function() {
this.process.send.apply(this.process, arguments);
};
-
-function closeWorkerChannel(worker, callback) {
- //Apparently the .close method is async, but do not have a callback
- worker.process._channel.close();
- worker.process._channel = null;
- process.nextTick(callback);
-}
-
// Kill the worker without restarting
Worker.prototype.destroy = function() {
var self = this;
@@ -373,9 +383,14 @@ Worker.prototype.destroy = function() {
if (cluster.isMaster) {
// Disconnect IPC channel
// this way the worker won't need to propagate suicide state to master
- closeWorkerChannel(this, function() {
+ if (self.process.connected) {
+ self.process.once('disconnect', function() {
+ self.process.kill();
+ });
+ self.process.disconnect();
+ } else {
self.process.kill();
- });
+ }
} else {
// Channel is open
@@ -403,6 +418,59 @@ Worker.prototype.destroy = function() {
}
};
+// The .disconnect function will close all server and then disconnect
+// the IPC channel.
+if (cluster.isMaster) {
+ // Used in master
+ Worker.prototype.disconnect = function() {
+ this.suicide = true;
+
+ sendInternalMessage(this, {cmd: 'disconnect'});
+ };
+
+} else {
+ // Used in workers
+ Worker.prototype.disconnect = function() {
+ var self = this;
+
+ this.suicide = true;
+
+ // keep track of open servers
+ var servers = Object.keys(serverLisenters).length;
+ var progress = new ProgressTracker(servers, function() {
+ // there are no more servers open so we will close the IPC channel.
+ // Closeing the IPC channel will emit emit a disconnect event
+ // in both master and worker on the process object.
+ // This event will be handled by prepearDeath.
+ self.process.disconnect();
+ });
+
+ // depending on where this function was called from (master or worker)
+ // the suicide state has allready been set.
+ // But it dosn't really matter if we set it again.
+ sendInternalMessage(this, {cmd: 'suicide'}, function() {
+ // in case there are no servers
+ progress.check();
+
+ // closeing all servers graceful
+ var server;
+ for (var key in serverLisenters) {
+ server = serverLisenters[key];
+
+ // in case the server is closed we wont close it again
+ if (server._handle === null) {
+ progress.done();
+ continue;
+ }
+
+ server.on('close', progress.done.bind(progress));
+ server.close();
+ }
+ });
+
+ };
+}
+
// Fork a new worker
cluster.fork = function(env) {
// This can only be called from the master.
@@ -414,6 +482,33 @@ cluster.fork = function(env) {
return (new cluster.Worker(env));
};
+// execute .disconnect on all workers and close handlers when done
+cluster.disconnect = function(callback) {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
+
+ // Close all TCP handlers when all workers are disconnected
+ var workers = Object.keys(cluster.workers).length;
+ var progress = new ProgressTracker(workers, function() {
+ for (var key in serverHandlers) {
+ serverHandlers[key].close();
+ delete serverHandlers[key];
+ }
+
+ // call callback when done
+ if (callback) callback();
+ });
+
+ // begin disconnecting all workers
+ eachWorker(function(worker) {
+ worker.once('disconnect', progress.done.bind(progress));
+ worker.disconnect();
+ });
+
+ // in case there wasn't any workers
+ progress.check();
+};
+
// Sync way to quickly kill all cluster workers
// However the workers may not die instantly
function quickDestroyCluster() {