Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2010-11-17 21:58:45 +0300
committerRyan Dahl <ry@tinyclouds.org>2010-11-19 03:47:38 +0300
commitd3fbe3e3d141973a00722f5631f200cdf5c69c46 (patch)
tree3670453914f75a3a9192766d0280faa468a9f071 /src
parentfa556a142512ab932b7359760e5e4585e4e035b6 (diff)
Emit drain and stop IOWatcher even on empty buffer
Diffstat (limited to 'src')
-rw-r--r--src/node_io_watcher.cc245
1 files changed, 123 insertions, 122 deletions
diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc
index 62c56464632..45bc424eeaf 100644
--- a/src/node_io_watcher.cc
+++ b/src/node_io_watcher.cc
@@ -374,155 +374,156 @@ void IOWatcher::Dump() {
}
}
- if (to_write == 0) continue;
-
- ssize_t written;
-
- if (unix_socket) {
- struct msghdr msg;
- char scratch[64];
-
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = iovcnt;
- msg.msg_control = NULL; // void*
- msg.msg_controllen = 0; // socklen_t
- msg.msg_flags = 0; // int
-
- if (fd_to_send >= 0) {
- struct cmsghdr *cmsg;
-
- msg.msg_control = (void *) scratch;
- msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
+ if (to_write > 0) {
+ ssize_t written;
+
+ if (unix_socket) {
+ struct msghdr msg;
+ char scratch[64];
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = iovcnt;
+ msg.msg_control = NULL; // void*
+ msg.msg_controllen = 0; // socklen_t
+ msg.msg_flags = 0; // int
+
+ if (fd_to_send >= 0) {
+ struct cmsghdr *cmsg;
+
+ msg.msg_control = (void *) scratch;
+ msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = msg.msg_controllen;
+ *(int*) CMSG_DATA(cmsg) = fd_to_send;
+ }
- cmsg = CMSG_FIRSTHDR(&msg);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = msg.msg_controllen;
- *(int*) CMSG_DATA(cmsg) = fd_to_send;
+ written = sendmsg(io->watcher_.fd, &msg, 0);
+ } else {
+ written = writev(io->watcher_.fd, iov, iovcnt);
}
- written = sendmsg(io->watcher_.fd, &msg, 0);
- } else {
- written = writev(io->watcher_.fd, iov, iovcnt);
- }
+ DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld",
+ iovcnt,
+ to_write,
+ written);
- DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld",
- iovcnt,
- to_write,
- written);
-
- if (written < 0) {
- // Allow EAGAIN.
- // TODO: handle EMSGSIZE after sendmsg().
- if (errno == EAGAIN) {
- io->Start();
- } else {
- // Emit error event
- if (watcher->Has(onerror_sym)) {
- Local<Value> callback_v = io->handle_->Get(onerror_sym);
- assert(callback_v->IsFunction());
- Local<Function> callback = Local<Function>::Cast(callback_v);
+ if (written < 0) {
+ // Allow EAGAIN.
+ // TODO: handle EMSGSIZE after sendmsg().
+ if (errno == EAGAIN) {
+ io->Start();
+ } else {
+ // Emit error event
+ if (watcher->Has(onerror_sym)) {
+ Local<Value> callback_v = io->handle_->Get(onerror_sym);
+ assert(callback_v->IsFunction());
+ Local<Function> callback = Local<Function>::Cast(callback_v);
- Local<Value> argv[1] = { Integer::New(errno) };
+ Local<Value> argv[1] = { Integer::New(errno) };
- TryCatch try_catch;
+ TryCatch try_catch;
- callback->Call(io->handle_, 1, argv);
+ callback->Call(io->handle_, 1, argv);
- if (try_catch.HasCaught()) {
- FatalException(try_catch);
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
}
}
+ // Continue with the next watcher.
+ continue;
}
- // Continue with the next watcher.
- continue;
- }
-
- // what about written == 0 ?
- size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
- assert(queue_size >= offset);
+ // what about written == 0 ?
- // Now drop the buckets that have been written.
- bucket_index = 0;
+ size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
+ assert(queue_size >= offset);
- while (written > 0) {
- bucket_v = watcher->Get(first_bucket_sym);
- if (!bucket_v->IsObject()) {
- // No more buckets in the queue. Make sure the last_bucket_sym is
- // updated and then go to the next watcher.
- watcher->Set(last_bucket_sym, Null());
- break;
- }
+ // Now drop the buckets that have been written.
+ bucket_index = 0;
- bucket = bucket_v->ToObject();
-
- Local<Value> data_v = bucket->Get(data_sym);
- assert(!data_v.IsEmpty());
+ while (written > 0) {
+ bucket_v = watcher->Get(first_bucket_sym);
+ if (!bucket_v->IsObject()) {
+ // No more buckets in the queue. Make sure the last_bucket_sym is
+ // updated and then go to the next watcher.
+ watcher->Set(last_bucket_sym, Null());
+ break;
+ }
- // At the moment we're turning all string into buffers
- // so we assert that this is not a string. However, when the
- // "Pointer patch" lands, this assert will need to be removed.
- assert(!data_v->IsString());
- // When the "Pointer patch" lands, we will need to be careful
- // to somehow store the length of strings that we're optimizing on
- // so that it need not be recalculated here. Note the "Pointer patch"
- // will only apply to ASCII strings - UTF8 ones will need to be
- // serialized onto a buffer.
- size_t bucket_len = Buffer::Length(data_v->ToObject());
+ bucket = bucket_v->ToObject();
- if (unix_socket && bucket->Has(fd_sym)) {
- bucket->Set(fd_sym, Null());
- }
+ Local<Value> data_v = bucket->Get(data_sym);
+ assert(!data_v.IsEmpty());
- assert(bucket_len > offset);
- DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
+ // At the moment we're turning all string into buffers
+ // so we assert that this is not a string. However, when the
+ // "Pointer patch" lands, this assert will need to be removed.
+ assert(!data_v->IsString());
+ // When the "Pointer patch" lands, we will need to be careful
+ // to somehow store the length of strings that we're optimizing on
+ // so that it need not be recalculated here. Note the "Pointer patch"
+ // will only apply to ASCII strings - UTF8 ones will need to be
+ // serialized onto a buffer.
+ size_t bucket_len = Buffer::Length(data_v->ToObject());
- queue_size -= written;
-
- // Only on the first bucket does is the offset > 0.
- if (offset + written < bucket_len) {
- // we have not written the entire bucket
- DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
- "setting watcher.offset = %ld",
- bucket_index,
- offset + written);
+ if (unix_socket && bucket->Has(fd_sym)) {
+ bucket->Set(fd_sym, Null());
+ }
- watcher->Set(offset_sym,
- Integer::NewFromUnsigned(offset + written));
- break;
- } else {
- DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
- bucket_index);
-
- written -= bucket_len - offset;
-
- Local<Value> bucket_callback_v = bucket->Get(callback_sym);
- if (bucket_callback_v->IsFunction()) {
- Local<Function> bucket_callback =
- Local<Function>::Cast(bucket_callback_v);
- TryCatch try_catch;
- bucket_callback->Call(io->handle_, 0, NULL);
- if (try_catch.HasCaught()) {
- FatalException(try_catch);
+ assert(bucket_len > offset);
+ DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
+
+ queue_size -= written;
+
+ // Only on the first bucket does is the offset > 0.
+ if (offset + written < bucket_len) {
+ // we have not written the entire bucket
+ DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
+ "setting watcher.offset = %ld",
+ bucket_index,
+ offset + written);
+
+ watcher->Set(offset_sym,
+ Integer::NewFromUnsigned(offset + written));
+ break;
+ } else {
+ DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
+ bucket_index);
+
+ written -= bucket_len - offset;
+
+ Local<Value> bucket_callback_v = bucket->Get(callback_sym);
+ if (bucket_callback_v->IsFunction()) {
+ Local<Function> bucket_callback =
+ Local<Function>::Cast(bucket_callback_v);
+ TryCatch try_catch;
+ bucket_callback->Call(io->handle_, 0, NULL);
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
}
+
+ // Offset is now zero
+ watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
}
- // Offset is now zero
- watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
- }
+ offset = 0; // the next bucket will have zero offset;
+ bucket_index++;
- offset = 0; // the next bucket will have zero offset;
- bucket_index++;
+ // unshift
+ watcher->Set(first_bucket_sym, bucket->Get(next_sym));
+ }
- // unshift
- watcher->Set(first_bucket_sym, bucket->Get(next_sym));
+ // Set the queue size.
+ watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
}
- // Set the queue size.
- watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
// Finished dumping the buckets.
//