Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-02-13 03:23:50 +0300
committerAnna Henningsen <anna@addaleax.net>2018-03-15 14:53:13 +0300
commit67f1d76956a8a5da9875b113371c8786ad579086 (patch)
tree220df0ab56ebc499c1edec20fe7b90449189d44b /src/stream_pipe.cc
parentf7f1437d44f3e4b745e36540a752065cc58d993b (diff)
src: introduce native-layer stream piping
Provide a way to create pipes between native `StreamBase` instances that acts more directly than a `.pipe()` call would. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src/stream_pipe.cc')
-rw-r--r--src/stream_pipe.cc266
1 files changed, 266 insertions, 0 deletions
diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc
new file mode 100644
index 00000000000..8f0263cd9ae
--- /dev/null
+++ b/src/stream_pipe.cc
@@ -0,0 +1,266 @@
+#include "stream_pipe.h"
+#include "stream_base-inl.h"
+#include "node_buffer.h"
+#include "node_internals.h"
+
+using v8::Context;
+using v8::External;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::Local;
+using v8::Object;
+using v8::Value;
+
+namespace node {
+
+StreamPipe::StreamPipe(StreamBase* source,
+ StreamBase* sink,
+ Local<Object> obj)
+ : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
+ MakeWeak(this);
+
+ CHECK_NE(sink, nullptr);
+ CHECK_NE(source, nullptr);
+
+ source->PushStreamListener(&readable_listener_);
+ sink->PushStreamListener(&writable_listener_);
+
+ CHECK(sink->HasWantsWrite());
+
+ // Set up links between this object and the source/sink objects.
+ // In particular, this makes sure that they are garbage collected as a group,
+ // if that applies to the given streams (for example, Http2Streams use
+ // weak references).
+ obj->Set(env()->context(), env()->source_string(), source->GetObject())
+ .FromJust();
+ source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
+ .FromJust();
+ obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
+ .FromJust();
+ sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
+ .FromJust();
+}
+
+StreamPipe::~StreamPipe() {
+ CHECK(is_closed_);
+}
+
+StreamBase* StreamPipe::source() {
+ return static_cast<StreamBase*>(readable_listener_.stream());
+}
+
+StreamBase* StreamPipe::sink() {
+ return static_cast<StreamBase*>(writable_listener_.stream());
+}
+
+void StreamPipe::Unpipe() {
+ if (is_closed_)
+ return;
+
+ // Note that we cannot use virtual methods on `source` and `sink` here,
+ // because this function can be called from their destructors via
+ // `OnStreamDestroy()`.
+
+ is_closed_ = true;
+ is_reading_ = false;
+ source()->RemoveStreamListener(&readable_listener_);
+ sink()->RemoveStreamListener(&writable_listener_);
+
+ // Delay the JS-facing part with SetImmediate, because this might be from
+ // inside the garbage collector, so we can’t run JS here.
+ HandleScope handle_scope(env()->isolate());
+ env()->SetImmediate([](Environment* env, void* data) {
+ StreamPipe* pipe = static_cast<StreamPipe*>(data);
+
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+ Local<Object> object = pipe->object();
+
+ if (object->Has(env->context(), env->onunpipe_string()).FromJust()) {
+ pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked();
+ }
+
+ // Set all the links established in the constructor to `null`.
+ Local<Value> null = Null(env->isolate());
+
+ Local<Value> source_v;
+ Local<Value> sink_v;
+ source_v = object->Get(env->context(), env->source_string())
+ .ToLocalChecked();
+ sink_v = object->Get(env->context(), env->sink_string())
+ .ToLocalChecked();
+ CHECK(source_v->IsObject());
+ CHECK(sink_v->IsObject());
+
+ object->Set(env->context(), env->source_string(), null).FromJust();
+ object->Set(env->context(), env->sink_string(), null).FromJust();
+ source_v.As<Object>()->Set(env->context(),
+ env->pipe_target_string(),
+ null).FromJust();
+ sink_v.As<Object>()->Set(env->context(),
+ env->pipe_source_string(),
+ null).FromJust();
+ }, static_cast<void*>(this), object());
+}
+
+uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
+ size_t size = std::min(suggested_size, pipe->wanted_data_);
+ CHECK_GT(size, 0);
+ return uv_buf_init(Malloc(size), size);
+}
+
+void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf) {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
+ AsyncScope async_scope(pipe);
+ if (nread < 0) {
+ // EOF or error; stop reading and pass the error to the previous listener
+ // (which might end up in JS).
+ free(buf.base);
+ pipe->is_eof_ = true;
+ stream()->ReadStop();
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
+ // If we’re not writing, close now. Otherwise, we’ll do that in
+ // `OnStreamAfterWrite()`.
+ if (!pipe->is_writing_) {
+ pipe->ShutdownWritable();
+ pipe->Unpipe();
+ }
+ return;
+ }
+
+ pipe->ProcessData(nread, buf);
+}
+
+void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) {
+ uv_buf_t buffer = uv_buf_init(buf.base, nread);
+ StreamWriteResult res = sink()->Write(&buffer, 1);
+ if (!res.async) {
+ free(buf.base);
+ writable_listener_.OnStreamAfterWrite(nullptr, res.err);
+ } else {
+ is_writing_ = true;
+ is_reading_ = false;
+ res.wrap->SetAllocatedStorage(buf.base, buf.len);
+ source()->ReadStop();
+ }
+}
+
+void StreamPipe::ShutdownWritable() {
+ sink()->Shutdown();
+}
+
+void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
+ int status) {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+ pipe->is_writing_ = false;
+ if (pipe->is_eof_) {
+ AsyncScope async_scope(pipe);
+ pipe->ShutdownWritable();
+ pipe->Unpipe();
+ return;
+ }
+
+ if (status != 0) {
+ CHECK_NE(previous_listener_, nullptr);
+ StreamListener* prev = previous_listener_;
+ pipe->Unpipe();
+ prev->OnStreamAfterWrite(w, status);
+ return;
+ }
+}
+
+void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
+ int status) {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+ CHECK_NE(previous_listener_, nullptr);
+ StreamListener* prev = previous_listener_;
+ pipe->Unpipe();
+ prev->OnStreamAfterShutdown(w, status);
+}
+
+void StreamPipe::ReadableListener::OnStreamDestroy() {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
+ if (!pipe->is_eof_) {
+ OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
+ }
+}
+
+void StreamPipe::WritableListener::OnStreamDestroy() {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+ pipe->is_eof_ = true;
+ pipe->Unpipe();
+}
+
+void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
+ StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+ pipe->wanted_data_ = suggested_size;
+ if (pipe->is_reading_ || pipe->is_closed_)
+ return;
+ AsyncScope async_scope(pipe);
+ pipe->is_reading_ = true;
+ pipe->source()->ReadStart();
+}
+
+uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
+ CHECK_NE(previous_listener_, nullptr);
+ return previous_listener_->OnStreamAlloc(suggested_size);
+}
+
+void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
+ const uv_buf_t& buf) {
+ CHECK_NE(previous_listener_, nullptr);
+ return previous_listener_->OnStreamRead(nread, buf);
+}
+
+void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
+ CHECK(args.IsConstructCall());
+ CHECK(args[0]->IsExternal());
+ CHECK(args[1]->IsExternal());
+ auto source = static_cast<StreamBase*>(args[0].As<External>()->Value());
+ auto sink = static_cast<StreamBase*>(args[1].As<External>()->Value());
+
+ new StreamPipe(source, sink, args.This());
+}
+
+void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
+ StreamPipe* pipe;
+ ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
+ pipe->is_closed_ = false;
+ if (pipe->wanted_data_ > 0)
+ pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
+}
+
+void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
+ StreamPipe* pipe;
+ ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
+ pipe->Unpipe();
+}
+
+namespace {
+
+void InitializeStreamPipe(Local<Object> target,
+ Local<Value> unused,
+ Local<Context> context) {
+ Environment* env = Environment::GetCurrent(context);
+
+ // Create FunctionTemplate for FileHandle::CloseReq
+ Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
+ Local<String> stream_pipe_string =
+ FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
+ env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
+ env->SetProtoMethod(pipe, "start", StreamPipe::Start);
+ AsyncWrap::AddWrapMethods(env, pipe);
+ pipe->SetClassName(stream_pipe_string);
+ pipe->InstanceTemplate()->SetInternalFieldCount(1);
+ target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust();
+}
+
+} // anonymous namespace
+
+} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
+ node::InitializeStreamPipe)