From 315522698efe62d1d97957e5a8fd992a7b6ecf10 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Thu, 25 Feb 2021 13:07:28 +0000 Subject: Lots of performance tuning of native Linux implementation, but still 5-10% slower than libdispatch. --- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 182 +++++++++++++-------- 1 file changed, 114 insertions(+), 68 deletions(-) (limited to 'include') diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index 41975697..c26a7945 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -336,9 +336,9 @@ namespace detail struct global_dynamic_thread_pool_impl { std::mutex workqueue_lock; - using _lock_guard = std::unique_lock; std::vector workqueue; #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + using _lock_guard = std::unique_lock; using threadh_type = void *; using grouph_type = dispatch_group_t; static void _gcd_dispatch_callback(void *arg) @@ -352,6 +352,7 @@ namespace detail global_dynamic_thread_pool()._timerthread(workitem, nullptr); } #elif defined(_WIN32) + using _lock_guard = std::unique_lock; using threadh_type = PTP_CALLBACK_INSTANCE; using grouph_type = PTP_CALLBACK_ENVIRON; static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/) @@ -365,6 +366,44 @@ namespace detail global_dynamic_thread_pool()._timerthread(workitem, threadh); } #else + using _lock_guard = std::unique_lock; +#if 0 + class _lock_guard + { + std::mutex &_l; + bool _is_locked{false}; + _lock_guard(std::mutex &l) + : _l(l) + { + lock(); + } + _lock_guard(const _lock_guard &) = delete; + _lock_guard(_lock_guard &&) = delete; + _lock_guard &operator=(const _lock_guard &) = delete; + _lock_guard &operator=(_lock_guard &&) = delete; + ~_lock_guard() + { + if(_is_locked) + { + unlock(); + } + } + void lock() + { + assert(!_is_locked); + global_dynamic_thread_pool_threads_awaiting_mutex.fetch_add(1, std::memory_order_relaxed); + _l.lock(); + _is_locked = true; + } + void unlock() + { + assert(_is_locked); + global_dynamic_thread_pool_threads_awaiting_mutex.fetch_sub(1, std::memory_order_relaxed); + _l.unlock(); + _is_locked = false; + } + }; +#endif using threadh_type = void *; using grouph_type = void *; struct thread_t @@ -380,8 +419,8 @@ namespace detail size_t count{0}; thread_t *front{nullptr}, *back{nullptr}; } threadpool_active, threadpool_sleeping; - std::atomic total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0}; - std::atomic ms_sleep_for_more_work{30000}; // TODO put back to 60000 + std::atomic total_submitted_workitems{0}, threadpool_threads{0}; + std::atomic ms_sleep_for_more_work{20000}; std::mutex threadmetrics_lock; struct threadmetrics_threadid @@ -426,7 +465,7 @@ namespace detail } threadmetrics_queue; // items at front are least recently updated std::vector threadmetrics_sorted; // sorted by threadid std::chrono::steady_clock::time_point threadmetrics_last_updated; - std::atomic update_threadmetrics_reentrancy{false}; + std::atomic populate_threadmetrics_reentrancy{0}; #ifdef __linux__ std::mutex proc_self_task_fd_lock; int proc_self_task_fd{-1}; @@ -609,6 +648,7 @@ namespace detail } #ifdef __linux__ + // You are guaranteed only one of these EVER executes at a time. Locking is probably overkill, but equally also probably harmless bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items) { auto update_item = [&](threadmetrics_item *item) { @@ -684,56 +724,46 @@ namespace detail return false; } size_t updated = 0; - while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100) && updated++ < 10) + while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(200) && updated++ < 4) { auto *p = threadmetrics_queue.front; update_item(p); _remove_from_list(threadmetrics_queue, p); _append_to_list(threadmetrics_queue, p); } - if(updated > 0) + // if(updated > 0) { static const auto min_hardware_concurrency = std::thread::hardware_concurrency(); - static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1); - auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running, - (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) - - (ssize_t) threadpool_threads.load(std::memory_order_relaxed))); - auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency); - // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove - // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count - // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count - // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed) - // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl; + static const auto max_hardware_concurrency = min_hardware_concurrency + 3; + auto threadmetrics_running = (ssize_t) threadmetrics_queue.running; + auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked; + g.unlock(); // drop threadmetrics_lock + + _lock_guard gg(workqueue_lock); + // Adjust for the number of threads sleeping for more work + threadmetrics_running += threadpool_sleeping.count; + threadmetrics_blocked -= threadpool_sleeping.count; + if(threadmetrics_blocked < 0) + { + threadmetrics_blocked = 0; + } + const auto desired_concurrency = std::min((ssize_t) min_hardware_concurrency, (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed)); + auto toadd = std::max((ssize_t) 0, std::min(desired_concurrency - threadmetrics_running, desired_concurrency - (ssize_t) threadpool_active.count)); + auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency); if(toadd > 0 || toremove > 0) { - if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed)) - { - return false; - } - auto unupdate_threadmetrics_reentrancy = - make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); }); - g.unlock(); - _lock_guard gg(workqueue_lock); - toadd -= (ssize_t) threadpool_sleeping.count; - std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count << " toadd = " << toadd - << " toremove = " << toremove << std::endl; - for(; toadd > 0; toadd--) + //std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count + // << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd + // << " toremove = " << toremove << std::endl; + if(toadd > 0) { _add_thread(gg); } - for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--) - { - if(!_remove_thread(gg, threadpool_sleeping)) - { - break; - } - } if(toremove > 0 && threadpool_active.count > 1) { - // Kill myself, but not if I'm the final thread who needs to run timers + // Kill myself, but not if I'm the final thread who might need to run timers return true; } - return false; } } return false; @@ -741,6 +771,12 @@ namespace detail // Returns true if calling thread is to exit bool populate_threadmetrics(std::chrono::steady_clock::time_point now) { + if(populate_threadmetrics_reentrancy.exchange(1, std::memory_order_relaxed) == 1) + { + return false; + } + auto unpopulate_threadmetrics_reentrancy = make_scope_exit([this]() noexcept { populate_threadmetrics_reentrancy.store(0, std::memory_order_relaxed); }); + static thread_local std::vector kernelbuffer(1024); static thread_local std::vector threadidsbuffer(1024 / sizeof(dirent)); using getdents64_t = int (*)(int, char *, unsigned int); @@ -749,9 +785,13 @@ namespace detail size_t bytes = 0; { _lock_guard g(threadmetrics_lock); - if(now - threadmetrics_last_updated < std::chrono::milliseconds(100) && + if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) && threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed)) { + if(now - threadmetrics_last_updated < std::chrono::milliseconds(100)) + { + return false; + } return update_threadmetrics(std::move(g), now, nullptr); } threadmetrics_last_updated = now; @@ -909,7 +949,7 @@ namespace detail ++s_it; } assert(threadmetrics_sorted.size() == threadidsbuffer.size()); -#if 1 +#if 0 if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(), [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; })) { @@ -1236,6 +1276,7 @@ namespace detail bool workitem_is_timer = false; std::chrono::steady_clock::time_point now_steady, earliest_duration; std::chrono::system_clock::time_point now_system, earliest_absolute; + // Start from highest priority work group, executing any timers due before selecting a work item for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it) { auto &wq = *it; @@ -1279,16 +1320,25 @@ namespace detail { now_steady = std::chrono::steady_clock::now(); } + // If there are no timers, and no work to do, time to either die or sleep if(workitem == nullptr) { const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))); if(now_steady - self->last_did_work >= max_sleep) { - _remove_from_list(threadpool_active, self); - threadpool_threads.fetch_sub(1, std::memory_order_release); - self->thread.detach(); - delete self; - return; + // If there are any timers running, leave at least one worker thread + if(threadpool_active.count > 1 || + (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point())) + { + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl; +#endif + self->thread.detach(); + delete self; + return; + } } std::chrono::steady_clock::duration duration(max_sleep); if(earliest_duration != std::chrono::steady_clock::time_point()) @@ -1313,7 +1363,6 @@ namespace detail _remove_from_list(threadpool_active, self); _append_to_list(threadpool_sleeping, self); self->state--; - threadpool_sleeping_count.fetch_add(1, std::memory_order_release); if(earliest_absolute != std::chrono::system_clock::time_point()) { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING @@ -1331,7 +1380,6 @@ namespace detail self->state++; _remove_from_list(threadpool_sleeping, self); _append_to_list(threadpool_active, self); - threadpool_sleeping_count.fetch_sub(1, std::memory_order_release); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl; #endif @@ -1367,6 +1415,9 @@ namespace detail { _remove_from_list(threadpool_active, self); threadpool_threads.fetch_sub(1, std::memory_order_release); +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl; +#endif self->thread.detach(); delete self; return; @@ -1379,8 +1430,8 @@ namespace detail } self->state -= 2; // dead threadpool_threads.fetch_sub(1, std::memory_order_release); -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " exits, state = " << self->state << std::endl; +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl; #endif } #endif @@ -1525,32 +1576,27 @@ namespace detail const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1; if(!defer_pool_wake) { - const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed); - const auto threads = threadpool_threads.load(std::memory_order_relaxed); - if(sleeping_count > 0 || threads == 0) + g.unlock(); // unlock group { - g.unlock(); // unlock group + _lock_guard gg(workqueue_lock); // lock global + if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) { - _lock_guard gg(workqueue_lock); // lock global - if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) - { - _add_thread(gg); - } - else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) + _add_thread(gg); + } + else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) + { + // Try to wake the most recently slept first + auto *t = threadpool_sleeping.back; + auto now = std::chrono::steady_clock::now(); + for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) { - // Try to wake the most recently slept first - auto *t = threadpool_sleeping.back; - auto now = std::chrono::steady_clock::now(); - for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) - { - t->last_did_work = now; // prevent reap - t->cond.notify_one(); - t = t->_prev; - } + t->last_did_work = now; // prevent reap + t->cond.notify_one(); + t = t->_prev; } } - g.lock(); // lock group } + g.lock(); // lock group } #endif } -- cgit v1.2.3