diff options
author | ambrop7 <ambrop7@1a93d707-3861-5ebc-ad3b-9740d49b5140> | 2011-03-14 02:35:29 +0300 |
---|---|---|
committer | ambrop7 <ambrop7@1a93d707-3861-5ebc-ad3b-9740d49b5140> | 2011-03-14 02:35:29 +0300 |
commit | 48963af6f9e26fe23d329cf934016929d87d6a9f (patch) | |
tree | 366c8d7578f2a19bef42018ca5c0664db04f9ac3 /threadwork | |
parent | 6f95696566235d6fb2eac1b2e384bfb1203fa53d (diff) |
add BThreadWork
Diffstat (limited to 'threadwork')
-rw-r--r-- | threadwork/BThreadWork.c | 340 | ||||
-rw-r--r-- | threadwork/BThreadWork.h | 144 | ||||
-rw-r--r-- | threadwork/CMakeLists.txt | 7 |
3 files changed, 491 insertions, 0 deletions
diff --git a/threadwork/BThreadWork.c b/threadwork/BThreadWork.c new file mode 100644 index 0000000..133ea9c --- /dev/null +++ b/threadwork/BThreadWork.c @@ -0,0 +1,340 @@ +/** + * @file BThreadWork.c + * @author Ambroz Bizjak <ambrop7@gmail.com> + * + * @section LICENSE + * + * This file is part of BadVPN. + * + * BadVPN is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * BadVPN is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include <stdint.h> + +#ifdef BADVPN_THREADWORK_USE_PTHREAD + #include <unistd.h> + #include <errno.h> +#endif + +#include <misc/offset.h> +#include <system/BLog.h> + +#include <generated/blog_channel_BThreadWork.h> + +#include <threadwork/BThreadWork.h> + +#ifdef BADVPN_THREADWORK_USE_PTHREAD + +static void * dispatcher_thread (BThreadWorkDispatcher *o) +{ + while (1) { + // wait for a work + ASSERT_FORCE(sem_wait(&o->new_sem) == 0) + + // grab the work + ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) + if (LinkedList2_IsEmpty(&o->pending_list)) { + ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) + continue; + } + BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node); + ASSERT(w->state == BTHREADWORK_STATE_PENDING) + LinkedList2_Remove(&o->pending_list, &w->list_node); + o->running_work = w; + w->state = BTHREADWORK_STATE_RUNNING; + ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) + + // do the work + w->work_func(w->work_func_user); + + // release the work + ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) + o->running_work = NULL; + LinkedList2_Append(&o->finished_list, &w->list_node); + w->state = BTHREADWORK_STATE_FINISHED; + ASSERT_FORCE(sem_post(&w->finished_sem) == 0) + ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) + + // write to pipe + uint8_t b = 0; + ASSERT_FORCE(write(o->pipe[1], &b, sizeof(b)) == sizeof(b)) + } +} + +static void pipe_fd_handler (BThreadWorkDispatcher *o, int events) +{ + ASSERT(o->num_threads > 0) + DebugObject_Access(&o->d_obj); + + // read from pipe + uint8_t b; + int res = read(o->pipe[0], &b, sizeof(b)); + if (res < 0) { + int error = errno; + ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK) + return; + } + ASSERT(res == sizeof(b)) + ASSERT(b == 0) + + // grab a finished work + ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0) + if (LinkedList2_IsEmpty(&o->finished_list)) { + ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) + return; + } + BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node); + ASSERT(w->state == BTHREADWORK_STATE_FINISHED) + LinkedList2_Remove(&o->finished_list, &w->list_node); + ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0) + + // set state forgotten + w->state = BTHREADWORK_STATE_FORGOTTEN; + + // call handler + w->handler_done(w->user); + return; +} + +#endif + +static void work_job_handler (BThreadWork *o) +{ + ASSERT(o->d->num_threads == 0) + DebugObject_Access(&o->d_obj); + + // do the work + o->work_func(o->work_func_user); + + // call handler + o->handler_done(o->user); + return; +} + +int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint) +{ + // init arguments + o->reactor = reactor; + + // set num threads + #ifdef BADVPN_THREADWORK_USE_PTHREAD + if (num_threads_hint < 0) { + o->num_threads = 2; + } else { + o->num_threads = num_threads_hint; + } + #else + o->num_threads = 0; + #endif + + #ifdef BADVPN_THREADWORK_USE_PTHREAD + + if (o->num_threads > 0) { + // init pending list + LinkedList2_Init(&o->pending_list); + + // set no running work + o->running_work = NULL; + + // init finished list + LinkedList2_Init(&o->finished_list); + + // init mutex + if (pthread_mutex_init(&o->mutex, NULL) != 0) { + BLog(BLOG_ERROR, "pthread_mutex_init failed"); + goto fail0; + } + + // init semaphore + if (sem_init(&o->new_sem, 0, 0) != 0) { + BLog(BLOG_ERROR, "sem_init failed"); + goto fail1; + } + + // init pipe + if (pipe(o->pipe) < 0) { + BLog(BLOG_ERROR, "pipe failed"); + goto fail2; + } + + // init BFileDescriptor + BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o); + if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { + BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); + goto fail3; + } + BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ); + + // init thread + if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) { + BLog(BLOG_ERROR, "pthread_create failed"); + goto fail4; + } + } + + #endif + + DebugObject_Init(&o->d_obj); + DebugCounter_Init(&o->d_ctr); + return 1; + + #ifdef BADVPN_THREADWORK_USE_PTHREAD +fail4: + BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); +fail3: + ASSERT_FORCE(close(o->pipe[0]) == 0) + ASSERT_FORCE(close(o->pipe[1]) == 0) +fail2: + ASSERT_FORCE(sem_destroy(&o->new_sem) == 0) +fail1: + ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0) + #endif +fail0: + return 0; +} + +void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o) +{ + #ifdef BADVPN_THREADWORK_USE_PTHREAD + if (o->num_threads > 0) { + ASSERT(LinkedList2_IsEmpty(&o->pending_list)) + ASSERT(!o->running_work) + ASSERT(LinkedList2_IsEmpty(&o->finished_list)) + } + #endif + DebugObject_Free(&o->d_obj); + DebugCounter_Free(&o->d_ctr); + + #ifdef BADVPN_THREADWORK_USE_PTHREAD + + if (o->num_threads > 0) { + // stop thread + ASSERT_FORCE(pthread_cancel(o->thread) == 0) + void *retval; + ASSERT_FORCE(pthread_join(o->thread, &retval) == 0) + + // free BFileDescriptor + BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); + + // free pipe + ASSERT_FORCE(close(o->pipe[0]) == 0) + ASSERT_FORCE(close(o->pipe[1]) == 0) + + // free semaphore + ASSERT_FORCE(sem_destroy(&o->new_sem) == 0) + + // free mutex + ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0) + } + + #endif +} + +void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user) +{ + DebugObject_Access(&d->d_obj); + + // init arguments + o->d = d; + o->handler_done = handler_done; + o->user = user; + o->work_func = work_func; + o->work_func_user = work_func_user; + + #ifdef BADVPN_THREADWORK_USE_PTHREAD + if (d->num_threads > 0) { + // set state + o->state = BTHREADWORK_STATE_PENDING; + + // init finished semaphore + ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0) + + // insert to pending list + ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0) + LinkedList2_Append(&d->pending_list, &o->list_node); + ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0) + + // post to new semaphore + ASSERT_FORCE(sem_post(&d->new_sem) == 0) + } else { + #endif + // schedule job + BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o); + BPending_Set(&o->job); + #ifdef BADVPN_THREADWORK_USE_PTHREAD + } + #endif + + DebugObject_Init(&o->d_obj); + DebugCounter_Increment(&d->d_ctr); +} + +void BThreadWork_Free (BThreadWork *o) +{ + BThreadWorkDispatcher *d = o->d; + DebugObject_Free(&o->d_obj); + DebugCounter_Decrement(&d->d_ctr); + + #ifdef BADVPN_THREADWORK_USE_PTHREAD + if (d->num_threads > 0) { + ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0) + + switch (o->state) { + case BTHREADWORK_STATE_PENDING: { + BLog(BLOG_DEBUG, "remove pending work"); + + // remove from pending list + LinkedList2_Remove(&d->pending_list, &o->list_node); + } break; + + case BTHREADWORK_STATE_RUNNING: { + BLog(BLOG_DEBUG, "remove running work"); + + // wait for the work to finish running + ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0) + ASSERT_FORCE(sem_wait(&o->finished_sem) == 0) + ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0) + + ASSERT(o->state == BTHREADWORK_STATE_FINISHED) + + // remove from finished list + LinkedList2_Remove(&d->finished_list, &o->list_node); + } break; + + case BTHREADWORK_STATE_FINISHED: { + BLog(BLOG_DEBUG, "remove finished work"); + + // remove from finished list + LinkedList2_Remove(&d->finished_list, &o->list_node); + } break; + + case BTHREADWORK_STATE_FORGOTTEN: { + BLog(BLOG_DEBUG, "remove forgotten work"); + } break; + + default: + ASSERT(0); + } + + ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0) + + // free finished semaphore + ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0) + } else { + #endif + BPending_Free(&o->job); + #ifdef BADVPN_THREADWORK_USE_PTHREAD + } + #endif +} diff --git a/threadwork/BThreadWork.h b/threadwork/BThreadWork.h new file mode 100644 index 0000000..930b4ab --- /dev/null +++ b/threadwork/BThreadWork.h @@ -0,0 +1,144 @@ +/** + * @file BThreadWork.h + * @author Ambroz Bizjak <ambrop7@gmail.com> + * + * @section LICENSE + * + * This file is part of BadVPN. + * + * BadVPN is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * BadVPN is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * @section DESCRIPTION + * + * System for performing computations (possibly) in parallel with the event loop + * in a different thread. + */ + +#ifndef BADVPN_BTHREADWORK_BTHREADWORK_H +#define BADVPN_BTHREADWORK_BTHREADWORK_H + +#ifdef BADVPN_THREADWORK_USE_PTHREAD + #include <pthread.h> + #include <semaphore.h> +#endif + +#include <misc/debug.h> +#include <structure/LinkedList2.h> +#include <system/DebugObject.h> +#include <system/BReactor.h> + +#define BTHREADWORK_STATE_PENDING 1 +#define BTHREADWORK_STATE_RUNNING 2 +#define BTHREADWORK_STATE_FINISHED 3 +#define BTHREADWORK_STATE_FORGOTTEN 4 + +struct BThreadWork_s; + +/** + * Function called to do the work for a {@link BThreadWork}. + * The function may be called in another thread, in parallel with the event loop. + * + * @param user as work_func_user in {@link BThreadWork_Init} + */ +typedef void (*BThreadWork_work_func) (void *user); + +/** + * Handler called when a {@link BThreadWork} work is done. + * + * @param user as in {@link BThreadWork_Init} + */ +typedef void (*BThreadWork_handler_done) (void *user); + +typedef struct { + BReactor *reactor; + int num_threads; + #ifdef BADVPN_THREADWORK_USE_PTHREAD + LinkedList2 pending_list; + struct BThreadWork_s *running_work; + LinkedList2 finished_list; + pthread_mutex_t mutex; + sem_t new_sem; + int pipe[2]; + BFileDescriptor bfd; + pthread_t thread; + #endif + DebugObject d_obj; + DebugCounter d_ctr; +} BThreadWorkDispatcher; + +typedef struct BThreadWork_s { + BThreadWorkDispatcher *d; + BThreadWork_handler_done handler_done; + void *user; + BThreadWork_work_func work_func; + void *work_func_user; + union { + #ifdef BADVPN_THREADWORK_USE_PTHREAD + struct { + LinkedList2Node list_node; + int state; + sem_t finished_sem; + }; + #endif + struct { + BPending job; + }; + }; + DebugObject d_obj; +} BThreadWork; + +/** + * Initializes the work dispatcher. + * Works may be started using {@link BThreadWork_Init}. + * + * @param o the object + * @param reactor reactor we live in + * @param num_threads_hint hint for the number of threads to use: + * <0 - A choice will be made automatically, probably based on the number of CPUs. + * 0 - No additional threads will be used, and computations will be performed directly + * in the event loop in job handlers. + * @return 1 on success, 0 on failure + */ +int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint) WARN_UNUSED; + +/** + * Frees the work dispatcher. + * There must be no {@link BThreadWork}'s with this dispatcher. + * + * @param o the object + */ +void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o); + +/** + * Initializes the work. + * + * @param o the object + * @param d work dispatcher + * @param handler_done handler to call when the work is done + * @param user argument to handler + * @param work_func function that will do the work, possibly from another thread + * @param work_func_user argument to work_func + */ +void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user); + +/** + * Frees the work. + * After this function returns, the work function will either have fully executed, + * or not called at all, and never will be. + * + * @param o the object + */ +void BThreadWork_Free (BThreadWork *o); + +#endif diff --git a/threadwork/CMakeLists.txt b/threadwork/CMakeLists.txt new file mode 100644 index 0000000..d7e6412 --- /dev/null +++ b/threadwork/CMakeLists.txt @@ -0,0 +1,7 @@ +set(BADVPN_THREADWORK_EXTRA_LIBS) +if (BADVPN_THREADWORK_USE_PTHREAD) + list(APPEND BADVPN_THREADWORK_EXTRA_LIBS pthread) +endif () + +add_library(threadwork BThreadWork.c) +target_link_libraries(threadwork system ${BADVPN_THREADWORK_EXTRA_LIBS}) |