diff options
author | Roman Arutyunyan <arut@nginx.com> | 2016-09-15 14:55:46 +0300 |
---|---|---|
committer | Roman Arutyunyan <arut@nginx.com> | 2016-09-15 14:55:46 +0300 |
commit | 04b9434b1817def37eb55cd7f4360d1164678797 (patch) | |
tree | c0b609612ea98be3291822b4685d5cda6f4ea8d7 /src/os/unix | |
parent | afa771140bcf677505adcc6517008a9f9108c878 (diff) |
Stream: filters.
Diffstat (limited to 'src/os/unix')
-rw-r--r-- | src/os/unix/ngx_darwin_init.c | 1 | ||||
-rw-r--r-- | src/os/unix/ngx_freebsd_init.c | 1 | ||||
-rw-r--r-- | src/os/unix/ngx_linux_init.c | 1 | ||||
-rw-r--r-- | src/os/unix/ngx_os.h | 3 | ||||
-rw-r--r-- | src/os/unix/ngx_posix_init.c | 1 | ||||
-rw-r--r-- | src/os/unix/ngx_solaris_init.c | 1 | ||||
-rw-r--r-- | src/os/unix/ngx_udp_sendmsg_chain.c | 245 |
7 files changed, 253 insertions, 0 deletions
diff --git a/src/os/unix/ngx_darwin_init.c b/src/os/unix/ngx_darwin_init.c index a9d12a823..aabe02ff9 100644 --- a/src/os/unix/ngx_darwin_init.c +++ b/src/os/unix/ngx_darwin_init.c @@ -24,6 +24,7 @@ static ngx_os_io_t ngx_darwin_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_darwin_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_freebsd_init.c b/src/os/unix/ngx_freebsd_init.c index 71672c70b..1823f0281 100644 --- a/src/os/unix/ngx_freebsd_init.c +++ b/src/os/unix/ngx_freebsd_init.c @@ -33,6 +33,7 @@ static ngx_os_io_t ngx_freebsd_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_freebsd_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_linux_init.c b/src/os/unix/ngx_linux_init.c index a1372e960..a8cf6a048 100644 --- a/src/os/unix/ngx_linux_init.c +++ b/src/os/unix/ngx_linux_init.c @@ -19,6 +19,7 @@ static ngx_os_io_t ngx_linux_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_linux_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_os.h b/src/os/unix/ngx_os.h index e22f07c79..3b328191b 100644 --- a/src/os/unix/ngx_os.h +++ b/src/os/unix/ngx_os.h @@ -29,6 +29,7 @@ typedef struct { ngx_recv_pt udp_recv; ngx_send_pt send; ngx_send_pt udp_send; + ngx_send_chain_pt udp_send_chain; ngx_send_chain_pt send_chain; ngx_uint_t flags; } ngx_os_io_t; @@ -49,6 +50,8 @@ ssize_t ngx_unix_send(ngx_connection_t *c, u_char *buf, size_t size); ngx_chain_t *ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit); ssize_t ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size); +ngx_chain_t *ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, + off_t limit); #if (IOV_MAX > 64) diff --git a/src/os/unix/ngx_posix_init.c b/src/os/unix/ngx_posix_init.c index 7e6e79d01..583ea4ff9 100644 --- a/src/os/unix/ngx_posix_init.c +++ b/src/os/unix/ngx_posix_init.c @@ -25,6 +25,7 @@ ngx_os_io_t ngx_os_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, ngx_writev_chain, 0 }; diff --git a/src/os/unix/ngx_solaris_init.c b/src/os/unix/ngx_solaris_init.c index 83acae144..65d787539 100644 --- a/src/os/unix/ngx_solaris_init.c +++ b/src/os/unix/ngx_solaris_init.c @@ -20,6 +20,7 @@ static ngx_os_io_t ngx_solaris_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_solaris_sendfilev_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_udp_sendmsg_chain.c b/src/os/unix/ngx_udp_sendmsg_chain.c new file mode 100644 index 000000000..65bde6f97 --- /dev/null +++ b/src/os/unix/ngx_udp_sendmsg_chain.c @@ -0,0 +1,245 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_event.h> + + +static ngx_chain_t *ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, + ngx_chain_t *in, ngx_log_t *log); +static ssize_t ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec); + + +ngx_chain_t * +ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) +{ + ssize_t n; + off_t send; + ngx_chain_t *cl; + ngx_event_t *wev; + ngx_iovec_t vec; + struct iovec iovs[NGX_IOVS_PREALLOCATE]; + + wev = c->write; + + if (!wev->ready) { + return in; + } + +#if (NGX_HAVE_KQUEUE) + + if ((ngx_event_flags & NGX_USE_KQUEUE_EVENT) && wev->pending_eof) { + (void) ngx_connection_error(c, wev->kq_errno, + "kevent() reported about an closed connection"); + wev->error = 1; + return NGX_CHAIN_ERROR; + } + +#endif + + /* the maximum limit size is the maximum size_t value - the page size */ + + if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) { + limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize; + } + + send = 0; + + vec.iovs = iovs; + vec.nalloc = NGX_IOVS_PREALLOCATE; + + for ( ;; ) { + + /* create the iovec and coalesce the neighbouring bufs */ + + cl = ngx_udp_output_chain_to_iovec(&vec, in, c->log); + + if (cl == NGX_CHAIN_ERROR) { + return NGX_CHAIN_ERROR; + } + + if (cl && cl->buf->in_file) { + ngx_log_error(NGX_LOG_ALERT, c->log, 0, + "file buf in sendmsg " + "t:%d r:%d f:%d %p %p-%p %p %O-%O", + cl->buf->temporary, + cl->buf->recycled, + cl->buf->in_file, + cl->buf->start, + cl->buf->pos, + cl->buf->last, + cl->buf->file, + cl->buf->file_pos, + cl->buf->file_last); + + ngx_debug_point(); + + return NGX_CHAIN_ERROR; + } + + if (cl == in) { + return in; + } + + send += vec.size; + + n = ngx_sendmsg(c, &vec); + + if (n == NGX_ERROR) { + return NGX_CHAIN_ERROR; + } + + if (n == NGX_AGAIN) { + wev->ready = 0; + return in; + } + + c->sent += n; + + in = ngx_chain_update_sent(in, n); + + if (send >= limit || in == NULL) { + return in; + } + } +} + + +static ngx_chain_t * +ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *in, ngx_log_t *log) +{ + size_t total, size; + u_char *prev; + ngx_uint_t n, flush; + ngx_chain_t *cl; + struct iovec *iov; + + cl = in; + iov = NULL; + prev = NULL; + total = 0; + n = 0; + flush = 0; + + for ( /* void */ ; in && !flush; in = in->next) { + + if (in->buf->flush || in->buf->last_buf) { + flush = 1; + } + + if (ngx_buf_special(in->buf)) { + continue; + } + + if (in->buf->in_file) { + break; + } + + if (!ngx_buf_in_memory(in->buf)) { + ngx_log_error(NGX_LOG_ALERT, log, 0, + "bad buf in output chain " + "t:%d r:%d f:%d %p %p-%p %p %O-%O", + in->buf->temporary, + in->buf->recycled, + in->buf->in_file, + in->buf->start, + in->buf->pos, + in->buf->last, + in->buf->file, + in->buf->file_pos, + in->buf->file_last); + + ngx_debug_point(); + + return NGX_CHAIN_ERROR; + } + + size = in->buf->last - in->buf->pos; + + if (prev == in->buf->pos) { + iov->iov_len += size; + + } else { + if (n == vec->nalloc) { + ngx_log_error(NGX_LOG_ALERT, log, 0, + "too many parts in a datagram"); + return NGX_CHAIN_ERROR; + } + + iov = &vec->iovs[n++]; + + iov->iov_base = (void *) in->buf->pos; + iov->iov_len = size; + } + + prev = in->buf->pos + size; + total += size; + } + + if (!flush) { +#if (NGX_SUPPRESS_WARN) + vec->size = 0; + vec->count = 0; +#endif + return cl; + } + + vec->count = n; + vec->size = total; + + return in; +} + + +static ssize_t +ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec) +{ + ssize_t n; + ngx_err_t err; + struct msghdr msg; + + ngx_memzero(&msg, sizeof(struct msghdr)); + + if (c->socklen) { + msg.msg_name = c->sockaddr; + msg.msg_namelen = c->socklen; + } + + msg.msg_iov = vec->iovs; + msg.msg_iovlen = vec->count; + +eintr: + + n = sendmsg(c->fd, &msg, 0); + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "sendmsg: %z of %uz", n, vec->size); + + if (n == -1) { + err = ngx_errno; + + switch (err) { + case NGX_EAGAIN: + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err, + "sendmsg() not ready"); + return NGX_AGAIN; + + case NGX_EINTR: + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err, + "sendmsg() was interrupted"); + goto eintr; + + default: + c->write->error = 1; + ngx_connection_error(c, err, "sendmsg() failed"); + return NGX_ERROR; + } + } + + return n; +} |