Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-03-12 17:02:28 +0300
committerJames M Snell <jasnell@gmail.com>2020-02-27 02:34:23 +0300
commitb44c63b87e6c165844f7303a9ec6f8c0cce7d3dc (patch)
treefd50abef76272cf8da042bf51d272119923a47e9 /src/stream_pipe.cc
parentb2be348fcce7efd30f88b9fabbb2e9629b534e20 (diff)
src: enable `StreamPipe` for generic `StreamBase`s
Originally landed in the nodejs/quic repo and used there for file sending. Original review: ``` PR-URL: https://github.com/nodejs/quic/pull/150 Reviewed-By: James M Snell <jasnell@gmail.com> ``` PR-URL: https://github.com/nodejs/node/pull/31869 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
Diffstat (limited to 'src/stream_pipe.cc')
-rw-r--r--src/stream_pipe.cc56
1 files changed, 43 insertions, 13 deletions
diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc
index d405c4d5cbe..5f7514b1b84 100644
--- a/src/stream_pipe.cc
+++ b/src/stream_pipe.cc
@@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
source->PushStreamListener(&readable_listener_);
sink->PushStreamListener(&writable_listener_);
- CHECK(sink->HasWantsWrite());
+ uses_wants_write_ = sink->HasWantsWrite();
// Set up links between this object and the source/sink objects.
// In particular, this makes sure that they are garbage collected as a group,
@@ -66,7 +66,8 @@ void StreamPipe::Unpipe(bool is_in_deletion) {
is_closed_ = true;
is_reading_ = false;
source()->RemoveStreamListener(&readable_listener_);
- sink()->RemoveStreamListener(&writable_listener_);
+ if (pending_writes_ == 0)
+ sink()->RemoveStreamListener(&writable_listener_);
if (is_in_deletion) return;
@@ -126,13 +127,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
// EOF or error; stop reading and pass the error to the previous listener
// (which might end up in JS).
pipe->is_eof_ = true;
+ // Cache `sink()` here because the previous listener might do things
+ // that eventually lead to an `Unpipe()` call.
+ StreamBase* sink = pipe->sink();
stream()->ReadStop();
CHECK_NOT_NULL(previous_listener_);
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
// If we’re not writing, close now. Otherwise, we’ll do that in
// `OnStreamAfterWrite()`.
- if (!pipe->is_writing_) {
- pipe->ShutdownWritable();
+ if (pipe->pending_writes_ == 0) {
+ sink->Shutdown();
pipe->Unpipe();
}
return;
@@ -142,12 +146,13 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
}
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
+ CHECK(uses_wants_write_ || pending_writes_ == 0);
uv_buf_t buffer = uv_buf_init(buf.data(), nread);
StreamWriteResult res = sink()->Write(&buffer, 1);
+ pending_writes_++;
if (!res.async) {
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
} else {
- is_writing_ = true;
is_reading_ = false;
res.wrap->SetAllocatedStorage(std::move(buf));
if (source() != nullptr)
@@ -155,19 +160,26 @@ void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
}
}
-void StreamPipe::ShutdownWritable() {
- sink()->Shutdown();
-}
-
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
int status) {
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
- pipe->is_writing_ = false;
+ pipe->pending_writes_--;
+ if (pipe->is_closed_) {
+ if (pipe->pending_writes_ == 0) {
+ Environment* env = pipe->env();
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+ pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
+ stream()->RemoveStreamListener(this);
+ }
+ return;
+ }
+
if (pipe->is_eof_) {
HandleScope handle_scope(pipe->env()->isolate());
InternalCallbackScope callback_scope(pipe,
InternalCallbackScope::kSkipTaskQueues);
- pipe->ShutdownWritable();
+ pipe->sink()->Shutdown();
pipe->Unpipe();
return;
}
@@ -179,6 +191,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
prev->OnStreamAfterWrite(w, status);
return;
}
+
+ if (!pipe->uses_wants_write_) {
+ OnStreamWantsWrite(65536);
+ }
}
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
@@ -202,6 +218,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
pipe->sink_destroyed_ = true;
pipe->is_eof_ = true;
+ pipe->pending_writes_ = 0;
pipe->Unpipe();
}
@@ -242,8 +259,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
pipe->is_closed_ = false;
- if (pipe->wanted_data_ > 0)
- pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
+ pipe->writable_listener_.OnStreamWantsWrite(65536);
}
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
@@ -252,6 +268,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
pipe->Unpipe();
}
+void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
+ StreamPipe* pipe;
+ ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
+ args.GetReturnValue().Set(pipe->is_closed_);
+}
+
+void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
+ StreamPipe* pipe;
+ ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
+ args.GetReturnValue().Set(pipe->pending_writes_);
+}
+
namespace {
void InitializeStreamPipe(Local<Object> target,
@@ -266,6 +294,8 @@ void InitializeStreamPipe(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
env->SetProtoMethod(pipe, "start", StreamPipe::Start);
+ env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
+ env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
pipe->SetClassName(stream_pipe_string);
pipe->InstanceTemplate()->SetInternalFieldCount(1);