aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_tls_legacy.js4
-rw-r--r--lib/_tls_wrap.js363
-rw-r--r--lib/net.js4
-rw-r--r--node.gyp2
-rw-r--r--src/env.h2
-rw-r--r--src/js_stream.cc21
-rw-r--r--src/js_stream.h20
-rw-r--r--src/node_crypto.cc26
-rw-r--r--src/node_wrap.h19
-rw-r--r--src/pipe_wrap.cc21
-rw-r--r--src/stream_base.cc495
-rw-r--r--src/stream_base.h223
-rw-r--r--src/stream_wrap.cc664
-rw-r--r--src/stream_wrap.h166
-rw-r--r--src/tcp_wrap.cc20
-rw-r--r--src/tls_wrap.cc302
-rw-r--r--src/tls_wrap.h73
-rw-r--r--src/tty_wrap.cc18
-rw-r--r--test/parallel/test-tls-client-default-ciphers.js14
-rw-r--r--test/parallel/test-tls-close-notify.js4
-rw-r--r--test/parallel/test-tls-multi-key.js5
21 files changed, 1406 insertions, 1060 deletions
diff --git a/lib/_tls_legacy.js b/lib/_tls_legacy.js
index 4148085503..fc0d115aee 100644
--- a/lib/_tls_legacy.js
+++ b/lib/_tls_legacy.js
@@ -92,11 +92,11 @@ function onCryptoStreamFinish() {
// Generate close notify
// NOTE: first call checks if client has sent us shutdown,
// second call enqueues shutdown into the BIO.
- if (this.pair.ssl.shutdown() !== 1) {
+ if (this.pair.ssl.shutdownSSL() !== 1) {
if (this.pair.ssl && this.pair.ssl.error)
return this.pair.error();
- this.pair.ssl.shutdown();
+ this.pair.ssl.shutdownSSL();
}
if (this.pair.ssl && this.pair.ssl.error)
diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js
index fb63667581..10221b99c3 100644
--- a/lib/_tls_wrap.js
+++ b/lib/_tls_wrap.js
@@ -11,14 +11,23 @@ const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
-// Lazy load
-var tls_legacy;
+// constructor for lazy loading
+function createTCP() {
+ var TCP = process.binding('tcp_wrap').TCP;
+ return new TCP();
+}
+
+// constructor for lazy loading
+function createPipe() {
+ var Pipe = process.binding('pipe_wrap').Pipe;
+ return new Pipe();
+}
function onhandshakestart() {
debug('onhandshakestart');
var self = this;
- var ssl = self.ssl;
+ var ssl = self._handle;
var now = Timer.now();
assert(now >= ssl.lastHandshakeTime);
@@ -63,7 +72,7 @@ function loadSession(self, hello, cb) {
// NOTE: That we have disabled OpenSSL's internal session storage in
// `node_crypto.cc` and hence its safe to rely on getting servername only
// from clienthello or this place.
- var ret = self.ssl.loadSession(session);
+ var ret = self._handle.loadSession(session);
cb(null, ret);
}
@@ -92,9 +101,9 @@ function loadSNI(self, servername, cb) {
// TODO(indutny): eventually disallow raw `SecureContext`
if (context)
- self.ssl.sni_context = context.context || context;
+ self._handle.sni_context = context.context || context;
- cb(null, self.ssl.sni_context);
+ cb(null, self._handle.sni_context);
});
}
@@ -127,7 +136,7 @@ function requestOCSP(self, hello, ctx, cb) {
return cb(err);
if (response)
- self.ssl.setOCSPResponse(response);
+ self._handle.setOCSPResponse(response);
cb(null);
}
}
@@ -161,7 +170,7 @@ function onclienthello(hello) {
if (err)
return self.destroy(err);
- self.ssl.endParser();
+ self._handle.endParser();
});
});
});
@@ -184,7 +193,7 @@ function onnewsession(key, session) {
return;
once = true;
- self.ssl.newSessionDone();
+ self._handle.newSessionDone();
self._newSessionPending = false;
if (self._securePending)
@@ -204,29 +213,12 @@ function onocspresponse(resp) {
*/
function TLSSocket(socket, options) {
- // Disallow wrapping TLSSocket in TLSSocket
- assert(!(socket instanceof TLSSocket));
-
- net.Socket.call(this, {
- handle: socket && socket._handle,
- allowHalfOpen: socket && socket.allowHalfOpen,
- readable: false,
- writable: false
- });
- if (socket) {
- this._parent = socket;
-
- // To prevent assertion in afterConnect()
- this._connecting = socket._connecting;
- }
-
this._tlsOptions = options;
this._secureEstablished = false;
this._securePending = false;
this._newSessionPending = false;
this._controlReleased = false;
this._SNICallback = null;
- this.ssl = null;
this.servername = null;
this.npnProtocol = null;
this.authorized = false;
@@ -236,15 +228,19 @@ function TLSSocket(socket, options) {
// distinguishable from regular ones.
this.encrypted = true;
+ net.Socket.call(this, {
+ handle: this._wrapHandle(socket && socket._handle),
+ allowHalfOpen: socket && socket.allowHalfOpen,
+ readable: false,
+ writable: false
+ });
+
+ // Proxy for API compatibility
+ this.ssl = this._handle;
+
this.on('error', this._tlsError);
- if (!this._handle) {
- this.once('connect', function() {
- this._init(null);
- });
- } else {
- this._init(socket);
- }
+ this._init(socket);
// Make sure to setup all required properties like: `_connecting` before
// starting the flow of the data
@@ -255,23 +251,53 @@ function TLSSocket(socket, options) {
util.inherits(TLSSocket, net.Socket);
exports.TLSSocket = TLSSocket;
-TLSSocket.prototype._init = function(socket) {
- assert(this._handle);
+var proxiedMethods = [
+ 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6',
+ 'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive',
+ 'setSimultaneousAccepts', 'setBlocking',
- // lib/net.js expect this value to be non-zero if write hasn't been flushed
- // immediately
- // TODO(indutny): rewise this solution, it might be 1 before handshake and
- // represent real writeQueueSize during regular writes.
- this._handle.writeQueueSize = 1;
+ // PipeWrap
+ 'setPendingInstances'
+];
+
+TLSSocket.prototype._wrapHandle = function(handle) {
+ var res;
- var self = this;
var options = this._tlsOptions;
+ if (!handle) {
+ handle = options.pipe ? createPipe() : createTCP();
+ handle.owner = this;
+ }
// Wrap socket's handle
var context = options.secureContext ||
options.credentials ||
tls.createSecureContext();
- this.ssl = tls_wrap.wrap(this._handle, context.context, options.isServer);
+ res = tls_wrap.wrap(handle, context.context, options.isServer);
+ res._parent = handle;
+ res._reading = handle._reading;
+
+ // Proxy HandleWrap, PipeWrap and TCPWrap methods
+ proxiedMethods.forEach(function(name) {
+ res[name] = function methodProxy() {
+ return handle[name].apply(handle, arguments);
+ };
+ });
+
+ return res;
+};
+
+TLSSocket.prototype._init = function(socket) {
+ var self = this;
+ var options = this._tlsOptions;
+ var ssl = this._handle;
+
+ // lib/net.js expect this value to be non-zero if write hasn't been flushed
+ // immediately
+ // TODO(indutny): rewise this solution, it might be 1 before handshake and
+ // represent real writeQueueSize during regular writes.
+ ssl.writeQueueSize = 1;
+
this.server = options.server || null;
// For clients, we will always have either a given ca list or be using
@@ -282,32 +308,32 @@ TLSSocket.prototype._init = function(socket) {
this._requestCert = requestCert;
this._rejectUnauthorized = rejectUnauthorized;
if (requestCert || rejectUnauthorized)
- this.ssl.setVerifyMode(requestCert, rejectUnauthorized);
+ ssl.setVerifyMode(requestCert, rejectUnauthorized);
if (options.isServer) {
- this.ssl.onhandshakestart = onhandshakestart.bind(this);
- this.ssl.onhandshakedone = onhandshakedone.bind(this);
- this.ssl.onclienthello = onclienthello.bind(this);
- this.ssl.onnewsession = onnewsession.bind(this);
- this.ssl.lastHandshakeTime = 0;
- this.ssl.handshakes = 0;
+ ssl.onhandshakestart = onhandshakestart.bind(this);
+ ssl.onhandshakedone = onhandshakedone.bind(this);
+ ssl.onclienthello = onclienthello.bind(this);
+ ssl.onnewsession = onnewsession.bind(this);
+ ssl.lastHandshakeTime = 0;
+ ssl.handshakes = 0;
if (this.server &&
(listenerCount(this.server, 'resumeSession') > 0 ||
listenerCount(this.server, 'newSession') > 0 ||
listenerCount(this.server, 'OCSPRequest') > 0)) {
- this.ssl.enableSessionCallbacks();
+ ssl.enableSessionCallbacks();
}
} else {
- this.ssl.onhandshakestart = function() {};
- this.ssl.onhandshakedone = this._finishInit.bind(this);
- this.ssl.onocspresponse = onocspresponse.bind(this);
+ ssl.onhandshakestart = function() {};
+ ssl.onhandshakedone = this._finishInit.bind(this);
+ ssl.onocspresponse = onocspresponse.bind(this);
if (options.session)
- this.ssl.setSession(options.session);
+ ssl.setSession(options.session);
}
- this.ssl.onerror = function(err) {
+ ssl.onerror = function(err) {
if (self._writableState.errorEmitted)
return;
self._writableState.errorEmitted = true;
@@ -337,11 +363,11 @@ TLSSocket.prototype._init = function(socket) {
options.server._contexts.length)) {
assert(typeof options.SNICallback === 'function');
this._SNICallback = options.SNICallback;
- this.ssl.enableHelloParser();
+ ssl.enableHelloParser();
}
if (process.features.tls_npn && options.NPNProtocols)
- this.ssl.setNPNProtocols(options.NPNProtocols);
+ ssl.setNPNProtocols(options.NPNProtocols);
if (options.handshakeTimeout > 0)
this.setTimeout(options.handshakeTimeout, this._handleTimeout);
@@ -350,8 +376,23 @@ TLSSocket.prototype._init = function(socket) {
if (socket && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
- this.ssl.receive(buf);
+ ssl.receive(buf);
+ }
+
+ if (socket) {
+ this._parent = socket;
+
+ // To prevent assertion in afterConnect() and properly kick off readStart
+ this._connecting = socket._connecting;
+ socket.once('connect', function() {
+ self._connecting = false;
+ self.emit('connect');
+ });
}
+
+ // Assume `tls.connect()`
+ if (!socket)
+ this._connecting = true;
};
TLSSocket.prototype.renegotiate = function(options, callback) {
@@ -365,11 +406,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) {
if (requestCert !== this._requestCert ||
rejectUnauthorized !== this._rejectUnauthorized) {
- this.ssl.setVerifyMode(requestCert, rejectUnauthorized);
+ this._handle.setVerifyMode(requestCert, rejectUnauthorized);
this._requestCert = requestCert;
this._rejectUnauthorized = rejectUnauthorized;
}
- if (!this.ssl.renegotiate()) {
+ if (!this._handle.renegotiate()) {
if (callback) {
process.nextTick(function() {
callback(new Error('Failed to renegotiate'));
@@ -391,11 +432,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) {
};
TLSSocket.prototype.setMaxSendFragment = function setMaxSendFragment(size) {
- return this.ssl.setMaxSendFragment(size) == 1;
+ return this._handle.setMaxSendFragment(size) == 1;
};
TLSSocket.prototype.getTLSTicket = function getTLSTicket() {
- return this.ssl.getTLSTicket();
+ return this._handle.getTLSTicket();
};
TLSSocket.prototype._handleTimeout = function() {
@@ -424,11 +465,11 @@ TLSSocket.prototype._finishInit = function() {
}
if (process.features.tls_npn) {
- this.npnProtocol = this.ssl.getNegotiatedProtocol();
+ this.npnProtocol = this._handle.getNegotiatedProtocol();
}
if (process.features.tls_sni && this._tlsOptions.isServer) {
- this.servername = this.ssl.getServername();
+ this.servername = this._handle.getServername();
}
debug('secure established');
@@ -439,49 +480,56 @@ TLSSocket.prototype._finishInit = function() {
};
TLSSocket.prototype._start = function() {
+ if (this._connecting) {
+ this.once('connect', function() {
+ this._start();
+ });
+ return;
+ }
+
if (this._tlsOptions.requestOCSP)
- this.ssl.requestOCSP();
- this.ssl.start();
+ this._handle.requestOCSP();
+ this._handle.start();
};
TLSSocket.prototype.setServername = function(name) {
- this.ssl.setServername(name);
+ this._handle.setServername(name);
};
TLSSocket.prototype.setSession = function(session) {
if (typeof session === 'string')
session = new Buffer(session, 'binary');
- this.ssl.setSession(session);
+ this._handle.setSession(session);
};
TLSSocket.prototype.getPeerCertificate = function(detailed) {
- if (this.ssl) {
+ if (this._handle) {
return common.translatePeerCertificate(
- this.ssl.getPeerCertificate(detailed));
+ this._handle.getPeerCertificate(detailed));
}
return null;
};
TLSSocket.prototype.getSession = function() {
- if (this.ssl) {
- return this.ssl.getSession();
+ if (this._handle) {
+ return this._handle.getSession();
}
return null;
};
TLSSocket.prototype.isSessionReused = function() {
- if (this.ssl) {
- return this.ssl.isSessionReused();
+ if (this._handle) {
+ return this._handle.isSessionReused();
}
return null;
};
TLSSocket.prototype.getCipher = function(err) {
- if (this.ssl) {
- return this.ssl.getCurrentCipher();
+ if (this._handle) {
+ return this._handle.getCurrentCipher();
} else {
return null;
}
@@ -620,7 +668,7 @@ function Server(/* [options], listener */) {
socket.on('secure', function() {
if (socket._requestCert) {
- var verifyError = socket.ssl.verifyError();
+ var verifyError = socket._handle.verifyError();
if (verifyError) {
socket.authorizationError = verifyError.code;
@@ -775,28 +823,6 @@ function normalizeConnectArgs(listArgs) {
return (cb) ? [options, cb] : [options];
}
-function legacyConnect(hostname, options, NPN, context) {
- assert(options.socket);
- if (!tls_legacy)
- tls_legacy = require('_tls_legacy');
-
- var pair = tls_legacy.createSecurePair(context,
- false,
- true,
- !!options.rejectUnauthorized,
- {
- NPNProtocols: NPN.NPNProtocols,
- servername: hostname
- });
- tls_legacy.pipe(pair, options.socket);
- pair.cleartext._controlReleased = true;
- pair.on('error', function(err) {
- pair.cleartext.emit('error', err);
- });
-
- return pair;
-}
-
exports.connect = function(/* [port, host], options, cb */) {
var args = normalizeConnectArgs(arguments);
var options = args[0];
@@ -819,51 +845,21 @@ exports.connect = function(/* [port, host], options, cb */) {
context = tls.createSecureContext(options);
tls.convertNPNProtocols(options.NPNProtocols, NPN);
- // Wrapping TLS socket inside another TLS socket was requested -
- // create legacy secure pair
- var socket;
- var legacy;
- var result;
- if (options.socket instanceof TLSSocket) {
- debug('legacy connect');
- legacy = true;
- socket = legacyConnect(hostname, options, NPN, context);
- result = socket.cleartext;
- } else {
- legacy = false;
- socket = new TLSSocket(options.socket, {
- secureContext: context,
- isServer: false,
- requestCert: true,
- rejectUnauthorized: options.rejectUnauthorized,
- session: options.session,
- NPNProtocols: NPN.NPNProtocols,
- requestOCSP: options.requestOCSP
- });
- result = socket;
- }
-
- if (socket._handle && !socket._connecting) {
- onHandle();
- } else {
- // Not even started connecting yet (or probably resolving dns address),
- // catch socket errors and assign handle.
- if (!legacy && options.socket) {
- options.socket.once('connect', function() {
- assert(options.socket._handle);
- socket._handle = options.socket._handle;
- socket._handle.owner = socket;
- socket.emit('connect');
- });
- }
- socket.once('connect', onHandle);
- }
+ var socket = new TLSSocket(options.socket, {
+ pipe: options.path && !options.port,
+ secureContext: context,
+ isServer: false,
+ requestCert: true,
+ rejectUnauthorized: options.rejectUnauthorized,
+ session: options.session,
+ NPNProtocols: NPN.NPNProtocols,
+ requestOCSP: options.requestOCSP
+ });
if (cb)
- result.once('secureConnect', cb);
+ socket.once('secureConnect', cb);
if (!options.socket) {
- assert(!legacy);
var connect_opt;
if (options.path && !options.port) {
connect_opt = { path: options.path };
@@ -874,63 +870,62 @@ exports.connect = function(/* [port, host], options, cb */) {
localAddress: options.localAddress
};
}
- socket.connect(connect_opt);
+ socket.connect(connect_opt, function() {
+ socket._start();
+ });
}
- return result;
+ socket._releaseControl();
- function onHandle() {
- if (!legacy)
- socket._releaseControl();
+ if (options.session)
+ socket.setSession(options.session);
- if (options.session)
- socket.setSession(options.session);
+ if (options.servername)
+ socket.setServername(options.servername);
- if (!legacy) {
- if (options.servername)
- socket.setServername(options.servername);
+ if (options.socket)
+ socket._start();
- socket._start();
- }
- socket.on('secure', function() {
- var verifyError = socket.ssl.verifyError();
+ socket.on('secure', function() {
+ var verifyError = socket._handle.verifyError();
- // Verify that server's identity matches it's certificate's names
- if (!verifyError) {
- var cert = result.getPeerCertificate();
- verifyError = options.checkServerIdentity(hostname, cert);
- }
+ // Verify that server's identity matches it's certificate's names
+ if (!verifyError) {
+ var cert = socket.getPeerCertificate();
+ verifyError = options.checkServerIdentity(hostname, cert);
+ }
- if (verifyError) {
- result.authorized = false;
- result.authorizationError = verifyError.code || verifyError.message;
+ if (verifyError) {
+ socket.authorized = false;
+ socket.authorizationError = verifyError.code || verifyError.message;
- if (options.rejectUnauthorized) {
- result.emit('error', verifyError);
- result.destroy();
- return;
- } else {
- result.emit('secureConnect');
- }
+ if (options.rejectUnauthorized) {
+ socket.emit('error', verifyError);
+ socket.destroy();
+ return;
} else {
- result.authorized = true;
- result.emit('secureConnect');
+ socket.emit('secureConnect');
}
+ } else {
+ socket.authorized = true;
+ socket.emit('secureConnect');
+ }
- // Uncork incoming data
- result.removeListener('end', onHangUp);
- });
+ // Uncork incoming data
+ socket.removeListener('end', onHangUp);
+ });
- function onHangUp() {
- // NOTE: This logic is shared with _http_client.js
- if (!socket._hadError) {
- socket._hadError = true;
- var error = new Error('socket hang up');
- error.code = 'ECONNRESET';
- socket.destroy();
- socket.emit('error', error);
- }
+ function onHangUp() {
+ // NOTE: This logic is shared with _http_client.js
+ if (!socket._hadError) {
+ socket._hadError = true;
+ var error = new Error('socket hang up');
+ error.code = 'ECONNRESET';
+ socket.destroy();
+ socket.emit('error', error);
}
- result.once('end', onHangUp);
}
+ socket.once('end', onHangUp);
+
+ return socket;
};
diff --git a/lib/net.js b/lib/net.js
index 5bf2f292fb..cdabe6e798 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -961,7 +961,9 @@ function afterConnect(status, handle, req, readable, writable) {
return;
}
- assert(handle === self._handle, 'handle != self._handle');
+ // Update handle if it was wrapped
+ // TODO(indutny): assert that the handle is actually an ancestor of old one
+ handle = self._handle;
debug('afterConnect');
diff --git a/node.gyp b/node.gyp
index 01a67a08c8..996121ee45 100644
--- a/node.gyp
+++ b/node.gyp
@@ -115,6 +115,7 @@
'src/smalloc.cc',
'src/spawn_sync.cc',
'src/string_bytes.cc',
+ 'src/stream_base.cc',
'src/stream_wrap.cc',
'src/tcp_wrap.cc',
'src/timer_wrap.cc',
@@ -151,6 +152,7 @@
'src/req-wrap.h',
'src/req-wrap-inl.h',
'src/string_bytes.h',
+ 'src/stream_base.h',
'src/stream_wrap.h',
'src/tree.h',
'src/util.h',
diff --git a/src/env.h b/src/env.h
index ccacbb09f5..c9b4cc0736 100644
--- a/src/env.h
+++ b/src/env.h
@@ -234,8 +234,10 @@ namespace node {
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \
V(tls_wrap_constructor_function, v8::Function) \
+ V(tls_wrap_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(udp_constructor_function, v8::Function) \
+ V(write_wrap_constructor_function, v8::Function) \
class Environment;
diff --git a/src/js_stream.cc b/src/js_stream.cc
new file mode 100644
index 0000000000..3cc3a895fc
--- /dev/null
+++ b/src/js_stream.cc
@@ -0,0 +1,21 @@
+#include "js_stream.h"
+
+#include "async-wrap.h"
+#include "env.h"
+#include "env-inl.h"
+#include "stream_base.h"
+#include "v8.h"
+
+namespace node {
+
+using v8::Context;
+using v8::Handle;
+using v8::Object;
+using v8::Value;
+
+void JSStream::Initialize(Handle<Object> target,
+ Handle<Value> unused,
+ Handle<Context> context) {
+}
+
+} // namespace node
diff --git a/src/js_stream.h b/src/js_stream.h
new file mode 100644
index 0000000000..6a2d3bfb4f
--- /dev/null
+++ b/src/js_stream.h
@@ -0,0 +1,20 @@
+#ifndef SRC_JS_STREAM_H_
+#define SRC_JS_STREAM_H_
+
+#include "async-wrap.h"
+#include "env.h"
+#include "stream_base.h"
+#include "v8.h"
+
+namespace node {
+
+class JSStream : public StreamBase {
+ public:
+ static void Initialize(v8::Handle<v8::Object> target,
+ v8::Handle<v8::Value> unused,
+ v8::Handle<v8::Context> context);
+};
+
+} // namespace node
+
+#endif // SRC_JS_STREAM_H_
diff --git a/src/node_crypto.cc b/src/node_crypto.cc
index 230231080b..912320771e 100644
--- a/src/node_crypto.cc
+++ b/src/node_crypto.cc
@@ -3,7 +3,7 @@
#include "node_crypto.h"
#include "node_crypto_bio.h"
#include "node_crypto_groups.h"
-#include "tls_wrap.h" // TLSCallbacks
+#include "tls_wrap.h" // TLSWrap
#include "async-wrap.h"
#include "async-wrap-inl.h"
@@ -98,28 +98,28 @@ const char* const root_certs[] = {
X509_STORE* root_cert_store;
// Just to generate static methods
-template class SSLWrap<TLSCallbacks>;
-template void SSLWrap<TLSCallbacks>::AddMethods(Environment* env,
- Handle<FunctionTemplate> t);
-template void SSLWrap<TLSCallbacks>::InitNPN(SecureContext* sc);
-template SSL_SESSION* SSLWrap<TLSCallbacks>::GetSessionCallback(
+template class SSLWrap<TLSWrap>;
+template void SSLWrap<TLSWrap>::AddMethods(Environment* env,
+ Handle<FunctionTemplate> t);
+template void SSLWrap<TLSWrap>::InitNPN(SecureContext* sc);
+template SSL_SESSION* SSLWrap<TLSWrap>::GetSessionCallback(
SSL* s,
unsigned char* key,
int len,
int* copy);
-template int SSLWrap<TLSCallbacks>::NewSessionCallback(SSL* s,
- SSL_SESSION* sess);
-template void SSLWrap<TLSCallbacks>::OnClientHello(
+template int SSLWrap<TLSWrap>::NewSessionCallback(SSL* s,
+ SSL_SESSION* sess);
+template void SSLWrap<TLSWrap>::OnClientHello(
void* arg,
const ClientHelloParser::ClientHello& hello);
#ifdef OPENSSL_NPN_NEGOTIATED
-template int SSLWrap<TLSCallbacks>::AdvertiseNextProtoCallback(
+template int SSLWrap<TLSWrap>::AdvertiseNextProtoCallback(
SSL* s,
const unsigned char** data,
unsigned int* len,
void* arg);
-template int SSLWrap<TLSCallbacks>::SelectNextProtoCallback(
+template int SSLWrap<TLSWrap>::SelectNextProtoCallback(
SSL* s,
unsigned char** out,
unsigned char* outlen,
@@ -127,7 +127,7 @@ template int SSLWrap<TLSCallbacks>::SelectNextProtoCallback(
unsigned int inlen,
void* arg);
#endif
-template int SSLWrap<TLSCallbacks>::TLSExtStatusCallback(SSL* s, void* arg);
+template int SSLWrap<TLSWrap>::TLSExtStatusCallback(SSL* s, void* arg);
static void crypto_threadid_cb(CRYPTO_THREADID* tid) {
@@ -973,7 +973,7 @@ void SSLWrap<Base>::AddMethods(Environment* env, Handle<FunctionTemplate> t) {
env->SetProtoMethod(t, "getCurrentCipher", GetCurrentCipher);
env->SetProtoMethod(t, "endParser", EndParser);
env->SetProtoMethod(t, "renegotiate", Renegotiate);
- env->SetProtoMethod(t, "shutdown", Shutdown);
+ env->SetProtoMethod(t, "shutdownSSL", Shutdown);
env->SetProtoMethod(t, "getTLSTicket", GetTLSTicket);
env->SetProtoMethod(t, "newSessionDone", NewSessionDone);
env->SetProtoMethod(t, "setOCSPResponse", SetOCSPResponse);
diff --git a/src/node_wrap.h b/src/node_wrap.h
index 80d679606e..ddd7bd16e0 100644
--- a/src/node_wrap.h
+++ b/src/node_wrap.h
@@ -14,7 +14,7 @@
namespace node {
-#define WITH_GENERIC_STREAM(env, obj, BODY) \
+#define WITH_GENERIC_UV_STREAM(env, obj, BODY, ELSE) \
do { \
if (env->tcp_constructor_template().IsEmpty() == false && \
env->tcp_constructor_template()->HasInstance(obj)) { \
@@ -28,16 +28,29 @@ namespace node {
env->pipe_constructor_template()->HasInstance(obj)) { \
PipeWrap* const wrap = Unwrap<PipeWrap>(obj); \
BODY \
+ } else { \
+ ELSE \
} \
} while (0)
+#define WITH_GENERIC_STREAM(env, obj, BODY) \
+ do { \
+ WITH_GENERIC_UV_STREAM(env, obj, BODY, { \
+ if (env->tls_wrap_constructor_template().IsEmpty() == false && \
+ env->tls_wrap_constructor_template()->HasInstance(obj)) { \
+ TLSWrap* const wrap = Unwrap<TLSWrap>(obj); \
+ BODY \
+ } \
+ }); \
+ } while (0)
+
inline uv_stream_t* HandleToStream(Environment* env,
v8::Local<v8::Object> obj) {
v8::HandleScope scope(env->isolate());
- WITH_GENERIC_STREAM(env, obj, {
+ WITH_GENERIC_UV_STREAM(env, obj, {
return reinterpret_cast<uv_stream_t*>(wrap->UVHandle());
- });
+ }, {});
return nullptr;
}
diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc
index 55d5f84ff4..08fed68741 100644
--- a/src/pipe_wrap.cc
+++ b/src/pipe_wrap.cc
@@ -77,30 +77,11 @@ void PipeWrap::Initialize(Handle<Object> target,
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "Pipe"));
t->InstanceTemplate()->SetInternalFieldCount(1);
- enum PropertyAttribute attributes =
- static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
- t->InstanceTemplate()->SetAccessor(env->fd_string(),
- StreamWrap::GetFD,
- nullptr,
- Handle<Value>(),
- v8::DEFAULT,
- attributes);
-
env->SetProtoMethod(t, "close", HandleWrap::Close);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
- env->SetProtoMethod(t, "setBlocking", StreamWrap::SetBlocking);
-
- env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart);
- env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop);
- env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown);
-
- env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer);
- env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString);
- env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String);
- env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String);
- env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString);
+ StreamWrap::AddMethods(env, t);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "listen", Listen);
diff --git a/src/stream_base.cc b/src/stream_base.cc
new file mode 100644
index 0000000000..0a1324bb58
--- /dev/null
+++ b/src/stream_base.cc
@@ -0,0 +1,495 @@
+#include "stream_base.h"
+#include "stream_wrap.h"
+
+#include "node.h"
+#include "node_buffer.h"
+#include "env.h"
+#include "env-inl.h"
+#include "string_bytes.h"
+#include "tls_wrap.h"
+#include "util.h"
+#include "util-inl.h"
+#include "v8.h"
+
+#include <limits.h> // INT_MAX
+
+namespace node {
+
+using v8::Array;
+using v8::Context;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Local;
+using v8::Number;
+using v8::Object;
+using v8::PropertyAttribute;
+using v8::PropertyCallbackInfo;
+using v8::String;
+using v8::Value;
+
+template void StreamBase::AddMethods<StreamWrap>(Environment* env,
+ Handle<FunctionTemplate> t);
+template void StreamBase::AddMethods<TLSWrap>(Environment* env,
+ Handle<FunctionTemplate> t);
+
+
+template <class Base>
+void StreamBase::AddMethods(Environment* env, Handle<FunctionTemplate> t) {
+ HandleScope scope(env->isolate());
+
+ enum PropertyAttribute attributes =
+ static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
+ t->InstanceTemplate()->SetAccessor(env->fd_string(),
+ GetFD<Base>,
+ nullptr,
+ Handle<Value>(),
+ v8::DEFAULT,
+ attributes);
+
+ env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
+ env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
+ env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
+ env->SetProtoMethod(t, "writev", JSMethod<Base, &StreamBase::Writev>);
+ env->SetProtoMethod(t,
+ "writeBuffer",
+ JSMethod<Base, &StreamBase::WriteBuffer>);
+ env->SetProtoMethod(t,
+ "writeAsciiString",
+ JSMethod<Base, &StreamBase::WriteString<ASCII> >);
+ env->SetProtoMethod(t,
+ "writeUtf8String",
+ JSMethod<Base, &StreamBase::WriteString<UTF8> >);
+ env->SetProtoMethod(t,
+ "writeUcs2String",
+ JSMethod<Base, &StreamBase::WriteString<UCS2> >);
+ env->SetProtoMethod(t,
+ "writeBinaryString",
+ JSMethod<Base, &StreamBase::WriteString<BINARY> >);
+}
+
+
+template <class Base>
+void StreamBase::GetFD(Local<String> key,
+ const PropertyCallbackInfo<Value>& args) {
+ StreamBase* wrap = Unwrap<Base>(args.Holder());
+
+ if (!wrap->IsAlive())
+ return args.GetReturnValue().Set(UV_EINVAL);
+
+ args.GetReturnValue().Set(wrap->GetFD());
+}
+
+
+template <class Base,
+ int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
+void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
+ StreamBase* wrap = Unwrap<Base>(args.Holder());
+
+ if (!wrap->IsAlive())
+ return args.GetReturnValue().Set(UV_EINVAL);
+
+ args.GetReturnValue().Set((wrap->*Method)(args));
+}
+
+
+int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
+ return ReadStart();
+}
+
+
+int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
+ return ReadStop();
+}
+
+
+int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+
+ CHECK(args[0]->IsObject());
+ Local<Object> req_wrap_obj = args[0].As<Object>();
+
+ ShutdownWrap* req_wrap = new ShutdownWrap(env,
+ req_wrap_obj,
+ this,
+ AfterShutdown);
+
+ int err = DoShutdown(req_wrap);
+ req_wrap->Dispatched();
+ if (err)
+ delete req_wrap;
+ return err;
+}
+
+
+void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
+ StreamBase* wrap = req_wrap->wrap();
+ Environment* env = req_wrap->env();
+
+ // The wrap and request objects should still be there.
+ CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
+ CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false);
+
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+
+ Local<Object> req_wrap_obj = req_wrap->object();
+ Local<Value> argv[3] = {
+ Integer::New(env->isolate(), status),
+ wrap->GetAsyncWrap()->object(),
+ req_wrap_obj
+ };
+
+ if (req_wrap->object()->Has(env->oncomplete_string()))
+ req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
+
+ delete req_wrap;
+}
+
+
+int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+
+ CHECK(args[0]->IsObject());
+ CHECK(args[1]->IsArray());
+
+ Local<Object> req_wrap_obj = args[0].As<Object>();
+ Local<Array> chunks = args[1].As<Array>();
+
+ size_t count = chunks->Length() >> 1;
+
+ uv_buf_t bufs_[16];
+ uv_buf_t* bufs = bufs_;
+
+ // Determine storage size first
+ size_t storage_size = 0;
+ for (size_t i = 0; i < count; i++) {
+ Handle<Value> chunk = chunks->Get(i * 2);
+
+ if (Buffer::HasInstance(chunk))
+ continue;
+ // Buffer chunk, no additional storage required
+
+ // String chunk
+ Handle<String> string = chunk->ToString(env->isolate());
+ enum encoding encoding = ParseEncoding(env->isolate(),
+ chunks->Get(i * 2 + 1));
+ size_t chunk_size;
+ if (encoding == UTF8 && string->Length() > 65535)
+ chunk_size = StringBytes::Size(env->isolate(), string, encoding);
+ else
+ chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
+
+ storage_size += chunk_size + 15;
+ }
+
+ if (storage_size > INT_MAX)
+ return UV_ENOBUFS;
+
+ if (ARRAY_SIZE(bufs_) < count)
+ bufs = new uv_buf_t[count];
+
+ storage_size += sizeof(WriteWrap);
+ char* storage = new char[storage_size];
+ WriteWrap* req_wrap =
+ new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite);
+
+ uint32_t bytes = 0;
+ size_t offset = sizeof(WriteWrap);
+ for (size_t i = 0; i < count; i++) {
+ Handle<Value> chunk = chunks->Get(i * 2);
+
+ // Write buffer
+ if (Buffer::HasInstance(chunk)) {
+ bufs[i].base = Buffer::Data(chunk);
+ bufs[i].len = Buffer::Length(chunk);
+ bytes += bufs[i].len;
+ continue;
+ }
+
+ // Write string
+ offset = ROUND_UP(offset, 16);
+ CHECK_LT(offset, storage_size);
+ char* str_storage = storage + offset;
+ size_t str_size = storage_size - offset;
+
+ Handle<String> string = chunk->ToString(env->isolate());
+ enum encoding encoding = ParseEncoding(env->isolate(),
+ chunks->Get(i * 2 + 1));
+ str_size = StringBytes::Write(env->isolate(),
+ str_storage,
+ str_size,
+ string,
+ encoding);
+ bufs[i].base = str_storage;
+ bufs[i].len = str_size;
+ offset += str_size;
+ bytes += str_size;
+ }
+
+ int err = DoWrite(req_wrap, bufs, count, nullptr);
+
+ // Deallocate space
+ if (bufs != bufs_)
+ delete[] bufs;
+
+ req_wrap->Dispatched();
+ req_wrap->object()->Set(env->async(), True(env->isolate()));
+ req_wrap->object()->Set(env->bytes_string(),
+ Number::New(env->isolate(), bytes));
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+
+ if (err) {
+ req_wrap->~WriteWrap();
+ delete[] storage;
+ }
+
+ return err;
+}
+
+
+
+
+int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
+ CHECK(args[0]->IsObject());
+ CHECK(Buffer::HasInstance(args[1]));
+ Environment* env = Environment::GetCurrent(args);
+
+ Local<Object> req_wrap_obj = args[0].As<Object>();
+ const char* data = Buffer::Data(args[1]);
+ size_t length = Buffer::Length(args[1]);
+
+ char* storage;
+ WriteWrap* req_wrap;
+ uv_buf_t buf;
+ buf.base = const_cast<char*>(data);
+ buf.len = length;
+
+ // Try writing immediately without allocation
+ uv_buf_t* bufs = &buf;
+ size_t count = 1;
+ int err = DoTryWrite(&bufs, &count);
+ if (err != 0)
+ goto done;
+ if (count == 0)
+ goto done;
+ CHECK_EQ(count, 1);
+
+ // Allocate, or write rest
+ storage = new char[sizeof(WriteWrap)];
+ req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite);
+
+ err = DoWrite(req_wrap, bufs, count, nullptr);
+ req_wrap->Dispatched();
+ req_wrap_obj->Set(env->async(), True(env->isolate()));
+
+ if (err) {
+ req_wrap->~WriteWrap();
+ delete[] storage;
+ }
+
+ done:
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+ req_wrap_obj->Set(env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), length));
+ return err;
+}
+
+
+template <enum encoding enc>
+int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+ CHECK(args[0]->IsObject());
+ CHECK(args[1]->IsString());
+
+ Local<Object> req_wrap_obj = args[0].As<Object>();
+ Local<String> string = args[1].As<String>();
+ Local<Object> send_handle_obj;
+ if (args[2]->IsObject())
+ send_handle_obj = args[2].As<Object>();
+
+ int err;
+
+ // Compute the size of the storage that the string will be flattened into.
+ // For UTF8 strings that are very long, go ahead and take the hit for
+ // computing their actual size, rather than tripling the storage.
+ size_t storage_size;
+ if (enc == UTF8 && string->Length() > 65535)
+ storage_size = StringBytes::Size(env->isolate(), string, enc);
+ else
+ storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
+
+ if (storage_size > INT_MAX)
+ return UV_ENOBUFS;
+
+ // Try writing immediately if write size isn't too big
+ char* storage;
+ WriteWrap* req_wrap;
+ char* data;
+ char stack_storage[16384]; // 16kb
+ size_t data_size;
+ uv_buf_t buf;
+
+ bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
+ (!IsIPCPipe() || send_handle_obj.IsEmpty());
+ if (try_write) {
+ data_size = StringBytes::Write(env->isolate(),
+ stack_storage,
+ storage_size,
+ string,
+ enc);
+ buf = uv_buf_init(stack_storage, data_size);
+
+ uv_buf_t* bufs = &buf;
+ size_t count = 1;
+ err = DoTryWrite(&bufs, &count);
+
+ // Failure
+ if (err != 0)
+ goto done;
+
+ // Success
+ if (count == 0)
+ goto done;
+
+ // Partial write
+ CHECK_EQ(count, 1);
+ }
+
+ storage = new char[sizeof(WriteWrap) + storage_size + 15];
+ req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite);
+
+ data = reinterpret_cast<char*>(ROUND_UP(
+ reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
+
+ if (try_write) {
+ // Copy partial data
+ memcpy(data, buf.base, buf.len);
+ data_size = buf.len;
+ } else {
+ // Write it
+ data_size = StringBytes::Write(env->isolate(),
+ data,
+ storage_size,
+ string,
+ enc);
+ }
+
+ CHECK_LE(data_size, storage_size);
+
+ buf = uv_buf_init(data, data_size);
+
+ if (!IsIPCPipe()) {
+ err = DoWrite(req_wrap, &buf, 1, nullptr);
+ } else {
+ uv_handle_t* send_handle = nullptr;
+
+ if (!send_handle_obj.IsEmpty()) {
+ HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
+ send_handle = wrap->GetHandle();
+ // Reference StreamWrap instance to prevent it from being garbage
+ // collected before `AfterWrite` is called.
+ CHECK_EQ(false, req_wrap->persistent().IsEmpty());
+ req_wrap->object()->Set(env->handle_string(), send_handle_obj);
+ }
+
+ err = DoWrite(
+ req_wrap,
+ &buf,
+ 1,
+ reinterpret_cast<uv_stream_t*>(send_handle));
+ }
+
+ req_wrap->Dispatched();
+ req_wrap->object()->Set(env->async(), True(env->isolate()));
+
+ if (err) {
+ req_wrap->~WriteWrap();
+ delete[] storage;
+ }
+
+ done:
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+ req_wrap_obj->Set(env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), data_size));
+ return err;
+}
+
+
+void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
+ StreamBase* wrap = req_wrap->wrap();
+ Environment* env = req_wrap->env();
+
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+
+ // The wrap and request objects should still be there.
+ CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
+ CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false);
+
+ // Unref handle property
+ Local<Object> req_wrap_obj = req_wrap->object();
+ req_wrap_obj->Delete(env->handle_string());
+ wrap->OnAfterWrite(req_wrap);
+
+ Local<Value> argv[] = {
+ Integer::New(env->isolate(), status),
+ wrap->GetAsyncWrap()->object(),
+ req_wrap_obj,
+ Undefined(env->isolate())
+ };
+
+ const char* msg = wrap->Error();
+ if (msg != nullptr) {
+ argv[3] = OneByteString(env->isolate(), msg);
+ wrap->ClearError();
+ }
+
+ if (req_wrap->object()->Has(env->oncomplete_string()))
+ req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
+
+ req_wrap->~WriteWrap();
+ delete[] reinterpret_cast<char*>(req_wrap);
+}
+
+
+void StreamBase::EmitData(ssize_t nread,
+ Local<Object> buf,
+ Local<Object> handle) {
+ Environment* env = env_;
+
+ Local<Value> argv[] = {
+ Integer::New(env->isolate(), nread),
+ buf,
+ handle
+ };
+
+ if (argv[1].IsEmpty())
+ argv[1] = Undefined(env->isolate());
+
+ if (argv[2].IsEmpty())
+ argv[2] = Undefined(env->isolate());
+
+ GetAsyncWrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
+}
+
+
+AsyncWrap* StreamBase::GetAsyncWrap() {
+ return nullptr;
+}
+
+} // namespace node
diff --git a/src/stream_base.h b/src/stream_base.h
new file mode 100644
index 0000000000..d6b3a555b0
--- /dev/null
+++ b/src/stream_base.h
@@ -0,0 +1,223 @@
+#ifndef SRC_STREAM_BASE_H_
+#define SRC_STREAM_BASE_H_
+
+#include "env.h"
+#include "async-wrap.h"
+#include "req-wrap.h"
+#include "req-wrap-inl.h"
+#include "node.h"
+
+#include "v8.h"
+
+namespace node {
+
+// Forward declarations
+class StreamBase;
+
+template <class Req>
+class StreamReq {
+ public:
+ typedef void (*DoneCb)(Req* req, int status);
+
+ explicit StreamReq(DoneCb cb) : cb_(cb) {
+ }
+
+ inline void Done(int status) {
+ cb_(static_cast<Req*>(this), status);
+ }
+
+ private:
+ DoneCb cb_;
+};
+
+class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
+ public StreamReq<ShutdownWrap> {
+ public:
+ ShutdownWrap(Environment* env,
+ v8::Local<v8::Object> req_wrap_obj,
+ StreamBase* wrap,
+ DoneCb cb)
+ : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
+ StreamReq<ShutdownWrap>(cb),
+ wrap_(wrap) {
+ Wrap(req_wrap_obj, this);
+ }
+
+ static void NewShutdownWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
+ CHECK(args.IsConstructCall());
+ }
+
+ inline StreamBase* wrap() const { return wrap_; }
+
+ private:
+ StreamBase* const wrap_;
+};
+
+class WriteWrap: public ReqWrap<uv_write_t>,
+ public StreamReq<WriteWrap> {
+ public:
+ WriteWrap(Environment* env,
+ v8::Local<v8::Object> obj,
+ StreamBase* wrap,
+ DoneCb cb)
+ : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
+ StreamReq<WriteWrap>(cb),
+ wrap_(wrap) {
+ Wrap(obj, this);
+ }
+
+ void* operator new(size_t size, char* storage) { return storage; }
+
+ // This is just to keep the compiler happy. It should never be called, since
+ // we don't use exceptions in node.
+ void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
+
+ inline StreamBase* wrap() const {
+ return wrap_;
+ }
+
+ static void NewWriteWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
+ CHECK(args.IsConstructCall());
+ }
+
+ private:
+ // People should not be using the non-placement new and delete operator on a
+ // WriteWrap. Ensure this never happens.
+ void* operator new(size_t size) { UNREACHABLE(); }
+ void operator delete(void* ptr) { UNREACHABLE(); }
+
+ StreamBase* const wrap_;
+};
+
+class StreamResource {
+ public:
+ typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
+ typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
+ typedef void (*ReadCb)(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx);
+
+ StreamResource() : after_write_cb_(nullptr),
+ alloc_cb_(nullptr),
+ read_cb_(nullptr) {
+ }
+
+ virtual ~StreamResource() = default;
+
+ virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
+ virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0;
+ virtual int DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) = 0;
+ virtual const char* Error() const = 0;
+ virtual void ClearError() = 0;
+
+ // Events
+ inline void OnAfterWrite(WriteWrap* w) {
+ if (after_write_cb_ != nullptr)
+ after_write_cb_(w, after_write_ctx_);
+ }
+
+ inline void OnAlloc(size_t size, uv_buf_t* buf) {
+ if (alloc_cb_ != nullptr)
+ alloc_cb_(size, buf, alloc_ctx_);
+ }
+
+ inline void OnRead(size_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending) {
+ if (read_cb_ != nullptr)
+ read_cb_(nread, buf, pending, read_ctx_);
+ }
+
+ inline void set_after_write_cb(AfterWriteCb cb, void* ctx) {
+ after_write_ctx_ = ctx;
+ after_write_cb_ = cb;
+ }
+
+ inline void set_alloc_cb(AllocCb cb, void* ctx) {
+ alloc_cb_ = cb;
+ alloc_ctx_ = ctx;
+ }
+
+ inline void set_read_cb(ReadCb cb, void* ctx) {
+ read_cb_ = cb;
+ read_ctx_ = ctx;
+ }
+
+ private:
+ AfterWriteCb after_write_cb_;
+ void* after_write_ctx_;
+ AllocCb alloc_cb_;
+ void* alloc_ctx_;
+ ReadCb read_cb_;
+ void* read_ctx_;
+};
+
+class StreamBase : public StreamResource {
+ public:
+ template <class Base>
+ static void AddMethods(Environment* env,
+ v8::Handle<v8::FunctionTemplate> target);
+
+ virtual void* Cast() = 0;
+ virtual bool IsAlive() const = 0;
+ virtual bool IsClosing() const = 0;
+ virtual bool IsIPCPipe() const = 0;
+ virtual int GetFD() const = 0;
+
+ virtual int ReadStart() = 0;
+ virtual int ReadStop() = 0;
+
+ inline void Consume() {
+ CHECK_EQ(consumed_, false);
+ consumed_ = true;
+ }
+
+ template <class Outer>
+ inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
+
+ void EmitData(ssize_t nread,
+ v8::Local<v8::Object> buf,
+ v8::Local<v8::Object> handle);
+
+ protected:
+ explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
+ }
+
+ virtual ~StreamBase() = default;
+
+ virtual AsyncWrap* GetAsyncWrap() = 0;
+
+ // Libuv callbacks
+ static void AfterShutdown(ShutdownWrap* req, int status);
+ static void AfterWrite(WriteWrap* req, int status);
+
+ // JS Methods
+ int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
+ template <enum encoding enc>
+ int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ template <class Base>
+ static void GetFD(v8::Local<v8::String>,
+ const v8::PropertyCallbackInfo<v8::Value>&);
+
+ template <class Base,
+ int (StreamBase::*Method)( // NOLINT(whitespace/parens)
+ const v8::FunctionCallbackInfo<v8::Value>& args)>
+ static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ private:
+ Environment* env_;
+ bool consumed_;
+};
+
+} // namespace node
+
+#endif // SRC_STREAM_BASE_H_
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index a9f89e47bb..3b50f638eb 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -38,8 +38,8 @@ using v8::Value;
void StreamWrap::Initialize(Handle<Object> target,
- Handle<Value> unused,
- Handle<Context> context) {
+ Handle<Value> unused,
+ Handle<Context> context) {
Environment* env = Environment::GetCurrent(context);
Local<FunctionTemplate> sw =
@@ -55,6 +55,7 @@ void StreamWrap::Initialize(Handle<Object> target,
ww->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"));
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"),
ww->GetFunction());
+ env->set_write_wrap_constructor_function(ww->GetFunction());
}
@@ -68,23 +69,53 @@ StreamWrap::StreamWrap(Environment* env,
reinterpret_cast<uv_handle_t*>(stream),
provider,
parent),
- stream_(stream),
- default_callbacks_(this),
- callbacks_(&default_callbacks_),
- callbacks_gc_(false) {
+ StreamBase(env),
+ stream_(stream) {
+ set_after_write_cb(OnAfterWriteImpl, this);
+ set_alloc_cb(OnAllocImpl, this);
+ set_read_cb(OnReadImpl, this);
}
-void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
-#if !defined(_WIN32)
- HandleScope scope(args.GetIsolate());
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
+void StreamWrap::AddMethods(Environment* env,
+ v8::Handle<v8::FunctionTemplate> target) {
+ env->SetProtoMethod(target, "setBlocking", SetBlocking);
+ StreamBase::AddMethods<StreamWrap>(env, target);
+}
+
+
+int StreamWrap::GetFD() const {
int fd = -1;
- if (wrap != nullptr && wrap->stream() != nullptr) {
- fd = wrap->stream()->io_watcher.fd;
- }
- args.GetReturnValue().Set(fd);
+#if !defined(_WIN32)
+ if (stream() != nullptr)
+ fd = stream()->io_watcher.fd;
#endif
+ return fd;
+}
+
+
+bool StreamWrap::IsAlive() const {
+ return HandleWrap::IsAlive(this);
+}
+
+
+bool StreamWrap::IsClosing() const {
+ return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
+}
+
+
+void* StreamWrap::Cast() {
+ return reinterpret_cast<void*>(this);
+}
+
+
+AsyncWrap* StreamWrap::GetAsyncWrap() {
+ return static_cast<AsyncWrap*>(this);
+}
+
+
+bool StreamWrap::IsIPCPipe() const {
+ return is_named_pipe_ipc();
}
@@ -96,22 +127,13 @@ void StreamWrap::UpdateWriteQueueSize() {
}
-void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
- return args.GetReturnValue().Set(UV_EINVAL);
-
- int err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
- args.GetReturnValue().Set(err);
+int StreamWrap::ReadStart() {
+ return uv_read_start(stream(), OnAlloc, OnRead);
}
-void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
- return args.GetReturnValue().Set(UV_EINVAL);
- int err = uv_read_stop(wrap->stream());
- args.GetReturnValue().Set(err);
+int StreamWrap::ReadStop() {
+ return uv_read_stop(stream());
}
@@ -120,14 +142,25 @@ void StreamWrap::OnAlloc(uv_handle_t* handle,
uv_buf_t* buf) {
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
- wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
+
+ return static_cast<StreamBase*>(wrap)->OnAlloc(suggested_size, buf);
+}
+
+
+void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
+ buf->base = static_cast<char*>(malloc(size));
+ buf->len = size;
+
+ if (buf->base == nullptr && size > 0) {
+ FatalError(
+ "node::StreamWrap::DoAlloc(size_t, uv_buf_t*, void*)",
+ "Out Of Memory");
+ }
}
template <class WrapType, class UVType>
-static Local<Object> AcceptHandle(Environment* env,
- uv_stream_t* pipe,
- AsyncWrap* parent) {
+static Local<Object> AcceptHandle(Environment* env, StreamWrap* parent) {
EscapableHandleScope scope(env->isolate());
Local<Object> wrap_obj;
UVType* handle;
@@ -139,13 +172,54 @@ static Local<Object> AcceptHandle(Environment* env,
WrapType* wrap = Unwrap<WrapType>(wrap_obj);
handle = wrap->UVHandle();
- if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
+ if (uv_accept(parent->stream(), reinterpret_cast<uv_stream_t*>(handle)))
abort();
return scope.Escape(wrap_obj);
}
+void StreamWrap::OnReadImpl(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx) {
+ StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
+ Environment* env = wrap->env();
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+
+ Local<Object> pending_obj;
+
+ if (nread < 0) {
+ if (buf->base != nullptr)
+ free(buf->base);
+ wrap->EmitData(nread, Local<Object>(), pending_obj);
+ return;
+ }
+
+ if (nread == 0) {
+ if (buf->base != nullptr)
+ free(buf->base);
+ return;
+ }
+
+ char* base = static_cast<char*>(realloc(buf->base, nread));
+ CHECK_LE(static_cast<size_t>(nread), buf->len);
+
+ if (pending == UV_TCP) {
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
+ } else if (pending == UV_NAMED_PIPE) {
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
+ } else if (pending == UV_UDP) {
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
+ } else {
+ CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
+ }
+
+ wrap->EmitData(nread, Buffer::Use(env, base, nread), pending_obj);
+}
+
+
void StreamWrap::OnReadCommon(uv_stream_t* handle,
ssize_t nread,
const uv_buf_t* buf,
@@ -164,7 +238,7 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle,
}
}
- wrap->callbacks()->DoRead(handle, nread, buf, pending);
+ static_cast<StreamBase*>(wrap)->OnRead(nread, buf, pending);
}
@@ -183,437 +257,26 @@ void StreamWrap::OnRead(uv_stream_t* handle,
}
-size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
- CHECK(Buffer::HasInstance(val));
-
- // Simple non-writev case
- buf->base = Buffer::Data(val);
- buf->len = Buffer::Length(val);
-
- return buf->len;
-}
-
-
-void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
-
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
- return args.GetReturnValue().Set(UV_EINVAL);
-
- CHECK(args[0]->IsObject());
- CHECK(Buffer::HasInstance(args[1]));
-
- Local<Object> req_wrap_obj = args[0].As<Object>();
- Local<Object> buf_obj = args[1].As<Object>();
-
- size_t length = Buffer::Length(buf_obj);
-
- char* storage;
- WriteWrap* req_wrap;
- uv_buf_t buf;
- WriteBuffer(buf_obj, &buf);
-
- // Try writing immediately without allocation
- uv_buf_t* bufs = &buf;
- size_t count = 1;
- int err = wrap->callbacks()->TryWrite(&bufs, &count);
- if (err != 0)
- goto done;
- if (count == 0)
- goto done;
- CHECK_EQ(count, 1);
-
- // Allocate, or write rest
- storage = new char[sizeof(WriteWrap)];
- req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
-
- err = wrap->callbacks()->DoWrite(req_wrap,
- bufs,
- count,
- nullptr,
- StreamWrap::AfterWrite);
- req_wrap->Dispatched();
- req_wrap_obj->Set(env->async(), True(env->isolate()));
-
- if (err) {
- req_wrap->~WriteWrap();
- delete[] storage;
- }
-
- done:
- const char* msg = wrap->callbacks()->Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- wrap->callbacks()->ClearError();
- }
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), length));
- args.GetReturnValue().Set(err);
-}
-
-
-template <enum encoding encoding>
-void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
- int err;
-
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
- return args.GetReturnValue().Set(UV_EINVAL);
-
- CHECK(args[0]->IsObject());
- CHECK(args[1]->IsString());
-
- Local<Object> req_wrap_obj = args[0].As<Object>();
- Local<String> string = args[1].As<String>();
-
- // Compute the size of the storage that the string will be flattened into.
- // For UTF8 strings that are very long, go ahead and take the hit for
- // computing their actual size, rather than tripling the storage.
- size_t storage_size;
- if (encoding == UTF8 && string->Length() > 65535)
- storage_size = StringBytes::Size(env->isolate(), string, encoding);
- else
- storage_size = StringBytes::StorageSize(env->isolate(), string, encoding);
-
- if (storage_size > INT_MAX) {
- args.GetReturnValue().Set(UV_ENOBUFS);
- return;
- }
-
- // Try writing immediately if write size isn't too big
- char* storage;
- WriteWrap* req_wrap;
- char* data;
- char stack_storage[16384]; // 16kb
- size_t data_size;
- uv_buf_t buf;
-
- bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
- (!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
- if (try_write) {
- data_size = StringBytes::Write(env->isolate(),
- stack_storage,
- storage_size,
- string,
- encoding);
- buf = uv_buf_init(stack_storage, data_size);
-
- uv_buf_t* bufs = &buf;
- size_t count = 1;
- err = wrap->callbacks()->TryWrite(&bufs, &count);
-
- // Failure
- if (err != 0)
- goto done;
-
- // Success
- if (count == 0)
- goto done;
-
- // Partial write
- CHECK_EQ(count, 1);
- }
-
- storage = new char[sizeof(WriteWrap) + storage_size + 15];
- req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
-
- data = reinterpret_cast<char*>(ROUND_UP(
- reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
-
- if (try_write) {
- // Copy partial data
- memcpy(data, buf.base, buf.len);
- data_size = buf.len;
- } else {
- // Write it
- data_size = StringBytes::Write(env->isolate(),
- data,
- storage_size,
- string,
- encoding);
- }
-
- CHECK_LE(data_size, storage_size);
-
- buf = uv_buf_init(data, data_size);
-
- if (!wrap->is_named_pipe_ipc()) {
- err = wrap->callbacks()->DoWrite(req_wrap,
- &buf,
- 1,
- nullptr,
- StreamWrap::AfterWrite);
- } else {
- uv_handle_t* send_handle = nullptr;
-
- if (args[2]->IsObject()) {
- Local<Object> send_handle_obj = args[2].As<Object>();
- HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
- send_handle = wrap->GetHandle();
- // Reference StreamWrap instance to prevent it from being garbage
- // collected before `AfterWrite` is called.
- CHECK_EQ(false, req_wrap->persistent().IsEmpty());
- req_wrap->object()->Set(env->handle_string(), send_handle_obj);
- }
-
- err = wrap->callbacks()->DoWrite(
- req_wrap,
- &buf,
- 1,
- reinterpret_cast<uv_stream_t*>(send_handle),
- StreamWrap::AfterWrite);
- }
-
- req_wrap->Dispatched();
- req_wrap->object()->Set(env->async(), True(env->isolate()));
-
- if (err) {
- req_wrap->~WriteWrap();
- delete[] storage;
- }
-
- done:
- const char* msg = wrap->callbacks()->Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- wrap->callbacks()->ClearError();
- }
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), data_size));
- args.GetReturnValue().Set(err);
-}
-
-
-void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
-
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
- return args.GetReturnValue().Set(UV_EINVAL);
-
- CHECK(args[0]->IsObject());
- CHECK(args[1]->IsArray());
-
- Local<Object> req_wrap_obj = args[0].As<Object>();
- Local<Array> chunks = args[1].As<Array>();
- size_t count = chunks->Length() >> 1;
-
- uv_buf_t bufs_[16];
- uv_buf_t* bufs = bufs_;
-
- // Determine storage size first
- size_t storage_size = 0;
- for (size_t i = 0; i < count; i++) {
- Handle<Value> chunk = chunks->Get(i * 2);
-
- if (Buffer::HasInstance(chunk))
- continue;
- // Buffer chunk, no additional storage required
-
- // String chunk
- Handle<String> string = chunk->ToString(env->isolate());
- enum encoding encoding = ParseEncoding(env->isolate(),
- chunks->Get(i * 2 + 1));
- size_t chunk_size;
- if (encoding == UTF8 && string->Length() > 65535)
- chunk_size = StringBytes::Size(env->isolate(), string, encoding);
- else
- chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
-
- storage_size += chunk_size + 15;
- }
-
- if (storage_size > INT_MAX) {
- args.GetReturnValue().Set(UV_ENOBUFS);
- return;
- }
-
- if (ARRAY_SIZE(bufs_) < count)
- bufs = new uv_buf_t[count];
-
- storage_size += sizeof(WriteWrap);
- char* storage = new char[storage_size];
- WriteWrap* req_wrap =
- new(storage) WriteWrap(env, req_wrap_obj, wrap);
-
- uint32_t bytes = 0;
- size_t offset = sizeof(WriteWrap);
- for (size_t i = 0; i < count; i++) {
- Handle<Value> chunk = chunks->Get(i * 2);
-
- // Write buffer
- if (Buffer::HasInstance(chunk)) {
- bufs[i].base = Buffer::Data(chunk);
- bufs[i].len = Buffer::Length(chunk);
- bytes += bufs[i].len;
- continue;
- }
-
- // Write string
- offset = ROUND_UP(offset, 16);
- CHECK_LT(offset, storage_size);
- char* str_storage = storage + offset;
- size_t str_size = storage_size - offset;
-
- Handle<String> string = chunk->ToString(env->isolate());
- enum encoding encoding = ParseEncoding(env->isolate(),
- chunks->Get(i * 2 + 1));
- str_size = StringBytes::Write(env->isolate(),
- str_storage,
- str_size,
- string,
- encoding);
- bufs[i].base = str_storage;
- bufs[i].len = str_size;
- offset += str_size;
- bytes += str_size;
- }
-
- int err = wrap->callbacks()->DoWrite(req_wrap,
- bufs,
- count,
- nullptr,
- StreamWrap::AfterWrite);
-
- // Deallocate space
- if (bufs != bufs_)
- delete[] bufs;
-
- req_wrap->Dispatched();
- req_wrap->object()->Set(env->async(), True(env->isolate()));
- req_wrap->object()->Set(env->bytes_string(),
- Number::New(env->isolate(), bytes));
- const char* msg = wrap->callbacks()->Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- wrap->callbacks()->ClearError();
- }
-
- if (err) {
- req_wrap->~WriteWrap();
- delete[] storage;
- }
-
- args.GetReturnValue().Set(err);
-}
-
-
-void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
- WriteStringImpl<ASCII>(args);
-}
-
-
-void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
- WriteStringImpl<UTF8>(args);
-}
-
-
-void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
- WriteStringImpl<UCS2>(args);
-}
-
-void StreamWrap::WriteBinaryString(const FunctionCallbackInfo<Value>& args) {
- WriteStringImpl<BINARY>(args);
-}
-
void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
- return args.GetReturnValue().Set(UV_EINVAL);
- CHECK_GT(args.Length(), 0);
- int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue());
- args.GetReturnValue().Set(err);
-}
-
-void StreamWrap::AfterWrite(uv_write_t* req, int status) {
- WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req);
- StreamWrap* wrap = req_wrap->wrap();
- Environment* env = wrap->env();
-
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- // The wrap and request objects should still be there.
- CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
- CHECK_EQ(wrap->persistent().IsEmpty(), false);
-
- // Unref handle property
- Local<Object> req_wrap_obj = req_wrap->object();
- req_wrap_obj->Delete(env->handle_string());
- wrap->callbacks()->AfterWrite(req_wrap);
-
- Local<Value> argv[] = {
- Integer::New(env->isolate(), status),
- wrap->object(),
- req_wrap_obj,
- Undefined(env->isolate())
- };
-
- const char* msg = wrap->callbacks()->Error();
- if (msg != nullptr) {
- argv[3] = OneByteString(env->isolate(), msg);
- wrap->callbacks()->ClearError();
- }
-
- req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
-
- req_wrap->~WriteWrap();
- delete[] reinterpret_cast<char*>(req_wrap);
-}
-
-void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
-
- StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
- if (!IsAlive(wrap))
+ CHECK_GT(args.Length(), 0);
+ if (!wrap->IsAlive())
return args.GetReturnValue().Set(UV_EINVAL);
- CHECK(args[0]->IsObject());
- Local<Object> req_wrap_obj = args[0].As<Object>();
-
- ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj);
- int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
- req_wrap->Dispatched();
- if (err)
- delete req_wrap;
- args.GetReturnValue().Set(err);
-}
-
-
-void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
- ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
- StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
- Environment* env = wrap->env();
-
- // The wrap and request objects should still be there.
- CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
- CHECK_EQ(wrap->persistent().IsEmpty(), false);
-
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- Local<Object> req_wrap_obj = req_wrap->object();
- Local<Value> argv[3] = {
- Integer::New(env->isolate(), status),
- wrap->object(),
- req_wrap_obj
- };
-
- req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
-
- delete req_wrap;
+ bool enable = args[0]->IsTrue();
+ args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
}
-const char* StreamWrapCallbacks::Error() const {
- return nullptr;
+int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
+ return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
}
-void StreamWrapCallbacks::ClearError() {
+void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
+ ShutdownWrap* req_wrap = ContainerOf(&ShutdownWrap::req_, req);
+ req_wrap->Done(status);
}
@@ -621,13 +284,13 @@ void StreamWrapCallbacks::ClearError() {
// values, shifting their base and decrementing their length. This is
// required in order to skip the data that was successfully written via
// uv_try_write().
-int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
+int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
int err;
size_t written;
uv_buf_t* vbufs = *bufs;
size_t vcount = *count;
- err = uv_try_write(wrap()->stream(), vbufs, vcount);
+ err = uv_try_write(stream(), vbufs, vcount);
if (err == UV_ENOSYS || err == UV_EAGAIN)
return 0;
if (err < 0)
@@ -657,106 +320,53 @@ int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
}
-int StreamWrapCallbacks::DoWrite(WriteWrap* w,
- uv_buf_t* bufs,
- size_t count,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
+int StreamWrap::DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) {
int r;
if (send_handle == nullptr) {
- r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
+ r = uv_write(&w->req_, stream(), bufs, count, AfterWrite);
} else {
- r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
+ r = uv_write2(&w->req_, stream(), bufs, count, send_handle, AfterWrite);
}
if (!r) {
size_t bytes = 0;
for (size_t i = 0; i < count; i++)
bytes += bufs[i].len;
- if (wrap()->stream()->type == UV_TCP) {
+ if (stream()->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(bytes);
- } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
+ } else if (stream()->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(bytes);
}
}
- wrap()->UpdateWriteQueueSize();
+ UpdateWriteQueueSize();
return r;
}
-void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
- wrap()->UpdateWriteQueueSize();
+void StreamWrap::AfterWrite(uv_write_t* req, int status) {
+ WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req);
+ req_wrap->Done(status);
}
-void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
- buf->base = static_cast<char*>(malloc(suggested_size));
- buf->len = suggested_size;
-
- if (buf->base == nullptr && suggested_size > 0) {
- FatalError(
- "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
- "Out Of Memory");
- }
+void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
+ StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
+ wrap->UpdateWriteQueueSize();
}
-void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending) {
- Environment* env = wrap()->env();
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- Local<Value> argv[] = {
- Integer::New(env->isolate(), nread),
- Undefined(env->isolate()),
- Undefined(env->isolate())
- };
-
- if (nread < 0) {
- if (buf->base != nullptr)
- free(buf->base);
- wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
- return;
- }
-
- if (nread == 0) {
- if (buf->base != nullptr)
- free(buf->base);
- return;
- }
-
- char* base = static_cast<char*>(realloc(buf->base, nread));
- CHECK_LE(static_cast<size_t>(nread), buf->len);
- argv[1] = Buffer::Use(env, base, nread);
-
- Local<Object> pending_obj;
- if (pending == UV_TCP) {
- pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle, wrap());
- } else if (pending == UV_NAMED_PIPE) {
- pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle, wrap());
- } else if (pending == UV_UDP) {
- pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle, wrap());
- } else {
- CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
- }
-
- if (!pending_obj.IsEmpty()) {
- argv[2] = pending_obj;
- }
-
- wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
+const char* StreamWrap::Error() const {
+ return nullptr;
}
-int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
- return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);
+void StreamWrap::ClearError() {
+ // No-op
}
} // namespace node
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index 5148228112..ca673b4ef1 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -1,10 +1,10 @@
#ifndef SRC_STREAM_WRAP_H_
#define SRC_STREAM_WRAP_H_
+#include "stream_base.h"
+
#include "env.h"
#include "handle_wrap.h"
-#include "req-wrap.h"
-#include "req-wrap-inl.h"
#include "string_bytes.h"
#include "v8.h"
@@ -13,126 +13,31 @@ namespace node {
// Forward declaration
class StreamWrap;
-class ShutdownWrap : public ReqWrap<uv_shutdown_t> {
- public:
- ShutdownWrap(Environment* env, v8::Local<v8::Object> req_wrap_obj)
- : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
- Wrap(req_wrap_obj, this);
- }
-
- static void NewShutdownWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
- CHECK(args.IsConstructCall());
- }
-};
-
-class WriteWrap: public ReqWrap<uv_write_t> {
- public:
- // TODO(trevnorris): WrapWrap inherits from ReqWrap, which I've globbed
- // into the same provider. How should these be broken apart?
- WriteWrap(Environment* env, v8::Local<v8::Object> obj, StreamWrap* wrap)
- : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
- wrap_(wrap) {
- Wrap(obj, this);
- }
-
- void* operator new(size_t size, char* storage) { return storage; }
-
- // This is just to keep the compiler happy. It should never be called, since
- // we don't use exceptions in node.
- void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
-
- inline StreamWrap* wrap() const {
- return wrap_;
- }
-
- static void NewWriteWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
- CHECK(args.IsConstructCall());
- }
-
- private:
- // People should not be using the non-placement new and delete operator on a
- // WriteWrap. Ensure this never happens.
- void* operator new(size_t size) { UNREACHABLE(); }
- void operator delete(void* ptr) { UNREACHABLE(); }
-
- StreamWrap* const wrap_;
-};
-
-// Overridable callbacks' types
-class StreamWrapCallbacks {
- public:
- explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) {
- }
-
- explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap()) {
- }
-
- virtual ~StreamWrapCallbacks() = default;
-
- virtual const char* Error() const;
- virtual void ClearError();
-
- virtual int TryWrite(uv_buf_t** bufs, size_t* count);
-
- virtual int DoWrite(WriteWrap* w,
- uv_buf_t* bufs,
- size_t count,
- uv_stream_t* send_handle,
- uv_write_cb cb);
- virtual void AfterWrite(WriteWrap* w);
- virtual void DoAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf);
- virtual void DoRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending);
- virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb);
-
- protected:
- inline StreamWrap* wrap() const {
- return wrap_;
- }
-
- private:
- StreamWrap* const wrap_;
-};
-
-class StreamWrap : public HandleWrap {
+class StreamWrap : public HandleWrap, public StreamBase {
public:
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
- void OverrideCallbacks(StreamWrapCallbacks* callbacks, bool gc) {
- StreamWrapCallbacks* old = callbacks_;
- callbacks_ = callbacks;
- callbacks_gc_ = gc;
- if (old != &default_callbacks_)
- delete old;
- }
-
- static void GetFD(v8::Local<v8::String>,
- const v8::PropertyCallbackInfo<v8::Value>&);
+ int GetFD() const override;
+ void* Cast() override;
+ bool IsAlive() const override;
+ bool IsClosing() const override;
+ bool IsIPCPipe() const override;
// JavaScript functions
- static void ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
-
- static void Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void WriteAsciiString(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void WriteUtf8String(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void WriteUcs2String(const v8::FunctionCallbackInfo<v8::Value>& args);
- static void WriteBinaryString(
- const v8::FunctionCallbackInfo<v8::Value>& args);
-
- static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
-
- inline StreamWrapCallbacks* callbacks() const {
- return callbacks_;
- }
+ int ReadStart() override;
+ int ReadStop() override;
+
+ // Resource implementation
+ int DoShutdown(ShutdownWrap* req_wrap) override;
+ int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
+ int DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) override;
+ const char* Error() const override;
+ void ClearError() override;
inline uv_stream_t* stream() const {
return stream_;
@@ -152,8 +57,6 @@ class StreamWrap : public HandleWrap {
}
protected:
- static size_t WriteBuffer(v8::Handle<v8::Value> val, uv_buf_t* buf);
-
StreamWrap(Environment* env,
v8::Local<v8::Object> object,
uv_stream_t* stream,
@@ -161,22 +64,21 @@ class StreamWrap : public HandleWrap {
AsyncWrap* parent = nullptr);
~StreamWrap() {
- if (!callbacks_gc_ && callbacks_ != &default_callbacks_) {
- delete callbacks_;
- }
- callbacks_ = nullptr;
}
- void StateChange() { }
+ AsyncWrap* GetAsyncWrap() override;
void UpdateWriteQueueSize();
+ static void AddMethods(Environment* env,
+ v8::Handle<v8::FunctionTemplate> target);
+
private:
+ static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
+
// Callbacks for libuv
- static void AfterWrite(uv_write_t* req, int status);
static void OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf);
- static void AfterShutdown(uv_shutdown_t* req, int status);
static void OnRead(uv_stream_t* handle,
ssize_t nread,
@@ -185,16 +87,18 @@ class StreamWrap : public HandleWrap {
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending);
+ static void AfterWrite(uv_write_t* req, int status);
+ static void AfterShutdown(uv_shutdown_t* req, int status);
- template <enum encoding encoding>
- static void WriteStringImpl(const v8::FunctionCallbackInfo<v8::Value>& args);
+ // Resource interface implementation
+ static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
+ static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
+ static void OnReadImpl(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx);
uv_stream_t* const stream_;
- StreamWrapCallbacks default_callbacks_;
- StreamWrapCallbacks* callbacks_; // Overridable callbacks
- bool callbacks_gc_;
-
- friend class StreamWrapCallbacks;
};
diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc
index 3f011422ef..a823e758ee 100644
--- a/src/tcp_wrap.cc
+++ b/src/tcp_wrap.cc
@@ -72,15 +72,6 @@ void TCPWrap::Initialize(Handle<Object> target,
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TCP"));
t->InstanceTemplate()->SetInternalFieldCount(1);
- enum PropertyAttribute attributes =
- static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
- t->InstanceTemplate()->SetAccessor(env->fd_string(),
- StreamWrap::GetFD,
- nullptr,
- Handle<Value>(),
- v8::DEFAULT,
- attributes);
-
// Init properties
t->InstanceTemplate()->Set(String::NewFromUtf8(env->isolate(), "reading"),
Boolean::New(env->isolate(), false));
@@ -98,16 +89,7 @@ void TCPWrap::Initialize(Handle<Object> target,
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
- env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart);
- env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop);
- env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown);
-
- env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer);
- env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString);
- env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String);
- env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String);
- env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString);
- env->SetProtoMethod(t, "writev", StreamWrap::Writev);
+ StreamWrap::AddMethods(env, t);
env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index 9aafe3925d..ab8db6951b 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -33,17 +33,20 @@ using v8::String;
using v8::Value;
-TLSCallbacks::TLSCallbacks(Environment* env,
- Kind kind,
- Handle<Object> sc,
- StreamWrapCallbacks* old)
- : SSLWrap<TLSCallbacks>(env, Unwrap<SecureContext>(sc), kind),
- StreamWrapCallbacks(old),
+TLSWrap::TLSWrap(Environment* env,
+ Kind kind,
+ StreamBase* stream,
+ Handle<Object> stream_obj,
+ Handle<Object> sc)
+ : SSLWrap<TLSWrap>(env, Unwrap<SecureContext>(sc), kind),
+ StreamBase(env),
AsyncWrap(env,
env->tls_wrap_constructor_function()->NewInstance(),
AsyncWrap::PROVIDER_TLSWRAP),
sc_(Unwrap<SecureContext>(sc)),
sc_handle_(env->isolate(), sc),
+ stream_(stream),
+ stream_handle_(env->isolate(), stream_obj),
enc_in_(nullptr),
enc_out_(nullptr),
clear_in_(nullptr),
@@ -58,14 +61,22 @@ TLSCallbacks::TLSCallbacks(Environment* env,
MakeWeak(this);
// We've our own session callbacks
- SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSCallbacks>::GetSessionCallback);
- SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSCallbacks>::NewSessionCallback);
+ SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSWrap>::GetSessionCallback);
+ SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSWrap>::NewSessionCallback);
+
+ stream_->Consume();
+ stream_->set_after_write_cb(OnAfterWriteImpl, this);
+ stream_->set_alloc_cb(OnAllocImpl, this);
+ stream_->set_read_cb(OnReadImpl, this);
+
+ set_alloc_cb(OnAllocSelf, this);
+ set_read_cb(OnReadSelf, this);
InitSSL();
}
-TLSCallbacks::~TLSCallbacks() {
+TLSWrap::~TLSWrap() {
enc_in_ = nullptr;
enc_out_ = nullptr;
delete clear_in_;
@@ -73,6 +84,7 @@ TLSCallbacks::~TLSCallbacks() {
sc_ = nullptr;
sc_handle_.Reset();
+ stream_handle_.Reset();
persistent().Reset();
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
@@ -90,12 +102,12 @@ TLSCallbacks::~TLSCallbacks() {
}
-void TLSCallbacks::MakePending() {
+void TLSWrap::MakePending() {
write_item_queue_.MoveBack(&pending_write_items_);
}
-bool TLSCallbacks::InvokeQueued(int status) {
+bool TLSWrap::InvokeQueued(int status) {
if (pending_write_items_.IsEmpty())
return false;
@@ -103,7 +115,7 @@ bool TLSCallbacks::InvokeQueued(int status) {
WriteItemList queue;
pending_write_items_.MoveBack(&queue);
while (WriteItem* wi = queue.PopFront()) {
- wi->cb_(&wi->w_->req_, status);
+ wi->w_->Done(status);
delete wi;
}
@@ -111,12 +123,12 @@ bool TLSCallbacks::InvokeQueued(int status) {
}
-void TLSCallbacks::NewSessionDoneCb() {
+void TLSWrap::NewSessionDoneCb() {
Cycle();
}
-void TLSCallbacks::InitSSL() {
+void TLSWrap::InitSSL() {
// Initialize SSL
enc_in_ = NodeBIO::New();
enc_out_ = NodeBIO::New();
@@ -158,7 +170,7 @@ void TLSCallbacks::InitSSL() {
}
-void TLSCallbacks::Wrap(const FunctionCallbackInfo<Value>& args) {
+void TLSWrap::Wrap(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (args.Length() < 1 || !args[0]->IsObject()) {
@@ -172,42 +184,39 @@ void TLSCallbacks::Wrap(const FunctionCallbackInfo<Value>& args) {
if (args.Length() < 3 || !args[2]->IsBoolean())
return env->ThrowTypeError("Third argument should be boolean");
- Local<Object> stream = args[0].As<Object>();
+ Local<Object> stream_obj = args[0].As<Object>();
Local<Object> sc = args[1].As<Object>();
- Kind kind = args[2]->IsTrue() ? SSLWrap<TLSCallbacks>::kServer :
- SSLWrap<TLSCallbacks>::kClient;
+ Kind kind = args[2]->IsTrue() ? SSLWrap<TLSWrap>::kServer :
+ SSLWrap<TLSWrap>::kClient;
- TLSCallbacks* callbacks = nullptr;
- WITH_GENERIC_STREAM(env, stream, {
- callbacks = new TLSCallbacks(env, kind, sc, wrap->callbacks());
- wrap->OverrideCallbacks(callbacks, true);
+ StreamBase* stream = nullptr;
+ WITH_GENERIC_STREAM(env, stream_obj, {
+ stream = wrap;
});
+ CHECK_NE(stream, nullptr);
- if (callbacks == nullptr) {
- return args.GetReturnValue().SetNull();
- }
+ TLSWrap* res = new TLSWrap(env, kind, stream, stream_obj, sc);
- args.GetReturnValue().Set(callbacks->persistent());
+ args.GetReturnValue().Set(res->persistent());
}
-void TLSCallbacks::Receive(const FunctionCallbackInfo<Value>& args) {
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
CHECK(Buffer::HasInstance(args[0]));
char* data = Buffer::Data(args[0]);
size_t len = Buffer::Length(args[0]);
uv_buf_t buf;
- uv_stream_t* stream = wrap->wrap()->stream();
// Copy given buffer entirely or partiall if handle becomes closed
- while (len > 0 && !uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))) {
- wrap->DoAlloc(reinterpret_cast<uv_handle_t*>(stream), len, &buf);
+ while (len > 0 && !wrap->IsClosing()) {
+ wrap->stream_->OnAlloc(len, &buf);
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
- wrap->DoRead(stream, buf.len, &buf, UV_UNKNOWN_HANDLE);
+ wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE);
data += copy;
len -= copy;
@@ -215,10 +224,10 @@ void TLSCallbacks::Receive(const FunctionCallbackInfo<Value>& args) {
}
-void TLSCallbacks::Start(const FunctionCallbackInfo<Value>& args) {
+void TLSWrap::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
if (wrap->started_)
return env->ThrowError("Already started.");
@@ -231,14 +240,14 @@ void TLSCallbacks::Start(const FunctionCallbackInfo<Value>& args) {
}
-void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) {
+void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) {
if (!(where & (SSL_CB_HANDSHAKE_START | SSL_CB_HANDSHAKE_DONE)))
return;
// Be compatible with older versions of OpenSSL. SSL_get_app_data() wants
// a non-const SSL* in OpenSSL <= 0.9.7e.
SSL* ssl = const_cast<SSL*>(ssl_);
- TLSCallbacks* c = static_cast<TLSCallbacks*>(SSL_get_app_data(ssl));
+ TLSWrap* c = static_cast<TLSWrap*>(SSL_get_app_data(ssl));
Environment* env = c->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
@@ -261,7 +270,7 @@ void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) {
}
-void TLSCallbacks::EncOut() {
+void TLSWrap::EncOut() {
// Ignore cycling data if ClientHello wasn't yet parsed
if (!hello_parser_.IsEnded())
return;
@@ -291,47 +300,49 @@ void TLSCallbacks::EncOut() {
write_size_ = NodeBIO::FromBIO(enc_out_)->PeekMultiple(data, size, &count);
CHECK(write_size_ != 0 && count != 0);
- write_req_.data = this;
+ Local<Object> req_wrap_obj =
+ env()->write_wrap_constructor_function()->NewInstance();
+ char* storage = new char[sizeof(WriteWrap)];
+ WriteWrap* write_req = new(storage) WriteWrap(env(),
+ req_wrap_obj,
+ this,
+ EncOutCb);
+
uv_buf_t buf[ARRAY_SIZE(data)];
for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]);
- int r = uv_write(&write_req_, wrap()->stream(), buf, count, EncOutCb);
+ int r = stream_->DoWrite(write_req, buf, count, nullptr);
// Ignore errors, this should be already handled in js
- if (!r) {
- if (wrap()->is_tcp()) {
- NODE_COUNT_NET_BYTES_SENT(write_size_);
- } else if (wrap()->is_named_pipe()) {
- NODE_COUNT_PIPE_BYTES_SENT(write_size_);
- }
- }
+ if (!r)
+ NODE_COUNT_NET_BYTES_SENT(write_size_);
}
-void TLSCallbacks::EncOutCb(uv_write_t* req, int status) {
- TLSCallbacks* callbacks = static_cast<TLSCallbacks*>(req->data);
+void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) {
+ TLSWrap* wrap = req_wrap->wrap()->Cast<TLSWrap>();
// Handle error
if (status) {
// Ignore errors after shutdown
- if (callbacks->shutdown_)
+ if (wrap->shutdown_)
return;
// Notify about error
- callbacks->InvokeQueued(status);
+ wrap->InvokeQueued(status);
return;
}
// Commit
- NodeBIO::FromBIO(callbacks->enc_out_)->Read(nullptr, callbacks->write_size_);
+ NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_);
// Try writing more data
- callbacks->write_size_ = 0;
- callbacks->EncOut();
+ wrap->write_size_ = 0;
+ wrap->EncOut();
}
-Local<Value> TLSCallbacks::GetSSLError(int status, int* err, const char** msg) {
+Local<Value> TLSWrap::GetSSLError(int status, int* err, const char** msg) {
EscapableHandleScope scope(env()->isolate());
*err = SSL_get_error(ssl_, status);
@@ -373,7 +384,7 @@ Local<Value> TLSCallbacks::GetSSLError(int status, int* err, const char** msg) {
}
-void TLSCallbacks::ClearOut() {
+void TLSWrap::ClearOut() {
// Ignore cycling data if ClientHello wasn't yet parsed
if (!hello_parser_.IsEnded())
return;
@@ -389,22 +400,30 @@ void TLSCallbacks::ClearOut() {
char out[kClearOutChunkSize];
int read;
- do {
+ for (;;) {
read = SSL_read(ssl_, out, sizeof(out));
- if (read > 0) {
- Local<Value> argv[] = {
- Integer::New(env()->isolate(), read),
- Buffer::New(env(), out, read)
- };
- wrap()->MakeCallback(env()->onread_string(), ARRAY_SIZE(argv), argv);
+
+ if (read <= 0)
+ break;
+
+ while (read > 0) {
+ int avail = read;
+
+ uv_buf_t buf;
+ OnAlloc(avail, &buf);
+ if (static_cast<int>(buf.len) < avail)
+ avail = buf.len;
+ memcpy(buf.base, out, avail);
+ OnRead(avail, &buf, UV_UNKNOWN_HANDLE);
+
+ read -= avail;
}
- } while (read > 0);
+ }
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
- Local<Value> arg = Integer::New(env()->isolate(), UV_EOF);
- wrap()->MakeCallback(env()->onread_string(), 1, &arg);
+ OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE);
}
if (read == -1) {
@@ -427,7 +446,7 @@ void TLSCallbacks::ClearOut() {
}
-bool TLSCallbacks::ClearIn() {
+bool TLSWrap::ClearIn() {
// Ignore cycling data if ClientHello wasn't yet parsed
if (!hello_parser_.IsEnded())
return false;
@@ -466,28 +485,67 @@ bool TLSCallbacks::ClearIn() {
}
-const char* TLSCallbacks::Error() const {
+void* TLSWrap::Cast() {
+ return reinterpret_cast<void*>(this);
+}
+
+
+AsyncWrap* TLSWrap::GetAsyncWrap() {
+ return static_cast<AsyncWrap*>(this);
+}
+
+
+bool TLSWrap::IsIPCPipe() const {
+ return stream_->IsIPCPipe();
+}
+
+
+int TLSWrap::GetFD() const {
+ return stream_->GetFD();
+}
+
+
+bool TLSWrap::IsAlive() const {
+ return stream_->IsAlive();
+}
+
+
+bool TLSWrap::IsClosing() const {
+ return stream_->IsClosing();
+}
+
+
+int TLSWrap::ReadStart() {
+ return stream_->ReadStart();
+}
+
+
+int TLSWrap::ReadStop() {
+ return stream_->ReadStop();
+}
+
+
+const char* TLSWrap::Error() const {
return error_;
}
-void TLSCallbacks::ClearError() {
+void TLSWrap::ClearError() {
delete[] error_;
error_ = nullptr;
}
-int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
+int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
// TODO(indutny): Support it
return 0;
}
-int TLSCallbacks::DoWrite(WriteWrap* w,
- uv_buf_t* bufs,
- size_t count,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
+int TLSWrap::DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) {
CHECK_EQ(send_handle, nullptr);
bool empty = true;
@@ -504,11 +562,11 @@ int TLSCallbacks::DoWrite(WriteWrap* w,
// However if there any data that should be written to socket,
// callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0)
- return uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
+ return stream_->DoWrite(w, bufs, count, send_handle);
}
// Queue callback to execute it on next tick
- write_item_queue_.PushBack(new WriteItem(w, cb));
+ write_item_queue_.PushBack(new WriteItem(w));
// Write queued data
if (empty) {
@@ -552,24 +610,51 @@ int TLSCallbacks::DoWrite(WriteWrap* w,
}
-void TLSCallbacks::AfterWrite(WriteWrap* w) {
+void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
// Intentionally empty
}
-void TLSCallbacks::DoAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
+void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
+ TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
+
size_t size = 0;
- buf->base = NodeBIO::FromBIO(enc_in_)->PeekWritable(&size);
+ buf->base = NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size);
buf->len = size;
}
-void TLSCallbacks::DoRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending) {
+void TLSWrap::OnReadImpl(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx) {
+ TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
+ wrap->DoRead(nread, buf, pending);
+}
+
+
+void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) {
+ buf->base = static_cast<char*>(malloc(suggested_size));
+ CHECK_NE(buf->base, nullptr);
+ buf->len = suggested_size;
+}
+
+
+void TLSWrap::OnReadSelf(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx) {
+ TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
+ Local<Object> buf_obj;
+ if (buf != nullptr)
+ buf_obj = Buffer::Use(wrap->env(), buf->base, buf->len);
+ wrap->EmitData(nread, buf_obj, Local<Object>());
+}
+
+
+void TLSWrap::DoRead(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending) {
if (nread < 0) {
// Error should be emitted only after all data was read
ClearOut();
@@ -583,8 +668,7 @@ void TLSCallbacks::DoRead(uv_stream_t* handle,
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
- Local<Value> arg = Integer::New(env()->isolate(), nread);
- wrap()->MakeCallback(env()->onread_string(), 1, &arg);
+ OnRead(nread, nullptr, UV_UNKNOWN_HANDLE);
return;
}
@@ -608,19 +692,19 @@ void TLSCallbacks::DoRead(uv_stream_t* handle,
}
-int TLSCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
+int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) {
if (SSL_shutdown(ssl_) == 0)
SSL_shutdown(ssl_);
shutdown_ = true;
EncOut();
- return StreamWrapCallbacks::DoShutdown(req_wrap, cb);
+ return stream_->DoShutdown(req_wrap);
}
-void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo<Value>& args) {
+void TLSWrap::SetVerifyMode(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
if (args.Length() < 2 || !args[0]->IsBoolean() || !args[1]->IsBoolean())
return env->ThrowTypeError("Bad arguments, expected two booleans");
@@ -647,34 +731,34 @@ void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo<Value>& args) {
}
-void TLSCallbacks::EnableSessionCallbacks(
+void TLSWrap::EnableSessionCallbacks(
const FunctionCallbackInfo<Value>& args) {
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
wrap->enable_session_callbacks();
EnableHelloParser(args);
}
-void TLSCallbacks::EnableHelloParser(const FunctionCallbackInfo<Value>& args) {
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+void TLSWrap::EnableHelloParser(const FunctionCallbackInfo<Value>& args) {
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
NodeBIO::FromBIO(wrap->enc_in_)->set_initial(kMaxHelloLength);
- wrap->hello_parser_.Start(SSLWrap<TLSCallbacks>::OnClientHello,
+ wrap->hello_parser_.Start(SSLWrap<TLSWrap>::OnClientHello,
OnClientHelloParseEnd,
wrap);
}
-void TLSCallbacks::OnClientHelloParseEnd(void* arg) {
- TLSCallbacks* c = static_cast<TLSCallbacks*>(arg);
+void TLSWrap::OnClientHelloParseEnd(void* arg) {
+ TLSWrap* c = static_cast<TLSWrap*>(arg);
c->Cycle();
}
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
-void TLSCallbacks::GetServername(const FunctionCallbackInfo<Value>& args) {
+void TLSWrap::GetServername(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
const char* servername = SSL_get_servername(wrap->ssl_,
TLSEXT_NAMETYPE_host_name);
@@ -686,10 +770,10 @@ void TLSCallbacks::GetServername(const FunctionCallbackInfo<Value>& args) {
}
-void TLSCallbacks::SetServername(const FunctionCallbackInfo<Value>& args) {
+void TLSWrap::SetServername(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
- TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
+ TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
if (args.Length() < 1 || !args[0]->IsString())
return env->ThrowTypeError("First argument should be a string");
@@ -707,8 +791,8 @@ void TLSCallbacks::SetServername(const FunctionCallbackInfo<Value>& args) {
}
-int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
- TLSCallbacks* p = static_cast<TLSCallbacks*>(SSL_get_app_data(s));
+int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
+ TLSWrap* p = static_cast<TLSWrap*>(SSL_get_app_data(s));
Environment* env = p->env();
const char* servername = SSL_get_servername(s, TLSEXT_NAMETYPE_host_name);
@@ -744,12 +828,12 @@ int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
-void TLSCallbacks::Initialize(Handle<Object> target,
+void TLSWrap::Initialize(Handle<Object> target,
Handle<Value> unused,
Handle<Context> context) {
Environment* env = Environment::GetCurrent(context);
- env->SetMethod(target, "wrap", TLSCallbacks::Wrap);
+ env->SetMethod(target, "wrap", TLSWrap::Wrap);
Local<FunctionTemplate> t = FunctionTemplate::New(env->isolate());
t->InstanceTemplate()->SetInternalFieldCount(1);
@@ -761,16 +845,18 @@ void TLSCallbacks::Initialize(Handle<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "enableHelloParser", EnableHelloParser);
- SSLWrap<TLSCallbacks>::AddMethods(env, t);
+ StreamBase::AddMethods<TLSWrap>(env, t);
+ SSLWrap<TLSWrap>::AddMethods(env, t);
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
env->SetProtoMethod(t, "getServername", GetServername);
env->SetProtoMethod(t, "setServername", SetServername);
#endif // SSL_CRT_SET_TLSEXT_SERVERNAME_CB
+ env->set_tls_wrap_constructor_template(t);
env->set_tls_wrap_constructor_function(t->GetFunction());
}
} // namespace node
-NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSCallbacks::Initialize)
+NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSWrap::Initialize)
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index 3815878d58..42452055ce 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -21,33 +21,33 @@ namespace crypto {
class SecureContext;
}
-class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
- public StreamWrapCallbacks,
- public AsyncWrap {
+class TLSWrap : public crypto::SSLWrap<TLSWrap>,
+ public StreamBase,
+ public AsyncWrap {
public:
- ~TLSCallbacks() override;
+ ~TLSWrap() override;
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
- const char* Error() const override;
- void ClearError() override;
- int TryWrite(uv_buf_t** bufs, size_t* count) override;
+ void* Cast() override;
+ int GetFD() const override;
+ bool IsAlive() const override;
+ bool IsClosing() const override;
+
+ // JavaScript functions
+ int ReadStart() override;
+ int ReadStop() override;
+
+ int DoShutdown(ShutdownWrap* req_wrap) override;
+ int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
- uv_stream_t* send_handle,
- uv_write_cb cb) override;
- void AfterWrite(WriteWrap* w) override;
- void DoAlloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) override;
- void DoRead(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type pending) override;
- int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) override;
+ uv_stream_t* send_handle) override;
+ const char* Error() const override;
+ void ClearError() override;
void NewSessionDoneCb();
@@ -66,27 +66,26 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
// Write callback queue's item
class WriteItem {
public:
- WriteItem(WriteWrap* w, uv_write_cb cb) : w_(w), cb_(cb) {
+ explicit WriteItem(WriteWrap* w) : w_(w) {
}
~WriteItem() {
w_ = nullptr;
- cb_ = nullptr;
}
WriteWrap* w_;
- uv_write_cb cb_;
ListNode<WriteItem> member_;
};
- TLSCallbacks(Environment* env,
- Kind kind,
- v8::Handle<v8::Object> sc,
- StreamWrapCallbacks* old);
+ TLSWrap(Environment* env,
+ Kind kind,
+ StreamBase* steram,
+ v8::Handle<v8::Object> stream_obj,
+ v8::Handle<v8::Object> sc);
static void SSLInfoCallback(const SSL* ssl_, int where, int ret);
void InitSSL();
void EncOut();
- static void EncOutCb(uv_write_t* req, int status);
+ static void EncOutCb(WriteWrap* req_wrap, int status);
bool ClearIn();
void ClearOut();
void MakePending();
@@ -104,6 +103,25 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
}
}
+ AsyncWrap* GetAsyncWrap() override;
+ bool IsIPCPipe() const override;
+
+ // Resource implementation
+ static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
+ static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
+ static void OnReadImpl(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx);
+ static void OnAfterWriteSelf(WriteWrap* w, void* ctx);
+ static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx);
+ static void OnReadSelf(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx);
+
+ void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending);
+
// If |msg| is not nullptr, caller is responsible for calling `delete[] *msg`.
v8::Local<v8::Value> GetSSLError(int status, int* err, const char** msg);
@@ -125,10 +143,11 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
crypto::SecureContext* sc_;
v8::Persistent<v8::Object> sc_handle_;
+ StreamBase* stream_;
+ v8::Persistent<v8::Object> stream_handle_;
BIO* enc_in_;
BIO* enc_out_;
NodeBIO* clear_in_;
- uv_write_t write_req_;
size_t write_size_;
size_t write_queue_size_;
typedef ListHead<WriteItem, &WriteItem::member_> WriteItemList;
diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc
index 08c50d911f..186f2f0100 100644
--- a/src/tty_wrap.cc
+++ b/src/tty_wrap.cc
@@ -36,26 +36,10 @@ void TTYWrap::Initialize(Handle<Object> target,
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TTY"));
t->InstanceTemplate()->SetInternalFieldCount(1);
- enum PropertyAttribute attributes =
- static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
- t->InstanceTemplate()->SetAccessor(env->fd_string(),
- StreamWrap::GetFD,
- nullptr,
- Handle<Value>(),
- v8::DEFAULT,
- attributes);
-
env->SetProtoMethod(t, "close", HandleWrap::Close);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
- env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart);
- env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop);
-
- env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer);
- env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString);
- env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String);
- env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String);
- env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString);
+ StreamWrap::AddMethods(env, t);
env->SetProtoMethod(t, "getWindowSize", TTYWrap::GetWindowSize);
env->SetProtoMethod(t, "setRawMode", SetRawMode);
diff --git a/test/parallel/test-tls-client-default-ciphers.js b/test/parallel/test-tls-client-default-ciphers.js
index 1eb74e6981..dfae4a7bb9 100644
--- a/test/parallel/test-tls-client-default-ciphers.js
+++ b/test/parallel/test-tls-client-default-ciphers.js
@@ -2,13 +2,21 @@ var assert = require('assert');
var common = require('../common');
var tls = require('tls');
+function Done() {}
+
function test1() {
var ciphers = '';
+
tls.createSecureContext = function(options) {
- ciphers = options.ciphers
+ ciphers = options.ciphers;
+ throw new Done();
+ }
+
+ try {
+ var s = tls.connect(common.PORT);
+ } catch (e) {
+ assert(e instanceof Done);
}
- var s = tls.connect(common.PORT);
- s.destroy();
assert.equal(ciphers, tls.DEFAULT_CIPHERS);
}
test1();
diff --git a/test/parallel/test-tls-close-notify.js b/test/parallel/test-tls-close-notify.js
index 54f7314e2f..c5decad5e5 100644
--- a/test/parallel/test-tls-close-notify.js
+++ b/test/parallel/test-tls-close-notify.js
@@ -17,8 +17,8 @@ var server = tls.createServer({
cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem')
}, function(c) {
// Send close-notify without shutting down TCP socket
- if (c.ssl.shutdown() !== 1)
- c.ssl.shutdown();
+ if (c._handle.shutdownSSL() !== 1)
+ c._handle.shutdownSSL();
}).listen(common.PORT, function() {
var c = tls.connect(common.PORT, {
rejectUnauthorized: false
diff --git a/test/parallel/test-tls-multi-key.js b/test/parallel/test-tls-multi-key.js
index cdf8500874..657d9084d4 100644
--- a/test/parallel/test-tls-multi-key.js
+++ b/test/parallel/test-tls-multi-key.js
@@ -28,15 +28,14 @@ var server = tls.createServer(options, function(conn) {
ciphers: 'ECDHE-ECDSA-AES256-GCM-SHA384',
rejectUnauthorized: false
}, function() {
+ ciphers.push(ecdsa.getCipher());
var rsa = tls.connect(common.PORT, {
ciphers: 'ECDHE-RSA-AES256-GCM-SHA384',
rejectUnauthorized: false
}, function() {
+ ciphers.push(rsa.getCipher());
ecdsa.destroy();
rsa.destroy();
-
- ciphers.push(ecdsa.getCipher());
- ciphers.push(rsa.getCipher());
server.close();
});
});