src: refactor stream callbacks and ownership

Instead of setting individual callbacks on streams and tracking
stream ownership through a boolean `consume_` flag, always have
one specific listener object in charge of a stream, and call
methods on that object rather than generic C-style callbacks.

PR-URL: https://github.com/nodejs/node/pull/18334
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Anna Henningsen 2018-01-08 01:14:06 +01:00 committed by Ruben Bridgewater
parent 1b6cb94761
commit 7c4b09b24b
No known key found for this signature in database
GPG Key ID: F07496B3EB3C1762
18 changed files with 467 additions and 478 deletions

View File

@ -666,7 +666,7 @@ function onSocketPause() {
function unconsume(parser, socket) {
if (socket._handle) {
if (parser._consumed)
parser.unconsume(socket._handle._externalStream);
parser.unconsume();
parser._consumed = false;
socket.removeListener('pause', onSocketPause);
socket.removeListener('resume', onSocketResume);

View File

@ -3,6 +3,7 @@
#include "connect_wrap.h"
#include "env-inl.h"
#include "pipe_wrap.h"
#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "tcp_wrap.h"
#include "util-inl.h"

View File

@ -25,9 +25,6 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
StreamBase(env) {
node::Wrap(obj, this);
MakeWeak<JSStream>(this);
set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
}
@ -35,45 +32,6 @@ JSStream::~JSStream() {
}
void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
buf->base = Malloc(size);
buf->len = size;
}
void JSStream::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
JSStream* wrap = static_cast<JSStream*>(ctx);
CHECK_NE(wrap, nullptr);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
if (nread < 0) {
if (buf != nullptr && buf->base != nullptr)
free(buf->base);
wrap->EmitData(nread, Local<Object>(), Local<Object>());
return;
}
if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
}
CHECK_LE(static_cast<size_t>(nread), buf->len);
char* base = node::Realloc(buf->base, nread);
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
wrap->EmitData(nread, obj, Local<Object>());
}
AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
@ -212,18 +170,19 @@ void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
char* data = Buffer::Data(args[0]);
int len = Buffer::Length(args[0]);
do {
uv_buf_t buf;
// Repeatedly ask the stream's owner for memory, copy the data that we
// just read from JS into those buffers and emit them as reads.
while (len != 0) {
uv_buf_t buf = wrap->EmitAlloc(len);
ssize_t avail = len;
wrap->EmitAlloc(len, &buf);
if (static_cast<ssize_t>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, data, avail);
data += avail;
len -= avail;
wrap->EmitRead(avail, &buf);
} while (len != 0);
wrap->EmitRead(avail, buf);
}
}
@ -231,7 +190,7 @@ void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
wrap->EmitRead(UV_EOF, nullptr);
wrap->EmitRead(UV_EOF);
}

View File

@ -531,24 +531,12 @@ Http2Session::Http2Session(Environment* env,
outgoing_buffers_.reserve(32);
}
void Http2Session::Unconsume() {
if (stream_ != nullptr) {
DEBUG_HTTP2SESSION(this, "unconsuming the i/o stream");
stream_->set_destruct_cb({ nullptr, nullptr });
stream_->set_alloc_cb({ nullptr, nullptr });
stream_->set_read_cb({ nullptr, nullptr });
stream_->Unconsume();
stream_ = nullptr;
}
}
Http2Session::~Http2Session() {
CHECK_EQ(flags_ & SESSION_STATE_HAS_SCOPE, 0);
if (!object().IsEmpty())
ClearWrap(object());
persistent().Reset();
CHECK(persistent().IsEmpty());
Unconsume();
DEBUG_HTTP2SESSION(this, "freeing nghttp2 session");
nghttp2_session_del(session_);
}
@ -646,7 +634,8 @@ void Http2Session::Close(uint32_t code, bool socket_closed) {
DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code);
CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0);
} else {
Unconsume();
if (stream_ != nullptr)
stream_->RemoveStreamListener(this);
}
// If there are outstanding pings, those will need to be canceled, do
@ -1044,22 +1033,38 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
stream->statistics_.received_bytes += len;
// There is a single large array buffer for the entire data read from the
// network; create a slice of that array buffer and emit it as the
// received data buffer.
CHECK(!session->stream_buf_ab_.IsEmpty());
size_t offset = reinterpret_cast<const char*>(data) - session->stream_buf_;
// Verify that the data offset is inside the current read buffer.
CHECK_LE(offset, session->stream_buf_size_);
// Repeatedly ask the stream's owner for memory, and copy the read data
// into those buffers.
// The typical case is actually the exception here; Http2StreamListeners
// know about the HTTP2 session associated with this stream, so they know
// about the larger from-socket read buffer, so they do not require copying.
do {
uv_buf_t buf = stream->EmitAlloc(len);
ssize_t avail = len;
if (static_cast<ssize_t>(buf.len) < avail)
avail = buf.len;
Local<Object> buf =
Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked();
// `buf.base == nullptr` is the default Http2StreamListener's way
// of saying that it wants a pointer to the raw original.
// Since it has access to the original socket buffer from which the data
// was read in the first place, it can use that to minizime ArrayBuffer
// allocations.
if (LIKELY(buf.base == nullptr))
buf.base = reinterpret_cast<char*>(const_cast<uint8_t*>(data));
else
memcpy(buf.base, data, avail);
data += avail;
len -= avail;
stream->EmitRead(avail, buf);
stream->EmitData(len, buf, Local<Object>());
if (!stream->IsReading())
stream->inbound_consumed_data_while_paused_ += len;
else
nghttp2_session_consume_stream(handle, id, len);
// If the stream owner (e.g. the JS Http2Stream) wants more data, just
// tell nghttp2 that all data has been consumed. Otherwise, defer until
// more data is being requested.
if (stream->IsReading())
nghttp2_session_consume_stream(handle, id, avail);
else
stream->inbound_consumed_data_while_paused_ += avail;
} while (len != 0);
}
return 0;
}
@ -1129,6 +1134,38 @@ inline void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) {
}
}
uv_buf_t Http2StreamListener::OnStreamAlloc(size_t size) {
// See the comments in Http2Session::OnDataChunkReceived
// (which is the only possible call site for this method).
return uv_buf_init(nullptr, size);
}
void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Http2Stream* stream = static_cast<Http2Stream*>(stream_);
Http2Session* session = stream->session();
Environment* env = stream->env();
if (nread < 0) {
PassReadErrorToPreviousListener(nread);
return;
}
CHECK(!session->stream_buf_ab_.IsEmpty());
// There is a single large array buffer for the entire data read from the
// network; create a slice of that array buffer and emit it as the
// received data buffer.
size_t offset = buf.base - session->stream_buf_.base;
// Verify that the data offset is inside the current read buffer.
CHECK_LE(offset, session->stream_buf_.len);
CHECK_LE(offset + buf.len, session->stream_buf_.len);
Local<Object> buffer =
Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked();
stream->CallJSOnreadMethod(nread, buffer);
}
Http2Stream::SubmitTrailers::SubmitTrailers(
Http2Session* session,
@ -1257,7 +1294,7 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
return;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
stream->EmitData(UV_EOF, Local<Object>(), Local<Object>());
stream->EmitRead(UV_EOF);
}
}
@ -1378,16 +1415,15 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
}
// Callback used when data has been written to the stream.
void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
DEBUG_HTTP2SESSION2(session, "write finished with status %d", status);
void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
DEBUG_HTTP2SESSION2(this, "write finished with status %d", status);
// Inform all pending writes about their completion.
session->ClearOutgoing(status);
ClearOutgoing(status);
if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// Schedule a new write if nghttp2 wants to send data.
session->MaybeScheduleWrite();
MaybeScheduleWrite();
}
}
@ -1625,97 +1661,76 @@ WriteWrap* Http2Session::AllocateSend() {
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
return WriteWrap::New(env(), obj, stream_);
}
// Allocates the data buffer used to receive inbound data from the i/o stream
void Http2Session::OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
CHECK_EQ(session->stream_buf_, nullptr);
CHECK_EQ(session->stream_buf_size_, 0);
buf->base = session->stream_buf_ = Malloc(suggested_size);
buf->len = session->stream_buf_size_ = suggested_size;
session->IncrementCurrentSessionMemory(suggested_size);
return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
}
// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
Http2Scope h2scope(session);
CHECK_NE(session->stream_, nullptr);
DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Http2Scope h2scope(this);
CHECK_NE(stream_, nullptr);
DEBUG_HTTP2SESSION2(this, "receiving %d bytes", nread);
IncrementCurrentSessionMemory(buf.len);
CHECK(stream_buf_ab_.IsEmpty());
if (nread <= 0) {
free(session->stream_buf_);
free(buf.base);
if (nread < 0) {
uv_buf_t tmp_buf = uv_buf_init(nullptr, 0);
session->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
session->prev_read_cb_.ctx);
PassReadErrorToPreviousListener(nread);
}
} else {
// Only pass data on if nread > 0
// Makre sure that there was no read previously active.
CHECK_EQ(stream_buf_.base, nullptr);
CHECK_EQ(stream_buf_.len, 0);
// Remember the current buffer, so that OnDataChunkReceived knows the
// offset of a DATA frame's data into the socket read buffer.
stream_buf_ = uv_buf_init(buf.base, nread);
// Verify that currently: There is memory allocated into which
// the data has been read, and that memory buffer is at least as large
// as the amount of data we have read, but we have not yet made an
// ArrayBuffer out of it.
CHECK_NE(session->stream_buf_, nullptr);
CHECK_EQ(session->stream_buf_, buf->base);
CHECK_EQ(session->stream_buf_size_, buf->len);
CHECK_GE(session->stream_buf_size_, static_cast<size_t>(nread));
CHECK(session->stream_buf_ab_.IsEmpty());
CHECK_LE(static_cast<size_t>(nread), stream_buf_.len);
Environment* env = session->env();
Isolate* isolate = env->isolate();
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env->context();
Context::Scope context_scope(context);
Context::Scope context_scope(env()->context());
// Create an array buffer for the read data. DATA frames will be emitted
// as slices of this array buffer to avoid having to copy memory.
session->stream_buf_ab_ =
stream_buf_ab_ =
ArrayBuffer::New(isolate,
session->stream_buf_,
session->stream_buf_size_,
buf.base,
nread,
v8::ArrayBufferCreationMode::kInternalized);
uv_buf_t buf_ = uv_buf_init(buf->base, nread);
session->statistics_.data_received += nread;
ssize_t ret = session->Write(&buf_, 1);
statistics_.data_received += nread;
ssize_t ret = Write(&stream_buf_, 1);
// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
// ssize_t to int. Cast here so that the < 0 check actually works on
// Windows.
if (static_cast<int>(ret) < 0) {
DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
DEBUG_HTTP2SESSION2(this, "fatal error receiving data: %d", ret);
Local<Value> argv[1] = {
Local<Value> argv[] = {
Integer::New(isolate, ret),
};
session->MakeCallback(env->error_string(), arraysize(argv), argv);
MakeCallback(env()->error_string(), arraysize(argv), argv);
} else {
DEBUG_HTTP2SESSION2(session, "processed %d bytes. wants more? %d", ret,
nghttp2_session_want_read(**session));
DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret,
nghttp2_session_want_read(session_));
}
}
// Since we are finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
session->DecrementCurrentSessionMemory(session->stream_buf_size_);
session->stream_buf_ = nullptr;
session->stream_buf_size_ = 0;
session->stream_buf_ab_ = Local<ArrayBuffer>();
}
DecrementCurrentSessionMemory(buf.len);
void Http2Session::OnStreamDestructImpl(void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
session->stream_ = nullptr;
stream_buf_ab_ = Local<ArrayBuffer>();
stream_buf_ = uv_buf_init(nullptr, 0);
}
// Every Http2Session session is tightly bound to a single i/o StreamBase
@ -1724,14 +1739,7 @@ void Http2Session::OnStreamDestructImpl(void* ctx) {
// C++ layer via the StreamBase API.
void Http2Session::Consume(Local<External> external) {
StreamBase* stream = static_cast<StreamBase*>(external->Value());
stream->Consume();
stream_ = stream;
prev_alloc_cb_ = stream->alloc_cb();
prev_read_cb_ = stream->read_cb();
stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this });
stream->set_read_cb({ Http2Session::OnStreamReadImpl, this });
stream->set_after_write_cb({ Http2Session::OnStreamAfterWriteImpl, this });
stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this });
stream->PushStreamListener(this);
DEBUG_HTTP2SESSION(this, "i/o stream consumed");
}
@ -1769,6 +1777,8 @@ Http2Stream::Http2Stream(
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
PushStreamListener(&stream_listener_);
if (options & STREAM_OPTION_EMPTY_PAYLOAD)
Shutdown();
session->AddStream(this);

View File

@ -535,6 +535,12 @@ class Http2Priority {
nghttp2_priority_spec spec;
};
class Http2StreamListener : public StreamListener {
public:
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
};
class Http2Stream : public AsyncWrap,
public StreamBase {
public:
@ -747,6 +753,8 @@ class Http2Stream : public AsyncWrap,
int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;
Http2StreamListener stream_listener_;
friend class Http2Session;
};
@ -798,7 +806,7 @@ class Http2Stream::Provider::Stream : public Http2Stream::Provider {
};
class Http2Session : public AsyncWrap {
class Http2Session : public AsyncWrap, public StreamListener {
public:
Http2Session(Environment* env,
Local<Object> wrap,
@ -872,21 +880,11 @@ class Http2Session : public AsyncWrap {
size_t self_size() const override { return sizeof(*this); }
char* stream_alloc() {
return stream_buf_;
}
inline void GetTrailers(Http2Stream* stream, uint32_t* flags);
static void OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx);
static void OnStreamReadImpl(ssize_t nread,
const uv_buf_t* bufs,
uv_handle_type pending,
void* ctx);
static void OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx);
static void OnStreamDestructImpl(void* ctx);
// Handle reads/writes from the underlying network transport.
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
void OnStreamAfterWrite(WriteWrap* w, int status) override;
// The JavaScript API
static void New(const FunctionCallbackInfo<Value>& args);
@ -1074,16 +1072,12 @@ class Http2Session : public AsyncWrap {
int flags_ = SESSION_STATE_NONE;
// The StreamBase instance being used for i/o
StreamBase* stream_;
StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;
// use this to allow timeout tracking during long-lasting writes
uint32_t chunks_sent_since_last_write_ = 0;
char* stream_buf_ = nullptr;
size_t stream_buf_size_ = 0;
uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0);
v8::Local<v8::ArrayBuffer> stream_buf_ab_;
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
@ -1099,6 +1093,7 @@ class Http2Session : public AsyncWrap {
void ClearOutgoing(int status);
friend class Http2Scope;
friend class Http2StreamListener;
};
class Http2SessionPerformanceEntry : public PerformanceEntry {

View File

@ -144,7 +144,7 @@ struct StringPtr {
};
class Parser : public AsyncWrap {
class Parser : public AsyncWrap, public StreamListener {
public:
Parser(Environment* env, Local<Object> wrap, enum http_parser_type type)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER),
@ -494,14 +494,7 @@ class Parser : public AsyncWrap {
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
stream->Consume();
parser->prev_alloc_cb_ = stream->alloc_cb();
parser->prev_read_cb_ = stream->read_cb();
stream->set_alloc_cb({ OnAllocImpl, parser });
stream->set_read_cb({ OnReadImpl, parser });
stream->PushStreamListener(parser);
}
@ -510,22 +503,10 @@ class Parser : public AsyncWrap {
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// Already unconsumed
if (parser->prev_alloc_cb_.is_empty())
if (parser->stream_ == nullptr)
return;
// Restore stream's callbacks
if (args.Length() == 1 && args[0]->IsExternal()) {
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
stream->set_alloc_cb(parser->prev_alloc_cb_);
stream->set_read_cb(parser->prev_read_cb_);
stream->Unconsume();
}
parser->prev_alloc_cb_.clear();
parser->prev_read_cb_.clear();
parser->stream_->RemoveStreamListener(parser);
}
@ -544,33 +525,19 @@ class Parser : public AsyncWrap {
protected:
static const size_t kAllocBufferSize = 64 * 1024;
static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
Parser* parser = static_cast<Parser*>(ctx);
Environment* env = parser->env();
uv_buf_t OnStreamAlloc(size_t suggested_size) override {
if (env()->http_parser_buffer() == nullptr)
env()->set_http_parser_buffer(new char[kAllocBufferSize]);
if (env->http_parser_buffer() == nullptr)
env->set_http_parser_buffer(new char[kAllocBufferSize]);
buf->base = env->http_parser_buffer();
buf->len = kAllocBufferSize;
return uv_buf_init(env()->http_parser_buffer(), kAllocBufferSize);
}
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
Parser* parser = static_cast<Parser*>(ctx);
HandleScope scope(parser->env()->isolate());
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
HandleScope scope(env()->isolate());
if (nread < 0) {
uv_buf_t tmp_buf;
tmp_buf.base = nullptr;
tmp_buf.len = 0;
parser->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
parser->prev_read_cb_.ctx);
PassReadErrorToPreviousListener(nread);
return;
}
@ -578,27 +545,27 @@ class Parser : public AsyncWrap {
if (nread == 0)
return;
parser->current_buffer_.Clear();
Local<Value> ret = parser->Execute(buf->base, nread);
current_buffer_.Clear();
Local<Value> ret = Execute(buf.base, nread);
// Exception
if (ret.IsEmpty())
return;
Local<Object> obj = parser->object();
Local<Value> cb = obj->Get(kOnExecute);
Local<Value> cb =
object()->Get(env()->context(), kOnExecute).ToLocalChecked();
if (!cb->IsFunction())
return;
// Hooks for GetCurrentBuffer
parser->current_buffer_len_ = nread;
parser->current_buffer_data_ = buf->base;
current_buffer_len_ = nread;
current_buffer_data_ = buf.base;
parser->MakeCallback(cb.As<Function>(), 1, &ret);
MakeCallback(cb.As<Function>(), 1, &ret);
parser->current_buffer_len_ = 0;
parser->current_buffer_data_ = nullptr;
current_buffer_len_ = 0;
current_buffer_data_ = nullptr;
}
@ -713,8 +680,6 @@ class Parser : public AsyncWrap {
Local<Object> current_buffer_;
size_t current_buffer_len_;
char* current_buffer_data_;
StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
// These are helper functions for filling `http_parser_settings`, which turn
// a member function of Parser into a C-style HTTP parser callback.

View File

@ -29,6 +29,7 @@
#include "node_buffer.h"
#include "node_wrap.h"
#include "connect_wrap.h"
#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "util-inl.h"

View File

@ -22,6 +22,7 @@
#include "env-inl.h"
#include "handle_wrap.h"
#include "node_wrap.h"
#include "stream_base-inl.h"
#include "util-inl.h"
#include <string.h>

View File

@ -25,6 +25,87 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;
inline StreamListener::~StreamListener() {
if (stream_ != nullptr)
stream_->RemoveStreamListener(this);
}
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
CHECK_NE(previous_listener_, nullptr);
previous_listener_->OnStreamRead(nread,
uv_buf_init(nullptr, 0),
UV_UNKNOWN_HANDLE);
}
inline StreamResource::~StreamResource() {
while (listener_ != nullptr) {
listener_->OnStreamDestroy();
RemoveStreamListener(listener_);
}
}
inline void StreamResource::PushStreamListener(StreamListener* listener) {
CHECK_NE(listener, nullptr);
CHECK_EQ(listener->stream_, nullptr);
listener->previous_listener_ = listener_;
listener->stream_ = this;
listener_ = listener;
}
inline void StreamResource::RemoveStreamListener(StreamListener* listener) {
CHECK_NE(listener, nullptr);
StreamListener* previous;
StreamListener* current;
// Remove from the linked list.
for (current = listener_, previous = nullptr;
/* No loop condition because we want a crash if listener is not found */
; previous = current, current = current->previous_listener_) {
CHECK_NE(current, nullptr);
if (current == listener) {
if (previous != nullptr)
previous->previous_listener_ = current->previous_listener_;
else
listener_ = listener->previous_listener_;
break;
}
}
listener->stream_ = nullptr;
listener->previous_listener_ = nullptr;
}
inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
return listener_->OnStreamAlloc(suggested_size);
}
inline void StreamResource::EmitRead(ssize_t nread,
const uv_buf_t& buf,
uv_handle_type pending) {
if (nread > 0)
bytes_read_ += static_cast<uint64_t>(nread);
listener_->OnStreamRead(nread, buf, pending);
}
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
listener_->OnStreamAfterWrite(w, status);
}
inline StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_);
}
inline Environment* StreamBase::stream_env() const {
return env_;
}
template <class Base>
void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate> t,
@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate>(),
attributes);
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
if ((flags & kFlagNoShutdown) == 0)
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
if ((flags & kFlagHasWritev) != 0)

View File

@ -34,12 +34,12 @@ template int StreamBase::WriteString<LATIN1>(
const FunctionCallbackInfo<Value>& args);
int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
return ReadStart();
}
int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
return ReadStop();
}
@ -437,9 +437,9 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
}
void StreamBase::EmitData(ssize_t nread,
Local<Object> buf,
Local<Object> handle) {
void StreamBase::CallJSOnreadMethod(ssize_t nread,
Local<Object> buf,
Local<Object> handle) {
Environment* env = env_;
Local<Value> argv[] = {
@ -490,4 +490,43 @@ void StreamResource::ClearError() {
// No-op
}
uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
return uv_buf_init(Malloc(suggested_size), suggested_size);
}
void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
// This cannot be virtual because it is just as valid to override the other
// OnStreamRead() callback.
CHECK(0 && "OnStreamRead() needs to be implemented");
}
void StreamListener::OnStreamRead(ssize_t nread,
const uv_buf_t& buf,
uv_handle_type pending) {
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
OnStreamRead(nread, buf);
}
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
CHECK_NE(stream_, nullptr);
StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
if (nread <= 0) {
free(buf.base);
if (nread < 0)
stream->CallJSOnreadMethod(nread, Local<Object>());
return;
}
CHECK_LE(static_cast<size_t>(nread), buf.len);
Local<Object> obj = Buffer::New(env, buf.base, nread).ToLocalChecked();
stream->CallJSOnreadMethod(nread, obj);
}
} // namespace node

View File

@ -15,6 +15,7 @@ namespace node {
// Forward declarations
class StreamBase;
class StreamResource;
template<typename Base>
class StreamReq {
@ -123,38 +124,78 @@ class WriteWrap : public ReqWrap<uv_write_t>,
const size_t storage_size_;
};
// This is the generic interface for objects that control Node.js' C++ streams.
// For example, the default `EmitToJSStreamListener` emits a stream's data
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
class StreamListener {
public:
virtual ~StreamListener();
// This is called when a stream wants to allocate memory immediately before
// reading data into the freshly allocated buffer (i.e. it is always followed
// by a `OnStreamRead()` call).
// This memory may be statically or dynamically allocated; for example,
// a protocol parser may want to read data into a static buffer if it knows
// that all data is going to be fully handled during the next
// `OnStreamRead()` call.
// The returned buffer does not need to contain `suggested_size` bytes.
// The default implementation of this method returns a buffer that has exactly
// the suggested size and is allocated using malloc().
virtual uv_buf_t OnStreamAlloc(size_t suggested_size);
// `OnStreamRead()` is called when data is available on the socket and has
// been read into the buffer provided by `OnStreamAlloc()`.
// The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
// with base nullpptr in case of an error.
// `nread` is the number of read bytes (which is at most the buffer length),
// or, if negative, a libuv error code.
// The variant with a `uv_handle_type` argument is used by libuv-backed
// streams for handle transfers (e.g. passing net.Socket instances between
// cluster workers). For all other streams, overriding the simple variant
// should be sufficient.
// By default, the second variant crashes if `pending` is set and otherwise
// calls the simple variant.
virtual void OnStreamRead(ssize_t nread,
const uv_buf_t& buf) = 0;
virtual void OnStreamRead(ssize_t nread,
const uv_buf_t& buf,
uv_handle_type pending);
// This is called once a Write has finished. `status` may be 0 or,
// if negative, a libuv error code.
virtual void OnStreamAfterWrite(WriteWrap* w, int status) {}
// This is called immediately before the stream is destroyed.
virtual void OnStreamDestroy() {}
protected:
// Pass along a read error to the `StreamListener` instance that was active
// before this one. For example, a protocol parser does not care about read
// errors and may instead want to let the original handler
// (e.g. the JS handler) take care of the situation.
void PassReadErrorToPreviousListener(ssize_t nread);
StreamResource* stream_ = nullptr;
StreamListener* previous_listener_ = nullptr;
friend class StreamResource;
};
// A default emitter that just pushes data chunks as Buffer instances to
// JS land via the handles .ondata method.
class EmitToJSStreamListener : public StreamListener {
public:
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
};
// A generic stream, comparable to JS lands `Duplex` streams.
// A stream is always controlled through one `StreamListener` instance.
class StreamResource {
public:
template <class T>
struct Callback {
Callback() : fn(nullptr), ctx(nullptr) {}
Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
Callback(const Callback&) = default;
inline bool is_empty() { return fn == nullptr; }
inline void clear() {
fn = nullptr;
ctx = nullptr;
}
T fn;
void* ctx;
};
typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
typedef void (*DestructCb)(void* ctx);
StreamResource() : bytes_read_(0) {
}
virtual ~StreamResource() {
if (!destruct_cb_.is_empty())
destruct_cb_.fn(destruct_cb_.ctx);
}
virtual ~StreamResource();
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
@ -162,50 +203,45 @@ class StreamResource {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
// Start reading from the underlying resource. This is called by the consumer
// when more data is desired.
virtual int ReadStart() = 0;
// Stop reading from the underlying resource. This is called by the
// consumer when its buffers are full and no more data can be handled.
virtual int ReadStop() = 0;
// Optionally, this may provide an error message to be used for
// failing writes.
virtual const char* Error() const;
// Clear the current error (i.e. that would be returned by Error()).
virtual void ClearError();
// Events
inline void EmitAfterWrite(WriteWrap* w, int status) {
if (!after_write_cb_.is_empty())
after_write_cb_.fn(w, status, after_write_cb_.ctx);
}
inline void EmitAlloc(size_t size, uv_buf_t* buf) {
if (!alloc_cb_.is_empty())
alloc_cb_.fn(size, buf, alloc_cb_.ctx);
}
inline void EmitRead(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending = UV_UNKNOWN_HANDLE) {
if (nread > 0)
bytes_read_ += static_cast<uint64_t>(nread);
if (!read_cb_.is_empty())
read_cb_.fn(nread, buf, pending, read_cb_.ctx);
}
inline void set_after_write_cb(Callback<AfterWriteCb> c) {
after_write_cb_ = c;
}
inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }
inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
inline Callback<ReadCb> read_cb() { return read_cb_; }
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
// Transfer ownership of this tream to `listener`. The previous listener
// will not receive any more callbacks while the new listener was active.
void PushStreamListener(StreamListener* listener);
// Remove a listener, and, if this was the currently active one,
// transfer ownership back to the previous listener.
void RemoveStreamListener(StreamListener* listener);
protected:
Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_;
Callback<DestructCb> destruct_cb_;
uint64_t bytes_read_;
// Call the current listener's OnStreamAlloc() method.
uv_buf_t EmitAlloc(size_t suggested_size);
// Call the current listener's OnStreamRead() method and update the
// stream's read byte counter.
void EmitRead(ssize_t nread,
const uv_buf_t& buf = uv_buf_init(nullptr, 0),
uv_handle_type pending = UV_UNKNOWN_HANDLE);
// Call the current listener's OnStreamAfterWrite() method.
void EmitAfterWrite(WriteWrap* w, int status);
StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0;
friend class StreamListener;
};
class StreamBase : public StreamResource {
public:
enum Flags {
@ -224,40 +260,29 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe();
virtual int GetFD();
virtual int ReadStart() = 0;
virtual int ReadStop() = 0;
inline void Consume() {
CHECK_EQ(consumed_, false);
consumed_ = true;
}
inline void Unconsume() {
CHECK_EQ(consumed_, true);
consumed_ = false;
}
void EmitData(ssize_t nread,
v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle);
void CallJSOnreadMethod(
ssize_t nread,
v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle = v8::Local<v8::Object>());
// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);
protected:
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
}
// This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s.
Environment* stream_env() const;
virtual ~StreamBase() = default;
protected:
explicit StreamBase(Environment* env);
// One of these must be implemented
virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject();
// JS Methods
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
@ -280,7 +305,7 @@ class StreamBase : public StreamResource {
private:
Environment* env_;
bool consumed_;
EmitToJSStreamListener default_listener_;
};
} // namespace node

View File

@ -93,8 +93,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider),
StreamBase(env),
stream_(stream) {
set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
PushStreamListener(this);
}
@ -157,23 +156,18 @@ int LibuvStreamWrap::ReadStop() {
void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
size_t suggested_size,
uv_buf_t* buf) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
HandleScope scope(wrap->env()->isolate());
Context::Scope context_scope(wrap->env()->context());
CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
return wrap->EmitAlloc(suggested_size, buf);
*buf = wrap->EmitAlloc(suggested_size);
}
void LibuvStreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
buf->base = node::Malloc(size);
buf->len = size;
}
template <class WrapType, class UVType>
static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
@ -196,51 +190,41 @@ static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
}
void LibuvStreamWrap::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
void LibuvStreamWrap::OnStreamRead(ssize_t nread,
const uv_buf_t& buf,
uv_handle_type pending) {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
if (nread <= 0) {
free(buf.base);
if (nread < 0)
CallJSOnreadMethod(nread, Local<Object>());
return;
}
CHECK_LE(static_cast<size_t>(nread), buf.len);
Local<Object> pending_obj;
if (nread < 0) {
if (buf->base != nullptr)
free(buf->base);
wrap->EmitData(nread, Local<Object>(), pending_obj);
return;
}
if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
}
CHECK_LE(static_cast<size_t>(nread), buf->len);
char* base = node::Realloc(buf->base, nread);
if (pending == UV_TCP) {
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
} else if (pending == UV_NAMED_PIPE) {
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
} else if (pending == UV_UDP) {
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
} else {
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
}
Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
wrap->EmitData(nread, obj, pending_obj);
Local<Object> obj = Buffer::New(env(), buf.base, nread).ToLocalChecked();
CallJSOnreadMethod(nread, obj, pending_obj);
}
void LibuvStreamWrap::OnRead(uv_stream_t* handle,
ssize_t nread,
const uv_buf_t* buf) {
ssize_t nread,
const uv_buf_t* buf) {
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
HandleScope scope(wrap->env()->isolate());
Context::Scope context_scope(wrap->env()->context());
@ -263,7 +247,7 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}
}
wrap->EmitRead(nread, buf, type);
wrap->EmitRead(nread, *buf, type);
}

View File

@ -33,7 +33,9 @@
namespace node {
class LibuvStreamWrap : public HandleWrap, public StreamBase {
class LibuvStreamWrap : public HandleWrap,
public StreamListener,
public StreamBase {
public:
static void Initialize(v8::Local<v8::Object> target,
v8::Local<v8::Value> unused,
@ -79,9 +81,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
uv_stream_t* stream,
AsyncWrap::ProviderType provider);
~LibuvStreamWrap() {
}
AsyncWrap* GetAsyncWrap() override;
static void AddMethods(Environment* env,
@ -105,11 +104,16 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
static void AfterUvShutdown(uv_shutdown_t* req, int status);
// Resource interface implementation
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
void OnStreamRead(ssize_t nread,
const uv_buf_t& buf) override {
CHECK(0 && "must not be called");
}
void OnStreamRead(ssize_t nread,
const uv_buf_t& buf,
uv_handle_type pending) override;
void OnStreamAfterWrite(WriteWrap* w, int status) override {
previous_listener_->OnStreamAfterWrite(w, status);
}
void AfterWrite(WriteWrap* req_wrap, int status) override;

View File

@ -27,6 +27,7 @@
#include "node_buffer.h"
#include "node_wrap.h"
#include "connect_wrap.h"
#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "util-inl.h"

View File

@ -59,7 +59,6 @@ TLSWrap::TLSWrap(Environment* env,
SSLWrap<TLSWrap>(env, sc, kind),
StreamBase(env),
sc_(sc),
stream_(stream),
enc_in_(nullptr),
enc_out_(nullptr),
write_size_(0),
@ -78,14 +77,7 @@ TLSWrap::TLSWrap(Environment* env,
SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSWrap>::GetSessionCallback);
SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSWrap>::NewSessionCallback);
stream_->Consume();
stream_->set_after_write_cb({ OnAfterWriteImpl, this });
stream_->set_alloc_cb({ OnAllocImpl, this });
stream_->set_read_cb({ OnReadImpl, this });
stream_->set_destruct_cb({ OnDestructImpl, this });
set_alloc_cb({ OnAllocSelf, this });
set_read_cb({ OnReadSelf, this });
stream->PushStreamListener(this);
InitSSL();
}
@ -100,19 +92,6 @@ TLSWrap::~TLSWrap() {
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
sni_context_.Reset();
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
// See test/parallel/test-tls-transport-destroy-after-own-gc.js:
// If this TLSWrap is garbage collected, we cannot allow callbacks to be
// called on this stream.
if (stream_ == nullptr)
return;
stream_->set_destruct_cb({ nullptr, nullptr });
stream_->set_after_write_cb({ nullptr, nullptr });
stream_->set_alloc_cb({ nullptr, nullptr });
stream_->set_read_cb({ nullptr, nullptr });
stream_->set_destruct_cb({ nullptr, nullptr });
stream_->Unconsume();
}
@ -208,15 +187,13 @@ void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
char* data = Buffer::Data(args[0]);
size_t len = Buffer::Length(args[0]);
uv_buf_t buf;
// Copy given buffer entirely or partiall if handle becomes closed
while (len > 0 && wrap->IsAlive() && !wrap->IsClosing()) {
wrap->stream_->EmitAlloc(len, &buf);
uv_buf_t buf = wrap->OnStreamAlloc(len);
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
wrap->stream_->EmitRead(buf.len, &buf);
wrap->OnStreamRead(copy, buf);
data += copy;
len -= copy;
@ -307,7 +284,7 @@ void TLSWrap::EncOut() {
->NewInstance(env()->context()).ToLocalChecked();
WriteWrap* write_req = WriteWrap::New(env(),
req_wrap_obj,
stream_);
static_cast<StreamBase*>(stream_));
uv_buf_t buf[arraysize(data)];
for (size_t i = 0; i < count; i++)
@ -324,7 +301,7 @@ void TLSWrap::EncOut() {
}
void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) {
void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
// We should not be getting here after `DestroySSL`, because all queued writes
// must be invoked with UV_ECANCELED
CHECK_NE(ssl_, nullptr);
@ -421,12 +398,11 @@ void TLSWrap::ClearOut() {
while (read > 0) {
int avail = read;
uv_buf_t buf;
EmitAlloc(avail, &buf);
uv_buf_t buf = EmitAlloc(avail);
if (static_cast<int>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, current, avail);
EmitRead(avail, &buf);
EmitRead(avail, buf);
// Caveat emptor: OnRead() calls into JS land which can result in
// the SSL context object being destroyed. We have to carefully
@ -442,7 +418,7 @@ void TLSWrap::ClearOut() {
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
EmitRead(UV_EOF, nullptr);
EmitRead(UV_EOF);
}
// We need to check whether an error occurred or the connection was
@ -524,22 +500,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
bool TLSWrap::IsIPCPipe() {
return stream_->IsIPCPipe();
return static_cast<StreamBase*>(stream_)->IsIPCPipe();
}
int TLSWrap::GetFD() {
return stream_->GetFD();
return static_cast<StreamBase*>(stream_)->GetFD();
}
bool TLSWrap::IsAlive() {
return ssl_ != nullptr && stream_ != nullptr && stream_->IsAlive();
return ssl_ != nullptr &&
stream_ != nullptr &&
static_cast<StreamBase*>(stream_)->IsAlive();
}
bool TLSWrap::IsClosing() {
return stream_->IsClosing();
return static_cast<StreamBase*>(stream_)->IsClosing();
}
@ -638,62 +616,16 @@ int TLSWrap::DoWrite(WriteWrap* w,
}
void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->EncOutAfterWrite(w, status);
uv_buf_t TLSWrap::OnStreamAlloc(size_t suggested_size) {
CHECK_NE(ssl_, nullptr);
size_t size = suggested_size;
char* base = crypto::NodeBIO::FromBIO(enc_in_)->PeekWritable(&size);
return uv_buf_init(base, size);
}
void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
if (wrap->ssl_ == nullptr) {
*buf = uv_buf_init(nullptr, 0);
return;
}
size_t size = 0;
buf->base = crypto::NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size);
buf->len = size;
}
void TLSWrap::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->DoRead(nread, buf, pending);
}
void TLSWrap::OnDestructImpl(void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->clear_stream();
}
void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) {
buf->base = node::Malloc(suggested_size);
buf->len = suggested_size;
}
void TLSWrap::OnReadSelf(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
Local<Object> buf_obj;
if (buf != nullptr)
buf_obj = Buffer::New(wrap->env(), buf->base, buf->len).ToLocalChecked();
wrap->EmitData(nread, buf_obj, Local<Object>());
}
void TLSWrap::DoRead(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending) {
void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
if (nread < 0) {
// Error should be emitted only after all data was read
ClearOut();
@ -705,13 +637,13 @@ void TLSWrap::DoRead(ssize_t nread,
eof_ = true;
}
EmitRead(nread, nullptr);
EmitRead(nread);
return;
}
// Only client connections can receive data
if (ssl_ == nullptr) {
EmitRead(UV_EPROTO, nullptr);
EmitRead(UV_EPROTO);
return;
}
@ -800,6 +732,9 @@ void TLSWrap::DestroySSL(const FunctionCallbackInfo<Value>& args) {
// Destroy the SSL structure and friends
wrap->SSLWrap<TLSWrap>::DestroySSL();
if (wrap->stream_ != nullptr)
wrap->stream_->RemoveStreamListener(wrap);
}

View File

@ -48,7 +48,8 @@ class NodeBIO;
class TLSWrap : public AsyncWrap,
public crypto::SSLWrap<TLSWrap>,
public StreamBase {
public StreamBase,
public StreamListener {
public:
~TLSWrap() override;
@ -76,8 +77,6 @@ class TLSWrap : public AsyncWrap,
size_t self_size() const override { return sizeof(*this); }
void clear_stream() { stream_ = nullptr; }
protected:
static const int kClearOutChunkSize = 16384;
@ -98,7 +97,6 @@ class TLSWrap : public AsyncWrap,
static void SSLInfoCallback(const SSL* ssl_, int where, int ret);
void InitSSL();
void EncOut();
void EncOutAfterWrite(WriteWrap* req_wrap, int status);
bool ClearIn();
void ClearOut();
bool InvokeQueued(int status, const char* error_str = nullptr);
@ -119,20 +117,9 @@ class TLSWrap : public AsyncWrap,
bool IsIPCPipe() override;
// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadSelf(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
static void OnDestructImpl(void* ctx);
void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending);
void OnStreamAfterWrite(WriteWrap* w, int status) override;
uv_buf_t OnStreamAlloc(size_t size) override;
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
v8::Local<v8::Value> GetSSLError(int status, int* err, std::string* msg);
@ -154,7 +141,6 @@ class TLSWrap : public AsyncWrap,
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
crypto::SecureContext* sc_;
StreamBase* stream_;
BIO* enc_in_;
BIO* enc_out_;
std::vector<uv_buf_t> pending_cleartext_input_;

View File

@ -26,6 +26,7 @@
#include "node_buffer.h"
#include "node_wrap.h"
#include "req_wrap-inl.h"
#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "util-inl.h"

View File

@ -19,6 +19,7 @@ const server = net.createServer(common.mustCall((conn) => {
const socket = new tls.TLSSocket(conn, options);
socket.once('data', common.mustCall(() => {
socket._destroySSL(); // Should not crash.
socket.destroy();
server.close();
}));
}));