From 7496a58cfb3e9c77645265af262543b357f9e361 Mon Sep 17 00:00:00 2001 From: Lukas Toenne Date: Sun, 10 Jun 2012 12:26:33 +0000 Subject: Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times. This should help reduce unnecessary wait times in Tile. --- source/blender/blenlib/BLI_threads.h | 1 + source/blender/blenlib/intern/threads.c | 51 +++++++++++++---- .../compositor/intern/COM_ExecutionGroup.cpp | 3 +- .../compositor/intern/COM_WorkScheduler.cpp | 65 ++++++---------------- .../blender/compositor/intern/COM_WorkScheduler.h | 13 ----- 5 files changed, 60 insertions(+), 73 deletions(-) (limited to 'source') diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h index b13da9f0dd4..fb8771722c1 100644 --- a/source/blender/blenlib/BLI_threads.h +++ b/source/blender/blenlib/BLI_threads.h @@ -136,6 +136,7 @@ void *BLI_thread_queue_pop(ThreadQueue *queue); void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms); int BLI_thread_queue_size(ThreadQueue *queue); +void BLI_thread_queue_wait_finish(ThreadQueue *queue); void BLI_thread_queue_nowait(ThreadQueue *queue); #endif diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c index f9f677d7c22..dc4c15a82fc 100644 --- a/source/blender/blenlib/intern/threads.c +++ b/source/blender/blenlib/intern/threads.c @@ -520,8 +520,10 @@ void BLI_insert_work(ThreadedWorker *worker, void *param) struct ThreadQueue { GSQueue *queue; pthread_mutex_t mutex; - pthread_cond_t cond; - int nowait; + pthread_cond_t push_cond; + pthread_cond_t finish_cond; + volatile int nowait; + volatile int cancelled; }; ThreadQueue *BLI_thread_queue_init(void) @@ -532,14 +534,17 @@ ThreadQueue *BLI_thread_queue_init(void) queue->queue = BLI_gsqueue_new(sizeof(void *)); pthread_mutex_init(&queue->mutex, NULL); - pthread_cond_init(&queue->cond, NULL); + pthread_cond_init(&queue->push_cond, NULL); + pthread_cond_init(&queue->finish_cond, NULL); return queue; } void BLI_thread_queue_free(ThreadQueue *queue) { - pthread_cond_destroy(&queue->cond); + /* destroy everything, assumes no one is using queue anymore */ + pthread_cond_destroy(&queue->finish_cond); + pthread_cond_destroy(&queue->push_cond); pthread_mutex_destroy(&queue->mutex); BLI_gsqueue_free(queue->queue); @@ -554,7 +559,7 @@ void BLI_thread_queue_push(ThreadQueue *queue, void *work) BLI_gsqueue_push(queue->queue, &work); /* signal threads waiting to pop */ - pthread_cond_signal(&queue->cond); + pthread_cond_signal(&queue->push_cond); pthread_mutex_unlock(&queue->mutex); } @@ -565,11 +570,15 @@ void *BLI_thread_queue_pop(ThreadQueue *queue) /* wait until there is work */ pthread_mutex_lock(&queue->mutex); while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) - pthread_cond_wait(&queue->cond, &queue->mutex); - + pthread_cond_wait(&queue->push_cond, &queue->mutex); + /* if we have something, pop it */ - if (!BLI_gsqueue_is_empty(queue->queue)) + if (!BLI_gsqueue_is_empty(queue->queue)) { BLI_gsqueue_pop(queue->queue, &work); + + if(BLI_gsqueue_is_empty(queue->queue)) + pthread_cond_broadcast(&queue->finish_cond); + } pthread_mutex_unlock(&queue->mutex); @@ -623,16 +632,20 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms) /* wait until there is work */ pthread_mutex_lock(&queue->mutex); while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) { - if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT) + if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT) break; else if (PIL_check_seconds_timer() - t >= ms * 0.001) break; } /* if we have something, pop it */ - if (!BLI_gsqueue_is_empty(queue->queue)) + if (!BLI_gsqueue_is_empty(queue->queue)) { BLI_gsqueue_pop(queue->queue, &work); - + + if(BLI_gsqueue_is_empty(queue->queue)) + pthread_cond_broadcast(&queue->finish_cond); + } + pthread_mutex_unlock(&queue->mutex); return work; @@ -656,10 +669,23 @@ void BLI_thread_queue_nowait(ThreadQueue *queue) queue->nowait = 1; /* signal threads waiting to pop */ - pthread_cond_signal(&queue->cond); + pthread_cond_broadcast(&queue->push_cond); + pthread_mutex_unlock(&queue->mutex); +} + +void BLI_thread_queue_wait_finish(ThreadQueue *queue) +{ + /* wait for finish condition */ + pthread_mutex_lock(&queue->mutex); + + while(!BLI_gsqueue_is_empty(queue->queue)) + pthread_cond_wait(&queue->finish_cond, &queue->mutex); + pthread_mutex_unlock(&queue->mutex); } +/* ************************************************ */ + void BLI_begin_threaded_malloc(void) { if (thread_levels == 0) { @@ -674,3 +700,4 @@ void BLI_end_threaded_malloc(void) if (thread_levels == 0) MEM_set_lock_callback(NULL, NULL); } + diff --git a/source/blender/compositor/intern/COM_ExecutionGroup.cpp b/source/blender/compositor/intern/COM_ExecutionGroup.cpp index 44b3c8dafbb..481b83c81a3 100644 --- a/source/blender/compositor/intern/COM_ExecutionGroup.cpp +++ b/source/blender/compositor/intern/COM_ExecutionGroup.cpp @@ -351,7 +351,8 @@ void ExecutionGroup::execute(ExecutionSystem *graph) startIndex = index+1; } } - PIL_sleep_ms(10); + + WorkScheduler::finish(); if (bTree->test_break && bTree->test_break(bTree->tbh)) { breaked = true; diff --git a/source/blender/compositor/intern/COM_WorkScheduler.cpp b/source/blender/compositor/intern/COM_WorkScheduler.cpp index 172107f720b..ba8bfe55310 100644 --- a/source/blender/compositor/intern/COM_WorkScheduler.cpp +++ b/source/blender/compositor/intern/COM_WorkScheduler.cpp @@ -39,8 +39,6 @@ #endif -/// @brief global state of the WorkScheduler. -static WorkSchedulerState state; /// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created static vector cpudevices; @@ -68,43 +66,29 @@ static bool openclActive = false; #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE void *WorkScheduler::thread_execute_cpu(void *data) { - bool continueLoop = true; Device *device = (Device*)data; - while (continueLoop) { - WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue); - if (work) { - device->execute(work); - delete work; - } - PIL_sleep_ms(10); - - if (WorkScheduler::isStopping()) { - continueLoop = false; - } + WorkPackage *work; + + while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) { + device->execute(work); + delete work; } + return NULL; } void *WorkScheduler::thread_execute_gpu(void *data) { - bool continueLoop = true; Device *device = (Device*)data; - while (continueLoop) { - WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue); - if (work) { - device->execute(work); - delete work; - } - PIL_sleep_ms(10); - - if (WorkScheduler::isStopping()) { - continueLoop = false; - } + WorkPackage *work; + + while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) { + device->execute(work); + delete work; } + return NULL; } - -bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;} #endif @@ -135,7 +119,6 @@ void WorkScheduler::start(CompositorContext &context) #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE unsigned int index; cpuqueue = BLI_thread_queue_init(); - BLI_thread_queue_nowait(cpuqueue); BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size()); for (index = 0 ; index < cpudevices.size() ; index ++) { Device *device = cpudevices[index]; @@ -144,7 +127,6 @@ void WorkScheduler::start(CompositorContext &context) #ifdef COM_OPENCL_ENABLED if (context.getHasActiveOpenCLDevices()) { gpuqueue = BLI_thread_queue_init(); - BLI_thread_queue_nowait(gpuqueue); BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size()); for (index = 0 ; index < gpudevices.size() ; index ++) { Device *device = gpudevices[index]; @@ -157,45 +139,39 @@ void WorkScheduler::start(CompositorContext &context) } #endif #endif - state = COM_WSS_STARTED; } void WorkScheduler::finish() { #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE #ifdef COM_OPENCL_ENABLED if (openclActive) { - while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) { - PIL_sleep_ms(10); - } + BLI_thread_queue_wait_finish(gpuqueue); + BLI_thread_queue_wait_finish(cpuqueue); } else { - while (BLI_thread_queue_size(cpuqueue) > 0) { - PIL_sleep_ms(10); - } + BLI_thread_queue_wait_finish(cpuqueue); } #else - while (BLI_thread_queue_size(cpuqueue) > 0) { - PIL_sleep_ms(10); - } + BLI_thread_queue_wait_finish(cpuqueue); #endif #endif } void WorkScheduler::stop() { - state = COM_WSS_STOPPING; #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE + BLI_thread_queue_nowait(cpuqueue); BLI_end_threads(&cputhreads); BLI_thread_queue_free(cpuqueue); cpuqueue = NULL; #ifdef COM_OPENCL_ENABLED if (openclActive) { + BLI_thread_queue_nowait(gpuqueue); BLI_end_threads(&gputhreads); BLI_thread_queue_free(gpuqueue); gpuqueue = NULL; } #endif #endif - state = COM_WSS_STOPPED; } bool WorkScheduler::hasGPUDevices() @@ -218,8 +194,6 @@ extern void clContextError(const char *errinfo, const void *private_info, size_t void WorkScheduler::initialize() { - state = COM_WSS_UNKNOWN; - #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE int numberOfCPUThreads = BLI_system_thread_count(); @@ -298,8 +272,6 @@ void WorkScheduler::initialize() } #endif #endif - - state = COM_WSS_INITIALIZED; } void WorkScheduler::deinitialize() @@ -329,5 +301,4 @@ void WorkScheduler::deinitialize() } #endif #endif - state = COM_WSS_DEINITIALIZED; } diff --git a/source/blender/compositor/intern/COM_WorkScheduler.h b/source/blender/compositor/intern/COM_WorkScheduler.h index 0de1763749e..b03b514d139 100644 --- a/source/blender/compositor/intern/COM_WorkScheduler.h +++ b/source/blender/compositor/intern/COM_WorkScheduler.h @@ -31,19 +31,6 @@ extern "C" { #include "COM_defines.h" #include "COM_Device.h" -// STATES -/** @brief states of the WorkScheduler - * @ingroup execution - */ -typedef enum WorkSchedulerState { - COM_WSS_UNKNOWN = -1, - COM_WSS_INITIALIZED = 0, - COM_WSS_STARTED = 1, - COM_WSS_STOPPING = 2, - COM_WSS_STOPPED = 3, - COM_WSS_DEINITIALIZED = 4 -} WorkSchedulerState; - /** @brief the workscheduler * @ingroup execution */ -- cgit v1.2.3