github.com/windirstat/llfio.git
author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-11 14:47:02 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-16 13:21:46 +0300
commit     84d3f89aa4f3663a33aed3853ae48abd546e91a1
tree       26651016d91a6198cb569b6fb57f02d34a9775ad
parent     4550edca2ecb7c4660b8516ef2e9d14f5dd198a4
Linux native dynamic_thread_pool_group is working again after last commit refactor, but priority is broken again. Sigh.
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp  120
1 file changed, 77 insertions(+), 43 deletions(-)
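The heart of the diff below is the work queue pop in next_active(): TOTAL_NEXTACTIVES drops from 4 to 1, and next_active() now reports, through a new out-parameter, the queue's count as it was before the pop. What follows is a minimal standalone sketch of that pop/push pattern, not the LLFIO source: work_item_t, next_active_queue_t, the push() helper, the main() driver and the use of std::mutex in place of LLFIO's own spinlock are illustrative assumptions.

#include <atomic>
#include <cassert>
#include <iostream>
#include <mutex>

// Hypothetical stand-in for dynamic_thread_pool_group::work_item.
struct work_item_t
{
  work_item_t *next_scheduled{nullptr};
};

// Hypothetical stand-in for next_active_base_t: an intrusive FIFO with an
// atomic count that can be checked cheaply before taking the lock.
struct next_active_queue_t
{
  std::atomic<unsigned> count{0};
  std::mutex lock;  // LLFIO uses its own spinlock here
  work_item_t *front{nullptr}, *back{nullptr};

  void push(work_item_t *item)
  {
    std::lock_guard<std::mutex> g(lock);
    item->next_scheduled = nullptr;
    if(back == nullptr)
    {
      front = back = item;
    }
    else
    {
      back->next_scheduled = item;
      back = item;
    }
    count.fetch_add(1, std::memory_order_relaxed);
  }

  // Pop the front item, reporting the pre-decrement count to the caller.
  work_item_t *pop(unsigned &count_out)
  {
    if(count.load(std::memory_order_relaxed) == 0)
    {
      return nullptr;  // cheap check before taking the lock
    }
    std::lock_guard<std::mutex> g(lock);
    work_item_t *ret = front;
    if(ret == nullptr)
    {
      return nullptr;  // raced with another consumer
    }
    front = ret->next_scheduled;
    count_out = count.fetch_sub(1, std::memory_order_relaxed);
    if(front == nullptr)
    {
      assert(back == ret);
      back = nullptr;  // queue is now empty
    }
    ret->next_scheduled = nullptr;
    return ret;
  }
};

int main()
{
  next_active_queue_t q;
  work_item_t a, b;
  q.push(&a);
  q.push(&b);
  unsigned remaining = 0;
  while(work_item_t *w = q.pop(remaining))
  {
    std::cout << "popped " << w << ", count before pop was " << remaining << std::endl;
  }
}

The relaxed atomic count lets a worker thread skip a queue that looks empty without taking the lock; correctness of the pointer manipulation still comes entirely from the lock.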
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index c22cae35..7b2de48e 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -245,7 +245,7 @@ namespace detail
}
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
- static constexpr unsigned TOTAL_NEXTACTIVES = 4;
+ static constexpr unsigned TOTAL_NEXTACTIVES = 1;
struct next_active_base_t
{
std::atomic<unsigned> count{0};
@@ -267,48 +267,76 @@ namespace detail
static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline");
next_active_base_t next_timer_relative, next_timer_absolute;
- dynamic_thread_pool_group::work_item *next_active(size_t threadidx)
+ dynamic_thread_pool_group::work_item *next_active(unsigned &count, size_t threadidx)
{
- threadidx &= ~(TOTAL_NEXTACTIVES - 1);
- const size_t original_threadidx = threadidx;
- bool all_empty = true;
- for(;;)
+ if(TOTAL_NEXTACTIVES > 1)
{
- next_active_base_t &x = next_actives[threadidx];
- if(x.count.load(std::memory_order_relaxed) > 0)
+ threadidx &= (TOTAL_NEXTACTIVES - 1);
+ const size_t original_threadidx = threadidx;
+ bool all_empty = true;
+ for(;;)
{
- all_empty = false;
- if(x.lock.try_lock())
+ next_active_base_t &x = next_actives[threadidx];
+ if(x.count.load(std::memory_order_relaxed) > 0)
{
- auto *ret = x.front;
- if(ret != nullptr)
+ all_empty = false;
+ if(x.lock.try_lock())
{
- x.front = ret->_next_scheduled;
- x.count.fetch_sub(1, std::memory_order_relaxed);
- if(x.front == nullptr)
+ auto *ret = x.front;
+ if(ret != nullptr)
{
- assert(x.back == ret);
- x.back = nullptr;
+ x.front = ret->_next_scheduled;
+ count = x.count.fetch_sub(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
+ {
+ assert(x.back == ret);
+ x.back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ x.lock.unlock();
+ return ret;
}
- ret->_next_scheduled = nullptr;
- return ret;
+ x.lock.unlock();
}
}
- x.lock.unlock();
- }
- if(++threadidx >= TOTAL_NEXTACTIVES)
- {
- threadidx = 0;
+ if(++threadidx >= TOTAL_NEXTACTIVES)
+ {
+ threadidx = 0;
+ }
+ if(threadidx == original_threadidx)
+ {
+ if(all_empty)
+ {
+ return nullptr;
+ }
+ all_empty = true;
+ }
}
- if(threadidx == original_threadidx)
+ }
+ else
+ {
+ next_active_base_t &x = next_actives[0];
+ if(x.count.load(std::memory_order_relaxed) > 0)
{
- if(all_empty)
+ x.lock.lock();
+ auto *ret = x.front;
+ if(ret != nullptr)
{
- return nullptr;
+ x.front = ret->_next_scheduled;
+ count = x.count.fetch_sub(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
+ {
+ assert(x.back == ret);
+ x.back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ x.lock.unlock();
+ return ret;
}
- all_empty = true;
+ x.lock.unlock();
}
}
+ return nullptr;
}
private:
@@ -528,7 +556,7 @@ namespace detail
std::thread thread;
std::condition_variable cond;
std::chrono::steady_clock::time_point last_did_work;
- int state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy
+ std::atomic<int> state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy
};
struct threads_t
{
@@ -709,13 +737,13 @@ namespace detail
}
// Threads which went to sleep the longest ago are at the front
auto *t = which.front;
- if(t->state < 0)
+ if(t->state.load(std::memory_order_acquire) < 0)
{
// He's already exiting
return false;
}
- assert(t->state == 0);
- t->state--;
+ assert(t->state.load(std::memory_order_acquire) == 0);
+ t->state.fetch_sub(1, std::memory_order_release);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << t << " is told to quit" << std::endl;
#endif
@@ -724,7 +752,7 @@ namespace detail
g.unlock();
t->cond.notify_one();
g.lock();
- } while(t->state >= -1);
+ } while(t->state.load(std::memory_order_acquire) >= -1);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << t << " has quit, deleting" << std::endl;
#endif
@@ -1393,12 +1421,12 @@ namespace detail
{
pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
self->last_did_work = std::chrono::steady_clock::now();
- self->state++; // busy
+ self->state.fetch_add(1, std::memory_order_release); // busy
const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " begins." << std::endl;
#endif
- while(self->state > 0)
+ while(self->state.load(std::memory_order_relaxed) > 0)
{
dynamic_thread_pool_group::work_item *workitem = nullptr;
bool workitem_is_timer = false;
@@ -1407,7 +1435,7 @@ namespace detail
// Start from highest priority work group, executing any timers due before selecting a work item
{
workqueue_lock.lock();
- auto lock_wq = workqueue;
+ auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups
workqueue_lock.unlock();
while(lock_wq)
{
@@ -1456,9 +1484,13 @@ namespace detail
}
wq.next_timer_absolute.lock.unlock();
}
- workitem = wq.next_active(mythreadidx);
+ unsigned count = 0;
+ workitem = wq.next_active(count, mythreadidx);
if(workitem != nullptr)
{
+ workqueue_lock.lock();
+ //std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl;
+ workqueue_lock.unlock();
break;
}
workqueue_lock.lock();
@@ -1483,7 +1515,7 @@ namespace detail
{
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl;
#endif
self->thread.detach();
@@ -1514,7 +1546,7 @@ namespace detail
threadpool_guard g(threadpool_lock);
_remove_from_list(threadpool_active, self);
_append_to_list(threadpool_sleeping, self);
- self->state--;
+ self->state.fetch_sub(1, std::memory_order_release);
if(earliest_absolute != std::chrono::system_clock::time_point())
{
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
@@ -1529,7 +1561,7 @@ namespace detail
#endif
self->cond.wait_for(g, duration);
}
- self->state++;
+ self->state.fetch_add(1, std::memory_order_release);
_remove_from_list(threadpool_sleeping, self);
_append_to_list(threadpool_active, self);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
@@ -1566,7 +1598,7 @@ namespace detail
threadpool_guard g(threadpool_lock);
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl;
#endif
self->thread.detach();
@@ -1578,9 +1610,9 @@ namespace detail
{
}
}
- self->state -= 2; // dead
+ self->state.fetch_sub(2, std::memory_order_release); // dead
threadpool_threads.fetch_sub(1, std::memory_order_release);
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl;
#endif
}
@@ -1727,6 +1759,7 @@ namespace detail
{
// TODO: It would be super nice if we prepended this instead if it came from a timer
workqueue->append_active(workitem);
+ //std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl;
}
else
{
@@ -1736,6 +1769,7 @@ namespace detail
{
// TODO: It would be super nice if we prepended this instead if it came from a timer
p->append_active(workitem);
+ //std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl;
break;
}
}
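The other recurring change above turns the per-thread int state into std::atomic<int> with the encoding <0 = dead, 0 = sleeping/please die, 1 = busy, so the manager can read and bump it without racing the worker. Below is a minimal, self-contained sketch of that quit handshake under the same encoding; worker_t, the 100 ms sleep interval and the main() driver are illustrative assumptions, not LLFIO code.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Hypothetical worker record; the state encoding mirrors the patch:
// <0 = dead, 0 = sleeping/please die, 1 = busy.
struct worker_t
{
  std::mutex lock;
  std::condition_variable cond;
  std::atomic<int> state{0};
};

void worker_loop(worker_t &w)
{
  w.state.fetch_add(1, std::memory_order_release);  // 0 -> 1: busy
  while(w.state.load(std::memory_order_relaxed) > 0)
  {
    // ... do work here, then sleep waiting for more ...
    std::unique_lock<std::mutex> g(w.lock);
    w.state.fetch_sub(1, std::memory_order_release);  // 1 -> 0: sleeping
    w.cond.wait_for(g, std::chrono::milliseconds(100));
    w.state.fetch_add(1, std::memory_order_release);  // back to busy, or to 0 if told to die
  }
  w.state.fetch_sub(2, std::memory_order_release);  // mark dead
}

int main()
{
  worker_t w;
  std::thread t(worker_loop, std::ref(w));
  {
    // Tell the worker to quit: push its state below zero, then keep waking
    // it until it reports dead, much like the patch's do-while loop.
    std::unique_lock<std::mutex> g(w.lock);
    w.state.fetch_sub(1, std::memory_order_release);
    while(w.state.load(std::memory_order_acquire) >= -1)
    {
      g.unlock();
      w.cond.notify_one();
      g.lock();
    }
  }
  t.join();
  std::cout << "worker exited, state = " << w.state.load() << std::endl;
  return 0;
}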