Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFedor Indutny <fedor.indutny@gmail.com>2014-01-29 02:48:10 +0400
committerFedor Indutny <fedor.indutny@gmail.com>2014-01-29 02:49:03 +0400
commit9836a4eeda1e2d43aad0923f1f72b364792629bc (patch)
treef9a43115eaca3a49f83c910d20ea363bef3b2b29
parenteaf76648a6ba05932465fdb2478a16ca4b6c17a6 (diff)
stream_wrap: use `uv_try_write` where possible
Use `uv_try_write` for string and buffer writes, thus avoiding to do allocations and copying in some of the cases.
-rw-r--r--benchmark/net/tcp-raw-pipe.js2
-rw-r--r--benchmark/net/tcp-raw-s2c.js9
-rw-r--r--lib/net.js8
-rw-r--r--src/env.h1
-rw-r--r--src/stream_wrap.cc146
-rw-r--r--src/stream_wrap.h3
-rw-r--r--src/tls_wrap.cc6
-rw-r--r--src/tls_wrap.h1
-rw-r--r--test/simple/test-tcp-wrap-listen.js9
9 files changed, 148 insertions, 37 deletions
diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js
index 91c69e9b6c9..bda683985d4 100644
--- a/benchmark/net/tcp-raw-pipe.js
+++ b/benchmark/net/tcp-raw-pipe.js
@@ -51,7 +51,7 @@ function server() {
if (nread < 0)
fail(nread, 'read');
- var writeReq = {};
+ var writeReq = { async: false };
err = clientHandle.writeBuffer(writeReq, buffer);
if (err)
diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js
index 6fb65685272..500be1b72bb 100644
--- a/benchmark/net/tcp-raw-s2c.js
+++ b/benchmark/net/tcp-raw-s2c.js
@@ -68,7 +68,7 @@ function server() {
write();
function write() {
- var writeReq = { oncomplete: afterWrite };
+ var writeReq = { async: false, oncomplete: afterWrite };
var err;
switch (type) {
case 'buf':
@@ -82,8 +82,13 @@ function server() {
break;
}
- if (err)
+ if (err) {
fail(err, 'write');
+ } else if (!writeReq.async) {
+ process.nextTick(function() {
+ afterWrite(null, clientHandle, writeReq);
+ });
+ }
}
function afterWrite(err, handle, req) {
diff --git a/lib/net.js b/lib/net.js
index 800bae38cf5..8e6dcbf7bf9 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -626,7 +626,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
return false;
}
- var req = { oncomplete: afterWrite };
+ var req = { oncomplete: afterWrite, async: false };
var err;
if (writev) {
@@ -660,10 +660,10 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If it was entirely flushed, we can write some more right now.
// However, if more is left in the queue, then wait until that clears.
- if (this._handle.writeQueueSize === 0)
- cb();
- else
+ if (req.async && this._handle.writeQueueSize != 0)
req.cb = cb;
+ else
+ cb();
};
diff --git a/src/env.h b/src/env.h
index 23a3959d7bb..26630bb091a 100644
--- a/src/env.h
+++ b/src/env.h
@@ -53,6 +53,7 @@ namespace node {
#define PER_ISOLATE_STRING_PROPERTIES(V) \
V(address_string, "address") \
V(atime_string, "atime") \
+ V(async, "async") \
V(async_queue_string, "_asyncQueue") \
V(birthtime_string, "birthtime") \
V(blksize_string, "blksize") \
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index e0079b81364..848abefd5ea 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -33,6 +33,7 @@
#include "util-inl.h"
#include <stdlib.h> // abort()
+#include <string.h> // memcpy()
#include <limits.h> // INT_MAX
@@ -49,6 +50,7 @@ using v8::Number;
using v8::Object;
using v8::PropertyCallbackInfo;
using v8::String;
+using v8::True;
using v8::Undefined;
using v8::Value;
@@ -200,30 +202,43 @@ void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
Local<Object> buf_obj = args[1].As<Object>();
size_t length = Buffer::Length(buf_obj);
- char* storage = new char[sizeof(WriteWrap)];
- WriteWrap* req_wrap =
- new(storage) WriteWrap(env, req_wrap_obj, wrap);
+ char* storage;
+ WriteWrap* req_wrap;
uv_buf_t buf;
WriteBuffer(buf_obj, &buf);
- int err = wrap->callbacks()->DoWrite(req_wrap,
- &buf,
- 1,
- NULL,
- StreamWrap::AfterWrite);
+ // Try writing immediately without allocation
+ uv_buf_t* bufs = &buf;
+ size_t count = 1;
+ int err = wrap->callbacks()->TryWrite(&bufs, &count);
+ if (err == 0)
+ goto done;
+ assert(count == 1);
+
+ // Allocate, or write rest
+ storage = new char[sizeof(WriteWrap)];
+ req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
+
+ err = wrap->callbacks()->DoWrite(req_wrap,
+ bufs,
+ count,
+ NULL,
+ StreamWrap::AfterWrite);
req_wrap->Dispatched();
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(length, node_isolate));
- const char* msg = wrap->callbacks()->Error();
- if (msg != NULL)
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ req_wrap_obj->Set(env->async(), True(node_isolate));
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
+ done:
+ const char* msg = wrap->callbacks()->Error();
+ if (msg != NULL)
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ req_wrap_obj->Set(env->bytes_string(),
+ Integer::NewFromUnsigned(length, node_isolate));
args.GetReturnValue().Set(err);
}
@@ -256,22 +271,53 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
return;
}
- char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
- WriteWrap* req_wrap =
- new(storage) WriteWrap(env, req_wrap_obj, wrap);
+ // Try writing immediately if write size isn't too big
+ char* storage;
+ WriteWrap* req_wrap;
+ char* data;
+ char stack_storage[16384]; // 16kb
+ size_t data_size;
+ uv_buf_t buf;
+
+ bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
+ (!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
+ if (try_write) {
+ data_size = StringBytes::Write(stack_storage,
+ storage_size,
+ string,
+ encoding);
+ buf = uv_buf_init(stack_storage, data_size);
+
+ uv_buf_t* bufs = &buf;
+ size_t count = 1;
+ err = wrap->callbacks()->TryWrite(&bufs, &count);
+
+ // Success
+ if (err == 0)
+ goto done;
+
+ // Failure, or partial write
+ assert(count == 1);
+ }
+
+ storage = new char[sizeof(WriteWrap) + storage_size + 15];
+ req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
- char* data = reinterpret_cast<char*>(ROUND_UP(
+ data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
- size_t data_size;
- data_size = StringBytes::Write(data, storage_size, string, encoding);
+ if (try_write) {
+ // Copy partial data
+ memcpy(data, buf.base, buf.len);
+ data_size = buf.len;
+ } else {
+ // Write it
+ data_size = StringBytes::Write(data, storage_size, string, encoding);
+ }
assert(data_size <= storage_size);
- uv_buf_t buf;
-
- buf.base = data;
- buf.len = data_size;
+ buf = uv_buf_init(data, data_size);
if (!wrap->is_named_pipe_ipc()) {
err = wrap->callbacks()->DoWrite(req_wrap,
@@ -301,17 +347,19 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
}
req_wrap->Dispatched();
- req_wrap->object()->Set(env->bytes_string(),
- Integer::NewFromUnsigned(data_size, node_isolate));
- const char* msg = wrap->callbacks()->Error();
- if (msg != NULL)
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ req_wrap->object()->Set(env->async(), True(node_isolate));
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
+ done:
+ const char* msg = wrap->callbacks()->Error();
+ if (msg != NULL)
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ req_wrap_obj->Set(env->bytes_string(),
+ Integer::NewFromUnsigned(data_size, node_isolate));
args.GetReturnValue().Set(err);
}
@@ -405,6 +453,7 @@ void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
delete[] bufs;
req_wrap->Dispatched();
+ req_wrap->object()->Set(env->async(), True(node_isolate));
req_wrap->object()->Set(env->bytes_string(),
Number::New(node_isolate, bytes));
const char* msg = wrap->callbacks()->Error();
@@ -518,6 +567,47 @@ const char* StreamWrapCallbacks::Error() {
}
+// NOTE: Call to this function could change both `buf`'s and `count`'s
+// values, shifting their base and decrementing their length. This is
+// required in order to skip the data that was successfully written via
+// uv_try_write().
+int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
+ int err;
+ size_t written;
+ uv_buf_t* vbufs = *bufs;
+ size_t vcount = *count;
+
+ err = uv_try_write(wrap()->stream(), vbufs, vcount);
+ if (err < 0)
+ return err;
+
+ // Slice off the buffers: skip all written buffers and slice the one that
+ // was partially written.
+ written = err;
+ for (; written != 0 && vcount > 0; vbufs++, vcount--) {
+ // Slice
+ if (vbufs[0].len > written) {
+ vbufs[0].base += written;
+ vbufs[0].len -= written;
+ written = 0;
+ break;
+
+ // Discard
+ } else {
+ written -= vbufs[0].len;
+ }
+ }
+
+ *bufs = vbufs;
+ *count = vcount;
+
+ if (vcount == 0)
+ return 0;
+ else
+ return -1;
+}
+
+
int StreamWrapCallbacks::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index d1a94fb422f..f91bb8ba55f 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -74,6 +74,9 @@ class StreamWrapCallbacks {
}
virtual const char* Error();
+
+ virtual int TryWrite(uv_buf_t** bufs, size_t* count);
+
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index 6d9425c040e..92febc15b8e 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -511,6 +511,12 @@ const char* TLSCallbacks::Error() {
}
+int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
+ // TODO(indutny): Support it
+ return -1;
+}
+
+
int TLSCallbacks::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index db78009ede2..946cc1c64d9 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -51,6 +51,7 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
v8::Handle<v8::Context> context);
const char* Error();
+ int TryWrite(uv_buf_t** bufs, size_t* count);
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
diff --git a/test/simple/test-tcp-wrap-listen.js b/test/simple/test-tcp-wrap-listen.js
index 1a3dc12ee98..940ea8b4b8c 100644
--- a/test/simple/test-tcp-wrap-listen.js
+++ b/test/simple/test-tcp-wrap-listen.js
@@ -55,7 +55,7 @@ server.onconnection = function(err, client) {
assert.equal(0, client.writeQueueSize);
- var req = {};
+ var req = { async: false };
var err = client.writeBuffer(req, buffer);
assert.equal(err, 0);
client.pendingWrites.push(req);
@@ -64,7 +64,12 @@ server.onconnection = function(err, client) {
// 11 bytes should flush
assert.equal(0, client.writeQueueSize);
- req.oncomplete = function(status, client_, req_) {
+ if (req.async && client.writeQueueSize != 0)
+ req.oncomplete = done;
+ else
+ process.nextTick(done.bind(null, 0, client, req));
+
+ function done(status, client_, req_) {
assert.equal(req, client.pendingWrites.shift());
// Check parameters.