src: slim down stream_base-inl.h

PR-URL: https://github.com/nodejs/node/pull/46972
Refs: https://github.com/nodejs/node/issues/43712
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>
This commit is contained in:
lilsweetcaligula 2023-04-11 08:52:27 +03:00 committed by GitHub
parent 17d024ba69
commit 97d3912eb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 142 additions and 140 deletions

View File

@ -67,30 +67,6 @@ void StreamResource::PushStreamListener(StreamListener* listener) {
listener_ = listener;
}
void StreamResource::RemoveStreamListener(StreamListener* listener) {
CHECK_NOT_NULL(listener);
StreamListener* previous;
StreamListener* current;
// Remove from the linked list.
for (current = listener_, previous = nullptr;
/* No loop condition because we want a crash if listener is not found */
; previous = current, current = current->previous_listener_) {
CHECK_NOT_NULL(current);
if (current == listener) {
if (previous != nullptr)
previous->previous_listener_ = current->previous_listener_;
else
listener_ = listener->previous_listener_;
break;
}
}
listener->stream_ = nullptr;
listener->previous_listener_ = nullptr;
}
uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
DebugSealHandleScope seal_handle_scope;
return listener_->OnStreamAlloc(suggested_size);
@ -122,101 +98,6 @@ StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_);
}
int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
v8::HandleScope handle_scope(env->isolate());
if (req_wrap_obj.IsEmpty()) {
if (!env->shutdown_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return UV_EBUSY;
}
StreamReq::ResetObject(req_wrap_obj);
}
BaseObjectPtr<AsyncWrap> req_wrap_ptr;
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
if (req_wrap != nullptr)
req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
int err = DoShutdown(req_wrap);
if (err != 0 && req_wrap != nullptr) {
req_wrap->Dispose();
}
const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return UV_EBUSY;
}
ClearError();
}
return err;
}
StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
v8::Local<v8::Object> req_wrap_obj,
bool skip_try_write) {
Environment* env = stream_env();
int err;
size_t total_bytes = 0;
for (size_t i = 0; i < count; ++i)
total_bytes += bufs[i].len;
bytes_written_ += total_bytes;
if (send_handle == nullptr && !skip_try_write) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult { false, err, nullptr, total_bytes, {} };
}
}
v8::HandleScope handle_scope(env->isolate());
if (req_wrap_obj.IsEmpty()) {
if (!env->write_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
}
StreamReq::ResetObject(req_wrap_obj);
}
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
err = DoWrite(req_wrap, bufs, count, send_handle);
bool async = err == 0;
if (!async) {
req_wrap->Dispose();
req_wrap = nullptr;
}
const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
}
ClearError();
}
return StreamWriteResult {
async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) };
}
template <typename OtherBase>
SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
StreamBase* stream,
@ -278,22 +159,6 @@ void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
backing_store_ = std::move(bs);
}
void StreamReq::Done(int status, const char* error_str) {
AsyncWrap* async_wrap = GetAsyncWrap();
Environment* env = async_wrap->env();
if (error_str != nullptr) {
v8::HandleScope handle_scope(env->isolate());
if (async_wrap->object()->Set(
env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str)).IsNothing()) {
return;
}
}
OnDone(status);
}
void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);

View File

@ -40,6 +40,103 @@ using v8::Signature;
using v8::String;
using v8::Value;
int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
v8::HandleScope handle_scope(env->isolate());
if (req_wrap_obj.IsEmpty()) {
if (!env->shutdown_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return UV_EBUSY;
}
StreamReq::ResetObject(req_wrap_obj);
}
BaseObjectPtr<AsyncWrap> req_wrap_ptr;
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
int err = DoShutdown(req_wrap);
if (err != 0 && req_wrap != nullptr) {
req_wrap->Dispose();
}
const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj
->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg))
.IsNothing()) {
return UV_EBUSY;
}
ClearError();
}
return err;
}
StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
v8::Local<v8::Object> req_wrap_obj,
bool skip_try_write) {
Environment* env = stream_env();
int err;
size_t total_bytes = 0;
for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len;
bytes_written_ += total_bytes;
if (send_handle == nullptr && !skip_try_write) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult{false, err, nullptr, total_bytes, {}};
}
}
v8::HandleScope handle_scope(env->isolate());
if (req_wrap_obj.IsEmpty()) {
if (!env->write_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
}
StreamReq::ResetObject(req_wrap_obj);
}
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
err = DoWrite(req_wrap, bufs, count, send_handle);
bool async = err == 0;
if (!async) {
req_wrap->Dispose();
req_wrap = nullptr;
}
const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj
->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg))
.IsNothing()) {
return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
}
ClearError();
}
return StreamWriteResult{
async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
}
template int StreamBase::WriteString<ASCII>(
const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<UTF8>(
@ -680,6 +777,30 @@ StreamResource::~StreamResource() {
}
}
void StreamResource::RemoveStreamListener(StreamListener* listener) {
CHECK_NOT_NULL(listener);
StreamListener* previous;
StreamListener* current;
// Remove from the linked list.
// No loop condition because we want a crash if listener is not found.
for (current = listener_, previous = nullptr;;
previous = current, current = current->previous_listener_) {
CHECK_NOT_NULL(current);
if (current == listener) {
if (previous != nullptr)
previous->previous_listener_ = current->previous_listener_;
else
listener_ = listener->previous_listener_;
break;
}
}
listener->stream_ = nullptr;
listener->previous_listener_ = nullptr;
}
ShutdownWrap* StreamBase::CreateShutdownWrap(
Local<Object> object) {
auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
@ -694,4 +815,21 @@ WriteWrap* StreamBase::CreateWriteWrap(
return wrap;
}
void StreamReq::Done(int status, const char* error_str) {
AsyncWrap* async_wrap = GetAsyncWrap();
Environment* env = async_wrap->env();
if (error_str != nullptr) {
v8::HandleScope handle_scope(env->isolate());
if (async_wrap->object()
->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str))
.IsNothing()) {
return;
}
}
OnDone(status);
}
} // namespace node

View File

@ -51,7 +51,7 @@ class StreamReq {
// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
inline void Done(int status, const char* error_str = nullptr);
void Done(int status, const char* error_str = nullptr);
inline void Dispose();
StreamBase* stream() const { return stream_; }
@ -276,7 +276,7 @@ class StreamResource {
inline void PushStreamListener(StreamListener* listener);
// Remove a listener, and, if this was the currently active one,
// transfer ownership back to the previous listener.
inline void RemoveStreamListener(StreamListener* listener);
void RemoveStreamListener(StreamListener* listener);
protected:
// Call the current listener's OnStreamAlloc() method.
@ -339,8 +339,7 @@ class StreamBase : public StreamResource {
// ShutdownWrap object (that was created in JS), or a new one will be created.
// Returns 1 in case of a synchronous completion, 0 in case of asynchronous
// completion, and a libuv error case in case of synchronous failure.
inline int Shutdown(
v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
@ -353,7 +352,7 @@ class StreamBase : public StreamResource {
// write is too large to finish synchronously.
// If the return value indicates a synchronous completion, no callback will
// be invoked.
inline StreamWriteResult Write(
StreamWriteResult Write(
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle = nullptr,