diff options
-rw-r--r-- | lib/child_process.js | 32 | ||||
-rw-r--r-- | lib/cluster.js | 13 | ||||
-rw-r--r-- | lib/dgram.js | 90 | ||||
-rw-r--r-- | src/handle_wrap.h | 2 | ||||
-rw-r--r-- | src/stream_wrap.cc | 54 | ||||
-rw-r--r-- | src/udp_wrap.cc | 17 | ||||
-rw-r--r-- | src/udp_wrap.h | 1 | ||||
-rw-r--r-- | test/simple/test-cluster-dgram-1.js | 115 | ||||
-rw-r--r-- | test/simple/test-cluster-dgram-2.js | 81 |
9 files changed, 356 insertions, 49 deletions
diff --git a/lib/child_process.js b/lib/child_process.js index 1ac2b9169f..ef8c800926 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -155,6 +155,18 @@ var handleConversion = { emit(socket); } + }, + + 'dgram.Native': { + simultaneousAccepts: false, + + send: function(message, handle) { + return handle; + }, + + got: function(message, handle, emit) { + emit(handle); + } } }; @@ -355,18 +367,20 @@ function setupChannel(target, channel) { // this message will be handled by an internalMessage event handler message = { cmd: 'NODE_HANDLE', - type: 'net.', msg: message }; - switch (handle.constructor.name) { - case 'Socket': - message.type += 'Socket'; break; - case 'Server': - message.type += 'Server'; break; - case 'Pipe': - case 'TCP': - message.type += 'Native'; break; + if (handle instanceof net.Socket) { + message.type = 'net.Socket'; + } else if (handle instanceof net.Server) { + message.type = 'net.Server'; + } else if (handle instanceof process.binding('tcp_wrap').TCP || + handle instanceof process.binding('pipe_wrap').Pipe) { + message.type = 'net.Native'; + } else if (handle instanceof process.binding('udp_wrap').UDP) { + message.type = 'dgram.Native'; + } else { + throw new TypeError("This handle type can't be sent"); } var obj = handleConversion[message.type]; diff --git a/lib/cluster.js b/lib/cluster.js index b2e7625ade..6c0f8f469d 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -227,8 +227,14 @@ if (cluster.isMaster) { if (serverHandlers.hasOwnProperty(key)) { handler = serverHandlers[key]; + } else if (message.addressType === 'udp4' || + message.addressType === 'udp6') { + var dgram = require('dgram'); + handler = dgram._createSocketHandle.apply(net, args); + serverHandlers[key] = handler; } else { - handler = serverHandlers[key] = net._createServerHandle.apply(net, args); + handler = net._createServerHandle.apply(net, args); + serverHandlers[key] = handler; } // echo callback with the fd handler associated with it @@ -259,9 +265,9 @@ if (cluster.isMaster) { messageHandler.suicide = function(message, worker) { worker.suicide = true; }; - } + // Messages to a worker will be handled using these methods else if (cluster.isWorker) { @@ -541,7 +547,8 @@ cluster._setupWorker = function() { sendInternalMessage(worker, { cmd: 'online' }); }; -// Internal function. Called by lib/net.js when attempting to bind a server. +// Internal function. Called by net.js and dgram.js when attempting to bind a +// TCP server or UDP socket. cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) { // This can only be called from a worker. assert(cluster.isWorker); diff --git a/lib/dgram.js b/lib/dgram.js index 65e080f81a..242c197515 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -19,6 +19,7 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +var assert = require('assert'); var util = require('util'); var events = require('events'); @@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1; var BIND_STATE_BOUND = 2; // lazily loaded +var cluster = null; var dns = null; var net = null; @@ -86,6 +88,24 @@ function newHandle(type) { } +exports._createSocketHandle = function(address, port, addressType, fd) { + // Opening an existing fd is not supported for UDP handles. + assert(typeof fd !== 'number' || fd < 0); + + var handle = newHandle(addressType); + + if (port || address) { + var r = handle.bind(address, port || 0, 0); + if (r == -1) { + handle.close(); + handle = null; + } + } + + return handle; +}; + + function Socket(type, listener) { events.EventEmitter.call(this); @@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) { }; +function startListening(socket) { + socket._handle.onmessage = onMessage; + // Todo: handle errors + socket._handle.recvStart(); + socket._receiving = true; + socket._bindState = BIND_STATE_BOUND; + socket.fd = -42; // compatibility hack + + socket.emit('listening'); +} + + Socket.prototype.bind = function(port, address, callback) { var self = this; self._healthCheck(); + if (this._bindState != BIND_STATE_UNBOUND) + throw new Error('Socket is already bound'); + + this._bindState = BIND_STATE_BINDING; + if (typeof callback === 'function') self.once('listening', callback); // resolve address first self._handle.lookup(address, function(err, ip) { - self._bindState = BIND_STATE_UNBOUND; - - if (!self._handle) - return; // handle has been closed in the mean time - if (err) { + self._bindState = BIND_STATE_UNBOUND; self.emit('error', err); return; } - if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) { - self.emit('error', errnoException(errno, 'bind')); - return; - } - - self._handle.onmessage = onMessage; - self._handle.recvStart(); - self._receiving = true; - self._bindState = BIND_STATE_BOUND; - self.fd = -42; // compatibility hack + if (!cluster) + cluster = require('cluster'); + + if (cluster.isWorker) { + cluster._getServer(self, ip, port, self.type, -1, function(handle) { + if (!self._handle) + // handle has been closed in the mean time. + return handle.close(); + + // Set up the handle that we got from master. + handle.lookup = self._handle.lookup; + handle.bind = self._handle.bind; + handle.send = self._handle.send; + handle.owner = self; + + // Replace the existing handle by the handle we got from master. + self._handle.close(); + self._handle = handle; + + startListening(self); + }); + + } else { + if (!self._handle) + return; // handle has been closed in the mean time + + if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) { + self.emit('error', errnoException(errno, 'bind')); + self._bindState = BIND_STATE_UNBOUND; + // Todo: close? + return; + } - self.emit('listening'); + startListening(self); + } }); - - self._bindState = BIND_STATE_BINDING; }; diff --git a/src/handle_wrap.h b/src/handle_wrap.h index a358e812a5..951b9386f5 100644 --- a/src/handle_wrap.h +++ b/src/handle_wrap.h @@ -53,6 +53,8 @@ class HandleWrap { static v8::Handle<v8::Value> Ref(const v8::Arguments& args); static v8::Handle<v8::Value> Unref(const v8::Arguments& args); + inline uv_handle_t* GetHandle() { return handle__; }; + protected: HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle); virtual ~HandleWrap(); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 910e94b2ed..e6756e1195 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -27,6 +27,7 @@ #include "pipe_wrap.h" #include "tcp_wrap.h" #include "req_wrap.h" +#include "udp_wrap.h" #include "node_counters.h" #include <stdlib.h> // abort() @@ -118,7 +119,7 @@ StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream) void StreamWrap::SetHandle(uv_handle_t* h) { HandleWrap::SetHandle(h); - stream_ = (uv_stream_t*)h; + stream_ = reinterpret_cast<uv_stream_t*>(h); stream_->data = this; } @@ -173,6 +174,28 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) { } +template <class WrapType, class UVType> +static Local<Object> AcceptHandle(uv_stream_t* pipe) { + HandleScope scope; + Local<Object> wrap_obj; + WrapType* wrap; + UVType* handle; + + wrap_obj = WrapType::Instantiate(); + if (wrap_obj.IsEmpty()) + return Local<Object>(); + + wrap = static_cast<WrapType*>( + wrap_obj->GetAlignedPointerFromInternalField(0)); + handle = wrap->UVHandle(); + + if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle))) + abort(); + + return scope.Close(wrap_obj); +} + + void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { HandleScope scope; @@ -212,19 +235,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, Local<Object> pending_obj; if (pending == UV_TCP) { - pending_obj = TCPWrap::Instantiate(); + pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle); } else if (pending == UV_NAMED_PIPE) { - pending_obj = PipeWrap::Instantiate(); + pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle); + } else if (pending == UV_UDP) { + pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle); } else { - // We only support sending UV_TCP and UV_NAMED_PIPE right now. assert(pending == UV_UNKNOWN_HANDLE); } if (!pending_obj.IsEmpty()) { - assert(pending_obj->InternalFieldCount() > 0); - StreamWrap* pending_wrap = static_cast<StreamWrap*>( - pending_obj->GetAlignedPointerFromInternalField(0)); - if (uv_accept(handle, pending_wrap->GetStream())) abort(); argv[3] = pending_obj; argc++; } @@ -246,7 +266,7 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { - OnReadCommon((uv_stream_t*)handle, nread, buf, pending); + OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending); } @@ -404,14 +424,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) { StreamWrap::AfterWrite); } else { - uv_stream_t* send_stream = NULL; + uv_handle_t* send_handle = NULL; if (args[1]->IsObject()) { - Local<Object> send_stream_obj = args[1]->ToObject(); - assert(send_stream_obj->InternalFieldCount() > 0); - StreamWrap* send_stream_wrap = static_cast<StreamWrap*>( - send_stream_obj->GetAlignedPointerFromInternalField(0)); - send_stream = send_stream_wrap->GetStream(); + Local<Object> send_handle_obj = args[1]->ToObject(); + assert(send_handle_obj->InternalFieldCount() > 0); + HandleWrap* send_handle_wrap = static_cast<HandleWrap*>( + send_handle_obj->GetAlignedPointerFromInternalField(0)); + send_handle = send_handle_wrap->GetHandle(); // Reference StreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. @@ -419,14 +439,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) { handle_sym = NODE_PSYMBOL("handle"); } assert(!req_wrap->object_.IsEmpty()); - req_wrap->object_->Set(handle_sym, send_stream_obj); + req_wrap->object_->Set(handle_sym, send_handle_obj); } r = uv_write2(&req_wrap->req_, wrap->stream_, &buf, 1, - send_stream, + reinterpret_cast<uv_stream_t*>(send_handle), StreamWrap::AfterWrite); } diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index d4761493cb..6d01ebd6f3 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -40,6 +40,7 @@ typedef ReqWrap<uv_udp_send_t> SendWrap; // see tcp_wrap.cc Local<Object> AddressToJS(const sockaddr* addr); +static Persistent<Function> constructor; static Persistent<String> buffer_sym; static Persistent<String> oncomplete_sym; static Persistent<String> onmessage_sym; @@ -98,8 +99,9 @@ void UDPWrap::Initialize(Handle<Object> target) { NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref); NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref); - target->Set(String::NewSymbol("UDP"), - Persistent<FunctionTemplate>::New(t)->GetFunction()); + constructor = Persistent<Function>::New( + Persistent<FunctionTemplate>::New(t)->GetFunction()); + target->Set(String::NewSymbol("UDP"), constructor); } @@ -392,6 +394,17 @@ UDPWrap* UDPWrap::Unwrap(Local<Object> obj) { } +Local<Object> UDPWrap::Instantiate() { + // If this assert fires then Initialize hasn't been called yet. + assert(constructor.IsEmpty() == false); + + HandleScope scope; + Local<Object> obj = constructor->NewInstance(); + + return scope.Close(obj); +} + + uv_udp_t* UDPWrap::UVHandle() { return &handle_; } diff --git a/src/udp_wrap.h b/src/udp_wrap.h index 9ca2eaea90..527346fccd 100644 --- a/src/udp_wrap.h +++ b/src/udp_wrap.h @@ -33,6 +33,7 @@ class UDPWrap: public HandleWrap { static Handle<Value> SetTTL(const Arguments& args); static UDPWrap* Unwrap(Local<Object> obj); + static Local<Object> Instantiate(); uv_udp_t* UVHandle(); private: diff --git a/test/simple/test-cluster-dgram-1.js b/test/simple/test-cluster-dgram-1.js new file mode 100644 index 0000000000..c6dc095d08 --- /dev/null +++ b/test/simple/test-cluster-dgram-1.js @@ -0,0 +1,115 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var NUM_WORKERS = 4; +var PACKETS_PER_WORKER = 10; + +var assert = require('assert'); +var cluster = require('cluster'); +var common = require('../common'); +var dgram = require('dgram'); + + +if (process.platform === 'win32') { + console.warn("dgram clustering is currently not supported on windows."); + process.exit(0); +} + +if (cluster.isMaster) + master(); +else + worker(); + + +function master() { + var listening = 0; + + // Fork 4 workers. + for (var i = 0; i < NUM_WORKERS; i++) + cluster.fork(); + + // Wait until all workers are listening. + cluster.on('listening', function() { + if (++listening < NUM_WORKERS) + return; + + // Start sending messages. + var buf = new Buffer('hello world'); + var socket = dgram.createSocket('udp4'); + var sent = 0; + doSend(); + + function doSend() { + socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1', afterSend); + } + + function afterSend() { + sent++; + if (sent < NUM_WORKERS * PACKETS_PER_WORKER) { + doSend(); + } else { + console.log('master sent %d packets', sent); + socket.close(); + } + } + }); + + // Set up event handlers for every worker. Each worker sends a message when + // it has received the expected number of packets. After that it disconnects. + for (var key in cluster.workers) { + if (cluster.workers.hasOwnProperty(key)) + setupWorker(cluster.workers[key]); + } + + function setupWorker(worker) { + var received = 0; + + worker.on('message', function(msg) { + received = msg.received; + console.log('worker %d received %d packets', worker.id, received); + }); + + worker.on('disconnect', function() { + assert(received === PACKETS_PER_WORKER); + console.log('worker %d disconnected', worker.id); + }); + } +} + + +function worker() { + var received = 0; + + // Create udp socket and start listening. + var socket = dgram.createSocket('udp4'); + + socket.on('message', function(data, info) { + received++; + + // Every 10 messages, notify the master. + if (received == PACKETS_PER_WORKER) { + process.send({received: received}); + process.disconnect(); + } + }); + + socket.bind(common.PORT); +} diff --git a/test/simple/test-cluster-dgram-2.js b/test/simple/test-cluster-dgram-2.js new file mode 100644 index 0000000000..d06ad7935a --- /dev/null +++ b/test/simple/test-cluster-dgram-2.js @@ -0,0 +1,81 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following condonitions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var NUM_WORKERS = 4; +var PACKETS_PER_WORKER = 10; + +var assert = require('assert'); +var cluster = require('cluster'); +var common = require('../common'); +var dgram = require('dgram'); + + +if (process.platform === 'win32') { + console.warn("dgram clustering is currently not supported on windows."); + process.exit(0); +} + +if (cluster.isMaster) + master(); +else + worker(); + + +function master() { + var i; + var received = 0; + + // Start listening on a socket. + var socket = dgram.createSocket('udp4'); + socket.bind(common.PORT); + + // Disconnect workers when the expected number of messages have been + // received. + socket.on('message', function(data, info) { + received++; + + if (received == PACKETS_PER_WORKER * NUM_WORKERS) { + console.log('master received %d packets', received); + + // Close the socket. + socket.close(); + + // Disconnect all workers. + cluster.disconnect(); + } + }); + + // Fork workers. + for (var i = 0; i < NUM_WORKERS; i++) + cluster.fork(); +} + + +function worker() { + // Create udp socket and send packets to master. + var socket = dgram.createSocket('udp4'); + var buf = new Buffer('hello world'); + + for (var i = 0; i < PACKETS_PER_WORKER; i++) + socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1'); + + console.log('worker %d sent %d packets', cluster.worker.id, PACKETS_PER_WORKER); +} |