Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrecht Van Lommel <brechtvanlommel@pandora.be>2010-01-22 14:06:57 +0300
committerBrecht Van Lommel <brechtvanlommel@pandora.be>2010-01-22 14:06:57 +0300
commit2d2339a70992b819a23e8c71f68027022b395f46 (patch)
tree747707ad2e7660e95e67b9c417feb5a619685072 /source/blender/blenlib
parentcbc4aae06a9b3878b0af68884a75b953e3c8612b (diff)
Threads: added queue for passing data between threads. Includes a function
to wait for an item to be put in the queue and then pop immediately without, this makes it possible to avoid sleep() while waiting for the results of a thread.
Diffstat (limited to 'source/blender/blenlib')
-rw-r--r--source/blender/blenlib/BLI_gsqueue.h5
-rw-r--r--source/blender/blenlib/BLI_threads.h15
-rw-r--r--source/blender/blenlib/intern/threads.c132
3 files changed, 151 insertions, 1 deletions
diff --git a/source/blender/blenlib/BLI_gsqueue.h b/source/blender/blenlib/BLI_gsqueue.h
index 16157ebe0a8..29b15bcf55a 100644
--- a/source/blender/blenlib/BLI_gsqueue.h
+++ b/source/blender/blenlib/BLI_gsqueue.h
@@ -49,6 +49,11 @@ GSQueue* BLI_gsqueue_new (int elem_size);
int BLI_gsqueue_is_empty(GSQueue *gq);
/**
+ * Query number elements in the queue
+ */
+int BLI_gsqueue_size(GSQueue *gq);
+
+ /**
* Access the item at the head of the queue
* without removing it.
*
diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h
index ace9ddd729f..089ee9bdbdd 100644
--- a/source/blender/blenlib/BLI_threads.h
+++ b/source/blender/blenlib/BLI_threads.h
@@ -107,6 +107,21 @@ void BLI_destroy_worker(struct ThreadedWorker *worker);
* NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */
void BLI_insert_work(struct ThreadedWorker *worker, void *param);
+/* ThreadWorkQueue
+ *
+ * Thread-safe work queue to push work/pointers between threads. */
+
+typedef struct ThreadQueue ThreadQueue;
+
+ThreadQueue *BLI_thread_queue_init();
+void BLI_thread_queue_free(ThreadQueue *queue);
+
+void BLI_thread_queue_push(ThreadQueue *queue, void *work);
+void *BLI_thread_queue_pop(ThreadQueue *queue);
+void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
+int BLI_thread_queue_size(ThreadQueue *queue);
+
+void BLI_thread_queue_nowait(ThreadQueue *queue);
#endif
diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c
index b5c6a5a3b4e..e5d0385b6e3 100644
--- a/source/blender/blenlib/intern/threads.c
+++ b/source/blender/blenlib/intern/threads.c
@@ -32,12 +32,14 @@
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
+#include <errno.h>
#include "MEM_guardedalloc.h"
#include "DNA_listBase.h"
#include "BLI_blenlib.h"
+#include "BLI_gsqueue.h"
#include "BLI_threads.h"
#include "PIL_time.h"
@@ -458,4 +460,132 @@ void BLI_insert_work(ThreadedWorker *worker, void *param)
BLI_insert_thread(&worker->threadbase, p);
}
-/* eof */
+/* ************************************************ */
+
+struct ThreadQueue {
+ GSQueue *queue;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int nowait;
+};
+
+ThreadQueue *BLI_thread_queue_init()
+{
+ ThreadQueue *queue;
+
+ queue= MEM_callocN(sizeof(ThreadQueue), "ThreadQueue");
+ queue->queue= BLI_gsqueue_new(sizeof(void*));
+
+ pthread_mutex_init(&queue->mutex, NULL);
+ pthread_cond_init(&queue->cond, NULL);
+
+ return queue;
+}
+
+void BLI_thread_queue_free(ThreadQueue *queue)
+{
+ pthread_cond_destroy(&queue->cond);
+ pthread_mutex_destroy(&queue->mutex);
+
+ BLI_gsqueue_free(queue->queue);
+
+ MEM_freeN(queue);
+}
+
+void BLI_thread_queue_push(ThreadQueue *queue, void *work)
+{
+ pthread_mutex_lock(&queue->mutex);
+
+ BLI_gsqueue_push(queue->queue, &work);
+
+ /* signal threads waiting to pop */
+ pthread_cond_signal(&queue->cond);
+ pthread_mutex_unlock(&queue->mutex);
+}
+
+void *BLI_thread_queue_pop(ThreadQueue *queue)
+{
+ void *work= NULL;
+
+ /* wait until there is work */
+ pthread_mutex_lock(&queue->mutex);
+ while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
+ pthread_cond_wait(&queue->cond, &queue->mutex);
+
+ /* if we have something, pop it */
+ if(!BLI_gsqueue_is_empty(queue->queue))
+ BLI_gsqueue_pop(queue->queue, &work);
+
+ pthread_mutex_unlock(&queue->mutex);
+
+ return work;
+}
+
+static void wait_timeout(struct timespec *timeout, int ms)
+{
+ struct timeval now;
+ ldiv_t div_result;
+ long x;
+
+ gettimeofday(&now, NULL);
+ div_result = ldiv(ms, 1000);
+ timeout->tv_sec = now.tv_sec + div_result.quot;
+ x = now.tv_usec + (div_result.rem*1000);
+
+ if (x >= 1000000) {
+ timeout->tv_sec++;
+ x -= 1000000;
+ }
+
+ timeout->tv_nsec = x*1000;
+}
+
+void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
+{
+ double t;
+ void *work= NULL;
+ struct timespec timeout;
+
+ t= PIL_check_seconds_timer();
+ wait_timeout(&timeout, ms);
+
+ /* wait until there is work */
+ pthread_mutex_lock(&queue->mutex);
+ while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
+ if(pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+ break;
+ else if(PIL_check_seconds_timer() - t >= ms*0.001)
+ break;
+ }
+
+ /* if we have something, pop it */
+ if(!BLI_gsqueue_is_empty(queue->queue))
+ BLI_gsqueue_pop(queue->queue, &work);
+
+ pthread_mutex_unlock(&queue->mutex);
+
+ return work;
+}
+
+int BLI_thread_queue_size(ThreadQueue *queue)
+{
+ int size;
+
+ pthread_mutex_lock(&queue->mutex);
+ size= BLI_gsqueue_size(queue->queue);
+ pthread_mutex_unlock(&queue->mutex);
+
+ return size;
+}
+
+void BLI_thread_queue_nowait(ThreadQueue *queue)
+{
+ pthread_mutex_lock(&queue->mutex);
+
+ queue->nowait= 1;
+
+ /* signal threads waiting to pop */
+ pthread_cond_signal(&queue->cond);
+ pthread_mutex_unlock(&queue->mutex);
+}
+