diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-01-15 14:26:12 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-03-16 13:21:40 +0300 |
commit | 73f5571337a4760d526ea378d96f5b93693beb17 (patch) | |
tree | 10e4db01e0176c2532c810ab8de9faf971ccb599 | |
parent | 476b74799c4504a248d2fe63ae060f8eead21c47 (diff) |
Make the dynamic thread pool work scheduling not quite as strict in the native Linux implementation, now we only examine higher priority groups once per group cycle. This produces behaviours much more consistent with the Win32 thread pool and Grand Central Dispatch.
-rw-r--r-- | include/llfio/revision.hpp | 6 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 35 | ||||
-rw-r--r-- | test/tests/dynamic_thread_pool_group.cpp | 10 |
3 files changed, 34 insertions, 17 deletions
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index e5f28dcc..c72e8041 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 45112b3cffebb5f8409c0edfc8c8879a0aeaf516 -#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-02 17:33:42 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45112b3c +#define LLFIO_PREVIOUS_COMMIT_REF df3f68c97a25654d5fc659ada8a7bc04c7c80e84 +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-13 12:32:47 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE df3f68c9 diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index a04aef3c..4950bff1 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -548,6 +548,7 @@ namespace detail #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " begins." << std::endl; #endif + size_t workqueue_depth = workqueue.size() - 1; while(self->state > 0) { restart: @@ -556,10 +557,8 @@ namespace detail std::chrono::system_clock::time_point earliest_absolute; if(!workqueue.empty()) { - auto wq = --workqueue.end(); -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " restarts from top of work queue" << std::endl; -#endif + auto wq_it = (workqueue_depth >= workqueue.size()) ? --workqueue.end() : (workqueue.begin() + workqueue_depth); + auto *wq = &(*wq_it); for(;;) { dynamic_thread_pool_group_impl *tpg = *wq->currentgroup; @@ -583,7 +582,7 @@ namespace detail workitem->_internalworkh = self; #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top " - << (workqueue.end() - wq - 1) << std::endl; + << (workqueue.end() - wq_it - 1) << std::endl; #endif break; } @@ -634,6 +633,7 @@ namespace detail else { auto startinggroup = wq->currentgroup; + bool at_top = (workqueue_depth == workqueue.size() - 1); do { gg.unlock(); // unlock group @@ -641,18 +641,26 @@ namespace detail { wq->currentgroup = wq->items.begin(); } + if(!at_top) + { + workqueue_depth = workqueue.size() - 1; + wq_it = --workqueue.end(); + at_top = true; + wq = &(*wq_it); + startinggroup = wq->currentgroup; + } tpg = *wq->currentgroup; gg = _lock_guard(tpg->_lock); // lock group - if(startinggroup == wq->currentgroup) - { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq - 1) << " examining " << tpg - << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; + std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq_it - 1) << " examining " << tpg + << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; #endif + if(startinggroup == wq->currentgroup) + { if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr) { // Nothing for me to do in this workqueue - if(wq == workqueue.begin()) + if(wq_it == workqueue.begin()) { assert(workitem == nullptr); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING @@ -661,12 +669,15 @@ namespace detail goto workqueue_empty; } gg.unlock(); // unlock group - --wq; + --wq_it; + workqueue_depth = wq_it - workqueue.begin(); + wq = &(*wq_it); tpg = *wq->currentgroup; startinggroup = wq->currentgroup; gg = _lock_guard(tpg->_lock); // lock group #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq - 1) << std::endl; + std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg + << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; #endif continue; } diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp index a40d3b05..6e985fc8 100644 --- a/test/tests/dynamic_thread_pool_group.cpp +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -226,6 +226,11 @@ static inline void TestDynamicThreadPoolGroupWorks() static inline void TestDynamicThreadPoolGroupNestingWorks() { + if(std::thread::hardware_concurrency() < 4) + { + std::cout << "NOTE: Skipping TestDynamicThreadPoolGroupNestingWorks as hardware concurrency is below 4." << std::endl; + return; + } namespace llfio = LLFIO_V2_NAMESPACE; static constexpr size_t MAX_NESTING = 10; static constexpr int COUNT_PER_WORK_ITEM = 1000; @@ -299,6 +304,7 @@ static inline void TestDynamicThreadPoolGroupNestingWorks() } uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); std::lock_guard<std::mutex> g(shared_states[nesting].lock); + //std::cout << "wi " << this << " nesting " << nesting << " work " << work << std::endl; if(COUNT_PER_WORK_ITEM == work && childwi) { if(!shared_states[nesting].tpg) @@ -438,8 +444,8 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks() BOOST_CHECK(paced > 0); } -//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", -// TestDynamicThreadPoolGroupWorks()) +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupWorks()) KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", TestDynamicThreadPoolGroupNestingWorks()) //KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item, |