diff options
Diffstat (limited to 'src/stream_wrap.cc')
-rw-r--r-- | src/stream_wrap.cc | 80 |
1 files changed, 74 insertions, 6 deletions
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 56d57b23be..548f978987 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -2,6 +2,7 @@ #include <node_buffer.h> #include <handle_wrap.h> #include <stream_wrap.h> +#include <tcp_wrap.h> #include <req_wrap.h> @@ -95,7 +96,14 @@ Handle<Value> StreamWrap::ReadStart(const Arguments& args) { UNWRAP - int r = uv_read_start(wrap->stream_, OnAlloc, OnRead); + bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE && + ((uv_pipe_t*)wrap->stream_)->ipc; + int r; + if (ipc_pipe) { + r = uv_read2_start(wrap->stream_, OnAlloc, OnRead2); + } else { + r = uv_read_start(wrap->stream_, OnAlloc, OnRead); + } // Error starting the tcp. if (r) SetErrno(uv_last_error(uv_default_loop()).code); @@ -170,9 +178,13 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) { return buf; } -void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { + +void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, + uv_buf_t buf, uv_handle_type pending) { HandleScope scope; + assert(pending == UV_UNKNOWN_HANDLE); // TODO + StreamWrap* wrap = static_cast<StreamWrap*>(handle->data); // We should not be getting this callback if someone as already called @@ -201,25 +213,59 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { } if (nread > 0) { - Local<Value> argv[3] = { + int argc = 3; + Local<Value> argv[4] = { slab_v, Integer::New(wrap->slab_offset_), Integer::New(nread) }; - MakeCallback(wrap->object_, "onread", 3, argv); + + if (pending == UV_TCP) { + // Instantiate the client javascript object and handle. + Local<Object> pending_obj = TCPWrap::Instantiate(); + + // Unwrap the client javascript object. + assert(pending_obj->InternalFieldCount() > 0); + TCPWrap* pending_wrap = + static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0)); + + int r = uv_accept(handle, pending_wrap->GetStream()); + assert(r == 0); + + argv[3] = pending_obj; + argc++; + } else { + // We only support sending UV_TCP right now. + assert(pending == UV_UNKNOWN_HANDLE); + } + + MakeCallback(wrap->object_, "onread", argc, argv); } } +void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { + OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE); +} + + +void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { + OnReadCommon((uv_stream_t*)handle, nread, buf, pending); +} + + Handle<Value> StreamWrap::Write(const Arguments& args) { HandleScope scope; UNWRAP + bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE && + ((uv_pipe_t*)wrap->stream_)->ipc; + // The first argument is a buffer. assert(Buffer::HasInstance(args[0])); Local<Object> buffer_obj = args[0]->ToObject(); - size_t offset = 0; size_t length = Buffer::Length(buffer_obj); @@ -239,7 +285,29 @@ Handle<Value> StreamWrap::Write(const Arguments& args) { buf.base = Buffer::Data(buffer_obj) + offset; buf.len = length; - int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite); + int r; + + if (!ipc_pipe) { + r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite); + } else { + uv_stream_t* send_stream = NULL; + + if (args.Length() > 3) { + assert(args[3]->IsObject()); + Local<Object> send_stream_obj = args[3]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast<StreamWrap*>( + send_stream_obj->GetPointerFromInternalField(0)); + send_stream = send_stream_wrap->GetStream(); + } + + r = uv_write2(&req_wrap->req_, + wrap->stream_, + &buf, + 1, + send_stream, + StreamWrap::AfterWrite); + } req_wrap->Dispatched(); |