diff options
40 files changed, 1629 insertions, 647 deletions
diff --git a/deps/uv/.mailmap b/deps/uv/.mailmap index e5929405839..2d608f95f4f 100644 --- a/deps/uv/.mailmap +++ b/deps/uv/.mailmap @@ -7,3 +7,4 @@ San-Tai Hsu <vanilla@fatpipi.com> Isaac Z. Schlueter <i@izs.me> Saúl Ibarra Corretgé <saghul@gmail.com> +Yuki OKUMURA <mjt@cltn.org> diff --git a/deps/uv/AUTHORS b/deps/uv/AUTHORS index 00512c46c1f..0b852b12c1f 100644 --- a/deps/uv/AUTHORS +++ b/deps/uv/AUTHORS @@ -27,3 +27,5 @@ Pieter Noordhuis <pcnoordhuis@gmail.com> Marek Jelen <marek@jelen.biz> Fedor Indutny <fedor.indutny@gmail.com> Saúl Ibarra Corretgé <saghul@gmail.com> +Felix Geisendörfer <felix@debuggable.com> +Yuki OKUMURA <mjt@cltn.org> diff --git a/deps/uv/Makefile b/deps/uv/Makefile index 59be3de08f3..884f514bf22 100644 --- a/deps/uv/Makefile +++ b/deps/uv/Makefile @@ -77,8 +77,8 @@ else include config-unix.mk endif -TESTS=test/echo-server.c test/test-*.c -BENCHMARKS=test/echo-server.c test/dns-server.c test/benchmark-*.c +TESTS=test/blackhole-server.c test/echo-server.c test/test-*.c +BENCHMARKS=test/blackhole-server.c test/echo-server.c test/dns-server.c test/benchmark-*.c all: uv.a test/run-tests$(E) test/run-benchmarks$(E) diff --git a/deps/uv/include/uv-private/uv-unix.h b/deps/uv/include/uv-private/uv-unix.h index 1b6d86df606..72709c8a588 100644 --- a/deps/uv/include/uv-private/uv-unix.h +++ b/deps/uv/include/uv-private/uv-unix.h @@ -91,8 +91,6 @@ typedef int uv_file; #define UV_STREAM_PRIVATE_FIELDS \ - uv_read_cb read_cb; \ - uv_alloc_cb alloc_cb; \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ ev_io read_watcher; \ diff --git a/deps/uv/include/uv-private/uv-win.h b/deps/uv/include/uv-private/uv-win.h index 6610e016fe2..7ede6882774 100644 --- a/deps/uv/include/uv-private/uv-win.h +++ b/deps/uv/include/uv-private/uv-win.h @@ -98,7 +98,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ - /* empty */ + int ipc_header; #define UV_CONNECT_PRIVATE_FIELDS \ /* empty */ @@ -120,6 +120,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); UV_REQ_FIELDS \ SOCKET accept_socket; \ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ + HANDLE event_handle; \ + HANDLE wait_handle; \ struct uv_tcp_accept_s* next_pending; \ } uv_tcp_accept_t; @@ -132,8 +134,6 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define UV_STREAM_PRIVATE_FIELDS \ unsigned int reqs_pending; \ - uv_alloc_cb alloc_cb; \ - uv_read_cb read_cb; \ uv_req_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ @@ -142,10 +142,12 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define uv_tcp_server_fields \ uv_tcp_accept_t* accept_reqs; \ - uv_tcp_accept_t* pending_accepts; + uv_tcp_accept_t* pending_accepts; \ + LPFN_ACCEPTEX func_acceptex; #define uv_tcp_connection_fields \ - uv_buf_t read_buffer; + uv_buf_t read_buffer; \ + LPFN_CONNECTEX func_connectex; #define UV_TCP_PRIVATE_FIELDS \ SOCKET socket; \ @@ -166,11 +168,15 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_alloc_cb alloc_cb; #define uv_pipe_server_fields \ - uv_pipe_accept_t accept_reqs[4]; \ - uv_pipe_accept_t* pending_accepts; + uv_pipe_accept_t accept_reqs[4]; \ + uv_pipe_accept_t* pending_accepts; #define uv_pipe_connection_fields \ - uv_timer_t* eof_timer; + uv_timer_t* eof_timer; \ + uv_write_t ipc_header_write_req; \ + int ipc_pid; \ + uint64_t remaining_ipc_rawdata_bytes; \ + WSAPROTOCOL_INFOW* pending_socket_info; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/deps/uv/include/uv.h b/deps/uv/include/uv.h index 824b3c41787..8ea7ab0ebbd 100644 --- a/deps/uv/include/uv.h +++ b/deps/uv/include/uv.h @@ -41,6 +41,96 @@ extern "C" { typedef intptr_t ssize_t; #endif +#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) +# include "uv-private/uv-unix.h" +#else +# include "uv-private/uv-win.h" +#endif + +/* Expand this list if necessary. */ +typedef enum { + UV_UNKNOWN = -1, + UV_OK = 0, + UV_EOF, + UV_EACCESS, + UV_EAGAIN, + UV_EADDRINUSE, + UV_EADDRNOTAVAIL, + UV_EAFNOSUPPORT, + UV_EALREADY, + UV_EBADF, + UV_EBUSY, + UV_ECONNABORTED, + UV_ECONNREFUSED, + UV_ECONNRESET, + UV_EDESTADDRREQ, + UV_EFAULT, + UV_EHOSTUNREACH, + UV_EINTR, + UV_EINVAL, + UV_EISCONN, + UV_EMFILE, + UV_EMSGSIZE, + UV_ENETDOWN, + UV_ENETUNREACH, + UV_ENFILE, + UV_ENOBUFS, + UV_ENOMEM, + UV_ENONET, + UV_ENOPROTOOPT, + UV_ENOTCONN, + UV_ENOTSOCK, + UV_ENOTSUP, + UV_ENOENT, + UV_EPIPE, + UV_EPROTO, + UV_EPROTONOSUPPORT, + UV_EPROTOTYPE, + UV_ETIMEDOUT, + UV_ECHARSET, + UV_EAIFAMNOSUPPORT, + UV_EAINONAME, + UV_EAISERVICE, + UV_EAISOCKTYPE, + UV_ESHUTDOWN, + UV_EEXIST +} uv_err_code; + +typedef enum { + UV_UNKNOWN_HANDLE = 0, + UV_TCP, + UV_UDP, + UV_NAMED_PIPE, + UV_TTY, + UV_FILE, + UV_TIMER, + UV_PREPARE, + UV_CHECK, + UV_IDLE, + UV_ASYNC, + UV_ARES_TASK, + UV_ARES_EVENT, + UV_PROCESS, + UV_FS_EVENT +} uv_handle_type; + +typedef enum { + UV_UNKNOWN_REQ = 0, + UV_CONNECT, + UV_ACCEPT, + UV_READ, + UV_WRITE, + UV_SHUTDOWN, + UV_WAKEUP, + UV_UDP_SEND, + UV_FS, + UV_WORK, + UV_GETADDRINFO, + UV_REQ_TYPE_PRIVATE +} uv_req_type; + + + typedef struct uv_loop_s uv_loop_t; typedef struct uv_ares_task_s uv_ares_task_t; typedef struct uv_err_s uv_err_t; @@ -69,12 +159,6 @@ typedef struct uv_fs_s uv_fs_t; typedef struct uv_fs_event_s uv_fs_event_t; typedef struct uv_work_s uv_work_t; -#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) -# include "uv-private/uv-unix.h" -#else -# include "uv-private/uv-win.h" -#endif - /* * This function must be called before any other functions in libuv. @@ -121,6 +205,13 @@ int64_t uv_now(uv_loop_t*); */ typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); +/* + * Just like the uv_read_cb except that if the pending parameter is true + * then you can use uv_accept() to pull the new handle into the process. + * If no handle is pending then pending will be UV_UNKNOWN_HANDLE. + */ +typedef void (*uv_read2_cb)(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending); typedef void (*uv_write_cb)(uv_write_t* req, int status); typedef void (*uv_connect_cb)(uv_connect_t* req, int status); typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); @@ -149,89 +240,6 @@ typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, int events, int status); -/* Expand this list if necessary. */ -typedef enum { - UV_UNKNOWN = -1, - UV_OK = 0, - UV_EOF, - UV_EACCESS, - UV_EAGAIN, - UV_EADDRINUSE, - UV_EADDRNOTAVAIL, - UV_EAFNOSUPPORT, - UV_EALREADY, - UV_EBADF, - UV_EBUSY, - UV_ECONNABORTED, - UV_ECONNREFUSED, - UV_ECONNRESET, - UV_EDESTADDRREQ, - UV_EFAULT, - UV_EHOSTUNREACH, - UV_EINTR, - UV_EINVAL, - UV_EISCONN, - UV_EMFILE, - UV_EMSGSIZE, - UV_ENETDOWN, - UV_ENETUNREACH, - UV_ENFILE, - UV_ENOBUFS, - UV_ENOMEM, - UV_ENONET, - UV_ENOPROTOOPT, - UV_ENOTCONN, - UV_ENOTSOCK, - UV_ENOTSUP, - UV_ENOENT, - UV_EPIPE, - UV_EPROTO, - UV_EPROTONOSUPPORT, - UV_EPROTOTYPE, - UV_ETIMEDOUT, - UV_ECHARSET, - UV_EAIFAMNOSUPPORT, - UV_EAINONAME, - UV_EAISERVICE, - UV_EAISOCKTYPE, - UV_ESHUTDOWN, - UV_EEXIST -} uv_err_code; - -typedef enum { - UV_UNKNOWN_HANDLE = 0, - UV_TCP, - UV_UDP, - UV_NAMED_PIPE, - UV_TTY, - UV_FILE, - UV_TIMER, - UV_PREPARE, - UV_CHECK, - UV_IDLE, - UV_ASYNC, - UV_ARES_TASK, - UV_ARES_EVENT, - UV_PROCESS, - UV_FS_EVENT -} uv_handle_type; - -typedef enum { - UV_UNKNOWN_REQ = 0, - UV_CONNECT, - UV_ACCEPT, - UV_READ, - UV_WRITE, - UV_SHUTDOWN, - UV_WAKEUP, - UV_UDP_SEND, - UV_FS, - UV_WORK, - UV_GETADDRINFO, - UV_REQ_TYPE_PRIVATE -} uv_req_type; - - struct uv_err_s { /* read-only */ uv_err_code code; @@ -330,6 +338,9 @@ uv_buf_t uv_buf_init(char* base, size_t len); #define UV_STREAM_FIELDS \ /* number of bytes queued for writing */ \ size_t write_queue_size; \ + uv_alloc_cb alloc_cb; \ + uv_read_cb read_cb; \ + uv_read2_cb read2_cb; \ /* private */ \ UV_STREAM_PRIVATE_FIELDS @@ -338,8 +349,8 @@ uv_buf_t uv_buf_init(char* base, size_t len); * * uv_stream is an abstract class. * - * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t - * and soon uv_file_t. + * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t, and + * soon uv_file_t. */ struct uv_stream_s { UV_HANDLE_FIELDS @@ -375,13 +386,12 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb); int uv_read_stop(uv_stream_t*); -typedef enum { - UV_STDIN = 0, - UV_STDOUT, - UV_STDERR -} uv_std_type; +/* + * Extended read methods for receiving handles over a pipe. The pipe must be + * initialized with ipc == 1. + */ +int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read2_cb read_cb); -uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); /* * Write data to stream. Buffers are written in order. Example: @@ -404,10 +414,14 @@ uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb); + /* uv_write_t is a subclass of uv_req_t */ struct uv_write_s { UV_REQ_FIELDS uv_write_cb cb; + uv_stream_t* send_handle; uv_stream_t* handle; UV_WRITE_PRIVATE_FIELDS }; @@ -648,9 +662,14 @@ struct uv_pipe_s { UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_PIPE_PRIVATE_FIELDS + int ipc; /* non-zero if this pipe is used for passing handles */ }; -int uv_pipe_init(uv_loop_t*, uv_pipe_t* handle); +/* + * Initialize a pipe. The last argument is a boolean to indicate if + * this pipe will be used for handle passing between processes. + */ +int uv_pipe_init(uv_loop_t*, uv_pipe_t* handle, int ipc); /* * Opens an existing file descriptor or HANDLE as a pipe. diff --git a/deps/uv/src/unix/core.c b/deps/uv/src/unix/core.c index 719327a9c51..c834aaaea96 100644 --- a/deps/uv/src/unix/core.c +++ b/deps/uv/src/unix/core.c @@ -790,10 +790,3 @@ size_t uv__strlcpy(char* dst, const char* src, size_t size) { return src - org; } - - -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - assert(0 && "implement me"); - return NULL; -} - diff --git a/deps/uv/src/unix/error.c b/deps/uv/src/unix/error.c index cd8bb30669a..5ce2156e03c 100644 --- a/deps/uv/src/unix/error.c +++ b/deps/uv/src/unix/error.c @@ -56,8 +56,28 @@ void uv_fatal_error(const int errorno, const char* syscall) { } -char* uv_strerror(uv_err_t err) { - return strerror(err.sys_errno_); +static int uv__translate_lib_error(int code) { + switch (code) { + case UV_ENOENT: return ENOENT; + case UV_EACCESS: return EACCES; + case UV_EBADF: return EBADF; + case UV_EPIPE: return EPIPE; + case UV_EAGAIN: return EAGAIN; + case UV_ECONNRESET: return ECONNRESET; + case UV_EFAULT: return EFAULT; + case UV_EMFILE: return EMFILE; + case UV_EMSGSIZE: return EMSGSIZE; + case UV_EINVAL: return EINVAL; + case UV_ECONNREFUSED: return ECONNREFUSED; + case UV_EADDRINUSE: return EADDRINUSE; + case UV_EADDRNOTAVAIL: return EADDRNOTAVAIL; + case UV_ENOTCONN: return ENOTCONN; + case UV_EEXIST: return EEXIST; + default: return -1; + } + + assert(0 && "unreachable"); + return -1; } @@ -85,3 +105,22 @@ uv_err_code uv_translate_sys_error(int sys_errno) { assert(0 && "unreachable"); return -1; } + + +/* TODO Pull in error messages so we don't have to + * a) rely on what the system provides us + * b) reverse-map the error codes + */ +char* uv_strerror(uv_err_t err) { + int errorno; + + if (err.sys_errno_) + errorno = err.sys_errno_; + else + errorno = uv__translate_lib_error(err.code); + + if (errorno == -1) + return "Unknown error"; + else + return strerror(errorno); +} diff --git a/deps/uv/src/unix/freebsd.c b/deps/uv/src/unix/freebsd.c index 49aeb95ec89..5609ac4c07c 100644 --- a/deps/uv/src/unix/freebsd.c +++ b/deps/uv/src/unix/freebsd.c @@ -27,7 +27,9 @@ #include <sys/types.h> #include <sys/resource.h> #include <sys/sysctl.h> +#include <vm/vm_param.h> /* VM_LOADAVG */ #include <time.h> +#include <unistd.h> /* sysconf */ #undef NANOSEC #define NANOSEC 1000000000 @@ -69,15 +71,15 @@ int uv_exepath(char* buffer, size_t* size) { } double uv_get_free_memory(void) { - vm_statistics_data_t info; - mach_msg_type_number_t count = sizeof(info) / sizeof(integer_t); + int freecount; + size_t size = sizeof(freecount); - if (host_statistics(mach_host_self(), HOST_VM_INFO, - (host_info_t)&info, &count) != KERN_SUCCESS) { + if(sysctlbyname("vm.stats.vm.v_free_count", + &freecount, &size, NULL, 0) == -1){ return -1; } + return (double) freecount * sysconf(_SC_PAGESIZE); - return (double) info.free_count * sysconf(_SC_PAGESIZE); } double uv_get_total_memory(void) { diff --git a/deps/uv/src/unix/pipe.c b/deps/uv/src/unix/pipe.c index 86c11deaa20..dabdcd6cff8 100644 --- a/deps/uv/src/unix/pipe.c +++ b/deps/uv/src/unix/pipe.c @@ -29,10 +29,12 @@ #include <unistd.h> #include <stdlib.h> -int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { + +int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); loop->counters.pipe_init++; handle->pipe_fname = NULL; + handle->ipc = ipc; return 0; } diff --git a/deps/uv/src/unix/process.c b/deps/uv/src/unix/process.c index 487f2075946..06af65d5b8a 100644 --- a/deps/uv/src/unix/process.c +++ b/deps/uv/src/unix/process.c @@ -1,4 +1,3 @@ - /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy @@ -63,6 +62,34 @@ static void uv__chld(EV_P_ ev_child* watcher, int revents) { } } + +/* + * Used for initializing stdio streams like options.stdin_stream. Returns + * zero on success. + */ +static int uv__process_init_pipe(uv_pipe_t* handle, int fds[2]) { + if (handle->type != UV_NAMED_PIPE) { + errno = EINVAL; + return -1; + } + + if (handle->ipc) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + return -1; + } + } else { + if (pipe(fds) < 0) { + return -1; + } + } + + uv__cloexec(fds[0], 1); + uv__cloexec(fds[1], 1); + + return 0; +} + + #ifndef SPAWN_WAIT_EXEC # define SPAWN_WAIT_EXEC 1 #endif @@ -89,43 +116,19 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->exit_cb = options.exit_cb; - if (options.stdin_stream) { - if (options.stdin_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stdin_pipe) < 0) { - goto error; - } - uv__cloexec(stdin_pipe[0], 1); - uv__cloexec(stdin_pipe[1], 1); + if (options.stdin_stream && + uv__process_init_pipe(options.stdin_stream, stdin_pipe)) { + goto error; } - if (options.stdout_stream) { - if (options.stdout_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stdout_pipe) < 0) { - goto error; - } - uv__cloexec(stdout_pipe[0], 1); - uv__cloexec(stdout_pipe[1], 1); + if (options.stdout_stream && + uv__process_init_pipe(options.stdout_stream, stdout_pipe)) { + goto error; } - if (options.stderr_stream) { - if (options.stderr_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stderr_pipe) < 0) { - goto error; - } - uv__cloexec(stderr_pipe[0], 1); - uv__cloexec(stderr_pipe[1], 1); + if (options.stderr_stream && + uv__process_init_pipe(options.stderr_stream, stderr_pipe)) { + goto error; } /* This pipe is used by the parent to wait until @@ -154,7 +157,7 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, goto error; } # else - if (pipe(signal_pipe) < 0) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, signal_pipe) < 0) { goto error; } uv__cloexec(signal_pipe[0], 1); diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index f7c0a68469b..0268158252b 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -29,6 +29,8 @@ #include <string.h> #include <sys/uio.h> +#include <stdio.h> + static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); @@ -349,14 +351,43 @@ static void uv__write(uv_stream_t* stream) { * inside the iov each time we write. So there is no need to offset it. */ - do { - if (iovcnt == 1) { - n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); - } else { - n = writev(stream->fd, iov, iovcnt); + if (req->send_handle) { + struct msghdr msg; + char scratch[64]; + struct cmsghdr *cmsg; + int fd_to_send = req->send_handle->fd; + + assert(fd_to_send >= 0); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_flags = 0; + + msg.msg_control = (void*) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + + do { + n = sendmsg(stream->fd, &msg, 0); + } + while (n == -1 && errno == EINTR); + } else { + do { + if (iovcnt == 1) { + n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); + } else { + n = writev(stream->fd, iov, iovcnt); + } } + while (n == -1 && errno == EINTR); } - while (n == -1 && errno == EINTR); if (n < 0) { if (errno != EAGAIN) { @@ -447,12 +478,17 @@ static void uv__write_callbacks(uv_stream_t* stream) { static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; + struct msghdr msg; + struct cmsghdr* cmsg; + char cmsg_space[64]; + int received_fd = -1; struct ev_loop* ev = stream->loop->ev; /* XXX: Maybe instead of having UV_READING we just test if * tcp->read_cb is NULL or not? */ - while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) { + while ((stream->read_cb || stream->read2_cb) && + stream->flags & UV_READING) { assert(stream->alloc_cb); buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); @@ -460,10 +496,29 @@ static void uv__read(uv_stream_t* stream) { assert(buf.base); assert(stream->fd >= 0); - do { - nread = read(stream->fd, buf.base, buf.len); + if (stream->read_cb) { + do { + nread = read(stream->fd, buf.base, buf.len); + } + while (nread < 0 && errno == EINTR); + } else { + assert(stream->read2_cb); + /* read2_cb uses recvmsg */ + msg.msg_flags = 0; + msg.msg_iov = (struct iovec*) &buf; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + /* Set up to receive a descriptor even if one isn't in the message */ + msg.msg_controllen = 64; + msg.msg_control = (void *) cmsg_space; + + do { + nread = recvmsg(stream->fd, &msg, 0); + } + while (nread < 0 && errno == EINTR); } - while (nread < 0 && errno == EINTR); + if (nread < 0) { /* Error */ @@ -473,24 +528,78 @@ static void uv__read(uv_stream_t* stream) { ev_io_start(ev, &stream->read_watcher); } uv__set_sys_error(stream->loop, EAGAIN); - stream->read_cb(stream, 0, buf); + + if (stream->read_cb) { + stream->read_cb(stream, 0, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, 0, buf, UV_UNKNOWN_HANDLE); + } + return; } else { /* Error. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } + assert(!ev_is_active(&stream->read_watcher)); return; } + } else if (nread == 0) { /* EOF */ uv__set_artificial_error(stream->loop, UV_EOF); ev_io_stop(ev, &stream->read_watcher); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } return; } else { /* Successful read */ - stream->read_cb(stream, nread, buf); + + if (stream->read_cb) { + stream->read_cb(stream, nread, buf); + } else { + assert(stream->read2_cb); + + /* + * XXX: Some implementations can send multiple file descriptors in a + * single message. We should be using CMSG_NXTHDR() to walk the + * chain to get at them all. This would require changing the API to + * hand these back up the caller, is a pain. + */ + + for (cmsg = CMSG_FIRSTHDR(&msg); + msg.msg_controllen > 0 && cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + + if (cmsg->cmsg_type == SCM_RIGHTS) { + if (stream->accepted_fd != -1) { + fprintf(stderr, "(libuv) ignoring extra FD received\n"); + } + + stream->accepted_fd = *(int *) CMSG_DATA(cmsg); + + } else { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type); + } + } + + + if (stream->accepted_fd >= 0) { + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP); + } else { + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE); + } + } } } } @@ -672,11 +781,8 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, } -/* The buffers to be written must remain valid until the callback is called. - * This is not required for the uv_buf_t array. - */ -int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, - uv_write_cb cb) { +int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { int empty_queue; assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || @@ -688,6 +794,13 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, return -1; } + if (send_handle) { + if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) { + uv__set_sys_error(stream->loop, EOPNOTSUPP); + return -1; + } + } + empty_queue = (stream->write_queue_size == 0); /* Initialize the req */ @@ -695,6 +808,7 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, req->cb = cb; req->handle = stream; req->error = 0; + req->send_handle = send_handle; req->type = UV_WRITE; ngx_queue_init(&req->queue); @@ -737,7 +851,17 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { +/* The buffers to be written must remain valid until the callback is called. + * This is not required for the uv_buf_t array. + */ +int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + return uv_write2(req, stream, bufs, bufcnt, NULL, cb); +} + + +int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -759,6 +883,7 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) assert(alloc_cb); stream->read_cb = read_cb; + stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; /* These should have been set by uv_tcp_init. */ @@ -769,6 +894,18 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) } +int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv__read_start_common(stream, alloc_cb, read_cb, NULL); +} + + +int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + return uv__read_start_common(stream, alloc_cb, NULL, read_cb); +} + + int uv_read_stop(uv_stream_t* stream) { uv_tcp_t* tcp = (uv_tcp_t*)stream; @@ -776,6 +913,7 @@ int uv_read_stop(uv_stream_t* stream) { ev_io_stop(tcp->loop->ev, &tcp->read_watcher); tcp->read_cb = NULL; + tcp->read2_cb = NULL; tcp->alloc_cb = NULL; return 0; } diff --git a/deps/uv/src/win/internal.h b/deps/uv/src/win/internal.h index f8762145796..7753e70bf4c 100644 --- a/deps/uv/src/win/internal.h +++ b/deps/uv/src/win/internal.h @@ -44,27 +44,28 @@ void uv_process_timers(uv_loop_t* loop); */ /* Private uv_handle flags */ -#define UV_HANDLE_CLOSING 0x0001 -#define UV_HANDLE_CLOSED 0x0002 -#define UV_HANDLE_BOUND 0x0004 -#define UV_HANDLE_LISTENING 0x0008 -#define UV_HANDLE_CONNECTION 0x0010 -#define UV_HANDLE_CONNECTED 0x0020 -#define UV_HANDLE_READING 0x0040 -#define UV_HANDLE_ACTIVE 0x0040 -#define UV_HANDLE_EOF 0x0080 -#define UV_HANDLE_SHUTTING 0x0100 -#define UV_HANDLE_SHUT 0x0200 -#define UV_HANDLE_ENDGAME_QUEUED 0x0400 -#define UV_HANDLE_BIND_ERROR 0x1000 -#define UV_HANDLE_IPV6 0x2000 -#define UV_HANDLE_PIPESERVER 0x4000 -#define UV_HANDLE_READ_PENDING 0x8000 -#define UV_HANDLE_GIVEN_OS_HANDLE 0x10000 -#define UV_HANDLE_UV_ALLOCED 0x20000 -#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000 -#define UV_HANDLE_ZERO_READ 0x80000 -#define UV_HANDLE_TTY_RAW 0x100000 +#define UV_HANDLE_CLOSING 0x0001 +#define UV_HANDLE_CLOSED 0x0002 +#define UV_HANDLE_BOUND 0x0004 +#define UV_HANDLE_LISTENING 0x0008 +#define UV_HANDLE_CONNECTION 0x0010 +#define UV_HANDLE_CONNECTED 0x0020 +#define UV_HANDLE_READING 0x0040 +#define UV_HANDLE_ACTIVE 0x0040 +#define UV_HANDLE_EOF 0x0080 +#define UV_HANDLE_SHUTTING 0x0100 +#define UV_HANDLE_SHUT 0x0200 +#define UV_HANDLE_ENDGAME_QUEUED 0x0400 +#define UV_HANDLE_BIND_ERROR 0x1000 +#define UV_HANDLE_IPV6 0x2000 +#define UV_HANDLE_PIPESERVER 0x4000 +#define UV_HANDLE_READ_PENDING 0x8000 +#define UV_HANDLE_UV_ALLOCED 0x10000 +#define UV_HANDLE_SYNC_BYPASS_IOCP 0x20000 +#define UV_HANDLE_ZERO_READ 0x40000 +#define UV_HANDLE_TTY_RAW 0x80000 +#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000 +#define UV_HANDLE_EMULATE_IOCP 0x200000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -97,8 +98,8 @@ uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped); void uv_insert_pending_req(uv_loop_t* loop, uv_req_t* req); void uv_process_reqs(uv_loop_t* loop); -#define POST_COMPLETION_FOR_REQ(loop, req) \ - if (!PostQueuedCompletionStatus((loop)->iocp, \ +#define POST_COMPLETION_FOR_REQ(loop, req) \ + if (!PostQueuedCompletionStatus((loop)->iocp, \ 0, \ 0, \ &((req)->overlapped))) { \ @@ -135,6 +136,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); + /* * UDP @@ -149,19 +152,21 @@ void uv_udp_endgame(uv_loop_t* loop, uv_udp_t* handle); /* * Pipes */ -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle); int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, char* name, size_t nameSize); void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client); +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb); int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); @@ -267,6 +272,10 @@ void uv_fs_event_close(uv_loop_t* loop, uv_fs_event_t* handle); void uv_fs_event_endgame(uv_loop_t* loop, uv_fs_event_t* handle); +/* Utils */ +int uv_parent_pid(); + + /* * Error handling */ diff --git a/deps/uv/src/win/pipe.c b/deps/uv/src/win/pipe.c index c997a1e2160..34ff0245043 100644 --- a/deps/uv/src/win/pipe.c +++ b/deps/uv/src/win/pipe.c @@ -20,6 +20,7 @@ */ #include <assert.h> +#include <io.h> #include <string.h> #include <stdio.h> @@ -38,6 +39,22 @@ static const uv_buf_t uv_null_buf_ = { 0, NULL }; /* when the local ends wants to shut it down. */ static const int64_t eof_timeout = 50; /* ms */ +/* IPC protocol flags. */ +#define UV_IPC_RAW_DATA 0x0001 +#define UV_IPC_UV_STREAM 0x0002 + +/* IPC frame header. */ +typedef struct { + int flags; + uint64_t raw_data_length; +} uv_ipc_frame_header_t; + +/* IPC frame, which contains an imported TCP socket stream. */ +typedef struct { + uv_ipc_frame_header_t header; + WSAPROTOCOL_INFOW socket_info; +} uv_ipc_frame_uv_stream; + static void eof_timer_init(uv_pipe_t* pipe); static void eof_timer_start(uv_pipe_t* pipe); static void eof_timer_stop(uv_pipe_t* pipe); @@ -51,35 +68,26 @@ static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { } -int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { +int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv_stream_init(loop, (uv_stream_t*)handle); handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; + handle->ipc_pid = 0; + handle->remaining_ipc_rawdata_bytes = 0; + handle->pending_socket_info = NULL; - loop->counters.pipe_init++; + uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); - return 0; -} - - -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle) { - int err = uv_pipe_init(loop, handle); - - if (!err) { - /* - * At this point we don't know whether the pipe will be used as a client - * or a server. So, we assume that it will be a client until - * uv_listen is called. - */ - handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; + if (ipc) { + handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL; } - return err; + loop->counters.pipe_init++; + + return 0; } @@ -131,7 +139,6 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, uv_pipe_connection_init(handle); handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; err = 0; done: @@ -192,7 +199,6 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - if (handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && handle->write_reqs_pending == 0) { @@ -250,6 +256,13 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); handle->flags |= UV_HANDLE_CLOSED; + if (handle->flags & UV_HANDLE_CONNECTION) { + if (handle->pending_socket_info) { + free(handle->pending_socket_info); + handle->pending_socket_info = NULL; + } + } + /* Remember the state of this flag because the close callback is */ /* allowed to clobber or free the handle's memory */ uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED; @@ -567,30 +580,44 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, } -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_loop_t* loop = server->loop; - /* Find a connection instance that has been connected, but not yet */ - /* accepted. */ - uv_pipe_accept_t* req = server->pending_accepts; + uv_pipe_t* pipe_client; + uv_pipe_accept_t* req; - if (!req) { - /* No valid connections found, so we error out. */ - uv__set_sys_error(loop, WSAEWOULDBLOCK); - return -1; - } + if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + if (!server->pending_socket_info) { + /* No valid pending sockets. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } - /* Initialize the client handle and copy the pipeHandle to the client */ - uv_pipe_connection_init(client); - client->handle = req->pipeHandle; + return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info); + } else { + pipe_client = (uv_pipe_t*)client; + + /* Find a connection instance that has been connected, but not yet */ + /* accepted. */ + req = server->pending_accepts; + + if (!req) { + /* No valid connections found, so we error out. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } + + /* Initialize the client handle and copy the pipeHandle to the client */ + uv_pipe_connection_init(pipe_client); + pipe_client->handle = req->pipeHandle; - /* Prepare the req to pick up a new connection */ - server->pending_accepts = req->next_pending; - req->next_pending = NULL; - req->pipeHandle = INVALID_HANDLE_VALUE; + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->pipeHandle = INVALID_HANDLE_VALUE; - if (!(server->flags & UV_HANDLE_CLOSING) && - !(server->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { - uv_pipe_queue_accept(loop, server, req, FALSE); + if (!(server->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(loop, server, req, FALSE); + } } return 0; @@ -602,11 +629,8 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv_loop_t* loop = handle->loop; int i, errno; - uv_pipe_accept_t* req; - HANDLE pipeHandle; - if (!(handle->flags & UV_HANDLE_BOUND) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_BOUND)) { uv__set_artificial_error(loop, UV_EINVAL); return -1; } @@ -617,8 +641,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { return -1; } - if (!(handle->flags & UV_HANDLE_PIPESERVER) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_PIPESERVER)) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -626,30 +649,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - if (handle->flags & UV_HANDLE_GIVEN_OS_HANDLE) { - handle->flags |= UV_HANDLE_PIPESERVER; - pipeHandle = handle->handle; - assert(pipeHandle != INVALID_HANDLE_VALUE); - req = &handle->accept_reqs[0]; - uv_req_init(loop, (uv_req_t*) req); - req->pipeHandle = pipeHandle; - req->type = UV_ACCEPT; - req->data = handle; - req->next_pending = NULL; - - if (uv_set_pipe_handle(loop, handle, pipeHandle)) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } - - uv_pipe_queue_accept(loop, handle, req, TRUE); - } else { - /* First pipe handle should have already been created in uv_pipe_bind */ - assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); + /* First pipe handle should have already been created in uv_pipe_bind */ + assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); - } + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); } return 0; @@ -693,8 +697,8 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { } -int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { uv_loop_t* loop = handle->loop; if (!(handle->flags & UV_HANDLE_CONNECTION)) { @@ -714,9 +718,10 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, handle->flags |= UV_HANDLE_READING; handle->read_cb = read_cb; + handle->read2_cb = read2_cb; handle->alloc_cb = alloc_cb; - /* If reading was stopped and then started again, there could stell be a */ + /* If reading was stopped and then started again, there could still be a */ /* read request pending. */ if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_pipe_queue_read(loop, handle); @@ -725,11 +730,33 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } -int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, - uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { +int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL); +} + + +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb); +} + + +static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, + uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { int result; + uv_tcp_t* tcp_send_handle; + uv_write_t* ipc_header_req; + DWORD written; + uv_ipc_frame_uv_stream ipc_frame; + + if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) { + uv__set_artificial_error(loop, UV_ENOTSUP); + return -1; + } - if (bufcnt != 1) { + if (send_handle && send_handle->type != UV_TCP) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -750,8 +777,86 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, req->type = UV_WRITE; req->handle = (uv_stream_t*) handle; req->cb = cb; + req->ipc_header = 0; memset(&req->overlapped, 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol. */ + if (send_handle) { + tcp_send_handle = (uv_tcp_t*)send_handle; + if (WSADuplicateSocketW(tcp_send_handle->socket, handle->ipc_pid, + &ipc_frame.socket_info)) { + uv__set_sys_error(loop, WSAGetLastError()); + return -1; + } + ipc_frame.header.flags |= UV_IPC_UV_STREAM; + } + + if (bufcnt == 1) { + ipc_frame.header.flags |= UV_IPC_RAW_DATA; + ipc_frame.header.raw_data_length = bufs[0].len; + } + + /* + * Use the provided req if we're only doing a single write. + * If we're doing multiple writes, use ipc_header_write_req to do + * the first write, and then use the provided req for the second write. + */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + ipc_header_req = req; + } else { + /* + * Try to use the preallocated write req if it's available. + * Otherwise allocate a new one. + */ + if (handle->ipc_header_write_req.type != UV_WRITE) { + ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req; + } else { + ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t)); + if (!handle->accept_reqs) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + } + + uv_req_init(loop, (uv_req_t*) ipc_header_req); + ipc_header_req->type = UV_WRITE; + ipc_header_req->handle = (uv_stream_t*) handle; + ipc_header_req->cb = NULL; + ipc_header_req->ipc_header = 1; + } + + /* Write the header or the whole frame. */ + memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); + + result = WriteFile(handle->handle, + &ipc_frame, + ipc_frame.header.flags & UV_IPC_UV_STREAM ? + sizeof(ipc_frame) : sizeof(ipc_frame.header), + &written, + &ipc_header_req->overlapped); + if (!result && GetLastError() != ERROR_IO_PENDING) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = written; + handle->write_queue_size += req->queued_bytes; + } + + handle->reqs_pending++; + handle->write_reqs_pending++; + + /* If we don't have any raw data to write - we're done. */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + return 0; + } + } + result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, @@ -779,6 +884,23 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, } +int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb); +} + + +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { + if (!(handle->flags & UV_HANDLE_USE_IPC_PROTOCOL)) { + uv__set_artificial_error(loop, UV_EINVAL); + return -1; + } + + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb); +} + + static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_buf_t buf) { /* If there is an eof timer running, we don't need it any more, */ @@ -789,7 +911,11 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_read_stop((uv_stream_t*) handle); uv__set_artificial_error(loop, UV_EOF); - handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + } } @@ -802,7 +928,11 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_read_stop((uv_stream_t*) handle); uv__set_sys_error(loop, error); - handle->read_cb((uv_stream_t*)handle, -1, buf); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*)handle, -1, buf); + } } @@ -820,6 +950,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { DWORD bytes, avail; uv_buf_t buf; + uv_ipc_frame_uv_stream ipc_frame; assert(handle->type == UV_NAMED_PIPE); @@ -838,11 +969,11 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, /* Do non-blocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { if (!PeekNamedPipe(handle->handle, - NULL, - 0, - NULL, - &avail, - NULL)) { + NULL, + 0, + NULL, + &avail, + NULL)) { uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); break; } @@ -852,6 +983,62 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, break; } + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol to read the incoming data. */ + if (handle->remaining_ipc_rawdata_bytes == 0) { + /* We're reading a new frame. First, read the header. */ + assert(avail >= sizeof(ipc_frame.header)); + + if (!ReadFile(handle->handle, + &ipc_frame.header, + sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame.header)); + + if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { + assert(avail - sizeof(ipc_frame.header) >= + sizeof(ipc_frame.socket_info)); + + /* Read the TCP socket info. */ + if (!ReadFile(handle->handle, + &ipc_frame.socket_info, + sizeof(ipc_frame) - sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); + + /* Store the pending socket info. */ + assert(!handle->pending_socket_info); + handle->pending_socket_info = + (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info))); + if (!handle->pending_socket_info) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + *(handle->pending_socket_info) = ipc_frame.socket_info; + } + + if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { + handle->remaining_ipc_rawdata_bytes = + ipc_frame.header.raw_data_length; + continue; + } + } else { + avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); + } + } + buf = handle->alloc_cb((uv_handle_t*) handle, avail); assert(buf.len > 0); @@ -861,7 +1048,25 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, &bytes, NULL)) { /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + assert(handle->remaining_ipc_rawdata_bytes >= bytes); + handle->remaining_ipc_rawdata_bytes = + handle->remaining_ipc_rawdata_bytes - bytes; + if (handle->read2_cb) { + handle->read2_cb(handle, bytes, buf, + handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); + } else if (handle->read_cb) { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + + if (handle->pending_socket_info) { + free(handle->pending_socket_info); + handle->pending_socket_info = NULL; + } + } else { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + /* Read again only if bytes == buf.len */ if (bytes <= buf.len) { break; @@ -889,12 +1094,20 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, handle->write_queue_size -= req->queued_bytes; - if (req->cb) { - if (!REQ_SUCCESS(req)) { - uv__set_sys_error(loop, GET_REQ_ERROR(req)); - ((uv_write_cb)req->cb)(req, -1); + if (req->ipc_header) { + if (req == &handle->ipc_header_write_req) { + req->type = UV_UNKNOWN_REQ; } else { - ((uv_write_cb)req->cb)(req, 0); + free(req); + } + } else { + if (req->cb) { + if (!REQ_SUCCESS(req)) { + uv__set_sys_error(loop, GET_REQ_ERROR(req)); + ((uv_write_cb)req->cb)(req, -1); + } else { + ((uv_write_cb)req->cb)(req, 0); + } } } @@ -927,8 +1140,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, CloseHandle(req->pipeHandle); req->pipeHandle = INVALID_HANDLE_VALUE; } - if (!(handle->flags & UV_HANDLE_CLOSING) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_CLOSING)) { uv_pipe_queue_accept(loop, handle, req, FALSE); } } @@ -1065,6 +1277,21 @@ static void eof_timer_close_cb(uv_handle_t* handle) { void uv_pipe_open(uv_pipe_t* pipe, uv_file file) { - assert(0 && "implement me"); -} + HANDLE os_handle; + + /* Special-case stdin with ipc. */ + if (file == 0 && pipe->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + os_handle = (HANDLE)_get_osfhandle(file); + + if (os_handle == INVALID_HANDLE_VALUE || + uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { + return; + } + + uv_pipe_connection_init(pipe); + pipe->ipc_pid = uv_parent_pid(); + assert(pipe->ipc_pid != -1); + pipe->handle = os_handle; + } +} diff --git a/deps/uv/src/win/process.c b/deps/uv/src/win/process.c index 4db04832514..da72d55ef35 100644 --- a/deps/uv/src/win/process.c +++ b/deps/uv/src/win/process.c @@ -45,7 +45,7 @@ typedef struct env_var { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); \ } \ if (!uv_utf8_to_utf16(s, t, size / sizeof(wchar_t))) { \ - uv__set_sys_error(loop, GetLastError()); \ + uv__set_sys_error(loop, GetLastError()); \ err = -1; \ goto done; \ } @@ -739,7 +739,8 @@ void uv_process_close(uv_loop_t* loop, uv_process_t* handle) { static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, - HANDLE* child_pipe, DWORD server_access, DWORD child_access) { + HANDLE* child_pipe, DWORD server_access, DWORD child_access, + int overlapped) { int err; SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE }; char pipe_name[64]; @@ -767,7 +768,7 @@ static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, 0, &sa, OPEN_EXISTING, - 0, + overlapped ? FILE_FLAG_OVERLAPPED : 0, NULL); if (*child_pipe == INVALID_HANDLE_VALUE) { @@ -848,7 +849,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, wchar_t* path = NULL; int size; BOOL result; - wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, *env = NULL, *cwd = NULL; + wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, + *env = NULL, *cwd = NULL; HANDLE* child_stdio = process->child_stdio; STARTUPINFOW startup; PROCESS_INFORMATION info; @@ -904,12 +906,23 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, /* Create stdio pipes. */ if (options.stdin_stream) { - err = uv_create_stdio_pipe_pair( - loop, - options.stdin_stream, - &child_stdio[0], - PIPE_ACCESS_OUTBOUND, - GENERIC_READ | FILE_WRITE_ATTRIBUTES); + if (options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_DUPLEX, + GENERIC_READ | FILE_WRITE_ATTRIBUTES | GENERIC_WRITE, + 1); + } else { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_OUTBOUND, + GENERIC_READ | FILE_WRITE_ATTRIBUTES, + 0); + } } else { err = duplicate_std_handle(loop, STD_INPUT_HANDLE, &child_stdio[0]); } @@ -922,7 +935,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, loop, options.stdout_stream, &child_stdio[1], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_OUTPUT_HANDLE, &child_stdio[1]); } @@ -936,7 +950,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, options.stderr_stream, &child_stdio[2], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_ERROR_HANDLE, &child_stdio[2]); } @@ -969,6 +984,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->process_handle = info.hProcess; process->pid = info.dwProcessId; + if (options.stdin_stream && + options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + options.stdin_stream->ipc_pid = info.dwProcessId; + } + /* Setup notifications for when the child process exits. */ result = RegisterWaitForSingleObject(&process->wait_handle, process->process_handle, exit_wait_callback, (void*)process, INFINITE, diff --git a/deps/uv/src/win/stream.c b/deps/uv/src/win/stream.c index c38e06bb2ab..f12117841a0 100644 --- a/deps/uv/src/win/stream.c +++ b/deps/uv/src/win/stream.c @@ -62,13 +62,11 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int uv_accept(uv_stream_t* server, uv_stream_t* client) { - assert(client->type == server->type); - switch (server->type) { case UV_TCP: return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); case UV_NAMED_PIPE: - return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + return uv_pipe_accept((uv_pipe_t*)server, client); default: assert(0); return -1; @@ -92,6 +90,18 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, } +int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb); + default: + assert(0); + return -1; + } +} + + int uv_read_stop(uv_stream_t* handle) { if (handle->type == UV_TTY) { return uv_tty_read_stop((uv_tty_t*) handle); @@ -121,6 +131,21 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, } +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + uv_loop_t* loop = handle->loop; + + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_write2(loop, req, (uv_pipe_t*) handle, bufs, bufcnt, send_handle, cb); + default: + assert(0); + uv__set_sys_error(loop, WSAEINVAL); + return -1; + } +} + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { uv_loop_t* loop = handle->loop; diff --git a/deps/uv/src/win/tcp.c b/deps/uv/src/win/tcp.c index ee95aa11108..897ea5e9c4a 100644 --- a/deps/uv/src/win/tcp.c +++ b/deps/uv/src/win/tcp.c @@ -47,7 +47,7 @@ static unsigned int active_tcp_streams = 0; static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, - SOCKET socket) { + SOCKET socket, int imported) { DWORD yes = 1; assert(handle->socket == INVALID_SOCKET); @@ -70,8 +70,12 @@ static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, loop->iocp, (ULONG_PTR)socket, 0) == NULL) { - uv__set_sys_error(loop, GetLastError()); - return -1; + if (imported) { + handle->flags |= UV_HANDLE_EMULATE_IOCP; + } else { + uv__set_sys_error(loop, GetLastError()); + return -1; + } } if (pSetFileCompletionNotificationModes) { @@ -99,6 +103,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { handle->socket = INVALID_SOCKET; handle->type = UV_TCP; handle->reqs_pending = 0; + handle->func_acceptex = NULL; + handle->func_connectex = NULL; loop->counters.tcp_init++; @@ -109,6 +115,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { int status; int sys_error; + unsigned int i; + uv_tcp_accept_t* req; if (handle->flags & UV_HANDLE_CONNECTION && handle->flags & UV_HANDLE_SHUTTING && @@ -139,6 +147,20 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { handle->flags |= UV_HANDLE_CLOSED; if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) { + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + for (i = 0; i < uv_simultaneous_server_accepts; i++) { + req = &handle->accept_reqs[i]; + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + } + free(handle->accept_reqs); handle->accept_reqs = NULL; } @@ -169,7 +191,7 @@ static int uv__bind(uv_tcp_t* handle, return -1; } - if (uv_tcp_set_socket(handle->loop, handle, sock) == -1) { + if (uv_tcp_set_socket(handle->loop, handle, sock, 0) == -1) { closesocket(sock); return -1; } @@ -218,24 +240,40 @@ int uv__tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { } +static void CALLBACK post_completion(void* context, BOOLEAN timed_out) { + uv_tcp_accept_t* req; + uv_tcp_t* handle; + + req = (uv_tcp_accept_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->data; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { uv_loop_t* loop = handle->loop; BOOL success; DWORD bytes; SOCKET accept_socket; short family; - LPFN_ACCEPTEX pAcceptExFamily; assert(handle->flags & UV_HANDLE_LISTENING); assert(req->accept_socket == INVALID_SOCKET); /* choose family and extension function */ - if ((handle->flags & UV_HANDLE_IPV6) != 0) { + if (handle->flags & UV_HANDLE_IPV6) { family = AF_INET6; - pAcceptExFamily = pAcceptEx6; } else { family = AF_INET; - pAcceptExFamily = pAcceptEx; } /* Open a socket for the accepted connection. */ @@ -249,15 +287,18 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + } - success = pAcceptExFamily(handle->socket, - accept_socket, - (void*)req->accept_buffer, - 0, - sizeof(struct sockaddr_storage), - sizeof(struct sockaddr_storage), - &bytes, - &req->overlapped); + success = handle->func_acceptex(handle->socket, + accept_socket, + (void*)req->accept_buffer, + 0, + sizeof(struct sockaddr_storage), + sizeof(struct sockaddr_storage), + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -268,6 +309,15 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* The req will be processed with IOCP. */ req->accept_socket = accept_socket; handle->reqs_pending++; + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + req->wait_handle == INVALID_HANDLE_VALUE && + !RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + return; + } } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); @@ -275,6 +325,11 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { handle->reqs_pending++; /* Destroy the preallocated client socket. */ closesocket(accept_socket); + /* Destroy the event handle */ + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + CloseHandle(req->overlapped.hEvent); + req->event_handle = NULL; + } } } @@ -357,6 +412,13 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!handle->func_acceptex) { + if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + } + if (listen(handle->socket, backlog) == SOCKET_ERROR) { uv__set_sys_error(loop, WSAGetLastError()); return -1; @@ -378,6 +440,17 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { req->type = UV_ACCEPT; req->accept_socket = INVALID_SOCKET; req->data = handle; + + req->wait_handle = INVALID_HANDLE_VALUE; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } else { + req->event_handle = NULL; + } + uv_tcp_queue_accept(handle, req); } @@ -402,7 +475,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { return -1; } - if (uv_tcp_set_socket(client->loop, client, req->accept_socket) == -1) { + if (uv_tcp_set_socket(client->loop, client, req->accept_socket, 0) == -1) { closesocket(req->accept_socket); rv = -1; } else { @@ -476,19 +549,26 @@ int uv__tcp_connect(uv_connect_t* req, uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!handle->func_connectex) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -529,19 +609,26 @@ int uv__tcp_connect6(uv_connect_t* req, uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; + if (!handle->func_connectex) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx6(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { handle->reqs_pending++; @@ -848,3 +935,22 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, DECREASE_PENDING_REQ_COUNT(handle); } + + +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { + SOCKET socket = WSASocketW(AF_INET, + SOCK_STREAM, + IPPROTO_IP, + socket_protocol_info, + 0, + WSA_FLAG_OVERLAPPED); + + if (socket == INVALID_SOCKET) { + uv__set_sys_error(tcp->loop, WSAGetLastError()); + return -1; + } + + tcp->flags |= UV_HANDLE_BOUND; + + return uv_tcp_set_socket(tcp->loop, tcp, socket, 1); +} diff --git a/deps/uv/src/win/util.c b/deps/uv/src/win/util.c index cb2d44381c8..cc6f93cfa6c 100644 --- a/deps/uv/src/win/util.c +++ b/deps/uv/src/win/util.c @@ -25,6 +25,7 @@ #include "uv.h" #include "internal.h" +#include "Tlhelp32.h" int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size, @@ -95,11 +96,13 @@ done: return retVal; } + void uv_loadavg(double avg[3]) { /* Can't be implemented */ avg[0] = avg[1] = avg[2] = 0; } + double uv_get_free_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -112,6 +115,7 @@ double uv_get_free_memory(void) { return (double)memory_status.ullAvailPhys; } + double uv_get_total_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -123,3 +127,26 @@ double uv_get_total_memory(void) { return (double)memory_status.ullTotalPhys; } + + +int uv_parent_pid() { + int parent_pid = -1; + HANDLE handle; + PROCESSENTRY32 pe; + int current_pid = GetCurrentProcessId(); + + pe.dwSize = sizeof(PROCESSENTRY32); + handle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + + if (Process32First(handle, &pe)) { + do { + if (pe.th32ProcessID == current_pid) { + parent_pid = pe.th32ParentProcessID; + break; + } + } while( Process32Next(handle, &pe)); + } + + CloseHandle(handle); + return parent_pid; +} diff --git a/deps/uv/src/win/winsock.c b/deps/uv/src/win/winsock.c index 1f56b3d75bd..e37a60a9d38 100644 --- a/deps/uv/src/win/winsock.c +++ b/deps/uv/src/win/winsock.c @@ -25,21 +25,6 @@ #include "../uv-common.h" #include "internal.h" - -/* Winsock extension functions (ipv4) */ -LPFN_CONNECTEX pConnectEx; -LPFN_ACCEPTEX pAcceptEx; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -LPFN_DISCONNECTEX pDisconnectEx; -LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -LPFN_CONNECTEX pConnectEx6; -LPFN_ACCEPTEX pAcceptEx6; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -LPFN_DISCONNECTEX pDisconnectEx6; -LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ int uv_allow_ipv6; @@ -74,6 +59,18 @@ static BOOL uv_get_extension_function(SOCKET socket, GUID guid, } +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target) { + const GUID wsaid_acceptex = WSAID_ACCEPTEX; + return uv_get_extension_function(socket, wsaid_acceptex, (void**)target); +} + + +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target) { + const GUID wsaid_connectex = WSAID_CONNECTEX; + return uv_get_extension_function(socket, wsaid_connectex, (void**)target); +} + + void uv_winsock_init() { const GUID wsaid_connectex = WSAID_CONNECTEX; const GUID wsaid_acceptex = WSAID_ACCEPTEX; @@ -83,7 +80,6 @@ void uv_winsock_init() { WSADATA wsa_data; int errorno; - SOCKET dummy; SOCKET dummy6; /* Initialize winsock */ @@ -96,58 +92,10 @@ void uv_winsock_init() { uv_addr_ip4_any_ = uv_ip4_addr("0.0.0.0", 0); uv_addr_ip6_any_ = uv_ip6_addr("::", 0); - /* Retrieve the needed winsock extension function pointers. */ - dummy = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - if (dummy == INVALID_SOCKET) { - uv_fatal_error(WSAGetLastError(), "socket"); - } - - if (!uv_get_extension_function(dummy, - wsaid_connectex, - (void**)&pConnectEx) || - !uv_get_extension_function(dummy, - wsaid_acceptex, - (void**)&pAcceptEx) || - !uv_get_extension_function(dummy, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs) || - !uv_get_extension_function(dummy, - wsaid_disconnectex, - (void**)&pDisconnectEx) || - !uv_get_extension_function(dummy, - wsaid_transmitfile, - (void**)&pTransmitFile)) { - uv_fatal_error(WSAGetLastError(), - "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)"); - } - - if (closesocket(dummy) == SOCKET_ERROR) { - uv_fatal_error(WSAGetLastError(), "closesocket"); - } - - /* optional IPv6 versions of winsock extension functions */ + /* Detect IPV6 support */ dummy6 = socket(AF_INET6, SOCK_STREAM, IPPROTO_IP); if (dummy6 != INVALID_SOCKET) { uv_allow_ipv6 = TRUE; - - if (!uv_get_extension_function(dummy6, - wsaid_connectex, - (void**)&pConnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_acceptex, - (void**)&pAcceptEx6) || - !uv_get_extension_function(dummy6, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs6) || - !uv_get_extension_function(dummy6, - wsaid_disconnectex, - (void**)&pDisconnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_transmitfile, - (void**)&pTransmitFile6)) { - uv_allow_ipv6 = FALSE; - } - if (closesocket(dummy6) == SOCKET_ERROR) { uv_fatal_error(WSAGetLastError(), "closesocket"); } diff --git a/deps/uv/src/win/winsock.h b/deps/uv/src/win/winsock.h index 2c9fb92db1b..f879cc65cec 100644 --- a/deps/uv/src/win/winsock.h +++ b/deps/uv/src/win/winsock.h @@ -109,24 +109,12 @@ #define IPV6_V6ONLY 27 #endif - -/* Winsock extension functions (ipv4) */ -extern LPFN_CONNECTEX pConnectEx; -extern LPFN_ACCEPTEX pAcceptEx; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -extern LPFN_DISCONNECTEX pDisconnectEx; -extern LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -extern LPFN_CONNECTEX pConnectEx6; -extern LPFN_ACCEPTEX pAcceptEx6; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -extern LPFN_DISCONNECTEX pDisconnectEx6; -extern LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ extern int uv_allow_ipv6; +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target); +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target); + /* Ip address used to bind to any port at any interface */ extern struct sockaddr_in uv_addr_ip4_any_; extern struct sockaddr_in6 uv_addr_ip6_any_; diff --git a/deps/uv/test/benchmark-list.h b/deps/uv/test/benchmark-list.h index 0e5467c7a1b..0a72fb98615 100644 --- a/deps/uv/test/benchmark-list.h +++ b/deps/uv/test/benchmark-list.h @@ -21,6 +21,7 @@ BENCHMARK_DECLARE (sizes) BENCHMARK_DECLARE (ping_pongs) +BENCHMARK_DECLARE (tcp_write_batch) BENCHMARK_DECLARE (tcp4_pound_100) BENCHMARK_DECLARE (tcp4_pound_1000) BENCHMARK_DECLARE (pipe_pound_100) @@ -42,6 +43,7 @@ BENCHMARK_DECLARE (udp_packet_storm_1000v1000) BENCHMARK_DECLARE (gethostbyname) BENCHMARK_DECLARE (getaddrinfo) BENCHMARK_DECLARE (spawn) +HELPER_DECLARE (tcp4_blackhole_server) HELPER_DECLARE (tcp_pump_server) HELPER_DECLARE (pipe_pump_server) HELPER_DECLARE (tcp4_echo_server) @@ -54,6 +56,9 @@ TASK_LIST_START BENCHMARK_ENTRY (ping_pongs) BENCHMARK_HELPER (ping_pongs, tcp4_echo_server) + BENCHMARK_ENTRY (tcp_write_batch) + BENCHMARK_HELPER (tcp_write_batch, tcp4_blackhole_server) + BENCHMARK_ENTRY (tcp_pump100_client) BENCHMARK_HELPER (tcp_pump100_client, tcp_pump_server) diff --git a/deps/uv/test/benchmark-pound.c b/deps/uv/test/benchmark-pound.c index 1f56e27f69d..af7ce247dab 100644 --- a/deps/uv/test/benchmark-pound.c +++ b/deps/uv/test/benchmark-pound.c @@ -222,7 +222,7 @@ static void tcp_make_connect(conn_rec* p) { static void pipe_make_connect(conn_rec* p) { int r; - r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream); + r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream, 0); ASSERT(r == 0); r = uv_pipe_connect(&((pipe_conn_rec*)p)->conn_req, (uv_pipe_t*)&p->stream, TEST_PIPENAME, connect_cb); diff --git a/deps/uv/test/benchmark-pump.c b/deps/uv/test/benchmark-pump.c index d0b09301570..27e8abe0c82 100644 --- a/deps/uv/test/benchmark-pump.c +++ b/deps/uv/test/benchmark-pump.c @@ -253,7 +253,7 @@ static void maybe_connect_some() { } else { pipe = &pipe_write_handles[max_connect_socket++]; - r = uv_pipe_init(loop, pipe); + r = uv_pipe_init(loop, pipe, 0); ASSERT(r == 0); req = (uv_connect_t*) req_alloc(); @@ -277,7 +277,7 @@ static void connection_cb(uv_stream_t* s, int status) { ASSERT(r == 0); } else { stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); - r = uv_pipe_init(loop, (uv_pipe_t*)stream); + r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); ASSERT(r == 0); } @@ -396,7 +396,7 @@ HELPER_IMPL(pipe_pump_server) { /* Server */ server = (uv_stream_t*)&pipeServer; - r = uv_pipe_init(loop, &pipeServer); + r = uv_pipe_init(loop, &pipeServer, 0); ASSERT(r == 0); r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); ASSERT(r == 0); diff --git a/deps/uv/test/benchmark-spawn.c b/deps/uv/test/benchmark-spawn.c index 6e5493d529b..d34f42b9feb 100644 --- a/deps/uv/test/benchmark-spawn.c +++ b/deps/uv/test/benchmark-spawn.c @@ -113,7 +113,7 @@ static void spawn() { options.args = args; options.exit_cb = exit_cb; - uv_pipe_init(loop, &out); + uv_pipe_init(loop, &out, 0); options.stdout_stream = &out; r = uv_spawn(loop, &process, options); diff --git a/deps/uv/test/benchmark-tcp-write-batch.c b/deps/uv/test/benchmark-tcp-write-batch.c new file mode 100644 index 00000000000..77bb0191b1e --- /dev/null +++ b/deps/uv/test/benchmark-tcp-write-batch.c @@ -0,0 +1,144 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include <stdio.h> +#include <stddef.h> +#include <stdlib.h> + +#define WRITE_REQ_DATA "Hello, world." +#define NUM_WRITE_REQS (1000 * 1000) + +#define container_of(ptr, type, member) \ + ((type *) ((char *) (ptr) - offsetof(type, member))) + +typedef struct { + uv_write_t req; + uv_buf_t buf; +} write_req; + + +static write_req* write_reqs; +static uv_tcp_t tcp_client; +static uv_connect_t connect_req; +static uv_shutdown_t shutdown_req; + +static int shutdown_cb_called = 0; +static int connect_cb_called = 0; +static int write_cb_called = 0; +static int close_cb_called = 0; + +static void connect_cb(uv_connect_t* req, int status); +static void write_cb(uv_write_t* req, int status); +static void shutdown_cb(uv_shutdown_t* req, int status); +static void close_cb(uv_handle_t* handle); + + +static void connect_cb(uv_connect_t* req, int status) { + write_req* w; + int i; + int r; + + ASSERT(req->handle == (uv_stream_t*)&tcp_client); + + for (i = 0; i < NUM_WRITE_REQS; i++) { + w = &write_reqs[i]; + r = uv_write(&w->req, req->handle, &w->buf, 1, write_cb); + ASSERT(r == 0); + } + + r = uv_shutdown(&shutdown_req, req->handle, shutdown_cb); + ASSERT(r == 0); + + connect_cb_called++; +} + + +static void write_cb(uv_write_t* req, int status) { + ASSERT(req != NULL); + ASSERT(status == 0); + write_cb_called++; +} + + +static void shutdown_cb(uv_shutdown_t* req, int status) { + ASSERT(req->handle == (uv_stream_t*)&tcp_client); + ASSERT(req->handle->write_queue_size == 0); + + uv_close((uv_handle_t*)req->handle, close_cb); + free(write_reqs); + + shutdown_cb_called++; +} + + +static void close_cb(uv_handle_t* handle) { + ASSERT(handle == (uv_handle_t*)&tcp_client); + close_cb_called++; +} + + +BENCHMARK_IMPL(tcp_write_batch) { + struct sockaddr_in addr; + uv_loop_t* loop; + uint64_t start; + uint64_t stop; + int i; + int r; + + write_reqs = malloc(sizeof(*write_reqs) * NUM_WRITE_REQS); + ASSERT(write_reqs != NULL); + + /* Prepare the data to write out. */ + for (i = 0; i < NUM_WRITE_REQS; i++) { + write_reqs[i].buf = uv_buf_init(WRITE_REQ_DATA, + sizeof(WRITE_REQ_DATA) - 1); + } + + loop = uv_default_loop(); + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_init(loop, &tcp_client); + ASSERT(r == 0); + + r = uv_tcp_connect(&connect_req, &tcp_client, addr, connect_cb); + ASSERT(r == 0); + + start = uv_hrtime(); + + r = uv_run(loop); + ASSERT(r == 0); + + stop = uv_hrtime(); + + ASSERT(connect_cb_called == 1); + ASSERT(write_cb_called == NUM_WRITE_REQS); + ASSERT(shutdown_cb_called == 1); + ASSERT(close_cb_called == 1); + + printf("%ld write requests in %.2fs.\n", + (long)NUM_WRITE_REQS, + (stop - start) / 10e8); + + return 0; +} diff --git a/deps/uv/test/blackhole-server.c b/deps/uv/test/blackhole-server.c new file mode 100644 index 00000000000..85a0efc26bf --- /dev/null +++ b/deps/uv/test/blackhole-server.c @@ -0,0 +1,122 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include <stdio.h> +#include <stddef.h> +#include <stdlib.h> + +#define container_of(ptr, type, member) \ + ((type *) ((char *) (ptr) - offsetof(type, member))) + +typedef struct { + uv_tcp_t handle; + uv_shutdown_t shutdown_req; +} conn_rec; + +static uv_tcp_t tcp_server; + +static void connection_cb(uv_stream_t* stream, int status); +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size); +static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); +static void shutdown_cb(uv_shutdown_t* req, int status); +static void close_cb(uv_handle_t* handle); + + +static void connection_cb(uv_stream_t* stream, int status) { + conn_rec* conn; + int r; + + ASSERT(status == 0); + ASSERT(stream == (uv_stream_t*)&tcp_server); + + conn = malloc(sizeof *conn); + ASSERT(conn != NULL); + + r = uv_tcp_init(stream->loop, &conn->handle); + ASSERT(r == 0); + + r = uv_accept(stream, (uv_stream_t*)&conn->handle); + ASSERT(r == 0); + + r = uv_read_start((uv_stream_t*)&conn->handle, alloc_cb, read_cb); + ASSERT(r == 0); +} + + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { + static char buf[65536]; + return uv_buf_init(buf, sizeof buf); +} + + +static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) { + conn_rec* conn; + int r; + + if (nread >= 0) + return; + + ASSERT(uv_last_error(stream->loop).code == UV_EOF); + + conn = container_of(stream, conn_rec, handle); + + r = uv_shutdown(&conn->shutdown_req, stream, shutdown_cb); + ASSERT(r == 0); +} + + +static void shutdown_cb(uv_shutdown_t* req, int status) { + conn_rec* conn = container_of(req, conn_rec, shutdown_req); + uv_close((uv_handle_t*)&conn->handle, close_cb); +} + + +static void close_cb(uv_handle_t* handle) { + conn_rec* conn = container_of(handle, conn_rec, handle); + free(conn); +} + + +HELPER_IMPL(tcp4_blackhole_server) { + struct sockaddr_in addr; + uv_loop_t* loop; + int r; + + loop = uv_default_loop(); + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_init(loop, &tcp_server); + ASSERT(r == 0); + + r = uv_tcp_bind(&tcp_server, addr); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 128, connection_cb); + ASSERT(r == 0); + + r = uv_run(loop); + ASSERT(0 && "Blackhole server dropped out of event loop."); + + return 0; +} diff --git a/deps/uv/test/echo-server.c b/deps/uv/test/echo-server.c index 453ada66d24..8b1754410cd 100644 --- a/deps/uv/test/echo-server.c +++ b/deps/uv/test/echo-server.c @@ -151,7 +151,7 @@ static void on_connection(uv_stream_t* server, int status) { case PIPE: stream = malloc(sizeof(uv_pipe_t)); ASSERT(stream != NULL); - r = uv_pipe_init(loop, (uv_pipe_t*)stream); + r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); ASSERT(r == 0); break; @@ -248,7 +248,7 @@ static int pipe_echo_start(char* pipeName) { server = (uv_handle_t*)&pipeServer; serverType = PIPE; - r = uv_pipe_init(loop, &pipeServer); + r = uv_pipe_init(loop, &pipeServer, 0); if (r) { fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(uv_last_error(loop))); diff --git a/deps/uv/test/run-tests.c b/deps/uv/test/run-tests.c index a187869110a..fa7b8b8f30a 100644 --- a/deps/uv/test/run-tests.c +++ b/deps/uv/test/run-tests.c @@ -22,6 +22,7 @@ #include <stdio.h> #include <string.h> +#include "uv.h" #include "runner.h" #include "task.h" @@ -48,12 +49,115 @@ int main(int argc, char **argv) { } +static uv_pipe_t channel; +static uv_tcp_t tcp_server; +static uv_write_t conn_notify_req; +static int close_cb_called; +static int connection_accepted; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + + +static void close_conn_cb(uv_handle_t* handle) { + free(handle); + close_cb_called++; +} + + +void conn_notify_write_cb(uv_write_t* req, int status) { + uv_close((uv_handle_t*)&tcp_server, close_cb); + uv_close((uv_handle_t*)&channel, close_cb); +} + + +static void ipc_on_connection(uv_stream_t* server, int status) { + int r; + uv_buf_t buf; + uv_tcp_t* conn; + + if (!connection_accepted) { + /* + * Accept the connection and close it. Also let the other + * side know. + */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_conn_cb); + + buf = uv_buf_init("accepted_connection\n", 20); + r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, + NULL, conn_notify_write_cb); + ASSERT(r == 0); + + connection_accepted = 1; + } +} + + +static int ipc_helper() { + /* + * This is launched from test-ipc.c. stdin is a duplex channel that we + * over which a handle will be transmitted. In this initial version only + * data is transfered over the channel. XXX edit this comment after handle + * transfer is added. + */ + + uv_write_t write_req; + int r; + uv_buf_t buf; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + uv_pipe_open(&channel, 0); + + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + buf = uv_buf_init("hello\n", 6); + r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, + (uv_stream_t*)&tcp_server, NULL); + ASSERT(r == 0); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(connection_accepted == 1); + ASSERT(close_cb_called == 3); + + return 0; +} + + static int maybe_run_test(int argc, char **argv) { if (strcmp(argv[1], "--list") == 0) { print_tests(stdout); return 0; } + if (strcmp(argv[1], "ipc_helper") == 0) { + return ipc_helper(); + } + if (strcmp(argv[1], "spawn_helper1") == 0) { return 1; } diff --git a/deps/uv/src/win/stdio.c b/deps/uv/test/test-error.c index b65e7fb5448..8d6f2355273 100644 --- a/deps/uv/src/win/stdio.c +++ b/deps/uv/test/test-error.c @@ -19,57 +19,41 @@ * IN THE SOFTWARE. */ -#include <assert.h> -#include <string.h> - #include "uv.h" -#include "../uv-common.h" -#include "internal.h" - - -static uv_pipe_t* uv_make_pipe_for_std_handle(uv_loop_t* loop, HANDLE handle) { - uv_pipe_t* pipe = NULL; +#include "task.h" - pipe = (uv_pipe_t*)malloc(sizeof(uv_pipe_t)); - if (!pipe) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - if (uv_pipe_init_with_handle(loop, pipe, handle)) { - free(pipe); - return NULL; - } +#include <stdio.h> +#include <stdlib.h> +#include <string.h> - pipe->flags |= UV_HANDLE_UV_ALLOCED; - return pipe; -} +/* + * Synthetic errors (errors that originate from within libuv, not the system) + * should produce sensible error messages when run through uv_strerror(). + * + * See https://github.com/joyent/libuv/issues/210 + */ +TEST_IMPL(error_message) { + uv_err_t e; -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - HANDLE handle; + /* Cop out. Can't do proper checks on systems with + * i18n-ized error messages... + */ + e.code = 0, e.sys_errno_ = 0; - switch (type) { - case UV_STDIN: - handle = GetStdHandle(STD_INPUT_HANDLE); - if (handle == INVALID_HANDLE_VALUE) { - return NULL; - } + if (strcmp(uv_strerror(e), "Success") != 0) { + printf("i18n error messages detected, skipping test.\n"); + return 0; + } - /* Assume only named pipes for now. */ - return (uv_stream_t*)uv_make_pipe_for_std_handle(loop, handle); - break; + e.code = UV_EINVAL, e.sys_errno_ = 0; + ASSERT(strstr(uv_strerror(e), "Success") == NULL); - case UV_STDOUT: - return NULL; - break; + e.code = UV_UNKNOWN, e.sys_errno_ = 0; + ASSERT(strcmp(uv_strerror(e), "Unknown error") == 0); - case UV_STDERR: - return NULL; - break; + e.code = 1337, e.sys_errno_ = 0; + ASSERT(strcmp(uv_strerror(e), "Unknown error") == 0); - default: - assert(0); - uv__set_artificial_error(loop, UV_EINVAL); - return NULL; - } + return 0; } diff --git a/deps/uv/test/test-ipc.c b/deps/uv/test/test-ipc.c new file mode 100644 index 00000000000..0024cdee5c5 --- /dev/null +++ b/deps/uv/test/test-ipc.c @@ -0,0 +1,221 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include <stdio.h> +#include <string.h> + +static char exepath[1024]; +static size_t exepath_size = 1024; +static char* args[3]; +static uv_pipe_t channel; +static uv_tcp_t tcp_server; + +static int exit_cb_called; +static int read2_cb_called; +static int local_conn_accepted; +static int remote_conn_accepted; +static int tcp_server_listening; + +static uv_write_t write_req; + +typedef struct { + uv_connect_t conn_req; + uv_tcp_t conn; +} tcp_conn; + +#define CONN_COUNT 100 + + +static void close_server_conn_cb(uv_handle_t* handle) { + free(handle); +} + + +static void ipc_on_connection(uv_stream_t* server, int status) { + uv_tcp_t* conn; + int r; + + if (!local_conn_accepted) { + /* Accept the connection and close it. Also and close the server. */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_server_conn_cb); + uv_close((uv_handle_t*)server, NULL); + local_conn_accepted = 1; + } +} + + +static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { + printf("exit_cb\n"); + exit_cb_called++; + ASSERT(exit_status == 0); + uv_close((uv_handle_t*)process, NULL); +} + + +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { + return uv_buf_init(malloc(suggested_size), suggested_size); +} + + +static void close_client_conn_cb(uv_handle_t* handle) { + tcp_conn* p = (tcp_conn*)handle->data; + free(p); +} + + +static void connect_cb(uv_connect_t* req, int status) { + uv_close((uv_handle_t*)req->handle, close_client_conn_cb); +} + + +static void make_many_connections() { + tcp_conn* conn; + struct sockaddr_in addr; + int r, i; + + for (i = 0; i < CONN_COUNT; i++) { + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(uv_default_loop(), &conn->conn); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb); + ASSERT(r == 0); + + conn->conn.data = conn; + } +} + + +static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { + int r; + uv_buf_t outbuf; + uv_err_t err; + + if (nread == 0) { + /* Everything OK, but nothing read. */ + free(buf.base); + return; + } + + if (nread < 0) { + err = uv_last_error(pipe->loop); + if (err.code == UV_EOF) { + free(buf.base); + return; + } + + printf("error recving on channel: %s\n", uv_strerror(err)); + abort(); + } + + fprintf(stderr, "got %d bytes\n", (int)nread); + + if (!tcp_server_listening) { + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP server, and start listening on it. */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + tcp_server_listening = 1; + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, nread) == 0); + + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + /* Create a bunch of connections to get both servers to accept. */ + make_many_connections(); + } else if (memcmp("accepted_connection\n", buf.base, nread) == 0) { + /* Remote server has accepted a connection. Close the channel. */ + ASSERT(pending == UV_UNKNOWN_HANDLE); + remote_conn_accepted = 1; + uv_close((uv_handle_t*)&channel, NULL); + } + + free(buf.base); +} + + +TEST_IMPL(ipc) { + int r; + uv_process_options_t options; + uv_process_t process; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + memset(&options, 0, sizeof(uv_process_options_t)); + + r = uv_exepath(exepath, &exepath_size); + ASSERT(r == 0); + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = "ipc_helper"; + args[2] = NULL; + options.file = exepath; + options.args = args; + options.exit_cb = exit_cb; + options.stdin_stream = &channel; + + r = uv_spawn(uv_default_loop(), &process, options); + ASSERT(r == 0); + + uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(local_conn_accepted == 1); + ASSERT(remote_conn_accepted == 1); + ASSERT(read2_cb_called == 1); + ASSERT(exit_cb_called == 1); + return 0; +} diff --git a/deps/uv/test/test-list.h b/deps/uv/test/test-list.h index f137493b295..d11257aa95e 100644 --- a/deps/uv/test/test-list.h +++ b/deps/uv/test/test-list.h @@ -20,6 +20,7 @@ */ TEST_DECLARE (tty) +TEST_DECLARE (ipc) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) TEST_DECLARE (tcp_ref) @@ -53,6 +54,7 @@ TEST_DECLARE (connection_fail) TEST_DECLARE (connection_fail_doesnt_auto_close) TEST_DECLARE (shutdown_eof) TEST_DECLARE (callback_stack) +TEST_DECLARE (error_message) TEST_DECLARE (timer) TEST_DECLARE (timer_ref) TEST_DECLARE (timer_ref2) @@ -110,6 +112,7 @@ HELPER_DECLARE (pipe_echo_server) TASK_LIST_START TEST_ENTRY (tty) + TEST_ENTRY (ipc) TEST_ENTRY (tcp_ref) @@ -166,6 +169,8 @@ TASK_LIST_START TEST_ENTRY (callback_stack) TEST_HELPER (callback_stack, tcp4_echo_server) + TEST_ENTRY (error_message) + TEST_ENTRY (timer) TEST_ENTRY (timer_ref) TEST_ENTRY (timer_ref2) diff --git a/deps/uv/test/test-ping-pong.c b/deps/uv/test/test-ping-pong.c index f452fce50b5..0e59166c48d 100644 --- a/deps/uv/test/test-ping-pong.c +++ b/deps/uv/test/test-ping-pong.c @@ -204,7 +204,7 @@ static void pipe_pinger_new() { pinger->pongs = 0; /* Try to connec to the server and do NUM_PINGS ping-pongs. */ - r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe); + r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0); pinger->stream.pipe.data = pinger; ASSERT(!r); diff --git a/deps/uv/test/test-pipe-bind-error.c b/deps/uv/test/test-pipe-bind-error.c index 832ce023153..3443f19dc87 100644 --- a/deps/uv/test/test-pipe-bind-error.c +++ b/deps/uv/test/test-pipe-bind-error.c @@ -45,12 +45,12 @@ TEST_IMPL(pipe_bind_error_addrinuse) { uv_pipe_t server1, server2; int r; - r = uv_pipe_init(uv_default_loop(), &server1); + r = uv_pipe_init(uv_default_loop(), &server1, 0); ASSERT(r == 0); r = uv_pipe_bind(&server1, TEST_PIPENAME); ASSERT(r == 0); - r = uv_pipe_init(uv_default_loop(), &server2); + r = uv_pipe_init(uv_default_loop(), &server2, 0); ASSERT(r == 0); r = uv_pipe_bind(&server2, TEST_PIPENAME); ASSERT(r == -1); @@ -79,7 +79,7 @@ TEST_IMPL(pipe_bind_error_addrnotavail) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_pipe_bind(&server, BAD_PIPENAME); @@ -100,7 +100,7 @@ TEST_IMPL(pipe_bind_error_inval) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_pipe_bind(&server, TEST_PIPENAME); ASSERT(r == 0); @@ -123,7 +123,7 @@ TEST_IMPL(pipe_listen_without_bind) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); ASSERT(r == -1); diff --git a/deps/uv/test/test-spawn.c b/deps/uv/test/test-spawn.c index 653f9ac95d3..238e6c9cea9 100644 --- a/deps/uv/test/test-spawn.c +++ b/deps/uv/test/test-spawn.c @@ -67,7 +67,7 @@ static void kill_cb(uv_process_t* process, int exit_status, int term_signal) { } -uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = output + output_used; buf.len = OUTPUT_SIZE - output_used; @@ -138,7 +138,7 @@ TEST_IMPL(spawn_stdout) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; r = uv_spawn(uv_default_loop(), &process, options); @@ -169,8 +169,8 @@ int r; init_process_options("spawn_helper3", exit_cb); - uv_pipe_init(uv_default_loop(), &out); - uv_pipe_init(uv_default_loop(), &in); + uv_pipe_init(uv_default_loop(), &out, 0); + uv_pipe_init(uv_default_loop(), &in, 0); options.stdout_stream = &out; options.stdin_stream = ∈ @@ -229,7 +229,7 @@ TEST_IMPL(spawn_detect_pipe_name_collisions_on_windows) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; /* Create a pipe that'll cause a collision. */ diff --git a/deps/uv/uv.gyp b/deps/uv/uv.gyp index e85927aa31e..a4298377189 100644 --- a/deps/uv/uv.gyp +++ b/deps/uv/uv.gyp @@ -117,7 +117,6 @@ 'src/win/pipe.c', 'src/win/process.c', 'src/win/req.c', - 'src/win/stdio.c', 'src/win/stream.c', 'src/win/tcp.c', 'src/win/tty.c', @@ -240,6 +239,7 @@ 'type': 'executable', 'dependencies': [ 'uv' ], 'sources': [ + 'test/blackhole-server.c', 'test/echo-server.c', 'test/run-tests.c', 'test/runner.c', @@ -247,6 +247,7 @@ 'test/test-get-loadavg.c', 'test/task.h', 'test/test-async.c', + 'test/test-error.c', 'test/test-callback-stack.c', 'test/test-connection-fail.c', 'test/test-delayed-accept.c', @@ -260,6 +261,7 @@ 'test/test-getsockname.c', 'test/test-hrtime.c', 'test/test-idle.c', + 'test/test-ipc.c', 'test/test-list.h', 'test/test-loop-handles.c', 'test/test-pass-always.c', @@ -325,9 +327,11 @@ 'test/benchmark-pump.c', 'test/benchmark-sizes.c', 'test/benchmark-spawn.c', + 'test/benchmark-tcp-write-batch.c', 'test/benchmark-udp-packet-storm.c', 'test/dns-server.c', 'test/echo-server.c', + 'test/blackhole-server.c', 'test/run-benchmarks.c', 'test/runner.c', 'test/runner.h', @@ -89,7 +89,6 @@ 'src/node_string.cc', 'src/node_zlib.cc', 'src/pipe_wrap.cc', - 'src/stdio_wrap.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', diff --git a/src/node_extensions.h b/src/node_extensions.h index 5cc76611966..a9023c62f76 100644 --- a/src/node_extensions.h +++ b/src/node_extensions.h @@ -48,7 +48,6 @@ NODE_EXT_LIST_ITEM(node_tcp_wrap) NODE_EXT_LIST_ITEM(node_udp_wrap) NODE_EXT_LIST_ITEM(node_pipe_wrap) NODE_EXT_LIST_ITEM(node_cares_wrap) -NODE_EXT_LIST_ITEM(node_stdio_wrap) NODE_EXT_LIST_ITEM(node_tty_wrap) NODE_EXT_LIST_ITEM(node_process_wrap) NODE_EXT_LIST_ITEM(node_fs_event_wrap) diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 162f1efef2a..c4f965d65fd 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -95,7 +95,7 @@ Handle<Value> PipeWrap::New(const Arguments& args) { PipeWrap::PipeWrap(Handle<Object> object) : StreamWrap(object, (uv_stream_t*) &handle_) { - int r = uv_pipe_init(uv_default_loop(), &handle_); + int r = uv_pipe_init(uv_default_loop(), &handle_, 0); assert(r == 0); // How do we proxy this error up to javascript? // Suggestion: uv_pipe_init() returns void. handle_.data = reinterpret_cast<void*>(this); diff --git a/src/stdio_wrap.cc b/src/stdio_wrap.cc deleted file mode 100644 index 3b5fb4b6940..00000000000 --- a/src/stdio_wrap.cc +++ /dev/null @@ -1,157 +0,0 @@ -#include <node.h> -#include <node_buffer.h> -#include <req_wrap.h> -#include <handle_wrap.h> -#include <stream_wrap.h> - -#define UNWRAP \ - assert(!args.Holder().IsEmpty()); \ - assert(args.Holder()->InternalFieldCount() > 0); \ - StdIOWrap* wrap = \ - static_cast<StdIOWrap*>(args.Holder()->GetPointerFromInternalField(0)); \ - if (!wrap) { \ - SetErrno(UV_EBADF); \ - return scope.Close(Integer::New(-1)); \ - } - -namespace node { - -using v8::Object; -using v8::Handle; -using v8::Local; -using v8::Persistent; -using v8::Value; -using v8::HandleScope; -using v8::FunctionTemplate; -using v8::String; -using v8::Function; -using v8::TryCatch; -using v8::Context; -using v8::Arguments; -using v8::Integer; -using v8::Undefined; - -extern Persistent<Function> tcpConstructor; -extern Persistent<Function> pipeConstructor; -static Persistent<Function> constructor; - - -class StdIOWrap : StreamWrap { - public: - static void Initialize(Handle<Object> target) { - StreamWrap::Initialize(target); - - HandleScope scope; - - Local<FunctionTemplate> t = FunctionTemplate::New(New); - t->SetClassName(String::NewSymbol("StdIO")); - - t->InstanceTemplate()->SetInternalFieldCount(1); - - NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); - NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); - NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); - NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); - - constructor = Persistent<Function>::New(t->GetFunction()); - - target->Set(String::NewSymbol("StdIO"), constructor); - } - - private: - static Handle<Value> New(const Arguments& args) { - // This constructor should not be exposed to public javascript. - // Therefore we assert that we are not trying to call this as a - // normal function. - assert(args.IsConstructCall()); - - uv_std_type stdHandleType = (uv_std_type)args[0]->Int32Value(); - - assert(stdHandleType == UV_STDIN || stdHandleType == UV_STDOUT || stdHandleType == UV_STDERR); - - uv_stream_t* stdHandle = uv_std_handle(uv_default_loop(), stdHandleType); - if (stdHandle) { - HandleScope scope; - StdIOWrap* wrap = new StdIOWrap(args.This()); - assert(wrap); - - wrap->handle_ = stdHandle; - wrap->SetHandle((uv_handle_t*)stdHandle); - wrap->UpdateWriteQueueSize(); - - return scope.Close(args.This()); - } else { - return Undefined(); - } - } - - StdIOWrap(Handle<Object> object) : StreamWrap(object, NULL) { - } - - static Handle<Value> Listen(const Arguments& args) { - HandleScope scope; - - UNWRAP - - int backlog = args[0]->Int32Value(); - - int r = uv_listen(wrap->handle_, SOMAXCONN, OnConnection); - - // Error starting the pipe. - if (r) SetErrno(uv_last_error(uv_default_loop()).code); - - return scope.Close(Integer::New(r)); - } - - // TODO maybe share with TCPWrap? - static void OnConnection(uv_stream_t* handle, int status) { - HandleScope scope; - Local<Object> client_obj; - - StdIOWrap* wrap = static_cast<StdIOWrap*>(handle->data); - assert(wrap->handle_ == handle); - - // We should not be getting this callback if someone as already called - // uv_close() on the handle. - assert(wrap->object_.IsEmpty() == false); - - if (status != 0) { - // TODO Handle server error (set errno and call onconnection with NULL) - assert(0); - return; - } - - // Instanciate the client javascript object and handle. - switch (handle->type) { - case UV_TCP: - client_obj = tcpConstructor->NewInstance(); - break; - case UV_NAMED_PIPE: - client_obj = pipeConstructor->NewInstance(); - break; - default: - assert(0); - return; - } - - // Unwrap the client javascript object. - assert(client_obj->InternalFieldCount() > 0); - StreamWrap* client_wrap = - static_cast<StreamWrap*>(client_obj->GetPointerFromInternalField(0)); - - int r = uv_accept(handle, client_wrap->GetStream()); - - // uv_accept should always work. - assert(r == 0); - - // Successful accept. Call the onconnection callback in JavaScript land. - Local<Value> argv[1] = { client_obj }; - MakeCallback(wrap->object_, "onconnection", 1, argv); - } - - uv_stream_t* handle_; -}; - -} // namespace node - -NODE_MODULE(node_stdio_wrap, node::StdIOWrap::Initialize); @@ -893,7 +893,6 @@ def build(bld): src/udp_wrap.cc src/pipe_wrap.cc src/cares_wrap.cc - src/stdio_wrap.cc src/tty_wrap.cc src/fs_event_wrap.cc src/process_wrap.cc |