diff options
author | Lukas Toenne <lukas.toenne@googlemail.com> | 2012-06-10 16:26:33 +0400 |
---|---|---|
committer | Lukas Toenne <lukas.toenne@googlemail.com> | 2012-06-10 16:26:33 +0400 |
commit | 7496a58cfb3e9c77645265af262543b357f9e361 (patch) | |
tree | e2ccc2b22e1a5b077c6f840e6974a039480361bd /source | |
parent | 5e29381825bb7e84b6e3535bd6ca6a53c8079e2a (diff) |
Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times. This should help reduce unnecessary wait times in Tile.
Diffstat (limited to 'source')
-rw-r--r-- | source/blender/blenlib/BLI_threads.h | 1 | ||||
-rw-r--r-- | source/blender/blenlib/intern/threads.c | 51 | ||||
-rw-r--r-- | source/blender/compositor/intern/COM_ExecutionGroup.cpp | 3 | ||||
-rw-r--r-- | source/blender/compositor/intern/COM_WorkScheduler.cpp | 65 | ||||
-rw-r--r-- | source/blender/compositor/intern/COM_WorkScheduler.h | 13 |
5 files changed, 60 insertions, 73 deletions
diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h index b13da9f0dd4..fb8771722c1 100644 --- a/source/blender/blenlib/BLI_threads.h +++ b/source/blender/blenlib/BLI_threads.h @@ -136,6 +136,7 @@ void *BLI_thread_queue_pop(ThreadQueue *queue); void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms); int BLI_thread_queue_size(ThreadQueue *queue); +void BLI_thread_queue_wait_finish(ThreadQueue *queue); void BLI_thread_queue_nowait(ThreadQueue *queue); #endif diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c index f9f677d7c22..dc4c15a82fc 100644 --- a/source/blender/blenlib/intern/threads.c +++ b/source/blender/blenlib/intern/threads.c @@ -520,8 +520,10 @@ void BLI_insert_work(ThreadedWorker *worker, void *param) struct ThreadQueue { GSQueue *queue; pthread_mutex_t mutex; - pthread_cond_t cond; - int nowait; + pthread_cond_t push_cond; + pthread_cond_t finish_cond; + volatile int nowait; + volatile int cancelled; }; ThreadQueue *BLI_thread_queue_init(void) @@ -532,14 +534,17 @@ ThreadQueue *BLI_thread_queue_init(void) queue->queue = BLI_gsqueue_new(sizeof(void *)); pthread_mutex_init(&queue->mutex, NULL); - pthread_cond_init(&queue->cond, NULL); + pthread_cond_init(&queue->push_cond, NULL); + pthread_cond_init(&queue->finish_cond, NULL); return queue; } void BLI_thread_queue_free(ThreadQueue *queue) { - pthread_cond_destroy(&queue->cond); + /* destroy everything, assumes no one is using queue anymore */ + pthread_cond_destroy(&queue->finish_cond); + pthread_cond_destroy(&queue->push_cond); pthread_mutex_destroy(&queue->mutex); BLI_gsqueue_free(queue->queue); @@ -554,7 +559,7 @@ void BLI_thread_queue_push(ThreadQueue *queue, void *work) BLI_gsqueue_push(queue->queue, &work); /* signal threads waiting to pop */ - pthread_cond_signal(&queue->cond); + pthread_cond_signal(&queue->push_cond); pthread_mutex_unlock(&queue->mutex); } @@ -565,11 +570,15 @@ void *BLI_thread_queue_pop(ThreadQueue *queue) /* wait until there is work */ pthread_mutex_lock(&queue->mutex); while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) - pthread_cond_wait(&queue->cond, &queue->mutex); - + pthread_cond_wait(&queue->push_cond, &queue->mutex); + /* if we have something, pop it */ - if (!BLI_gsqueue_is_empty(queue->queue)) + if (!BLI_gsqueue_is_empty(queue->queue)) { BLI_gsqueue_pop(queue->queue, &work); + + if(BLI_gsqueue_is_empty(queue->queue)) + pthread_cond_broadcast(&queue->finish_cond); + } pthread_mutex_unlock(&queue->mutex); @@ -623,16 +632,20 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms) /* wait until there is work */ pthread_mutex_lock(&queue->mutex); while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) { - if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT) + if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT) break; else if (PIL_check_seconds_timer() - t >= ms * 0.001) break; } /* if we have something, pop it */ - if (!BLI_gsqueue_is_empty(queue->queue)) + if (!BLI_gsqueue_is_empty(queue->queue)) { BLI_gsqueue_pop(queue->queue, &work); - + + if(BLI_gsqueue_is_empty(queue->queue)) + pthread_cond_broadcast(&queue->finish_cond); + } + pthread_mutex_unlock(&queue->mutex); return work; @@ -656,10 +669,23 @@ void BLI_thread_queue_nowait(ThreadQueue *queue) queue->nowait = 1; /* signal threads waiting to pop */ - pthread_cond_signal(&queue->cond); + pthread_cond_broadcast(&queue->push_cond); + pthread_mutex_unlock(&queue->mutex); +} + +void BLI_thread_queue_wait_finish(ThreadQueue *queue) +{ + /* wait for finish condition */ + pthread_mutex_lock(&queue->mutex); + + while(!BLI_gsqueue_is_empty(queue->queue)) + pthread_cond_wait(&queue->finish_cond, &queue->mutex); + pthread_mutex_unlock(&queue->mutex); } +/* ************************************************ */ + void BLI_begin_threaded_malloc(void) { if (thread_levels == 0) { @@ -674,3 +700,4 @@ void BLI_end_threaded_malloc(void) if (thread_levels == 0) MEM_set_lock_callback(NULL, NULL); } + diff --git a/source/blender/compositor/intern/COM_ExecutionGroup.cpp b/source/blender/compositor/intern/COM_ExecutionGroup.cpp index 44b3c8dafbb..481b83c81a3 100644 --- a/source/blender/compositor/intern/COM_ExecutionGroup.cpp +++ b/source/blender/compositor/intern/COM_ExecutionGroup.cpp @@ -351,7 +351,8 @@ void ExecutionGroup::execute(ExecutionSystem *graph) startIndex = index+1; } } - PIL_sleep_ms(10); + + WorkScheduler::finish(); if (bTree->test_break && bTree->test_break(bTree->tbh)) { breaked = true; diff --git a/source/blender/compositor/intern/COM_WorkScheduler.cpp b/source/blender/compositor/intern/COM_WorkScheduler.cpp index 172107f720b..ba8bfe55310 100644 --- a/source/blender/compositor/intern/COM_WorkScheduler.cpp +++ b/source/blender/compositor/intern/COM_WorkScheduler.cpp @@ -39,8 +39,6 @@ #endif -/// @brief global state of the WorkScheduler. -static WorkSchedulerState state; /// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created static vector<CPUDevice*> cpudevices; @@ -68,43 +66,29 @@ static bool openclActive = false; #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE void *WorkScheduler::thread_execute_cpu(void *data) { - bool continueLoop = true; Device *device = (Device*)data; - while (continueLoop) { - WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue); - if (work) { - device->execute(work); - delete work; - } - PIL_sleep_ms(10); - - if (WorkScheduler::isStopping()) { - continueLoop = false; - } + WorkPackage *work; + + while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) { + device->execute(work); + delete work; } + return NULL; } void *WorkScheduler::thread_execute_gpu(void *data) { - bool continueLoop = true; Device *device = (Device*)data; - while (continueLoop) { - WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue); - if (work) { - device->execute(work); - delete work; - } - PIL_sleep_ms(10); - - if (WorkScheduler::isStopping()) { - continueLoop = false; - } + WorkPackage *work; + + while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) { + device->execute(work); + delete work; } + return NULL; } - -bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;} #endif @@ -135,7 +119,6 @@ void WorkScheduler::start(CompositorContext &context) #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE unsigned int index; cpuqueue = BLI_thread_queue_init(); - BLI_thread_queue_nowait(cpuqueue); BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size()); for (index = 0 ; index < cpudevices.size() ; index ++) { Device *device = cpudevices[index]; @@ -144,7 +127,6 @@ void WorkScheduler::start(CompositorContext &context) #ifdef COM_OPENCL_ENABLED if (context.getHasActiveOpenCLDevices()) { gpuqueue = BLI_thread_queue_init(); - BLI_thread_queue_nowait(gpuqueue); BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size()); for (index = 0 ; index < gpudevices.size() ; index ++) { Device *device = gpudevices[index]; @@ -157,45 +139,39 @@ void WorkScheduler::start(CompositorContext &context) } #endif #endif - state = COM_WSS_STARTED; } void WorkScheduler::finish() { #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE #ifdef COM_OPENCL_ENABLED if (openclActive) { - while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) { - PIL_sleep_ms(10); - } + BLI_thread_queue_wait_finish(gpuqueue); + BLI_thread_queue_wait_finish(cpuqueue); } else { - while (BLI_thread_queue_size(cpuqueue) > 0) { - PIL_sleep_ms(10); - } + BLI_thread_queue_wait_finish(cpuqueue); } #else - while (BLI_thread_queue_size(cpuqueue) > 0) { - PIL_sleep_ms(10); - } + BLI_thread_queue_wait_finish(cpuqueue); #endif #endif } void WorkScheduler::stop() { - state = COM_WSS_STOPPING; #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE + BLI_thread_queue_nowait(cpuqueue); BLI_end_threads(&cputhreads); BLI_thread_queue_free(cpuqueue); cpuqueue = NULL; #ifdef COM_OPENCL_ENABLED if (openclActive) { + BLI_thread_queue_nowait(gpuqueue); BLI_end_threads(&gputhreads); BLI_thread_queue_free(gpuqueue); gpuqueue = NULL; } #endif #endif - state = COM_WSS_STOPPED; } bool WorkScheduler::hasGPUDevices() @@ -218,8 +194,6 @@ extern void clContextError(const char *errinfo, const void *private_info, size_t void WorkScheduler::initialize() { - state = COM_WSS_UNKNOWN; - #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE int numberOfCPUThreads = BLI_system_thread_count(); @@ -298,8 +272,6 @@ void WorkScheduler::initialize() } #endif #endif - - state = COM_WSS_INITIALIZED; } void WorkScheduler::deinitialize() @@ -329,5 +301,4 @@ void WorkScheduler::deinitialize() } #endif #endif - state = COM_WSS_DEINITIALIZED; } diff --git a/source/blender/compositor/intern/COM_WorkScheduler.h b/source/blender/compositor/intern/COM_WorkScheduler.h index 0de1763749e..b03b514d139 100644 --- a/source/blender/compositor/intern/COM_WorkScheduler.h +++ b/source/blender/compositor/intern/COM_WorkScheduler.h @@ -31,19 +31,6 @@ extern "C" { #include "COM_defines.h" #include "COM_Device.h" -// STATES -/** @brief states of the WorkScheduler - * @ingroup execution - */ -typedef enum WorkSchedulerState { - COM_WSS_UNKNOWN = -1, - COM_WSS_INITIALIZED = 0, - COM_WSS_STARTED = 1, - COM_WSS_STOPPING = 2, - COM_WSS_STOPPED = 3, - COM_WSS_DEINITIALIZED = 4 -} WorkSchedulerState; - /** @brief the workscheduler * @ingroup execution */ |