Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Sharybin <sergey.vfx@gmail.com>2014-11-21 13:31:30 +0300
committerSergey Sharybin <sergey.vfx@gmail.com>2014-11-21 13:31:30 +0300
commit7b0c529fe211e4481fe8e459cbf11159857c611d (patch)
tree41d066884611f2dde2a3abcf4301053a656fc4a1 /source/blender/blenlib
parenta5f674de3032293287423aaeb6fc53ced25f6025 (diff)
Task scheduler: Add an option to limit number of threads per pool
This way we can have scheduler capable of scheduling tasks on all the CPUs but in the same time we can limit tasks like baking (in the future) to use no more than given number of threads.
Diffstat (limited to 'source/blender/blenlib')
-rw-r--r--source/blender/blenlib/BLI_task.h3
-rw-r--r--source/blender/blenlib/intern/task.c62
2 files changed, 51 insertions, 14 deletions
diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index 28da673ea97..8f85bc4ec31 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -88,6 +88,9 @@ void BLI_task_pool_cancel(TaskPool *pool);
/* stop all worker threads */
void BLI_task_pool_stop(TaskPool *pool);
+/* set number of threads allowed to be used by this pool */
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads);
+
/* for worker threads, test if canceled */
bool BLI_task_pool_canceled(TaskPool *pool);
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 219ccb18d98..e4cded18b76 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -49,6 +49,8 @@ struct TaskPool {
volatile size_t num;
volatile size_t done;
+ volatile int num_threads;
+ volatile int currently_running_tasks;
ThreadMutex num_mutex;
ThreadCondition num_cond;
@@ -84,6 +86,7 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done)
BLI_assert(pool->num >= done);
pool->num -= done;
+ pool->currently_running_tasks -= done;
pool->done += done;
if (pool->num == 0)
@@ -104,19 +107,37 @@ static void task_pool_num_increase(TaskPool *pool)
static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
{
+ bool found_task = false;
BLI_mutex_lock(&scheduler->queue_mutex);
while (!scheduler->queue.first && !scheduler->do_exit)
BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
- if (!scheduler->queue.first) {
- BLI_mutex_unlock(&scheduler->queue_mutex);
- BLI_assert(scheduler->do_exit);
- return false;
- }
-
- *task = scheduler->queue.first;
- BLI_remlink(&scheduler->queue, *task);
+ do {
+ Task *current_task;
+ if (!scheduler->queue.first) {
+ BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_assert(scheduler->do_exit);
+ return false;
+ }
+ for (current_task = scheduler->queue.first;
+ current_task != NULL;
+ current_task = current_task->next)
+ {
+ TaskPool *pool = current_task->pool;
+ if (pool->num_threads == 0 ||
+ pool->currently_running_tasks < pool->num_threads)
+ {
+ *task = current_task;
+ found_task = true;
+ pool->currently_running_tasks++;
+ BLI_remlink(&scheduler->queue, *task);
+ break;
+ }
+ }
+ if (!found_task)
+ BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+ } while (!found_task);
BLI_mutex_unlock(&scheduler->queue_mutex);
@@ -288,6 +309,8 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
pool->scheduler = scheduler;
pool->num = 0;
+ pool->num_threads = 0;
+ pool->currently_running_tasks = 0;
pool->do_cancel = false;
BLI_mutex_init(&pool->num_mutex);
@@ -351,12 +374,16 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* find task from this pool. if we get a task from another pool,
* we can get into deadlock */
- for (task = scheduler->queue.first; task; task = task->next) {
- if (task->pool == pool) {
- work_task = task;
- found_task = true;
- BLI_remlink(&scheduler->queue, task);
- break;
+ if (pool->num_threads == 0 ||
+ pool->currently_running_tasks < pool->num_threads)
+ {
+ for (task = scheduler->queue.first; task; task = task->next) {
+ if (task->pool == pool) {
+ work_task = task;
+ found_task = true;
+ BLI_remlink(&scheduler->queue, task);
+ break;
+ }
}
}
@@ -365,6 +392,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* if found task, do it, otherwise wait until other tasks are done */
if (found_task) {
/* run task */
+ pool->currently_running_tasks++;
work_task->run(pool, work_task->taskdata, 0);
/* delete task */
@@ -387,6 +415,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
BLI_mutex_unlock(&pool->num_mutex);
}
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
+{
+ /* NOTE: Don't try to modify threads while tasks are running! */
+ pool->num_threads = num_threads;
+}
+
void BLI_task_pool_cancel(TaskPool *pool)
{
pool->do_cancel = true;