From 4550edca2ecb7c4660b8516ef2e9d14f5dd198a4 Mon Sep 17 00:00:00 2001
From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)"
Date: Wed, 10 Mar 2021 11:39:22 +0000
Subject: Rework native Linux dynamic_thread_pool_group to no longer have a
 single mutex around work calculation and dispatch. Native Linux backend is
 currently not working.

---
 .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 544 ++++++++++++++-------
 1 file changed, 361 insertions(+), 183 deletions(-)

diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index c26a7945..c22cae35 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -27,7 +27,10 @@ Distributed under the Boost Software License, Version 1.0.
 #include "../../file_handle.hpp"
 #include "../../statfs.hpp"
 
+#include "quickcpplib/spinlock.hpp"
+
 #include <atomic>
+#include <memory>
 #include <mutex>
 #include <unordered_map>
 #include <vector>
@@ -183,100 +186,238 @@ Benchmarking llfio (Win32 thread pool (Vista+)) ...
 For 1024 work items got 66416.2 SHA256 hashes/sec with 33 maximum concurrency.
 */
 
+/* The Win32 thread pool numbers match those for ASIO on Windows for 64Kb SHA256,
+so the base implementation is probably good enough.
+
+1. Need multiple work queues, with speculative locking for insert/remove.
+
+2. Pumping timers must not happen inside the work queue loop:
+
+   - If there is a waiting thread, it can pump timers.
+
+   - Otherwise a separate timer thread would need to be launched.
+
+3. List counts for doubly linked lists need an optional atomic count, so add a
+fake atomic type.
+
+*/
+
 LLFIO_V2_NAMESPACE_BEGIN
 
 namespace detail
 {
+  struct dynamic_thread_pool_group_impl_guard : std::unique_lock<std::mutex>
+  {
+    using std::unique_lock<std::mutex>::unique_lock;
+  };
+  template <class T> class fake_atomic
+  {
+    T _v;
+
+  public:
+    constexpr fake_atomic(T v)
+        : _v(v)
+    {
+    }
+    T load(std::memory_order /*unused*/) const { return _v; }
+    void store(T v, std::memory_order /*unused*/) { _v = v; }
+    T fetch_add(T v, std::memory_order /*unused*/)
+    {
+      _v += v;
+      return _v - v;
+    }
+    T fetch_sub(T v, std::memory_order /*unused*/)
+    {
+      _v -= v;
+      return _v + v;
+    }
+  };
   struct global_dynamic_thread_pool_impl_workqueue_item
   {
-    std::unordered_set<dynamic_thread_pool_group_impl *> items;
+    const size_t nesting_level;
+    std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> next;
+    std::unordered_set<dynamic_thread_pool_group_impl *> items;  // Do NOT use without holding workqueue_lock
+
+    explicit global_dynamic_thread_pool_impl_workqueue_item(size_t _nesting_level, std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> &&preceding)
+        : nesting_level(_nesting_level)
+        , next(preceding)
+    {
+    }
+
 #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
-    dynamic_thread_pool_group::work_item *_next_active_front{nullptr}, *_next_timer_relative_front{nullptr}, *_next_timer_absolute_front{nullptr};
-    dynamic_thread_pool_group::work_item *_next_active_back{nullptr}, *_next_timer_relative_back{nullptr}, *_next_timer_absolute_back{nullptr};
+    static constexpr unsigned TOTAL_NEXTACTIVES = 4;
+    struct next_active_base_t
+    {
+      std::atomic<unsigned> count{0};
+      QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<uintptr_t> lock;
+      dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
+
+      next_active_base_t() = default;
+      next_active_base_t(const next_active_base_t &o)
+          : count(o.count.load(std::memory_order_relaxed))
+          , front(o.front)
+          , back(o.back)
+      {
+      }
+    };
+    struct alignas(64) next_active_work_t : next_active_base_t
+    {
+      char _padding[64 - sizeof(next_active_base_t)];  // 40 bytes?
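+      // This padding plus the alignas(64) sizes each next_active_work_t to
+      // exactly one cacheline, so the spinlocked queues in the array below
+      // cannot false share; the static_assert after the array verifies it.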
+    } next_actives[TOTAL_NEXTACTIVES];
+    static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline");
+    next_active_base_t next_timer_relative, next_timer_absolute;
 
-    dynamic_thread_pool_group::work_item *next_active()
+    dynamic_thread_pool_group::work_item *next_active(size_t threadidx)
     {
-      auto *ret = _next_active_front;
-      if(ret == nullptr)
+      threadidx &= (TOTAL_NEXTACTIVES - 1);
+      const size_t original_threadidx = threadidx;
+      bool all_empty = true;
+      for(;;)
       {
-        assert(_next_active_back == nullptr);
-        return nullptr;
+        next_active_base_t &x = next_actives[threadidx];
+        if(x.count.load(std::memory_order_relaxed) > 0)
+        {
+          all_empty = false;
+          if(x.lock.try_lock())
+          {
+            auto *ret = x.front;
+            if(ret != nullptr)
+            {
+              x.front = ret->_next_scheduled;
+              x.count.fetch_sub(1, std::memory_order_relaxed);
+              if(x.front == nullptr)
+              {
+                assert(x.back == ret);
+                x.back = nullptr;
+              }
+              ret->_next_scheduled = nullptr;
+              x.lock.unlock();
+              return ret;
+            }
+            x.lock.unlock();
+          }
+        }
+        if(++threadidx >= TOTAL_NEXTACTIVES)
+        {
+          threadidx = 0;
+        }
+        if(threadidx == original_threadidx)
+        {
+          if(all_empty)
+          {
+            return nullptr;
+          }
+          all_empty = true;
+        }
       }
-      _next_active_front = ret->_next_scheduled;
-      if(_next_active_front == nullptr)
+    }
+
+  private:
+    // Returns the least loaded of the next_actives queues, with its lock HELD
+    next_active_base_t &_choose_next_active()
+    {
+      unsigned idx = (unsigned) -1, min_count = (unsigned) -1;
+      for(unsigned n = 0; n < TOTAL_NEXTACTIVES; n++)
       {
-        assert(_next_active_back == ret);
-        _next_active_back = nullptr;
+        auto c = next_actives[n].count.load(std::memory_order_relaxed);
+        if(c < min_count)
+        {
+          idx = n;
+          min_count = c;
+        }
+      }
+      for(;;)
+      {
+        if(next_actives[idx].lock.try_lock())
+        {
+          return next_actives[idx];
+        }
+        if(++idx >= TOTAL_NEXTACTIVES)
+        {
+          idx = 0;
+        }
       }
-      ret->_next_scheduled = nullptr;
-      return ret;
     }
+
+  public:
     void append_active(dynamic_thread_pool_group::work_item *p)
     {
-      if(_next_active_back == nullptr)
+      next_active_base_t &x = _choose_next_active();
+      x.count.fetch_add(1, std::memory_order_relaxed);
+      if(x.back == nullptr)
       {
-        assert(_next_active_front == nullptr);
-        _next_active_front = _next_active_back = p;
+        assert(x.front == nullptr);
+        x.front = x.back = p;
+        x.lock.unlock();
         return;
       }
       p->_next_scheduled = nullptr;
-      _next_active_back->_next_scheduled = p;
-      _next_active_back = p;
+      x.back->_next_scheduled = p;
+      x.back = p;
+      x.lock.unlock();
     }
     void prepend_active(dynamic_thread_pool_group::work_item *p)
     {
-      if(_next_active_front == nullptr)
+      next_active_base_t &x = _choose_next_active();
+      x.count.fetch_add(1, std::memory_order_relaxed);
+      if(x.front == nullptr)
       {
-        assert(_next_active_back == nullptr);
-        _next_active_front = _next_active_back = p;
+        assert(x.back == nullptr);
+        x.front = x.back = p;
+        x.lock.unlock();
         return;
       }
-      p->_next_scheduled = _next_active_front;
-      _next_active_front = p;
+      p->_next_scheduled = x.front;
+      x.front = p;
+      x.lock.unlock();
     }
 
-    dynamic_thread_pool_group::work_item *next_timer(int which)
+    // x must be LOCKED on entry
+    template <int which> dynamic_thread_pool_group::work_item *next_timer()
     {
       if(which == 0)
       {
         return nullptr;
       }
-      auto *&front = (which == 1) ? _next_timer_relative_front : _next_timer_absolute_front;
-      auto *&back = (which == 1) ? _next_timer_relative_back : _next_timer_absolute_back;
-      auto *ret = front;
+      next_active_base_t &x = (which == 1) ? next_timer_relative : next_timer_absolute;
+      // x.lock.lock();  // not needed: the caller already holds x.lock
+      auto *ret = x.front;
       if(ret == nullptr)
       {
-        assert(back == nullptr);
+        assert(x.back == nullptr);
+        x.lock.unlock();
         return nullptr;
       }
-      front = ret->_next_scheduled;
-      if(front == nullptr)
+      x.front = ret->_next_scheduled;
+      if(x.front == nullptr)
       {
-        assert(back == ret);
-        back = nullptr;
+        assert(x.back == ret);
+        x.back = nullptr;
       }
       ret->_next_scheduled = nullptr;
+      x.count.fetch_sub(1, std::memory_order_relaxed);
+      x.lock.unlock();
       return ret;
     }
     void append_timer(dynamic_thread_pool_group::work_item *i)
     {
       if(i->_timepoint1 != std::chrono::steady_clock::time_point())
       {
-        if(_next_timer_relative_front == nullptr)
+        next_timer_relative.lock.lock();
+        next_timer_relative.count.fetch_add(1, std::memory_order_relaxed);
+        if(next_timer_relative.front == nullptr)
         {
-          _next_timer_relative_front = _next_timer_relative_back = i;
+          next_timer_relative.front = next_timer_relative.back = i;
         }
         else
         {
           bool done = false;
-          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_relative_front; n != nullptr; p = n, n = n->_next_scheduled)
+          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled)
           {
             if(n->_timepoint1 <= i->_timepoint1)
             {
               if(p == nullptr)
               {
                 i->_next_scheduled = n;
-                _next_timer_relative_front = i;
+                next_timer_relative.front = i;
               }
               else
               {
@@ -289,29 +430,32 @@ namespace detail
           }
           if(!done)
           {
-            _next_timer_relative_back->_next_scheduled = i;
+            next_timer_relative.back->_next_scheduled = i;
             i->_next_scheduled = nullptr;
-            _next_timer_relative_back = i;
+            next_timer_relative.back = i;
           }
         }
+        next_timer_relative.lock.unlock();
       }
       else
       {
-        if(_next_timer_absolute_front == nullptr)
+        next_timer_absolute.lock.lock();
+        next_timer_absolute.count.fetch_add(1, std::memory_order_relaxed);
+        if(next_timer_absolute.front == nullptr)
         {
-          _next_timer_absolute_front = _next_timer_absolute_back = i;
+          next_timer_absolute.front = next_timer_absolute.back = i;
         }
         else
         {
           bool done = false;
-          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_absolute_front; n != nullptr; p = n, n = n->_next_scheduled)
+          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled)
           {
             if(n->_timepoint2 <= i->_timepoint2)
             {
               if(p == nullptr)
               {
                 i->_next_scheduled = n;
-                _next_timer_absolute_front = i;
+                next_timer_absolute.front = i;
               }
               else
               {
@@ -324,21 +468,27 @@ namespace detail
           }
           if(!done)
           {
-            _next_timer_absolute_back->_next_scheduled = i;
+            next_timer_absolute.back->_next_scheduled = i;
             i->_next_scheduled = nullptr;
-            _next_timer_absolute_back = i;
+            next_timer_absolute.back = i;
           }
         }
+        next_timer_absolute.lock.unlock();
       }
     }
 #endif
   };
   struct global_dynamic_thread_pool_impl
   {
-    std::mutex workqueue_lock;
-    std::vector<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
+    using _spinlock_type = QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<uintptr_t>;
+
+    _spinlock_type workqueue_lock;
+    struct workqueue_guard : std::unique_lock<_spinlock_type>
+    {
+      using std::unique_lock<_spinlock_type>::unique_lock;
+    };
+    std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
-    using _lock_guard = std::unique_lock<std::mutex>;
     using threadh_type = void *;
     using grouph_type = dispatch_group_t;
     static void _gcd_dispatch_callback(void *arg)
     {
@@ -352,7 +502,6 @@ namespace detail
       global_dynamic_thread_pool()._timerthread(workitem, nullptr);
     }
 #elif defined(_WIN32)
-    using _lock_guard = std::unique_lock<std::mutex>;
     using threadh_type = PTP_CALLBACK_INSTANCE;
     using grouph_type = PTP_CALLBACK_ENVIRON;
     static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
     {
@@ -366,46 +515,13 @@ namespace detail
       global_dynamic_thread_pool()._timerthread(workitem, threadh);
     }
 #else
-    using _lock_guard = std::unique_lock<std::mutex>;
-#if 0
-    class _lock_guard
-    {
-      std::mutex &_l;
-      bool _is_locked{false};
-      _lock_guard(std::mutex &l)
-          : _l(l)
-      {
-        lock();
-      }
-      _lock_guard(const _lock_guard &) = delete;
-      _lock_guard(_lock_guard &&) = delete;
-      _lock_guard &operator=(const _lock_guard &) = delete;
-      _lock_guard &operator=(_lock_guard &&) = delete;
-      ~_lock_guard()
-      {
-        if(_is_locked)
-        {
-          unlock();
-        }
-      }
-      void lock()
-      {
-        assert(!_is_locked);
-        global_dynamic_thread_pool_threads_awaiting_mutex.fetch_add(1, std::memory_order_relaxed);
-        _l.lock();
-        _is_locked = true;
-      }
-      void unlock()
-      {
-        assert(_is_locked);
-        global_dynamic_thread_pool_threads_awaiting_mutex.fetch_sub(1, std::memory_order_relaxed);
-        _l.unlock();
-        _is_locked = false;
-      }
-    };
-#endif
     using threadh_type = void *;
     using grouph_type = void *;
+    std::mutex threadpool_lock;
+    struct threadpool_guard : std::unique_lock<std::mutex>
+    {
+      using std::unique_lock<std::mutex>::unique_lock;
+    };
     struct thread_t
     {
       thread_t *_prev{nullptr}, *_next{nullptr};
@@ -423,6 +539,10 @@ namespace detail
     std::atomic<unsigned> ms_sleep_for_more_work{20000};
 
     std::mutex threadmetrics_lock;
+    struct threadmetrics_guard : std::unique_lock<std::mutex>
+    {
+      using std::unique_lock<std::mutex>::unique_lock;
+    };
     struct threadmetrics_threadid
     {
       char text[12];  // enough for a UINT32_MAX in decimal
@@ -473,6 +593,10 @@ namespace detail
 #endif
 
     std::mutex io_aware_work_item_handles_lock;
+    struct io_aware_work_item_handles_guard : std::unique_lock<std::mutex>
+    {
+      using std::unique_lock<std::mutex>::unique_lock;
+    };
     struct io_aware_work_item_statfs
     {
       size_t refcount{0};
@@ -485,7 +609,6 @@ namespace detail
 
     global_dynamic_thread_pool_impl()
     {
-      workqueue.reserve(4);  // preallocate 4 levels of nesting
 #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
       populate_threadmetrics(std::chrono::steady_clock::now());
 #endif
@@ -559,7 +682,7 @@ namespace detail
 #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
     inline void _execute_work(thread_t *self);
 
-    void _add_thread(_lock_guard & /*unused*/)
+    void _add_thread(threadpool_guard & /*unused*/)
     {
       thread_t *p = nullptr;
       try
@@ -578,7 +701,7 @@ namespace detail
       }
     }
 
-    bool _remove_thread(_lock_guard &g, threads_t &which)
+    bool _remove_thread(threadpool_guard &g, threads_t &which)
    {
       if(which.count == 0)
       {
@@ -614,7 +737,7 @@ namespace detail
     ~global_dynamic_thread_pool_impl()
     {
       {
-        _lock_guard g(workqueue_lock);  // lock global
+        threadpool_guard g(threadpool_lock);
         while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
         {
           while(threadpool_sleeping.count > 0)
@@ -631,7 +754,7 @@ namespace detail
           }
         }
       }
-      _lock_guard g(threadmetrics_lock);
+      threadmetrics_guard g(threadmetrics_lock);
       for(auto *p : threadmetrics_sorted)
       {
         delete p;
@@ -649,7 +772,7 @@ namespace detail
 #ifdef __linux__
     // You are guaranteed only one of these EVER executes at a time.
     // Locking is probably overkill, but equally also probably harmless.
-    bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
+    bool update_threadmetrics(threadmetrics_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
     {
       auto update_item = [&](threadmetrics_item *item) {
         char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text;
@@ -739,7 +862,7 @@ namespace detail
       auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked;
       g.unlock();  // drop threadmetrics_lock
 
-      _lock_guard gg(workqueue_lock);
+      threadpool_guard gg(threadpool_lock);
       // Adjust for the number of threads sleeping for more work
       threadmetrics_running += threadpool_sleeping.count;
       threadmetrics_blocked -= threadpool_sleeping.count;
@@ -752,7 +875,7 @@ namespace detail
       auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency);
       if(toadd > 0 || toremove > 0)
       {
-        //std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
+        // std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
         //           << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd
         //           << " toremove = " << toremove << std::endl;
         if(toadd > 0)
@@ -784,7 +907,7 @@ namespace detail
       using dirent = dirent64;
       size_t bytes = 0;
       {
-        _lock_guard g(threadmetrics_lock);
+        threadmetrics_guard g(threadmetrics_lock);
         if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) &&
            threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed))
         {
@@ -805,7 +928,7 @@ namespace detail
         }
       }
       {
-        _lock_guard g(proc_self_task_fd_lock);
+        std::lock_guard<std::mutex> g(proc_self_task_fd_lock);
         /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep
         looping this until it stops telling obvious lies.
         */
@@ -855,7 +978,7 @@ namespace detail
         }
       }
       threadmetrics_item *firstnewitem = nullptr;
-      _lock_guard g(threadmetrics_lock);
+      threadmetrics_guard g(threadmetrics_lock);
 #if 0
       {
         std::stringstream s;
@@ -1015,14 +1138,16 @@ namespace detail
       return success();
     }
 
-    inline void _submit_work_item(_lock_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);
+    inline void _submit_work_item(dynamic_thread_pool_group_impl_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem,
+                                  bool defer_pool_wake);
 
-    inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;
+    inline result<void> submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+                               span<dynamic_thread_pool_group::work_item *> work) noexcept;
 
-    inline void _work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
+    inline void _work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
 
-    inline result<void> stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
-    inline result<void> wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;
+    inline result<void> stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
+    inline result<void> wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;
 
     inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
     inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
@@ -1065,7 +1190,6 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
   friend struct detail::global_dynamic_thread_pool_impl;
 
   mutable std::mutex _lock;
-  using _lock_guard = detail::global_dynamic_thread_pool_impl::_lock_guard;
   size_t _nesting_level{0};
   struct workitems_t
   {
@@ -1105,14 +1229,13 @@ public:
 #elif defined(_WIN32)
       InitializeThreadpoolEnvironment(_grouph);
 #endif
-      detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);  // lock global
+      detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
       // Append this group to the global work queue at its nesting level
-      if(_nesting_level >= impl.workqueue.size())
+      if(!impl.workqueue || impl.workqueue->nesting_level <= _nesting_level)
       {
-        impl.workqueue.resize(_nesting_level + 1);
+        impl.workqueue = std::make_shared<detail::global_dynamic_thread_pool_impl_workqueue_item>(_nesting_level, std::move(impl.workqueue));
       }
-      auto &wq = impl.workqueue[_nesting_level];
-      wq.items.insert(this);
+      impl.workqueue->items.insert(this);
       return success();
     }
     catch(...)
@@ -1139,13 +1262,19 @@ public:
       _grouph = nullptr;
     }
 #endif
-    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);  // lock global
-    assert(impl.workqueue.size() > _nesting_level);
-    auto &wq = impl.workqueue[_nesting_level];
-    wq.items.erase(this);
-    while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
+    detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
+    assert(impl.workqueue->nesting_level >= _nesting_level);
+    for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
     {
-      impl.workqueue.pop_back();
+      if(p->nesting_level == _nesting_level)
+      {
+        p->items.erase(this);
+        break;
+      }
+    }
+    while(impl.workqueue && impl.workqueue->items.empty())
+    {
+      impl.workqueue = std::move(impl.workqueue->next);
     }
   }
 
@@ -1167,7 +1296,7 @@ public:
     }
     _stopped.store(false, std::memory_order_release);
     auto &impl = detail::global_dynamic_thread_pool();
-    _lock_guard g(_lock);  // lock group
+    detail::dynamic_thread_pool_group_impl_guard g(_lock);  // lock group
     if(_work_items_active.count == 0 && _work_items_done.count == 0)
     {
       _abnormal_completion_cause = success();
@@ -1188,7 +1317,7 @@ public:
       return success();
     }
     auto &impl = detail::global_dynamic_thread_pool();
-    _lock_guard g(_lock);  // lock group
+    detail::dynamic_thread_pool_group_impl_guard g(_lock);  // lock group
     return impl.stop(g, this, errc::operation_canceled);
   }
 
@@ -1204,7 +1333,7 @@ public:
       return success();
     }
     auto &impl = detail::global_dynamic_thread_pool();
-    _lock_guard g(_lock);  // lock group
+    detail::dynamic_thread_pool_group_impl_guard g(_lock);  // lock group
     return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
   }
 };
@@ -1264,9 +1393,8 @@ namespace detail
   {
     pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
     self->last_did_work = std::chrono::steady_clock::now();
-    _lock_guard g(workqueue_lock);  // lock global
-    self->state++;  // busy
-    threadpool_threads.fetch_add(1, std::memory_order_release);
+    self->state++;  // busy
+    const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release);
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
     std::cout << "*** DTP " << self << " begins."
<< std::endl; #endif @@ -1277,44 +1405,66 @@ namespace detail std::chrono::steady_clock::time_point now_steady, earliest_duration; std::chrono::system_clock::time_point now_system, earliest_absolute; // Start from highest priority work group, executing any timers due before selecting a work item - for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it) { - auto &wq = *it; - if(wq._next_timer_relative_front != nullptr) + workqueue_lock.lock(); + auto lock_wq = workqueue; + workqueue_lock.unlock(); + while(lock_wq) { - if(now_steady == std::chrono::steady_clock::time_point()) - { - now_steady = std::chrono::steady_clock::now(); - } - if(wq._next_timer_relative_front->_timepoint1 <= now_steady) - { - workitem = wq.next_timer(1); - workitem_is_timer = true; - break; - } - if(earliest_duration == std::chrono::steady_clock::time_point() || wq._next_timer_relative_front->_timepoint1 < earliest_duration) + auto &wq = *lock_wq; + if(wq.next_timer_relative.count.load(std::memory_order_relaxed) > 0) { - earliest_duration = wq._next_timer_relative_front->_timepoint1; + if(now_steady == std::chrono::steady_clock::time_point()) + { + now_steady = std::chrono::steady_clock::now(); + } + wq.next_timer_relative.lock.lock(); + if(wq.next_timer_relative.front != nullptr) + { + if(wq.next_timer_relative.front->_timepoint1 <= now_steady) + { + workitem = wq.next_timer<1>(); // unlocks wq.next_timer_relative.lock + workitem_is_timer = true; + break; + } + if(earliest_duration == std::chrono::steady_clock::time_point() || wq.next_timer_relative.front->_timepoint1 < earliest_duration) + { + earliest_duration = wq.next_timer_relative.front->_timepoint1; + } + } + wq.next_timer_relative.lock.unlock(); } - } - if(wq._next_timer_absolute_front != nullptr) - { - if(now_system == std::chrono::system_clock::time_point()) + if(wq.next_timer_absolute.count.load(std::memory_order_relaxed) > 0) { - now_system = std::chrono::system_clock::now(); + if(now_system == std::chrono::system_clock::time_point()) + { + now_system = std::chrono::system_clock::now(); + } + wq.next_timer_absolute.lock.lock(); + if(wq.next_timer_absolute.front != nullptr) + { + if(wq.next_timer_absolute.front->_timepoint2 <= now_system) + { + workitem = wq.next_timer<2>(); // unlocks wq.next_timer_absolute.lock + workitem_is_timer = true; + break; + } + if(earliest_absolute == std::chrono::system_clock::time_point() || wq.next_timer_absolute.front->_timepoint2 < earliest_absolute) + { + earliest_absolute = wq.next_timer_absolute.front->_timepoint2; + } + } + wq.next_timer_absolute.lock.unlock(); } - if(wq._next_timer_absolute_front->_timepoint2 <= now_system) + workitem = wq.next_active(mythreadidx); + if(workitem != nullptr) { - workitem = wq.next_timer(2); - workitem_is_timer = true; break; } - if(earliest_absolute == std::chrono::system_clock::time_point() || wq._next_timer_absolute_front->_timepoint2 < earliest_absolute) - { - earliest_absolute = wq._next_timer_absolute_front->_timepoint2; - } + workqueue_lock.lock(); + lock_wq = lock_wq->next; + workqueue_lock.unlock(); } - workitem = wq.next_active(); } if(now_steady == std::chrono::steady_clock::time_point()) { @@ -1326,6 +1476,7 @@ namespace detail const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))); if(now_steady - self->last_did_work >= max_sleep) { + threadpool_guard g(threadpool_lock); // If there are any timers running, leave at least one worker thread if(threadpool_active.count 
> 1 || (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point())) @@ -1360,6 +1511,7 @@ namespace detail earliest_absolute = {}; } } + threadpool_guard g(threadpool_lock); _remove_from_list(threadpool_active, self); _append_to_list(threadpool_sleeping, self); self->state--; @@ -1391,7 +1543,6 @@ namespace detail catch(...) { } - g.lock(); continue; } self->last_did_work = now_steady; @@ -1399,7 +1550,6 @@ namespace detail std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl; #endif total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed); - g.unlock(); if(workitem_is_timer) { _timerthread(workitem, nullptr); @@ -1413,6 +1563,7 @@ namespace detail { if(populate_threadmetrics(now_steady)) { + threadpool_guard g(threadpool_lock); _remove_from_list(threadpool_active, self); threadpool_threads.fetch_sub(1, std::memory_order_release); #if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING @@ -1426,7 +1577,6 @@ namespace detail catch(...) { } - g.lock(); } self->state -= 2; // dead threadpool_threads.fetch_sub(1, std::memory_order_release); @@ -1436,7 +1586,7 @@ namespace detail } #endif - inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool submit_into_highest_priority, + inline void global_dynamic_thread_pool_impl::_submit_work_item(dynamic_thread_pool_group_impl_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake) { (void) g; @@ -1526,44 +1676,70 @@ namespace detail // std::cout << "*** timer " << workitem << std::endl; SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop); #else - _lock_guard gg(workqueue_lock); - auto *wq = &workqueue[submit_into_highest_priority ? 
(workqueue.size() - 1) : parent->_nesting_level]; - wq->append_timer(workitem); + workqueue_guard gg(workqueue_lock); + for(auto *p = workqueue.get(); p != nullptr; p = p->next.get()) + { + if(p->nesting_level == parent->_nesting_level) + { + p->append_timer(workitem); + break; + } + } #endif } else { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW; - if(workqueue.size() - parent->_nesting_level == 1) { - priority = DISPATCH_QUEUE_PRIORITY_HIGH; - } - else if(workqueue.size() - parent->_nesting_level == 2) - { - priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(workqueue->nesting_level == parent->_nesting_level) + { + priority = DISPATCH_QUEUE_PRIORITY_HIGH; + } + else if(workqueue->nesting_level == parent->_nesting_level + 1) + { + priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; + } } // std::cout << "*** submit " << workitem << std::endl; dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback); #elif defined(_WIN32) // Set the priority of the group according to distance from the top TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; - if(workqueue.size() - parent->_nesting_level == 1) { - priority = TP_CALLBACK_PRIORITY_HIGH; - } - else if(workqueue.size() - parent->_nesting_level == 2) - { - priority = TP_CALLBACK_PRIORITY_NORMAL; + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(workqueue->nesting_level == parent->_nesting_level) + { + priority = TP_CALLBACK_PRIORITY_HIGH; + } + else if(workqueue->nesting_level == parent->_nesting_level + 1) + { + priority = TP_CALLBACK_PRIORITY_NORMAL; + } } SetThreadpoolCallbackPriority(parent->_grouph, priority); // std::cout << "*** submit " << workitem << std::endl; SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); #else - _lock_guard gg(workqueue_lock); - auto *wq = &workqueue[submit_into_highest_priority ? 
-      auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
-      // TODO: It would be super nice if we prepended this instead if it came from a timer
-      wq->append_active(workitem);
+      global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+      if(submit_into_highest_priority || workqueue->nesting_level == parent->_nesting_level)
+      {
+        // TODO: It would be super nice if we prepended this instead if it came from a timer
+        workqueue->append_active(workitem);
+      }
+      else
+      {
+        for(auto *p = workqueue->next.get(); p != nullptr; p = p->next.get())
+        {
+          if(p->nesting_level == parent->_nesting_level)
+          {
+            // TODO: It would be super nice if we prepended this instead if it came from a timer
+            p->append_active(workitem);
+            break;
+          }
+        }
+      }
 #endif
     }
 
@@ -1578,7 +1754,7 @@ namespace detail
     {
       g.unlock();  // unlock group
      {
-        _lock_guard gg(workqueue_lock);  // lock global
+        threadpool_guard gg(threadpool_lock);
         if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
         {
           _add_thread(gg);
@@ -1602,7 +1778,7 @@ namespace detail
     }
   }
 
-  inline result<void> global_dynamic_thread_pool_impl::submit(_lock_guard &g, dynamic_thread_pool_group_impl *group,
+  inline result<void> global_dynamic_thread_pool_impl::submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
                                                               span<dynamic_thread_pool_group::work_item *> work) noexcept
   {
     try
@@ -1683,7 +1859,7 @@ namespace detail
     }
   }
 
-  inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
+  inline void global_dynamic_thread_pool_impl::_work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
   {
     (void) g;
     // std::cout << "*** _work_item_done " << i << std::endl;
@@ -1787,7 +1963,8 @@ namespace detail
     }
   }
 
-  inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept
+  inline result<void> global_dynamic_thread_pool_impl::stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+                                                            result<void> err) noexcept
   {
     (void) g;
     if(group->_abnormal_completion_cause)
@@ -1799,7 +1976,8 @@ namespace detail
   }
 
-  inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept
+  inline result<void> global_dynamic_thread_pool_impl::wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group,
+                                                            deadline d) noexcept
   {
     LLFIO_DEADLINE_TO_SLEEP_INIT(d);
     if(!d || d.nsecs > 0)
@@ -2018,7 +2196,7 @@ namespace detail
     assert(workitem->_nextwork != -1);
     assert(workitem->_has_timer_set());
     auto *parent = workitem->_parent.load(std::memory_order_relaxed);
-    _lock_guard g(parent->_lock);  // lock group
+    dynamic_thread_pool_group_impl_guard g(parent->_lock);  // lock group
     // std::cout << "*** _timerthread " << workitem << std::endl;
     if(parent->_stopping.load(std::memory_order_relaxed))
     {
@@ -2087,7 +2265,7 @@ namespace detail
     auto *parent = workitem->_parent.load(std::memory_order_relaxed);
     if(parent->_stopping.load(std::memory_order_relaxed))
     {
-      _lock_guard g(parent->_lock);  // lock group
+      dynamic_thread_pool_group_impl_guard g(parent->_lock);  // lock group
       _work_item_done(g, workitem);
       return;
     }
@@ -2099,7 +2277,7 @@ namespace detail
     auto r = (*workitem)(workitem->_nextwork);
     workitem->_nextwork = 0;  // call next() next time
     tls = old_thread_local_state;
-    _lock_guard g(parent->_lock);  // lock group
+    dynamic_thread_pool_group_impl_guard g(parent->_lock);  // lock group
     // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
     if(!r)
     {
@@ -2157,7 +2335,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::i
       }
     }
     auto &impl = detail::global_dynamic_thread_pool();
-    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+    detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
     for(auto &h : hs)
     {
       if(!h.h->is_seekable())
@@ -2195,7 +2373,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~
 {
   LLFIO_LOG_FUNCTION_CALL(this);
   auto &impl = detail::global_dynamic_thread_pool();
-  detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+  detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
   using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
   for(auto &h : _handles)
   {
@@ -2213,7 +2391,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_wor
 {
   auto &impl = detail::global_dynamic_thread_pool();
   auto now = std::chrono::steady_clock::now();
-  detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+  detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
   using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
   for(auto &h : _handles)
   {
--
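Appendix (illustrative sketch, not part of the commit): the core technique in this
patch is to shard a single mutex-guarded work queue into a fixed array of
cacheline-aligned, individually spin-locked FIFOs. Producers append to whichever
shard currently looks least loaded; consumers start at a shard chosen from their
thread index and skip past contended shards rather than spin on them. The
self-contained C++ sketch below shows just that technique. All names here
(sharded_queue, work_fn, SHARDS) are hypothetical, and a std::atomic_flag
spinlock stands in for QuickCppLib's configurable spinlock.

#include <array>
#include <atomic>
#include <climits>
#include <cstddef>
#include <deque>
#include <optional>

// Illustrative names only; this is not LLFIO's API.
using work_fn = void (*)();

class sharded_queue
{
  static constexpr std::size_t SHARDS = 4;  // power of two, like TOTAL_NEXTACTIVES

  struct alignas(64) shard_t  // one cacheline per shard, preventing false sharing
  {
    std::atomic<unsigned> count{0};            // approximate size, readable without the lock
    std::atomic_flag lock = ATOMIC_FLAG_INIT;  // minimal spinlock
    std::deque<work_fn> items;

    void lock_spin()
    {
      while(lock.test_and_set(std::memory_order_acquire))
      {
      }
    }
    bool try_lock() { return !lock.test_and_set(std::memory_order_acquire); }
    void unlock() { lock.clear(std::memory_order_release); }
  };
  std::array<shard_t, SHARDS> shards_;

public:
  // Producers append to the shard which currently looks least loaded,
  // mirroring _choose_next_active() in the patch.
  void push(work_fn f)
  {
    std::size_t best = 0;
    unsigned best_count = UINT_MAX;
    for(std::size_t n = 0; n < SHARDS; n++)
    {
      auto c = shards_[n].count.load(std::memory_order_relaxed);
      if(c < best_count)
      {
        best = n;
        best_count = c;
      }
    }
    auto &s = shards_[best];
    s.lock_spin();
    s.items.push_back(f);
    s.count.fetch_add(1, std::memory_order_relaxed);
    s.unlock();
  }

  // Consumers start at a shard derived from their thread index and walk the
  // others on contention, mirroring next_active(threadidx) in the patch.
  std::optional<work_fn> pop(std::size_t threadidx)
  {
    const std::size_t start = threadidx & (SHARDS - 1);
    for(std::size_t i = 0; i < SHARDS; i++)
    {
      auto &s = shards_[(start + i) & (SHARDS - 1)];
      if(s.count.load(std::memory_order_relaxed) == 0)
      {
        continue;  // empty at a glance, no need to take the lock
      }
      if(!s.try_lock())
      {
        continue;  // contended: try the next shard instead of spinning
      }
      if(!s.items.empty())
      {
        work_fn f = s.items.front();
        s.items.pop_front();
        s.count.fetch_sub(1, std::memory_order_relaxed);
        s.unlock();
        return f;
      }
      s.unlock();
    }
    return std::nullopt;  // every shard empty or contended just now
  }
};

A worker loop would call pop(my_index) and, on std::nullopt, either sleep
briefly or go pump the timer queues, which is essentially what _execute_work()
does in the patch above.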