diff options
Diffstat (limited to 'src/stream_wrap.cc')
-rw-r--r-- | src/stream_wrap.cc | 146 |
1 files changed, 118 insertions, 28 deletions
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index e0079b81364..848abefd5ea 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -33,6 +33,7 @@ #include "util-inl.h" #include <stdlib.h> // abort() +#include <string.h> // memcpy() #include <limits.h> // INT_MAX @@ -49,6 +50,7 @@ using v8::Number; using v8::Object; using v8::PropertyCallbackInfo; using v8::String; +using v8::True; using v8::Undefined; using v8::Value; @@ -200,30 +202,43 @@ void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) { Local<Object> buf_obj = args[1].As<Object>(); size_t length = Buffer::Length(buf_obj); - char* storage = new char[sizeof(WriteWrap)]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); + char* storage; + WriteWrap* req_wrap; uv_buf_t buf; WriteBuffer(buf_obj, &buf); - int err = wrap->callbacks()->DoWrite(req_wrap, - &buf, - 1, - NULL, - StreamWrap::AfterWrite); + // Try writing immediately without allocation + uv_buf_t* bufs = &buf; + size_t count = 1; + int err = wrap->callbacks()->TryWrite(&bufs, &count); + if (err == 0) + goto done; + assert(count == 1); + + // Allocate, or write rest + storage = new char[sizeof(WriteWrap)]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); + + err = wrap->callbacks()->DoWrite(req_wrap, + bufs, + count, + NULL, + StreamWrap::AfterWrite); req_wrap->Dispatched(); - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(length, node_isolate)); - const char* msg = wrap->callbacks()->Error(); - if (msg != NULL) - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->async(), True(node_isolate)); if (err) { req_wrap->~WriteWrap(); delete[] storage; } + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != NULL) + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(length, node_isolate)); args.GetReturnValue().Set(err); } @@ -256,22 +271,53 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) { return; } - char* storage = new char[sizeof(WriteWrap) + storage_size + 15]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); + // Try writing immediately if write size isn't too big + char* storage; + WriteWrap* req_wrap; + char* data; + char stack_storage[16384]; // 16kb + size_t data_size; + uv_buf_t buf; + + bool try_write = storage_size + 15 <= sizeof(stack_storage) && + (!wrap->is_named_pipe_ipc() || !args[2]->IsObject()); + if (try_write) { + data_size = StringBytes::Write(stack_storage, + storage_size, + string, + encoding); + buf = uv_buf_init(stack_storage, data_size); + + uv_buf_t* bufs = &buf; + size_t count = 1; + err = wrap->callbacks()->TryWrite(&bufs, &count); + + // Success + if (err == 0) + goto done; + + // Failure, or partial write + assert(count == 1); + } + + storage = new char[sizeof(WriteWrap) + storage_size + 15]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - char* data = reinterpret_cast<char*>(ROUND_UP( + data = reinterpret_cast<char*>(ROUND_UP( reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16)); - size_t data_size; - data_size = StringBytes::Write(data, storage_size, string, encoding); + if (try_write) { + // Copy partial data + memcpy(data, buf.base, buf.len); + data_size = buf.len; + } else { + // Write it + data_size = StringBytes::Write(data, storage_size, string, encoding); + } assert(data_size <= storage_size); - uv_buf_t buf; - - buf.base = data; - buf.len = data_size; + buf = uv_buf_init(data, data_size); if (!wrap->is_named_pipe_ipc()) { err = wrap->callbacks()->DoWrite(req_wrap, @@ -301,17 +347,19 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) { } req_wrap->Dispatched(); - req_wrap->object()->Set(env->bytes_string(), - Integer::NewFromUnsigned(data_size, node_isolate)); - const char* msg = wrap->callbacks()->Error(); - if (msg != NULL) - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap->object()->Set(env->async(), True(node_isolate)); if (err) { req_wrap->~WriteWrap(); delete[] storage; } + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != NULL) + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(data_size, node_isolate)); args.GetReturnValue().Set(err); } @@ -405,6 +453,7 @@ void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) { delete[] bufs; req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(node_isolate)); req_wrap->object()->Set(env->bytes_string(), Number::New(node_isolate, bytes)); const char* msg = wrap->callbacks()->Error(); @@ -518,6 +567,47 @@ const char* StreamWrapCallbacks::Error() { } +// NOTE: Call to this function could change both `buf`'s and `count`'s +// values, shifting their base and decrementing their length. This is +// required in order to skip the data that was successfully written via +// uv_try_write(). +int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { + int err; + size_t written; + uv_buf_t* vbufs = *bufs; + size_t vcount = *count; + + err = uv_try_write(wrap()->stream(), vbufs, vcount); + if (err < 0) + return err; + + // Slice off the buffers: skip all written buffers and slice the one that + // was partially written. + written = err; + for (; written != 0 && vcount > 0; vbufs++, vcount--) { + // Slice + if (vbufs[0].len > written) { + vbufs[0].base += written; + vbufs[0].len -= written; + written = 0; + break; + + // Discard + } else { + written -= vbufs[0].len; + } + } + + *bufs = vbufs; + *count = vcount; + + if (vcount == 0) + return 0; + else + return -1; +} + + int StreamWrapCallbacks::DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, |