summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFedor Indutny <fedor@indutny.com>2015-02-23 23:09:44 +0300
committerFedor Indutny <fedor@indutny.com>2015-02-24 22:38:21 +0300
commit1738c7783526868d86cb213414cb4d40c5a89662 (patch)
tree956d199d9e1b7f793bfe60db58bef27583ca1ebb
parente00c938d246c29897344be3b3060533bb4ad7806 (diff)
downloadandroid-node-v8-1738c7783526868d86cb213414cb4d40c5a89662.tar.gz
android-node-v8-1738c7783526868d86cb213414cb4d40c5a89662.tar.bz2
android-node-v8-1738c7783526868d86cb213414cb4d40c5a89662.zip
streams: introduce StreamWrap and JSStream
Introduce a way to wrap plain-js `stream.Duplex` streams into C++ StreamBase's child class. With such method at hand it is now possible to pass `stream.Duplex` instance as a `socket` parameter to `tls.connect()`. PR-URL: https://github.com/iojs/io.js/pull/926 Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
-rw-r--r--lib/_stream_wrap.js118
-rw-r--r--lib/_tls_wrap.js15
-rw-r--r--node.gyp3
-rw-r--r--src/async-wrap.h1
-rw-r--r--src/env.h7
-rw-r--r--src/js_stream.cc199
-rw-r--r--src/js_stream.h30
-rw-r--r--src/node_wrap.h5
-rw-r--r--src/stream_base.cc26
-rw-r--r--src/stream_base.h16
-rw-r--r--src/stream_wrap.cc18
-rw-r--r--src/stream_wrap.h10
-rw-r--r--src/tls_wrap.cc22
-rw-r--r--src/tls_wrap.h11
-rw-r--r--test/parallel/test-tls-js-stream.js69
15 files changed, 498 insertions, 52 deletions
diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js
new file mode 100644
index 0000000000..c3dcfe51b6
--- /dev/null
+++ b/lib/_stream_wrap.js
@@ -0,0 +1,118 @@
+const util = require('util');
+const Socket = require('net').Socket;
+const JSStream = process.binding('js_stream').JSStream;
+const uv = process.binding('uv');
+
+function StreamWrap(stream) {
+ var handle = new JSStream();
+
+ this.stream = stream;
+
+ var self = this;
+ handle.close = function(cb) {
+ cb();
+ };
+ handle.isAlive = function() {
+ return self.isAlive();
+ };
+ handle.isClosing = function() {
+ return self.isClosing();
+ };
+ handle.onreadstart = function() {
+ return self.readStart();
+ };
+ handle.onreadstop = function() {
+ return self.readStop();
+ };
+ handle.onshutdown = function(req) {
+ return self.shutdown(req);
+ };
+ handle.onwrite = function(req, bufs) {
+ return self.write(req, bufs);
+ };
+
+ this.stream.pause();
+ this.stream.on('data', function(chunk) {
+ self._handle.readBuffer(chunk);
+ });
+ this.stream.once('end', function() {
+ self._handle.emitEOF();
+ });
+ this.stream.on('error', function(err) {
+ self.emit('error', err);
+ });
+
+ Socket.call(this, {
+ handle: handle
+ });
+}
+util.inherits(StreamWrap, Socket);
+module.exports = StreamWrap;
+
+// require('_stream_wrap').StreamWrap
+StreamWrap.StreamWrap = StreamWrap;
+
+StreamWrap.prototype.isAlive = function isAlive() {
+ return this.readable && this.writable;
+};
+
+StreamWrap.prototype.isClosing = function isClosing() {
+ return !this.isAlive();
+};
+
+StreamWrap.prototype.readStart = function readStart() {
+ this.stream.resume();
+ return 0;
+};
+
+StreamWrap.prototype.readStop = function readStop() {
+ this.stream.pause();
+ return 0;
+};
+
+StreamWrap.prototype.shutdown = function shutdown(req) {
+ var self = this;
+
+ this.stream.end(function() {
+ // Ensure that write was dispatched
+ setImmediate(function() {
+ self._handle.finishShutdown(req, 0);
+ });
+ });
+ return 0;
+};
+
+StreamWrap.prototype.write = function write(req, bufs) {
+ var pending = bufs.length;
+ var self = this;
+
+ self.stream.cork();
+ bufs.forEach(function(buf) {
+ self.stream.write(buf, done);
+ });
+ self.stream.uncork();
+
+ function done(err) {
+ if (!err && --pending !== 0)
+ return;
+
+ // Ensure that this is called once in case of error
+ pending = 0;
+
+ // Ensure that write was dispatched
+ setImmediate(function() {
+ var errCode = 0;
+ if (err) {
+ if (err.code && uv['UV_' + err.code])
+ errCode = uv['UV_' + err.code];
+ else
+ errCode = uv.UV_EPIPE;
+ }
+
+ self._handle.doAfterWrite(req);
+ self._handle.finishWrite(req, errCode);
+ });
+ }
+
+ return 0;
+};
diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js
index 10221b99c3..41421e1988 100644
--- a/lib/_tls_wrap.js
+++ b/lib/_tls_wrap.js
@@ -7,6 +7,8 @@ const tls = require('tls');
const util = require('util');
const listenerCount = require('events').listenerCount;
const common = require('_tls_common');
+const StreamWrap = require('_stream_wrap').StreamWrap;
+const Duplex = require('stream').Duplex;
const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
@@ -224,6 +226,10 @@ function TLSSocket(socket, options) {
this.authorized = false;
this.authorizationError = null;
+ // Wrap plain JS Stream into StreamWrap
+ if (!(socket instanceof net.Socket) && socket instanceof Duplex)
+ socket = new StreamWrap(socket);
+
// Just a documented property to make secure sockets
// distinguishable from regular ones.
this.encrypted = true;
@@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) {
// Proxy HandleWrap, PipeWrap and TCPWrap methods
proxiedMethods.forEach(function(name) {
res[name] = function methodProxy() {
- return handle[name].apply(handle, arguments);
+ if (handle[name])
+ return handle[name].apply(handle, arguments);
};
});
@@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) {
this.setTimeout(options.handshakeTimeout, this._handleTimeout);
// Socket already has some buffered data - emulate receiving it
- if (socket && socket._readableState.length) {
+ if (socket && socket._readableState && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
ssl.receive(buf);
@@ -388,6 +395,10 @@ TLSSocket.prototype._init = function(socket) {
self._connecting = false;
self.emit('connect');
});
+
+ socket.on('error', function(err) {
+ self._tlsError(err);
+ });
}
// Assume `tls.connect()`
diff --git a/node.gyp b/node.gyp
index 996121ee45..4af27d8de0 100644
--- a/node.gyp
+++ b/node.gyp
@@ -56,6 +56,7 @@
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
+ 'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
'lib/timers.js',
@@ -95,6 +96,7 @@
'src/fs_event_wrap.cc',
'src/cares_wrap.cc',
'src/handle_wrap.cc',
+ 'src/js_stream.cc',
'src/node.cc',
'src/node_buffer.cc',
'src/node_constants.cc',
@@ -132,6 +134,7 @@
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
+ 'src/js_stream.h',
'src/node.h',
'src/node_buffer.h',
'src/node_constants.h',
diff --git a/src/async-wrap.h b/src/async-wrap.h
index 86748a5fef..5e898fe4c2 100644
--- a/src/async-wrap.h
+++ b/src/async-wrap.h
@@ -17,6 +17,7 @@ namespace node {
V(FSREQWRAP) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
+ V(JSSTREAM) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(QUERYWRAP) \
diff --git a/src/env.h b/src/env.h
index c9b4cc0736..18fed18304 100644
--- a/src/env.h
+++ b/src/env.h
@@ -107,6 +107,8 @@ namespace node {
V(ipv4_string, "IPv4") \
V(ipv6_lc_string, "ipv6") \
V(ipv6_string, "IPv6") \
+ V(isalive_string, "isAlive") \
+ V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
V(kill_signal_string, "killSignal") \
@@ -141,9 +143,13 @@ namespace node {
V(onnewsessiondone_string, "onnewsessiondone") \
V(onocspresponse_string, "onocspresponse") \
V(onread_string, "onread") \
+ V(onreadstart_string, "onreadstart") \
+ V(onreadstop_string, "onreadstop") \
V(onselect_string, "onselect") \
+ V(onshutdown_string, "onshutdown") \
V(onsignal_string, "onsignal") \
V(onstop_string, "onstop") \
+ V(onwrite_string, "onwrite") \
V(output_string, "output") \
V(order_string, "order") \
V(owner_string, "owner") \
@@ -225,6 +231,7 @@ namespace node {
V(context, v8::Context) \
V(domain_array, v8::Array) \
V(fs_stats_constructor_function, v8::Function) \
+ V(jsstream_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \
diff --git a/src/js_stream.cc b/src/js_stream.cc
index 3cc3a895fc..38ab847954 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -3,19 +3,218 @@
#include "async-wrap.h"
#include "env.h"
#include "env-inl.h"
+#include "node_buffer.h"
#include "stream_base.h"
#include "v8.h"
namespace node {
+using v8::Array;
using v8::Context;
+using v8::External;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
using v8::Handle;
+using v8::HandleScope;
+using v8::Local;
using v8::Object;
using v8::Value;
+
+JSStream::JSStream(Environment* env, Handle<Object> obj, AsyncWrap* parent)
+ : StreamBase(env),
+ AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent) {
+ node::Wrap(obj, this);
+}
+
+
+JSStream::~JSStream() {
+}
+
+
+void* JSStream::Cast() {
+ return static_cast<void*>(this);
+}
+
+
+AsyncWrap* JSStream::GetAsyncWrap() {
+ return static_cast<AsyncWrap*>(this);
+}
+
+
+bool JSStream::IsAlive() {
+ return MakeCallback(env()->isalive_string(), 0, nullptr)->IsTrue();
+}
+
+
+bool JSStream::IsClosing() {
+ return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue();
+}
+
+
+int JSStream::ReadStart() {
+ return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value();
+}
+
+
+int JSStream::ReadStop() {
+ return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value();
+}
+
+
+int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
+ HandleScope scope(env()->isolate());
+
+ Local<Value> argv[] = {
+ req_wrap->object()
+ };
+
+ Local<Value> res =
+ MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);
+
+ return res->Int32Value();
+}
+
+
+int JSStream::DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) {
+ CHECK_EQ(send_handle, nullptr);
+
+ HandleScope scope(env()->isolate());
+
+ Local<Array> bufs_arr = Array::New(env()->isolate(), count);
+ for (size_t i = 0; i < count; i++)
+ bufs_arr->Set(i, Buffer::New(env(), bufs[0].base, bufs[0].len));
+
+ Local<Value> argv[] = {
+ w->object(),
+ bufs_arr
+ };
+
+ Local<Value> res =
+ MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);
+
+ return res->Int32Value();
+}
+
+
+void JSStream::New(const FunctionCallbackInfo<Value>& args) {
+ // This constructor should not be exposed to public javascript.
+ // Therefore we assert that we are not trying to call this as a
+ // normal function.
+ CHECK(args.IsConstructCall());
+ Environment* env = Environment::GetCurrent(args);
+ JSStream* wrap;
+
+ if (args.Length() == 0) {
+ wrap = new JSStream(env, args.This(), nullptr);
+ } else if (args[0]->IsExternal()) {
+ void* ptr = args[0].As<External>()->Value();
+ wrap = new JSStream(env, args.This(), static_cast<AsyncWrap*>(ptr));
+ } else {
+ UNREACHABLE();
+ }
+ CHECK(wrap);
+}
+
+
+static void FreeCallback(char* data, void* hint) {
+ // Intentional no-op
+}
+
+
+void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ uv_buf_t buf;
+ wrap->OnAlloc(args[0]->Int32Value(), &buf);
+ args.GetReturnValue().Set(Buffer::New(wrap->env(),
+ buf.base,
+ buf.len,
+ FreeCallback,
+ nullptr));
+}
+
+
+void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ CHECK(Buffer::HasInstance(args[1]));
+ uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
+ wrap->OnRead(args[0]->Int32Value(), &buf);
+}
+
+
+void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+ WriteWrap* w = Unwrap<WriteWrap>(args[0].As<Object>());
+
+ wrap->OnAfterWrite(w);
+}
+
+
+template <class Wrap>
+void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
+ Wrap* w = Unwrap<Wrap>(args[0].As<Object>());
+
+ w->Done(args[0]->Int32Value());
+}
+
+
+void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ CHECK(Buffer::HasInstance(args[0]));
+ char* data = Buffer::Data(args[0]);
+ int len = Buffer::Length(args[0]);
+
+ do {
+ uv_buf_t buf;
+ ssize_t avail = len;
+ wrap->OnAlloc(len, &buf);
+ if (static_cast<ssize_t>(buf.len) < avail)
+ avail = buf.len;
+
+ memcpy(buf.base, data, avail);
+ data += avail;
+ len -= avail;
+ wrap->OnRead(avail, &buf);
+ } while (len != 0);
+}
+
+
+void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ wrap->OnRead(UV_EOF, nullptr);
+}
+
+
void JSStream::Initialize(Handle<Object> target,
Handle<Value> unused,
Handle<Context> context) {
+ Environment* env = Environment::GetCurrent(context);
+
+ Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
+ t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"));
+ t->InstanceTemplate()->SetInternalFieldCount(1);
+
+ env->SetProtoMethod(t, "doAlloc", DoAlloc);
+ env->SetProtoMethod(t, "doRead", DoRead);
+ env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
+ env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
+ env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
+ env->SetProtoMethod(t, "readBuffer", ReadBuffer);
+ env->SetProtoMethod(t, "emitEOF", EmitEOF);
+
+ StreamBase::AddMethods<JSStream>(env, t);
+ target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"),
+ t->GetFunction());
+ env->set_jsstream_constructor_template(t);
}
} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize)
diff --git a/src/js_stream.h b/src/js_stream.h
index 6a2d3bfb4f..8e2ff13258 100644
--- a/src/js_stream.h
+++ b/src/js_stream.h
@@ -8,11 +8,39 @@
namespace node {
-class JSStream : public StreamBase {
+class JSStream : public StreamBase, public AsyncWrap {
public:
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
+
+ void* Cast() override;
+ bool IsAlive() override;
+ bool IsClosing() override;
+ int ReadStart() override;
+ int ReadStop() override;
+
+ int DoShutdown(ShutdownWrap* req_wrap) override;
+ int DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) override;
+
+ protected:
+ JSStream(Environment* env, v8::Handle<v8::Object> obj, AsyncWrap* parent);
+ ~JSStream();
+
+ AsyncWrap* GetAsyncWrap() override;
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ template <class Wrap>
+ static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
};
} // namespace node
diff --git a/src/node_wrap.h b/src/node_wrap.h
index ddd7bd16e0..58b042a63b 100644
--- a/src/node_wrap.h
+++ b/src/node_wrap.h
@@ -3,6 +3,7 @@
#include "env.h"
#include "env-inl.h"
+#include "js_stream.h"
#include "pipe_wrap.h"
#include "tcp_wrap.h"
#include "tty_wrap.h"
@@ -40,6 +41,10 @@ namespace node {
env->tls_wrap_constructor_template()->HasInstance(obj)) { \
TLSWrap* const wrap = Unwrap<TLSWrap>(obj); \
BODY \
+ } else if (env->jsstream_constructor_template().IsEmpty() == false && \
+ env->jsstream_constructor_template()->HasInstance(obj)) { \
+ JSStream* const wrap = Unwrap<JSStream>(obj); \
+ BODY \
} \
}); \
} while (0)
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 0a1324bb58..82b1d65396 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -5,6 +5,7 @@
#include "node_buffer.h"
#include "env.h"
#include "env-inl.h"
+#include "js_stream.h"
#include "string_bytes.h"
#include "tls_wrap.h"
#include "util.h"
@@ -34,6 +35,8 @@ template void StreamBase::AddMethods<StreamWrap>(Environment* env,
Handle<FunctionTemplate> t);
template void StreamBase::AddMethods<TLSWrap>(Environment* env,
Handle<FunctionTemplate> t);
+template void StreamBase::AddMethods<JSStream>(Environment* env,
+ Handle<FunctionTemplate> t);
template <class Base>
@@ -488,8 +491,29 @@ void StreamBase::EmitData(ssize_t nread,
}
-AsyncWrap* StreamBase::GetAsyncWrap() {
+bool StreamBase::IsIPCPipe() {
+ return false;
+}
+
+
+int StreamBase::GetFD() {
+ return -1;
+}
+
+
+int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
+ // No TryWrite by default
+ return 0;
+}
+
+
+const char* StreamResource::Error() const {
return nullptr;
}
+
+void StreamResource::ClearError() {
+ // No-op
+}
+
} // namespace node
diff --git a/src/stream_base.h b/src/stream_base.h
index d6b3a555b0..87aae05973 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -106,13 +106,13 @@ class StreamResource {
virtual ~StreamResource() = default;
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
- virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0;
+ virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
- virtual const char* Error() const = 0;
- virtual void ClearError() = 0;
+ virtual const char* Error() const;
+ virtual void ClearError();
// Events
inline void OnAfterWrite(WriteWrap* w) {
@@ -127,7 +127,7 @@ class StreamResource {
inline void OnRead(size_t nread,
const uv_buf_t* buf,
- uv_handle_type pending) {
+ uv_handle_type pending = UV_UNKNOWN_HANDLE) {
if (read_cb_ != nullptr)
read_cb_(nread, buf, pending, read_ctx_);
}
@@ -163,10 +163,10 @@ class StreamBase : public StreamResource {
v8::Handle<v8::FunctionTemplate> target);
virtual void* Cast() = 0;
- virtual bool IsAlive() const = 0;
- virtual bool IsClosing() const = 0;
- virtual bool IsIPCPipe() const = 0;
- virtual int GetFD() const = 0;
+ virtual bool IsAlive() = 0;
+ virtual bool IsClosing() = 0;
+ virtual bool IsIPCPipe();
+ virtual int GetFD();
virtual int ReadStart() = 0;
virtual int ReadStop() = 0;
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 3b50f638eb..c8ea8d228f 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -84,7 +84,7 @@ void StreamWrap::AddMethods(Environment* env,
}
-int StreamWrap::GetFD() const {
+int StreamWrap::GetFD() {
int fd = -1;
#if !defined(_WIN32)
if (stream() != nullptr)
@@ -94,12 +94,12 @@ int StreamWrap::GetFD() const {
}
-bool StreamWrap::IsAlive() const {
+bool StreamWrap::IsAlive() {
return HandleWrap::IsAlive(this);
}
-bool StreamWrap::IsClosing() const {
+bool StreamWrap::IsClosing() {
return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
}
@@ -114,7 +114,7 @@ AsyncWrap* StreamWrap::GetAsyncWrap() {
}
-bool StreamWrap::IsIPCPipe() const {
+bool StreamWrap::IsIPCPipe() {
return is_named_pipe_ipc();
}
@@ -359,16 +359,6 @@ void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
wrap->UpdateWriteQueueSize();
}
-
-const char* StreamWrap::Error() const {
- return nullptr;
-}
-
-
-void StreamWrap::ClearError() {
- // No-op
-}
-
} // namespace node
NODE_MODULE_CONTEXT_AWARE_BUILTIN(stream_wrap, node::StreamWrap::Initialize)
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index ca673b4ef1..99561e843a 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -19,11 +19,11 @@ class StreamWrap : public HandleWrap, public StreamBase {
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
- int GetFD() const override;
+ int GetFD() override;
void* Cast() override;
- bool IsAlive() const override;
- bool IsClosing() const override;
- bool IsIPCPipe() const override;
+ bool IsAlive() override;
+ bool IsClosing() override;
+ bool IsIPCPipe() override;
// JavaScript functions
int ReadStart() override;
@@ -36,8 +36,6 @@ class StreamWrap : public HandleWrap, public StreamBase {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
- const char* Error() const override;
- void ClearError() override;
inline uv_stream_t* stream() const {
return stream_;
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index ab8db6951b..86056723df 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -216,7 +216,7 @@ void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
- wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE);
+ wrap->stream_->OnRead(buf.len, &buf);
data += copy;
len -= copy;
@@ -414,7 +414,7 @@ void TLSWrap::ClearOut() {
if (static_cast<int>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, out, avail);
- OnRead(avail, &buf, UV_UNKNOWN_HANDLE);
+ OnRead(avail, &buf);
read -= avail;
}
@@ -423,7 +423,7 @@ void TLSWrap::ClearOut() {
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
- OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE);
+ OnRead(UV_EOF, nullptr);
}
if (read == -1) {
@@ -495,22 +495,22 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
}
-bool TLSWrap::IsIPCPipe() const {
+bool TLSWrap::IsIPCPipe() {
return stream_->IsIPCPipe();
}
-int TLSWrap::GetFD() const {
+int TLSWrap::GetFD() {
return stream_->GetFD();
}
-bool TLSWrap::IsAlive() const {
+bool TLSWrap::IsAlive() {
return stream_->IsAlive();
}
-bool TLSWrap::IsClosing() const {
+bool TLSWrap::IsClosing() {
return stream_->IsClosing();
}
@@ -536,12 +536,6 @@ void TLSWrap::ClearError() {
}
-int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
- // TODO(indutny): Support it
- return 0;
-}
-
-
int TLSWrap::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
@@ -668,7 +662,7 @@ void TLSWrap::DoRead(ssize_t nread,
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
- OnRead(nread, nullptr, UV_UNKNOWN_HANDLE);
+ OnRead(nread, nullptr);
return;
}
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index 42452055ce..73a9f84ec0 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -32,16 +32,15 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
v8::Handle<v8::Context> context);
void* Cast() override;
- int GetFD() const override;
- bool IsAlive() const override;
- bool IsClosing() const override;
+ int GetFD() override;
+ bool IsAlive() override;
+ bool IsClosing() override;
// JavaScript functions
int ReadStart() override;
int ReadStop() override;
int DoShutdown(ShutdownWrap* req_wrap) override;
- int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
@@ -78,7 +77,7 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
TLSWrap(Environment* env,
Kind kind,
- StreamBase* steram,
+ StreamBase* stream,
v8::Handle<v8::Object> stream_obj,
v8::Handle<v8::Object> sc);
@@ -104,7 +103,7 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
}
AsyncWrap* GetAsyncWrap() override;
- bool IsIPCPipe() const override;
+ bool IsIPCPipe() override;
// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
diff --git a/test/parallel/test-tls-js-stream.js b/test/parallel/test-tls-js-stream.js
new file mode 100644
index 0000000000..7caa7e3f19
--- /dev/null
+++ b/test/parallel/test-tls-js-stream.js
@@ -0,0 +1,69 @@
+var assert = require('assert');
+var stream = require('stream');
+var tls = require('tls');
+var fs = require('fs');
+var net = require('net');
+
+var common = require('../common');
+
+var connected = {
+ client: 0,
+ server: 0
+};
+
+var server = tls.createServer({
+ key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'),
+ cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem')
+}, function(c) {
+ console.log('new client');
+ connected.server++;
+ c.end('ohai');
+}).listen(common.PORT, function() {
+ var raw = net.connect(common.PORT);
+
+ var pending = false;
+ raw.on('readable', function() {
+ if (pending)
+ p._read();
+ });
+
+ var p = new stream.Duplex({
+ read: function read() {
+ pending = false;
+
+ var chunk = raw.read();
+ if (chunk) {
+ console.log('read', chunk);
+ this.push(chunk);
+ } else {
+ pending = true;
+ }
+ },
+ write: function write(data, enc, cb) {
+ console.log('write', data, enc);
+ raw.write(data, enc, cb);
+ }
+ });
+
+ var socket = tls.connect({
+ socket: p,
+ rejectUnauthorized: false
+ }, function() {
+ console.log('client secure');
+
+ connected.client++;
+
+ socket.end('hello');
+ socket.resume();
+ });
+
+ socket.once('close', function() {
+ console.log('client close');
+ server.close();
+ });
+});
+
+process.once('exit', function() {
+ assert.equal(connected.client, 1);
+ assert.equal(connected.server, 1);
+});