Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-01-15 14:26:12 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:40 +0300
commit73f5571337a4760d526ea378d96f5b93693beb17 (patch)
tree10e4db01e0176c2532c810ab8de9faf971ccb599
parent476b74799c4504a248d2fe63ae060f8eead21c47 (diff)
Make the dynamic thread pool work scheduling not quite as strict in the native Linux implementation, now we only examine higher priority groups once per group cycle. This produces behaviours much more consistent with the Win32 thread pool and Grand Central Dispatch.
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp35
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp10
3 files changed, 34 insertions, 17 deletions
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index e5f28dcc..c72e8041 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 45112b3cffebb5f8409c0edfc8c8879a0aeaf516
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-02 17:33:42 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45112b3c
+#define LLFIO_PREVIOUS_COMMIT_REF df3f68c97a25654d5fc659ada8a7bc04c7c80e84
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-13 12:32:47 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE df3f68c9
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index a04aef3c..4950bff1 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -548,6 +548,7 @@ namespace detail
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " begins." << std::endl;
#endif
+ size_t workqueue_depth = workqueue.size() - 1;
while(self->state > 0)
{
restart:
@@ -556,10 +557,8 @@ namespace detail
std::chrono::system_clock::time_point earliest_absolute;
if(!workqueue.empty())
{
- auto wq = --workqueue.end();
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " restarts from top of work queue" << std::endl;
-#endif
+ auto wq_it = (workqueue_depth >= workqueue.size()) ? --workqueue.end() : (workqueue.begin() + workqueue_depth);
+ auto *wq = &(*wq_it);
for(;;)
{
dynamic_thread_pool_group_impl *tpg = *wq->currentgroup;
@@ -583,7 +582,7 @@ namespace detail
workitem->_internalworkh = self;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
- << (workqueue.end() - wq - 1) << std::endl;
+ << (workqueue.end() - wq_it - 1) << std::endl;
#endif
break;
}
@@ -634,6 +633,7 @@ namespace detail
else
{
auto startinggroup = wq->currentgroup;
+ bool at_top = (workqueue_depth == workqueue.size() - 1);
do
{
gg.unlock(); // unlock group
@@ -641,18 +641,26 @@ namespace detail
{
wq->currentgroup = wq->items.begin();
}
+ if(!at_top)
+ {
+ workqueue_depth = workqueue.size() - 1;
+ wq_it = --workqueue.end();
+ at_top = true;
+ wq = &(*wq_it);
+ startinggroup = wq->currentgroup;
+ }
tpg = *wq->currentgroup;
gg = _lock_guard(tpg->_lock); // lock group
- if(startinggroup == wq->currentgroup)
- {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq - 1) << " examining " << tpg
- << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
+ std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq_it - 1) << " examining " << tpg
+ << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
#endif
+ if(startinggroup == wq->currentgroup)
+ {
if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr)
{
// Nothing for me to do in this workqueue
- if(wq == workqueue.begin())
+ if(wq_it == workqueue.begin())
{
assert(workitem == nullptr);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
@@ -661,12 +669,15 @@ namespace detail
goto workqueue_empty;
}
gg.unlock(); // unlock group
- --wq;
+ --wq_it;
+ workqueue_depth = wq_it - workqueue.begin();
+ wq = &(*wq_it);
tpg = *wq->currentgroup;
startinggroup = wq->currentgroup;
gg = _lock_guard(tpg->_lock); // lock group
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq - 1) << std::endl;
+ std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg
+ << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
#endif
continue;
}
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index a40d3b05..6e985fc8 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -226,6 +226,11 @@ static inline void TestDynamicThreadPoolGroupWorks()
static inline void TestDynamicThreadPoolGroupNestingWorks()
{
+ if(std::thread::hardware_concurrency() < 4)
+ {
+ std::cout << "NOTE: Skipping TestDynamicThreadPoolGroupNestingWorks as hardware concurrency is below 4." << std::endl;
+ return;
+ }
namespace llfio = LLFIO_V2_NAMESPACE;
static constexpr size_t MAX_NESTING = 10;
static constexpr int COUNT_PER_WORK_ITEM = 1000;
@@ -299,6 +304,7 @@ static inline void TestDynamicThreadPoolGroupNestingWorks()
}
uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
std::lock_guard<std::mutex> g(shared_states[nesting].lock);
+ //std::cout << "wi " << this << " nesting " << nesting << " work " << work << std::endl;
if(COUNT_PER_WORK_ITEM == work && childwi)
{
if(!shared_states[nesting].tpg)
@@ -438,8 +444,8 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
BOOST_CHECK(paced > 0);
}
-//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
-// TestDynamicThreadPoolGroupWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
TestDynamicThreadPoolGroupNestingWorks())
//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,