From 9337a11d0a820df50620f479300dcdde516f563e Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Fri, 12 Mar 2021 11:57:51 +0000 Subject: Fix failure to execute with priority on Linux native dynamic_thread_pool_group. --- include/llfio/revision.hpp | 6 +-- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 47 +++++++++++++--------- test/tests/dynamic_thread_pool_group.cpp | 2 +- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index b36857ae..f0eeb8c2 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 1cf5addd4a11570624f8faf17491431d7dde5427 -#define LLFIO_PREVIOUS_COMMIT_DATE "2021-02-23 12:58:16 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 1cf5addd +#define LLFIO_PREVIOUS_COMMIT_REF 45c0392e65da7e53984b3b734fbb810bb38561c8 +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-11 11:47:09 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45c0392e diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index 7b2de48e..e3c5c347 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -543,6 +543,7 @@ namespace detail global_dynamic_thread_pool()._timerthread(workitem, threadh); } #else + global_dynamic_thread_pool_impl_workqueue_item first_execute{(size_t) -1, {}}; using threadh_type = void *; using grouph_type = void *; std::mutex threadpool_lock; @@ -1434,12 +1435,7 @@ namespace detail std::chrono::system_clock::time_point now_system, earliest_absolute; // Start from highest priority work group, executing any timers due before selecting a work item { - workqueue_lock.lock(); - auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups - workqueue_lock.unlock(); - while(lock_wq) - { - auto &wq = *lock_wq; + auto examine_wq = [&](global_dynamic_thread_pool_impl_workqueue_item &wq) -> dynamic_thread_pool_group::work_item * { if(wq.next_timer_relative.count.load(std::memory_order_relaxed) > 0) { if(now_steady == std::chrono::steady_clock::time_point()) @@ -1453,7 +1449,7 @@ namespace detail { workitem = wq.next_timer<1>(); // unlocks wq.next_timer_relative.lock workitem_is_timer = true; - break; + return workitem; } if(earliest_duration == std::chrono::steady_clock::time_point() || wq.next_timer_relative.front->_timepoint1 < earliest_duration) { @@ -1475,7 +1471,7 @@ namespace detail { workitem = wq.next_timer<2>(); // unlocks wq.next_timer_absolute.lock workitem_is_timer = true; - break; + return workitem; } if(earliest_absolute == std::chrono::system_clock::time_point() || wq.next_timer_absolute.front->_timepoint2 < earliest_absolute) { @@ -1485,17 +1481,28 @@ namespace detail wq.next_timer_absolute.lock.unlock(); } unsigned count = 0; - workitem = wq.next_active(count, mythreadidx); - if(workitem != nullptr) + return wq.next_active(count, mythreadidx); + }; + workitem = examine_wq(first_execute); + if(workitem == nullptr) + { + workqueue_lock.lock(); + auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups + workqueue_lock.unlock(); + while(lock_wq) { + workitem = examine_wq(*lock_wq); + if(workitem != nullptr) + { + // workqueue_lock.lock(); + // std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl; + // workqueue_lock.unlock(); + break; + } workqueue_lock.lock(); - //std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl; + lock_wq = lock_wq->next; workqueue_lock.unlock(); - break; } - workqueue_lock.lock(); - lock_wq = lock_wq->next; - workqueue_lock.unlock(); } } if(now_steady == std::chrono::steady_clock::time_point()) @@ -1755,21 +1762,21 @@ namespace detail SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); #else global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); - if(submit_into_highest_priority || workqueue->nesting_level == parent->_nesting_level) + if(submit_into_highest_priority) { // TODO: It would be super nice if we prepended this instead if it came from a timer - workqueue->append_active(workitem); - //std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; + first_execute.append_active(workitem); + // std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; } else { - for(auto *p = workqueue->next.get(); p != nullptr; p = p->next.get()) + for(auto *p = workqueue.get(); p != nullptr; p = p->next.get()) { if(p->nesting_level == parent->_nesting_level) { // TODO: It would be super nice if we prepended this instead if it came from a timer p->append_active(workitem); - //std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; + // std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; break; } } diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp index 4989c585..66a49425 100644 --- a/test/tests/dynamic_thread_pool_group.cpp +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -341,7 +341,7 @@ static inline void TestDynamicThreadPoolGroupNestingWorks() shared_states[n].calc_stddev(); std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl; } - BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2); + BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 4].stddev * 3 / 4); } static inline void TestDynamicThreadPoolGroupIoAwareWorks() -- cgit v1.2.3