Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2019-10-09 19:56:21 +0300
committerJames M Snell <jasnell@gmail.com>2020-03-03 21:21:19 +0300
commit43b7142fedc706d79ea77254bc463f63a3b73358 (patch)
tree64734ce331698aec42a90c0e3683a03a10cf7000 /src/udp_wrap.cc
parent7cafd5f3a924451294a4f2ce6efb628b53fce7eb (diff)
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase. Originally landed in the QUIC repo Original review metadata: ``` PR-URL: https://github.com/nodejs/quic/pull/165 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com> ``` Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: https://github.com/nodejs/node/pull/31871 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'src/udp_wrap.cc')
-rw-r--r--src/udp_wrap.cc205
1 files changed, 151 insertions, 54 deletions
diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc
index fa5cf8da479..702449daae9 100644
--- a/src/udp_wrap.cc
+++ b/src/udp_wrap.cc
@@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
}
-inline bool SendWrap::have_callback() const {
+bool SendWrap::have_callback() const {
return have_callback_;
}
+UDPListener::~UDPListener() {
+ if (wrap_ != nullptr)
+ wrap_->set_listener(nullptr);
+}
+
+UDPWrapBase::~UDPWrapBase() {
+ set_listener(nullptr);
+}
+
+UDPListener* UDPWrapBase::listener() const {
+ CHECK_NOT_NULL(listener_);
+ return listener_;
+}
+
+void UDPWrapBase::set_listener(UDPListener* listener) {
+ if (listener_ != nullptr)
+ listener_->wrap_ = nullptr;
+ listener_ = listener;
+ if (listener_ != nullptr) {
+ CHECK_NULL(listener_->wrap_);
+ listener_->wrap_ = this;
+ }
+}
+
+UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
+ CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
+ return static_cast<UDPWrapBase*>(
+ obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
+}
+
+void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
+ env->SetProtoMethod(t, "recvStart", RecvStart);
+ env->SetProtoMethod(t, "recvStop", RecvStop);
+}
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(&handle_),
AsyncWrap::PROVIDER_UDPWRAP) {
+ object->SetAlignedPointerInInternalField(
+ UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
+
int r = uv_udp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // can't fail anyway
+
+ set_listener(this);
}
@@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
Environment* env = Environment::GetCurrent(context);
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
- t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
+ t->InstanceTemplate()->SetInternalFieldCount(
+ UDPWrapBase::kInternalFieldCount);
Local<String> udpString =
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
t->SetClassName(udpString);
@@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);
+ UDPWrapBase::AddMethods(env, t);
env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "connect", Connect);
@@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "connect6", Connect6);
env->SetProtoMethod(t, "send6", Send6);
env->SetProtoMethod(t, "disconnect", Disconnect);
- env->SetProtoMethod(t, "recvStart", RecvStart);
- env->SetProtoMethod(t, "recvStop", RecvStop);
env->SetProtoMethod(t, "getpeername",
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
env->SetProtoMethod(t, "getsockname",
@@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
flags);
}
+ if (err == 0)
+ wrap->listener()->OnAfterBind();
+
args.GetReturnValue().Set(err);
}
@@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK(args[3]->IsBoolean());
}
- Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
// it is faster to fetch the length of the
// array in js-land
size_t count = args[2].As<Uint32>()->Value();
- const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
-
- size_t msg_size = 0;
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
@@ -482,7 +520,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
size_t length = Buffer::Length(chunk);
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
- msg_size += length;
}
int err = 0;
@@ -492,14 +529,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
const unsigned short port = args[3].As<Uint32>()->Value();
node::Utf8Value address(env->isolate(), args[4]);
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
- if (err == 0) {
+ if (err == 0)
addr = reinterpret_cast<sockaddr*>(&addr_storage);
- }
}
- uv_buf_t* bufs_ptr = *bufs;
- if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
- err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
+ if (err == 0) {
+ wrap->current_send_req_wrap_ = args[0].As<Object>();
+ wrap->current_send_has_callback_ =
+ sendto ? args[5]->IsTrue() : args[3]->IsTrue();
+
+ err = wrap->Send(*bufs, count, addr);
+
+ wrap->current_send_req_wrap_.Clear();
+ wrap->current_send_has_callback_ = false;
+ }
+
+ args.GetReturnValue().Set(err);
+}
+
+ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
+ size_t count,
+ const sockaddr* addr) {
+ if (IsHandleClosing()) return UV_EBADF;
+
+ size_t msg_size = 0;
+ for (size_t i = 0; i < count; i++)
+ msg_size += bufs_ptr[i].len;
+
+ int err = 0;
+ if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
+ err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
if (err == UV_ENOSYS || err == UV_EAGAIN) {
err = 0;
} else if (err >= 0) {
@@ -517,28 +576,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK_EQ(static_cast<size_t>(err), msg_size);
// + 1 so that the JS side can distinguish 0-length async sends from
// 0-length sync sends.
- args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
- return;
+ return msg_size + 1;
}
}
}
if (err == 0) {
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
- SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
- req_wrap->msg_size = msg_size;
-
- err = req_wrap->Dispatch(uv_udp_send,
- &wrap->handle_,
- bufs_ptr,
- count,
- addr,
- OnSend);
+ AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
+ ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
+ if (req_wrap == nullptr) return UV_ENOSYS;
+
+ err = req_wrap->Dispatch(
+ uv_udp_send,
+ &handle_,
+ bufs_ptr,
+ count,
+ addr,
+ uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
+ UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
+ self->listener()->OnSendDone(
+ ReqWrap<uv_udp_send_t>::from_req(req), status);
+ }});
if (err)
delete req_wrap;
}
- args.GetReturnValue().Set(err);
+ return err;
+}
+
+
+ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
+ SendWrap* req_wrap = new SendWrap(env(),
+ current_send_req_wrap_,
+ current_send_has_callback_);
+ req_wrap->msg_size = msg_size;
+ return req_wrap;
}
@@ -552,31 +624,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
}
-void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
- UDPWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap,
- args.Holder(),
- args.GetReturnValue().Set(UV_EBADF));
- int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
+AsyncWrap* UDPWrap::GetAsyncWrap() {
+ return this;
+}
+
+int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
+ return uv_udp_getpeername(&handle_, name, namelen);
+}
+
+int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
+ return uv_udp_getsockname(&handle_, name, namelen);
+}
+
+void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
+ UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
+ args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
+}
+
+int UDPWrap::RecvStart() {
+ if (IsHandleClosing()) return UV_EBADF;
+ int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
// UV_EALREADY means that the socket is already bound but that's okay
if (err == UV_EALREADY)
err = 0;
- args.GetReturnValue().Set(err);
+ return err;
}
-void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
- UDPWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap,
- args.Holder(),
- args.GetReturnValue().Set(UV_EBADF));
- int r = uv_udp_recv_stop(&wrap->handle_);
- args.GetReturnValue().Set(r);
+void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
+ UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
+ args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
+}
+
+int UDPWrap::RecvStop() {
+ if (IsHandleClosing()) return UV_EBADF;
+ return uv_udp_recv_stop(&handle_);
}
-void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
- std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
+void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
+ std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
if (req_wrap->have_callback()) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
@@ -593,19 +680,30 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
void UDPWrap::OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
- UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
- *buf = wrap->env()->AllocateManaged(suggested_size).release();
+ UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
+ reinterpret_cast<uv_udp_t*>(handle));
+ *buf = wrap->listener()->OnAlloc(suggested_size);
+}
+
+uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
+ return env()->AllocateManaged(suggested_size).release();
}
void UDPWrap::OnRecv(uv_udp_t* handle,
ssize_t nread,
- const uv_buf_t* buf_,
- const struct sockaddr* addr,
+ const uv_buf_t* buf,
+ const sockaddr* addr,
unsigned int flags) {
- UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
- Environment* env = wrap->env();
+ UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
+ wrap->listener()->OnRecv(nread, *buf, addr, flags);
+}
- AllocatedBuffer buf(env, *buf_);
+void UDPWrap::OnRecv(ssize_t nread,
+ const uv_buf_t& buf_,
+ const sockaddr* addr,
+ unsigned int flags) {
+ Environment* env = this->env();
+ AllocatedBuffer buf(env, buf_);
if (nread == 0 && addr == nullptr) {
return;
}
@@ -613,23 +711,22 @@ void UDPWrap::OnRecv(uv_udp_t* handle,
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
- Local<Object> wrap_obj = wrap->object();
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
- wrap_obj,
+ object(),
Undefined(env->isolate()),
Undefined(env->isolate())
};
if (nread < 0) {
- wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
+ MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
}
buf.Resize(nread);
argv[2] = buf.ToBuffer().ToLocalChecked();
argv[3] = AddressToJS(env, addr);
- wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
+ MakeCallback(env->onmessage_string(), arraysize(argv), argv);
}
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,