Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/source
diff options
context:
space:
mode:
Diffstat (limited to 'source')
-rw-r--r--source/blender/blenlib/BLI_threads.h1
-rw-r--r--source/blender/blenlib/intern/threads.c51
-rw-r--r--source/blender/compositor/intern/COM_ExecutionGroup.cpp3
-rw-r--r--source/blender/compositor/intern/COM_WorkScheduler.cpp65
-rw-r--r--source/blender/compositor/intern/COM_WorkScheduler.h13
5 files changed, 60 insertions, 73 deletions
diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h
index b13da9f0dd4..fb8771722c1 100644
--- a/source/blender/blenlib/BLI_threads.h
+++ b/source/blender/blenlib/BLI_threads.h
@@ -136,6 +136,7 @@ void *BLI_thread_queue_pop(ThreadQueue *queue);
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
int BLI_thread_queue_size(ThreadQueue *queue);
+void BLI_thread_queue_wait_finish(ThreadQueue *queue);
void BLI_thread_queue_nowait(ThreadQueue *queue);
#endif
diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c
index f9f677d7c22..dc4c15a82fc 100644
--- a/source/blender/blenlib/intern/threads.c
+++ b/source/blender/blenlib/intern/threads.c
@@ -520,8 +520,10 @@ void BLI_insert_work(ThreadedWorker *worker, void *param)
struct ThreadQueue {
GSQueue *queue;
pthread_mutex_t mutex;
- pthread_cond_t cond;
- int nowait;
+ pthread_cond_t push_cond;
+ pthread_cond_t finish_cond;
+ volatile int nowait;
+ volatile int cancelled;
};
ThreadQueue *BLI_thread_queue_init(void)
@@ -532,14 +534,17 @@ ThreadQueue *BLI_thread_queue_init(void)
queue->queue = BLI_gsqueue_new(sizeof(void *));
pthread_mutex_init(&queue->mutex, NULL);
- pthread_cond_init(&queue->cond, NULL);
+ pthread_cond_init(&queue->push_cond, NULL);
+ pthread_cond_init(&queue->finish_cond, NULL);
return queue;
}
void BLI_thread_queue_free(ThreadQueue *queue)
{
- pthread_cond_destroy(&queue->cond);
+ /* destroy everything, assumes no one is using queue anymore */
+ pthread_cond_destroy(&queue->finish_cond);
+ pthread_cond_destroy(&queue->push_cond);
pthread_mutex_destroy(&queue->mutex);
BLI_gsqueue_free(queue->queue);
@@ -554,7 +559,7 @@ void BLI_thread_queue_push(ThreadQueue *queue, void *work)
BLI_gsqueue_push(queue->queue, &work);
/* signal threads waiting to pop */
- pthread_cond_signal(&queue->cond);
+ pthread_cond_signal(&queue->push_cond);
pthread_mutex_unlock(&queue->mutex);
}
@@ -565,11 +570,15 @@ void *BLI_thread_queue_pop(ThreadQueue *queue)
/* wait until there is work */
pthread_mutex_lock(&queue->mutex);
while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
- pthread_cond_wait(&queue->cond, &queue->mutex);
-
+ pthread_cond_wait(&queue->push_cond, &queue->mutex);
+
/* if we have something, pop it */
- if (!BLI_gsqueue_is_empty(queue->queue))
+ if (!BLI_gsqueue_is_empty(queue->queue)) {
BLI_gsqueue_pop(queue->queue, &work);
+
+ if(BLI_gsqueue_is_empty(queue->queue))
+ pthread_cond_broadcast(&queue->finish_cond);
+ }
pthread_mutex_unlock(&queue->mutex);
@@ -623,16 +632,20 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
/* wait until there is work */
pthread_mutex_lock(&queue->mutex);
while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
- if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+ if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
break;
else if (PIL_check_seconds_timer() - t >= ms * 0.001)
break;
}
/* if we have something, pop it */
- if (!BLI_gsqueue_is_empty(queue->queue))
+ if (!BLI_gsqueue_is_empty(queue->queue)) {
BLI_gsqueue_pop(queue->queue, &work);
-
+
+ if(BLI_gsqueue_is_empty(queue->queue))
+ pthread_cond_broadcast(&queue->finish_cond);
+ }
+
pthread_mutex_unlock(&queue->mutex);
return work;
@@ -656,10 +669,23 @@ void BLI_thread_queue_nowait(ThreadQueue *queue)
queue->nowait = 1;
/* signal threads waiting to pop */
- pthread_cond_signal(&queue->cond);
+ pthread_cond_broadcast(&queue->push_cond);
+ pthread_mutex_unlock(&queue->mutex);
+}
+
+void BLI_thread_queue_wait_finish(ThreadQueue *queue)
+{
+ /* wait for finish condition */
+ pthread_mutex_lock(&queue->mutex);
+
+ while(!BLI_gsqueue_is_empty(queue->queue))
+ pthread_cond_wait(&queue->finish_cond, &queue->mutex);
+
pthread_mutex_unlock(&queue->mutex);
}
+/* ************************************************ */
+
void BLI_begin_threaded_malloc(void)
{
if (thread_levels == 0) {
@@ -674,3 +700,4 @@ void BLI_end_threaded_malloc(void)
if (thread_levels == 0)
MEM_set_lock_callback(NULL, NULL);
}
+
diff --git a/source/blender/compositor/intern/COM_ExecutionGroup.cpp b/source/blender/compositor/intern/COM_ExecutionGroup.cpp
index 44b3c8dafbb..481b83c81a3 100644
--- a/source/blender/compositor/intern/COM_ExecutionGroup.cpp
+++ b/source/blender/compositor/intern/COM_ExecutionGroup.cpp
@@ -351,7 +351,8 @@ void ExecutionGroup::execute(ExecutionSystem *graph)
startIndex = index+1;
}
}
- PIL_sleep_ms(10);
+
+ WorkScheduler::finish();
if (bTree->test_break && bTree->test_break(bTree->tbh)) {
breaked = true;
diff --git a/source/blender/compositor/intern/COM_WorkScheduler.cpp b/source/blender/compositor/intern/COM_WorkScheduler.cpp
index 172107f720b..ba8bfe55310 100644
--- a/source/blender/compositor/intern/COM_WorkScheduler.cpp
+++ b/source/blender/compositor/intern/COM_WorkScheduler.cpp
@@ -39,8 +39,6 @@
#endif
-/// @brief global state of the WorkScheduler.
-static WorkSchedulerState state;
/// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created
static vector<CPUDevice*> cpudevices;
@@ -68,43 +66,29 @@ static bool openclActive = false;
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
void *WorkScheduler::thread_execute_cpu(void *data)
{
- bool continueLoop = true;
Device *device = (Device*)data;
- while (continueLoop) {
- WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue);
- if (work) {
- device->execute(work);
- delete work;
- }
- PIL_sleep_ms(10);
-
- if (WorkScheduler::isStopping()) {
- continueLoop = false;
- }
+ WorkPackage *work;
+
+ while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) {
+ device->execute(work);
+ delete work;
}
+
return NULL;
}
void *WorkScheduler::thread_execute_gpu(void *data)
{
- bool continueLoop = true;
Device *device = (Device*)data;
- while (continueLoop) {
- WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue);
- if (work) {
- device->execute(work);
- delete work;
- }
- PIL_sleep_ms(10);
-
- if (WorkScheduler::isStopping()) {
- continueLoop = false;
- }
+ WorkPackage *work;
+
+ while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) {
+ device->execute(work);
+ delete work;
}
+
return NULL;
}
-
-bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;}
#endif
@@ -135,7 +119,6 @@ void WorkScheduler::start(CompositorContext &context)
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
unsigned int index;
cpuqueue = BLI_thread_queue_init();
- BLI_thread_queue_nowait(cpuqueue);
BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size());
for (index = 0 ; index < cpudevices.size() ; index ++) {
Device *device = cpudevices[index];
@@ -144,7 +127,6 @@ void WorkScheduler::start(CompositorContext &context)
#ifdef COM_OPENCL_ENABLED
if (context.getHasActiveOpenCLDevices()) {
gpuqueue = BLI_thread_queue_init();
- BLI_thread_queue_nowait(gpuqueue);
BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size());
for (index = 0 ; index < gpudevices.size() ; index ++) {
Device *device = gpudevices[index];
@@ -157,45 +139,39 @@ void WorkScheduler::start(CompositorContext &context)
}
#endif
#endif
- state = COM_WSS_STARTED;
}
void WorkScheduler::finish()
{
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
#ifdef COM_OPENCL_ENABLED
if (openclActive) {
- while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) {
- PIL_sleep_ms(10);
- }
+ BLI_thread_queue_wait_finish(gpuqueue);
+ BLI_thread_queue_wait_finish(cpuqueue);
}
else {
- while (BLI_thread_queue_size(cpuqueue) > 0) {
- PIL_sleep_ms(10);
- }
+ BLI_thread_queue_wait_finish(cpuqueue);
}
#else
- while (BLI_thread_queue_size(cpuqueue) > 0) {
- PIL_sleep_ms(10);
- }
+ BLI_thread_queue_wait_finish(cpuqueue);
#endif
#endif
}
void WorkScheduler::stop()
{
- state = COM_WSS_STOPPING;
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
+ BLI_thread_queue_nowait(cpuqueue);
BLI_end_threads(&cputhreads);
BLI_thread_queue_free(cpuqueue);
cpuqueue = NULL;
#ifdef COM_OPENCL_ENABLED
if (openclActive) {
+ BLI_thread_queue_nowait(gpuqueue);
BLI_end_threads(&gputhreads);
BLI_thread_queue_free(gpuqueue);
gpuqueue = NULL;
}
#endif
#endif
- state = COM_WSS_STOPPED;
}
bool WorkScheduler::hasGPUDevices()
@@ -218,8 +194,6 @@ extern void clContextError(const char *errinfo, const void *private_info, size_t
void WorkScheduler::initialize()
{
- state = COM_WSS_UNKNOWN;
-
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
int numberOfCPUThreads = BLI_system_thread_count();
@@ -298,8 +272,6 @@ void WorkScheduler::initialize()
}
#endif
#endif
-
- state = COM_WSS_INITIALIZED;
}
void WorkScheduler::deinitialize()
@@ -329,5 +301,4 @@ void WorkScheduler::deinitialize()
}
#endif
#endif
- state = COM_WSS_DEINITIALIZED;
}
diff --git a/source/blender/compositor/intern/COM_WorkScheduler.h b/source/blender/compositor/intern/COM_WorkScheduler.h
index 0de1763749e..b03b514d139 100644
--- a/source/blender/compositor/intern/COM_WorkScheduler.h
+++ b/source/blender/compositor/intern/COM_WorkScheduler.h
@@ -31,19 +31,6 @@ extern "C" {
#include "COM_defines.h"
#include "COM_Device.h"
-// STATES
-/** @brief states of the WorkScheduler
- * @ingroup execution
- */
-typedef enum WorkSchedulerState {
- COM_WSS_UNKNOWN = -1,
- COM_WSS_INITIALIZED = 0,
- COM_WSS_STARTED = 1,
- COM_WSS_STOPPING = 2,
- COM_WSS_STOPPED = 3,
- COM_WSS_DEINITIALIZED = 4
-} WorkSchedulerState;
-
/** @brief the workscheduler
* @ingroup execution
*/