summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/child_process.js32
-rw-r--r--lib/cluster.js13
-rw-r--r--lib/dgram.js90
3 files changed, 105 insertions, 30 deletions
diff --git a/lib/child_process.js b/lib/child_process.js
index 1ac2b9169f..ef8c800926 100644
--- a/lib/child_process.js
+++ b/lib/child_process.js
@@ -155,6 +155,18 @@ var handleConversion = {
emit(socket);
}
+ },
+
+ 'dgram.Native': {
+ simultaneousAccepts: false,
+
+ send: function(message, handle) {
+ return handle;
+ },
+
+ got: function(message, handle, emit) {
+ emit(handle);
+ }
}
};
@@ -355,18 +367,20 @@ function setupChannel(target, channel) {
// this message will be handled by an internalMessage event handler
message = {
cmd: 'NODE_HANDLE',
- type: 'net.',
msg: message
};
- switch (handle.constructor.name) {
- case 'Socket':
- message.type += 'Socket'; break;
- case 'Server':
- message.type += 'Server'; break;
- case 'Pipe':
- case 'TCP':
- message.type += 'Native'; break;
+ if (handle instanceof net.Socket) {
+ message.type = 'net.Socket';
+ } else if (handle instanceof net.Server) {
+ message.type = 'net.Server';
+ } else if (handle instanceof process.binding('tcp_wrap').TCP ||
+ handle instanceof process.binding('pipe_wrap').Pipe) {
+ message.type = 'net.Native';
+ } else if (handle instanceof process.binding('udp_wrap').UDP) {
+ message.type = 'dgram.Native';
+ } else {
+ throw new TypeError("This handle type can't be sent");
}
var obj = handleConversion[message.type];
diff --git a/lib/cluster.js b/lib/cluster.js
index b2e7625ade..6c0f8f469d 100644
--- a/lib/cluster.js
+++ b/lib/cluster.js
@@ -227,8 +227,14 @@ if (cluster.isMaster) {
if (serverHandlers.hasOwnProperty(key)) {
handler = serverHandlers[key];
+ } else if (message.addressType === 'udp4' ||
+ message.addressType === 'udp6') {
+ var dgram = require('dgram');
+ handler = dgram._createSocketHandle.apply(net, args);
+ serverHandlers[key] = handler;
} else {
- handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
+ handler = net._createServerHandle.apply(net, args);
+ serverHandlers[key] = handler;
}
// echo callback with the fd handler associated with it
@@ -259,9 +265,9 @@ if (cluster.isMaster) {
messageHandler.suicide = function(message, worker) {
worker.suicide = true;
};
-
}
+
// Messages to a worker will be handled using these methods
else if (cluster.isWorker) {
@@ -541,7 +547,8 @@ cluster._setupWorker = function() {
sendInternalMessage(worker, { cmd: 'online' });
};
-// Internal function. Called by lib/net.js when attempting to bind a server.
+// Internal function. Called by net.js and dgram.js when attempting to bind a
+// TCP server or UDP socket.
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
// This can only be called from a worker.
assert(cluster.isWorker);
diff --git a/lib/dgram.js b/lib/dgram.js
index 65e080f81a..242c197515 100644
--- a/lib/dgram.js
+++ b/lib/dgram.js
@@ -19,6 +19,7 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+var assert = require('assert');
var util = require('util');
var events = require('events');
@@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1;
var BIND_STATE_BOUND = 2;
// lazily loaded
+var cluster = null;
var dns = null;
var net = null;
@@ -86,6 +88,24 @@ function newHandle(type) {
}
+exports._createSocketHandle = function(address, port, addressType, fd) {
+ // Opening an existing fd is not supported for UDP handles.
+ assert(typeof fd !== 'number' || fd < 0);
+
+ var handle = newHandle(addressType);
+
+ if (port || address) {
+ var r = handle.bind(address, port || 0, 0);
+ if (r == -1) {
+ handle.close();
+ handle = null;
+ }
+ }
+
+ return handle;
+};
+
+
function Socket(type, listener) {
events.EventEmitter.call(this);
@@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) {
};
+function startListening(socket) {
+ socket._handle.onmessage = onMessage;
+ // Todo: handle errors
+ socket._handle.recvStart();
+ socket._receiving = true;
+ socket._bindState = BIND_STATE_BOUND;
+ socket.fd = -42; // compatibility hack
+
+ socket.emit('listening');
+}
+
+
Socket.prototype.bind = function(port, address, callback) {
var self = this;
self._healthCheck();
+ if (this._bindState != BIND_STATE_UNBOUND)
+ throw new Error('Socket is already bound');
+
+ this._bindState = BIND_STATE_BINDING;
+
if (typeof callback === 'function')
self.once('listening', callback);
// resolve address first
self._handle.lookup(address, function(err, ip) {
- self._bindState = BIND_STATE_UNBOUND;
-
- if (!self._handle)
- return; // handle has been closed in the mean time
-
if (err) {
+ self._bindState = BIND_STATE_UNBOUND;
self.emit('error', err);
return;
}
- if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
- self.emit('error', errnoException(errno, 'bind'));
- return;
- }
-
- self._handle.onmessage = onMessage;
- self._handle.recvStart();
- self._receiving = true;
- self._bindState = BIND_STATE_BOUND;
- self.fd = -42; // compatibility hack
+ if (!cluster)
+ cluster = require('cluster');
+
+ if (cluster.isWorker) {
+ cluster._getServer(self, ip, port, self.type, -1, function(handle) {
+ if (!self._handle)
+ // handle has been closed in the mean time.
+ return handle.close();
+
+ // Set up the handle that we got from master.
+ handle.lookup = self._handle.lookup;
+ handle.bind = self._handle.bind;
+ handle.send = self._handle.send;
+ handle.owner = self;
+
+ // Replace the existing handle by the handle we got from master.
+ self._handle.close();
+ self._handle = handle;
+
+ startListening(self);
+ });
+
+ } else {
+ if (!self._handle)
+ return; // handle has been closed in the mean time
+
+ if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
+ self.emit('error', errnoException(errno, 'bind'));
+ self._bindState = BIND_STATE_UNBOUND;
+ // Todo: close?
+ return;
+ }
- self.emit('listening');
+ startListening(self);
+ }
});
-
- self._bindState = BIND_STATE_BINDING;
};