Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/deps
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2011-07-19 03:26:37 +0400
committerRyan Dahl <ry@tinyclouds.org>2011-07-19 03:26:37 +0400
commit2e16ae703ee5d14cf7199218b3112407a188af42 (patch)
treed7d6a66fec08ecb4c4e6fa8244e063c97fdb0c53 /deps
parent85404c5c558ebbba3aaf27c56d1edccc2061d664 (diff)
Upgrade libuv to 4eff34da4
Diffstat (limited to 'deps')
-rw-r--r--deps/uv/include/uv-unix.h22
-rw-r--r--deps/uv/include/uv-win.h2
-rw-r--r--deps/uv/src/uv-unix.c854
-rw-r--r--deps/uv/src/uv-win.c153
-rw-r--r--deps/uv/test/benchmark-pump.c7
-rw-r--r--deps/uv/test/echo-server.c43
-rw-r--r--deps/uv/test/task.h3
7 files changed, 775 insertions, 309 deletions
diff --git a/deps/uv/include/uv-unix.h b/deps/uv/include/uv-unix.h
index 5a92f1257d5..cad2ed4abe9 100644
--- a/deps/uv/include/uv-unix.h
+++ b/deps/uv/include/uv-unix.h
@@ -67,25 +67,27 @@ typedef struct {
#define UV_STREAM_PRIVATE_FIELDS \
uv_read_cb read_cb; \
- uv_alloc_cb alloc_cb;
-
-
-/* UV_TCP */
-#define UV_TCP_PRIVATE_FIELDS \
- int delayed_error; \
- uv_connection_cb connection_cb; \
- int accepted_fd; \
+ uv_alloc_cb alloc_cb; \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
ev_io read_watcher; \
ev_io write_watcher; \
ngx_queue_t write_queue; \
- ngx_queue_t write_completed_queue;
+ ngx_queue_t write_completed_queue; \
+ int delayed_error; \
+ uv_connection_cb connection_cb; \
+ int accepted_fd;
+
+
+/* UV_TCP */
+#define UV_TCP_PRIVATE_FIELDS
/* UV_NAMED_PIPE */
#define UV_PIPE_PRIVATE_TYPEDEF
-#define UV_PIPE_PRIVATE_FIELDS
+#define UV_PIPE_PRIVATE_FIELDS \
+ UV_TCP_PRIVATE_FIELDS \
+ const char* pipe_fname; /* strdup'ed */ \
/* UV_PREPARE */ \
diff --git a/deps/uv/include/uv-win.h b/deps/uv/include/uv-win.h
index d9f65ba5d19..12588e96974 100644
--- a/deps/uv/include/uv-win.h
+++ b/deps/uv/include/uv-win.h
@@ -96,7 +96,6 @@ typedef struct uv_buf_t {
struct uv_req_s accept_req; \
#define uv_pipe_server_fields \
- char* name; \
uv_pipe_accept_t accept_reqs[4]; \
uv_pipe_accept_t* pending_accepts;
@@ -104,6 +103,7 @@ typedef struct uv_buf_t {
HANDLE handle;
#define UV_PIPE_PRIVATE_FIELDS \
+ char* name; \
union { \
struct { uv_pipe_server_fields }; \
struct { uv_pipe_connection_fields }; \
diff --git a/deps/uv/src/uv-unix.c b/deps/uv/src/uv-unix.c
index 1dc20d84373..f35cc6f043b 100644
--- a/deps/uv/src/uv-unix.c
+++ b/deps/uv/src/uv-unix.c
@@ -22,6 +22,8 @@
#include "uv-common.h"
#include "uv-eio.h"
+#define _GNU_SOURCE /* O_CLOEXEC */
+
#include <stddef.h> /* NULL */
#include <stdio.h> /* printf */
#include <stdlib.h>
@@ -29,8 +31,11 @@
#include <errno.h>
#include <assert.h>
#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
+#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h> /* PATH_MAX */
@@ -60,11 +65,30 @@ static struct uv_ares_data_s ares_data;
void uv__req_init(uv_req_t*);
-void uv__tcp_io(EV_P_ ev_io* watcher, int revents);
void uv__next(EV_P_ ev_idle* watcher, int revents);
-static void uv__tcp_connect(uv_tcp_t*);
-int uv_tcp_open(uv_tcp_t*, int fd);
+static int uv__stream_open(uv_stream_t*, int fd);
static void uv__finish_close(uv_handle_t* handle);
+static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error);
+
+static uv_write_t* uv__write(uv_stream_t* stream);
+static void uv__read(uv_stream_t* stream);
+static void uv__stream_connect(uv_stream_t*);
+static void uv__stream_io(EV_P_ ev_io* watcher, int revents);
+static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
+
+#ifndef __GNUC__
+#define __attribute__(a)
+#endif
+
+/* Unused on systems that support O_CLOEXEC, SOCK_CLOEXEC, etc. */
+static int uv__cloexec(int fd, int set) __attribute__((unused));
+static int uv__nonblock(int fd, int set) __attribute__((unused));
+
+static int uv__socket(int domain, int type, int protocol);
+static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
+
+size_t uv__strlcpy(char* dst, const char* src, size_t size);
+
/* flags */
enum {
@@ -161,6 +185,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) {
int uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
uv_tcp_t* tcp;
+ uv_pipe_t* pipe;
uv_async_t* async;
uv_timer_t* timer;
@@ -199,6 +224,22 @@ int uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
ev_timer_stop(EV_DEFAULT_ &timer->timer_watcher);
break;
+ case UV_NAMED_PIPE:
+ pipe = (uv_pipe_t*)handle;
+ if (pipe->pipe_fname) {
+ /*
+ * Unlink the file system entity before closing the file descriptor.
+ * Doing it the other way around introduces a race where our process
+ * unlinks a socket with the same name that's just been created by
+ * another thread or process.
+ */
+ unlink(pipe->pipe_fname);
+ free((void*)pipe->pipe_fname);
+ }
+ uv_read_stop((uv_stream_t*)pipe);
+ ev_io_stop(EV_DEFAULT_ &pipe->write_watcher);
+ break;
+
default:
assert(0);
return -1;
@@ -258,10 +299,10 @@ int uv_tcp_init(uv_tcp_t* tcp) {
ngx_queue_init(&tcp->write_completed_queue);
tcp->write_queue_size = 0;
- ev_init(&tcp->read_watcher, uv__tcp_io);
+ ev_init(&tcp->read_watcher, uv__stream_io);
tcp->read_watcher.data = tcp;
- ev_init(&tcp->write_watcher, uv__tcp_io);
+ ev_init(&tcp->write_watcher, uv__stream_io);
tcp->write_watcher.data = tcp;
assert(ngx_queue_empty(&tcp->write_queue));
@@ -273,40 +314,42 @@ int uv_tcp_init(uv_tcp_t* tcp) {
int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, int addrsize) {
- int r;
+ int saved_errno;
+ int status;
+ int fd;
- if (tcp->fd <= 0) {
- int fd = socket(domain, SOCK_STREAM, 0);
+ saved_errno = errno;
+ status = -1;
- if (fd < 0) {
+ if (tcp->fd <= 0) {
+ if ((fd = uv__socket(domain, SOCK_STREAM, 0)) == -1) {
uv_err_new((uv_handle_t*)tcp, errno);
- return -1;
+ goto out;
}
- if (uv_tcp_open(tcp, fd)) {
+ if (uv__stream_open((uv_stream_t*)tcp, fd)) {
+ status = -2;
close(fd);
- return -2;
+ goto out;
}
}
assert(tcp->fd >= 0);
- r = bind(tcp->fd, addr, addrsize);
tcp->delayed_error = 0;
-
- if (r) {
- switch (errno) {
- case EADDRINUSE:
- tcp->delayed_error = errno;
- return 0;
-
- default:
- uv_err_new((uv_handle_t*)tcp, errno);
- return -1;
+ if (bind(tcp->fd, addr, addrsize) == -1) {
+ if (errno == EADDRINUSE) {
+ tcp->delayed_error = errno;
+ } else {
+ uv_err_new((uv_handle_t*)tcp, errno);
+ goto out;
}
}
+ status = 0;
- return 0;
+out:
+ errno = saved_errno;
+ return status;
}
@@ -330,32 +373,27 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) {
}
-int uv_tcp_open(uv_tcp_t* tcp, int fd) {
- int yes;
- int r;
+static int uv__stream_open(uv_stream_t* stream, int fd) {
+ socklen_t yes;
assert(fd >= 0);
- tcp->fd = fd;
+ stream->fd = fd;
- /* Set non-blocking. */
+ /* Reuse the port address if applicable. */
yes = 1;
- r = fcntl(fd, F_SETFL, O_NONBLOCK);
- assert(r == 0);
-
- /* Reuse the port address. */
- r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
- assert(r == 0);
+ if (stream->type == UV_TCP
+ && setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
+ uv_err_new((uv_handle_t*)stream, errno);
+ return -1;
+ }
/* Associate the fd with each ev_io watcher. */
- ev_io_set(&tcp->read_watcher, fd, EV_READ);
- ev_io_set(&tcp->write_watcher, fd, EV_WRITE);
+ ev_io_set(&stream->read_watcher, fd, EV_READ);
+ ev_io_set(&stream->write_watcher, fd, EV_WRITE);
- /* These should have been set up by uv_tcp_init. */
- assert(tcp->next_watcher.data == tcp);
- assert(tcp->write_watcher.data == tcp);
- assert(tcp->read_watcher.data == tcp);
- assert(tcp->read_watcher.cb == uv__tcp_io);
- assert(tcp->write_watcher.cb == uv__tcp_io);
+ /* These should have been set up by uv_tcp_init or uv_pipe_init. */
+ assert(stream->read_watcher.cb == uv__stream_io);
+ assert(stream->write_watcher.cb == uv__stream_io);
return 0;
}
@@ -365,22 +403,22 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
int fd;
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(struct sockaddr_storage);
- uv_tcp_t* tcp = watcher->data;
+ uv_stream_t* stream = watcher->data;
- assert(watcher == &tcp->read_watcher ||
- watcher == &tcp->write_watcher);
+ assert(watcher == &stream->read_watcher ||
+ watcher == &stream->write_watcher);
assert(revents == EV_READ);
- assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
+ assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING));
- if (tcp->accepted_fd >= 0) {
- ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
+ if (stream->accepted_fd >= 0) {
+ ev_io_stop(EV_DEFAULT_ &stream->read_watcher);
return;
}
while (1) {
- assert(tcp->accepted_fd < 0);
- fd = accept(tcp->fd, (struct sockaddr*)&addr, &addrlen);
+ assert(stream->accepted_fd < 0);
+ fd = accept(stream->fd, (struct sockaddr*)&addr, &addrlen);
if (fd < 0) {
if (errno == EAGAIN) {
@@ -390,16 +428,16 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
/* TODO special trick. unlock reserved socket, accept, close. */
return;
} else {
- uv_err_new((uv_handle_t*)tcp, errno);
- tcp->connection_cb((uv_handle_t*)tcp, -1);
+ uv_err_new((uv_handle_t*)stream, errno);
+ stream->connection_cb((uv_handle_t*)stream, -1);
}
} else {
- tcp->accepted_fd = fd;
- tcp->connection_cb((uv_handle_t*)tcp, 0);
- if (tcp->accepted_fd >= 0) {
+ stream->accepted_fd = fd;
+ stream->connection_cb((uv_handle_t*)stream, 0);
+ if (stream->accepted_fd >= 0) {
/* The user hasn't yet accepted called uv_accept() */
- ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
+ ev_io_stop(EV_DEFAULT_ &stream->read_watcher);
return;
}
}
@@ -408,24 +446,36 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
int uv_accept(uv_handle_t* server, uv_stream_t* client) {
- uv_tcp_t* tcpServer = (uv_tcp_t*)server;
- uv_tcp_t* tcpClient = (uv_tcp_t*)client;
+ uv_stream_t* streamServer;
+ uv_stream_t* streamClient;
+ int saved_errno;
+ int status;
+
+ saved_errno = errno;
+ status = -1;
+
+ streamServer = (uv_stream_t*)server;
+ streamClient = (uv_stream_t*)client;
- if (tcpServer->accepted_fd < 0) {
+ if (streamServer->accepted_fd < 0) {
uv_err_new(server, EAGAIN);
- return -1;
+ goto out;
}
- if (uv_tcp_open(tcpClient, tcpServer->accepted_fd)) {
- /* Ignore error for now */
- tcpServer->accepted_fd = -1;
- close(tcpServer->accepted_fd);
- return -1;
- } else {
- tcpServer->accepted_fd = -1;
- ev_io_start(EV_DEFAULT_ &tcpServer->read_watcher);
- return 0;
+ if (uv__stream_open(streamClient, streamServer->accepted_fd)) {
+ /* TODO handle error */
+ streamServer->accepted_fd = -1;
+ close(streamServer->accepted_fd);
+ goto out;
}
+
+ ev_io_start(EV_DEFAULT_ &streamServer->read_watcher);
+ streamServer->accepted_fd = -1;
+ status = 0;
+
+out:
+ errno = saved_errno;
+ return status;
}
@@ -457,33 +507,11 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
void uv__finish_close(uv_handle_t* handle) {
- uv_tcp_t* tcp;
-
assert(uv_flag_is_set(handle, UV_CLOSING));
assert(!uv_flag_is_set(handle, UV_CLOSED));
uv_flag_set(handle, UV_CLOSED);
switch (handle->type) {
- case UV_TCP:
- /* XXX Is it necessary to stop these watchers here? weren't they
- * supposed to be stopped in uv_close()?
- */
- tcp = (uv_tcp_t*)handle;
- ev_io_stop(EV_DEFAULT_ &tcp->write_watcher);
- ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
-
- assert(!ev_is_active(&tcp->read_watcher));
- assert(!ev_is_active(&tcp->write_watcher));
-
- close(tcp->fd);
- tcp->fd = -1;
-
- if (tcp->accepted_fd >= 0) {
- close(tcp->accepted_fd);
- tcp->accepted_fd = -1;
- }
- break;
-
case UV_PREPARE:
assert(!ev_is_active(&((uv_prepare_t*)handle)->prepare_watcher));
break;
@@ -504,6 +532,26 @@ void uv__finish_close(uv_handle_t* handle) {
assert(!ev_is_active(&((uv_timer_t*)handle)->timer_watcher));
break;
+ case UV_NAMED_PIPE:
+ case UV_TCP:
+ {
+ uv_stream_t* stream;
+
+ stream = (uv_stream_t*)handle;
+
+ assert(!ev_is_active(&stream->read_watcher));
+ assert(!ev_is_active(&stream->write_watcher));
+
+ close(stream->fd);
+ stream->fd = -1;
+
+ if (stream->accepted_fd >= 0) {
+ close(stream->accepted_fd);
+ stream->accepted_fd = -1;
+ }
+ break;
+ }
+
default:
assert(0);
break;
@@ -519,15 +567,15 @@ void uv__finish_close(uv_handle_t* handle) {
}
-uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
+uv_write_t* uv_write_queue_head(uv_stream_t* stream) {
ngx_queue_t* q;
uv_write_t* req;
- if (ngx_queue_empty(&tcp->write_queue)) {
+ if (ngx_queue_empty(&stream->write_queue)) {
return NULL;
}
- q = ngx_queue_head(&tcp->write_queue);
+ q = ngx_queue_head(&stream->write_queue);
if (!q) {
return NULL;
}
@@ -552,31 +600,31 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) {
}
-static void uv__drain(uv_tcp_t* tcp) {
+static void uv__drain(uv_stream_t* stream) {
uv_shutdown_t* req;
- assert(!uv_write_queue_head(tcp));
- assert(tcp->write_queue_size == 0);
+ assert(!uv_write_queue_head(stream));
+ assert(stream->write_queue_size == 0);
- ev_io_stop(EV_DEFAULT_ &tcp->write_watcher);
+ ev_io_stop(EV_DEFAULT_ &stream->write_watcher);
/* Shutdown? */
- if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUTTING) &&
- !uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING) &&
- !uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT)) {
- assert(tcp->shutdown_req);
+ if (uv_flag_is_set((uv_handle_t*)stream, UV_SHUTTING) &&
+ !uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING) &&
+ !uv_flag_is_set((uv_handle_t*)stream, UV_SHUT)) {
+ assert(stream->shutdown_req);
- req = tcp->shutdown_req;
+ req = stream->shutdown_req;
- if (shutdown(tcp->fd, SHUT_WR)) {
+ if (shutdown(stream->fd, SHUT_WR)) {
/* Error. Report it. User should call uv_close(). */
- uv_err_new((uv_handle_t*)tcp, errno);
+ uv_err_new((uv_handle_t*)stream, errno);
if (req->cb) {
req->cb(req, -1);
}
} else {
- uv_err_new((uv_handle_t*)tcp, 0);
- uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
+ uv_err_new((uv_handle_t*)stream, 0);
+ uv_flag_set((uv_handle_t*)stream, UV_SHUT);
if (req->cb) {
req->cb(req, 0);
}
@@ -588,24 +636,24 @@ static void uv__drain(uv_tcp_t* tcp) {
/* On success returns NULL. On error returns a pointer to the write request
* which had the error.
*/
-static uv_write_t* uv__write(uv_tcp_t* tcp) {
+static uv_write_t* uv__write(uv_stream_t* stream) {
uv_write_t* req;
struct iovec* iov;
int iovcnt;
ssize_t n;
- assert(tcp->fd >= 0);
+ assert(stream->fd >= 0);
/* TODO: should probably while(1) here until EAGAIN */
/* Get the request at the head of the queue. */
- req = uv_write_queue_head(tcp);
+ req = uv_write_queue_head(stream);
if (!req) {
- assert(tcp->write_queue_size == 0);
+ assert(stream->write_queue_size == 0);
return NULL;
}
- assert(req->handle == (uv_stream_t*)tcp);
+ assert(req->handle == stream);
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
@@ -619,16 +667,16 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
*/
if (iovcnt == 1) {
- n = write(tcp->fd, iov[0].iov_base, iov[0].iov_len);
+ n = write(stream->fd, iov[0].iov_base, iov[0].iov_len);
}
else {
- n = writev(tcp->fd, iov, iovcnt);
+ n = writev(stream->fd, iov, iovcnt);
}
if (n < 0) {
if (errno != EAGAIN) {
/* Error */
- uv_err_new((uv_handle_t*)tcp, errno);
+ uv_err_new((uv_handle_t*)stream, errno);
return req;
}
} else {
@@ -644,7 +692,7 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
if (n < len) {
buf->base += n;
buf->len -= n;
- tcp->write_queue_size -= n;
+ stream->write_queue_size -= n;
n = 0;
/* There is more to write. Break and ensure the watcher is pending. */
@@ -657,8 +705,8 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
assert(n >= len);
n -= len;
- assert(tcp->write_queue_size >= len);
- tcp->write_queue_size -= len;
+ assert(stream->write_queue_size >= len);
+ stream->write_queue_size -= len;
if (req->write_index == req->bufcnt) {
/* Then we're done! */
@@ -675,8 +723,8 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
* callback called in the near future.
* TODO: start trying to write the next request.
*/
- ngx_queue_insert_tail(&tcp->write_completed_queue, &req->queue);
- ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+ ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
+ ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
return NULL;
}
}
@@ -687,20 +735,20 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) {
assert(n == 0 || n == -1);
/* We're not done. */
- ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+ ev_io_start(EV_DEFAULT_ &stream->write_watcher);
return NULL;
}
-static void uv__write_callbacks(uv_tcp_t* tcp) {
+static void uv__write_callbacks(uv_stream_t* stream) {
int callbacks_made = 0;
ngx_queue_t* q;
uv_write_t* req;
- while (!ngx_queue_empty(&tcp->write_completed_queue)) {
+ while (!ngx_queue_empty(&stream->write_completed_queue)) {
/* Pop a req off write_completed_queue. */
- q = ngx_queue_head(&tcp->write_completed_queue);
+ q = ngx_queue_head(&stream->write_completed_queue);
assert(q);
req = ngx_queue_data(q, struct uv_write_s, queue);
ngx_queue_remove(q);
@@ -713,16 +761,16 @@ static void uv__write_callbacks(uv_tcp_t* tcp) {
callbacks_made++;
}
- assert(ngx_queue_empty(&tcp->write_completed_queue));
+ assert(ngx_queue_empty(&stream->write_completed_queue));
/* Write queue drained. */
- if (!uv_write_queue_head(tcp)) {
- uv__drain(tcp);
+ if (!uv_write_queue_head(stream)) {
+ uv__drain(stream);
}
}
-void uv__read(uv_tcp_t* tcp) {
+static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
struct iovec* iov;
ssize_t nread;
@@ -730,43 +778,43 @@ void uv__read(uv_tcp_t* tcp) {
/* XXX: Maybe instead of having UV_READING we just test if
* tcp->read_cb is NULL or not?
*/
- while (tcp->read_cb && uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
- assert(tcp->alloc_cb);
- buf = tcp->alloc_cb((uv_stream_t*)tcp, 64 * 1024);
+ while (stream->read_cb && uv_flag_is_set((uv_handle_t*)stream, UV_READING)) {
+ assert(stream->alloc_cb);
+ buf = stream->alloc_cb(stream, 64 * 1024);
assert(buf.len > 0);
assert(buf.base);
iov = (struct iovec*) &buf;
- nread = read(tcp->fd, buf.base, buf.len);
+ nread = read(stream->fd, buf.base, buf.len);
if (nread < 0) {
/* Error */
if (errno == EAGAIN) {
/* Wait for the next one. */
- if (uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
- ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher);
+ if (uv_flag_is_set((uv_handle_t*)stream, UV_READING)) {
+ ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
}
- uv_err_new((uv_handle_t*)tcp, EAGAIN);
- tcp->read_cb((uv_stream_t*)tcp, 0, buf);
+ uv_err_new((uv_handle_t*)stream, EAGAIN);
+ stream->read_cb(stream, 0, buf);
return;
} else {
/* Error. User should call uv_close(). */
- uv_err_new((uv_handle_t*)tcp, errno);
- tcp->read_cb((uv_stream_t*)tcp, -1, buf);
- assert(!ev_is_active(&tcp->read_watcher));
+ uv_err_new((uv_handle_t*)stream, errno);
+ stream->read_cb(stream, -1, buf);
+ assert(!ev_is_active(&stream->read_watcher));
return;
}
} else if (nread == 0) {
/* EOF */
- uv_err_new_artificial((uv_handle_t*)tcp, UV_EOF);
- ev_io_stop(EV_DEFAULT_UC_ &tcp->read_watcher);
- tcp->read_cb((uv_stream_t*)tcp, -1, buf);
+ uv_err_new_artificial((uv_handle_t*)stream, UV_EOF);
+ ev_io_stop(EV_DEFAULT_UC_ &stream->read_watcher);
+ stream->read_cb(stream, -1, buf);
return;
} else {
/* Successful read */
- tcp->read_cb((uv_stream_t*)tcp, nread, buf);
+ stream->read_cb(stream, nread, buf);
}
}
}
@@ -774,8 +822,8 @@ void uv__read(uv_tcp_t* tcp) {
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
uv_tcp_t* tcp = (uv_tcp_t*)handle;
- assert(handle->type == UV_TCP &&
- "uv_shutdown (unix) only supports uv_tcp_t right now");
+ assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+ && "uv_shutdown (unix) only supports uv_tcp_t right now");
assert(tcp->fd >= 0);
/* Initialize request */
@@ -800,31 +848,32 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
}
-void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
- uv_tcp_t* tcp = watcher->data;
+static void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
+ uv_stream_t* stream = watcher->data;
- assert(tcp->type == UV_TCP);
- assert(watcher == &tcp->read_watcher ||
- watcher == &tcp->write_watcher);
- assert(tcp->fd >= 0);
- assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
+ assert(stream->type == UV_TCP ||
+ stream->type == UV_NAMED_PIPE);
+ assert(watcher == &stream->read_watcher ||
+ watcher == &stream->write_watcher);
+ assert(stream->fd >= 0);
+ assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING));
- if (tcp->connect_req) {
- uv__tcp_connect(tcp);
+ if (stream->connect_req) {
+ uv__stream_connect(stream);
} else {
if (revents & EV_READ) {
- uv__read(tcp);
+ uv__read((uv_stream_t*)stream);
}
if (revents & EV_WRITE) {
- uv_write_t* req = uv__write(tcp);
+ uv_write_t* req = uv__write(stream);
if (req) {
/* Error. Notify the user. */
if (req->cb) {
req->cb(req, -1);
}
} else {
- uv__write_callbacks(tcp);
+ uv__write_callbacks(stream);
}
}
}
@@ -836,32 +885,32 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
* In order to determine if we've errored out or succeeded must call
* getsockopt.
*/
-static void uv__tcp_connect(uv_tcp_t* tcp) {
+static void uv__stream_connect(uv_stream_t* stream) {
int error;
- uv_connect_t* req = tcp->connect_req;
+ uv_connect_t* req = stream->connect_req;
socklen_t errorsize = sizeof(int);
- assert(tcp->type == UV_TCP);
- assert(tcp->fd >= 0);
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
+ assert(stream->fd >= 0);
assert(req);
- if (tcp->delayed_error) {
+ if (stream->delayed_error) {
/* To smooth over the differences between unixes errors that
* were reported synchronously on the first connect can be delayed
* until the next tick--which is now.
*/
- error = tcp->delayed_error;
- tcp->delayed_error = 0;
+ error = stream->delayed_error;
+ stream->delayed_error = 0;
} else {
/* Normal situation: we need to get the socket error from the kernel. */
- getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
+ getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
}
if (!error) {
- ev_io_start(EV_DEFAULT_ &tcp->read_watcher);
+ ev_io_start(EV_DEFAULT_ &stream->read_watcher);
/* Successful connection */
- tcp->connect_req = NULL;
+ stream->connect_req = NULL;
if (req->cb) {
req->cb(req, 0);
}
@@ -871,9 +920,9 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
return;
} else {
/* Error */
- uv_err_new((uv_handle_t*)tcp, error);
+ uv_err_new((uv_handle_t*)stream, error);
- tcp->connect_req = NULL;
+ stream->connect_req = NULL;
if (req->cb) {
req->cb(req, -1);
}
@@ -881,45 +930,52 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
}
-static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
- socklen_t addrlen, uv_connect_cb cb) {
+static int uv__connect(uv_connect_t* req,
+ uv_stream_t* stream,
+ struct sockaddr* addr,
+ socklen_t addrlen,
+ uv_connect_cb cb) {
+
+ int sockfd;
int r;
- if (tcp->fd <= 0) {
- int fd = socket(addr->sa_family, SOCK_STREAM, 0);
+ if (stream->fd <= 0) {
+ if ((sockfd = uv__socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
- if (fd < 0) {
- uv_err_new((uv_handle_t*)tcp, errno);
+ }
+
+ if (sockfd < 0) {
+ uv_err_new((uv_handle_t*)stream, errno);
return -1;
}
- if (uv_tcp_open(tcp, fd)) {
- close(fd);
+ if (uv__stream_open(stream, sockfd)) {
+ close(sockfd);
return -2;
}
}
uv__req_init((uv_req_t*)req);
req->cb = cb;
- req->handle = (uv_stream_t*)tcp;
+ req->handle = stream;
req->type = UV_CONNECT;
ngx_queue_init(&req->queue);
- if (tcp->connect_req) {
- uv_err_new((uv_handle_t*)tcp, EALREADY);
+ if (stream->connect_req) {
+ uv_err_new((uv_handle_t*)stream, EALREADY);
return -1;
}
- if (tcp->type != UV_TCP) {
- uv_err_new((uv_handle_t*)tcp, ENOTSOCK);
+ if (stream->type != UV_TCP) {
+ uv_err_new((uv_handle_t*)stream, ENOTSOCK);
return -1;
}
- tcp->connect_req = req;
+ stream->connect_req = req;
- r = connect(tcp->fd, addr, addrlen);
+ r = connect(stream->fd, addr, addrlen);
- tcp->delayed_error = 0;
+ stream->delayed_error = 0;
if (r != 0 && errno != EINPROGRESS) {
switch (errno) {
@@ -928,41 +984,87 @@ static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
* wait.
*/
case ECONNREFUSED:
- tcp->delayed_error = errno;
+ stream->delayed_error = errno;
break;
default:
- uv_err_new((uv_handle_t*)tcp, errno);
+ uv_err_new((uv_handle_t*)stream, errno);
return -1;
}
}
- assert(tcp->write_watcher.data == tcp);
- ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+ assert(stream->write_watcher.data == stream);
+ ev_io_start(EV_DEFAULT_ &stream->write_watcher);
- if (tcp->delayed_error) {
- ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+ if (stream->delayed_error) {
+ ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
}
return 0;
}
-int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
- struct sockaddr_in address, uv_connect_cb cb) {
- assert(handle->type == UV_TCP);
- assert(address.sin_family == AF_INET);
- return uv__connect(req, handle, (struct sockaddr*) &address,
- sizeof(struct sockaddr_in), cb);
+int uv_tcp_connect(uv_connect_t* req,
+ uv_tcp_t* handle,
+ struct sockaddr_in address,
+ uv_connect_cb cb) {
+ int saved_errno;
+ int status;
+
+ saved_errno = errno;
+ status = -1;
+
+ if (handle->type != UV_TCP) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ if (address.sin_family != AF_INET) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ status = uv__connect(req,
+ (uv_stream_t*)handle,
+ (struct sockaddr*)&address,
+ sizeof address,
+ cb);
+
+out:
+ errno = saved_errno;
+ return status;
}
-int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
- struct sockaddr_in6 address, uv_connect_cb cb) {
- assert(handle->type == UV_TCP);
- assert(address.sin6_family == AF_INET6);
- return uv__connect(req, handle, (struct sockaddr*) &address,
- sizeof(struct sockaddr_in6), cb);
+int uv_tcp_connect6(uv_connect_t* req,
+ uv_tcp_t* handle,
+ struct sockaddr_in6 address,
+ uv_connect_cb cb) {
+ int saved_errno;
+ int status;
+
+ saved_errno = errno;
+ status = -1;
+
+ if (handle->type != UV_TCP) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ if (address.sin6_family != AF_INET6) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ status = uv__connect(req,
+ (uv_stream_t*)handle,
+ (struct sockaddr*)&address,
+ sizeof address,
+ cb);
+
+out:
+ errno = saved_errno;
+ return status;
}
@@ -1004,8 +1106,10 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
*/
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_write_cb cb) {
+ uv_stream_t* stream;
int empty_queue;
- uv_tcp_t* tcp = (uv_tcp_t*)handle;
+
+ stream = (uv_stream_t*)handle;
/* Initialize the req */
uv__req_init((uv_req_t*) req);
@@ -1013,11 +1117,11 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
req->handle = handle;
ngx_queue_init(&req->queue);
- assert(handle->type == UV_TCP &&
- "uv_write (unix) does not yet support other types of streams");
+ assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+ && "uv_write (unix) does not yet support other types of streams");
- empty_queue = (tcp->write_queue_size == 0);
- assert(tcp->fd >= 0);
+ empty_queue = (stream->write_queue_size == 0);
+ assert(stream->fd >= 0);
ngx_queue_init(&req->queue);
req->type = UV_WRITE;
@@ -1038,22 +1142,22 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
*/
req->write_index = 0;
- tcp->write_queue_size += uv__buf_count(bufs, bufcnt);
+ stream->write_queue_size += uv__buf_count(bufs, bufcnt);
/* Append the request to write_queue. */
- ngx_queue_insert_tail(&tcp->write_queue, &req->queue);
+ ngx_queue_insert_tail(&stream->write_queue, &req->queue);
- assert(!ngx_queue_empty(&tcp->write_queue));
- assert(tcp->write_watcher.cb == uv__tcp_io);
- assert(tcp->write_watcher.data == tcp);
- assert(tcp->write_watcher.fd == tcp->fd);
+ assert(!ngx_queue_empty(&stream->write_queue));
+ assert(stream->write_watcher.cb == uv__stream_io);
+ assert(stream->write_watcher.data == stream);
+ assert(stream->write_watcher.fd == stream->fd);
/* If the queue was empty when this function began, we should attempt to
* do the write immediately. Otherwise start the write_watcher and wait
* for the fd to become writable.
*/
if (empty_queue) {
- if (uv__write(tcp)) {
+ if (uv__write(stream)) {
/* Error. uv_last_error has been set. */
return -1;
}
@@ -1063,13 +1167,13 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
* means we need to make the callback. The callback can only be done on a
* fresh stack so we feed the event loop in order to service it.
*/
- if (ngx_queue_empty(&tcp->write_queue)) {
- ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+ if (ngx_queue_empty(&stream->write_queue)) {
+ ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
} else {
/* Otherwise there is data to write - so we should wait for the file
* descriptor to become writable.
*/
- ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+ ev_io_start(EV_DEFAULT_ &stream->write_watcher);
}
return 0;
@@ -1097,28 +1201,27 @@ int64_t uv_now() {
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
- uv_tcp_t* tcp = (uv_tcp_t*)stream;
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
/* The UV_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/
- uv_flag_set((uv_handle_t*)tcp, UV_READING);
+ uv_flag_set((uv_handle_t*)stream, UV_READING);
/* TODO: try to do the read inline? */
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
* not start the IO watcher.
*/
- assert(tcp->fd >= 0);
+ assert(stream->fd >= 0);
assert(alloc_cb);
- tcp->read_cb = read_cb;
- tcp->alloc_cb = alloc_cb;
+ stream->read_cb = read_cb;
+ stream->alloc_cb = alloc_cb;
/* These should have been set by uv_tcp_init. */
- assert(tcp->read_watcher.data == tcp);
- assert(tcp->read_watcher.cb == uv__tcp_io);
+ assert(stream->read_watcher.cb == uv__stream_io);
- ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher);
+ ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
return 0;
}
@@ -1635,21 +1738,306 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle,
int uv_pipe_init(uv_pipe_t* handle) {
- assert(0 && "implement me");
+ memset(handle, 0, sizeof handle);
+
+ uv__handle_init((uv_handle_t*)handle, UV_NAMED_PIPE);
+ uv_counters()->pipe_init++;
+
+ handle->type = UV_NAMED_PIPE;
+ handle->pipe_fname = NULL; /* Only set by listener. */
+
+ ev_init(&handle->write_watcher, uv__stream_io);
+ ev_init(&handle->read_watcher, uv__stream_io);
+ handle->write_watcher.data = handle;
+ handle->read_watcher.data = handle;
+ handle->fd = -1;
+
+ ngx_queue_init(&handle->write_completed_queue);
+ ngx_queue_init(&handle->write_queue);
+
+ return 0;
}
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
- assert(0 && "implement me");
+ struct sockaddr_un sun;
+ int saved_errno;
+ int sockfd;
+ int status;
+ int bound;
+
+ saved_errno = errno;
+ sockfd = -1;
+ status = -1;
+ bound = 0;
+
+ /* Make a copy of the file name, it outlives this function's scope. */
+ if ((name = (const char*)strdup(name)) == NULL) {
+ uv_err_new((uv_handle_t*)handle, ENOMEM);
+ goto out;
+ }
+
+ if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+
+ memset(&sun, 0, sizeof sun);
+ uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path));
+ sun.sun_family = AF_UNIX;
+
+ if (bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+#ifdef DONT_RACE_ME_BRO
+ /*
+ * Try to bind the socket. Note that we explicitly don't act
+ * on EADDRINUSE. Unlinking and trying to bind again opens
+ * a window for races with other threads and processes.
+ */
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+#else
+ /*
+ * Try to re-purpose the socket. This is a potential race window.
+ */
+ if (errno != EADDRINUSE
+ || unlink(name) == -1
+ || bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+#endif
+ }
+ bound = 1;
+
+ /* Success. */
+ handle->pipe_fname = name; /* Is a strdup'ed copy. */
+ handle->fd = sockfd;
+ status = 0;
+
+out:
+ /* Clean up on error. */
+ if (status) {
+ if (bound) {
+ /* unlink() before close() to avoid races. */
+ unlink(name);
+ }
+ close(sockfd);
+ free((void*)name);
+ }
+
+ errno = saved_errno;
+ return status;
}
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
- assert(0 && "implement me");
+ int saved_errno;
+ int status;
+
+ saved_errno = errno;
+
+ if ((status = listen(handle->fd, SOMAXCONN)) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ } else {
+ handle->connection_cb = cb;
+ ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ);
+ ev_io_start(EV_DEFAULT_ &handle->read_watcher);
+ }
+
+ errno = saved_errno;
+ return status;
+}
+
+
+int uv_pipe_connect(uv_connect_t* req,
+ uv_pipe_t* handle,
+ const char* name,
+ uv_connect_cb cb) {
+ struct sockaddr_un sun;
+ int saved_errno;
+ int sockfd;
+ int status;
+
+ saved_errno = errno;
+ sockfd = -1;
+ status = -1;
+
+ if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+
+ memset(&sun, 0, sizeof sun);
+ uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path));
+ sun.sun_family = AF_UNIX;
+
+ /* We don't check for EINPROGRESS. Think about it: the socket
+ * is either there or not.
+ */
+ if (connect(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ close(sockfd);
+ goto out;
+ }
+
+ handle->fd = sockfd;
+ ev_io_init(&handle->read_watcher, uv__stream_io, sockfd, EV_READ);
+ ev_io_init(&handle->write_watcher, uv__stream_io, sockfd, EV_WRITE);
+ ev_io_start(EV_DEFAULT_ &handle->read_watcher);
+ ev_io_start(EV_DEFAULT_ &handle->write_watcher);
+
+ status = 0;
+
+out:
+ uv__req_init((uv_req_t*)req);
+ req->handle = (uv_stream_t*)handle;
+ req->type = UV_CONNECT;
+ req->cb = cb;
+ ngx_queue_init(&req->queue);
+
+ if (cb) {
+ cb(req, status);
+ }
+
+ /* Mimic the Windows pipe implementation, always
+ * return 0 and let the callback handle errors.
+ */
+ errno = saved_errno;
+ return 0;
}
-int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
- const char* name, uv_connect_cb cb) {
- assert(0 && "implement me");
+/* TODO merge with uv__server_io()? */
+static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
+ struct sockaddr_un sun;
+ uv_pipe_t* pipe;
+ int saved_errno;
+ int sockfd;
+
+ saved_errno = errno;
+ pipe = watcher->data;
+
+ assert(pipe->type == UV_NAMED_PIPE);
+ assert(pipe->pipe_fname != NULL);
+
+ sockfd = uv__accept(pipe->fd, (struct sockaddr *)&sun, sizeof sun);
+ if (sockfd == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ assert(0 && "EAGAIN on uv__accept(pipefd)");
+ } else {
+ uv_err_new((uv_handle_t*)pipe, errno);
+ }
+ } else {
+ pipe->accepted_fd = sockfd;
+ pipe->connection_cb((uv_handle_t*)pipe, 0);
+ if (pipe->accepted_fd == sockfd) {
+ /* The user hasn't yet accepted called uv_accept() */
+ ev_io_stop(EV_DEFAULT_ &pipe->read_watcher);
+ }
+ }
+
+ errno = saved_errno;
+}
+
+
+/* Open a socket in non-blocking close-on-exec mode, atomically if possible. */
+static int uv__socket(int domain, int type, int protocol) {
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+ return socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
+#else
+ int sockfd;
+
+ if ((sockfd = socket(domain, type, protocol)) == -1) {
+ return -1;
+ }
+
+ if (uv__nonblock(sockfd, 1) == -1 || uv__cloexec(sockfd, 1) == -1) {
+ close(sockfd);
+ return -1;
+ }
+
+ return sockfd;
+#endif
+}
+
+
+static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t slen) {
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+ return accept4(sockfd, saddr, &slen, SOCK_NONBLOCK | SOCK_CLOEXEC);
+#else
+ int peerfd;
+
+ if ((peerfd = accept(sockfd, saddr, &slen)) == -1) {
+ return -1;
+ }
+
+ if (uv__cloexec(peerfd, 1) == -1 || uv__nonblock(peerfd, 1) == -1) {
+ close(peerfd);
+ return -1;
+ }
+
+ return peerfd;
+#endif
+}
+
+
+static int uv__nonblock(int fd, int set) {
+ int flags;
+
+ if ((flags = fcntl(fd, F_GETFL)) == -1) {
+ return -1;
+ }
+
+ if (set) {
+ flags |= O_NONBLOCK;
+ } else {
+ flags &= ~O_NONBLOCK;
+ }
+
+ if (fcntl(fd, F_SETFL, flags) == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static int uv__cloexec(int fd, int set) {
+ int flags;
+
+ if ((flags = fcntl(fd, F_GETFD)) == -1) {
+ return -1;
+ }
+
+ if (set) {
+ flags |= FD_CLOEXEC;
+ } else {
+ flags &= ~FD_CLOEXEC;
+ }
+
+ if (fcntl(fd, F_SETFD, flags) == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/* TODO move to uv-common.c? */
+size_t uv__strlcpy(char* dst, const char* src, size_t size) {
+ const char *org;
+
+ if (size == 0) {
+ return 0;
+ }
+
+ org = src;
+ while (size > 1) {
+ if ((*dst++ = *src++) == '\0') {
+ return org - src;
+ }
+ }
+ *dst = '\0';
+
+ return src - org;
}
diff --git a/deps/uv/src/uv-win.c b/deps/uv/src/uv-win.c
index 111d99cb001..8241340ae81 100644
--- a/deps/uv/src/uv-win.c
+++ b/deps/uv/src/uv-win.c
@@ -222,6 +222,7 @@ static char uv_zero_[] = "";
/* mark if IPv6 sockets are supported */
static BOOL uv_allow_ipv6 = FALSE;
+
/*
* Subclass of uv_handle_t. Used for integration of c-ares.
*/
@@ -374,6 +375,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET;
case ERROR_BROKEN_PIPE: return UV_EOF;
case ERROR_PIPE_BUSY: return UV_EBUSY;
+ case ERROR_SEM_TIMEOUT: return UV_ETIMEDOUT;
default: return UV_UNKNOWN;
}
}
@@ -517,6 +519,7 @@ void uv_init() {
static void uv_req_init(uv_req_t* req) {
uv_counters()->req_init++;
req->type = UV_UNKNOWN_REQ;
+ req->error = uv_ok_;
}
@@ -1028,10 +1031,14 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle, uv_pipe_accept_t* req) {
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
- if (!ConnectNamedPipe(pipeHandle, &req->overlapped) &&
- GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
- /* Make this req pending reporting an error. */
- req->error = uv_new_sys_error(GetLastError());
+ if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING) {
+ if (GetLastError() == ERROR_PIPE_CONNECTED) {
+ req->pipeHandle = pipeHandle;
+ req->error = uv_ok_;
+ } else {
+ /* Make this req pending reporting an error. */
+ req->error = uv_new_sys_error(GetLastError());
+ }
uv_insert_pending_req((uv_req_t*) req);
handle->reqs_pending++;
return;
@@ -2314,9 +2321,7 @@ static void uv_poll() {
/* Package was dequeued */
req = uv_overlapped_to_req(overlapped);
- if (success) {
- req->error = uv_ok_;
- } else {
+ if (!success) {
req->error = uv_new_sys_error(GetLastError());
}
@@ -2970,6 +2975,7 @@ int uv_pipe_init(uv_pipe_t* handle) {
handle->type = UV_NAMED_PIPE;
handle->reqs_pending = 0;
handle->pending_accepts = NULL;
+ handle->name = NULL;
uv_counters()->pipe_init++;
@@ -3033,61 +3039,131 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
return 0;
}
+
+static int uv_set_pipe_handle(uv_pipe_t* handle, HANDLE pipeHandle) {
+ DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+
+ if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
+ return -1;
+ }
+
+ if (CreateIoCompletionPort(pipeHandle,
+ uv_iocp_,
+ (ULONG_PTR)handle,
+ 0) == NULL) {
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
+ HANDLE pipeHandle = INVALID_HANDLE_VALUE;
+ int errno;
+ uv_pipe_t* handle;
+ uv_connect_t* req;
+
+ req = (uv_connect_t*)parameter;
+ assert(req);
+ handle = (uv_pipe_t*)req->handle;
+ assert(handle);
+
+ /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait for the pipe to become available with WaitNamedPipe. */
+ while (WaitNamedPipe(handle->name, 30000)) {
+ /* The pipe is now available, try to connect. */
+ pipeHandle = CreateFile(handle->name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ break;
+ }
+ }
+
+ if (pipeHandle != INVALID_HANDLE_VALUE && !uv_set_pipe_handle(handle, pipeHandle)) {
+ handle->handle = pipeHandle;
+ req->error = uv_ok_;
+ } else {
+ req->error = uv_new_sys_error(GetLastError());
+ }
+
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
+
+ /* Post completed */
+ if (!PostQueuedCompletionStatus(uv_iocp_,
+ 0,
+ 0,
+ &req->overlapped)) {
+ uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
+ }
+
+ return 0;
+}
+
+
/* TODO: make this work with UTF8 name */
-/* TODO: run this in the thread pool */
int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
int errno;
- DWORD mode;
+ HANDLE pipeHandle;
+
+ handle->handle = INVALID_HANDLE_VALUE;
uv_req_init((uv_req_t*) req);
req->type = UV_CONNECT;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
- memset(&req->overlapped, 0, sizeof(req->overlapped));
-
- handle->handle = CreateFile(name,
- GENERIC_READ | GENERIC_WRITE,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
+ pipeHandle = CreateFile(name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
- if (handle->handle == INVALID_HANDLE_VALUE &&
- GetLastError() != ERROR_IO_PENDING) {
- errno = GetLastError();
- goto error;
- }
+ if (pipeHandle == INVALID_HANDLE_VALUE) {
+ if (GetLastError() == ERROR_PIPE_BUSY) {
+ /* Wait for the server to make a pipe instance available. */
+ handle->name = strdup(name);
+ if (!handle->name) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
- mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+ if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) {
+ errno = GetLastError();
+ goto error;
+ }
- if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
+ return 0;
+ }
+
errno = GetLastError();
goto error;
}
-
- if (CreateIoCompletionPort(handle->handle,
- uv_iocp_,
- (ULONG_PTR)handle,
- 0) == NULL) {
+
+ if (uv_set_pipe_handle((uv_pipe_t*)req->handle, pipeHandle)) {
errno = GetLastError();
goto error;
}
+ handle->handle = pipeHandle;
+
req->error = uv_ok_;
uv_insert_pending_req((uv_req_t*) req);
handle->reqs_pending++;
return 0;
error:
- if (handle->handle != INVALID_HANDLE_VALUE) {
- CloseHandle(handle->handle);
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(pipeHandle);
}
- req->error = uv_new_sys_error(errno);
- uv_insert_pending_req((uv_req_t*) req);
- handle->reqs_pending++;
+ uv_set_sys_error(errno);
return -1;
}
@@ -3097,6 +3173,11 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
int i;
HANDLE pipeHandle;
+ if (handle->name) {
+ free(handle->name);
+ handle->name;
+ }
+
if (handle->flags & UV_HANDLE_PIPESERVER) {
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
pipeHandle = handle->accept_reqs[i].pipeHandle;
@@ -3105,7 +3186,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
}
}
- } else {
+ } else if (handle->handle != INVALID_HANDLE_VALUE) {
CloseHandle(handle->handle);
}
diff --git a/deps/uv/test/benchmark-pump.c b/deps/uv/test/benchmark-pump.c
index d7524f74705..1732e84ff54 100644
--- a/deps/uv/test/benchmark-pump.c
+++ b/deps/uv/test/benchmark-pump.c
@@ -261,13 +261,6 @@ static void maybe_connect_some() {
req = (uv_connect_t*) req_alloc();
r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
ASSERT(r == 0);
-
-#ifdef _WIN32
- /* HACK: This is temporary to give the pipes server enough time to create new handles.
- * This will go away once uv_pipe_connect can deal with UV_EBUSY.
- */
- Sleep(1);
-#endif
}
}
}
diff --git a/deps/uv/test/echo-server.c b/deps/uv/test/echo-server.c
index e107dc5b95a..992c88bb2d1 100644
--- a/deps/uv/test/echo-server.c
+++ b/deps/uv/test/echo-server.c
@@ -124,7 +124,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) {
static void on_connection(uv_handle_t* server, int status) {
- uv_handle_t* handle;
+ uv_stream_t* stream;
int r;
if (status != 0) {
@@ -132,25 +132,31 @@ static void on_connection(uv_handle_t* server, int status) {
}
ASSERT(status == 0);
- if (serverType == TCP) {
- handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t));
- ASSERT(handle != NULL);
-
- uv_tcp_init((uv_tcp_t*)handle);
- } else {
- handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t));
- ASSERT(handle != NULL);
-
- uv_pipe_init((uv_pipe_t*)handle);
+ switch (serverType) {
+ case TCP:
+ stream = malloc(sizeof(uv_tcp_t));
+ ASSERT(stream != NULL);
+ uv_tcp_init((uv_tcp_t*)stream);
+ break;
+
+ case PIPE:
+ stream = malloc(sizeof(uv_pipe_t));
+ ASSERT(stream != NULL);
+ uv_pipe_init((uv_pipe_t*)stream);
+ break;
+
+ default:
+ ASSERT(0 && "Bad serverType");
+ abort();
}
/* associate server with stream */
- handle->data = server;
+ stream->data = server;
- r = uv_accept(server, (uv_stream_t*)handle);
+ r = uv_accept(server, stream);
ASSERT(r == 0);
- r = uv_read_start((uv_stream_t*)handle, echo_alloc, after_read);
+ r = uv_read_start(stream, echo_alloc, after_read);
ASSERT(r == 0);
}
@@ -233,22 +239,19 @@ static int pipe_echo_start(char* pipeName) {
r = uv_pipe_init(&pipeServer);
if (r) {
- /* TODO: Error codes */
- fprintf(stderr, "Pipe creation error\n");
+ fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(uv_last_error()));
return 1;
}
r = uv_pipe_bind(&pipeServer, pipeName);
if (r) {
- /* TODO: Error codes */
- fprintf(stderr, "create error\n");
+ fprintf(stderr, "uv_pipe_bind: %s\n", uv_strerror(uv_last_error()));
return 1;
}
r = uv_pipe_listen(&pipeServer, on_connection);
if (r) {
- /* TODO: Error codes */
- fprintf(stderr, "Listen error on IPv6\n");
+ fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(uv_last_error()));
return 1;
}
diff --git a/deps/uv/test/task.h b/deps/uv/test/task.h
index d47c209480e..2c0febdad96 100644
--- a/deps/uv/test/task.h
+++ b/deps/uv/test/task.h
@@ -33,8 +33,7 @@
#ifdef _WIN32
# define TEST_PIPENAME "\\\\.\\pipe\\uv-test"
#else
-# /* TODO: define unix pipe name */
-# define TEST_PIPENAME ""
+# define TEST_PIPENAME "/tmp/uv-test-sock"
#endif
typedef enum {