From bb4719390b20944395e6a62522d395f0228fa3da Mon Sep 17 00:00:00 2001
From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)"
Date: Fri, 19 Feb 2021 12:01:16 +0000
Subject: Finally got a fully working native implementation on Linux.

I had realised that attempting to fix this each morning before work was
turning into never actually making progress, so I carved out four hours
this morning without distraction, and boom, here we go.

I next need to simplify this implementation: it has become bloated with
all the changes, and it could definitely be made much leaner.
---
 .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 456 ++++++++++-----------
 include/llfio/v2.0/dynamic_thread_pool_group.hpp   |   7 +-
 2 files changed, 229 insertions(+), 234 deletions(-)
(limited to 'include')

diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 07446ea0..df10efcb 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -68,17 +68,157 @@ LLFIO_V2_NAMESPACE_BEGIN
 namespace detail
 {
+  struct global_dynamic_thread_pool_impl_workqueue_item
+  {
+    std::unordered_set<dynamic_thread_pool_group_impl *> items;
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+    dynamic_thread_pool_group::work_item *_next_active_front{nullptr}, *_next_timer_relative_front{nullptr}, *_next_timer_absolute_front{nullptr};
+    dynamic_thread_pool_group::work_item *_next_active_back{nullptr}, *_next_timer_relative_back{nullptr}, *_next_timer_absolute_back{nullptr};
+
+    dynamic_thread_pool_group::work_item *next_active()
+    {
+      auto *ret = _next_active_front;
+      if(ret == nullptr)
+      {
+        assert(_next_active_back == nullptr);
+        return nullptr;
+      }
+      _next_active_front = ret->_next_scheduled;
+      if(_next_active_front == nullptr)
+      {
+        assert(_next_active_back == ret);
+        _next_active_back = nullptr;
+      }
+      ret->_next_scheduled = nullptr;
+      return ret;
+    }
+    void append_active(dynamic_thread_pool_group::work_item *p)
+    {
+      if(_next_active_back == nullptr)
+      {
+        assert(_next_active_front == nullptr);
+        _next_active_front = _next_active_back = p;
+        return;
+      }
+      p->_next_scheduled = nullptr;
+      _next_active_back->_next_scheduled = p;
+      _next_active_back = p;
+    }
+    void prepend_active(dynamic_thread_pool_group::work_item *p)
+    {
+      if(_next_active_front == nullptr)
+      {
+        assert(_next_active_back == nullptr);
+        _next_active_front = _next_active_back = p;
+        return;
+      }
+      p->_next_scheduled = _next_active_front;
+      _next_active_front = p;
+    }
+
+    dynamic_thread_pool_group::work_item *next_timer(int which)
+    {
+      if(which == 0)
+      {
+        return nullptr;
+      }
+      auto *&front = (which == 1) ? _next_timer_relative_front : _next_timer_absolute_front;
+      auto *&back = (which == 1) ? _next_timer_relative_back : _next_timer_absolute_back;
+      auto *ret = front;
+      if(ret == nullptr)
+      {
+        assert(back == nullptr);
+        return nullptr;
+      }
+      front = ret->_next_scheduled;
+      if(front == nullptr)
+      {
+        assert(back == ret);
+        back = nullptr;
+      }
+      ret->_next_scheduled = nullptr;
+      return ret;
+    }
+    void append_timer(dynamic_thread_pool_group::work_item *i)
+    {
+      if(i->_timepoint1 != std::chrono::steady_clock::time_point())
+      {
+        if(_next_timer_relative_front == nullptr)
+        {
+          _next_timer_relative_front = _next_timer_relative_back = i;
+        }
+        else
+        {
+          bool done = false;
+          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_relative_front; n != nullptr; p = n, n = n->_next_scheduled)
+          {
+            if(n->_timepoint1 <= i->_timepoint1)
+            {
+              if(p == nullptr)
+              {
+                i->_next_scheduled = n;
+                _next_timer_relative_front = i;
+              }
+              else
+              {
+                i->_next_scheduled = n;
+                p->_next_scheduled = i;
+              }
+              done = true;
+              break;
+            }
+          }
+          if(!done)
+          {
+            _next_timer_relative_back->_next_scheduled = i;
+            i->_next_scheduled = nullptr;
+            _next_timer_relative_back = i;
+          }
+        }
+      }
+      else
+      {
+        if(_next_timer_absolute_front == nullptr)
+        {
+          _next_timer_absolute_front = _next_timer_absolute_back = i;
+        }
+        else
+        {
+          bool done = false;
+          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_absolute_front; n != nullptr; p = n, n = n->_next_scheduled)
+          {
+            if(n->_timepoint2 <= i->_timepoint2)
+            {
+              if(p == nullptr)
+              {
+                i->_next_scheduled = n;
+                _next_timer_absolute_front = i;
+              }
+              else
+              {
+                i->_next_scheduled = n;
+                p->_next_scheduled = i;
+              }
+              done = true;
+              break;
+            }
+          }
+          if(!done)
+          {
+            _next_timer_absolute_back->_next_scheduled = i;
+            i->_next_scheduled = nullptr;
+            _next_timer_absolute_back = i;
+          }
+        }
+      }
+    }
+#endif
+  };
   struct global_dynamic_thread_pool_impl
   {
     std::mutex workqueue_lock;
     using _lock_guard = std::unique_lock<std::mutex>;
-    struct workqueue_item
-    {
-      std::unordered_set<dynamic_thread_pool_group_impl *> items;
-      std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup{items.begin()};
-      dynamic_thread_pool_group_impl *_currentgroup{nullptr};
-    };
-    std::vector<workqueue_item> workqueue;
+    std::vector<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
     using threadh_type = void *;
     using grouph_type = dispatch_group_t;
@@ -715,7 +855,8 @@ namespace detail
       return success();
     }
 
-    inline void _submit_work_item(_lock_guard &g, bool item_in_timer_list, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);
+    inline void _submit_work_item(_lock_guard &g, bool item_in_timer_list, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem,
+                                  bool defer_pool_wake);
 
     inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;
@@ -796,31 +937,10 @@ public:
       // Append this group to the global work queue at its nesting level
       if(_nesting_level >= impl.workqueue.size())
       {
-        for(auto &i : impl.workqueue)
-        {
-          i._currentgroup = (i.currentgroup != i.items.end()) ? *i.currentgroup : nullptr;
-          i.currentgroup = {};
-        }
         impl.workqueue.resize(_nesting_level + 1);
-        for(auto &i : impl.workqueue)
-        {
-          if(i._currentgroup != nullptr)
-          {
-            i.currentgroup = i.items.find(i._currentgroup);
-          }
-          else
-          {
-            i.currentgroup = i.items.end();
-          }
-          i._currentgroup = nullptr;
-        }
       }
       auto &wq = impl.workqueue[_nesting_level];
       wq.items.insert(this);
-      if(wq.items.size() == 1)
-      {
-        wq.currentgroup = wq.items.begin();
-      }
       return success();
     }
     catch(...)
@@ -850,13 +970,6 @@ public:
     detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
     assert(impl.workqueue.size() > _nesting_level);
     auto &wq = impl.workqueue[_nesting_level];
-    if(wq.items.end() != wq.currentgroup && *wq.currentgroup == this)
-    {
-      if(wq.items.end() == ++wq.currentgroup)
-      {
-        wq.currentgroup = wq.items.begin();
-      }
-    }
     wq.items.erase(this);
     while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
     {
@@ -985,208 +1098,59 @@ namespace detail
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
       std::cout << "*** DTP " << self << " begins." << std::endl;
 #endif
-      size_t workqueue_depth = workqueue.size() - 1;
       while(self->state > 0)
       {
-      restart:
         dynamic_thread_pool_group::work_item *workitem = nullptr;
-        std::chrono::steady_clock::time_point earliest_duration;
-        std::chrono::system_clock::time_point earliest_absolute;
-        if(!workqueue.empty())
+        bool workitem_is_timer = false;
+        std::chrono::steady_clock::time_point now_steady, earliest_duration;
+        std::chrono::system_clock::time_point now_system, earliest_absolute;
+        for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it)
         {
-          auto wq_it = --workqueue.end();
-          bool in_highest_priority_queue = true, started_from_top = false;
-          if(workqueue.size() > 1 && workqueue_depth < workqueue.size() - 1)
-          {
-            wq_it = workqueue.begin() + workqueue_depth;
-            in_highest_priority_queue = false;
-          }
-          auto *wq = &(*wq_it);
-          if(in_highest_priority_queue)
-          {
-            if(wq->currentgroup == wq->items.end())
-            {
-              wq->currentgroup = wq->items.begin();
-              started_from_top = true;
-            }
-            else if(wq->currentgroup == wq->items.begin())
-            {
-              started_from_top = true;
-            }
-          }
-          for(;;)
+          auto &wq = *it;
+          if(wq._next_timer_relative_front != nullptr)
           {
-            if(workqueue_depth >= workqueue.size())
-            {
-              goto restart;
-            }
-            wq_it = workqueue.begin() + workqueue_depth;
-            wq = &(*wq_it);
-            dynamic_thread_pool_group_impl *tpg = *wq->currentgroup;
-            _lock_guard gg(tpg->_lock); // lock group
-            if(started_from_top)
-            {
-              if(tpg->_timer_work_items_remaining == 0 && tpg->_active_work_items_remaining == 0)
-              {
-                tpg->_timer_work_items_remaining = (size_t) -1;
-                tpg->_active_work_items_remaining = (size_t) -1;
-              }
-              else
-              {
-                started_from_top = false;
-              }
-            }
-            std::cout << "*** DTP " << self << " sees in group " << tpg << " in_highest_priority_queue = " << in_highest_priority_queue
-                      << " work items = " << tpg->_work_items_active.count << " timer items = " << tpg->_work_items_timer.count << std::endl;
-            if(tpg->_work_items_timer.count > 0 && tpg->_work_items_timer.front->_internalworkh != nullptr)
-            {
-              tpg->_timer_work_items_remaining = 0; // this group is being fully executed right now
-            }
-            else if(tpg->_timer_work_items_remaining > tpg->_work_items_timer.count)
+            if(now_steady == std::chrono::steady_clock::time_point())
             {
-              tpg->_timer_work_items_remaining = tpg->_work_items_timer.count;
+              now_steady = std::chrono::steady_clock::now();
             }
-            // If we are not in the highest priority group, and there are no newly added
-            // work items, don't execute work.
-            if(tpg->_work_items_active.count > 0 &&
-               (tpg->_work_items_active.front->_internalworkh != nullptr || (!in_highest_priority_queue && tpg->_newly_added_active_work_items == 0)))
+            if(wq._next_timer_relative_front->_timepoint1 <= now_steady)
            {
-              tpg->_active_work_items_remaining = 0; // this group is being fully executed right now
+              workitem = wq.next_timer(1);
+              workitem_is_timer = true;
+              break;
             }
-            else if(tpg->_active_work_items_remaining > tpg->_work_items_active.count)
+            if(earliest_duration == std::chrono::steady_clock::time_point() || wq._next_timer_relative_front->_timepoint1 < earliest_duration)
             {
-              tpg->_active_work_items_remaining = tpg->_work_items_active.count;
+              earliest_duration = wq._next_timer_relative_front->_timepoint1;
             }
-            std::cout << "*** DTP " << self << " sees in group " << tpg << " _active_work_items_remaining = " << tpg->_active_work_items_remaining
-                      << " _timer_work_items_remaining = " << tpg->_timer_work_items_remaining << std::endl;
-            if(tpg->_timer_work_items_remaining > 0)
+          }
+          if(wq._next_timer_absolute_front != nullptr)
+          {
+            if(now_system == std::chrono::system_clock::time_point())
             {
-              auto *wi = tpg->_work_items_timer.front;
-              assert(wi->_has_timer_set());
-              _remove_from_list(tpg->_work_items_timer, wi);
-              _append_to_list(tpg->_work_items_timer, wi);
-              tpg->_timer_work_items_remaining--;
-
-              bool invoketimer = false;
-              if(wi->_has_timer_set_relative())
-              {
-                // Special constant for immediately rescheduled work items
-                if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
-                {
-                  invoketimer = true;
-                }
-                else if(earliest_duration == std::chrono::steady_clock::time_point() || wi->_timepoint1 < earliest_duration)
-                {
-                  earliest_duration = wi->_timepoint1;
-                  if(wi->_timepoint1 <= std::chrono::steady_clock::now())
-                  {
-                    invoketimer = true;
-                  }
-                }
-              }
-              if(wi->_has_timer_set_absolute() && (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute))
-              {
-                earliest_absolute = wi->_timepoint2;
-                if(wi->_timepoint2 <= std::chrono::system_clock::now())
-                {
-                  invoketimer = true;
-                }
-              }
-              // If this work item's timer is due, execute immediately
-              if(invoketimer)
-              {
-                wi->_internalworkh = self;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-                std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl;
-#endif
-                gg.unlock();
-                g.unlock();
-                _timerthread(wi, nullptr);
-                g.lock();
-                goto restart;
-              }
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-              std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl;
-#endif
-              // Process all timers before considering work
-              continue;
+              now_system = std::chrono::system_clock::now();
             }
-            if(tpg->_active_work_items_remaining > 0)
+            if(wq._next_timer_absolute_front->_timepoint2 <= now_system)
             {
-              auto *wi = tpg->_work_items_active.front;
-              assert(!wi->_has_timer_set());
-              _remove_from_list(tpg->_work_items_active, wi);
-              _append_to_list(tpg->_work_items_active, wi);
-              tpg->_active_work_items_remaining--;
-
-              if(tpg->_newly_added_active_work_items > 0)
-              {
-                tpg->_newly_added_active_work_items--;
-              }
-              workitem = wi;
-              workitem->_internalworkh = self;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-              std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
-                        << (workqueue.end() - wq_it - 1) << std::endl;
-#endif
-              // Execute this workitem
+              workitem = wq.next_timer(2);
+              workitem_is_timer = true;
               break;
             }
-
-            // Move to next group, which may be in a lower priority work queue
-            assert(tpg->_active_work_items_remaining == 0);
-            if(++wq->currentgroup == wq->items.end())
+            if(earliest_absolute == std::chrono::system_clock::time_point() || wq._next_timer_absolute_front->_timepoint2 < earliest_absolute)
             {
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-              std::cout << "*** DTP " << self << " reaches end of current group, started_from_top = " << started_from_top << std::endl;
-#endif
-              gg.unlock(); // unlock group
-              if(!started_from_top)
-              {
-                workqueue_depth = (size_t) -1;
-                goto restart;
-              }
-              // Nothing for me to do in this workqueue
-              if(wq_it == workqueue.begin())
-              {
-                // Reset all wq->currentgroup to begin() to ensure timers
-                // etc in lower priority groups do get seen
-                for(auto &i : workqueue)
-                {
-                  if(i.currentgroup == i.items.end())
-                  {
-                    i.currentgroup = i.items.begin();
-                  }
-                }
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-                std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl;
-#endif
-                assert(workitem == nullptr);
-                goto workqueue_empty;
-              }
-              --wq_it;
-              workqueue_depth = wq_it - workqueue.begin();
-              wq = &(*wq_it);
-              tpg = *wq->currentgroup;
-              gg = _lock_guard(tpg->_lock); // lock group
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-              std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg
-                        << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
-#endif
+              earliest_absolute = wq._next_timer_absolute_front->_timepoint2;
             }
-            tpg = *wq->currentgroup;
-            tpg->_active_work_items_remaining = tpg->_work_items_active.count;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-            std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl;
-#endif
           }
+          workitem = wq.next_active();
+        }
+        if(now_steady == std::chrono::steady_clock::time_point())
+        {
+          now_steady = std::chrono::steady_clock::now();
         }
-      workqueue_empty:
-        auto now = std::chrono::steady_clock::now();
         if(workitem == nullptr)
         {
           const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)));
-          if(now - self->last_did_work >= max_sleep)
+          if(now_steady - self->last_did_work >= max_sleep)
           {
             _remove_from_list(threadpool_active, self);
             threadpool_threads.fetch_sub(1, std::memory_order_release);
@@ -1197,14 +1161,18 @@ namespace detail
           std::chrono::steady_clock::duration duration(max_sleep);
           if(earliest_duration != std::chrono::steady_clock::time_point())
           {
-            if(now - earliest_duration < duration)
+            if(now_steady - earliest_duration < duration)
             {
-              duration = now - earliest_duration;
+              duration = now_steady - earliest_duration;
             }
           }
           else if(earliest_absolute != std::chrono::system_clock::time_point())
           {
-            auto diff = std::chrono::system_clock::now() - earliest_absolute;
+            if(now_system == std::chrono::system_clock::time_point())
+            {
+              now_system = std::chrono::system_clock::now();
+            }
+            auto diff = now_system - earliest_absolute;
             if(diff > duration)
             {
               earliest_absolute = {};
@@ -1216,14 +1184,14 @@ namespace detail
           threadpool_sleeping_count.fetch_add(1, std::memory_order_release);
           if(earliest_absolute != std::chrono::system_clock::time_point())
           {
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
            std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
 #endif
            self->cond.wait_until(g, earliest_absolute);
          }
          else
          {
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
            std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
 #endif
            self->cond.wait_for(g, duration);
@@ -1232,13 +1200,13 @@ namespace detail
         _remove_from_list(threadpool_sleeping, self);
         _append_to_list(threadpool_active, self);
         threadpool_sleeping_count.fetch_sub(1, std::memory_order_release);
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
         std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
 #endif
         g.unlock();
         try
         {
-          populate_threadmetrics(now);
+          populate_threadmetrics(now_steady);
         }
         catch(...)
         {
@@ -1246,17 +1214,24 @@ namespace detail
          g.lock();
          continue;
        }
-        self->last_did_work = now;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+        self->last_did_work = now_steady;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
        std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
 #endif
        total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
        g.unlock();
-        _workerthread(workitem, nullptr);
+        if(workitem_is_timer)
+        {
+          _timerthread(workitem, nullptr);
+        }
+        else
+        {
+          _workerthread(workitem, nullptr);
+        }
        // workitem->_internalworkh should be null, however workitem may also no longer exist
        try
        {
-          if(populate_threadmetrics(now))
+          if(populate_threadmetrics(now_steady))
          {
            _remove_from_list(threadpool_active, self);
            threadpool_threads.fetch_sub(1, std::memory_order_release);
@@ -1278,10 +1253,11 @@ namespace detail
   }
 #endif
 
-  inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool item_in_timer_list, dynamic_thread_pool_group::work_item *workitem,
-                                                                 bool defer_pool_wake)
+  inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool item_in_timer_list, bool submit_into_highest_priority,
+                                                                 dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake)
   {
     (void) g;
+    (void) submit_into_highest_priority;
     (void) defer_pool_wake;
     if(workitem->_nextwork != -1)
     {
@@ -1371,6 +1347,10 @@ namespace detail
         ft.dwLowDateTime = li.LowPart;
         // std::cout << "*** timer " << workitem << std::endl;
         SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
+#else
+        _lock_guard gg(workqueue_lock);
+        auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
+        wq->append_timer(workitem);
 #endif
       }
       else
@@ -1406,6 +1386,18 @@ namespace detail
       SetThreadpoolCallbackPriority(parent->_grouph, priority);
       // std::cout << "*** submit " << workitem << std::endl;
       SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
+#else
+      _lock_guard gg(workqueue_lock);
+      auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
+      // If the item came from a timer, prioritise execution
+      if(item_in_timer_list)
+      {
+        wq->prepend_active(workitem);
+      }
+      else
+      {
+        wq->append_active(workitem);
+      }
 #endif
     }
 
@@ -1526,7 +1518,7 @@ namespace detail
         group->_newly_added_active_work_items++;
         group->_active_work_items_remaining++;
 #endif
-        _submit_work_item(g, i->_has_timer_set(), i, i != work.back());
+        _submit_work_item(g, i->_has_timer_set(), true, i, i != work.back());
       }
     }
     return success();
@@ -1942,10 +1934,10 @@ namespace detail
       _work_item_done(g, workitem);
       return;
     }
-      _submit_work_item(g, false, workitem, false);
+      _submit_work_item(g, false, false, workitem, false);
      return;
    }
-    _submit_work_item(g, true, workitem, false);
+    _submit_work_item(g, true, false, workitem, false);
   }
 
   // Worker thread entry point
@@ -2001,7 +1993,7 @@ namespace detail
       _work_item_done(g, workitem);
       return;
     }
-      _submit_work_item(g, false, workitem, false);
+      _submit_work_item(g, false, false, workitem, false);
    }
  }
 } // namespace detail
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index 6caf6bf3..2af85564 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -43,6 +43,7 @@ class io_handle;
 namespace detail
 {
   struct global_dynamic_thread_pool_impl;
+  struct global_dynamic_thread_pool_impl_workqueue_item;
   LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept;
 } // namespace detail
 
@@ -174,11 +175,12 @@ public:
   class work_item
   {
     friend struct detail::global_dynamic_thread_pool_impl;
+    friend struct detail::global_dynamic_thread_pool_impl_workqueue_item;
     friend class dynamic_thread_pool_group_impl;
     std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
     void *_internalworkh{nullptr};
     void *_internaltimerh{nullptr};  // lazily created if next() ever returns a deadline
-    work_item *_prev{nullptr}, *_next{nullptr};
+    work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
     intptr_t _nextwork{-1};
     std::chrono::steady_clock::time_point _timepoint1;
     std::chrono::system_clock::time_point _timepoint2;
@@ -197,6 +199,7 @@ public:
         , _internaltimerh(o._internaltimerh)
         , _prev(o._prev)
         , _next(o._next)
+        , _next_scheduled(o._next_scheduled)
         , _nextwork(o._nextwork)
         , _timepoint1(o._timepoint1)
         , _timepoint2(o._timepoint2)
@@ -210,7 +213,7 @@ public:
         LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
         abort();
       }
-      o._prev = o._next = nullptr;
+      o._prev = o._next = o._next_scheduled = nullptr;
      o._nextwork = -1;
      o._internalworkh_inuse = 0;
    }
-- 
cgit v1.2.3
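
The heart of this commit is the replacement of the unordered_set cursor
bookkeeping (currentgroup/_currentgroup) with intrusive singly linked queues
threaded through the work items themselves via the new _next_scheduled
pointer: next_active() pops the front in O(1), append_active() pushes to the
back in O(1), and prepend_active() lets items coming off a timer jump the
queue. What follows is a minimal self-contained sketch of that technique,
not LLFIO code; the names node and intrusive_fifo are illustrative only.

#include <cassert>

// A work item carries its own link, so queue operations never allocate.
struct node
{
  node *next{nullptr};  // plays the role of work_item::_next_scheduled
};

struct intrusive_fifo
{
  node *front{nullptr}, *back{nullptr};

  node *pop() noexcept  // cf. next_active()
  {
    node *ret = front;
    if(ret == nullptr)
    {
      assert(back == nullptr);
      return nullptr;  // queue is empty
    }
    front = ret->next;
    if(front == nullptr)
    {
      assert(back == ret);
      back = nullptr;  // queue became empty
    }
    ret->next = nullptr;
    return ret;
  }
  void append(node *p) noexcept  // cf. append_active()
  {
    p->next = nullptr;
    if(back == nullptr)
    {
      assert(front == nullptr);
      front = back = p;
      return;
    }
    back->next = p;
    back = p;
  }
  void prepend(node *p) noexcept  // cf. prepend_active()
  {
    if(front == nullptr)
    {
      assert(back == nullptr);
      back = p;
    }
    p->next = front;
    front = p;
  }
};

Because the queue owns no storage, push and pop never allocate and emptiness
is a single pointer compare; the tradeoff, as with any intrusive container,
is that an item can sit on only one such queue at a time, which is the
invariant the worker loop relies on anyway.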
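
The timer queues (append_timer()/next_timer()) use the same intrusive link
but keep each list ordered by deadline so the scheduling loop only ever has
to poll the front element. Below is a hedged sketch of deadline-ordered
insertion under the assumption the loop makes, namely that the earliest
deadline sits at the front; timer_node and insert_by_deadline are
illustrative names, not LLFIO's.

#include <chrono>

struct timer_node
{
  std::chrono::steady_clock::time_point deadline;
  timer_node *next{nullptr};
};

// O(n) walk to keep the list sorted ascending by deadline; front and back
// mirror the _next_timer_*_front/_back pairs in the commit above.
inline void insert_by_deadline(timer_node *&front, timer_node *&back, timer_node *i)
{
  timer_node *prev = nullptr;
  for(timer_node *n = front; n != nullptr; prev = n, n = n->next)
  {
    if(i->deadline < n->deadline)  // first strictly later deadline: insert before it
    {
      i->next = n;
      (prev == nullptr ? front : prev->next) = i;
      return;  // back is unchanged, n is still behind i
    }
  }
  i->next = nullptr;  // every existing deadline is earlier or equal: append
  if(back == nullptr)
  {
    front = i;
  }
  else
  {
    back->next = i;
  }
  back = i;
}

With the list kept sorted this way, firing due timers reduces to popping the
front while its deadline is in the past, which is exactly the front-only
check the new worker loop performs.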
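
When no work item is ready, the worker computes how long it may sleep: at
most ms_sleep_for_more_work, but shortened if a relative (steady clock) or
absolute (system clock) timer falls due sooner, using wait_until() for
absolute deadlines so wall clock adjustments are honoured. A condensed
sketch of that decision follows, with an illustrative function name and the
clock reads inlined rather than cached in now_steady/now_system as the
commit does.

#include <chrono>
#include <condition_variable>
#include <mutex>

// Sleep until there might be more work: capped at max_sleep, woken early
// by whichever pending timer deadline comes first. A default-constructed
// time_point means "no timer of that kind is pending", as in the commit.
inline void sleep_for_more_work(std::condition_variable &cond, std::unique_lock<std::mutex> &g,
                                std::chrono::steady_clock::time_point earliest_steady,
                                std::chrono::system_clock::time_point earliest_system,
                                std::chrono::steady_clock::duration max_sleep)
{
  if(earliest_steady != std::chrono::steady_clock::time_point())
  {
    auto togo = earliest_steady - std::chrono::steady_clock::now();
    cond.wait_for(g, (togo < max_sleep) ? togo : max_sleep);
    return;
  }
  if(earliest_system != std::chrono::system_clock::time_point())
  {
    auto togo = earliest_system - std::chrono::system_clock::now();
    if(togo < max_sleep)
    {
      // wait_until() on the system clock tracks wall clock adjustments
      cond.wait_until(g, earliest_system);
      return;
    }
  }
  cond.wait_for(g, max_sleep);
}

Keeping relative deadlines on the steady clock and absolute ones on the
system clock, as the commit does with _timepoint1 and _timepoint2, is what
lets rescheduled work survive suspend/resume and clock slew correctly.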