diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/child_process.js | 32 | ||||
-rw-r--r-- | lib/cluster.js | 13 | ||||
-rw-r--r-- | lib/dgram.js | 90 |
3 files changed, 105 insertions, 30 deletions
diff --git a/lib/child_process.js b/lib/child_process.js index 1ac2b9169f..ef8c800926 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -155,6 +155,18 @@ var handleConversion = { emit(socket); } + }, + + 'dgram.Native': { + simultaneousAccepts: false, + + send: function(message, handle) { + return handle; + }, + + got: function(message, handle, emit) { + emit(handle); + } } }; @@ -355,18 +367,20 @@ function setupChannel(target, channel) { // this message will be handled by an internalMessage event handler message = { cmd: 'NODE_HANDLE', - type: 'net.', msg: message }; - switch (handle.constructor.name) { - case 'Socket': - message.type += 'Socket'; break; - case 'Server': - message.type += 'Server'; break; - case 'Pipe': - case 'TCP': - message.type += 'Native'; break; + if (handle instanceof net.Socket) { + message.type = 'net.Socket'; + } else if (handle instanceof net.Server) { + message.type = 'net.Server'; + } else if (handle instanceof process.binding('tcp_wrap').TCP || + handle instanceof process.binding('pipe_wrap').Pipe) { + message.type = 'net.Native'; + } else if (handle instanceof process.binding('udp_wrap').UDP) { + message.type = 'dgram.Native'; + } else { + throw new TypeError("This handle type can't be sent"); } var obj = handleConversion[message.type]; diff --git a/lib/cluster.js b/lib/cluster.js index b2e7625ade..6c0f8f469d 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -227,8 +227,14 @@ if (cluster.isMaster) { if (serverHandlers.hasOwnProperty(key)) { handler = serverHandlers[key]; + } else if (message.addressType === 'udp4' || + message.addressType === 'udp6') { + var dgram = require('dgram'); + handler = dgram._createSocketHandle.apply(net, args); + serverHandlers[key] = handler; } else { - handler = serverHandlers[key] = net._createServerHandle.apply(net, args); + handler = net._createServerHandle.apply(net, args); + serverHandlers[key] = handler; } // echo callback with the fd handler associated with it @@ -259,9 +265,9 @@ if (cluster.isMaster) { messageHandler.suicide = function(message, worker) { worker.suicide = true; }; - } + // Messages to a worker will be handled using these methods else if (cluster.isWorker) { @@ -541,7 +547,8 @@ cluster._setupWorker = function() { sendInternalMessage(worker, { cmd: 'online' }); }; -// Internal function. Called by lib/net.js when attempting to bind a server. +// Internal function. Called by net.js and dgram.js when attempting to bind a +// TCP server or UDP socket. cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) { // This can only be called from a worker. assert(cluster.isWorker); diff --git a/lib/dgram.js b/lib/dgram.js index 65e080f81a..242c197515 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -19,6 +19,7 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +var assert = require('assert'); var util = require('util'); var events = require('events'); @@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1; var BIND_STATE_BOUND = 2; // lazily loaded +var cluster = null; var dns = null; var net = null; @@ -86,6 +88,24 @@ function newHandle(type) { } +exports._createSocketHandle = function(address, port, addressType, fd) { + // Opening an existing fd is not supported for UDP handles. + assert(typeof fd !== 'number' || fd < 0); + + var handle = newHandle(addressType); + + if (port || address) { + var r = handle.bind(address, port || 0, 0); + if (r == -1) { + handle.close(); + handle = null; + } + } + + return handle; +}; + + function Socket(type, listener) { events.EventEmitter.call(this); @@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) { }; +function startListening(socket) { + socket._handle.onmessage = onMessage; + // Todo: handle errors + socket._handle.recvStart(); + socket._receiving = true; + socket._bindState = BIND_STATE_BOUND; + socket.fd = -42; // compatibility hack + + socket.emit('listening'); +} + + Socket.prototype.bind = function(port, address, callback) { var self = this; self._healthCheck(); + if (this._bindState != BIND_STATE_UNBOUND) + throw new Error('Socket is already bound'); + + this._bindState = BIND_STATE_BINDING; + if (typeof callback === 'function') self.once('listening', callback); // resolve address first self._handle.lookup(address, function(err, ip) { - self._bindState = BIND_STATE_UNBOUND; - - if (!self._handle) - return; // handle has been closed in the mean time - if (err) { + self._bindState = BIND_STATE_UNBOUND; self.emit('error', err); return; } - if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) { - self.emit('error', errnoException(errno, 'bind')); - return; - } - - self._handle.onmessage = onMessage; - self._handle.recvStart(); - self._receiving = true; - self._bindState = BIND_STATE_BOUND; - self.fd = -42; // compatibility hack + if (!cluster) + cluster = require('cluster'); + + if (cluster.isWorker) { + cluster._getServer(self, ip, port, self.type, -1, function(handle) { + if (!self._handle) + // handle has been closed in the mean time. + return handle.close(); + + // Set up the handle that we got from master. + handle.lookup = self._handle.lookup; + handle.bind = self._handle.bind; + handle.send = self._handle.send; + handle.owner = self; + + // Replace the existing handle by the handle we got from master. + self._handle.close(); + self._handle = handle; + + startListening(self); + }); + + } else { + if (!self._handle) + return; // handle has been closed in the mean time + + if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) { + self.emit('error', errnoException(errno, 'bind')); + self._bindState = BIND_STATE_UNBOUND; + // Todo: close? + return; + } - self.emit('listening'); + startListening(self); + } }); - - self._bindState = BIND_STATE_BINDING; }; |