Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nginx/nginx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src/os/unix
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2004-07-05 10:55:54 +0400
committerIgor Sysoev <igor@sysoev.ru>2004-07-05 10:55:54 +0400
commit32fcd5cf64b9f55f9184e98e39f1b2152321a710 (patch)
treef349f2fb8106ba4490ec3f47e7330f160f6f5f9b /src/os/unix
parent98c1cf18c1a4ffb14ded78e93359f87ee7bdeed4 (diff)
nginx-0.0.7-2004-07-05-10:55:54 import
Diffstat (limited to 'src/os/unix')
-rw-r--r--src/os/unix/ngx_freebsd_rfork_thread.c207
-rw-r--r--src/os/unix/ngx_freebsd_rfork_thread.h93
-rw-r--r--src/os/unix/ngx_posix_cycle.c231
-rw-r--r--src/os/unix/ngx_process_cycle.c141
-rw-r--r--src/os/unix/ngx_process_cycle.h4
-rw-r--r--src/os/unix/ngx_pthread.c26
-rw-r--r--src/os/unix/ngx_pthread.h14
-rw-r--r--src/os/unix/ngx_pthread_thread.c268
-rw-r--r--src/os/unix/ngx_thread.h107
9 files changed, 648 insertions, 443 deletions
diff --git a/src/os/unix/ngx_freebsd_rfork_thread.c b/src/os/unix/ngx_freebsd_rfork_thread.c
index aa3e78ba6..6a367bdbc 100644
--- a/src/os/unix/ngx_freebsd_rfork_thread.c
+++ b/src/os/unix/ngx_freebsd_rfork_thread.c
@@ -10,18 +10,20 @@
/*
* The threads implementation uses the rfork(RFPROC|RFTHREAD|RFMEM) syscall
* to create threads. All threads use the stacks of the same size mmap()ed
- * below the main stack. Thus the current thread id is determinated through
- * the stack pointer.
+ * below the main stack. Thus the current thread id is determinated via
+ * the stack pointer value.
*
* The mutex implementation uses the ngx_atomic_cmp_set() operation
- * to acquire a mutex and the SysV semaphore to wait on a mutex or to wake up
+ * to acquire a mutex and the SysV semaphore to wait on a mutex and to wake up
* the waiting threads. The light mutex does not use semaphore, so after
* spinning in the lock the thread calls sched_yield(). However the light
* mutecies are intended to be used with the "trylock" operation only.
+ * The SysV semop() is a cheap syscall, particularly if it has little sembuf's
+ * and does not use SEM_UNDO.
*
- * The condition variable implementation uses the SysV semaphore set of two
- * semaphores. The first is used by the CV mutex, and the second is used
- * by the CV to signal.
+ * The condition variable implementation uses signal #64. The signal handler
+ * is SIG_IGN so the kill() is a cheap syscall. The thread waits a signal
+ * in kevent(). The use of the EVFILT_SIGNAL is safe since FreeBSD 4.7.
*
* This threads implementation currently works on i386 (486+) and amd64
* platforms only.
@@ -76,7 +78,7 @@ void _spinlock(ngx_atomic_t *lock)
for ( ;; ) {
if (*lock) {
- if (ngx_freebsd_hw_ncpu > 1 && tries++ < 1000) {
+ if (ngx_ncpu > 1 && tries++ < 1000) {
continue;
}
@@ -110,7 +112,7 @@ void _spinunlock(ngx_atomic_t *lock)
#endif
-int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
+int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg,
ngx_log_t *log)
{
int id, err;
@@ -144,15 +146,10 @@ int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
ngx_log_debug2(NGX_LOG_DEBUG_CORE, log, 0,
"thread stack: " PTR_FMT "-" PTR_FMT, stack, stack_top);
-#if 1
- id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top, func, arg);
-#elif 1
- id = rfork_thread(RFPROC|RFMEM, stack_top, func, arg);
-#elif 1
- id = rfork_thread(RFFDG|RFCFDG, stack_top, func, arg);
-#else
- id = rfork(RFFDG|RFCFDG);
-#endif
+ ngx_set_errno(0);
+
+ id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top,
+ (ngx_rfork_thread_func_pt) func, arg);
err = ngx_errno;
@@ -174,10 +171,23 @@ int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle)
{
- size_t len;
- char *red_zone, *zone;
-
- max_threads = n;
+ char *red_zone, *zone;
+ size_t len;
+ ngx_int_t i;
+ struct sigaction sa;
+
+ max_threads = n + 1;
+
+ for (i = 0; i < n; i++) {
+ ngx_memzero(&sa, sizeof(struct sigaction));
+ sa.sa_handler = SIG_IGN;
+ sigemptyset(&sa.sa_mask);
+ if (sigaction(NGX_CV_SIGNAL, &sa, NULL) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
+ "sigaction(%d, SIG_IGN) failed", NGX_CV_SIGNAL);
+ return NGX_ERROR;
+ }
+ }
len = sizeof(ngx_freebsd_kern_usrstack);
if (sysctlbyname("kern.usrstack", &ngx_freebsd_kern_usrstack, &len,
@@ -249,14 +259,6 @@ ngx_tid_t ngx_thread_self()
return ngx_pid;
}
-#if 0
- if (tids[tid] == 0) {
- pid = ngx_pid;
- tids[tid] = pid;
- return pid;
- }
-#endif
-
return tids[tid];
}
@@ -301,7 +303,7 @@ ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags)
}
-void ngx_mutex_done(ngx_mutex_t *m)
+void ngx_mutex_destroy(ngx_mutex_t *m)
{
if (semctl(m->semid, 0, IPC_RMID) == -1) {
ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
@@ -538,43 +540,26 @@ ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m)
ngx_cond_t *ngx_cond_init(ngx_log_t *log)
{
- ngx_cond_t *cv;
- union semun op;
+ ngx_cond_t *cv;
if (!(cv = ngx_alloc(sizeof(ngx_cond_t), log))) {
return NULL;
}
+ cv->signo = NGX_CV_SIGNAL;
+ cv->tid = 0;
cv->log = log;
-
- cv->semid = semget(IPC_PRIVATE, 2, SEM_R|SEM_A);
- if (cv->semid == -1) {
- ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semget() failed");
- return NULL;
- }
-
- op.val = 0;
-
- if (semctl(cv->semid, 0, SETVAL, op) == -1) {
- ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semctl(SETVAL) failed");
-
- if (semctl(cv->semid, 0, IPC_RMID) == -1) {
- ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
- "semctl(IPC_RMID) failed");
- }
-
- return NULL;
- }
+ cv->kq = -1;
return cv;
}
-void ngx_cond_done(ngx_cond_t *cv)
+void ngx_cond_destroy(ngx_cond_t *cv)
{
- if (semctl(cv->semid, 0, IPC_RMID) == -1) {
+ if (close(cv->kq) == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
- "semctl(IPC_RMID) failed");
+ "kqueue close() failed");
}
ngx_free(cv);
@@ -583,21 +568,101 @@ void ngx_cond_done(ngx_cond_t *cv)
ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
{
- struct sembuf op;
+ int n;
+ ngx_err_t err;
+ struct kevent kev;
+ struct timespec ts;
+
+ if (cv->kq == -1) {
+
+ /*
+ * We have to add the EVFILT_SIGNAL filter in the rfork()ed thread.
+ * Otherwise the thread would not get a signal event.
+ *
+ * However, we have not to open the kqueue in the thread,
+ * it is simply handy do it together.
+ */
+
+ cv->kq = kqueue();
+ if (cv->kq == -1) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kqueue() failed");
+ return NGX_ERROR;
+ }
- ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
- "cv " PTR_FMT " wait", cv);
+ ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv kq:%d signo:%d", cv->kq, cv->signo);
- op.sem_num = 0;
- op.sem_op = -1;
- op.sem_flg = 0;
+ kev.ident = cv->signo;
+ kev.filter = EVFILT_SIGNAL;
+ kev.flags = EV_ADD;
+ kev.fflags = 0;
+ kev.data = 0;
+ kev.udata = NULL;
- if (semop(cv->semid, &op, 1) == -1) {
- ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
- "semop() failed while waiting on cv " PTR_FMT, cv);
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+
+ if (kevent(cv->kq, &kev, 1, NULL, 0, &ts) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kevent() failed");
+ return NGX_ERROR;
+ }
+ }
+
+ if (ngx_mutex_unlock(m) == NGX_ERROR) {
return NGX_ERROR;
}
+ ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " wait, kq:%d, signo:%d",
+ cv, cv->kq, cv->signo);
+
+ for ( ;; ) {
+ n = kevent(cv->kq, NULL, 0, &kev, 1, NULL);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " kevent: %d", cv, n);
+
+ if (n == -1) {
+ err = ngx_errno;
+ ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
+ cv->log, ngx_errno,
+ "kevent() failed while waiting condition variable "
+ PTR_FMT, cv);
+
+ if (err == NGX_EINTR) {
+ break;
+ }
+
+ return NGX_ERROR;
+ }
+
+ if (n == 0) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
+ "kevent() returned no events "
+ "while waiting condition variable " PTR_FMT,
+ cv);
+ continue;
+ }
+
+ if (kev.filter != EVFILT_SIGNAL) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
+ "kevent() returned unexpected events: %d "
+ "while waiting condition variable " PTR_FMT,
+ kev.filter, cv);
+ continue;
+ }
+
+ if (kev.ident != (uintptr_t) cv->signo) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
+ "kevent() returned unexpected signal: %d ",
+ "while waiting condition variable " PTR_FMT,
+ kev.ident, cv);
+ continue;
+ }
+
+ break;
+ }
+
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " is waked up", cv);
@@ -611,18 +676,14 @@ ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
ngx_int_t ngx_cond_signal(ngx_cond_t *cv)
{
- struct sembuf op;
-
- ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
- "cv " PTR_FMT " to signal", cv);
-
- op.sem_num = 0;
- op.sem_op = 1;
- op.sem_flg = 0;
+ ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " to signal " PID_T_FMT " %d",
+ cv, cv->tid, cv->signo);
- if (semop(cv->semid, &op, 1) == -1) {
+ if (kill(cv->tid, cv->signo) == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
- "semop() failed while signaling cv " PTR_FMT, cv);
+ "kill() failed while signaling condition variable "
+ PTR_FMT, cv);
return NGX_ERROR;
}
diff --git a/src/os/unix/ngx_freebsd_rfork_thread.h b/src/os/unix/ngx_freebsd_rfork_thread.h
new file mode 100644
index 000000000..a9eb1d824
--- /dev/null
+++ b/src/os/unix/ngx_freebsd_rfork_thread.h
@@ -0,0 +1,93 @@
+#ifndef _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_
+#define _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_
+
+
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sched.h>
+
+typedef pid_t ngx_tid_t;
+
+#undef ngx_log_pid
+#define ngx_log_pid ngx_thread_self()
+#define ngx_log_tid 0
+
+#define TID_T_FMT PID_T_FMT
+
+
+#define NGX_MUTEX_LIGHT 1
+
+#define NGX_MUTEX_LOCK_BUSY 0x80000000
+
+typedef volatile struct {
+ ngx_atomic_t lock;
+ ngx_log_t *log;
+ int semid;
+} ngx_mutex_t;
+
+
+#define NGX_CV_SIGNAL 64
+
+typedef struct {
+ int signo;
+ int kq;
+ ngx_tid_t tid;
+ ngx_log_t *log;
+} ngx_cond_t;
+
+
+#define ngx_thread_sigmask(how, set, oset) \
+ (sigprocmask(how, set, oset) == -1) ? ngx_errno : 0
+
+#define ngx_thread_sigmask_n "sigprocmask()"
+
+#define ngx_thread_join(t, p)
+
+#define ngx_setthrtitle(n) setproctitle(n)
+
+
+extern char *ngx_freebsd_kern_usrstack;
+extern size_t ngx_thread_stack_size;
+
+
+static inline int ngx_gettid()
+{
+ char *sp;
+
+ if (ngx_thread_stack_size == 0) {
+ return 0;
+ }
+
+#if ( __i386__ )
+
+ __asm__ volatile ("mov %%esp, %0" : "=q" (sp));
+
+#elif ( __amd64__ )
+
+ __asm__ volatile ("mov %%rsp, %0" : "=q" (sp));
+
+#else
+
+#error "rfork()ed threads are not supported on this platform"
+
+#endif
+
+ return (ngx_freebsd_kern_usrstack - sp) / ngx_thread_stack_size;
+}
+
+
+ngx_tid_t ngx_thread_self();
+#define ngx_thread_main() (ngx_gettid() == 0)
+
+
+#define ngx_mutex_trylock(m) ngx_mutex_dolock(m, 1)
+#define ngx_mutex_lock(m) ngx_mutex_dolock(m, 0)
+ngx_int_t ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try);
+ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m);
+
+
+typedef int (*ngx_rfork_thread_func_pt)(void *arg);
+
+
+
+#endif /* _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_ */
diff --git a/src/os/unix/ngx_posix_cycle.c b/src/os/unix/ngx_posix_cycle.c
deleted file mode 100644
index 0b712ed29..000000000
--- a/src/os/unix/ngx_posix_cycle.c
+++ /dev/null
@@ -1,231 +0,0 @@
-
-
-void ngx_posix_master_cycle(ngx_cycle_t *cycle)
-{
- static ngx_int_t sent;
- static ngx_msec_t delay = 125;
-
- if (ngx_process == NGX_PROCESS_MASTER) {
- if (sent) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "sent signal cycle");
-
- if (sigprocmask(SIG_UNBLOCK, &set, NULL) == -1) {
- ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
- "sigprocmask() failed");
- continue;
- }
-
- /*
- * there is very big chance that the pending signals
- * would be delivered right on the sigprocmask() return
- */
-
- if (!ngx_signal) {
-
- if (delay < 15000) {
- delay *= 2;
- }
-
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "msleep %d", delay);
-
- ngx_msleep(delay);
-
- ngx_gettimeofday(&tv);
- ngx_time_update(tv.tv_sec);
-
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "wake up");
- }
-
- if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
- ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
- "sigprocmask() failed");
- }
-
- ngx_signal = 0;
-
- } else {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "sigsuspend");
-
- sigsuspend(&wset);
-
- ngx_gettimeofday(&tv);
- ngx_time_update(tv.tv_sec);
-
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "wake up");
- }
-
- } else { /* NGX_PROCESS_SINGLE */
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "worker cycle");
-
- ngx_process_events(cycle->log);
- }
-
- if (ngx_reap) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "reap childs");
-
- live = 0;
- for (i = 0; i < ngx_last_process; i++) {
-
- ngx_log_debug6(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "child: " PID_T_FMT
- " s:%d e:%d t:%d d:%d r:%d",
- ngx_processes[i].pid,
- ngx_processes[i].signal,
- ngx_processes[i].exiting,
- ngx_processes[i].exited,
- ngx_processes[i].detached,
- ngx_processes[i].respawn);
-
- if (ngx_processes[i].exited) {
-
- if (ngx_processes[i].respawn
- && !ngx_processes[i].exiting
- && !ngx_terminate
- && !ngx_quit)
- {
- if (ngx_spawn_process(cycle,
- ngx_processes[i].proc,
- ngx_processes[i].data,
- ngx_processes[i].name, i)
- == NGX_ERROR)
- {
- ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
- "can not respawn %s",
- ngx_processes[i].name);
- }
-
- continue;
- }
-
- if (ngx_processes[i].pid == ngx_new_binary) {
- ngx_new_binary = 0;
- }
-
- if (i != --ngx_last_process) {
- ngx_processes[i--] =
- ngx_processes[ngx_last_process];
- }
-
- } else if (!ngx_processes[i].detached
- && (ngx_terminate || ngx_quit))
- {
- live = 1;
-
- } else if (ngx_processes[i].exiting) {
- live = 1;
- }
- }
-
- if (!live) {
- if (ngx_terminate || ngx_quit) {
-
- if (ngx_inherited && getppid() > 1) {
- name = ctx->pid.name.data;
-
- } else {
- name = ctx->name;
- }
-
- if (ngx_delete_file(name) == NGX_FILE_ERROR) {
- ngx_log_error(NGX_LOG_ALERT, cycle->log,
- ngx_errno,
- ngx_delete_file_n
- " \"%s\" failed", name);
- }
-
- ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exit");
- exit(0);
-
- } else {
- sent = 0;
- }
- }
- }
-
- if (ngx_terminate) {
- if (delay > 10000) {
- signo = SIGKILL;
- } else {
- signo = ngx_signal_value(NGX_TERMINATE_SIGNAL);
- }
-
- } else if (ngx_quit) {
- signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
-
- } else {
-
- if (ngx_noaccept) {
- signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
- }
-
- if (ngx_change_binary) {
- ngx_change_binary = 0;
- ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "changing binary");
- ngx_new_binary = ngx_exec_new_binary(cycle, ctx->argv);
- }
-
- if (ngx_reconfigure) {
- signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
- ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reconfiguring");
- }
-
- if (ngx_reopen) {
- /* STUB */
- signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
-
- ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopening logs");
- ngx_reopen_files(cycle);
- }
- }
-
- if (signo) {
- for (i = 0; i < ngx_last_process; i++) {
-
- if (!ngx_processes[i].detached) {
- ngx_processes[i].signal = signo;
-
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT,
- cycle->log, 0,
- "signal " PID_T_FMT " %d",
- ngx_processes[i].pid, signo);
- }
- }
-
- delay = 125;
- signo = 0;
- }
-
- for (i = 0; i < ngx_last_process; i++) {
-
- if (ngx_processes[i].signal == 0) {
- continue;
- }
-
- if (ccf->kqueue_signal != 1) {
- sent = 1;
- }
-
- ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
- "kill (" PID_T_FMT ", %d)" ,
- ngx_processes[i].pid,
- ngx_processes[i].signal);
-
- if (kill(ngx_processes[i].pid, ngx_processes[i].signal) == -1) {
- ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
- "kill(%d, %d) failed",
- ngx_processes[i].pid, ngx_processes[i].signal);
- continue;
- }
-
- if (ngx_processes[i].signal != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
- ngx_processes[i].exiting = 1;
- }
- }
-}
diff --git a/src/os/unix/ngx_process_cycle.c b/src/os/unix/ngx_process_cycle.c
index 8a0bd6f14..97b301459 100644
--- a/src/os/unix/ngx_process_cycle.c
+++ b/src/os/unix/ngx_process_cycle.c
@@ -13,7 +13,8 @@ static void ngx_master_exit(ngx_cycle_t *cycle, ngx_master_ctx_t *ctx);
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data);
static void ngx_channel_handler(ngx_event_t *ev);
#if (NGX_THREADS)
-static int ngx_worker_thread_cycle(void *data);
+static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle);
+static void *ngx_worker_thread_cycle(void *data);
#endif
@@ -40,6 +41,12 @@ ngx_uint_t ngx_noaccepting;
ngx_uint_t ngx_restart;
+#if (NGX_THREADS)
+volatile ngx_thread_t ngx_threads[NGX_MAX_THREADS];
+ngx_int_t ngx_threads_n;
+#endif
+
+
u_char master_process[] = "master process";
@@ -524,9 +531,6 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
ngx_listening_t *ls;
ngx_core_conf_t *ccf;
ngx_connection_t *c;
-#if (NGX_THREADS)
- ngx_tid_t tid;
-#endif
ngx_process = NGX_PROCESS_WORKER;
@@ -620,23 +624,34 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
#if (NGX_THREADS)
- if (ngx_init_threads(5, 128 * 1024 * 1024, cycle) == NGX_ERROR) {
+ if (ngx_time_mutex_init(cycle->log) == NGX_ERROR) {
/* fatal */
exit(2);
}
- if (!(ngx_posted_events_cv = ngx_cond_init(cycle->log))) {
- /* fatal */
- exit(2);
- }
-
- for (i = 0; i < 2; i++) {
- if (ngx_create_thread(&tid, ngx_worker_thread_cycle,
- cycle, cycle->log) != 0)
+ if (ngx_threads_n) {
+ if (ngx_init_threads(ngx_threads_n,
+ ccf->thread_stack_size, cycle) == NGX_ERROR)
{
/* fatal */
exit(2);
}
+
+ for (n = 0; n < ngx_threads_n; n++) {
+
+ if (!(ngx_threads[n].cv = ngx_cond_init(cycle->log))) {
+ /* fatal */
+ exit(2);
+ }
+
+ if (ngx_create_thread((ngx_tid_t *) &ngx_threads[n].tid,
+ ngx_worker_thread_cycle,
+ (void *) &ngx_threads[n], cycle->log) != 0)
+ {
+ /* fatal */
+ exit(2);
+ }
+ }
}
#endif
@@ -646,6 +661,14 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
&& ngx_event_timer_rbtree == &ngx_event_timer_sentinel)
{
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");
+
+
+#if (NGX_THREADS)
+ ngx_terminate = 1;
+
+ ngx_wakeup_worker_threads(cycle);
+#endif
+
/*
* we do not destroy cycle->pool here because a signal handler
* that uses cycle->log can be called at this point
@@ -659,6 +682,11 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
if (ngx_terminate) {
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");
+
+#if (NGX_THREADS)
+ ngx_wakeup_worker_threads(cycle);
+#endif
+
/*
* we do not destroy cycle->pool here because a signal handler
* that uses cycle->log can be called at this point
@@ -752,14 +780,53 @@ static void ngx_channel_handler(ngx_event_t *ev)
#if (NGX_THREADS)
-int ngx_worker_thread_cycle(void *data)
+static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle)
{
- ngx_cycle_t *cycle = data;
+ ngx_int_t i;
+ ngx_uint_t live;
+
+ for ( ;; ) {
+
+ live = 0;
+
+ for (i = 0; i < ngx_threads_n; i++) {
+ if (ngx_threads[i].state < NGX_THREAD_EXIT) {
+ ngx_cond_signal(ngx_threads[i].cv);
+ live = 1;
+ }
+
+ if (ngx_threads[i].state == NGX_THREAD_EXIT) {
+ ngx_thread_join(ngx_threads[i].tid, NULL);
+ ngx_threads[i].state = NGX_THREAD_DONE;
+ }
+ }
+
+ if (live == 0) {
+ ngx_log_debug0(NGX_LOG_DEBUG_CORE, cycle->log, 0,
+ "all worker threads are joined");
+
+ /* STUB */
+ ngx_mutex_destroy(ngx_event_timer_mutex);
+ ngx_mutex_destroy(ngx_posted_events_mutex);
+
+ return;
+ }
+
+ ngx_sched_yield();
+ }
+}
+
+
+static void* ngx_worker_thread_cycle(void *data)
+{
+ ngx_thread_t *thr = data;
- ngx_err_t err;
sigset_t set;
+ ngx_err_t err;
struct timeval tv;
+ thr->cv->tid = ngx_thread_self();
+
sigemptyset(&set);
sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
@@ -767,32 +834,46 @@ int ngx_worker_thread_cycle(void *data)
err = ngx_thread_sigmask(SIG_BLOCK, &set, NULL);
if (err) {
- ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
+ ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
ngx_thread_sigmask_n " failed");
- return 1;
+ return (void *) 1;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
- "thread %d started", ngx_thread_self());
+ "thread " TID_T_FMT " started", ngx_thread_self());
- ngx_setproctitle("worker thread");
+ ngx_setthrtitle("worker thread");
+
+ if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+ return (void *) 1;
+ }
for ( ;; ) {
- if (ngx_cond_wait(ngx_posted_events_cv, ngx_posted_events_mutex)
- == NGX_ERROR)
- {
- return 1;
+ thr->state = NGX_THREAD_FREE;
+
+ if (ngx_cond_wait(thr->cv, ngx_posted_events_mutex) == NGX_ERROR) {
+ return (void *) 1;
}
- if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) {
- return 1;
+ if (ngx_terminate) {
+ thr->state = NGX_THREAD_EXIT;
+
+ ngx_mutex_unlock(ngx_posted_events_mutex);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
+ "thread %d is done", ngx_thread_self());
+
+ return (void *) 0;
}
- }
- ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
- "thread %d done", ngx_thread_self());
+ thr->state = NGX_THREAD_BUSY;
- return 0;
+ if (ngx_event_thread_process_posted((ngx_cycle_t *) ngx_cycle)
+ == NGX_ERROR)
+ {
+ return (void *) 1;
+ }
+ }
}
#endif
diff --git a/src/os/unix/ngx_process_cycle.h b/src/os/unix/ngx_process_cycle.h
index b0a7d62bc..7c14d3a83 100644
--- a/src/os/unix/ngx_process_cycle.h
+++ b/src/os/unix/ngx_process_cycle.h
@@ -14,8 +14,8 @@
typedef struct {
- int argc;
- char *const *argv;
+ int argc;
+ char *const *argv;
} ngx_master_ctx_t;
diff --git a/src/os/unix/ngx_pthread.c b/src/os/unix/ngx_pthread.c
deleted file mode 100644
index 2491032a4..000000000
--- a/src/os/unix/ngx_pthread.c
+++ /dev/null
@@ -1,26 +0,0 @@
-
-
-#include <ngx_config.h>
-
-#include <ngx_log.h>
-#include <ngx_pthread.h>
-
-
-int ngx_create_os_thread(ngx_os_tid_t *tid, void *stack,
- ngx_thread_start_routine_t func, void *arg,
- ngx_log_t log)
-{
- int err;
- pthread_attr_t *attr;
-
- attr = NULL;
-
- err = pthread_create(tid, attr, func, arg);
-
- if (err != 0) {
- ngx_log_error(NGX_LOG_ERR, log, err, "pthread_create() failed");
- return NGX_ERROR;
- }
-
- return NGX_OK;
-}
diff --git a/src/os/unix/ngx_pthread.h b/src/os/unix/ngx_pthread.h
deleted file mode 100644
index 845655f57..000000000
--- a/src/os/unix/ngx_pthread.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#ifndef _NGX_OS_THREAD_H_INCLUDED_
-#define _NGX_OS_THREAD_H_INCLUDED_
-
-
-#include <pthread.h>
-
-
-typedef pthread_t ngx_os_tid_t;
-typedef int ngx_tid_t;
-
-typedef void *(*)(void *) ngx_thread_start_routine_t
-
-
-#endif /* _NGX_OS_THREAD_H_INCLUDED_ */
diff --git a/src/os/unix/ngx_pthread_thread.c b/src/os/unix/ngx_pthread_thread.c
new file mode 100644
index 000000000..951123cb4
--- /dev/null
+++ b/src/os/unix/ngx_pthread_thread.c
@@ -0,0 +1,268 @@
+
+/*
+ * Copyright (C) 2002-2004 Igor Sysoev, http://sysoev.ru/en/
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+
+
+static ngx_uint_t nthreads;
+static ngx_uint_t max_threads;
+
+
+static pthread_attr_t thr_attr;
+
+
+int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg,
+ ngx_log_t *log)
+{
+ int err;
+
+ if (nthreads >= max_threads) {
+ ngx_log_error(NGX_LOG_CRIT, log, 0,
+ "no more than %d threads can be created", max_threads);
+ return NGX_ERROR;
+ }
+
+ err = pthread_create(tid, &thr_attr, func, arg);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_create() failed");
+ return err;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
+ "thread is created: " TID_T_FMT, *tid);
+
+ nthreads++;
+
+ return err;
+}
+
+
+ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle)
+{
+ int err;
+
+ max_threads = n;
+
+ err = pthread_attr_init(&thr_attr);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
+ "pthread_attr_init() failed");
+ return NGX_ERROR;
+ }
+
+ err = pthread_attr_setstacksize(&thr_attr, size);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
+ "pthread_attr_setstacksize() failed");
+ return NGX_ERROR;
+ }
+
+ ngx_threaded = 1;
+
+ return NGX_OK;
+}
+
+
+ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags)
+{
+ int err;
+ ngx_mutex_t *m;
+
+ if (!(m = ngx_alloc(sizeof(ngx_mutex_t), log))) {
+ return NULL;
+ }
+
+ m->log = log;
+
+ err = pthread_mutex_init(&m->mutex, NULL);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, m->log, err,
+ "pthread_mutex_init() failed");
+ return NULL;
+ }
+
+ return m;
+}
+
+
+void ngx_mutex_destroy(ngx_mutex_t *m)
+{
+ int err;
+
+ err = pthread_mutex_destroy(&m->mutex);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, m->log, err,
+ "pthread_mutex_destroy(" PTR_FMT ") failed", m);
+ }
+
+ ngx_free(m);
+}
+
+
+ngx_int_t ngx_mutex_lock(ngx_mutex_t *m)
+{
+ int err;
+
+ if (!ngx_threaded) {
+ return NGX_OK;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "lock mutex " PTR_FMT, m);
+
+ err = pthread_mutex_lock(&m->mutex);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, m->log, err,
+ "pthread_mutex_lock(" PTR_FMT ") failed", m);
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
+ "mutex " PTR_FMT " is locked", m);
+
+ return NGX_OK;
+}
+
+
+ngx_int_t ngx_mutex_trylock(ngx_mutex_t *m)
+{
+ int err;
+
+ if (!ngx_threaded) {
+ return NGX_OK;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "try lock mutex " PTR_FMT, m);
+
+ err = pthread_mutex_trylock(&m->mutex);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, m->log, err,
+ "pthread_mutex_trylock(" PTR_FMT ") failed", m);
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
+ "mutex " PTR_FMT " is locked", m);
+
+ return NGX_OK;
+}
+
+
+ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m)
+{
+ int err;
+
+ if (!ngx_threaded) {
+ return NGX_OK;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "unlock mutex " PTR_FMT, m);
+
+ err = pthread_mutex_unlock(&m->mutex);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, m->log, err,
+ "pthread_mutex_unlock(" PTR_FMT ") failed", m);
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
+ "mutex " PTR_FMT " is unlocked", m);
+
+ return NGX_OK;
+}
+
+
+ngx_cond_t *ngx_cond_init(ngx_log_t *log)
+{
+ int err;
+ ngx_cond_t *cv;
+
+ if (!(cv = ngx_alloc(sizeof(ngx_cond_t), log))) {
+ return NULL;
+ }
+
+ cv->log = log;
+
+ err = pthread_cond_init(&cv->cond, NULL);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, err,
+ "pthread_cond_init() failed");
+ return NULL;
+ }
+
+ return cv;
+}
+
+
+void ngx_cond_destroy(ngx_cond_t *cv)
+{
+ int err;
+
+ err = pthread_cond_destroy(&cv->cond);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, err,
+ "pthread_cond_destroy(" PTR_FMT ") failed", cv);
+ }
+
+ ngx_free(cv);
+}
+
+
+ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
+{
+ int err;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " wait", cv);
+
+ err = pthread_cond_wait(&cv->cond, &m->mutex);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, err,
+ "pthread_cond_wait(" PTR_FMT ") failed", cv);
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " is waked up", cv);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
+ "mutex " PTR_FMT " is locked", m);
+
+ return NGX_OK;
+}
+
+
+ngx_int_t ngx_cond_signal(ngx_cond_t *cv)
+{
+ int err;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " to signal", cv);
+
+ err = pthread_cond_signal(&cv->cond);
+
+ if (err != 0) {
+ ngx_log_error(NGX_LOG_ALERT, cv->log, err,
+ "pthread_cond_signal(" PTR_FMT ") failed", cv);
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
+ "cv " PTR_FMT " is signaled", cv);
+
+ return NGX_OK;
+}
diff --git a/src/os/unix/ngx_thread.h b/src/os/unix/ngx_thread.h
index a1404038e..3bd9753d7 100644
--- a/src/os/unix/ngx_thread.h
+++ b/src/os/unix/ngx_thread.h
@@ -7,111 +7,84 @@
#if (NGX_THREADS)
-#define ngx_thread_volatile volatile
+#define NGX_MAX_THREADS 128
#if (NGX_USE_RFORK)
+#include <ngx_freebsd_rfork_thread.h>
-#include <sys/ipc.h>
-#include <sys/sem.h>
-#include <sched.h>
-typedef pid_t ngx_tid_t;
+#else /* use pthreads */
-#undef ngx_log_pid
-#define ngx_log_pid ngx_thread_self()
-#define ngx_log_tid 0
+#include <pthread.h>
+#include <pthread_np.h>
-#define TID_T_FMT PID_T_FMT
+typedef pthread_t ngx_tid_t;
+#define ngx_thread_self() pthread_self()
+#define ngx_thread_main() pthread_main_np()
+#define ngx_log_tid (int) ngx_thread_self()
-#define NGX_MUTEX_LIGHT 1
-#define NGX_MUTEX_CV 2
+#define TID_T_FMT PTR_FMT
-#define NGX_MUTEX_LOCK_BUSY 0x80000000
-typedef volatile struct {
- ngx_atomic_t lock;
- ngx_log_t *log;
- int semid;
-} ngx_mutex_t;
+#define NGX_MUTEX_LIGHT 0
+typedef struct {
+ pthread_mutex_t mutex;
+ ngx_log_t *log;
+} ngx_mutex_t;
typedef struct {
- int semid;
- ngx_log_t *log;
+ pthread_cond_t cond;
+ ngx_tid_t tid;
+ ngx_log_t *log;
} ngx_cond_t;
+#define ngx_thread_sigmask pthread_sigmask
+#define ngx_thread_sigmask_n "pthread_sigmask()"
-#define ngx_thread_sigmask(how, set, oset) \
- (sigprocmask(how, set, oset) == -1) ? ngx_errno : 0
-
-#define ngx_thread_sigmask_n "sigprocmask()"
-
-
-extern char *ngx_freebsd_kern_usrstack;
-extern size_t ngx_thread_stack_size;
-
-static inline int ngx_gettid()
-{
- char *sp;
-
- if (ngx_thread_stack_size == 0) {
- return 0;
- }
-
-#if ( __i386__ )
-
- __asm__ volatile ("mov %%esp, %0" : "=q" (sp));
+#define ngx_thread_join(t, p) pthread_join(t, p)
-#elif ( __amd64__ )
+#define ngx_setthrtitle(n)
- __asm__ volatile ("mov %%rsp, %0" : "=q" (sp));
-#else
-#error "rfork()ed threads are not supported on this platform"
+ngx_int_t ngx_mutex_trylock(ngx_mutex_t *m);
+ngx_int_t ngx_mutex_lock(ngx_mutex_t *m);
+ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m);
#endif
- return (ngx_freebsd_kern_usrstack - sp) / ngx_thread_stack_size;
-}
-
-
-#define ngx_thread_main() (ngx_gettid() == 0)
-
-#else /* use pthreads */
-
-#include <pthread.h>
+#define ngx_thread_volatile volatile
-typedef pthread_t ngx_tid_t;
-#define ngx_gettid() ((ngx_int_t) pthread_getspecific(0))
-#define ngx_log_tid ngx_thread_self()
+typedef struct {
+ ngx_tid_t tid;
+ ngx_cond_t *cv;
+ ngx_uint_t state;
+} ngx_thread_t;
-#define ngx_thread_sigmask pthread_sigmask
-#define ngx_thread_sigmask_n "pthread_sigmask()"
+#define NGX_THREAD_FREE 1
+#define NGX_THREAD_BUSY 2
+#define NGX_THREAD_EXIT 3
+#define NGX_THREAD_DONE 4
-#endif
+extern ngx_int_t ngx_threads_n;
+extern volatile ngx_thread_t ngx_threads[NGX_MAX_THREADS];
ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle);
-int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
+int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg,
ngx_log_t *log);
-ngx_tid_t ngx_thread_self();
ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags);
-void ngx_mutex_done(ngx_mutex_t *m);
-
-#define ngx_mutex_trylock(m) ngx_mutex_dolock(m, 1)
-#define ngx_mutex_lock(m) ngx_mutex_dolock(m, 0)
-ngx_int_t ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try);
-ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m);
+void ngx_mutex_destroy(ngx_mutex_t *m);
ngx_cond_t *ngx_cond_init(ngx_log_t *log);
-void ngx_cond_done(ngx_cond_t *cv);
+void ngx_cond_destroy(ngx_cond_t *cv);
ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m);
ngx_int_t ngx_cond_signal(ngx_cond_t *cv);