From 7046e37eded55ac8879835500788a1e734437bbf Mon Sep 17 00:00:00 2001 From: Jeroen Bakker Date: Tue, 23 Mar 2021 15:33:51 +0100 Subject: Cleanup: WorkScheduler. - Use constexpr for better readability. - Split in functions per backend. - Split work scheduler global struct in smaller structs. - Replaced std::vector with blender::Vector. - Removed threading defines in COM_defines.h --- .../blender/compositor/intern/COM_WorkScheduler.cc | 615 ++++++++++++++------- 1 file changed, 414 insertions(+), 201 deletions(-) (limited to 'source/blender/compositor/intern/COM_WorkScheduler.cc') diff --git a/source/blender/compositor/intern/COM_WorkScheduler.cc b/source/blender/compositor/intern/COM_WorkScheduler.cc index 5d3f232221f..2bc3ff936b1 100644 --- a/source/blender/compositor/intern/COM_WorkScheduler.cc +++ b/source/blender/compositor/intern/COM_WorkScheduler.cc @@ -25,66 +25,89 @@ #include "COM_WorkScheduler.h" #include "COM_WriteBufferOperation.h" #include "COM_compositor.h" + #include "clew.h" #include "MEM_guardedalloc.h" +#include "BLI_task.h" #include "BLI_threads.h" +#include "BLI_vector.hh" #include "PIL_time.h" #include "BKE_global.h" -#if COM_CURRENT_THREADING_MODEL == COM_TM_NOTHREAD -# ifndef DEBUG /* Test this so we don't get warnings in debug builds. */ -# warning COM_CURRENT_THREADING_MODEL COM_TM_NOTHREAD is activated. Use only for debugging. -# endif -#elif COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE -/* do nothing - default */ -#else -# error COM_CURRENT_THREADING_MODEL No threading model selected -#endif +enum class ThreadingModel { + /** Everything is executed in the caller thread. easy for debugging. */ + SingleThreaded, + /** Multi-threaded model, which uses the BLI_thread_queue pattern. */ + Queue, + /** Uses BLI_task as threading backend. */ + Task +}; + +/** + * Returns the active threading model. + * + * Default is `ThreadingModel::Queue`. + */ +constexpr ThreadingModel COM_threading_model() +{ + return ThreadingModel::Queue; +} + +/** + * Does the active threading model support opencl? + */ +constexpr bool COM_is_opencl_enabled() +{ + return COM_threading_model() != ThreadingModel::SingleThreaded; +} static ThreadLocal(CPUDevice *) g_thread_device; static struct { - /** \brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created - */ - std::vector cpu_devices; - -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE - /** \brief list of all thread for every CPUDevice in cpudevices a thread exists. */ - ListBase cpu_threads; - bool cpu_initialized = false; - /** \brief all scheduled work for the cpu */ - ThreadQueue *cpu_queue; - ThreadQueue *gpu_queue; -# ifdef COM_OPENCL_ENABLED - cl_context opencl_context; - cl_program opencl_program; - /** \brief list of all OpenCLDevices. for every OpenCL GPU device an instance of OpenCLDevice is - * created. */ - std::vector gpu_devices; - /** \brief list of all thread for every GPUDevice in cpudevices a thread exists. */ - ListBase gpu_threads; - /** \brief all scheduled work for the GPU. */ - bool opencl_active = false; - bool opencl_initialized = false; -# endif -#endif + struct { + /** \brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is + * created + */ + blender::Vector devices; + + /** \brief list of all thread for every CPUDevice in cpudevices a thread exists. 
*/ + ListBase threads; + bool initialized = false; + /** \brief all scheduled work for the cpu */ + ThreadQueue *queue; + } queue; + struct { + TaskPool *pool; + } task; + + struct { + ThreadQueue *queue; + cl_context context; + cl_program program; + /** \brief list of all OpenCLDevices. for every OpenCL GPU device an instance of OpenCLDevice + * is created. */ + blender::Vector devices; + /** \brief list of all thread for every GPUDevice in cpudevices a thread exists. */ + ListBase threads; + /** \brief all scheduled work for the GPU. */ + bool active = false; + bool initialized = false; + } opencl; } g_work_scheduler; -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE -static void *thread_execute_cpu(void *data) -{ - CPUDevice *device = (CPUDevice *)data; - WorkPackage *work; - BLI_thread_local_set(g_thread_device, device); - while ((work = (WorkPackage *)BLI_thread_queue_pop(g_work_scheduler.cpu_queue))) { - device->execute(work); - delete work; - } +/* -------------------------------------------------------------------- */ +/** \name OpenCL Scheduling + * \{ */ - return nullptr; +static void CL_CALLBACK clContextError(const char *errinfo, + const void * /*private_info*/, + size_t /*cb*/, + void * /*user_data*/) +{ + printf("OPENCL error: %s\n", errinfo); } static void *thread_execute_gpu(void *data) @@ -92,156 +115,69 @@ static void *thread_execute_gpu(void *data) Device *device = (Device *)data; WorkPackage *work; - while ((work = (WorkPackage *)BLI_thread_queue_pop(g_work_scheduler.gpu_queue))) { + while ((work = (WorkPackage *)BLI_thread_queue_pop(g_work_scheduler.opencl.queue))) { device->execute(work); delete work; } return nullptr; } -#endif - -void WorkScheduler::schedule(ExecutionGroup *group, int chunkNumber) -{ - WorkPackage *package = new WorkPackage(group, chunkNumber); -#if COM_CURRENT_THREADING_MODEL == COM_TM_NOTHREAD - CPUDevice device(0); - device.execute(package); - delete package; -#elif COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE -# ifdef COM_OPENCL_ENABLED - if (group->isOpenCL() && g_work_scheduler.opencl_active) { - BLI_thread_queue_push(g_work_scheduler.gpu_queue, package); - } - else { - BLI_thread_queue_push(g_work_scheduler.cpu_queue, package); - } -# else - BLI_thread_queue_push(g_work_scheduler.cpu_queue, package); -# endif -#endif -} -void WorkScheduler::start(CompositorContext &context) +static void opencl_start(CompositorContext &context) { -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE - unsigned int index; - g_work_scheduler.cpu_queue = BLI_thread_queue_init(); - BLI_threadpool_init( - &g_work_scheduler.cpu_threads, thread_execute_cpu, g_work_scheduler.cpu_devices.size()); - for (index = 0; index < g_work_scheduler.cpu_devices.size(); index++) { - Device *device = g_work_scheduler.cpu_devices[index]; - BLI_threadpool_insert(&g_work_scheduler.cpu_threads, device); - } -# ifdef COM_OPENCL_ENABLED if (context.getHasActiveOpenCLDevices()) { - g_work_scheduler.gpu_queue = BLI_thread_queue_init(); - BLI_threadpool_init( - &g_work_scheduler.gpu_threads, thread_execute_gpu, g_work_scheduler.gpu_devices.size()); - for (index = 0; index < g_work_scheduler.gpu_devices.size(); index++) { - Device *device = g_work_scheduler.gpu_devices[index]; - BLI_threadpool_insert(&g_work_scheduler.gpu_threads, device); + g_work_scheduler.opencl.queue = BLI_thread_queue_init(); + BLI_threadpool_init(&g_work_scheduler.opencl.threads, + thread_execute_gpu, + g_work_scheduler.opencl.devices.size()); + for (int index = 0; index < g_work_scheduler.opencl.devices.size(); index++) { + 
Device *device = g_work_scheduler.opencl.devices[index]; + BLI_threadpool_insert(&g_work_scheduler.opencl.threads, device); } - g_work_scheduler.opencl_active = true; + g_work_scheduler.opencl.active = true; } else { - g_work_scheduler.opencl_active = false; + g_work_scheduler.opencl.active = false; } -# endif -#endif } -void WorkScheduler::finish() + +static bool opencl_schedule(WorkPackage *package) { -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE -# ifdef COM_OPENCL_ENABLED - if (g_work_scheduler.opencl_active) { - BLI_thread_queue_wait_finish(g_work_scheduler.gpu_queue); - BLI_thread_queue_wait_finish(g_work_scheduler.cpu_queue); - } - else { - BLI_thread_queue_wait_finish(g_work_scheduler.cpu_queue); + if (package->execution_group->isOpenCL() && g_work_scheduler.opencl.active) { + BLI_thread_queue_push(g_work_scheduler.opencl.queue, package); + return true; } -# else - BLI_thread_queue_wait_finish(cpuqueue); -# endif -#endif + return false; } -void WorkScheduler::stop() + +static void opencl_finish() { -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE - BLI_thread_queue_nowait(g_work_scheduler.cpu_queue); - BLI_threadpool_end(&g_work_scheduler.cpu_threads); - BLI_thread_queue_free(g_work_scheduler.cpu_queue); - g_work_scheduler.cpu_queue = nullptr; -# ifdef COM_OPENCL_ENABLED - if (g_work_scheduler.opencl_active) { - BLI_thread_queue_nowait(g_work_scheduler.gpu_queue); - BLI_threadpool_end(&g_work_scheduler.gpu_threads); - BLI_thread_queue_free(g_work_scheduler.gpu_queue); - g_work_scheduler.gpu_queue = nullptr; + if (g_work_scheduler.opencl.active) { + BLI_thread_queue_wait_finish(g_work_scheduler.opencl.queue); } -# endif -#endif } -bool WorkScheduler::has_gpu_devices() +static void opencl_stop() { -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE -# ifdef COM_OPENCL_ENABLED - return !g_work_scheduler.gpu_devices.empty(); -# else - return false; -# endif -#else - return false; -#endif + if (g_work_scheduler.opencl.active) { + BLI_thread_queue_nowait(g_work_scheduler.opencl.queue); + BLI_threadpool_end(&g_work_scheduler.opencl.threads); + BLI_thread_queue_free(g_work_scheduler.opencl.queue); + g_work_scheduler.opencl.queue = nullptr; + } } -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE -static void CL_CALLBACK clContextError(const char *errinfo, - const void * /*private_info*/, - size_t /*cb*/, - void * /*user_data*/) +static bool opencl_has_gpu_devices() { - printf("OPENCL error: %s\n", errinfo); + return !g_work_scheduler.opencl.devices.is_empty(); } -#endif -void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) +static void opencl_initialize(const bool use_opencl) { -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE - /* deinitialize if number of threads doesn't match */ - if (g_work_scheduler.cpu_devices.size() != num_cpu_threads) { - Device *device; - - while (!g_work_scheduler.cpu_devices.empty()) { - device = g_work_scheduler.cpu_devices.back(); - g_work_scheduler.cpu_devices.pop_back(); - device->deinitialize(); - delete device; - } - if (g_work_scheduler.cpu_initialized) { - BLI_thread_local_delete(g_thread_device); - } - g_work_scheduler.cpu_initialized = false; - } - - /* initialize CPU threads */ - if (!g_work_scheduler.cpu_initialized) { - for (int index = 0; index < num_cpu_threads; index++) { - CPUDevice *device = new CPUDevice(index); - device->initialize(); - g_work_scheduler.cpu_devices.push_back(device); - } - BLI_thread_local_create(g_thread_device); - g_work_scheduler.cpu_initialized = true; - } - -# ifdef COM_OPENCL_ENABLED /* deinitialize OpenCL 
GPU's */ - if (use_opencl && !g_work_scheduler.opencl_initialized) { - g_work_scheduler.opencl_context = nullptr; - g_work_scheduler.opencl_program = nullptr; + if (use_opencl && !g_work_scheduler.opencl.initialized) { + g_work_scheduler.opencl.context = nullptr; + g_work_scheduler.opencl.program = nullptr; /* This will check for errors and skip if already initialized. */ if (clewInit() != CLEW_SUCCESS) { @@ -276,15 +212,15 @@ void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) sizeof(cl_device_id) * numberOfDevices, __func__); clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, numberOfDevices, cldevices, nullptr); - g_work_scheduler.opencl_context = clCreateContext( + g_work_scheduler.opencl.context = clCreateContext( nullptr, numberOfDevices, cldevices, clContextError, nullptr, &error); if (error != CL_SUCCESS) { printf("CLERROR[%d]: %s\n", error, clewErrorString(error)); } const char *cl_str[2] = {datatoc_COM_OpenCLKernels_cl, nullptr}; - g_work_scheduler.opencl_program = clCreateProgramWithSource( - g_work_scheduler.opencl_context, 1, cl_str, nullptr, &error); - error = clBuildProgram(g_work_scheduler.opencl_program, + g_work_scheduler.opencl.program = clCreateProgramWithSource( + g_work_scheduler.opencl.context, 1, cl_str, nullptr, &error); + error = clBuildProgram(g_work_scheduler.opencl.program, numberOfDevices, cldevices, nullptr, @@ -294,7 +230,7 @@ void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) cl_int error2; size_t ret_val_size = 0; printf("CLERROR[%d]: %s\n", error, clewErrorString(error)); - error2 = clGetProgramBuildInfo(g_work_scheduler.opencl_program, + error2 = clGetProgramBuildInfo(g_work_scheduler.opencl.program, cldevices[0], CL_PROGRAM_BUILD_LOG, 0, @@ -304,7 +240,7 @@ void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) printf("CLERROR[%d]: %s\n", error, clewErrorString(error)); } char *build_log = (char *)MEM_mallocN(sizeof(char) * ret_val_size + 1, __func__); - error2 = clGetProgramBuildInfo(g_work_scheduler.opencl_program, + error2 = clGetProgramBuildInfo(g_work_scheduler.opencl.program, cldevices[0], CL_PROGRAM_BUILD_LOG, ret_val_size, @@ -327,12 +263,12 @@ void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) if (error2 != CL_SUCCESS) { printf("CLERROR[%d]: %s\n", error2, clewErrorString(error2)); } - OpenCLDevice *clDevice = new OpenCLDevice(g_work_scheduler.opencl_context, + OpenCLDevice *clDevice = new OpenCLDevice(g_work_scheduler.opencl.context, device, - g_work_scheduler.opencl_program, + g_work_scheduler.opencl.program, vendorID); clDevice->initialize(); - g_work_scheduler.gpu_devices.push_back(clDevice); + g_work_scheduler.opencl.devices.append(clDevice); } } MEM_freeN(cldevices); @@ -340,55 +276,332 @@ void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) MEM_freeN(platforms); } - g_work_scheduler.opencl_initialized = true; + g_work_scheduler.opencl.initialized = true; } -# endif -#endif } -void WorkScheduler::deinitialize() +static void opencl_deinitialize() { -#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE - /* deinitialize CPU threads */ - if (g_work_scheduler.cpu_initialized) { + /* Deinitialize OpenCL GPU's. 
*/ + if (g_work_scheduler.opencl.initialized) { Device *device; - while (!g_work_scheduler.cpu_devices.empty()) { - device = g_work_scheduler.cpu_devices.back(); - g_work_scheduler.cpu_devices.pop_back(); + while (!g_work_scheduler.opencl.devices.is_empty()) { + device = g_work_scheduler.opencl.devices.pop_last(); device->deinitialize(); delete device; } - BLI_thread_local_delete(g_thread_device); - g_work_scheduler.cpu_initialized = false; + if (g_work_scheduler.opencl.program) { + clReleaseProgram(g_work_scheduler.opencl.program); + g_work_scheduler.opencl.program = nullptr; + } + if (g_work_scheduler.opencl.context) { + clReleaseContext(g_work_scheduler.opencl.context); + g_work_scheduler.opencl.context = nullptr; + } + + g_work_scheduler.opencl.initialized = false; } +} -# ifdef COM_OPENCL_ENABLED - /* deinitialize OpenCL GPU's */ - if (g_work_scheduler.opencl_initialized) { +/* \} */ + +/* -------------------------------------------------------------------- */ +/** \name Single threaded Scheduling + * \{ */ + +static void threading_model_single_thread_execute(WorkPackage *package) +{ + CPUDevice device(0); + device.execute(package); + delete package; +} + +/* \} */ + +/* -------------------------------------------------------------------- */ +/** \name Queue Scheduling + * \{ */ + +static void *threading_model_queue_execute(void *data) +{ + CPUDevice *device = (CPUDevice *)data; + WorkPackage *work; + BLI_thread_local_set(g_thread_device, device); + while ((work = (WorkPackage *)BLI_thread_queue_pop(g_work_scheduler.queue.queue))) { + device->execute(work); + delete work; + } + + return nullptr; +} + +static void threading_model_queue_schedule(WorkPackage *package) +{ + BLI_thread_queue_push(g_work_scheduler.queue.queue, package); +} + +static void threading_model_queue_start() +{ + g_work_scheduler.queue.queue = BLI_thread_queue_init(); + BLI_threadpool_init(&g_work_scheduler.queue.threads, + threading_model_queue_execute, + g_work_scheduler.queue.devices.size()); + for (int index = 0; index < g_work_scheduler.queue.devices.size(); index++) { + Device *device = g_work_scheduler.queue.devices[index]; + BLI_threadpool_insert(&g_work_scheduler.queue.threads, device); + } +} + +static void threading_model_queue_finish() +{ + BLI_thread_queue_wait_finish(g_work_scheduler.queue.queue); +} + +static void threading_model_queue_stop() +{ + BLI_thread_queue_nowait(g_work_scheduler.queue.queue); + BLI_threadpool_end(&g_work_scheduler.queue.threads); + BLI_thread_queue_free(g_work_scheduler.queue.queue); + g_work_scheduler.queue.queue = nullptr; +} + +static void threading_model_queue_initialize(const int num_cpu_threads) +{ + /* Reinitialize if number of threads doesn't match. */ + if (g_work_scheduler.queue.devices.size() != num_cpu_threads) { Device *device; - while (!g_work_scheduler.gpu_devices.empty()) { - device = g_work_scheduler.gpu_devices.back(); - g_work_scheduler.gpu_devices.pop_back(); + + while (!g_work_scheduler.queue.devices.is_empty()) { + device = g_work_scheduler.queue.devices.pop_last(); device->deinitialize(); delete device; } - if (g_work_scheduler.opencl_program) { - clReleaseProgram(g_work_scheduler.opencl_program); - g_work_scheduler.opencl_program = nullptr; + if (g_work_scheduler.queue.initialized) { + BLI_thread_local_delete(g_thread_device); + } + g_work_scheduler.queue.initialized = false; + } + + /* Initialize CPU threads. 
*/ + if (!g_work_scheduler.queue.initialized) { + for (int index = 0; index < num_cpu_threads; index++) { + CPUDevice *device = new CPUDevice(index); + device->initialize(); + g_work_scheduler.queue.devices.append(device); } - if (g_work_scheduler.opencl_context) { - clReleaseContext(g_work_scheduler.opencl_context); - g_work_scheduler.opencl_context = nullptr; + BLI_thread_local_create(g_thread_device); + g_work_scheduler.queue.initialized = true; + } +} +static void threading_model_queue_deinitialize() +{ + /* deinitialize CPU threads */ + if (g_work_scheduler.queue.initialized) { + Device *device; + while (!g_work_scheduler.queue.devices.is_empty()) { + device = g_work_scheduler.queue.devices.pop_last(); + device->deinitialize(); + delete device; } + BLI_thread_local_delete(g_thread_device); + g_work_scheduler.queue.initialized = false; + } +} + +/* \} */ + +/* -------------------------------------------------------------------- */ +/** \name Task Scheduling + * \{ */ + +static void threading_model_task_execute(TaskPool *__restrict UNUSED(pool), void *task_data) +{ + WorkPackage *package = static_cast(task_data); + CPUDevice device(BLI_task_parallel_thread_id(nullptr)); + BLI_thread_local_set(g_thread_device, &device); + device.execute(package); + delete package; +} + +static void threading_model_task_schedule(WorkPackage *package) +{ + BLI_task_pool_push( + g_work_scheduler.task.pool, threading_model_task_execute, package, false, nullptr); +} + +static void threading_model_task_start() +{ + BLI_thread_local_create(g_thread_device); + g_work_scheduler.task.pool = BLI_task_pool_create(nullptr, TASK_PRIORITY_HIGH); +} + +static void threading_model_task_finish() +{ + BLI_task_pool_work_and_wait(g_work_scheduler.task.pool); +} + +static void threading_model_task_stop() +{ + BLI_task_pool_free(g_work_scheduler.task.pool); + g_work_scheduler.task.pool = nullptr; + BLI_thread_local_delete(g_thread_device); +} - g_work_scheduler.opencl_initialized = false; +/* \} */ + +/* -------------------------------------------------------------------- */ +/** \name Public API + * \{ */ + +void WorkScheduler::schedule(ExecutionGroup *group, int chunkNumber) +{ + WorkPackage *package = new WorkPackage(group, chunkNumber); + + if (COM_is_opencl_enabled()) { + if (opencl_schedule(package)) { + return; + } + } + + switch (COM_threading_model()) { + case ThreadingModel::SingleThreaded: { + threading_model_single_thread_execute(package); + break; + } + + case ThreadingModel::Queue: { + threading_model_queue_schedule(package); + break; + } + + case ThreadingModel::Task: { + threading_model_task_schedule(package); + break; + } + } +} + +void WorkScheduler::start(CompositorContext &context) +{ + if (COM_is_opencl_enabled()) { + opencl_start(context); + } + + switch (COM_threading_model()) { + case ThreadingModel::SingleThreaded: + /* Nothing to do. */ + break; + + case ThreadingModel::Queue: + threading_model_queue_start(); + break; + + case ThreadingModel::Task: + threading_model_task_start(); + break; + } +} + +void WorkScheduler::finish() +{ + if (COM_is_opencl_enabled()) { + opencl_finish(); + } + + switch (COM_threading_model()) { + case ThreadingModel::SingleThreaded: + /* Nothing to do. 
*/ + break; + + case ThreadingModel::Queue: + threading_model_queue_finish(); + break; + + case ThreadingModel::Task: + threading_model_task_finish(); + break; + } +} + +void WorkScheduler::stop() +{ + if (COM_is_opencl_enabled()) { + opencl_stop(); + } + + switch (COM_threading_model()) { + case ThreadingModel::SingleThreaded: + /* Nothing to do. */ + break; + + case ThreadingModel::Queue: + threading_model_queue_stop(); + break; + + case ThreadingModel::Task: + threading_model_task_stop(); + break; + } +} + +bool WorkScheduler::has_gpu_devices() +{ + if (COM_is_opencl_enabled()) { + return opencl_has_gpu_devices(); + } + return false; +} + +void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads) +{ + if (COM_is_opencl_enabled()) { + opencl_initialize(use_opencl); + } + + switch (COM_threading_model()) { + case ThreadingModel::SingleThreaded: + /* Nothing to do. */ + break; + + case ThreadingModel::Queue: + threading_model_queue_initialize(num_cpu_threads); + break; + + case ThreadingModel::Task: + /* Nothing to do. */ + break; + } +} + +void WorkScheduler::deinitialize() +{ + if (COM_is_opencl_enabled()) { + opencl_deinitialize(); + } + + switch (COM_threading_model()) { + case ThreadingModel::SingleThreaded: + /* Nothing to do. */ + break; + + case ThreadingModel::Queue: + threading_model_queue_deinitialize(); + break; + + case ThreadingModel::Task: + /* Nothing to do. */ + break; } -# endif -#endif } int WorkScheduler::current_thread_id() { + if (COM_threading_model() == ThreadingModel::SingleThreaded) { + return 0; + } + CPUDevice *device = (CPUDevice *)BLI_thread_local_get(g_thread_device); return device->thread_id(); } + +/* \} */ -- cgit v1.2.3
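
The central pattern this patch introduces — selecting the threading backend through a constexpr selector (COM_threading_model()) and dispatching with a plain switch, instead of the old COM_CURRENT_THREADING_MODEL preprocessor defines — can be sketched in isolation. The following is a minimal standalone approximation, not Blender code: run_single(), run_queue(), run_task() and the main() driver are hypothetical stand-ins for the real threading_model_*() functions and the BLI thread/task APIs.

    #include <cstdio>

    /* Mirrors the ThreadingModel enum added by the patch. */
    enum class ThreadingModel {
      SingleThreaded, /* Everything runs in the caller thread; easy to debug. */
      Queue,          /* Worker threads popping work from a shared queue. */
      Task,           /* A task-pool backend. */
    };

    /* Compile-time selection of the active backend, as in COM_threading_model().
     * Because this returns a constant, the compiler can fold the switch below
     * and drop the branches for the unselected backends. */
    constexpr ThreadingModel active_threading_model()
    {
      return ThreadingModel::Queue;
    }

    /* Hypothetical per-backend entry points standing in for the
     * threading_model_*_schedule() functions of the patch. */
    static void run_single() { std::printf("executing in the caller thread\n"); }
    static void run_queue()  { std::printf("pushing work onto a thread queue\n"); }
    static void run_task()   { std::printf("pushing work into a task pool\n"); }

    /* Public entry point: dispatch once per backend, the same way
     * WorkScheduler::schedule() does after the refactor. */
    static void schedule()
    {
      switch (active_threading_model()) {
        case ThreadingModel::SingleThreaded:
          run_single();
          break;
        case ThreadingModel::Queue:
          run_queue();
          break;
        case ThreadingModel::Task:
          run_task();
          break;
      }
    }

    int main()
    {
      schedule(); /* Prints the queue-backend message with the default selector. */
      return 0;
    }

Because every case stays visible to the compiler, all backends keep compiling and type-checking even though only one is selected, while the constant selector still lets the optimizer eliminate the unused branches — the readability and safety gain the commit message refers to with "Use constexpr for better readability."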