#include "stream_base.h" // NOLINT(build/include_inline) #include "stream_base-inl.h" #include "stream_wrap.h" #include "node.h" #include "node_buffer.h" #include "node_errors.h" #include "env-inl.h" #include "js_stream.h" #include "string_bytes.h" #include "util-inl.h" #include "v8.h" #include // INT_MAX namespace node { using v8::Array; using v8::ArrayBuffer; using v8::Context; using v8::DontDelete; using v8::DontEnum; using v8::External; using v8::Function; using v8::FunctionCallbackInfo; using v8::HandleScope; using v8::Integer; using v8::Local; using v8::MaybeLocal; using v8::Object; using v8::ReadOnly; using v8::String; using v8::Value; template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( const FunctionCallbackInfo& args); int StreamBase::ReadStartJS(const FunctionCallbackInfo& args) { return ReadStart(); } int StreamBase::ReadStopJS(const FunctionCallbackInfo& args) { return ReadStop(); } int StreamBase::UseUserBuffer(const FunctionCallbackInfo& args) { CHECK(Buffer::HasInstance(args[0])); uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0])); PushStreamListener(new CustomBufferJSListener(buf)); return 0; } int StreamBase::Shutdown(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); Local req_wrap_obj = args[0].As(); return Shutdown(req_wrap_obj); } void StreamBase::SetWriteResult(const StreamWriteResult& res) { env_->stream_base_state()[kBytesWritten] = res.bytes; env_->stream_base_state()[kLastWriteWasAsync] = res.async; } int StreamBase::Writev(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args[0]->IsObject()); CHECK(args[1]->IsArray()); Local req_wrap_obj = args[0].As(); Local chunks = args[1].As(); bool all_buffers = args[2]->IsTrue(); size_t count; if (all_buffers) count = chunks->Length(); else count = chunks->Length() >> 1; MaybeStackBuffer bufs(count); size_t storage_size = 0; size_t offset; if (!all_buffers) { // Determine storage size first for (size_t i = 0; i < count; i++) { Local chunk = chunks->Get(env->context(), i * 2).ToLocalChecked(); if (Buffer::HasInstance(chunk)) continue; // Buffer chunk, no additional storage required // String chunk Local string = chunk->ToString(env->context()).ToLocalChecked(); enum encoding encoding = ParseEncoding(env->isolate(), chunks->Get(env->context(), i * 2 + 1).ToLocalChecked()); size_t chunk_size; if (encoding == UTF8 && string->Length() > 65535 && !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size)) return 0; else if (!StringBytes::StorageSize(env->isolate(), string, encoding) .To(&chunk_size)) return 0; storage_size += chunk_size; } if (storage_size > INT_MAX) return UV_ENOBUFS; } else { for (size_t i = 0; i < count; i++) { Local chunk = chunks->Get(env->context(), i).ToLocalChecked(); bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); } } AllocatedBuffer storage; if (storage_size > 0) storage = env->AllocateManaged(storage_size); offset = 0; if (!all_buffers) { for (size_t i = 0; i < count; i++) { Local chunk = chunks->Get(env->context(), i * 2).ToLocalChecked(); // Write buffer if (Buffer::HasInstance(chunk)) { bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); continue; } // Write string CHECK_LE(offset, storage_size); char* str_storage = storage.data() + offset; size_t str_size = storage.size() - offset; Local string = chunk->ToString(env->context()).ToLocalChecked(); enum encoding encoding = ParseEncoding(env->isolate(), chunks->Get(env->context(), i * 2 + 1).ToLocalChecked()); str_size = StringBytes::Write(env->isolate(), str_storage, str_size, string, encoding); bufs[i].base = str_storage; bufs[i].len = str_size; offset += str_size; } } StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); SetWriteResult(res); if (res.wrap != nullptr && storage_size > 0) { res.wrap->SetAllocatedStorage(std::move(storage)); } return res.err; } int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); Environment* env = Environment::GetCurrent(args); if (!args[1]->IsUint8Array()) { node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer"); return 0; } Local req_wrap_obj = args[0].As(); uv_buf_t buf; buf.base = Buffer::Data(args[1]); buf.len = Buffer::Length(args[1]); uv_stream_t* send_handle = nullptr; if (args[2]->IsObject() && IsIPCPipe()) { Local send_handle_obj = args[2].As(); HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); send_handle = reinterpret_cast(wrap->GetHandle()); // Reference LibuvStreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. req_wrap_obj->Set(env->context(), env->handle_string(), send_handle_obj).Check(); } StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); SetWriteResult(res); return res.err; } template int StreamBase::WriteString(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args[0]->IsObject()); CHECK(args[1]->IsString()); Local req_wrap_obj = args[0].As(); Local string = args[1].As(); Local send_handle_obj; if (args[2]->IsObject()) send_handle_obj = args[2].As(); // Compute the size of the storage that the string will be flattened into. // For UTF8 strings that are very long, go ahead and take the hit for // computing their actual size, rather than tripling the storage. size_t storage_size; if (enc == UTF8 && string->Length() > 65535 && !StringBytes::Size(env->isolate(), string, enc).To(&storage_size)) return 0; else if (!StringBytes::StorageSize(env->isolate(), string, enc) .To(&storage_size)) return 0; if (storage_size > INT_MAX) return UV_ENOBUFS; // Try writing immediately if write size isn't too big char stack_storage[16384]; // 16kb size_t data_size; size_t synchronously_written = 0; uv_buf_t buf; bool try_write = storage_size <= sizeof(stack_storage) && (!IsIPCPipe() || send_handle_obj.IsEmpty()); if (try_write) { data_size = StringBytes::Write(env->isolate(), stack_storage, storage_size, string, enc); buf = uv_buf_init(stack_storage, data_size); uv_buf_t* bufs = &buf; size_t count = 1; const int err = DoTryWrite(&bufs, &count); // Keep track of the bytes written here, because we're taking a shortcut // by using `DoTryWrite()` directly instead of using the utilities // provided by `Write()`. synchronously_written = count == 0 ? data_size : data_size - buf.len; bytes_written_ += synchronously_written; // Immediate failure or success if (err != 0 || count == 0) { SetWriteResult(StreamWriteResult { false, err, nullptr, data_size }); return err; } // Partial write CHECK_EQ(count, 1); } AllocatedBuffer data; if (try_write) { // Copy partial data data = env->AllocateManaged(buf.len); memcpy(data.data(), buf.base, buf.len); data_size = buf.len; } else { // Write it data = env->AllocateManaged(storage_size); data_size = StringBytes::Write(env->isolate(), data.data(), storage_size, string, enc); } CHECK_LE(data_size, storage_size); buf = uv_buf_init(data.data(), data_size); uv_stream_t* send_handle = nullptr; if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); send_handle = reinterpret_cast(wrap->GetHandle()); // Reference LibuvStreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. req_wrap_obj->Set(env->context(), env->handle_string(), send_handle_obj).Check(); } StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); res.bytes += synchronously_written; SetWriteResult(res); if (res.wrap != nullptr) { res.wrap->SetAllocatedStorage(std::move(data)); } return res.err; } MaybeLocal StreamBase::CallJSOnreadMethod(ssize_t nread, Local ab, size_t offset, StreamBaseJSChecks checks) { Environment* env = env_; DCHECK_EQ(static_cast(nread), nread); DCHECK_LE(offset, INT32_MAX); if (checks == DONT_SKIP_NREAD_CHECKS) { if (ab.IsEmpty()) { DCHECK_EQ(offset, 0); DCHECK_LE(nread, 0); } else { DCHECK_GE(nread, 0); } } env->stream_base_state()[kReadBytesOrError] = nread; env->stream_base_state()[kArrayBufferOffset] = offset; Local argv[] = { ab.IsEmpty() ? Undefined(env->isolate()).As() : ab.As() }; AsyncWrap* wrap = GetAsyncWrap(); CHECK_NOT_NULL(wrap); Local onread = wrap->object()->GetInternalField(kOnReadFunctionField); CHECK(onread->IsFunction()); return wrap->MakeCallback(onread.As(), arraysize(argv), argv); } bool StreamBase::IsIPCPipe() { return false; } int StreamBase::GetFD() { return -1; } Local StreamBase::GetObject() { return GetAsyncWrap()->object(); } void StreamBase::AddMethod(Environment* env, Local signature, enum PropertyAttribute attributes, Local t, JSMethodFunction* stream_method, Local string) { Local templ = env->NewFunctionTemplate(stream_method, signature, v8::ConstructorBehavior::kThrow, v8::SideEffectType::kHasNoSideEffect); t->PrototypeTemplate()->SetAccessorProperty( string, templ, Local(), attributes); } void StreamBase::AddMethods(Environment* env, Local t) { HandleScope scope(env->isolate()); enum PropertyAttribute attributes = static_cast(ReadOnly | DontDelete | DontEnum); Local sig = Signature::New(env->isolate(), t); AddMethod(env, sig, attributes, t, GetFD, env->fd_string()); AddMethod( env, sig, attributes, t, GetExternal, env->external_stream_string()); AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string()); AddMethod( env, sig, attributes, t, GetBytesWritten, env->bytes_written_string()); env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>); env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>); env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>); env->SetProtoMethod(t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>); env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>); env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); env->SetProtoMethod( t, "writeAsciiString", JSMethod<&StreamBase::WriteString>); env->SetProtoMethod( t, "writeUtf8String", JSMethod<&StreamBase::WriteString>); env->SetProtoMethod( t, "writeUcs2String", JSMethod<&StreamBase::WriteString>); env->SetProtoMethod( t, "writeLatin1String", JSMethod<&StreamBase::WriteString>); t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "isStreamBase"), True(env->isolate())); t->PrototypeTemplate()->SetAccessor( FIXED_ONE_BYTE_STRING(env->isolate(), "onread"), BaseObject::InternalFieldGet, BaseObject::InternalFieldSet); } void StreamBase::GetFD(const FunctionCallbackInfo& args) { // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD(). StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL); if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); args.GetReturnValue().Set(wrap->GetFD()); } void StreamBase::GetBytesRead(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return args.GetReturnValue().Set(0); // uint64_t -> double. 53bits is enough for all real cases. args.GetReturnValue().Set(static_cast(wrap->bytes_read_)); } void StreamBase::GetBytesWritten(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return args.GetReturnValue().Set(0); // uint64_t -> double. 53bits is enough for all real cases. args.GetReturnValue().Set(static_cast(wrap->bytes_written_)); } void StreamBase::GetExternal(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return; Local ext = External::New(args.GetIsolate(), wrap); args.GetReturnValue().Set(ext); } template & args)> void StreamBase::JSMethod(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.Holder().As()); if (wrap == nullptr) return; if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap()); args.GetReturnValue().Set((wrap->*Method)(args)); } int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { // No TryWrite by default return 0; } const char* StreamResource::Error() const { return nullptr; } void StreamResource::ClearError() { // No-op } uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { CHECK_NOT_NULL(stream_); Environment* env = static_cast(stream_)->stream_env(); return env->AllocateManaged(suggested_size).release(); } void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { CHECK_NOT_NULL(stream_); StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); AllocatedBuffer buf(env, buf_); if (nread <= 0) { if (nread < 0) stream->CallJSOnreadMethod(nread, Local()); return; } CHECK_LE(static_cast(nread), buf.size()); buf.Resize(nread); stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer()); } uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) { return buffer_; } void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_NOT_NULL(stream_); CHECK_EQ(buf.base, buffer_.base); StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); MaybeLocal ret = stream->CallJSOnreadMethod(nread, Local(), 0, StreamBase::SKIP_NREAD_CHECKS); Local next_buf_v; if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) { buffer_.base = Buffer::Data(next_buf_v); buffer_.len = Buffer::Length(next_buf_v); } } void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); CHECK(!async_wrap->persistent().IsEmpty()); Local req_wrap_obj = async_wrap->object(); Local argv[] = { Integer::New(env->isolate(), status), stream->GetObject(), Undefined(env->isolate()) }; const char* msg = stream->Error(); if (msg != nullptr) { argv[2] = OneByteString(env->isolate(), msg); stream->ClearError(); } if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); } void ReportWritesToJSStreamListener::OnStreamAfterWrite( WriteWrap* req_wrap, int status) { OnStreamAfterReqFinished(req_wrap, status); } void ReportWritesToJSStreamListener::OnStreamAfterShutdown( ShutdownWrap* req_wrap, int status) { OnStreamAfterReqFinished(req_wrap, status); } } // namespace node