Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrecht Van Lommel <brecht@blender.org>2020-04-30 08:59:23 +0300
committerJeroen Bakker <jeroen@blender.org>2020-04-30 09:09:21 +0300
commitd8a3f3595af0fb3ca5937e41c2728fd750d986ef (patch)
tree03886cfd2ea7ad200a50317d2362f0fc94070f0c /source/blender/blenlib
parenta18ad3c3b6198964ab7134302afda1afc89da5f4 (diff)
Task: Use TBB as Task Scheduler
This patch enables TBB as the default task scheduler. TBB stands for Threading Building Blocks and is developed by Intel. The library contains several threading patters. This patch maps blenders BLI_task_* function to their counterpart. After this patch we can add more patterns. A promising one is TBB:graph that can be used for depsgraph, draw manager and compositor. Performance changes depends on the actual hardware. It was tested on different hardwares from laptops to workstations and we didn't detected any downgrade of the performance. * Linux Xeon E5-2699 v4 got FPS boost from 12 to 17 using Spring's 04_010_A.anim.blend. * AMD Ryzen Threadripper 2990WX 32-Core Animation playback goes from 9.5-10.5 FPS to 13.0-14.0 FPS on Agent 327 , 10_03_B.anim.blend. Reviewed By: brecht, sergey Differential Revision: https://developer.blender.org/D7475
Diffstat (limited to 'source/blender/blenlib')
-rw-r--r--source/blender/blenlib/BLI_task.h66
-rw-r--r--source/blender/blenlib/BLI_threads.h2
-rw-r--r--source/blender/blenlib/CMakeLists.txt12
-rw-r--r--source/blender/blenlib/intern/task_iterator.c341
-rw-r--r--source/blender/blenlib/intern/task_pool.cc1143
-rw-r--r--source/blender/blenlib/intern/task_range.cc167
-rw-r--r--source/blender/blenlib/intern/task_scheduler.cc73
-rw-r--r--source/blender/blenlib/intern/threads.c26
8 files changed, 599 insertions, 1231 deletions
diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index 42dd47266dc..ee087600a31 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -43,12 +43,9 @@ struct BLI_mempool;
* must be called from the main threads. All other scheduler and pool functions
* are thread-safe. */
-typedef struct TaskScheduler TaskScheduler;
-
-TaskScheduler *BLI_task_scheduler_create(int num_threads);
-void BLI_task_scheduler_free(TaskScheduler *scheduler);
-
-int BLI_task_scheduler_num_threads(TaskScheduler *scheduler);
+void BLI_task_scheduler_init(void);
+void BLI_task_scheduler_exit(void);
+int BLI_task_scheduler_num_threads(void);
/* Task Pool
*
@@ -70,16 +67,14 @@ typedef enum TaskPriority {
} TaskPriority;
typedef struct TaskPool TaskPool;
-typedef void (*TaskRunFunction)(TaskPool *__restrict pool, void *taskdata, int threadid);
+typedef void (*TaskRunFunction)(TaskPool *__restrict pool, void *taskdata);
typedef void (*TaskFreeFunction)(TaskPool *__restrict pool, void *taskdata);
-TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPriority priority);
-TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler,
- void *userdata,
- TaskPriority priority);
-TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler,
- void *userdata,
- TaskPriority priority);
+TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority);
+TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority);
+TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority);
+TaskPool *BLI_task_pool_create_no_threads(void *userdata);
+TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority);
void BLI_task_pool_free(TaskPool *pool);
void BLI_task_pool_push(TaskPool *pool,
@@ -87,17 +82,9 @@ void BLI_task_pool_push(TaskPool *pool,
void *taskdata,
bool free_taskdata,
TaskFreeFunction freedata);
-void BLI_task_pool_push_from_thread(TaskPool *pool,
- TaskRunFunction run,
- void *taskdata,
- bool free_taskdata,
- TaskFreeFunction freedata,
- int thread_id);
/* work and wait until all tasks are done */
void BLI_task_pool_work_and_wait(TaskPool *pool);
-/* work and wait until all tasks are done, then reset to the initial suspended state */
-void BLI_task_pool_work_wait_and_reset(TaskPool *pool);
/* cancel all tasks, keep worker threads running */
void BLI_task_pool_cancel(TaskPool *pool);
@@ -110,36 +97,10 @@ void *BLI_task_pool_user_data(TaskPool *pool);
/* optional mutex to use from run function */
ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool);
-/* Thread ID of thread that created the task pool. */
-int BLI_task_pool_creator_thread_id(TaskPool *pool);
-
-/* Delayed push, use that to reduce thread overhead by accumulating
- * all new tasks into local queue first and pushing it to scheduler
- * from within a single mutex lock.
- */
-void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id);
-void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id);
-
/* Parallel for routines */
-typedef enum eTaskSchedulingMode {
- /* Task scheduler will divide overall work into equal chunks, scheduling
- * even chunks to all worker threads.
- * Least run time benefit, ideal for cases when each task requires equal
- * amount of compute power.
- */
- TASK_SCHEDULING_STATIC,
- /* Task scheduler will schedule small amount of work to each worker thread.
- * Has more run time overhead, but deals much better with cases when each
- * part of the work requires totally different amount of compute power.
- */
- TASK_SCHEDULING_DYNAMIC,
-} eTaskSchedulingMode;
-
/* Per-thread specific data passed to the callback. */
typedef struct TaskParallelTLS {
- /* Identifier of the thread who this data belongs to. */
- int thread_id;
/* Copy of user-specifier chunk, which is copied from original chunk to all
* worker threads. This is similar to OpenMP's firstprivate.
*/
@@ -163,8 +124,6 @@ typedef struct TaskParallelSettings {
* is higher than a chunk size. As in, threading will always be performed.
*/
bool use_threading;
- /* Scheduling mode to use for this parallel range invocation. */
- eTaskSchedulingMode scheduling_mode;
/* Each instance of looping chunks will get a copy of this data
* (similar to OpenMP's firstprivate).
*/
@@ -199,7 +158,7 @@ void BLI_task_parallel_range(const int start,
const int stop,
void *userdata,
TaskParallelRangeFunc func,
- TaskParallelSettings *settings);
+ const TaskParallelSettings *settings);
/* This data is shared between all tasks, its access needs thread lock or similar protection.
*/
@@ -254,11 +213,14 @@ BLI_INLINE void BLI_parallel_range_settings_defaults(TaskParallelSettings *setti
{
memset(settings, 0, sizeof(*settings));
settings->use_threading = true;
- settings->scheduling_mode = TASK_SCHEDULING_STATIC;
/* Use default heuristic to define actual chunk size. */
settings->min_iter_per_thread = 0;
}
+/* Don't use this, store any thread specific data in tls->userdata_chunk instead.
+ * Ony here for code to be removed. */
+int BLI_task_parallel_thread_id(const TaskParallelTLS *tls);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h
index c2127c1ec3a..243efedebf9 100644
--- a/source/blender/blenlib/BLI_threads.h
+++ b/source/blender/blenlib/BLI_threads.h
@@ -47,8 +47,6 @@ struct TaskScheduler;
void BLI_threadapi_init(void);
void BLI_threadapi_exit(void);
-struct TaskScheduler *BLI_task_scheduler_get(void);
-
void BLI_threadpool_init(struct ListBase *threadbase, void *(*do_thread)(void *), int tot);
int BLI_available_threads(struct ListBase *threadbase);
int BLI_threadpool_available_thread_index(struct ListBase *threadbase);
diff --git a/source/blender/blenlib/CMakeLists.txt b/source/blender/blenlib/CMakeLists.txt
index a26d5cc46fb..52b302f99d4 100644
--- a/source/blender/blenlib/CMakeLists.txt
+++ b/source/blender/blenlib/CMakeLists.txt
@@ -119,8 +119,10 @@ set(SRC
intern/string_utf8.c
intern/string_utils.c
intern/system.c
- intern/task_pool.cc
intern/task_iterator.c
+ intern/task_pool.cc
+ intern/task_range.cc
+ intern/task_scheduler.cc
intern/threads.c
intern/time.c
intern/timecode.c
@@ -278,6 +280,14 @@ if(WITH_MEM_VALGRIND)
add_definitions(-DWITH_MEM_VALGRIND)
endif()
+if(WITH_TBB)
+ add_definitions(-DWITH_TBB)
+
+ list(APPEND INC_SYS
+ ${TBB_INCLUDE_DIRS}
+ )
+endif()
+
if(WIN32)
list(APPEND INC
../../../intern/utfconv
diff --git a/source/blender/blenlib/intern/task_iterator.c b/source/blender/blenlib/intern/task_iterator.c
index 1189ec0d0c0..ee459ac2548 100644
--- a/source/blender/blenlib/intern/task_iterator.c
+++ b/source/blender/blenlib/intern/task_iterator.c
@@ -17,7 +17,7 @@
/** \file
* \ingroup bli
*
- * A generic task system which can be used for any task based subsystem.
+ * Parallel tasks over all elements in a container.
*/
#include <stdlib.h>
@@ -34,82 +34,12 @@
#include "atomic_ops.h"
-/* Parallel range routines */
-
-/**
- *
- * Main functions:
- * - #BLI_task_parallel_range
- * - #BLI_task_parallel_listbase (#ListBase - double linked list)
- *
- * TODO:
- * - #BLI_task_parallel_foreach_link (#Link - single linked list)
- * - #BLI_task_parallel_foreach_ghash/gset (#GHash/#GSet - hash & set)
- * - #BLI_task_parallel_foreach_mempool (#BLI_mempool - iterate over mempools)
- */
-
/* Allows to avoid using malloc for userdata_chunk in tasks, when small enough. */
#define MALLOCA(_size) ((_size) <= 8192) ? alloca((_size)) : MEM_mallocN((_size), __func__)
#define MALLOCA_FREE(_mem, _size) \
if (((_mem) != NULL) && ((_size) > 8192)) \
MEM_freeN((_mem))
-/* Stores all needed data to perform a parallelized iteration,
- * with a same operation (callback function).
- * It can be chained with other tasks in a single-linked list way. */
-typedef struct TaskParallelRangeState {
- struct TaskParallelRangeState *next;
-
- /* Start and end point of integer value iteration. */
- int start, stop;
-
- /* User-defined data, shared between all worker threads. */
- void *userdata_shared;
- /* User-defined callback function called for each value in [start, stop[ specified range. */
- TaskParallelRangeFunc func;
-
- /* Each instance of looping chunks will get a copy of this data
- * (similar to OpenMP's firstprivate).
- */
- void *initial_tls_memory; /* Pointer to actual user-defined 'tls' data. */
- size_t tls_data_size; /* Size of that data. */
-
- void *flatten_tls_storage; /* 'tls' copies of initial_tls_memory for each running task. */
- /* Number of 'tls' copies in the array, i.e. number of worker threads. */
- size_t num_elements_in_tls_storage;
-
- /* Function called to join user data chunk into another, to reduce
- * the result to the original userdata_chunk memory.
- * The reduce functions should have no side effects, so that they
- * can be run on any thread. */
- TaskParallelReduceFunc func_reduce;
- /* Function called to free data created by TaskParallelRangeFunc. */
- TaskParallelFreeFunc func_free;
-
- /* Current value of the iterator, shared between all threads (atomically updated). */
- int iter_value;
- int iter_chunk_num; /* Amount of iterations to process in a single step. */
-} TaskParallelRangeState;
-
-/* Stores all the parallel tasks for a single pool. */
-typedef struct TaskParallelRangePool {
- /* The workers' task pool. */
- TaskPool *pool;
- /* The number of worker tasks we need to create. */
- int num_tasks;
- /* The total number of iterations in all the added ranges. */
- int num_total_iters;
- /* The size (number of items) processed at once by a worker task. */
- int chunk_size;
-
- /* Linked list of range tasks to process. */
- TaskParallelRangeState *parallel_range_states;
- /* Current range task beeing processed, swapped atomically. */
- TaskParallelRangeState *current_state;
- /* Scheduling settings common to all tasks. */
- TaskParallelSettings *settings;
-} TaskParallelRangePool;
-
BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
const int tot_items,
int num_tasks,
@@ -154,232 +84,7 @@ BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settin
}
BLI_assert(chunk_size > 0);
-
- if (tot_items > 0) {
- switch (settings->scheduling_mode) {
- case TASK_SCHEDULING_STATIC:
- *r_chunk_size = max_ii(chunk_size, tot_items / num_tasks);
- break;
- case TASK_SCHEDULING_DYNAMIC:
- *r_chunk_size = chunk_size;
- break;
- }
- }
- else {
- /* If total amount of items is unknown, we can only use dynamic scheduling. */
- *r_chunk_size = chunk_size;
- }
-}
-
-BLI_INLINE void task_parallel_range_calc_chunk_size(TaskParallelRangePool *range_pool)
-{
- int num_iters = 0;
- int min_num_iters = INT_MAX;
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state->next) {
- const int ni = state->stop - state->start;
- num_iters += ni;
- if (min_num_iters > ni) {
- min_num_iters = ni;
- }
- }
- range_pool->num_total_iters = num_iters;
- /* Note: Passing min_num_iters here instead of num_iters kind of partially breaks the 'static'
- * scheduling, but pooled range iterator is inherently non-static anyway, so adding a small level
- * of dynamic scheduling here should be fine. */
- task_parallel_calc_chunk_size(
- range_pool->settings, min_num_iters, range_pool->num_tasks, &range_pool->chunk_size);
-}
-
-BLI_INLINE bool parallel_range_next_iter_get(TaskParallelRangePool *__restrict range_pool,
- int *__restrict r_iter,
- int *__restrict r_count,
- TaskParallelRangeState **__restrict r_state)
-{
- /* We need an atomic op here as well to fetch the initial state, since some other thread might
- * have already updated it. */
- TaskParallelRangeState *current_state = atomic_cas_ptr(
- (void **)&range_pool->current_state, NULL, NULL);
-
- int previter = INT32_MAX;
-
- while (current_state != NULL && previter >= current_state->stop) {
- previter = atomic_fetch_and_add_int32(&current_state->iter_value, range_pool->chunk_size);
- *r_iter = previter;
- *r_count = max_ii(0, min_ii(range_pool->chunk_size, current_state->stop - previter));
-
- if (previter >= current_state->stop) {
- /* At this point the state we got is done, we need to go to the next one. In case some other
- * thread already did it, then this does nothing, and we'll just get current valid state
- * at start of the next loop. */
- TaskParallelRangeState *current_state_from_atomic_cas = atomic_cas_ptr(
- (void **)&range_pool->current_state, current_state, current_state->next);
-
- if (current_state == current_state_from_atomic_cas) {
- /* The atomic CAS operation was successful, we did update range_pool->current_state, so we
- * can safely switch to next state. */
- current_state = current_state->next;
- }
- else {
- /* The atomic CAS operation failed, but we still got range_pool->current_state value out of
- * it, just use it as our new current state. */
- current_state = current_state_from_atomic_cas;
- }
- }
- }
-
- *r_state = current_state;
- return (current_state != NULL && previter < current_state->stop);
-}
-
-static void parallel_range_func(TaskPool *__restrict pool, void *tls_data_idx, int thread_id)
-{
- TaskParallelRangePool *__restrict range_pool = BLI_task_pool_user_data(pool);
- TaskParallelTLS tls = {
- .thread_id = thread_id,
- .userdata_chunk = NULL,
- };
- TaskParallelRangeState *state;
- int iter, count;
- while (parallel_range_next_iter_get(range_pool, &iter, &count, &state)) {
- tls.userdata_chunk = (char *)state->flatten_tls_storage +
- (((size_t)POINTER_AS_INT(tls_data_idx)) * state->tls_data_size);
- for (int i = 0; i < count; i++) {
- state->func(state->userdata_shared, iter + i, &tls);
- }
- }
-}
-
-static void parallel_range_single_thread(TaskParallelRangePool *range_pool)
-{
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state->next) {
- const int start = state->start;
- const int stop = state->stop;
- void *userdata = state->userdata_shared;
- TaskParallelRangeFunc func = state->func;
-
- void *initial_tls_memory = state->initial_tls_memory;
- const size_t tls_data_size = state->tls_data_size;
- const bool use_tls_data = (tls_data_size != 0) && (initial_tls_memory != NULL);
- TaskParallelTLS tls = {
- .thread_id = 0,
- .userdata_chunk = initial_tls_memory,
- };
- for (int i = start; i < stop; i++) {
- func(userdata, i, &tls);
- }
- if (use_tls_data && state->func_free != NULL) {
- /* `func_free` should only free data that was created during execution of `func`. */
- state->func_free(userdata, initial_tls_memory);
- }
- }
-}
-
-/**
- * This function allows to parallelized for loops in a similar way to OpenMP's
- * 'parallel for' statement.
- *
- * See public API doc of ParallelRangeSettings for description of all settings.
- */
-void BLI_task_parallel_range(const int start,
- const int stop,
- void *userdata,
- TaskParallelRangeFunc func,
- TaskParallelSettings *settings)
-{
- if (start == stop) {
- return;
- }
-
- BLI_assert(start < stop);
-
- TaskParallelRangeState state = {
- .next = NULL,
- .start = start,
- .stop = stop,
- .userdata_shared = userdata,
- .func = func,
- .iter_value = start,
- .initial_tls_memory = settings->userdata_chunk,
- .tls_data_size = settings->userdata_chunk_size,
- .func_free = settings->func_free,
- };
- TaskParallelRangePool range_pool = {
- .pool = NULL, .parallel_range_states = &state, .current_state = NULL, .settings = settings};
- int i, num_threads, num_tasks;
-
- void *tls_data = settings->userdata_chunk;
- const size_t tls_data_size = settings->userdata_chunk_size;
- if (tls_data_size != 0) {
- BLI_assert(tls_data != NULL);
- }
- const bool use_tls_data = (tls_data_size != 0) && (tls_data != NULL);
- void *flatten_tls_storage = NULL;
-
- /* If it's not enough data to be crunched, don't bother with tasks at all,
- * do everything from the current thread.
- */
- if (!settings->use_threading) {
- parallel_range_single_thread(&range_pool);
- return;
- }
-
- TaskScheduler *task_scheduler = BLI_task_scheduler_get();
- num_threads = BLI_task_scheduler_num_threads(task_scheduler);
-
- /* The idea here is to prevent creating task for each of the loop iterations
- * and instead have tasks which are evenly distributed across CPU cores and
- * pull next iter to be crunched using the queue.
- */
- range_pool.num_tasks = num_tasks = num_threads + 2;
-
- task_parallel_range_calc_chunk_size(&range_pool);
- range_pool.num_tasks = num_tasks = min_ii(num_tasks,
- max_ii(1, (stop - start) / range_pool.chunk_size));
-
- if (num_tasks == 1) {
- parallel_range_single_thread(&range_pool);
- return;
- }
-
- TaskPool *task_pool = range_pool.pool = BLI_task_pool_create_suspended(
- task_scheduler, &range_pool, TASK_PRIORITY_HIGH);
-
- range_pool.current_state = &state;
-
- if (use_tls_data) {
- state.flatten_tls_storage = flatten_tls_storage = MALLOCA(tls_data_size * (size_t)num_tasks);
- state.tls_data_size = tls_data_size;
- }
-
- const int thread_id = BLI_task_pool_creator_thread_id(task_pool);
- for (i = 0; i < num_tasks; i++) {
- if (use_tls_data) {
- void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i);
- memcpy(userdata_chunk_local, tls_data, tls_data_size);
- }
- /* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(
- task_pool, parallel_range_func, POINTER_FROM_INT(i), false, NULL, thread_id);
- }
-
- BLI_task_pool_work_and_wait(task_pool);
- BLI_task_pool_free(task_pool);
-
- if (use_tls_data && (settings->func_free != NULL || settings->func_reduce != NULL)) {
- for (i = 0; i < num_tasks; i++) {
- void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i);
- if (settings->func_reduce) {
- settings->func_reduce(userdata, tls_data, userdata_chunk_local);
- }
- if (settings->func_free) {
- /* `func_free` should only free data that was created during execution of `func`. */
- settings->func_free(userdata, userdata_chunk_local);
- }
- }
- MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks);
- }
+ *r_chunk_size = chunk_size;
}
typedef struct TaskParallelIteratorState {
@@ -394,20 +99,10 @@ typedef struct TaskParallelIteratorState {
int tot_items;
} TaskParallelIteratorState;
-BLI_INLINE void task_parallel_iterator_calc_chunk_size(const TaskParallelSettings *settings,
- const int num_tasks,
- TaskParallelIteratorState *state)
-{
- task_parallel_calc_chunk_size(
- settings, state->tot_items, num_tasks, &state->iter_shared.chunk_size);
-}
-
static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict state,
- void *userdata_chunk,
- int threadid)
+ void *userdata_chunk)
{
TaskParallelTLS tls = {
- .thread_id = threadid,
.userdata_chunk = userdata_chunk,
};
@@ -460,11 +155,11 @@ static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict stat
MALLOCA_FREE(current_chunk_indices, indices_size);
}
-static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk, int threadid)
+static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk)
{
TaskParallelIteratorState *__restrict state = BLI_task_pool_user_data(pool);
- parallel_iterator_func_do(state, userdata_chunk, threadid);
+ parallel_iterator_func_do(state, userdata_chunk);
}
static void task_parallel_iterator_no_threads(const TaskParallelSettings *settings,
@@ -483,7 +178,7 @@ static void task_parallel_iterator_no_threads(const TaskParallelSettings *settin
/* Also marking it as non-threaded for the iterator callback. */
state->iter_shared.spin_lock = NULL;
- parallel_iterator_func_do(state, userdata_chunk, 0);
+ parallel_iterator_func_do(state, userdata_chunk);
if (use_userdata_chunk && settings->func_free != NULL) {
/* `func_free` should only free data that was created during execution of `func`. */
@@ -494,10 +189,10 @@ static void task_parallel_iterator_no_threads(const TaskParallelSettings *settin
static void task_parallel_iterator_do(const TaskParallelSettings *settings,
TaskParallelIteratorState *state)
{
- TaskScheduler *task_scheduler = BLI_task_scheduler_get();
- const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+ const int num_threads = BLI_task_scheduler_num_threads();
- task_parallel_iterator_calc_chunk_size(settings, num_threads, state);
+ task_parallel_calc_chunk_size(
+ settings, state->tot_items, num_threads, &state->iter_shared.chunk_size);
if (!settings->use_threading) {
task_parallel_iterator_no_threads(settings, state);
@@ -526,21 +221,19 @@ static void task_parallel_iterator_do(const TaskParallelSettings *settings,
void *userdata_chunk_array = NULL;
const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
- TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, state, TASK_PRIORITY_HIGH);
+ TaskPool *task_pool = BLI_task_pool_create(state, TASK_PRIORITY_HIGH);
if (use_userdata_chunk) {
userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
}
- const int thread_id = BLI_task_pool_creator_thread_id(task_pool);
for (size_t i = 0; i < num_tasks; i++) {
if (use_userdata_chunk) {
userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
}
/* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(
- task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL, thread_id);
+ BLI_task_pool_push(task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL);
}
BLI_task_pool_work_and_wait(task_pool);
@@ -656,7 +349,7 @@ typedef struct ParallelMempoolState {
TaskParallelMempoolFunc func;
} ParallelMempoolState;
-static void parallel_mempool_func(TaskPool *__restrict pool, void *taskdata, int UNUSED(threadid))
+static void parallel_mempool_func(TaskPool *__restrict pool, void *taskdata)
{
ParallelMempoolState *__restrict state = BLI_task_pool_user_data(pool);
BLI_mempool_iter *iter = taskdata;
@@ -684,7 +377,6 @@ void BLI_task_parallel_mempool(BLI_mempool *mempool,
TaskParallelMempoolFunc func,
const bool use_threading)
{
- TaskScheduler *task_scheduler;
TaskPool *task_pool;
ParallelMempoolState state;
int i, num_threads, num_tasks;
@@ -704,9 +396,8 @@ void BLI_task_parallel_mempool(BLI_mempool *mempool,
return;
}
- task_scheduler = BLI_task_scheduler_get();
- task_pool = BLI_task_pool_create_suspended(task_scheduler, &state, TASK_PRIORITY_HIGH);
- num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+ task_pool = BLI_task_pool_create(&state, TASK_PRIORITY_HIGH);
+ num_threads = BLI_task_scheduler_num_threads();
/* The idea here is to prevent creating task for each of the loop iterations
* and instead have tasks which are evenly distributed across CPU cores and
@@ -720,11 +411,9 @@ void BLI_task_parallel_mempool(BLI_mempool *mempool,
BLI_mempool_iter *mempool_iterators = BLI_mempool_iter_threadsafe_create(mempool,
(size_t)num_tasks);
- const int thread_id = BLI_task_pool_creator_thread_id(task_pool);
for (i = 0; i < num_tasks; i++) {
/* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(
- task_pool, parallel_mempool_func, &mempool_iterators[i], false, NULL, thread_id);
+ BLI_task_pool_push(task_pool, parallel_mempool_func, &mempool_iterators[i], false, NULL);
}
BLI_task_pool_work_and_wait(task_pool);
diff --git a/source/blender/blenlib/intern/task_pool.cc b/source/blender/blenlib/intern/task_pool.cc
index 60ed156105c..da67412865b 100644
--- a/source/blender/blenlib/intern/task_pool.cc
+++ b/source/blender/blenlib/intern/task_pool.cc
@@ -17,731 +17,368 @@
/** \file
* \ingroup bli
*
- * A generic task system which can be used for any task based subsystem.
+ * Task pool to run tasks in parallel.
*/
+#include <memory>
#include <stdlib.h>
+#include <utility>
#include "MEM_guardedalloc.h"
#include "DNA_listBase.h"
-#include "BLI_listbase.h"
#include "BLI_math.h"
#include "BLI_mempool.h"
#include "BLI_task.h"
#include "BLI_threads.h"
-#include "atomic_ops.h"
-
-/* Define this to enable some detailed statistic print. */
-#undef DEBUG_STATS
-
-/* Types */
-
-/* Number of per-thread pre-allocated tasks.
- *
- * For more details see description of TaskMemPool.
- */
-#define MEMPOOL_SIZE 256
-
-/* Number of tasks which are pushed directly to local thread queue.
- *
- * This allows thread to fetch next task without locking the whole queue.
- */
-#define LOCAL_QUEUE_SIZE 1
-
-/* Number of tasks which are allowed to be scheduled in a delayed manner.
- *
- * This allows to use less locks per graph node children schedule. More details
- * could be found at TaskThreadLocalStorage::do_delayed_push.
- */
-#define DELAYED_QUEUE_SIZE 4096
-
-#ifndef NDEBUG
-# define ASSERT_THREAD_ID(scheduler, thread_id) \
- do { \
- if (!BLI_thread_is_main()) { \
- TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key); \
- if (thread == NULL) { \
- BLI_assert(thread_id == 0); \
- } \
- else { \
- BLI_assert(thread_id == thread->id); \
- } \
- } \
- else { \
- BLI_assert(thread_id == 0); \
- } \
- } while (false)
-#else
-# define ASSERT_THREAD_ID(scheduler, thread_id)
+#ifdef WITH_TBB
+/* Quiet top level deprecation message, unrelated to API usage here. */
+# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
+# include <tbb/tbb.h>
#endif
-typedef struct Task {
- struct Task *next, *prev;
+/* Task
+ *
+ * Unit of work to execute. This is a C++ class to work with TBB. */
+class Task {
+ public:
+ TaskPool *pool;
TaskRunFunction run;
void *taskdata;
bool free_taskdata;
TaskFreeFunction freedata;
- TaskPool *pool;
-} Task;
-/* This is a per-thread storage of pre-allocated tasks.
- *
- * The idea behind this is simple: reduce amount of malloc() calls when pushing
- * new task to the pool. This is done by keeping memory from the tasks which
- * were finished already, so instead of freeing that memory we put it to the
- * pool for the later re-use.
- *
- * The tricky part here is to avoid any inter-thread synchronization, hence no
- * lock must exist around this pool. The pool will become an owner of the pointer
- * from freed task, and only corresponding thread will be able to use this pool
- * (no memory stealing and such).
- *
- * This leads to the following use of the pool:
- *
- * - task_push() should provide proper thread ID from which the task is being
- * pushed from.
- *
- * - Task allocation function which check corresponding memory pool and if there
- * is any memory in there it'll mark memory as re-used, remove it from the pool
- * and use that memory for the new task.
- *
- * At this moment task queue owns the memory.
- *
- * - When task is done and task_free() is called the memory will be put to the
- * pool which corresponds to a thread which handled the task.
- */
-typedef struct TaskMemPool {
- /* Number of pre-allocated tasks in the pool. */
- int num_tasks;
- /* Pre-allocated task memory pointers. */
- Task *tasks[MEMPOOL_SIZE];
-} TaskMemPool;
-
-#ifdef DEBUG_STATS
-typedef struct TaskMemPoolStats {
- /* Number of allocations. */
- int num_alloc;
- /* Number of avoided allocations (pointer was re-used from the pool). */
- int num_reuse;
- /* Number of discarded memory due to pool saturation, */
- int num_discard;
-} TaskMemPoolStats;
-#endif
-
-typedef struct TaskThreadLocalStorage {
- /* Memory pool for faster task allocation.
- * The idea is to re-use memory of finished/discarded tasks by this thread.
- */
- TaskMemPool task_mempool;
-
- /* Local queue keeps thread alive by keeping small amount of tasks ready
- * to be picked up without causing global thread locks for synchronization.
- */
- int num_local_queue;
- Task *local_queue[LOCAL_QUEUE_SIZE];
-
- /* Thread can be marked for delayed tasks push. This is helpful when it's
- * know that lots of subsequent task pushed will happen from the same thread
- * without "interrupting" for task execution.
- *
- * We try to accumulate as much tasks as possible in a local queue without
- * any locks first, and then we push all of them into a scheduler's queue
- * from within a single mutex lock.
- */
- bool do_delayed_push;
- int num_delayed_queue;
- Task *delayed_queue[DELAYED_QUEUE_SIZE];
-} TaskThreadLocalStorage;
-
-struct TaskPool {
- TaskScheduler *scheduler;
-
- volatile size_t num;
- ThreadMutex num_mutex;
- ThreadCondition num_cond;
-
- void *userdata;
- ThreadMutex user_mutex;
+ Task(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskFreeFunction freedata)
+ : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata)
+ {
+ }
- volatile bool do_cancel;
- volatile bool do_work;
+ ~Task()
+ {
+ if (free_taskdata) {
+ if (freedata) {
+ freedata(pool, taskdata);
+ }
+ else {
+ MEM_freeN(taskdata);
+ }
+ }
+ }
- volatile bool is_suspended;
- bool start_suspended;
- ListBase suspended_queue;
- size_t num_suspended;
+ /* Move constructor. */
+ Task(Task &&other)
+ : pool(other.pool),
+ run(other.run),
+ taskdata(other.taskdata),
+ free_taskdata(other.free_taskdata),
+ freedata(other.freedata)
+ {
+ other.pool = NULL;
+ other.run = NULL;
+ other.taskdata = NULL;
+ other.free_taskdata = false;
+ other.freedata = NULL;
+ }
- TaskPriority priority;
+ /* Execute task. */
+ void operator()() const
+ {
+ run(pool, taskdata);
+ }
- /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
- * has to use its special background fallback thread in case we are in
- * single-threaded situation.
- */
- bool run_in_background;
+ /* For performance, ensure we never copy the task and only move it. */
+ Task(const Task &other) = delete;
+ Task &operator=(const Task &other) = delete;
+ Task &operator=(Task &&other) = delete;
+};
- /* This is a task scheduler's ID of a thread at which pool was constructed.
- * It will be used to access task TLS.
- */
- int thread_id;
+/* TBB Task Group.
+ *
+ * Subclass since there seems to be no other way to set priority. */
+
+#ifdef WITH_TBB
+class TBBTaskGroup : public tbb::task_group {
+ public:
+ TBBTaskGroup(TaskPriority priority)
+ {
+ switch (priority) {
+ case TASK_PRIORITY_LOW:
+ my_context.set_priority(tbb::priority_low);
+ break;
+ case TASK_PRIORITY_HIGH:
+ my_context.set_priority(tbb::priority_normal);
+ break;
+ }
+ }
- /* For the pools which are created from non-main thread which is not a
- * scheduler worker thread we can't re-use any of scheduler's threads TLS
- * and have to use our own one.
- */
- bool use_local_tls;
- TaskThreadLocalStorage local_tls;
-#ifndef NDEBUG
- pthread_t creator_thread_id;
+ ~TBBTaskGroup()
+ {
+ }
+};
#endif
-#ifdef DEBUG_STATS
- TaskMemPoolStats *mempool_stats;
-#endif
-};
+/* Task Pool */
-struct TaskScheduler {
- pthread_t *threads;
- struct TaskThread *task_threads;
- int num_threads;
- bool background_thread_only;
+typedef enum TaskPoolType {
+ TASK_POOL_TBB,
+ TASK_POOL_TBB_SUSPENDED,
+ TASK_POOL_NO_THREADS,
+ TASK_POOL_BACKGROUND,
+ TASK_POOL_BACKGROUND_SERIAL,
+} TaskPoolType;
- ListBase queue;
- ThreadMutex queue_mutex;
- ThreadCondition queue_cond;
+struct TaskPool {
+ TaskPoolType type;
+ bool use_threads;
- ThreadMutex startup_mutex;
- ThreadCondition startup_cond;
- volatile int num_thread_started;
+ ThreadMutex user_mutex;
+ void *userdata;
- volatile bool do_exit;
+ /* TBB task pool. */
+#ifdef WITH_TBB
+ TBBTaskGroup tbb_group;
+#endif
+ volatile bool is_suspended;
+ BLI_mempool *suspended_mempool;
- /* NOTE: In pthread's TLS we store the whole TaskThread structure. */
- pthread_key_t tls_id_key;
+ /* Background task pool. */
+ ListBase background_threads;
+ ThreadQueue *background_queue;
+ volatile bool background_is_canceling;
};
-typedef struct TaskThread {
- TaskScheduler *scheduler;
- int id;
- TaskThreadLocalStorage tls;
-} TaskThread;
-
-/* Helper */
-BLI_INLINE void task_data_free(Task *task, const int UNUSED(thread_id))
-{
- if (task->free_taskdata) {
- if (task->freedata) {
- task->freedata(task->pool, task->taskdata);
- }
- else {
- MEM_freeN(task->taskdata);
- }
- }
-}
-
-BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls)
-{
- memset(tls, 0, sizeof(TaskThreadLocalStorage));
-}
+/* TBB Task Pool.
+ *
+ * Task pool using the TBB scheduler for tasks. When building without TBB
+ * support or running Blender with -t 1, this reverts to single threaded.
+ *
+ * Tasks may be suspended until in all are created, to make it possible to
+ * initialize data structures and create tasks in a single pass. */
-BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, const int thread_id)
+static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority)
{
- TaskScheduler *scheduler = pool->scheduler;
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= scheduler->num_threads);
- if (pool->use_local_tls && thread_id == 0) {
- BLI_assert(pool->thread_id == 0);
- BLI_assert(!BLI_thread_is_main());
- BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id));
- return &pool->local_tls;
- }
- if (thread_id == 0) {
- BLI_assert(BLI_thread_is_main());
- return &scheduler->task_threads[pool->thread_id].tls;
+ if (pool->type == TASK_POOL_TBB_SUSPENDED) {
+ pool->is_suspended = true;
+ pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER);
}
- return &scheduler->task_threads[thread_id].tls;
-}
-BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls)
-{
- TaskMemPool *task_mempool = &tls->task_mempool;
- for (int i = 0; i < task_mempool->num_tasks; i++) {
- MEM_freeN(task_mempool->tasks[i]);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ new (&pool->tbb_group) TBBTaskGroup(priority);
}
-}
-
-static Task *task_alloc(TaskPool *pool, const int thread_id)
-{
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- if (thread_id != -1) {
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- TaskMemPool *task_mempool = &tls->task_mempool;
- /* Try to re-use task memory from a thread local storage. */
- if (task_mempool->num_tasks > 0) {
- --task_mempool->num_tasks;
- /* Success! We've just avoided task allocation. */
-#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_reuse++;
#endif
- return task_mempool->tasks[task_mempool->num_tasks];
- }
- /* We are doomed to allocate new task data. */
-#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_alloc++;
-#endif
- }
- return (Task *)MEM_mallocN(sizeof(Task), "New task");
}
-static void task_free(TaskPool *pool, Task *task, const int thread_id)
+static void tbb_task_pool_run(TaskPool *pool, Task &&task)
{
- task_data_free(task, thread_id);
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- if (thread_id == 0) {
- BLI_assert(pool->use_local_tls || BLI_thread_is_main());
+ if (pool->is_suspended) {
+ /* Suspended task that will be executed in work_and_wait(). */
+ Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool);
+ new (task_mem) Task(std::move(task));
+#ifdef __GNUC__
+ /* Work around apparent compiler bug where task is not properly copied
+ * to task_mem. This appears unrelated to the use of placement new or
+ * move semantics, happens even writing to a plain C struct. Rather the
+ * call into TBB seems to have some indirect effect. */
+ std::atomic_thread_fence(std::memory_order_release);
+#endif
}
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- TaskMemPool *task_mempool = &tls->task_mempool;
- if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) {
- /* Successfully allowed the task to be re-used later. */
- task_mempool->tasks[task_mempool->num_tasks] = task;
- ++task_mempool->num_tasks;
+#ifdef WITH_TBB
+ else if (pool->use_threads) {
+ /* Execute in TBB task group. */
+ pool->tbb_group.run(std::move(task));
}
- else {
- /* Local storage saturated, no other way than just discard
- * the memory.
- *
- * TODO(sergey): We can perhaps store such pointer in a global
- * scheduler pool, maybe it'll be faster than discarding and
- * allocating again.
- */
- MEM_freeN(task);
-#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_discard++;
#endif
+ else {
+ /* Execute immediately. */
+ task();
}
}
-/* Task Scheduler */
-
-static void task_pool_num_decrease(TaskPool *pool, size_t done)
+static void tbb_task_pool_work_and_wait(TaskPool *pool)
{
- BLI_mutex_lock(&pool->num_mutex);
-
- BLI_assert(pool->num >= done);
+ /* Start any suspended task now. */
+ if (pool->suspended_mempool) {
+ pool->is_suspended = false;
- pool->num -= done;
+ BLI_mempool_iter iter;
+ BLI_mempool_iternew(pool->suspended_mempool, &iter);
+ while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) {
+ tbb_task_pool_run(pool, std::move(*task));
+ }
- if (pool->num == 0) {
- BLI_condition_notify_all(&pool->num_cond);
+ BLI_mempool_clear(pool->suspended_mempool);
}
- BLI_mutex_unlock(&pool->num_mutex);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ /* This is called wait(), but internally it can actually do work. This
+ * matters because we don't want recursive usage of task pools to run
+ * out of threads and get stuck. */
+ pool->tbb_group.wait();
+ }
+#endif
}
-static void task_pool_num_increase(TaskPool *pool, size_t new_num)
+static void tbb_task_pool_cancel(TaskPool *pool)
{
- BLI_mutex_lock(&pool->num_mutex);
-
- pool->num += new_num;
- BLI_condition_notify_all(&pool->num_cond);
-
- BLI_mutex_unlock(&pool->num_mutex);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ pool->tbb_group.cancel();
+ pool->tbb_group.wait();
+ }
+#endif
}
-static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
+static bool tbb_task_pool_canceled(TaskPool *pool)
{
- bool found_task = false;
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- while (!scheduler->queue.first && !scheduler->do_exit) {
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ return pool->tbb_group.is_canceling();
}
+#endif
- do {
- Task *current_task;
-
- /* Assuming we can only have a void queue in 'exit' case here seems logical
- * (we should only be here after our worker thread has been woken up from a
- * condition_wait(), which only happens after a new task was added to the queue),
- * but it is wrong.
- * Waiting on condition may wake up the thread even if condition is not signaled
- * (spurious wake-ups), and some race condition may also empty the queue **after**
- * condition has been signaled, but **before** awoken thread reaches this point...
- * See http://stackoverflow.com/questions/8594591
- *
- * So we only abort here if do_exit is set.
- */
- if (scheduler->do_exit) {
- BLI_mutex_unlock(&scheduler->queue_mutex);
- return false;
- }
-
- for (current_task = (Task *)scheduler->queue.first; current_task != NULL;
- current_task = current_task->next) {
- TaskPool *pool = current_task->pool;
-
- if (scheduler->background_thread_only && !pool->run_in_background) {
- continue;
- }
-
- *task = current_task;
- found_task = true;
- BLI_remlink(&scheduler->queue, *task);
- break;
- }
- if (!found_task) {
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
- }
- } while (!found_task);
-
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- return true;
+ return false;
}
-BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, const int thread_id)
+static void tbb_task_pool_free(TaskPool *pool)
{
- BLI_assert(!tls->do_delayed_push);
- while (tls->num_local_queue > 0) {
- /* We pop task from queue before handling it so handler of the task can
- * push next job to the local queue.
- */
- tls->num_local_queue--;
- Task *local_task = tls->local_queue[tls->num_local_queue];
- /* TODO(sergey): Double-check work_and_wait() doesn't handle other's
- * pool tasks.
- */
- TaskPool *local_pool = local_task->pool;
- local_task->run(local_pool, local_task->taskdata, thread_id);
- task_free(local_pool, local_task, thread_id);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ pool->tbb_group.~TBBTaskGroup();
}
- BLI_assert(!tls->do_delayed_push);
-}
+#endif
-static void *task_scheduler_thread_run(void *thread_p)
-{
- TaskThread *thread = (TaskThread *)thread_p;
- TaskThreadLocalStorage *tls = &thread->tls;
- TaskScheduler *scheduler = thread->scheduler;
- int thread_id = thread->id;
- Task *task;
-
- pthread_setspecific(scheduler->tls_id_key, thread);
-
- /* signal the main thread when all threads have started */
- BLI_mutex_lock(&scheduler->startup_mutex);
- scheduler->num_thread_started++;
- if (scheduler->num_thread_started == scheduler->num_threads) {
- BLI_condition_notify_one(&scheduler->startup_cond);
+ if (pool->suspended_mempool) {
+ BLI_mempool_destroy(pool->suspended_mempool);
}
- BLI_mutex_unlock(&scheduler->startup_mutex);
-
- /* keep popping off tasks */
- while (task_scheduler_thread_wait_pop(scheduler, &task)) {
- TaskPool *pool = task->pool;
-
- /* run task */
- BLI_assert(!tls->do_delayed_push);
- task->run(pool, task->taskdata, thread_id);
- BLI_assert(!tls->do_delayed_push);
-
- /* delete task */
- task_free(pool, task, thread_id);
+}
- /* Handle all tasks from local queue. */
- handle_local_queue(tls, thread_id);
+/* Background Task Pool.
+ *
+ * Fallback for running background tasks when building without TBB. */
- /* notify pool task was done */
- task_pool_num_decrease(pool, 1);
+static void *background_task_run(void *userdata)
+{
+ TaskPool *pool = (TaskPool *)userdata;
+ while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
+ (*task)();
+ task->~Task();
+ MEM_freeN(task);
}
-
return NULL;
}
-TaskScheduler *BLI_task_scheduler_create(int num_threads)
+static void background_task_pool_create(TaskPool *pool)
{
- TaskScheduler *scheduler = (TaskScheduler *)MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
-
- /* multiple places can use this task scheduler, sharing the same
- * threads, so we keep track of the number of users. */
- scheduler->do_exit = false;
-
- BLI_listbase_clear(&scheduler->queue);
- BLI_mutex_init(&scheduler->queue_mutex);
- BLI_condition_init(&scheduler->queue_cond);
-
- BLI_mutex_init(&scheduler->startup_mutex);
- BLI_condition_init(&scheduler->startup_cond);
- scheduler->num_thread_started = 0;
-
- if (num_threads == 0) {
- /* automatic number of threads will be main thread + num cores */
- num_threads = BLI_system_thread_count();
- }
-
- /* main thread will also work, so we count it too */
- num_threads -= 1;
-
- /* Add background-only thread if needed. */
- if (num_threads == 0) {
- scheduler->background_thread_only = true;
- num_threads = 1;
- }
-
- scheduler->task_threads = (TaskThread *)MEM_mallocN(sizeof(TaskThread) * (num_threads + 1),
- "TaskScheduler task threads");
-
- /* Initialize TLS for main thread. */
- initialize_task_tls(&scheduler->task_threads[0].tls);
-
- pthread_key_create(&scheduler->tls_id_key, NULL);
-
- /* launch threads that will be waiting for work */
- if (num_threads > 0) {
- int i;
-
- scheduler->num_threads = num_threads;
- scheduler->threads = (pthread_t *)MEM_callocN(sizeof(pthread_t) * num_threads,
- "TaskScheduler threads");
-
- for (i = 0; i < num_threads; i++) {
- TaskThread *thread = &scheduler->task_threads[i + 1];
- thread->scheduler = scheduler;
- thread->id = i + 1;
- initialize_task_tls(&thread->tls);
-
- if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) {
- fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
- }
- }
- }
-
- /* Wait for all worker threads to start before returning to caller to prevent the case where
- * threads are still starting and pthread_join is called, which causes a deadlock on pthreads4w.
- */
- BLI_mutex_lock(&scheduler->startup_mutex);
- /* NOTE: Use loop here to avoid false-positive everything-is-ready caused by spontaneous thread
- * wake up. */
- while (scheduler->num_thread_started != num_threads) {
- BLI_condition_wait(&scheduler->startup_cond, &scheduler->startup_mutex);
- }
- BLI_mutex_unlock(&scheduler->startup_mutex);
-
- return scheduler;
+ pool->background_queue = BLI_thread_queue_init();
+ BLI_threadpool_init(&pool->background_threads, background_task_run, 1);
+ BLI_threadpool_insert(&pool->background_threads, pool);
}
-void BLI_task_scheduler_free(TaskScheduler *scheduler)
+static void background_task_pool_run(TaskPool *pool, Task &&task)
{
- Task *task;
-
- /* stop all waiting threads */
- BLI_mutex_lock(&scheduler->queue_mutex);
- scheduler->do_exit = true;
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- pthread_key_delete(scheduler->tls_id_key);
-
- /* delete threads */
- if (scheduler->threads) {
- int i;
-
- for (i = 0; i < scheduler->num_threads; i++) {
- if (pthread_join(scheduler->threads[i], NULL) != 0) {
- fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads);
- }
- }
-
- MEM_freeN(scheduler->threads);
- }
-
- /* Delete task thread data */
- if (scheduler->task_threads) {
- for (int i = 0; i < scheduler->num_threads + 1; i++) {
- TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls;
- free_task_tls(tls);
- }
-
- MEM_freeN(scheduler->task_threads);
- }
-
- /* delete leftover tasks */
- for (task = (Task *)scheduler->queue.first; task; task = task->next) {
- task_data_free(task, 0);
- }
- BLI_freelistN(&scheduler->queue);
-
- /* delete mutex/condition */
- BLI_mutex_end(&scheduler->queue_mutex);
- BLI_condition_end(&scheduler->queue_cond);
- BLI_mutex_end(&scheduler->startup_mutex);
- BLI_condition_end(&scheduler->startup_cond);
-
- MEM_freeN(scheduler);
+ Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__);
+ new (task_mem) Task(std::move(task));
+ BLI_thread_queue_push(pool->background_queue, task_mem);
}
-int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
+static void background_task_pool_work_and_wait(TaskPool *pool)
{
- return scheduler->num_threads + 1;
+ /* Signal background thread to stop waiting for new tasks if none are
+ * left, and wait for tasks and thread to finish. */
+ BLI_thread_queue_nowait(pool->background_queue);
+ BLI_thread_queue_wait_finish(pool->background_queue);
+ BLI_threadpool_remove(&pool->background_threads, pool);
}
-static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
+static void background_task_pool_cancel(TaskPool *pool)
{
- task_pool_num_increase(task->pool, 1);
-
- /* add task to queue */
- BLI_mutex_lock(&scheduler->queue_mutex);
+ pool->background_is_canceling = true;
- if (priority == TASK_PRIORITY_HIGH) {
- BLI_addhead(&scheduler->queue, task);
- }
- else {
- BLI_addtail(&scheduler->queue, task);
+ /* Remove tasks not yet started by background thread. */
+ BLI_thread_queue_nowait(pool->background_queue);
+ while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
+ task->~Task();
+ MEM_freeN(task);
}
- BLI_condition_notify_one(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ /* Let background thread finish or cancel task it is working on. */
+ BLI_threadpool_remove(&pool->background_threads, pool);
+ pool->background_is_canceling = false;
}
-static void task_scheduler_push_all(TaskScheduler *scheduler,
- TaskPool *pool,
- Task **tasks,
- int num_tasks)
+static bool background_task_pool_canceled(TaskPool *pool)
{
- if (num_tasks == 0) {
- return;
- }
-
- task_pool_num_increase(pool, num_tasks);
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- for (int i = 0; i < num_tasks; i++) {
- BLI_addhead(&scheduler->queue, tasks[i]);
- }
-
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ return pool->background_is_canceling;
}
-static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
+static void background_task_pool_free(TaskPool *pool)
{
- Task *task, *nexttask;
- size_t done = 0;
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- /* free all tasks from this pool from the queue */
- for (task = (Task *)scheduler->queue.first; task; task = nexttask) {
- nexttask = task->next;
-
- if (task->pool == pool) {
- task_data_free(task, pool->thread_id);
- BLI_freelinkN(&scheduler->queue, task);
-
- done++;
- }
- }
+ background_task_pool_work_and_wait(pool);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- /* notify done */
- task_pool_num_decrease(pool, done);
+ BLI_threadpool_end(&pool->background_threads);
+ BLI_thread_queue_free(pool->background_queue);
}
/* Task Pool */
-static TaskPool *task_pool_create_ex(TaskScheduler *scheduler,
- void *userdata,
- const bool is_background,
- const bool is_suspended,
- TaskPriority priority)
+static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority)
{
- TaskPool *pool = (TaskPool *)MEM_mallocN(sizeof(TaskPool), "TaskPool");
+ /* Ensure malloc will go fine from threads,
+ *
+ * This is needed because we could be in main thread here
+ * and malloc could be non-thread safe at this point because
+ * no other jobs are running.
+ */
+ BLI_threaded_malloc_begin();
-#ifndef NDEBUG
- /* Assert we do not try to create a background pool from some parent task -
- * those only work OK from main thread. */
- if (is_background) {
- const pthread_t thread_id = pthread_self();
- int i = scheduler->num_threads;
+ const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS;
- while (i--) {
- BLI_assert(!pthread_equal(scheduler->threads[i], thread_id));
- }
+ /* Background task pool uses regular TBB scheduling if available. Only when
+ * building without TBB or running with -t 1 do we need to ensure these tasks
+ * do not block the main thread. */
+ if (type == TASK_POOL_BACKGROUND && use_threads) {
+ type = TASK_POOL_TBB;
}
-#endif
- pool->scheduler = scheduler;
- pool->num = 0;
- pool->do_cancel = false;
- pool->do_work = false;
- pool->is_suspended = is_suspended;
- pool->start_suspended = is_suspended;
- pool->num_suspended = 0;
- pool->suspended_queue.first = pool->suspended_queue.last = NULL;
- pool->priority = priority;
- pool->run_in_background = is_background;
- pool->use_local_tls = false;
-
- BLI_mutex_init(&pool->num_mutex);
- BLI_condition_init(&pool->num_cond);
+ /* Allocate task pool. */
+ TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool");
+
+ pool->type = type;
+ pool->use_threads = use_threads;
pool->userdata = userdata;
BLI_mutex_init(&pool->user_mutex);
- if (BLI_thread_is_main()) {
- pool->thread_id = 0;
- }
- else {
- TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key);
- if (thread == NULL) {
- /* NOTE: Task pool is created from non-main thread which is not
- * managed by the task scheduler. We identify ourselves as thread ID
- * 0 but we do not use scheduler's TLS storage and use our own
- * instead to avoid any possible threading conflicts.
- */
- pool->thread_id = 0;
- pool->use_local_tls = true;
-#ifndef NDEBUG
- pool->creator_thread_id = pthread_self();
-#endif
- initialize_task_tls(&pool->local_tls);
- }
- else {
- pool->thread_id = thread->id;
- }
+ switch (type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_create(pool, priority);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_create(pool);
+ break;
}
-#ifdef DEBUG_STATS
- pool->mempool_stats = (TaskMemPoolStats *)MEM_callocN(
- sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1), "per-taskpool mempool stats");
-#endif
-
- /* Ensure malloc will go fine from threads,
- *
- * This is needed because we could be in main thread here
- * and malloc could be non-thread safe at this point because
- * no other jobs are running.
- */
- BLI_threaded_malloc_begin();
-
return pool;
}
/**
* Create a normal task pool. Tasks will be executed as soon as they are added.
*/
-TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPriority priority)
+TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority)
{
- return task_pool_create_ex(scheduler, userdata, false, false, priority);
+ return task_pool_create_ex(userdata, TASK_POOL_TBB, priority);
}
/**
@@ -756,11 +393,9 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPri
* they could end never being executed, since the 'fallback' background thread is already
* busy with parent task in single-threaded context).
*/
-TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler,
- void *userdata,
- TaskPriority priority)
+TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority)
{
- return task_pool_create_ex(scheduler, userdata, true, false, priority);
+ return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority);
}
/**
@@ -768,228 +403,114 @@ TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler,
* for until BLI_task_pool_work_and_wait() is called. This helps reducing threading
* overhead when pushing huge amount of small initial tasks from the main thread.
*/
-TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler,
- void *userdata,
- TaskPriority priority)
+TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority)
{
- return task_pool_create_ex(scheduler, userdata, false, true, priority);
+ return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority);
}
-void BLI_task_pool_free(TaskPool *pool)
+/**
+ * Single threaded task pool that executes pushed task immediately, for
+ * debugging purposes.
+ */
+TaskPool *BLI_task_pool_create_no_threads(void *userdata)
{
- BLI_task_pool_cancel(pool);
-
- BLI_mutex_end(&pool->num_mutex);
- BLI_condition_end(&pool->num_cond);
+ return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH);
+}
- BLI_mutex_end(&pool->user_mutex);
+/**
+ * Task pool that executeds one task after the other, possibly on different threads
+ * but never in parallel.
+ */
+TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority);
+}
-#ifdef DEBUG_STATS
- printf("Thread ID Allocated Reused Discarded\n");
- for (int i = 0; i < pool->scheduler->num_threads + 1; i++) {
- printf("%02d %05d %05d %05d\n",
- i,
- pool->mempool_stats[i].num_alloc,
- pool->mempool_stats[i].num_reuse,
- pool->mempool_stats[i].num_discard);
+void BLI_task_pool_free(TaskPool *pool)
+{
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_free(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_free(pool);
+ break;
}
- MEM_freeN(pool->mempool_stats);
-#endif
- if (pool->use_local_tls) {
- free_task_tls(&pool->local_tls);
- }
+ BLI_mutex_end(&pool->user_mutex);
MEM_freeN(pool);
BLI_threaded_malloc_end();
}
-BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id)
-{
- return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
-}
-
-static void task_pool_push(TaskPool *pool,
- TaskRunFunction run,
- void *taskdata,
- bool free_taskdata,
- TaskFreeFunction freedata,
- int thread_id)
-{
- /* Allocate task and fill it's properties. */
- Task *task = task_alloc(pool, thread_id);
- task->run = run;
- task->taskdata = taskdata;
- task->free_taskdata = free_taskdata;
- task->freedata = freedata;
- task->pool = pool;
- /* For suspended pools we put everything yo a global queue first
- * and exit as soon as possible.
- *
- * This tasks will be moved to actual execution when pool is
- * activated by work_and_wait().
- */
- if (pool->is_suspended) {
- BLI_addhead(&pool->suspended_queue, task);
- atomic_fetch_and_add_z(&pool->num_suspended, 1);
- return;
- }
- /* Populate to any local queue first, this is cheapest push ever. */
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- /* Try to push to a local execution queue.
- * These tasks will be picked up next.
- */
- if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
- tls->local_queue[tls->num_local_queue] = task;
- tls->num_local_queue++;
- return;
- }
- /* If we are in the delayed tasks push mode, we push tasks to a
- * temporary local queue first without any locks, and then move them
- * to global execution queue with a single lock.
- */
- if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
- tls->delayed_queue[tls->num_delayed_queue] = task;
- tls->num_delayed_queue++;
- return;
- }
- }
- /* Do push to a global execution pool, slowest possible method,
- * causes quite reasonable amount of threading overhead.
- */
- task_scheduler_push(pool->scheduler, task, pool->priority);
-}
-
void BLI_task_pool_push(TaskPool *pool,
TaskRunFunction run,
void *taskdata,
bool free_taskdata,
TaskFreeFunction freedata)
{
- task_pool_push(pool, run, taskdata, free_taskdata, freedata, -1);
-}
+ Task task(pool, run, taskdata, free_taskdata, freedata);
-void BLI_task_pool_push_from_thread(TaskPool *pool,
- TaskRunFunction run,
- void *taskdata,
- bool free_taskdata,
- TaskFreeFunction freedata,
- int thread_id)
-{
- task_pool_push(pool, run, taskdata, free_taskdata, freedata, thread_id);
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_run(pool, std::move(task));
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_run(pool, std::move(task));
+ break;
+ }
}
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
- TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
- TaskScheduler *scheduler = pool->scheduler;
-
- if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) {
- if (pool->num_suspended) {
- task_pool_num_increase(pool, pool->num_suspended);
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue);
-
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- pool->num_suspended = 0;
- }
- }
-
- pool->do_work = true;
-
- ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
-
- handle_local_queue(tls, pool->thread_id);
-
- BLI_mutex_lock(&pool->num_mutex);
-
- while (pool->num != 0) {
- Task *task, *work_task = NULL;
- bool found_task = false;
-
- BLI_mutex_unlock(&pool->num_mutex);
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- /* find task from this pool. if we get a task from another pool,
- * we can get into deadlock */
-
- for (task = (Task *)scheduler->queue.first; task; task = task->next) {
- if (task->pool == pool) {
- work_task = task;
- found_task = true;
- BLI_remlink(&scheduler->queue, task);
- break;
- }
- }
-
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- /* if found task, do it, otherwise wait until other tasks are done */
- if (found_task) {
- /* run task */
- BLI_assert(!tls->do_delayed_push);
- work_task->run(pool, work_task->taskdata, pool->thread_id);
- BLI_assert(!tls->do_delayed_push);
-
- /* delete task */
- task_free(pool, task, pool->thread_id);
-
- /* Handle all tasks from local queue. */
- handle_local_queue(tls, pool->thread_id);
-
- /* notify pool task was done */
- task_pool_num_decrease(pool, 1);
- }
-
- BLI_mutex_lock(&pool->num_mutex);
- if (pool->num == 0) {
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_work_and_wait(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_work_and_wait(pool);
break;
- }
-
- if (!found_task) {
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
- }
}
-
- BLI_mutex_unlock(&pool->num_mutex);
-
- BLI_assert(tls->num_local_queue == 0);
-}
-
-void BLI_task_pool_work_wait_and_reset(TaskPool *pool)
-{
- BLI_task_pool_work_and_wait(pool);
-
- pool->do_work = false;
- pool->is_suspended = pool->start_suspended;
}
void BLI_task_pool_cancel(TaskPool *pool)
{
- pool->do_cancel = true;
-
- task_scheduler_clear(pool->scheduler, pool);
-
- /* wait until all entries are cleared */
- BLI_mutex_lock(&pool->num_mutex);
- while (pool->num) {
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_cancel(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_cancel(pool);
+ break;
}
- BLI_mutex_unlock(&pool->num_mutex);
-
- pool->do_cancel = false;
}
bool BLI_task_pool_canceled(TaskPool *pool)
{
- return pool->do_cancel;
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ return tbb_task_pool_canceled(pool);
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ return background_task_pool_canceled(pool);
+ }
+ BLI_assert("BLI_task_pool_canceled: Control flow should not come here!");
+ return false;
}
void *BLI_task_pool_user_data(TaskPool *pool)
@@ -1000,30 +521,4 @@ void *BLI_task_pool_user_data(TaskPool *pool)
ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
{
return &pool->user_mutex;
-}
-
-int BLI_task_pool_creator_thread_id(TaskPool *pool)
-{
- return pool->thread_id;
-}
-
-void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id)
-{
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- tls->do_delayed_push = true;
- }
-}
-
-void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
-{
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- BLI_assert(tls->do_delayed_push);
- task_scheduler_push_all(pool->scheduler, pool, tls->delayed_queue, tls->num_delayed_queue);
- tls->do_delayed_push = false;
- tls->num_delayed_queue = 0;
- }
-}
+} \ No newline at end of file
diff --git a/source/blender/blenlib/intern/task_range.cc b/source/blender/blenlib/intern/task_range.cc
new file mode 100644
index 00000000000..a8447c305e0
--- /dev/null
+++ b/source/blender/blenlib/intern/task_range.cc
@@ -0,0 +1,167 @@
+/*
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+/** \file
+ * \ingroup bli
+ *
+ * Task parallel range functions.
+ */
+
+#include <stdlib.h>
+
+#include "MEM_guardedalloc.h"
+
+#include "DNA_listBase.h"
+
+#include "BLI_task.h"
+#include "BLI_threads.h"
+
+#include "atomic_ops.h"
+
+#ifdef WITH_TBB
+/* Quiet top level deprecation message, unrelated to API usage here. */
+# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
+# include <tbb/tbb.h>
+#endif
+
+#ifdef WITH_TBB
+
+/* Functor for running TBB parallel_for and parallel_reduce. */
+struct RangeTask {
+ TaskParallelRangeFunc func;
+ void *userdata;
+ const TaskParallelSettings *settings;
+
+ void *userdata_chunk;
+
+ /* Root constructor. */
+ RangeTask(TaskParallelRangeFunc func, void *userdata, const TaskParallelSettings *settings)
+ : func(func), userdata(userdata), settings(settings)
+ {
+ init_chunk(settings->userdata_chunk);
+ }
+
+ /* Copy constructor. */
+ RangeTask(const RangeTask &other)
+ : func(other.func), userdata(other.userdata), settings(other.settings)
+ {
+ init_chunk(settings->userdata_chunk);
+ }
+
+ /* Splitting constructor for parallel reduce. */
+ RangeTask(RangeTask &other, tbb::split)
+ : func(other.func), userdata(other.userdata), settings(other.settings)
+ {
+ init_chunk(settings->userdata_chunk);
+ }
+
+ ~RangeTask()
+ {
+ if (settings->func_free != NULL) {
+ settings->func_free(userdata, userdata_chunk);
+ }
+ MEM_SAFE_FREE(userdata_chunk);
+ }
+
+ void init_chunk(void *from_chunk)
+ {
+ if (from_chunk) {
+ userdata_chunk = MEM_mallocN(settings->userdata_chunk_size, "RangeTask");
+ memcpy(userdata_chunk, from_chunk, settings->userdata_chunk_size);
+ }
+ else {
+ userdata_chunk = NULL;
+ }
+ }
+
+ void operator()(const tbb::blocked_range<int> &r) const
+ {
+ TaskParallelTLS tls;
+ tls.userdata_chunk = userdata_chunk;
+ for (int i = r.begin(); i != r.end(); ++i) {
+ func(userdata, i, &tls);
+ }
+ }
+
+ void join(const RangeTask &other)
+ {
+ settings->func_reduce(userdata, userdata_chunk, other.userdata_chunk);
+ }
+};
+
+#endif
+
+void BLI_task_parallel_range(const int start,
+ const int stop,
+ void *userdata,
+ TaskParallelRangeFunc func,
+ const TaskParallelSettings *settings)
+{
+#ifdef WITH_TBB
+ /* Multithreading. */
+ if (settings->use_threading && BLI_task_scheduler_num_threads() > 1) {
+ RangeTask task(func, userdata, settings);
+ const size_t grainsize = MAX2(settings->min_iter_per_thread, 1);
+ const tbb::blocked_range<int> range(start, stop, grainsize);
+
+ if (settings->func_reduce) {
+ parallel_reduce(range, task);
+ if (settings->userdata_chunk) {
+ memcpy(settings->userdata_chunk, task.userdata_chunk, settings->userdata_chunk_size);
+ }
+ }
+ else {
+ parallel_for(range, task);
+ }
+
+ return;
+ }
+#endif
+
+ /* Single threaded. Nothing to reduce as everything is accumulated into the
+ * main userdata chunk directly. */
+ TaskParallelTLS tls;
+ tls.userdata_chunk = settings->userdata_chunk;
+ for (int i = start; i < stop; i++) {
+ func(userdata, i, &tls);
+ }
+ if (settings->func_free != NULL) {
+ settings->func_free(userdata, settings->userdata_chunk);
+ }
+}
+
+int BLI_task_parallel_thread_id(const TaskParallelTLS *UNUSED(tls))
+{
+#ifdef WITH_TBB
+ /* Get a unique thread ID for texture nodes. In the future we should get rid
+ * of the thread ID and change texture evaluation to not require per-thread
+ * storage that can't be efficiently allocated on the stack. */
+ static tbb::enumerable_thread_specific<int> tbb_thread_id(-1);
+ static int tbb_thread_id_counter = 0;
+
+ int &thread_id = tbb_thread_id.local();
+ if (thread_id == -1) {
+ thread_id = atomic_fetch_and_add_int32(&tbb_thread_id_counter, 1);
+ if (thread_id >= BLENDER_MAX_THREADS) {
+ BLI_assert(!"Maximum number of threads exceeded for sculpting");
+ thread_id = thread_id % BLENDER_MAX_THREADS;
+ }
+ }
+ return thread_id;
+#else
+ return 0;
+#endif
+}
diff --git a/source/blender/blenlib/intern/task_scheduler.cc b/source/blender/blenlib/intern/task_scheduler.cc
new file mode 100644
index 00000000000..682fee5c46d
--- /dev/null
+++ b/source/blender/blenlib/intern/task_scheduler.cc
@@ -0,0 +1,73 @@
+/*
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+/** \file
+ * \ingroup bli
+ *
+ * Task scheduler initialization.
+ */
+
+#include "MEM_guardedalloc.h"
+
+#include "BLI_task.h"
+#include "BLI_threads.h"
+
+#ifdef WITH_TBB
+/* Quiet top level deprecation message, unrelated to API usage here. */
+# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
+# include <tbb/tbb.h>
+#endif
+
+/* Task Scheduler */
+
+static int task_scheduler_num_threads = 1;
+#ifdef WITH_TBB
+static tbb::global_control *task_scheduler_global_control = nullptr;
+#endif
+
+void BLI_task_scheduler_init()
+{
+#ifdef WITH_TBB
+ const int num_threads_override = BLI_system_num_threads_override_get();
+
+ if (num_threads_override > 0) {
+ /* Override number of threads. This settings is used within the lifetime
+ * of tbb::global_control, so we allocate it on the heap. */
+ task_scheduler_global_control = OBJECT_GUARDED_NEW(
+ tbb::global_control, tbb::global_control::max_allowed_parallelism, num_threads_override);
+ task_scheduler_num_threads = num_threads_override;
+ }
+ else {
+ /* Let TBB choose the number of threads. For (legacy) code that calss
+ * BLI_task_scheduler_num_threads() we provide the system thread count.
+ * Ideally such code should be rewritten not to use the number of threads
+ * at all. */
+ task_scheduler_num_threads = BLI_system_thread_count();
+ }
+#endif
+}
+
+void BLI_task_scheduler_exit()
+{
+#ifdef WITH_TBB
+ OBJECT_GUARDED_DELETE(task_scheduler_global_control, tbb::global_control);
+#endif
+}
+
+int BLI_task_scheduler_num_threads()
+{
+ return task_scheduler_num_threads;
+}
diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c
index 31e8581590a..f535798f86d 100644
--- a/source/blender/blenlib/intern/threads.c
+++ b/source/blender/blenlib/intern/threads.c
@@ -61,9 +61,6 @@ extern pthread_key_t gomp_tls_key;
static void *thread_tls_data;
#endif
-/* We're using one global task scheduler for all kind of tasks. */
-static TaskScheduler *task_scheduler = NULL;
-
/* ********** basic thread control API ************
*
* Many thread cases have an X amount of jobs, and only an Y amount of
@@ -157,27 +154,9 @@ void BLI_threadapi_init(void)
void BLI_threadapi_exit(void)
{
- if (task_scheduler) {
- BLI_task_scheduler_free(task_scheduler);
- task_scheduler = NULL;
- }
BLI_spin_end(&_malloc_lock);
}
-TaskScheduler *BLI_task_scheduler_get(void)
-{
- if (task_scheduler == NULL) {
- int tot_thread = BLI_system_thread_count();
-
- /* Do a lazy initialization, so it happens after
- * command line arguments parsing
- */
- task_scheduler = BLI_task_scheduler_create(tot_thread);
- }
-
- return task_scheduler;
-}
-
/* tot = 0 only initializes malloc mutex in a safe way (see sequence.c)
* problem otherwise: scene render will kill of the mutex!
*/
@@ -839,11 +818,6 @@ void BLI_threaded_malloc_begin(void)
unsigned int level = atomic_fetch_and_add_u(&thread_levels, 1);
if (level == 0) {
MEM_set_lock_callback(BLI_lock_malloc_thread, BLI_unlock_malloc_thread);
- /* There is a little chance that two threads will need to access to a
- * scheduler which was not yet created from main thread. which could
- * cause scheduler created multiple times.
- */
- BLI_task_scheduler_get();
}
}