summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2013-01-22 23:52:20 +0100
committerBert Belder <bertbelder@gmail.com>2013-01-28 22:12:21 +0100
commit5e7e51c2fe5b66f65a78afa8c931488c780fa0c4 (patch)
treeb627dc049b2204136c9559f158c1948542979bd7
parentc13354e339da3849156287b0ad3bb70b9c115632 (diff)
downloadandroid-node-v8-5e7e51c2fe5b66f65a78afa8c931488c780fa0c4.tar.gz
android-node-v8-5e7e51c2fe5b66f65a78afa8c931488c780fa0c4.tar.bz2
android-node-v8-5e7e51c2fe5b66f65a78afa8c931488c780fa0c4.zip
cluster: support datagram sockets
-rw-r--r--lib/child_process.js32
-rw-r--r--lib/cluster.js13
-rw-r--r--lib/dgram.js90
-rw-r--r--src/handle_wrap.h2
-rw-r--r--src/stream_wrap.cc54
-rw-r--r--src/udp_wrap.cc17
-rw-r--r--src/udp_wrap.h1
-rw-r--r--test/simple/test-cluster-dgram-1.js115
-rw-r--r--test/simple/test-cluster-dgram-2.js81
9 files changed, 356 insertions, 49 deletions
diff --git a/lib/child_process.js b/lib/child_process.js
index 1ac2b9169f..ef8c800926 100644
--- a/lib/child_process.js
+++ b/lib/child_process.js
@@ -155,6 +155,18 @@ var handleConversion = {
emit(socket);
}
+ },
+
+ 'dgram.Native': {
+ simultaneousAccepts: false,
+
+ send: function(message, handle) {
+ return handle;
+ },
+
+ got: function(message, handle, emit) {
+ emit(handle);
+ }
}
};
@@ -355,18 +367,20 @@ function setupChannel(target, channel) {
// this message will be handled by an internalMessage event handler
message = {
cmd: 'NODE_HANDLE',
- type: 'net.',
msg: message
};
- switch (handle.constructor.name) {
- case 'Socket':
- message.type += 'Socket'; break;
- case 'Server':
- message.type += 'Server'; break;
- case 'Pipe':
- case 'TCP':
- message.type += 'Native'; break;
+ if (handle instanceof net.Socket) {
+ message.type = 'net.Socket';
+ } else if (handle instanceof net.Server) {
+ message.type = 'net.Server';
+ } else if (handle instanceof process.binding('tcp_wrap').TCP ||
+ handle instanceof process.binding('pipe_wrap').Pipe) {
+ message.type = 'net.Native';
+ } else if (handle instanceof process.binding('udp_wrap').UDP) {
+ message.type = 'dgram.Native';
+ } else {
+ throw new TypeError("This handle type can't be sent");
}
var obj = handleConversion[message.type];
diff --git a/lib/cluster.js b/lib/cluster.js
index b2e7625ade..6c0f8f469d 100644
--- a/lib/cluster.js
+++ b/lib/cluster.js
@@ -227,8 +227,14 @@ if (cluster.isMaster) {
if (serverHandlers.hasOwnProperty(key)) {
handler = serverHandlers[key];
+ } else if (message.addressType === 'udp4' ||
+ message.addressType === 'udp6') {
+ var dgram = require('dgram');
+ handler = dgram._createSocketHandle.apply(net, args);
+ serverHandlers[key] = handler;
} else {
- handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
+ handler = net._createServerHandle.apply(net, args);
+ serverHandlers[key] = handler;
}
// echo callback with the fd handler associated with it
@@ -259,9 +265,9 @@ if (cluster.isMaster) {
messageHandler.suicide = function(message, worker) {
worker.suicide = true;
};
-
}
+
// Messages to a worker will be handled using these methods
else if (cluster.isWorker) {
@@ -541,7 +547,8 @@ cluster._setupWorker = function() {
sendInternalMessage(worker, { cmd: 'online' });
};
-// Internal function. Called by lib/net.js when attempting to bind a server.
+// Internal function. Called by net.js and dgram.js when attempting to bind a
+// TCP server or UDP socket.
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
// This can only be called from a worker.
assert(cluster.isWorker);
diff --git a/lib/dgram.js b/lib/dgram.js
index 65e080f81a..242c197515 100644
--- a/lib/dgram.js
+++ b/lib/dgram.js
@@ -19,6 +19,7 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+var assert = require('assert');
var util = require('util');
var events = require('events');
@@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1;
var BIND_STATE_BOUND = 2;
// lazily loaded
+var cluster = null;
var dns = null;
var net = null;
@@ -86,6 +88,24 @@ function newHandle(type) {
}
+exports._createSocketHandle = function(address, port, addressType, fd) {
+ // Opening an existing fd is not supported for UDP handles.
+ assert(typeof fd !== 'number' || fd < 0);
+
+ var handle = newHandle(addressType);
+
+ if (port || address) {
+ var r = handle.bind(address, port || 0, 0);
+ if (r == -1) {
+ handle.close();
+ handle = null;
+ }
+ }
+
+ return handle;
+};
+
+
function Socket(type, listener) {
events.EventEmitter.call(this);
@@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) {
};
+function startListening(socket) {
+ socket._handle.onmessage = onMessage;
+ // Todo: handle errors
+ socket._handle.recvStart();
+ socket._receiving = true;
+ socket._bindState = BIND_STATE_BOUND;
+ socket.fd = -42; // compatibility hack
+
+ socket.emit('listening');
+}
+
+
Socket.prototype.bind = function(port, address, callback) {
var self = this;
self._healthCheck();
+ if (this._bindState != BIND_STATE_UNBOUND)
+ throw new Error('Socket is already bound');
+
+ this._bindState = BIND_STATE_BINDING;
+
if (typeof callback === 'function')
self.once('listening', callback);
// resolve address first
self._handle.lookup(address, function(err, ip) {
- self._bindState = BIND_STATE_UNBOUND;
-
- if (!self._handle)
- return; // handle has been closed in the mean time
-
if (err) {
+ self._bindState = BIND_STATE_UNBOUND;
self.emit('error', err);
return;
}
- if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
- self.emit('error', errnoException(errno, 'bind'));
- return;
- }
-
- self._handle.onmessage = onMessage;
- self._handle.recvStart();
- self._receiving = true;
- self._bindState = BIND_STATE_BOUND;
- self.fd = -42; // compatibility hack
+ if (!cluster)
+ cluster = require('cluster');
+
+ if (cluster.isWorker) {
+ cluster._getServer(self, ip, port, self.type, -1, function(handle) {
+ if (!self._handle)
+ // handle has been closed in the mean time.
+ return handle.close();
+
+ // Set up the handle that we got from master.
+ handle.lookup = self._handle.lookup;
+ handle.bind = self._handle.bind;
+ handle.send = self._handle.send;
+ handle.owner = self;
+
+ // Replace the existing handle by the handle we got from master.
+ self._handle.close();
+ self._handle = handle;
+
+ startListening(self);
+ });
+
+ } else {
+ if (!self._handle)
+ return; // handle has been closed in the mean time
+
+ if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
+ self.emit('error', errnoException(errno, 'bind'));
+ self._bindState = BIND_STATE_UNBOUND;
+ // Todo: close?
+ return;
+ }
- self.emit('listening');
+ startListening(self);
+ }
});
-
- self._bindState = BIND_STATE_BINDING;
};
diff --git a/src/handle_wrap.h b/src/handle_wrap.h
index a358e812a5..951b9386f5 100644
--- a/src/handle_wrap.h
+++ b/src/handle_wrap.h
@@ -53,6 +53,8 @@ class HandleWrap {
static v8::Handle<v8::Value> Ref(const v8::Arguments& args);
static v8::Handle<v8::Value> Unref(const v8::Arguments& args);
+ inline uv_handle_t* GetHandle() { return handle__; };
+
protected:
HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle);
virtual ~HandleWrap();
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 910e94b2ed..e6756e1195 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -27,6 +27,7 @@
#include "pipe_wrap.h"
#include "tcp_wrap.h"
#include "req_wrap.h"
+#include "udp_wrap.h"
#include "node_counters.h"
#include <stdlib.h> // abort()
@@ -118,7 +119,7 @@ StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
void StreamWrap::SetHandle(uv_handle_t* h) {
HandleWrap::SetHandle(h);
- stream_ = (uv_stream_t*)h;
+ stream_ = reinterpret_cast<uv_stream_t*>(h);
stream_->data = this;
}
@@ -173,6 +174,28 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
}
+template <class WrapType, class UVType>
+static Local<Object> AcceptHandle(uv_stream_t* pipe) {
+ HandleScope scope;
+ Local<Object> wrap_obj;
+ WrapType* wrap;
+ UVType* handle;
+
+ wrap_obj = WrapType::Instantiate();
+ if (wrap_obj.IsEmpty())
+ return Local<Object>();
+
+ wrap = static_cast<WrapType*>(
+ wrap_obj->GetAlignedPointerFromInternalField(0));
+ handle = wrap->UVHandle();
+
+ if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
+ abort();
+
+ return scope.Close(wrap_obj);
+}
+
+
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
HandleScope scope;
@@ -212,19 +235,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
Local<Object> pending_obj;
if (pending == UV_TCP) {
- pending_obj = TCPWrap::Instantiate();
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
} else if (pending == UV_NAMED_PIPE) {
- pending_obj = PipeWrap::Instantiate();
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
+ } else if (pending == UV_UDP) {
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
} else {
- // We only support sending UV_TCP and UV_NAMED_PIPE right now.
assert(pending == UV_UNKNOWN_HANDLE);
}
if (!pending_obj.IsEmpty()) {
- assert(pending_obj->InternalFieldCount() > 0);
- StreamWrap* pending_wrap = static_cast<StreamWrap*>(
- pending_obj->GetAlignedPointerFromInternalField(0));
- if (uv_accept(handle, pending_wrap->GetStream())) abort();
argv[3] = pending_obj;
argc++;
}
@@ -246,7 +266,7 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
- OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
+ OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
}
@@ -404,14 +424,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
StreamWrap::AfterWrite);
} else {
- uv_stream_t* send_stream = NULL;
+ uv_handle_t* send_handle = NULL;
if (args[1]->IsObject()) {
- Local<Object> send_stream_obj = args[1]->ToObject();
- assert(send_stream_obj->InternalFieldCount() > 0);
- StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
- send_stream_obj->GetAlignedPointerFromInternalField(0));
- send_stream = send_stream_wrap->GetStream();
+ Local<Object> send_handle_obj = args[1]->ToObject();
+ assert(send_handle_obj->InternalFieldCount() > 0);
+ HandleWrap* send_handle_wrap = static_cast<HandleWrap*>(
+ send_handle_obj->GetAlignedPointerFromInternalField(0));
+ send_handle = send_handle_wrap->GetHandle();
// Reference StreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
@@ -419,14 +439,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
handle_sym = NODE_PSYMBOL("handle");
}
assert(!req_wrap->object_.IsEmpty());
- req_wrap->object_->Set(handle_sym, send_stream_obj);
+ req_wrap->object_->Set(handle_sym, send_handle_obj);
}
r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
- send_stream,
+ reinterpret_cast<uv_stream_t*>(send_handle),
StreamWrap::AfterWrite);
}
diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc
index d4761493cb..6d01ebd6f3 100644
--- a/src/udp_wrap.cc
+++ b/src/udp_wrap.cc
@@ -40,6 +40,7 @@ typedef ReqWrap<uv_udp_send_t> SendWrap;
// see tcp_wrap.cc
Local<Object> AddressToJS(const sockaddr* addr);
+static Persistent<Function> constructor;
static Persistent<String> buffer_sym;
static Persistent<String> oncomplete_sym;
static Persistent<String> onmessage_sym;
@@ -98,8 +99,9 @@ void UDPWrap::Initialize(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
- target->Set(String::NewSymbol("UDP"),
- Persistent<FunctionTemplate>::New(t)->GetFunction());
+ constructor = Persistent<Function>::New(
+ Persistent<FunctionTemplate>::New(t)->GetFunction());
+ target->Set(String::NewSymbol("UDP"), constructor);
}
@@ -392,6 +394,17 @@ UDPWrap* UDPWrap::Unwrap(Local<Object> obj) {
}
+Local<Object> UDPWrap::Instantiate() {
+ // If this assert fires then Initialize hasn't been called yet.
+ assert(constructor.IsEmpty() == false);
+
+ HandleScope scope;
+ Local<Object> obj = constructor->NewInstance();
+
+ return scope.Close(obj);
+}
+
+
uv_udp_t* UDPWrap::UVHandle() {
return &handle_;
}
diff --git a/src/udp_wrap.h b/src/udp_wrap.h
index 9ca2eaea90..527346fccd 100644
--- a/src/udp_wrap.h
+++ b/src/udp_wrap.h
@@ -33,6 +33,7 @@ class UDPWrap: public HandleWrap {
static Handle<Value> SetTTL(const Arguments& args);
static UDPWrap* Unwrap(Local<Object> obj);
+ static Local<Object> Instantiate();
uv_udp_t* UVHandle();
private:
diff --git a/test/simple/test-cluster-dgram-1.js b/test/simple/test-cluster-dgram-1.js
new file mode 100644
index 0000000000..c6dc095d08
--- /dev/null
+++ b/test/simple/test-cluster-dgram-1.js
@@ -0,0 +1,115 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var NUM_WORKERS = 4;
+var PACKETS_PER_WORKER = 10;
+
+var assert = require('assert');
+var cluster = require('cluster');
+var common = require('../common');
+var dgram = require('dgram');
+
+
+if (process.platform === 'win32') {
+ console.warn("dgram clustering is currently not supported on windows.");
+ process.exit(0);
+}
+
+if (cluster.isMaster)
+ master();
+else
+ worker();
+
+
+function master() {
+ var listening = 0;
+
+ // Fork 4 workers.
+ for (var i = 0; i < NUM_WORKERS; i++)
+ cluster.fork();
+
+ // Wait until all workers are listening.
+ cluster.on('listening', function() {
+ if (++listening < NUM_WORKERS)
+ return;
+
+ // Start sending messages.
+ var buf = new Buffer('hello world');
+ var socket = dgram.createSocket('udp4');
+ var sent = 0;
+ doSend();
+
+ function doSend() {
+ socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1', afterSend);
+ }
+
+ function afterSend() {
+ sent++;
+ if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
+ doSend();
+ } else {
+ console.log('master sent %d packets', sent);
+ socket.close();
+ }
+ }
+ });
+
+ // Set up event handlers for every worker. Each worker sends a message when
+ // it has received the expected number of packets. After that it disconnects.
+ for (var key in cluster.workers) {
+ if (cluster.workers.hasOwnProperty(key))
+ setupWorker(cluster.workers[key]);
+ }
+
+ function setupWorker(worker) {
+ var received = 0;
+
+ worker.on('message', function(msg) {
+ received = msg.received;
+ console.log('worker %d received %d packets', worker.id, received);
+ });
+
+ worker.on('disconnect', function() {
+ assert(received === PACKETS_PER_WORKER);
+ console.log('worker %d disconnected', worker.id);
+ });
+ }
+}
+
+
+function worker() {
+ var received = 0;
+
+ // Create udp socket and start listening.
+ var socket = dgram.createSocket('udp4');
+
+ socket.on('message', function(data, info) {
+ received++;
+
+ // Every 10 messages, notify the master.
+ if (received == PACKETS_PER_WORKER) {
+ process.send({received: received});
+ process.disconnect();
+ }
+ });
+
+ socket.bind(common.PORT);
+}
diff --git a/test/simple/test-cluster-dgram-2.js b/test/simple/test-cluster-dgram-2.js
new file mode 100644
index 0000000000..d06ad7935a
--- /dev/null
+++ b/test/simple/test-cluster-dgram-2.js
@@ -0,0 +1,81 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following condonitions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var NUM_WORKERS = 4;
+var PACKETS_PER_WORKER = 10;
+
+var assert = require('assert');
+var cluster = require('cluster');
+var common = require('../common');
+var dgram = require('dgram');
+
+
+if (process.platform === 'win32') {
+ console.warn("dgram clustering is currently not supported on windows.");
+ process.exit(0);
+}
+
+if (cluster.isMaster)
+ master();
+else
+ worker();
+
+
+function master() {
+ var i;
+ var received = 0;
+
+ // Start listening on a socket.
+ var socket = dgram.createSocket('udp4');
+ socket.bind(common.PORT);
+
+ // Disconnect workers when the expected number of messages have been
+ // received.
+ socket.on('message', function(data, info) {
+ received++;
+
+ if (received == PACKETS_PER_WORKER * NUM_WORKERS) {
+ console.log('master received %d packets', received);
+
+ // Close the socket.
+ socket.close();
+
+ // Disconnect all workers.
+ cluster.disconnect();
+ }
+ });
+
+ // Fork workers.
+ for (var i = 0; i < NUM_WORKERS; i++)
+ cluster.fork();
+}
+
+
+function worker() {
+ // Create udp socket and send packets to master.
+ var socket = dgram.createSocket('udp4');
+ var buf = new Buffer('hello world');
+
+ for (var i = 0; i < PACKETS_PER_WORKER; i++)
+ socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1');
+
+ console.log('worker %d sent %d packets', cluster.worker.id, PACKETS_PER_WORKER);
+}