Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ambrop72/badvpn.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorambrop7 <ambrop7@1a93d707-3861-5ebc-ad3b-9740d49b5140>2011-03-17 01:33:42 +0300
committerambrop7 <ambrop7@1a93d707-3861-5ebc-ad3b-9740d49b5140>2011-03-17 01:33:42 +0300
commiteca5b73fd65080a5118471219ea445e1085422ec (patch)
tree8e322b9ae421e510d27bb9aab21860d987f09381 /threadwork
parente39dbd4fda964c8279e458294fdb2b819f88d3ff (diff)
BThreadWork: support multiple worker threads
Diffstat (limited to 'threadwork')
-rw-r--r--threadwork/BThreadWork.c122
-rw-r--r--threadwork/BThreadWork.h20
2 files changed, 91 insertions, 51 deletions
diff --git a/threadwork/BThreadWork.c b/threadwork/BThreadWork.c
index b131a1e..28f290f 100644
--- a/threadwork/BThreadWork.c
+++ b/threadwork/BThreadWork.c
@@ -37,8 +37,10 @@
#ifdef BADVPN_THREADWORK_USE_PTHREAD
-static void * dispatcher_thread (BThreadWorkDispatcher *o)
+static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
{
+ BThreadWorkDispatcher *o = t->d;
+
ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
while (1) {
@@ -49,7 +51,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o)
if (LinkedList2_IsEmpty(&o->pending_list)) {
// wait for event
- ASSERT_FORCE(pthread_cond_wait(&o->new_cond, &o->mutex) == 0)
+ ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
continue;
}
@@ -57,7 +59,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o)
BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
ASSERT(w->state == BTHREADWORK_STATE_PENDING)
LinkedList2_Remove(&o->pending_list, &w->list_node);
- o->running_work = w;
+ t->running_work = w;
w->state = BTHREADWORK_STATE_RUNNING;
// do the work
@@ -66,7 +68,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o)
ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
// release the work
- o->running_work = NULL;
+ t->running_work = NULL;
LinkedList2_Append(&o->finished_list, &w->list_node);
w->state = BTHREADWORK_STATE_FINISHED;
ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
@@ -145,6 +147,29 @@ static void more_job_handler (BThreadWorkDispatcher *o)
return;
}
+static void stop_threads (BThreadWorkDispatcher *o)
+{
+ // set cancelling
+ ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+ o->cancel = 1;
+ ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+
+ while (o->num_threads > 0) {
+ struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
+
+ // wake up thread
+ ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
+
+ // wait for thread to exit
+ ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
+
+ // free condition variable
+ ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
+
+ o->num_threads--;
+ }
+}
+
#endif
static void work_job_handler (BThreadWork *o)
@@ -167,24 +192,19 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
// init arguments
o->reactor = reactor;
- // set num threads
- #ifdef BADVPN_THREADWORK_USE_PTHREAD
if (num_threads_hint < 0) {
- o->num_threads = 2;
- } else {
- o->num_threads = num_threads_hint;
+ num_threads_hint = 2;
+ }
+ if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
+ num_threads_hint = BTHREADWORK_MAX_THREADS;
}
- #endif
#ifdef BADVPN_THREADWORK_USE_PTHREAD
- if (o->num_threads > 0) {
+ if (num_threads_hint > 0) {
// init pending list
LinkedList2_Init(&o->pending_list);
- // set no running work
- o->running_work = NULL;
-
// init finished list
LinkedList2_Init(&o->finished_list);
@@ -194,35 +214,29 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
goto fail0;
}
- // init condition variable
- if (pthread_cond_init(&o->new_cond, NULL) != 0) {
- BLog(BLOG_ERROR, "pthread_cond_init failed");
- goto fail1;
- }
-
// init pipe
if (pipe(o->pipe) < 0) {
BLog(BLOG_ERROR, "pipe failed");
- goto fail2;
+ goto fail1;
}
// set read end non-blocking
if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
BLog(BLOG_ERROR, "fcntl failed");
- goto fail3;
+ goto fail2;
}
// set write end non-blocking
if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
BLog(BLOG_ERROR, "fcntl failed");
- goto fail3;
+ goto fail2;
}
// init BFileDescriptor
BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
- goto fail3;
+ goto fail2;
}
BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
@@ -232,10 +246,31 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
// set not cancelling
o->cancel = 0;
- // init thread
- if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
- BLog(BLOG_ERROR, "pthread_create failed");
- goto fail4;
+ // init threads
+ o->num_threads = 0;
+ for (int i = 0; i < num_threads_hint; i++) {
+ struct BThreadWorkDispatcher_thread *t = &o->threads[i];
+
+ // set parent pointer
+ t->d = o;
+
+ // set no running work
+ t->running_work = NULL;
+
+ // init condition variable
+ if (pthread_cond_init(&t->new_cond, NULL) != 0) {
+ BLog(BLOG_ERROR, "pthread_cond_init failed");
+ goto fail3;
+ }
+
+ // init thread
+ if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
+ BLog(BLOG_ERROR, "pthread_create failed");
+ ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
+ goto fail3;
+ }
+
+ o->num_threads++;
}
}
@@ -246,19 +281,18 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
return 1;
#ifdef BADVPN_THREADWORK_USE_PTHREAD
-fail4:
+fail3:
+ stop_threads(o);
BPending_Free(&o->more_job);
BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
-fail3:
+fail2:
ASSERT_FORCE(close(o->pipe[0]) == 0)
ASSERT_FORCE(close(o->pipe[1]) == 0)
-fail2:
- ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
fail1:
ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
- #endif
fail0:
return 0;
+ #endif
}
void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
@@ -266,7 +300,7 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (o->num_threads > 0) {
ASSERT(LinkedList2_IsEmpty(&o->pending_list))
- ASSERT(!o->running_work)
+ for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
ASSERT(LinkedList2_IsEmpty(&o->finished_list))
}
#endif
@@ -276,14 +310,8 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (o->num_threads > 0) {
- // post termination request
- ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
- o->cancel = 1;
- ASSERT_FORCE(pthread_cond_signal(&o->new_cond) == 0)
- ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
-
- // wait for thread to exit
- ASSERT_FORCE(pthread_join(o->thread, NULL) == 0)
+ // stop threads
+ stop_threads(o);
// free more job
BPending_Free(&o->more_job);
@@ -295,9 +323,6 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
ASSERT_FORCE(close(o->pipe[0]) == 0)
ASSERT_FORCE(close(o->pipe[1]) == 0)
- // free condition variable
- ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
-
// free mutex
ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
}
@@ -336,7 +361,12 @@ void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_han
// post work
ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
LinkedList2_Append(&d->pending_list, &o->list_node);
- ASSERT_FORCE(pthread_cond_signal(&d->new_cond) == 0)
+ for (int i = 0; i < d->num_threads; i++) {
+ if (!d->threads[i].running_work) {
+ ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
+ break;
+ }
+ }
ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
} else {
#endif
diff --git a/threadwork/BThreadWork.h b/threadwork/BThreadWork.h
index e83c065..e047f53 100644
--- a/threadwork/BThreadWork.h
+++ b/threadwork/BThreadWork.h
@@ -43,7 +43,10 @@
#define BTHREADWORK_STATE_FINISHED 3
#define BTHREADWORK_STATE_FORGOTTEN 4
+#define BTHREADWORK_MAX_THREADS 8
+
struct BThreadWork_s;
+struct BThreadWorkDispatcher_s;
/**
* Function called to do the work for a {@link BThreadWork}.
@@ -60,20 +63,27 @@ typedef void (*BThreadWork_work_func) (void *user);
*/
typedef void (*BThreadWork_handler_done) (void *user);
-typedef struct {
+#ifdef BADVPN_THREADWORK_USE_PTHREAD
+struct BThreadWorkDispatcher_thread {
+ struct BThreadWorkDispatcher_s *d;
+ struct BThreadWork_s *running_work;
+ pthread_cond_t new_cond;
+ pthread_t thread;
+};
+#endif
+
+typedef struct BThreadWorkDispatcher_s {
BReactor *reactor;
#ifdef BADVPN_THREADWORK_USE_PTHREAD
- int num_threads;
LinkedList2 pending_list;
- struct BThreadWork_s *running_work;
LinkedList2 finished_list;
pthread_mutex_t mutex;
- pthread_cond_t new_cond;
int pipe[2];
BFileDescriptor bfd;
BPending more_job;
int cancel;
- pthread_t thread;
+ int num_threads;
+ struct BThreadWorkDispatcher_thread threads[BTHREADWORK_MAX_THREADS];
#endif
DebugObject d_obj;
DebugCounter d_ctr;