summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/dgram.md6
-rw-r--r--lib/dgram.js91
-rw-r--r--lib/internal/dgram.js39
-rw-r--r--src/udp_wrap.cc13
-rw-r--r--src/udp_wrap.h1
-rw-r--r--test/parallel/test-cluster-dgram-bind-fd.js108
-rw-r--r--test/parallel/test-dgram-bind-fd-error.js55
-rw-r--r--test/parallel/test-dgram-bind-fd.js118
-rw-r--r--test/parallel/test-dgram-create-socket-handle-fd.js42
-rw-r--r--test/parallel/test-dgram-create-socket-handle.js8
10 files changed, 442 insertions, 39 deletions
diff --git a/doc/api/dgram.md b/doc/api/dgram.md
index 50fd5db5a6..b9b43fcb21 100644
--- a/doc/api/dgram.md
+++ b/doc/api/dgram.md
@@ -166,6 +166,7 @@ added: v0.11.14
* `port` {integer}
* `address` {string}
* `exclusive` {boolean}
+ * `fd` {integer}
* `callback` {Function}
For UDP sockets, causes the `dgram.Socket` to listen for datagram
@@ -177,6 +178,11 @@ system will attempt to listen on all addresses. Once binding is
complete, a `'listening'` event is emitted and the optional `callback`
function is called.
+The `options` object may contain a `fd` property. When a `fd` greater
+than `0` is set, it will wrap around an existing socket with the given
+file descriptor. In this case, the properties of `port` and `address`
+will be ignored.
+
Note that specifying both a `'listening'` event listener and passing a
`callback` to the `socket.bind()` method is not harmful but not very
useful.
diff --git a/lib/dgram.js b/lib/dgram.js
index 8ddd523237..c541373fe2 100644
--- a/lib/dgram.js
+++ b/lib/dgram.js
@@ -25,7 +25,8 @@ const errors = require('internal/errors');
const {
kStateSymbol,
_createSocketHandle,
- newHandle
+ newHandle,
+ guessHandleType,
} = require('internal/dgram');
const {
ERR_INVALID_ARG_TYPE,
@@ -35,7 +36,8 @@ const {
ERR_SOCKET_BAD_PORT,
ERR_SOCKET_BUFFER_SIZE,
ERR_SOCKET_CANNOT_SEND,
- ERR_SOCKET_DGRAM_NOT_RUNNING
+ ERR_SOCKET_DGRAM_NOT_RUNNING,
+ ERR_INVALID_FD_TYPE
} = errors.codes;
const { Buffer } = require('buffer');
const util = require('util');
@@ -45,6 +47,7 @@ const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol, owner_symbol }
} = require('internal/async_hooks');
+const { isInt32 } = require('internal/validators');
const { UV_UDP_REUSEADDR } = process.binding('constants').os;
const { UDP, SendWrap } = process.binding('udp_wrap');
@@ -151,6 +154,28 @@ function bufferSize(self, size, buffer) {
return ret;
}
+// Query master process to get the server handle and utilize it.
+function bindServerHandle(self, options, errCb) {
+ if (!cluster)
+ cluster = require('cluster');
+
+ const state = self[kStateSymbol];
+ cluster._getServer(self, options, (err, handle) => {
+ if (err) {
+ errCb(err);
+ return;
+ }
+
+ if (!state.handle) {
+ // Handle has been closed in the mean time.
+ return handle.close();
+ }
+
+ replaceHandle(self, handle);
+ startListening(self);
+ });
+}
+
Socket.prototype.bind = function(port_, address_ /* , callback */) {
let port = port_;
@@ -171,6 +196,44 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
return this;
}
+ // Open an existing fd instead of creating a new one.
+ if (port !== null && typeof port === 'object' &&
+ isInt32(port.fd) && port.fd > 0) {
+ const fd = port.fd;
+ const exclusive = !!port.exclusive;
+ const state = this[kStateSymbol];
+
+ if (!cluster)
+ cluster = require('cluster');
+
+ if (cluster.isWorker && !exclusive) {
+ bindServerHandle(this, {
+ address: null,
+ port: null,
+ addressType: this.type,
+ fd,
+ flags: null
+ }, (err) => {
+ // Callback to handle error.
+ const ex = errnoException(err, 'open');
+ this.emit('error', ex);
+ state.bindState = BIND_STATE_UNBOUND;
+ });
+ return this;
+ }
+
+ const type = guessHandleType(fd);
+ if (type !== 'UDP')
+ throw new ERR_INVALID_FD_TYPE(type);
+ const err = state.handle.open(fd);
+
+ if (err)
+ throw errnoException(err, 'open');
+
+ startListening(this);
+ return this;
+ }
+
var address;
var exclusive;
@@ -207,28 +270,18 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
flags |= UV_UDP_REUSEADDR;
if (cluster.isWorker && !exclusive) {
- const onHandle = (err, handle) => {
- if (err) {
- var ex = exceptionWithHostPort(err, 'bind', ip, port);
- this.emit('error', ex);
- state.bindState = BIND_STATE_UNBOUND;
- return;
- }
-
- if (!state.handle)
- // handle has been closed in the mean time.
- return handle.close();
-
- replaceHandle(this, handle);
- startListening(this);
- };
- cluster._getServer(this, {
+ bindServerHandle(this, {
address: ip,
port: port,
addressType: this.type,
fd: -1,
flags: flags
- }, onHandle);
+ }, (err) => {
+ // Callback to handle error.
+ const ex = exceptionWithHostPort(err, 'bind', ip, port);
+ this.emit('error', ex);
+ state.bindState = BIND_STATE_UNBOUND;
+ });
} else {
if (!state.handle)
return; // handle has been closed in the mean time
diff --git a/lib/internal/dgram.js b/lib/internal/dgram.js
index 82b65294c2..e9b5364c8e 100644
--- a/lib/internal/dgram.js
+++ b/lib/internal/dgram.js
@@ -1,7 +1,9 @@
'use strict';
-const assert = require('assert');
const { codes } = require('internal/errors');
const { UDP } = process.binding('udp_wrap');
+const { isInt32 } = require('internal/validators');
+const TTYWrap = process.binding('tty_wrap');
+const { UV_EINVAL } = process.binding('uv');
const { ERR_INVALID_ARG_TYPE, ERR_SOCKET_BAD_TYPE } = codes;
const kStateSymbol = Symbol('state symbol');
let dns; // Lazy load for startup performance.
@@ -17,6 +19,9 @@ function lookup6(lookup, address, callback) {
}
+const guessHandleType = TTYWrap.guessHandleType;
+
+
function newHandle(type, lookup) {
if (lookup === undefined) {
if (dns === undefined) {
@@ -49,22 +54,32 @@ function newHandle(type, lookup) {
function _createSocketHandle(address, port, addressType, fd, flags) {
- // Opening an existing fd is not supported for UDP handles.
- assert(typeof fd !== 'number' || fd < 0);
-
const handle = newHandle(addressType);
-
- if (port || address) {
- const err = handle.bind(address, port || 0, flags);
-
- if (err) {
- handle.close();
- return err;
+ let err;
+
+ if (isInt32(fd) && fd > 0) {
+ const type = guessHandleType(fd);
+ if (type !== 'UDP') {
+ err = UV_EINVAL;
+ } else {
+ err = handle.open(fd);
}
+ } else if (port || address) {
+ err = handle.bind(address, port || 0, flags);
+ }
+
+ if (err) {
+ handle.close();
+ return err;
}
return handle;
}
-module.exports = { kStateSymbol, _createSocketHandle, newHandle };
+module.exports = {
+ kStateSymbol,
+ _createSocketHandle,
+ newHandle,
+ guessHandleType,
+};
diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc
index 2ef5c61358..9bcf6ceb7b 100644
--- a/src/udp_wrap.cc
+++ b/src/udp_wrap.cc
@@ -117,6 +117,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);
+ env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "send", Send);
env->SetProtoMethod(t, "bind6", Bind6);
@@ -206,6 +207,18 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
}
+void UDPWrap::Open(const FunctionCallbackInfo<Value>& args) {
+ UDPWrap* wrap;
+ ASSIGN_OR_RETURN_UNWRAP(&wrap,
+ args.Holder(),
+ args.GetReturnValue().Set(UV_EBADF));
+ int fd = static_cast<int>(args[0]->IntegerValue());
+ int err = uv_udp_open(&wrap->handle_, fd);
+
+ args.GetReturnValue().Set(err);
+}
+
+
void UDPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
DoBind(args, AF_INET);
}
diff --git a/src/udp_wrap.h b/src/udp_wrap.h
index ca048f5aef..b5d2824896 100644
--- a/src/udp_wrap.h
+++ b/src/udp_wrap.h
@@ -42,6 +42,7 @@ class UDPWrap: public HandleWrap {
v8::Local<v8::Context> context);
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind6(const v8::FunctionCallbackInfo<v8::Value>& args);
diff --git a/test/parallel/test-cluster-dgram-bind-fd.js b/test/parallel/test-cluster-dgram-bind-fd.js
new file mode 100644
index 0000000000..429f932608
--- /dev/null
+++ b/test/parallel/test-cluster-dgram-bind-fd.js
@@ -0,0 +1,108 @@
+'use strict';
+const common = require('../common');
+if (common.isWindows)
+ common.skip('dgram clustering is currently not supported on Windows.');
+
+const NUM_WORKERS = 4;
+const PACKETS_PER_WORKER = 10;
+
+const assert = require('assert');
+const cluster = require('cluster');
+const dgram = require('dgram');
+const { UDP } = process.binding('udp_wrap');
+
+if (cluster.isMaster)
+ master();
+else
+ worker();
+
+
+function master() {
+ // Create a handle and use its fd.
+ const rawHandle = new UDP();
+ const err = rawHandle.bind(common.localhostIPv4, 0, 0);
+ assert(err >= 0, String(err));
+ assert.notStrictEqual(rawHandle.fd, -1);
+
+ const fd = rawHandle.fd;
+
+ let listening = 0;
+
+ // Fork 4 workers.
+ for (let i = 0; i < NUM_WORKERS; i++)
+ cluster.fork();
+
+ // Wait until all workers are listening.
+ cluster.on('listening', common.mustCall((worker, address) => {
+ if (++listening < NUM_WORKERS)
+ return;
+
+ // Start sending messages.
+ const buf = Buffer.from('hello world');
+ const socket = dgram.createSocket('udp4');
+ let sent = 0;
+ doSend();
+
+ function doSend() {
+ socket.send(buf, 0, buf.length, address.port, address.address, afterSend);
+ }
+
+ function afterSend() {
+ sent++;
+ if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
+ doSend();
+ } else {
+ socket.close();
+ }
+ }
+ }, NUM_WORKERS));
+
+ // Set up event handlers for every worker. Each worker sends a message when
+ // it has received the expected number of packets. After that it disconnects.
+ for (const key in cluster.workers) {
+ if (cluster.workers.hasOwnProperty(key))
+ setupWorker(cluster.workers[key]);
+ }
+
+ function setupWorker(worker) {
+ let received = 0;
+
+ worker.send({
+ fd,
+ });
+
+ worker.on('message', common.mustCall((msg) => {
+ received = msg.received;
+ worker.disconnect();
+ }));
+
+ worker.on('exit', common.mustCall(() => {
+ assert.strictEqual(received, PACKETS_PER_WORKER);
+ }));
+ }
+}
+
+
+function worker() {
+ let received = 0;
+
+ process.on('message', common.mustCall((data) => {
+ const { fd } = data;
+ // Create udp socket and start listening.
+ const socket = dgram.createSocket('udp4');
+
+ socket.on('message', common.mustCall((data, info) => {
+ received++;
+
+ // Every 10 messages, notify the master.
+ if (received === PACKETS_PER_WORKER) {
+ process.send({ received });
+ socket.close();
+ }
+ }, PACKETS_PER_WORKER));
+
+ socket.bind({
+ fd,
+ });
+ }));
+}
diff --git a/test/parallel/test-dgram-bind-fd-error.js b/test/parallel/test-dgram-bind-fd-error.js
new file mode 100644
index 0000000000..efe0e43d7b
--- /dev/null
+++ b/test/parallel/test-dgram-bind-fd-error.js
@@ -0,0 +1,55 @@
+// Flags: --expose-internals
+'use strict';
+const common = require('../common');
+if (common.isWindows)
+ common.skip('Does not support binding fd on Windows');
+
+const dgram = require('dgram');
+const assert = require('assert');
+const { kStateSymbol } = require('internal/dgram');
+const { TCP, constants } = process.binding('tcp_wrap');
+const TYPE = 'udp4';
+
+// Throw when the fd is occupied according to https://github.com/libuv/libuv/pull/1851.
+{
+ const socket = dgram.createSocket(TYPE);
+
+ socket.bind(common.mustCall(() => {
+ const anotherSocket = dgram.createSocket(TYPE);
+ const { handle } = socket[kStateSymbol];
+
+ common.expectsError(() => {
+ anotherSocket.bind({
+ fd: handle.fd,
+ });
+ }, {
+ code: 'EEXIST',
+ type: Error,
+ message: /^open EEXIST$/
+ });
+
+ socket.close();
+ }));
+}
+
+// Throw when the type of fd is not "UDP".
+{
+ const handle = new TCP(constants.SOCKET);
+ handle.listen();
+
+ const fd = handle.fd;
+ assert.notStrictEqual(fd, -1);
+
+ const socket = new dgram.createSocket(TYPE);
+ common.expectsError(() => {
+ socket.bind({
+ fd,
+ });
+ }, {
+ code: 'ERR_INVALID_FD_TYPE',
+ type: TypeError,
+ message: /^Unsupported fd type: TCP$/
+ });
+
+ handle.close();
+}
diff --git a/test/parallel/test-dgram-bind-fd.js b/test/parallel/test-dgram-bind-fd.js
new file mode 100644
index 0000000000..c4a80abb92
--- /dev/null
+++ b/test/parallel/test-dgram-bind-fd.js
@@ -0,0 +1,118 @@
+'use strict';
+const common = require('../common');
+if (common.isWindows)
+ common.skip('Does not support binding fd on Windows');
+
+const assert = require('assert');
+const dgram = require('dgram');
+const { UDP } = process.binding('udp_wrap');
+const { UV_UDP_REUSEADDR } = process.binding('constants').os;
+
+const BUFFER_SIZE = 4096;
+
+// Test binding a fd.
+{
+ function createHandle(reuseAddr, udp4, bindAddress) {
+ let flags = 0;
+ if (reuseAddr)
+ flags |= UV_UDP_REUSEADDR;
+
+ const handle = new UDP();
+ let err = 0;
+
+ if (udp4) {
+ err = handle.bind(bindAddress, 0, flags);
+ } else {
+ err = handle.bind6(bindAddress, 0, flags);
+ }
+ assert(err >= 0, String(err));
+ assert.notStrictEqual(handle.fd, -1);
+ return handle;
+ }
+
+ function testWithOptions(reuseAddr, udp4) {
+ const type = udp4 ? 'udp4' : 'udp6';
+ const bindAddress = udp4 ? common.localhostIPv4 : '::1';
+
+ let fd;
+
+ const receiver = dgram.createSocket({
+ type,
+ });
+
+ receiver.bind({
+ port: 0,
+ address: bindAddress,
+ }, common.mustCall(() => {
+ const { port, address } = receiver.address();
+ // Create a handle to reuse its fd.
+ const handle = createHandle(reuseAddr, udp4, bindAddress);
+
+ fd = handle.fd;
+ assert.notStrictEqual(handle.fd, -1);
+
+ const socket = dgram.createSocket({
+ type,
+ recvBufferSize: BUFFER_SIZE,
+ sendBufferSize: BUFFER_SIZE,
+ });
+
+ socket.bind({
+ port: 0,
+ address: bindAddress,
+ fd,
+ }, common.mustCall(() => {
+ // Test address().
+ const rinfo = {};
+ const err = handle.getsockname(rinfo);
+ assert.strictEqual(err, 0);
+ const socketRInfo = socket.address();
+ assert.strictEqual(rinfo.address, socketRInfo.address);
+ assert.strictEqual(rinfo.port, socketRInfo.port);
+
+ // Test buffer size.
+ const recvBufferSize = socket.getRecvBufferSize();
+ const sendBufferSize = socket.getSendBufferSize();
+
+ // note: linux will double the buffer size
+ const expectedBufferSize = common.isLinux ?
+ BUFFER_SIZE * 2 : BUFFER_SIZE;
+ assert.strictEqual(recvBufferSize, expectedBufferSize);
+ assert.strictEqual(sendBufferSize, expectedBufferSize);
+
+ socket.send(String(fd), port, address);
+ }));
+
+ socket.on('message', common.mustCall((data) => {
+ assert.strictEqual(data.toString('utf8'), String(fd));
+ socket.close();
+ }));
+
+ socket.on('error', (err) => {
+ console.error(err.message);
+ assert.fail(err.message);
+ });
+
+ socket.on('close', common.mustCall(() => {}));
+ }));
+
+ receiver.on('message', common.mustCall((data, { address, port }) => {
+ assert.strictEqual(data.toString('utf8'), String(fd));
+ receiver.send(String(fd), port, address);
+ process.nextTick(() => receiver.close());
+ }));
+
+ receiver.on('error', (err) => {
+ console.error(err.message);
+ assert.fail(err.message);
+ });
+
+ receiver.on('close', common.mustCall(() => {}));
+ }
+
+ testWithOptions(true, true);
+ testWithOptions(false, true);
+ if (common.hasIPv6) {
+ testWithOptions(false, false);
+ }
+}
diff --git a/test/parallel/test-dgram-create-socket-handle-fd.js b/test/parallel/test-dgram-create-socket-handle-fd.js
new file mode 100644
index 0000000000..ff507b6ec5
--- /dev/null
+++ b/test/parallel/test-dgram-create-socket-handle-fd.js
@@ -0,0 +1,42 @@
+'use strict';
+const common = require('../common');
+if (common.isWindows)
+ common.skip('Does not support binding fd on Windows');
+
+const assert = require('assert');
+const dgram = require('dgram');
+const { UDP } = process.binding('udp_wrap');
+const { TCP, constants } = process.binding('tcp_wrap');
+const _createSocketHandle = dgram._createSocketHandle;
+
+// Return a negative number if the "existing fd" is invalid.
+{
+ const err = _createSocketHandle(common.localhostIPv4, 0, 'udp4', 42);
+ assert(err < 0);
+}
+
+// Return a negative number if the type of fd is not "UDP".
+{
+ // Create a handle with fd.
+ const rawHandle = new UDP();
+ const err = rawHandle.bind(common.localhostIPv4, 0, 0);
+ assert(err >= 0, String(err));
+ assert.notStrictEqual(rawHandle.fd, -1);
+
+ const handle = _createSocketHandle(null, 0, 'udp4', rawHandle.fd);
+ assert(handle instanceof UDP);
+ assert.strictEqual(typeof handle.fd, 'number');
+ assert(handle.fd > 0);
+}
+
+// Create a bound handle.
+{
+ const rawHandle = new TCP(constants.SOCKET);
+ const err = rawHandle.listen();
+ assert(err >= 0, String(err));
+ assert.notStrictEqual(rawHandle.fd, -1);
+
+ const handle = _createSocketHandle(null, 0, 'udp4', rawHandle.fd);
+ assert(handle < 0);
+ rawHandle.close();
+}
diff --git a/test/parallel/test-dgram-create-socket-handle.js b/test/parallel/test-dgram-create-socket-handle.js
index e49e3e8dd6..3df34a95c2 100644
--- a/test/parallel/test-dgram-create-socket-handle.js
+++ b/test/parallel/test-dgram-create-socket-handle.js
@@ -5,14 +5,6 @@ const assert = require('assert');
const { _createSocketHandle } = require('internal/dgram');
const UDP = process.binding('udp_wrap').UDP;
-// Throws if an "existing fd" is passed in.
-common.expectsError(() => {
- _createSocketHandle(common.localhostIPv4, 0, 'udp4', 42);
-}, {
- code: 'ERR_ASSERTION',
- message: /^false == true$/
-});
-
{
// Create a handle that is not bound.
const handle = _createSocketHandle(null, null, 'udp4');