diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-02-09 15:57:41 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-03-16 13:21:42 +0300 |
commit | d00312487dd19c1c5f8967974c12239eebdeb667 (patch) | |
tree | 2449d0ea21cac4c0412cc4c6b7c44f41b966b5c9 | |
parent | 944f8f03360b2d12f04dff25a35e9ef0e1f15e7b (diff) |
More wip
-rw-r--r-- | include/llfio/revision.hpp | 6 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 359 | ||||
-rw-r--r-- | include/llfio/v2.0/dynamic_thread_pool_group.hpp | 26 | ||||
-rw-r--r-- | test/tests/dynamic_thread_pool_group.cpp | 1 |
4 files changed, 271 insertions, 121 deletions
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 3bf437f0..3456e101 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 0d62d125a2ee2404976155f104dbe109430ae0ba -#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-19 15:43:28 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 0d62d125 +#define LLFIO_PREVIOUS_COMMIT_REF b40a7594bd14dfbeaca42caf77b00df27df27b95 +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-02-09 12:57:41 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE b40a7594 diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index b0bbef9a..571b90cf 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -75,8 +75,8 @@ namespace detail struct workqueue_item { std::unordered_set<dynamic_thread_pool_group_impl *> items; - std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup; - size_t currentgroupremaining{0}; + std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup{items.begin()}; + dynamic_thread_pool_group_impl *_currentgroup{nullptr}; }; std::vector<workqueue_item> workqueue; #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD @@ -122,6 +122,7 @@ namespace detail thread_t *front{nullptr}, *back{nullptr}; } threadpool_active, threadpool_sleeping; std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0}; + std::atomic<uint32_t> ms_sleep_for_more_work{60000}; std::mutex threadmetrics_lock; struct threadmetrics_threadid @@ -209,6 +210,23 @@ namespace detail } what.count++; } + template <class T, class U> static void _prepend_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_prev = nullptr; + v->_next = what.front; + what.front->_prev = v; + what.front = v; + } + what.count++; + } template <class T, class U> static void _remove_from_list(T &what, U *v) { if(v->_prev == nullptr && v->_next == nullptr) @@ -739,7 +757,7 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group { size_t count{0}; dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; - } _work_items_active, _work_items_done, _work_items_delayed; + } _work_items_timer, _work_items_active, _work_items_done, _work_items_delayed; std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false}; std::atomic<int> _waits{0}; result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion @@ -751,6 +769,9 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron}; #else void *_grouph{nullptr}; + size_t _newly_added_active_work_items{0}; + size_t _timer_work_items_remaining{0}; + size_t _active_work_items_remaining{0}; #endif public: @@ -774,14 +795,30 @@ public: // Append this group to the global work queue at its nesting level if(_nesting_level >= impl.workqueue.size()) { + for(auto &i : impl.workqueue) + { + i._currentgroup = (i.currentgroup != i.items.end()) ? *i.currentgroup : nullptr; + i.currentgroup = {}; + } impl.workqueue.resize(_nesting_level + 1); + for(auto &i : impl.workqueue) + { + if(i._currentgroup != nullptr) + { + i.currentgroup = i.items.find(i._currentgroup); + } + else + { + i.currentgroup = i.items.end(); + } + i._currentgroup = nullptr; + } } auto &wq = impl.workqueue[_nesting_level]; wq.items.insert(this); if(wq.items.size() == 1) { wq.currentgroup = wq.items.begin(); - wq.currentgroupremaining = _work_items_active.count; } return success(); } @@ -812,16 +849,12 @@ public: detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global assert(impl.workqueue.size() > _nesting_level); auto &wq = impl.workqueue[_nesting_level]; - if(*wq.currentgroup == this) + if(wq.items.end() != wq.currentgroup && *wq.currentgroup == this) { if(wq.items.end() == ++wq.currentgroup) { wq.currentgroup = wq.items.begin(); } - if(!wq.items.empty()) - { - wq.currentgroupremaining = (*wq.currentgroup)->_work_items_active.count; - } } wq.items.erase(this); while(!impl.workqueue.empty() && impl.workqueue.back().items.empty()) @@ -849,12 +882,12 @@ public: _stopped.store(false, std::memory_order_release); auto &impl = detail::global_dynamic_thread_pool(); _lock_guard g(_lock); // lock group - if(_work_items_active.count == 0 && _work_items_done.count == 0) + if(_work_items_timer.count == 0 && _work_items_active.count == 0 && _work_items_done.count == 0) { _abnormal_completion_cause = success(); } OUTCOME_TRY(impl.submit(g, this, work)); - if(_work_items_active.count == 0) + if(_work_items_timer.count == 0 && _work_items_active.count == 0) { _stopped.store(true, std::memory_order_release); } @@ -900,6 +933,30 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_th return detail::global_dynamic_thread_pool_thread_local_state().workitem; } +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work() noexcept +{ +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + return detail::global_dynamic_thread_pool().ms_sleep_for_more_work.load(std::memory_order_relaxed); +#else + return 0; +#endif +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work(uint32_t v) noexcept +{ +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + if(0 == v) + { + v = 1; + } + detail::global_dynamic_thread_pool().ms_sleep_for_more_work.store(v, std::memory_order_relaxed); + return v; +#else + (void) v; + return 0; +#endif +} + LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept { try @@ -936,37 +993,75 @@ namespace detail std::chrono::system_clock::time_point earliest_absolute; if(!workqueue.empty()) { - auto wq_it = (workqueue_depth >= workqueue.size()) ? --workqueue.end() : (workqueue.begin() + workqueue_depth); + auto wq_it = --workqueue.end(); + bool in_highest_priority_queue = true, started_from_top = false; + if(workqueue.size() > 1 && workqueue_depth < workqueue.size() - 1) + { + wq_it = workqueue.begin() + workqueue_depth; + in_highest_priority_queue = false; + } auto *wq = &(*wq_it); + if(in_highest_priority_queue) + { + if(wq->currentgroup == wq->items.end()) + { + wq->currentgroup = wq->items.begin(); + started_from_top = true; + } + else if(wq->currentgroup == wq->items.begin()) + { + started_from_top = true; + } + } for(;;) { dynamic_thread_pool_group_impl *tpg = *wq->currentgroup; _lock_guard gg(tpg->_lock); // lock group - if(wq->currentgroupremaining > tpg->_work_items_active.count) - { - wq->currentgroupremaining = tpg->_work_items_active.count; - } -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " sees " << wq->currentgroupremaining << " items remaining in group " << tpg << std::endl; -#endif - if(wq->currentgroupremaining > 0 && tpg->_work_items_active.front->_internalworkh == nullptr) + if(started_from_top) { - auto *wi = tpg->_work_items_active.front; - _remove_from_list(tpg->_work_items_active, wi); - _append_to_list(tpg->_work_items_active, wi); - wq->currentgroupremaining--; - if(wi->_internaltimerh == nullptr) + if(tpg->_timer_work_items_remaining == 0 && tpg->_active_work_items_remaining == 0) { - workitem = wi; - workitem->_internalworkh = self; -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top " - << (workqueue.end() - wq_it - 1) << std::endl; -#endif - break; + tpg->_timer_work_items_remaining = (size_t) -1; + tpg->_active_work_items_remaining = (size_t) -1; + } + else + { + started_from_top = false; } + } + std::cout << "*** DTP " << self << " sees in group " << tpg << " in_highest_priority_queue = " << in_highest_priority_queue + << " work items = " << tpg->_work_items_active.count << " timer items = " << tpg->_work_items_timer.count << std::endl; + if(tpg->_work_items_timer.count > 0 && tpg->_work_items_timer.front->_internalworkh != nullptr) + { + tpg->_timer_work_items_remaining = 0; // this group is being fully executed right now + } + else if(tpg->_timer_work_items_remaining > tpg->_work_items_timer.count) + { + tpg->_timer_work_items_remaining = tpg->_work_items_timer.count; + } + // If we are not in the highest priority group, and there are no newly added + // work items, don't execute work. + if(tpg->_work_items_active.count > 0 && + (tpg->_work_items_active.front->_internalworkh != nullptr || (!in_highest_priority_queue && tpg->_newly_added_active_work_items == 0))) + { + tpg->_active_work_items_remaining = 0; // this group is being fully executed right now + } + else if(tpg->_active_work_items_remaining > tpg->_work_items_active.count) + { + tpg->_active_work_items_remaining = tpg->_work_items_active.count; + } + std::cout << "*** DTP " << self << " sees in group " << tpg << " _active_work_items_remaining = " << tpg->_active_work_items_remaining + << " _timer_work_items_remaining = " << tpg->_timer_work_items_remaining << std::endl; + if(tpg->_timer_work_items_remaining > 0) + { + auto *wi = tpg->_work_items_timer.front; + assert(wi->_has_timer_set()); + _remove_from_list(tpg->_work_items_timer, wi); + _append_to_list(tpg->_work_items_timer, wi); + tpg->_timer_work_items_remaining--; + bool invoketimer = false; - if(wi->_timepoint1 != std::chrono::steady_clock::time_point()) + if(wi->_has_timer_set_relative()) { // Special constant for immediately rescheduled work items if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) @@ -982,8 +1077,7 @@ namespace detail } } } - if(wi->_timepoint2 != std::chrono::system_clock::time_point() && - (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute)) + if(wi->_has_timer_set_absolute() && (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute)) { earliest_absolute = wi->_timepoint2; if(wi->_timepoint2 <= std::chrono::system_clock::now()) @@ -991,11 +1085,12 @@ namespace detail invoketimer = true; } } + // If this work item's timer is due, execute immediately if(invoketimer) { wi->_internalworkh = self; wi->_internaltimerh = nullptr; -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl; #endif gg.unlock(); @@ -1005,68 +1100,80 @@ namespace detail // wi->_internalworkh should be null, however wi may also no longer exist goto restart; } -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl; #endif + // Process all timers before considering work + continue; } - else + if(tpg->_active_work_items_remaining > 0) { - auto startinggroup = wq->currentgroup; - bool at_top = (workqueue_depth == workqueue.size() - 1); - do + auto *wi = tpg->_work_items_active.front; + assert(!wi->_has_timer_set()); + _remove_from_list(tpg->_work_items_active, wi); + _append_to_list(tpg->_work_items_active, wi); + tpg->_active_work_items_remaining--; + + if(tpg->_newly_added_active_work_items > 0) { - gg.unlock(); // unlock group - if(++wq->currentgroup == wq->items.end()) - { - wq->currentgroup = wq->items.begin(); - } - if(!at_top) - { - workqueue_depth = workqueue.size() - 1; - wq_it = --workqueue.end(); - at_top = true; - wq = &(*wq_it); - startinggroup = wq->currentgroup; - } - tpg = *wq->currentgroup; - gg = _lock_guard(tpg->_lock); // lock group -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq_it - 1) << " examining " << tpg - << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; + tpg->_newly_added_active_work_items--; + } + workitem = wi; + workitem->_internalworkh = self; +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top " + << (workqueue.end() - wq_it - 1) << std::endl; #endif - if(startinggroup == wq->currentgroup) + // Execute this workitem + break; + } + + // Move to next group, which may be in a lower priority work queue + assert(tpg->_active_work_items_remaining == 0); + if(++wq->currentgroup == wq->items.end()) + { +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " reaches end of current group, started_from_top = " << started_from_top << std::endl; +#endif + gg.unlock(); // unlock group + if(!started_from_top) + { + workqueue_depth = (size_t) -1; + goto restart; + } + // Nothing for me to do in this workqueue + if(wq_it == workqueue.begin()) + { + // Reset all wq->currentgroup to begin() to ensure timers + // etc in lower priority groups do get seen + for(auto &i : workqueue) { - if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr) + if(i.currentgroup == i.items.end()) { - // Nothing for me to do in this workqueue - if(wq_it == workqueue.begin()) - { - assert(workitem == nullptr); -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl; -#endif - goto workqueue_empty; - } - gg.unlock(); // unlock group - --wq_it; - workqueue_depth = wq_it - workqueue.begin(); - wq = &(*wq_it); - tpg = *wq->currentgroup; - startinggroup = wq->currentgroup; - gg = _lock_guard(tpg->_lock); // lock group -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg - << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; -#endif - continue; + i.currentgroup = i.items.begin(); } } - } while(tpg->_work_items_active.count == 0); - wq->currentgroupremaining = tpg->_work_items_active.count; -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl; +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl; +#endif + assert(workitem == nullptr); + goto workqueue_empty; + } + --wq_it; + workqueue_depth = wq_it - workqueue.begin(); + wq = &(*wq_it); + tpg = *wq->currentgroup; + gg = _lock_guard(tpg->_lock); // lock group +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg + << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; #endif } + tpg = *wq->currentgroup; + tpg->_active_work_items_remaining = tpg->_work_items_active.count; +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl; +#endif } } workqueue_empty: @@ -1089,7 +1196,7 @@ namespace detail earliest_absolute = {}; } } - else if(now - self->last_did_work >= std::chrono::minutes(1)) + else if(now - self->last_did_work >= std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))) { _remove_from_list(threadpool_active, self); threadpool_threads.fetch_sub(1, std::memory_order_release); @@ -1104,14 +1211,14 @@ namespace detail threadpool_sleeping_count.fetch_add(1, std::memory_order_release); if(earliest_absolute != std::chrono::system_clock::time_point()) { -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl; #endif self->cond.wait_until(g, earliest_absolute); } else { -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl; #endif self->cond.wait_for(g, duration); @@ -1120,7 +1227,7 @@ namespace detail _remove_from_list(threadpool_sleeping, self); _append_to_list(threadpool_active, self); threadpool_sleeping_count.fetch_sub(1, std::memory_order_release); -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl; #endif g.unlock(); @@ -1135,7 +1242,7 @@ namespace detail continue; } self->last_did_work = now; -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl; #endif total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed); @@ -1169,16 +1276,16 @@ namespace detail inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake) { (void) g; + (void) defer_pool_wake; if(workitem->_nextwork != -1) { // If no work item for now, or there is a delay, schedule a timer - if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() || - workitem->_timepoint2 != std::chrono::system_clock::time_point()) + if(workitem->_nextwork == 0 || workitem->_has_timer_set()) { assert(workitem->_internaltimerh != nullptr); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD dispatch_time_t when; - if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + if(workitem->_has_timer_set_relative()) { auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count(); if(duration > 1000000000LL) @@ -1191,7 +1298,7 @@ namespace detail } when = dispatch_time(DISPATCH_TIME_NOW, duration); } - else if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + else if(workitem->_has_timer_set_absolute()) { deadline d(workitem->_timepoint2); auto now = std::chrono::system_clock::now(); @@ -1210,7 +1317,7 @@ namespace detail #elif defined(_WIN32) LARGE_INTEGER li; DWORD slop = 1000; - if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + if(workitem->_has_timer_set_relative()) { li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; if(li.QuadPart < 0) @@ -1223,7 +1330,7 @@ namespace detail } li.QuadPart = -li.QuadPart; // negative is relative } - else if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + else if(workitem->_has_timer_set_absolute()) { li = windows_nt_kernel::from_timepoint(workitem->_timepoint2); } @@ -1328,7 +1435,7 @@ namespace detail auto uninit = make_scope_exit([&]() noexcept { for(auto *i : work) { - _remove_from_list(group->_work_items_active, i); + _remove_from_list(!i->_has_timer_set() ? group->_work_items_active : group->_work_items_timer, i); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD i->_internalworkh = nullptr; i->_internaltimerh = nullptr; @@ -1368,13 +1475,24 @@ namespace detail } #endif OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d)); - _append_to_list(group->_work_items_active, i); + if(!i->_has_timer_set()) + { + _prepend_to_list(group->_work_items_active, i); + } + else + { + _prepend_to_list(group->_work_items_timer, i); + } } } uninit.release(); { for(auto *i : work) { +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + group->_newly_added_active_work_items++; + group->_active_work_items_remaining++; +#endif _submit_work_item(g, i, i != work.back()); } } @@ -1390,6 +1508,9 @@ namespace detail { (void) g; // std::cout << "*** _work_item_done " << i << std::endl; + auto *parent = i->_parent.load(std::memory_order_relaxed); + _remove_from_list(!i->_has_timer_set() ? parent->_work_items_active : parent->_work_items_timer, i); + _append_to_list(parent->_work_items_done, i); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD i->_internaltimerh = nullptr; i->_internalworkh = nullptr; @@ -1418,11 +1539,8 @@ namespace detail std::cout << "*** DTP sets done work item " << i << std::endl; #endif #endif - _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i); - _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i); - if(i->_parent.load(std::memory_order_relaxed)->_work_items_active.count == 0) + if(parent->_work_items_timer.count == 0 && parent->_work_items_active.count == 0) { - auto *parent = i->_parent.load(std::memory_order_relaxed); i = nullptr; auto *v = parent->_work_items_done.front, *n = v; for(; v != nullptr; v = n) @@ -1542,7 +1660,7 @@ namespace detail group->_waits.fetch_add(1, std::memory_order_release); auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); }); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD - while(group->_work_items_active.count > 0) + while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) { LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); dispatch_time_t timeout = DISPATCH_TIME_FOREVER; @@ -1572,9 +1690,9 @@ namespace detail // Is this a cancellation? if(group->_stopping.load(std::memory_order_relaxed)) { - while(group->_work_items_active.count > 0) + while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) { - auto *i = group->_work_items_active.front; + auto *i = (group->_work_items_active.front != nullptr) ? group->_work_items_active.front : group->_work_items_timer.front; if(nullptr != i->_internalworkh) { if(0 == i->_internalworkh_inuse) @@ -1623,6 +1741,11 @@ namespace detail } i->_internalworkh_inuse = 0; } + if(group->_work_items_timer.count > 0 && group->_work_items_timer.front == i) + { + // This item got cancelled before it started + _work_item_done(g, group->_work_items_timer.front); + } if(group->_work_items_active.count > 0 && group->_work_items_active.front == i) { // This item got cancelled before it started @@ -1633,9 +1756,9 @@ namespace detail } else if(!d) { - while(group->_work_items_active.count > 0) + while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) { - auto *i = group->_work_items_active.front; + auto *i = (group->_work_items_active.front != nullptr) ? group->_work_items_active.front : group->_work_items_timer.front; if(nullptr != i->_internalworkh) { if(0 == i->_internalworkh_inuse) @@ -1688,7 +1811,7 @@ namespace detail } else { - while(group->_work_items_active.count > 0) + while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) { LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); g.unlock(); @@ -1704,6 +1827,16 @@ namespace detail for(bool done = false; !done;) { done = true; + for(auto *p = group->_work_items_timer.front; p != nullptr; p = p->_next) + { + if(p->_internalworkh == nullptr) + { + _remove_from_list(group->_work_items_timer, p); + _append_to_list(group->_work_items_done, p); + done = false; + break; + } + } for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next) { if(p->_internalworkh == nullptr) @@ -1717,7 +1850,7 @@ namespace detail } } #endif - while(group->_work_items_active.count > 0) + while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) { LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); g.unlock(); @@ -1726,7 +1859,7 @@ namespace detail } #endif } - if(group->_work_items_active.count > 0) + if(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) { return errc::timed_out; } @@ -1748,7 +1881,7 @@ namespace detail _work_item_done(g, workitem); return; } - if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + if(workitem->_has_timer_set_relative()) { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD auto now = std::chrono::steady_clock::now(); @@ -1761,7 +1894,7 @@ namespace detail #endif workitem->_timepoint1 = {}; } - if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + if(workitem->_has_timer_set_absolute()) { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD auto now = std::chrono::system_clock::now(); diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp index 748bd94c..6caf6bf3 100644 --- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -1,5 +1,5 @@ /* Dynamic thread pool group -(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits) File Created: Dec 2020 @@ -168,6 +168,7 @@ allocations. */ class LLFIO_DECL dynamic_thread_pool_group { + friend class dynamic_thread_pool_group_impl; public: //! An individual item of work within the work group. class work_item @@ -175,7 +176,8 @@ public: friend struct detail::global_dynamic_thread_pool_impl; friend class dynamic_thread_pool_group_impl; std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr}; - void *_internalworkh{nullptr}, *_internaltimerh{nullptr}; + void *_internalworkh{nullptr}; + void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline work_item *_prev{nullptr}, *_next{nullptr}; intptr_t _nextwork{-1}; std::chrono::steady_clock::time_point _timepoint1; @@ -183,7 +185,9 @@ public: int _internalworkh_inuse{0}; protected: - void *_private{nullptr}; + constexpr bool _has_timer_set_relative() const noexcept { return _timepoint1 != std::chrono::steady_clock::time_point(); } + constexpr bool _has_timer_set_absolute() const noexcept { return _timepoint2 != std::chrono::system_clock::time_point(); } + constexpr bool _has_timer_set() const noexcept { return _has_timer_set_relative() || _has_timer_set_absolute(); } constexpr work_item() {} work_item(const work_item &o) = delete; @@ -197,7 +201,6 @@ public: , _timepoint1(o._timepoint1) , _timepoint2(o._timepoint2) , _internalworkh_inuse(o._internalworkh_inuse) - , _private(o._private) { assert(o._parent.load(std::memory_order_relaxed) == nullptr); assert(o._internalworkh == nullptr); @@ -210,7 +213,6 @@ public: o._prev = o._next = nullptr; o._nextwork = -1; o._internalworkh_inuse = 0; - o._private = nullptr; } work_item &operator=(const work_item &) = delete; work_item &operator=(work_item &&) = delete; @@ -459,6 +461,20 @@ public: static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept; //! Returns the work item the calling thread is running within, if any. static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept; + /*! \brief Returns the number of milliseconds that a thread is without work before it is shut down. + Note that this will be zero on all but on Linux if using our local thread pool + implementation, because the system controls this value on Windows, Grand Central + Dispatch etc. + */ + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work() noexcept; + /*! \brief Sets the number of milliseconds that a thread is without work before it is shut down, + returning the value actually set. + + Note that this will have no effect (and thus return zero) on all but on Linux if + using our local thread pool implementation, because the system controls this value + on Windows, Grand Central Dispatch etc. + */ + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work(uint32_t v) noexcept; }; //! A unique ptr to a work group within the global dynamic thread pool. using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>; diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp index 6e985fc8..dfc46e9a 100644 --- a/test/tests/dynamic_thread_pool_group.cpp +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -199,6 +199,7 @@ static inline void TestDynamicThreadPoolGroupWorks() reset(1); submit(); check(); + exit(0); // Test 10 work items reset(10); |