From 84d3f89aa4f3663a33aed3853ae48abd546e91a1 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Thu, 11 Mar 2021 11:47:02 +0000 Subject: Linux native dynamic_thread_pool_group is working again after last commit refactor, but priority is broken again. Sigh. --- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 120 +++++++++++++-------- 1 file changed, 77 insertions(+), 43 deletions(-) diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index c22cae35..7b2de48e 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -245,7 +245,7 @@ namespace detail } #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) - static constexpr unsigned TOTAL_NEXTACTIVES = 4; + static constexpr unsigned TOTAL_NEXTACTIVES = 1; struct next_active_base_t { std::atomic count{0}; @@ -267,48 +267,76 @@ namespace detail static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline"); next_active_base_t next_timer_relative, next_timer_absolute; - dynamic_thread_pool_group::work_item *next_active(size_t threadidx) + dynamic_thread_pool_group::work_item *next_active(unsigned &count, size_t threadidx) { - threadidx &= ~(TOTAL_NEXTACTIVES - 1); - const size_t original_threadidx = threadidx; - bool all_empty = true; - for(;;) + if(TOTAL_NEXTACTIVES > 1) { - next_active_base_t &x = next_actives[threadidx]; - if(x.count.load(std::memory_order_relaxed) > 0) + threadidx &= (TOTAL_NEXTACTIVES - 1); + const size_t original_threadidx = threadidx; + bool all_empty = true; + for(;;) { - all_empty = false; - if(x.lock.try_lock()) + next_active_base_t &x = next_actives[threadidx]; + if(x.count.load(std::memory_order_relaxed) > 0) { - auto *ret = x.front; - if(ret != nullptr) + all_empty = false; + if(x.lock.try_lock()) { - x.front = ret->_next_scheduled; - x.count.fetch_sub(1, std::memory_order_relaxed); - if(x.front == nullptr) + auto *ret = x.front; + if(ret != nullptr) { - assert(x.back == ret); - x.back = nullptr; + x.front = ret->_next_scheduled; + count = x.count.fetch_sub(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + ret->_next_scheduled = nullptr; + x.lock.unlock(); + return ret; } - ret->_next_scheduled = nullptr; - return ret; + x.lock.unlock(); } } - x.lock.unlock(); - } - if(++threadidx >= TOTAL_NEXTACTIVES) - { - threadidx = 0; + if(++threadidx >= TOTAL_NEXTACTIVES) + { + threadidx = 0; + } + if(threadidx == original_threadidx) + { + if(all_empty) + { + return nullptr; + } + all_empty = true; + } } - if(threadidx == original_threadidx) + } + else + { + next_active_base_t &x = next_actives[0]; + if(x.count.load(std::memory_order_relaxed) > 0) { - if(all_empty) + x.lock.lock(); + auto *ret = x.front; + if(ret != nullptr) { - return nullptr; + x.front = ret->_next_scheduled; + count = x.count.fetch_sub(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + ret->_next_scheduled = nullptr; + x.lock.unlock(); + return ret; } - all_empty = true; + x.lock.unlock(); } } + return nullptr; } private: @@ -528,7 +556,7 @@ namespace detail std::thread thread; std::condition_variable cond; std::chrono::steady_clock::time_point last_did_work; - int state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy + std::atomic state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy }; struct threads_t { @@ -709,13 +737,13 @@ namespace detail } // Threads which went to sleep the longest ago are at the front auto *t = which.front; - if(t->state < 0) + if(t->state.load(std::memory_order_acquire) < 0) { // He's already exiting return false; } - assert(t->state == 0); - t->state--; + assert(t->state.load(std::memory_order_acquire) == 0); + t->state.fetch_sub(1, std::memory_order_release); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << t << " is told to quit" << std::endl; #endif @@ -724,7 +752,7 @@ namespace detail g.unlock(); t->cond.notify_one(); g.lock(); - } while(t->state >= -1); + } while(t->state.load(std::memory_order_acquire) >= -1); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; #endif @@ -1393,12 +1421,12 @@ namespace detail { pthread_setname_np(pthread_self(), "LLFIO DYN TPG"); self->last_did_work = std::chrono::steady_clock::now(); - self->state++; // busy + self->state.fetch_add(1, std::memory_order_release); // busy const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " begins." << std::endl; #endif - while(self->state > 0) + while(self->state.load(std::memory_order_relaxed) > 0) { dynamic_thread_pool_group::work_item *workitem = nullptr; bool workitem_is_timer = false; @@ -1407,7 +1435,7 @@ namespace detail // Start from highest priority work group, executing any timers due before selecting a work item { workqueue_lock.lock(); - auto lock_wq = workqueue; + auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups workqueue_lock.unlock(); while(lock_wq) { @@ -1456,9 +1484,13 @@ namespace detail } wq.next_timer_absolute.lock.unlock(); } - workitem = wq.next_active(mythreadidx); + unsigned count = 0; + workitem = wq.next_active(count, mythreadidx); if(workitem != nullptr) { + workqueue_lock.lock(); + //std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl; + workqueue_lock.unlock(); break; } workqueue_lock.lock(); @@ -1483,7 +1515,7 @@ namespace detail { _remove_from_list(threadpool_active, self); threadpool_threads.fetch_sub(1, std::memory_order_release); -#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl; #endif self->thread.detach(); @@ -1514,7 +1546,7 @@ namespace detail threadpool_guard g(threadpool_lock); _remove_from_list(threadpool_active, self); _append_to_list(threadpool_sleeping, self); - self->state--; + self->state.fetch_sub(1, std::memory_order_release); if(earliest_absolute != std::chrono::system_clock::time_point()) { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING @@ -1529,7 +1561,7 @@ namespace detail #endif self->cond.wait_for(g, duration); } - self->state++; + self->state.fetch_add(1, std::memory_order_release); _remove_from_list(threadpool_sleeping, self); _append_to_list(threadpool_active, self); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING @@ -1566,7 +1598,7 @@ namespace detail threadpool_guard g(threadpool_lock); _remove_from_list(threadpool_active, self); threadpool_threads.fetch_sub(1, std::memory_order_release); -#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl; #endif self->thread.detach(); @@ -1578,9 +1610,9 @@ namespace detail { } } - self->state -= 2; // dead + self->state.fetch_sub(2, std::memory_order_release); // dead threadpool_threads.fetch_sub(1, std::memory_order_release); -#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl; #endif } @@ -1727,6 +1759,7 @@ namespace detail { // TODO: It would be super nice if we prepended this instead if it came from a timer workqueue->append_active(workitem); + //std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; } else { @@ -1736,6 +1769,7 @@ namespace detail { // TODO: It would be super nice if we prepended this instead if it came from a timer p->append_active(workitem); + //std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; break; } } -- cgit v1.2.3