From 0cf5eafa75c3cc66fd56a8a9a3dcc7089254846a Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 24 Feb 2021 13:08:45 +0000 Subject: Simplify non-native-Linux implementation. Appears to make little difference to benchmarks. --- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 95 ++++++---------------- 1 file changed, 25 insertions(+), 70 deletions(-) (limited to 'include') diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index 3c8bba1f..41975697 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -975,8 +975,7 @@ namespace detail return success(); } - inline void _submit_work_item(_lock_guard &g, bool item_in_timer_list, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, - bool defer_pool_wake); + inline void _submit_work_item(_lock_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake); inline result submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span work) noexcept; @@ -1032,7 +1031,7 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group { size_t count{0}; dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; - } _work_items_timer, _work_items_active, _work_items_done, _work_items_delayed; + } _work_items_active, _work_items_done, _work_items_delayed; std::atomic _stopping{false}, _stopped{true}, _completing{false}; std::atomic _waits{0}; result _abnormal_completion_cause{success()}; // The cause of any abnormal group completion @@ -1129,12 +1128,12 @@ public: _stopped.store(false, std::memory_order_release); auto &impl = detail::global_dynamic_thread_pool(); _lock_guard g(_lock); // lock group - if(_work_items_timer.count == 0 && _work_items_active.count == 0 && _work_items_done.count == 0) + if(_work_items_active.count == 0 && _work_items_done.count == 0) { _abnormal_completion_cause = success(); } OUTCOME_TRY(impl.submit(g, this, work)); - if(_work_items_timer.count == 0 && _work_items_active.count == 0) + if(_work_items_active.count == 0) { _stopped.store(true, std::memory_order_release); } @@ -1386,7 +1385,7 @@ namespace detail } #endif - inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool item_in_timer_list, bool submit_into_highest_priority, + inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake) { (void) g; @@ -1399,11 +1398,6 @@ namespace detail if(workitem->_nextwork == 0 || workitem->_has_timer_set()) { assert(workitem->_internaltimerh != nullptr); - if(!item_in_timer_list) - { - _remove_from_list(parent->_work_items_active, workitem); - _append_to_list(parent->_work_items_timer, workitem); - } #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD dispatch_time_t when; if(workitem->_has_timer_set_relative()) @@ -1488,11 +1482,6 @@ namespace detail } else { - if(item_in_timer_list) - { - _remove_from_list(parent->_work_items_timer, workitem); - _append_to_list(parent->_work_items_active, workitem); - } #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW; if(workqueue.size() - parent->_nesting_level == 1) @@ -1522,15 +1511,8 @@ namespace detail #else _lock_guard gg(workqueue_lock); auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level]; - // If the item came from a timer, prioritise execution - if(item_in_timer_list) - { - wq->prepend_active(workitem); - } - else - { - wq->append_active(workitem); - } + // TODO: It would be super nice if we prepended this instead if it came from a timer + wq->append_active(workitem); #endif } @@ -1593,7 +1575,7 @@ namespace detail auto uninit = make_scope_exit([&]() noexcept { for(auto *i : work) { - _remove_from_list(!i->_has_timer_set() ? group->_work_items_active : group->_work_items_timer, i); + _remove_from_list(group->_work_items_active, i); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD i->_internalworkh = nullptr; i->_internaltimerh = nullptr; @@ -1633,14 +1615,7 @@ namespace detail } #endif OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d)); - if(!i->_has_timer_set()) - { - _prepend_to_list(group->_work_items_active, i); - } - else - { - _prepend_to_list(group->_work_items_timer, i); - } + _prepend_to_list(group->_work_items_active, i); } } uninit.release(); @@ -1651,7 +1626,7 @@ namespace detail group->_newly_added_active_work_items++; group->_active_work_items_remaining++; #endif - _submit_work_item(g, i->_has_timer_set(), true, i, i != work.back()); + _submit_work_item(g, true, i, i != work.back()); } } return success(); @@ -1667,7 +1642,7 @@ namespace detail (void) g; // std::cout << "*** _work_item_done " << i << std::endl; auto *parent = i->_parent.load(std::memory_order_relaxed); - _remove_from_list(!i->_has_timer_set() ? parent->_work_items_active : parent->_work_items_timer, i); + _remove_from_list(parent->_work_items_active, i); _append_to_list(parent->_work_items_done, i); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD i->_internaltimerh = nullptr; @@ -1697,7 +1672,7 @@ namespace detail std::cout << "*** DTP sets done work item " << i << std::endl; #endif #endif - if(parent->_work_items_timer.count == 0 && parent->_work_items_active.count == 0) + if(parent->_work_items_active.count == 0) { i = nullptr; auto *v = parent->_work_items_done.front, *n = v; @@ -1796,7 +1771,7 @@ namespace detail group->_waits.fetch_add(1, std::memory_order_release); auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); }); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD - while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) + while(group->_work_items_active.count > 0) { LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); dispatch_time_t timeout = DISPATCH_TIME_FOREVER; @@ -1826,9 +1801,9 @@ namespace detail // Is this a cancellation? if(group->_stopping.load(std::memory_order_relaxed)) { - while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) + while(group->_work_items_active.count > 0) { - auto *i = (group->_work_items_active.front != nullptr) ? group->_work_items_active.front : group->_work_items_timer.front; + auto *i = group->_work_items_active.front; if(nullptr != i->_internalworkh) { if(0 == i->_internalworkh_inuse) @@ -1877,11 +1852,6 @@ namespace detail } i->_internalworkh_inuse = 0; } - if(group->_work_items_timer.count > 0 && group->_work_items_timer.front == i) - { - // This item got cancelled before it started - _work_item_done(g, group->_work_items_timer.front); - } if(group->_work_items_active.count > 0 && group->_work_items_active.front == i) { // This item got cancelled before it started @@ -1892,9 +1862,9 @@ namespace detail } else if(!d) { - while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) + while(group->_work_items_active.count > 0) { - auto *i = (group->_work_items_active.front != nullptr) ? group->_work_items_active.front : group->_work_items_timer.front; + auto *i = group->_work_items_active.front; if(nullptr != i->_internalworkh) { if(0 == i->_internalworkh_inuse) @@ -1947,7 +1917,7 @@ namespace detail } else { - while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) + while(group->_work_items_active.count > 0) { LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); g.unlock(); @@ -1963,16 +1933,6 @@ namespace detail for(bool done = false; !done;) { done = true; - for(auto *p = group->_work_items_timer.front; p != nullptr; p = p->_next) - { - if(p->_internalworkh == nullptr) - { - _remove_from_list(group->_work_items_timer, p); - _append_to_list(group->_work_items_done, p); - done = false; - break; - } - } for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next) { if(p->_internalworkh == nullptr) @@ -1986,7 +1946,7 @@ namespace detail } } #endif - while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) + while(group->_work_items_active.count > 0) { LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); g.unlock(); @@ -1995,7 +1955,7 @@ namespace detail } #endif } - if(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0) + if(group->_work_items_active.count > 0) { return errc::timed_out; } @@ -2026,7 +1986,7 @@ namespace detail if(workitem->_timepoint1 - now > std::chrono::seconds(0)) { // Timer fired short, so schedule it again - _submit_work_item(g, true, false, workitem, false); + _submit_work_item(g, false, workitem, false); return; } #endif @@ -2039,7 +1999,7 @@ namespace detail if(workitem->_timepoint2 - now > std::chrono::seconds(0)) { // Timer fired short, so schedule it again - _submit_work_item(g, true, false, workitem, false); + _submit_work_item(g, false, workitem, false); return; } #endif @@ -2051,11 +2011,6 @@ namespace detail deadline d(std::chrono::seconds(0)); workitem->_nextwork = workitem->next(d); auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); - if(!workitem->_has_timer_set()) - { - _remove_from_list(parent->_work_items_timer, workitem); - _append_to_list(parent->_work_items_active, workitem); - } if(!r2) { (void) stop(g, parent, std::move(r2)); @@ -2067,10 +2022,10 @@ namespace detail _work_item_done(g, workitem); return; } - _submit_work_item(g, false, false, workitem, false); + _submit_work_item(g, false, workitem, false); return; } - _submit_work_item(g, true, false, workitem, false); + _submit_work_item(g, false, workitem, false); } // Worker thread entry point @@ -2126,7 +2081,7 @@ namespace detail _work_item_done(g, workitem); return; } - _submit_work_item(g, false, false, workitem, false); + _submit_work_item(g, false, workitem, false); } } } // namespace detail -- cgit v1.2.3