#ifndef SRC_STREAM_BASE_H_
#define SRC_STREAM_BASE_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "env.h"
#include "async_wrap.h"
#include "req_wrap-inl.h"
#include "node.h"
#include "util.h"

#include "v8.h"

// NOTE(review): the angle-bracketed template lists in this header were
// restored during review. Those that are not fixed by the surrounding code
// (the V8 handle element types, the `Base`/`Encoding` template parameters and
// the JSMethod member-pointer parameter) should be confirmed against the
// matching -inl.h / .cc definitions.

namespace node {

// Forward declarations
class StreamBase;

// Mixin that gives a request type a completion callback.
//
// `Req` is the concrete request class deriving from StreamReq<Req>; Done()
// downcasts `this` to `Req*` and uses its env() and object() accessors, so
// `Req` must provide both.
template <class Req>
class StreamReq {
 public:
  // Completion callback: receives the concrete request and a status code.
  typedef void (*DoneCb)(Req* req, int status);

  explicit StreamReq(DoneCb cb) : cb_(cb) {
  }

  // Invokes the completion callback with `status`. When `error_str` is not
  // null it is first stored on the request's JS object under the
  // env->error_string() key.
  inline void Done(int status, const char* error_str = nullptr) {
    Req* req = static_cast<Req*>(this);
    Environment* env = req->env();
    if (error_str != nullptr) {
      req->object()->Set(env->error_string(),
                         OneByteString(env->isolate(), error_str));
    }

    cb_(req, status);
  }

 private:
  DoneCb cb_;
};

// Request object for a stream shutdown. Keeps a pointer to the StreamBase
// the request was created for.
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
                     public StreamReq<ShutdownWrap> {
 public:
  // Associates `req_wrap_obj` with this instance via Wrap().
  ShutdownWrap(Environment* env,
               v8::Local<v8::Object> req_wrap_obj,
               StreamBase* wrap,
               DoneCb cb)
      : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
        StreamReq<ShutdownWrap>(cb),
        wrap_(wrap) {
    Wrap(req_wrap_obj, this);
  }

  // Clears the association established in the constructor.
  ~ShutdownWrap() {
    ClearWrap(object());
  }

  // Maps a uv_shutdown_t back to the ShutdownWrap whose `req_` member it is.
  static ShutdownWrap* from_req(uv_shutdown_t* req) {
    return ContainerOf(&ShutdownWrap::req_, req);
  }

  inline StreamBase* wrap() const { return wrap_; }
  size_t self_size() const override { return sizeof(*this); }

 private:
  StreamBase* const wrap_;
};

// Request object for a stream write. Keeps a pointer to the StreamBase the
// request was created for and the storage size reported by self_size().
class WriteWrap: public ReqWrap<uv_write_t>,
                 public StreamReq<WriteWrap> {
 public:
  // Factory and storage accessors; defined outside this header.
  static inline WriteWrap* New(Environment* env,
                               v8::Local<v8::Object> obj,
                               StreamBase* wrap,
                               DoneCb cb,
                               size_t extra = 0);
  inline void Dispose();
  inline char* Extra(size_t offset = 0);
  inline size_t ExtraSize() const;

  inline StreamBase* wrap() const { return wrap_; }

  size_t self_size() const override { return storage_size_; }

  // Maps a uv_write_t back to the WriteWrap whose `req_` member it is.
  static WriteWrap* from_req(uv_write_t* req) {
    return ContainerOf(&WriteWrap::req_, req);
  }

  static const size_t kAlignSize = 16;

  // Public constructor: records a storage size of 0.
  WriteWrap(Environment* env,
            v8::Local<v8::Object> obj,
            StreamBase* wrap,
            DoneCb cb)
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
        StreamReq<WriteWrap>(cb),
        wrap_(wrap),
        storage_size_(0) {
    Wrap(obj, this);
  }

 protected:
  // Same as the public constructor, but records an explicit `storage_size`.
  WriteWrap(Environment* env,
            v8::Local<v8::Object> obj,
            StreamBase* wrap,
            DoneCb cb,
            size_t storage_size)
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
        StreamReq<WriteWrap>(cb),
        wrap_(wrap),
        storage_size_(storage_size) {
    Wrap(obj, this);
  }

  ~WriteWrap() {
    ClearWrap(object());
  }

  // Only the placement form is usable; it constructs the object in the
  // caller-provided `storage`.
  void* operator new(size_t size) = delete;
  void* operator new(size_t size, char* storage) { return storage; }

  // This is just to keep the compiler happy. It should never be called, since
  // we don't use exceptions in node.
  void operator delete(void* ptr, char* storage) { UNREACHABLE(); }

 private:
  // People should not be using the non-placement new and delete operator on a
  // WriteWrap. Ensure this never happens.
  void operator delete(void* ptr) { UNREACHABLE(); }

  StreamBase* const wrap_;
  const size_t storage_size_;
};

// Abstract stream interface: pure-virtual shutdown/write operations plus a
// set of optional callbacks (after-write, alloc, read, destruct), each paired
// with an opaque context pointer.
class StreamResource {
 public:
  // A function pointer of type `T` together with the context pointer that is
  // passed back to it. A null `fn` means "no callback set".
  template <class T>
  struct Callback {
    Callback() : fn(nullptr), ctx(nullptr) {}
    Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
    Callback(const Callback&) = default;

    inline bool is_empty() { return fn == nullptr; }
    inline void clear() {
      fn = nullptr;
      ctx = nullptr;
    }

    T fn;
    void* ctx;
  };

  typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
  typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
  typedef void (*ReadCb)(ssize_t nread,
                         const uv_buf_t* buf,
                         uv_handle_type pending,
                         void* ctx);
  typedef void (*DestructCb)(void* ctx);

  StreamResource() : bytes_read_(0) {
  }

  // Invokes the destruct callback, if one is set.
  virtual ~StreamResource() {
    if (!destruct_cb_.is_empty())
      destruct_cb_.fn(destruct_cb_.ctx);
  }

  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
  virtual int DoWrite(WriteWrap* w,
                      uv_buf_t* bufs,
                      size_t count,
                      uv_stream_t* send_handle) = 0;
  virtual const char* Error() const;
  virtual void ClearError();

  // Events
  // Each On*() forwards to the corresponding callback and is a no-op when
  // that callback is not set.
  inline void OnAfterWrite(WriteWrap* w) {
    if (!after_write_cb_.is_empty())
      after_write_cb_.fn(w, after_write_cb_.ctx);
  }

  inline void OnAlloc(size_t size, uv_buf_t* buf) {
    if (!alloc_cb_.is_empty())
      alloc_cb_.fn(size, buf, alloc_cb_.ctx);
  }

  // Positive `nread` values are added to the running bytes_read_ total
  // before the read callback (if any) is invoked.
  inline void OnRead(ssize_t nread,
                     const uv_buf_t* buf,
                     uv_handle_type pending = UV_UNKNOWN_HANDLE) {
    if (nread > 0)
      bytes_read_ += static_cast<uint64_t>(nread);
    if (!read_cb_.is_empty())
      read_cb_.fn(nread, buf, pending, read_cb_.ctx);
  }

  inline void set_after_write_cb(Callback<AfterWriteCb> c) {
    after_write_cb_ = c;
  }

  inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
  inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
  inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }

  inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
  inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
  inline Callback<ReadCb> read_cb() { return read_cb_; }
  inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }

 private:
  Callback<AfterWriteCb> after_write_cb_;
  Callback<AllocCb> alloc_cb_;
  Callback<ReadCb> read_cb_;
  Callback<DestructCb> destruct_cb_;
  uint64_t bytes_read_;

  friend class StreamBase;
};

// StreamResource extended with an Environment pointer, a "consumed" flag and
// the declarations of the JS-facing stream methods.
class StreamBase : public StreamResource {
 public:
  // Bit flags accepted by AddMethods().
  enum Flags {
    kFlagNone = 0x0,
    kFlagHasWritev = 0x1,
    kFlagNoShutdown = 0x2
  };

  template <class Base>
  static inline void AddMethods(Environment* env,
                                v8::Local<v8::FunctionTemplate> target,
                                int flags = kFlagNone);

  virtual void* Cast() = 0;
  virtual bool IsAlive() = 0;
  virtual bool IsClosing() = 0;
  virtual bool IsIPCPipe();
  virtual int GetFD();

  virtual int ReadStart() = 0;
  virtual int ReadStop() = 0;

  // Marks the stream as consumed; checks that it was not consumed already.
  inline void Consume() {
    CHECK_EQ(consumed_, false);
    consumed_ = true;
  }

  // Reverts Consume(); checks that the stream is currently consumed.
  inline void Unconsume() {
    CHECK_EQ(consumed_, true);
    consumed_ = false;
  }

  // Typed convenience wrapper around the virtual Cast().
  template <class Outer>
  inline Outer* Cast() { return static_cast<Outer*>(Cast()); }

  void EmitData(ssize_t nread,
                v8::Local<v8::Object> buf,
                v8::Local<v8::Object> handle);

 protected:
  explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
  }

  virtual ~StreamBase() = default;

  // One of these must be implemented
  virtual AsyncWrap* GetAsyncWrap() = 0;
  virtual v8::Local<v8::Object> GetObject();

  // Libuv callbacks
  static void AfterShutdown(ShutdownWrap* req, int status);
  static void AfterWrite(WriteWrap* req, int status);

  // JS Methods
  int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
  int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
  template <enum encoding Encoding>
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);

  template <class Base>
  static void GetFD(v8::Local<v8::String> key,
                    const v8::PropertyCallbackInfo<v8::Value>& args);

  template <class Base>
  static void GetExternal(v8::Local<v8::String> key,
                          const v8::PropertyCallbackInfo<v8::Value>& args);

  template <class Base>
  static void GetBytesRead(v8::Local<v8::String> key,
                           const v8::PropertyCallbackInfo<v8::Value>& args);

  // Static trampoline parameterised on one of the int-returning member
  // functions declared above.
  template <class Base,
            int (StreamBase::*Method)(
                const v8::FunctionCallbackInfo<v8::Value>& args)>
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);

 private:
  Environment* env_;
  bool consumed_;
};

}  // namespace node

#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif  // SRC_STREAM_BASE_H_