diff options
-rw-r--r-- | source/blender/blenlib/BLI_gsqueue.h | 5 | ||||
-rw-r--r-- | source/blender/blenlib/BLI_threads.h | 15 | ||||
-rw-r--r-- | source/blender/blenlib/intern/threads.c | 132 |
3 files changed, 151 insertions, 1 deletions
diff --git a/source/blender/blenlib/BLI_gsqueue.h b/source/blender/blenlib/BLI_gsqueue.h index 16157ebe0a8..29b15bcf55a 100644 --- a/source/blender/blenlib/BLI_gsqueue.h +++ b/source/blender/blenlib/BLI_gsqueue.h @@ -49,6 +49,11 @@ GSQueue* BLI_gsqueue_new (int elem_size); int BLI_gsqueue_is_empty(GSQueue *gq); /** + * Query number elements in the queue + */ +int BLI_gsqueue_size(GSQueue *gq); + + /** * Access the item at the head of the queue * without removing it. * diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h index ace9ddd729f..089ee9bdbdd 100644 --- a/source/blender/blenlib/BLI_threads.h +++ b/source/blender/blenlib/BLI_threads.h @@ -107,6 +107,21 @@ void BLI_destroy_worker(struct ThreadedWorker *worker); * NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */ void BLI_insert_work(struct ThreadedWorker *worker, void *param); +/* ThreadWorkQueue + * + * Thread-safe work queue to push work/pointers between threads. */ + +typedef struct ThreadQueue ThreadQueue; + +ThreadQueue *BLI_thread_queue_init(); +void BLI_thread_queue_free(ThreadQueue *queue); + +void BLI_thread_queue_push(ThreadQueue *queue, void *work); +void *BLI_thread_queue_pop(ThreadQueue *queue); +void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms); +int BLI_thread_queue_size(ThreadQueue *queue); + +void BLI_thread_queue_nowait(ThreadQueue *queue); #endif diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c index b5c6a5a3b4e..e5d0385b6e3 100644 --- a/source/blender/blenlib/intern/threads.c +++ b/source/blender/blenlib/intern/threads.c @@ -32,12 +32,14 @@ #include <stdlib.h> #include <string.h> #include <pthread.h> +#include <errno.h> #include "MEM_guardedalloc.h" #include "DNA_listBase.h" #include "BLI_blenlib.h" +#include "BLI_gsqueue.h" #include "BLI_threads.h" #include "PIL_time.h" @@ -458,4 +460,132 @@ void BLI_insert_work(ThreadedWorker *worker, void *param) BLI_insert_thread(&worker->threadbase, p); } -/* eof */ +/* ************************************************ */ + +struct ThreadQueue { + GSQueue *queue; + pthread_mutex_t mutex; + pthread_cond_t cond; + int nowait; +}; + +ThreadQueue *BLI_thread_queue_init() +{ + ThreadQueue *queue; + + queue= MEM_callocN(sizeof(ThreadQueue), "ThreadQueue"); + queue->queue= BLI_gsqueue_new(sizeof(void*)); + + pthread_mutex_init(&queue->mutex, NULL); + pthread_cond_init(&queue->cond, NULL); + + return queue; +} + +void BLI_thread_queue_free(ThreadQueue *queue) +{ + pthread_cond_destroy(&queue->cond); + pthread_mutex_destroy(&queue->mutex); + + BLI_gsqueue_free(queue->queue); + + MEM_freeN(queue); +} + +void BLI_thread_queue_push(ThreadQueue *queue, void *work) +{ + pthread_mutex_lock(&queue->mutex); + + BLI_gsqueue_push(queue->queue, &work); + + /* signal threads waiting to pop */ + pthread_cond_signal(&queue->cond); + pthread_mutex_unlock(&queue->mutex); +} + +void *BLI_thread_queue_pop(ThreadQueue *queue) +{ + void *work= NULL; + + /* wait until there is work */ + pthread_mutex_lock(&queue->mutex); + while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) + pthread_cond_wait(&queue->cond, &queue->mutex); + + /* if we have something, pop it */ + if(!BLI_gsqueue_is_empty(queue->queue)) + BLI_gsqueue_pop(queue->queue, &work); + + pthread_mutex_unlock(&queue->mutex); + + return work; +} + +static void wait_timeout(struct timespec *timeout, int ms) +{ + struct timeval now; + ldiv_t div_result; + long x; + + gettimeofday(&now, NULL); + div_result = ldiv(ms, 1000); + timeout->tv_sec = now.tv_sec + div_result.quot; + x = now.tv_usec + (div_result.rem*1000); + + if (x >= 1000000) { + timeout->tv_sec++; + x -= 1000000; + } + + timeout->tv_nsec = x*1000; +} + +void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms) +{ + double t; + void *work= NULL; + struct timespec timeout; + + t= PIL_check_seconds_timer(); + wait_timeout(&timeout, ms); + + /* wait until there is work */ + pthread_mutex_lock(&queue->mutex); + while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) { + if(pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT) + break; + else if(PIL_check_seconds_timer() - t >= ms*0.001) + break; + } + + /* if we have something, pop it */ + if(!BLI_gsqueue_is_empty(queue->queue)) + BLI_gsqueue_pop(queue->queue, &work); + + pthread_mutex_unlock(&queue->mutex); + + return work; +} + +int BLI_thread_queue_size(ThreadQueue *queue) +{ + int size; + + pthread_mutex_lock(&queue->mutex); + size= BLI_gsqueue_size(queue->queue); + pthread_mutex_unlock(&queue->mutex); + + return size; +} + +void BLI_thread_queue_nowait(ThreadQueue *queue) +{ + pthread_mutex_lock(&queue->mutex); + + queue->nowait= 1; + + /* signal threads waiting to pop */ + pthread_cond_signal(&queue->cond); + pthread_mutex_unlock(&queue->mutex); +} + |