diff options
author | ambrop7 <ambrop7@1a93d707-3861-5ebc-ad3b-9740d49b5140> | 2011-03-17 01:33:42 +0300 |
---|---|---|
committer | ambrop7 <ambrop7@1a93d707-3861-5ebc-ad3b-9740d49b5140> | 2011-03-17 01:33:42 +0300 |
commit | eca5b73fd65080a5118471219ea445e1085422ec (patch) | |
tree | 8e322b9ae421e510d27bb9aab21860d987f09381 /threadwork | |
parent | e39dbd4fda964c8279e458294fdb2b819f88d3ff (diff) |
BThreadWork: support multiple worker threads
Diffstat (limited to 'threadwork')
-rw-r--r-- | threadwork/BThreadWork.c | 122 | ||||
-rw-r--r-- | threadwork/BThreadWork.h | 20 |
2 files changed, 91 insertions, 51 deletions
diff --git a/threadwork/BThreadWork.c b/threadwork/BThreadWork.c index b131a1e..28f290f 100644 --- a/threadwork/BThreadWork.c +++ b/threadwork/BThreadWork.c @@ -37,8 +37,10 @@ #ifdef BADVPN_THREADWORK_USE_PTHREAD -static void * dispatcher_thread (BThreadWorkDispatcher *o) +static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t) { + BThreadWorkDispatcher *o = t->d; + ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) while (1) { @@ -49,7 +51,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o) if (LinkedList2_IsEmpty(&o->pending_list)) { // wait for event - ASSERT_FORCE(pthread_cond_wait(&o->new_cond, &o->mutex) == 0) + ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0) continue; } @@ -57,7 +59,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o) BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node); ASSERT(w->state == BTHREADWORK_STATE_PENDING) LinkedList2_Remove(&o->pending_list, &w->list_node); - o->running_work = w; + t->running_work = w; w->state = BTHREADWORK_STATE_RUNNING; // do the work @@ -66,7 +68,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o) ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) // release the work - o->running_work = NULL; + t->running_work = NULL; LinkedList2_Append(&o->finished_list, &w->list_node); w->state = BTHREADWORK_STATE_FINISHED; ASSERT_FORCE(sem_post(&w->finished_sem) == 0) @@ -145,6 +147,29 @@ static void more_job_handler (BThreadWorkDispatcher *o) return; } +static void stop_threads (BThreadWorkDispatcher *o) +{ + // set cancelling + ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) + o->cancel = 1; + ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) + + while (o->num_threads > 0) { + struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1]; + + // wake up thread + ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0) + + // wait for thread to exit + ASSERT_FORCE(pthread_join(t->thread, NULL) == 0) + + // free condition variable + ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0) + + o->num_threads--; + } +} + #endif static void work_job_handler (BThreadWork *o) @@ -167,24 +192,19 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int // init arguments o->reactor = reactor; - // set num threads - #ifdef BADVPN_THREADWORK_USE_PTHREAD if (num_threads_hint < 0) { - o->num_threads = 2; - } else { - o->num_threads = num_threads_hint; + num_threads_hint = 2; + } + if (num_threads_hint > BTHREADWORK_MAX_THREADS) { + num_threads_hint = BTHREADWORK_MAX_THREADS; } - #endif #ifdef BADVPN_THREADWORK_USE_PTHREAD - if (o->num_threads > 0) { + if (num_threads_hint > 0) { // init pending list LinkedList2_Init(&o->pending_list); - // set no running work - o->running_work = NULL; - // init finished list LinkedList2_Init(&o->finished_list); @@ -194,35 +214,29 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int goto fail0; } - // init condition variable - if (pthread_cond_init(&o->new_cond, NULL) != 0) { - BLog(BLOG_ERROR, "pthread_cond_init failed"); - goto fail1; - } - // init pipe if (pipe(o->pipe) < 0) { BLog(BLOG_ERROR, "pipe failed"); - goto fail2; + goto fail1; } // set read end non-blocking if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) { BLog(BLOG_ERROR, "fcntl failed"); - goto fail3; + goto fail2; } // set write end non-blocking if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) { BLog(BLOG_ERROR, "fcntl failed"); - goto fail3; + goto fail2; } // init BFileDescriptor BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o); if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); - goto fail3; + goto fail2; } BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ); @@ -232,10 +246,31 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int // set not cancelling o->cancel = 0; - // init thread - if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) { - BLog(BLOG_ERROR, "pthread_create failed"); - goto fail4; + // init threads + o->num_threads = 0; + for (int i = 0; i < num_threads_hint; i++) { + struct BThreadWorkDispatcher_thread *t = &o->threads[i]; + + // set parent pointer + t->d = o; + + // set no running work + t->running_work = NULL; + + // init condition variable + if (pthread_cond_init(&t->new_cond, NULL) != 0) { + BLog(BLOG_ERROR, "pthread_cond_init failed"); + goto fail3; + } + + // init thread + if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) { + BLog(BLOG_ERROR, "pthread_create failed"); + ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0) + goto fail3; + } + + o->num_threads++; } } @@ -246,19 +281,18 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int return 1; #ifdef BADVPN_THREADWORK_USE_PTHREAD -fail4: +fail3: + stop_threads(o); BPending_Free(&o->more_job); BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); -fail3: +fail2: ASSERT_FORCE(close(o->pipe[0]) == 0) ASSERT_FORCE(close(o->pipe[1]) == 0) -fail2: - ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0) fail1: ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0) - #endif fail0: return 0; + #endif } void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o) @@ -266,7 +300,7 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o) #ifdef BADVPN_THREADWORK_USE_PTHREAD if (o->num_threads > 0) { ASSERT(LinkedList2_IsEmpty(&o->pending_list)) - ASSERT(!o->running_work) + for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) } ASSERT(LinkedList2_IsEmpty(&o->finished_list)) } #endif @@ -276,14 +310,8 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o) #ifdef BADVPN_THREADWORK_USE_PTHREAD if (o->num_threads > 0) { - // post termination request - ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) - o->cancel = 1; - ASSERT_FORCE(pthread_cond_signal(&o->new_cond) == 0) - ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) - - // wait for thread to exit - ASSERT_FORCE(pthread_join(o->thread, NULL) == 0) + // stop threads + stop_threads(o); // free more job BPending_Free(&o->more_job); @@ -295,9 +323,6 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o) ASSERT_FORCE(close(o->pipe[0]) == 0) ASSERT_FORCE(close(o->pipe[1]) == 0) - // free condition variable - ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0) - // free mutex ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0) } @@ -336,7 +361,12 @@ void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_han // post work ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0) LinkedList2_Append(&d->pending_list, &o->list_node); - ASSERT_FORCE(pthread_cond_signal(&d->new_cond) == 0) + for (int i = 0; i < d->num_threads; i++) { + if (!d->threads[i].running_work) { + ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0) + break; + } + } ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0) } else { #endif diff --git a/threadwork/BThreadWork.h b/threadwork/BThreadWork.h index e83c065..e047f53 100644 --- a/threadwork/BThreadWork.h +++ b/threadwork/BThreadWork.h @@ -43,7 +43,10 @@ #define BTHREADWORK_STATE_FINISHED 3 #define BTHREADWORK_STATE_FORGOTTEN 4 +#define BTHREADWORK_MAX_THREADS 8 + struct BThreadWork_s; +struct BThreadWorkDispatcher_s; /** * Function called to do the work for a {@link BThreadWork}. @@ -60,20 +63,27 @@ typedef void (*BThreadWork_work_func) (void *user); */ typedef void (*BThreadWork_handler_done) (void *user); -typedef struct { +#ifdef BADVPN_THREADWORK_USE_PTHREAD +struct BThreadWorkDispatcher_thread { + struct BThreadWorkDispatcher_s *d; + struct BThreadWork_s *running_work; + pthread_cond_t new_cond; + pthread_t thread; +}; +#endif + +typedef struct BThreadWorkDispatcher_s { BReactor *reactor; #ifdef BADVPN_THREADWORK_USE_PTHREAD - int num_threads; LinkedList2 pending_list; - struct BThreadWork_s *running_work; LinkedList2 finished_list; pthread_mutex_t mutex; - pthread_cond_t new_cond; int pipe[2]; BFileDescriptor bfd; BPending more_job; int cancel; - pthread_t thread; + int num_threads; + struct BThreadWorkDispatcher_thread threads[BTHREADWORK_MAX_THREADS]; #endif DebugObject d_obj; DebugCounter d_ctr; |