github.com/windirstat/llfio.git
author:    Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-02-09 15:57:41 +0300
committer: Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-16 13:21:42 +0300
commit:    d00312487dd19c1c5f8967974c12239eebdeb667 (patch)
tree:      2449d0ea21cac4c0412cc4c6b7c44f41b966b5c9
parent:    944f8f03360b2d12f04dff25a35e9ef0e1f15e7b (diff)

More wip
-rw-r--r--  include/llfio/revision.hpp                                    |   6
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 359
-rw-r--r--  include/llfio/v2.0/dynamic_thread_pool_group.hpp              |  26
-rw-r--r--  test/tests/dynamic_thread_pool_group.cpp                      |   1
4 files changed, 271 insertions, 121 deletions
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 3bf437f0..3456e101 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 0d62d125a2ee2404976155f104dbe109430ae0ba
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-19 15:43:28 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 0d62d125
+#define LLFIO_PREVIOUS_COMMIT_REF b40a7594bd14dfbeaca42caf77b00df27df27b95
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-02-09 12:57:41 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE b40a7594
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index b0bbef9a..571b90cf 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -75,8 +75,8 @@ namespace detail
struct workqueue_item
{
std::unordered_set<dynamic_thread_pool_group_impl *> items;
- std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup;
- size_t currentgroupremaining{0};
+ std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup{items.begin()};
+ dynamic_thread_pool_group_impl *_currentgroup{nullptr};
};
std::vector<workqueue_item> workqueue;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
@@ -122,6 +122,7 @@ namespace detail
thread_t *front{nullptr}, *back{nullptr};
} threadpool_active, threadpool_sleeping;
std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0};
+ std::atomic<uint32_t> ms_sleep_for_more_work{60000};
std::mutex threadmetrics_lock;
struct threadmetrics_threadid
@@ -209,6 +210,23 @@ namespace detail
}
what.count++;
}
+ template <class T, class U> static void _prepend_to_list(T &what, U *v)
+ {
+ if(what.front == nullptr)
+ {
+ assert(what.back == nullptr);
+ v->_next = v->_prev = nullptr;
+ what.front = what.back = v;
+ }
+ else
+ {
+ v->_prev = nullptr;
+ v->_next = what.front;
+ what.front->_prev = v;
+ what.front = v;
+ }
+ what.count++;
+ }
template <class T, class U> static void _remove_from_list(T &what, U *v)
{
if(v->_prev == nullptr && v->_next == nullptr)
@@ -739,7 +757,7 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
{
size_t count{0};
dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
- } _work_items_active, _work_items_done, _work_items_delayed;
+ } _work_items_timer, _work_items_active, _work_items_done, _work_items_delayed;
std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
std::atomic<int> _waits{0};
result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion
@@ -751,6 +769,9 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
#else
void *_grouph{nullptr};
+ size_t _newly_added_active_work_items{0};
+ size_t _timer_work_items_remaining{0};
+ size_t _active_work_items_remaining{0};
#endif
public:
@@ -774,14 +795,30 @@ public:
// Append this group to the global work queue at its nesting level
if(_nesting_level >= impl.workqueue.size())
{
+ for(auto &i : impl.workqueue)
+ {
+ i._currentgroup = (i.currentgroup != i.items.end()) ? *i.currentgroup : nullptr;
+ i.currentgroup = {};
+ }
impl.workqueue.resize(_nesting_level + 1);
+ for(auto &i : impl.workqueue)
+ {
+ if(i._currentgroup != nullptr)
+ {
+ i.currentgroup = i.items.find(i._currentgroup);
+ }
+ else
+ {
+ i.currentgroup = i.items.end();
+ }
+ i._currentgroup = nullptr;
+ }
}
auto &wq = impl.workqueue[_nesting_level];
wq.items.insert(this);
if(wq.items.size() == 1)
{
wq.currentgroup = wq.items.begin();
- wq.currentgroupremaining = _work_items_active.count;
}
return success();
}
@@ -812,16 +849,12 @@ public:
detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
assert(impl.workqueue.size() > _nesting_level);
auto &wq = impl.workqueue[_nesting_level];
- if(*wq.currentgroup == this)
+ if(wq.items.end() != wq.currentgroup && *wq.currentgroup == this)
{
if(wq.items.end() == ++wq.currentgroup)
{
wq.currentgroup = wq.items.begin();
}
- if(!wq.items.empty())
- {
- wq.currentgroupremaining = (*wq.currentgroup)->_work_items_active.count;
- }
}
wq.items.erase(this);
while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
@@ -849,12 +882,12 @@ public:
_stopped.store(false, std::memory_order_release);
auto &impl = detail::global_dynamic_thread_pool();
_lock_guard g(_lock); // lock group
- if(_work_items_active.count == 0 && _work_items_done.count == 0)
+ if(_work_items_timer.count == 0 && _work_items_active.count == 0 && _work_items_done.count == 0)
{
_abnormal_completion_cause = success();
}
OUTCOME_TRY(impl.submit(g, this, work));
- if(_work_items_active.count == 0)
+ if(_work_items_timer.count == 0 && _work_items_active.count == 0)
{
_stopped.store(true, std::memory_order_release);
}
@@ -900,6 +933,30 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_th
return detail::global_dynamic_thread_pool_thread_local_state().workitem;
}
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work() noexcept
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ return detail::global_dynamic_thread_pool().ms_sleep_for_more_work.load(std::memory_order_relaxed);
+#else
+ return 0;
+#endif
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work(uint32_t v) noexcept
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ if(0 == v)
+ {
+ v = 1;
+ }
+ detail::global_dynamic_thread_pool().ms_sleep_for_more_work.store(v, std::memory_order_relaxed);
+ return v;
+#else
+ (void) v;
+ return 0;
+#endif
+}
+
LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept
{
try
@@ -936,37 +993,75 @@ namespace detail
std::chrono::system_clock::time_point earliest_absolute;
if(!workqueue.empty())
{
- auto wq_it = (workqueue_depth >= workqueue.size()) ? --workqueue.end() : (workqueue.begin() + workqueue_depth);
+ auto wq_it = --workqueue.end();
+ bool in_highest_priority_queue = true, started_from_top = false;
+ if(workqueue.size() > 1 && workqueue_depth < workqueue.size() - 1)
+ {
+ wq_it = workqueue.begin() + workqueue_depth;
+ in_highest_priority_queue = false;
+ }
auto *wq = &(*wq_it);
+ if(in_highest_priority_queue)
+ {
+ if(wq->currentgroup == wq->items.end())
+ {
+ wq->currentgroup = wq->items.begin();
+ started_from_top = true;
+ }
+ else if(wq->currentgroup == wq->items.begin())
+ {
+ started_from_top = true;
+ }
+ }
for(;;)
{
dynamic_thread_pool_group_impl *tpg = *wq->currentgroup;
_lock_guard gg(tpg->_lock); // lock group
- if(wq->currentgroupremaining > tpg->_work_items_active.count)
- {
- wq->currentgroupremaining = tpg->_work_items_active.count;
- }
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " sees " << wq->currentgroupremaining << " items remaining in group " << tpg << std::endl;
-#endif
- if(wq->currentgroupremaining > 0 && tpg->_work_items_active.front->_internalworkh == nullptr)
+ if(started_from_top)
{
- auto *wi = tpg->_work_items_active.front;
- _remove_from_list(tpg->_work_items_active, wi);
- _append_to_list(tpg->_work_items_active, wi);
- wq->currentgroupremaining--;
- if(wi->_internaltimerh == nullptr)
+ if(tpg->_timer_work_items_remaining == 0 && tpg->_active_work_items_remaining == 0)
{
- workitem = wi;
- workitem->_internalworkh = self;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
- << (workqueue.end() - wq_it - 1) << std::endl;
-#endif
- break;
+ tpg->_timer_work_items_remaining = (size_t) -1;
+ tpg->_active_work_items_remaining = (size_t) -1;
+ }
+ else
+ {
+ started_from_top = false;
}
+ }
+ std::cout << "*** DTP " << self << " sees in group " << tpg << " in_highest_priority_queue = " << in_highest_priority_queue
+ << " work items = " << tpg->_work_items_active.count << " timer items = " << tpg->_work_items_timer.count << std::endl;
+ if(tpg->_work_items_timer.count > 0 && tpg->_work_items_timer.front->_internalworkh != nullptr)
+ {
+ tpg->_timer_work_items_remaining = 0; // this group is being fully executed right now
+ }
+ else if(tpg->_timer_work_items_remaining > tpg->_work_items_timer.count)
+ {
+ tpg->_timer_work_items_remaining = tpg->_work_items_timer.count;
+ }
+ // If we are not in the highest priority group, and there are no newly added
+ // work items, don't execute work.
+ if(tpg->_work_items_active.count > 0 &&
+ (tpg->_work_items_active.front->_internalworkh != nullptr || (!in_highest_priority_queue && tpg->_newly_added_active_work_items == 0)))
+ {
+ tpg->_active_work_items_remaining = 0; // this group is being fully executed right now
+ }
+ else if(tpg->_active_work_items_remaining > tpg->_work_items_active.count)
+ {
+ tpg->_active_work_items_remaining = tpg->_work_items_active.count;
+ }
+ std::cout << "*** DTP " << self << " sees in group " << tpg << " _active_work_items_remaining = " << tpg->_active_work_items_remaining
+ << " _timer_work_items_remaining = " << tpg->_timer_work_items_remaining << std::endl;
+ if(tpg->_timer_work_items_remaining > 0)
+ {
+ auto *wi = tpg->_work_items_timer.front;
+ assert(wi->_has_timer_set());
+ _remove_from_list(tpg->_work_items_timer, wi);
+ _append_to_list(tpg->_work_items_timer, wi);
+ tpg->_timer_work_items_remaining--;
+
bool invoketimer = false;
- if(wi->_timepoint1 != std::chrono::steady_clock::time_point())
+ if(wi->_has_timer_set_relative())
{
// Special constant for immediately rescheduled work items
if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
@@ -982,8 +1077,7 @@ namespace detail
}
}
}
- if(wi->_timepoint2 != std::chrono::system_clock::time_point() &&
- (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute))
+ if(wi->_has_timer_set_absolute() && (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute))
{
earliest_absolute = wi->_timepoint2;
if(wi->_timepoint2 <= std::chrono::system_clock::now())
@@ -991,11 +1085,12 @@ namespace detail
invoketimer = true;
}
}
+ // If this work item's timer is due, execute immediately
if(invoketimer)
{
wi->_internalworkh = self;
wi->_internaltimerh = nullptr;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl;
#endif
gg.unlock();
@@ -1005,68 +1100,80 @@ namespace detail
// wi->_internalworkh should be null, however wi may also no longer exist
goto restart;
}
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl;
#endif
+ // Process all timers before considering work
+ continue;
}
- else
+ if(tpg->_active_work_items_remaining > 0)
{
- auto startinggroup = wq->currentgroup;
- bool at_top = (workqueue_depth == workqueue.size() - 1);
- do
+ auto *wi = tpg->_work_items_active.front;
+ assert(!wi->_has_timer_set());
+ _remove_from_list(tpg->_work_items_active, wi);
+ _append_to_list(tpg->_work_items_active, wi);
+ tpg->_active_work_items_remaining--;
+
+ if(tpg->_newly_added_active_work_items > 0)
{
- gg.unlock(); // unlock group
- if(++wq->currentgroup == wq->items.end())
- {
- wq->currentgroup = wq->items.begin();
- }
- if(!at_top)
- {
- workqueue_depth = workqueue.size() - 1;
- wq_it = --workqueue.end();
- at_top = true;
- wq = &(*wq_it);
- startinggroup = wq->currentgroup;
- }
- tpg = *wq->currentgroup;
- gg = _lock_guard(tpg->_lock); // lock group
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq_it - 1) << " examining " << tpg
- << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
+ tpg->_newly_added_active_work_items--;
+ }
+ workitem = wi;
+ workitem->_internalworkh = self;
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
+ << (workqueue.end() - wq_it - 1) << std::endl;
#endif
- if(startinggroup == wq->currentgroup)
+ // Execute this workitem
+ break;
+ }
+
+ // Move to next group, which may be in a lower priority work queue
+ assert(tpg->_active_work_items_remaining == 0);
+ if(++wq->currentgroup == wq->items.end())
+ {
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " reaches end of current group, started_from_top = " << started_from_top << std::endl;
+#endif
+ gg.unlock(); // unlock group
+ if(!started_from_top)
+ {
+ workqueue_depth = (size_t) -1;
+ goto restart;
+ }
+ // Nothing for me to do in this workqueue
+ if(wq_it == workqueue.begin())
+ {
+ // Reset all wq->currentgroup to begin() to ensure timers
+ // etc in lower priority groups do get seen
+ for(auto &i : workqueue)
{
- if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr)
+ if(i.currentgroup == i.items.end())
{
- // Nothing for me to do in this workqueue
- if(wq_it == workqueue.begin())
- {
- assert(workitem == nullptr);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl;
-#endif
- goto workqueue_empty;
- }
- gg.unlock(); // unlock group
- --wq_it;
- workqueue_depth = wq_it - workqueue.begin();
- wq = &(*wq_it);
- tpg = *wq->currentgroup;
- startinggroup = wq->currentgroup;
- gg = _lock_guard(tpg->_lock); // lock group
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg
- << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
-#endif
- continue;
+ i.currentgroup = i.items.begin();
}
}
- } while(tpg->_work_items_active.count == 0);
- wq->currentgroupremaining = tpg->_work_items_active.count;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl;
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl;
+#endif
+ assert(workitem == nullptr);
+ goto workqueue_empty;
+ }
+ --wq_it;
+ workqueue_depth = wq_it - workqueue.begin();
+ wq = &(*wq_it);
+ tpg = *wq->currentgroup;
+ gg = _lock_guard(tpg->_lock); // lock group
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg
+ << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
#endif
}
+ tpg = *wq->currentgroup;
+ tpg->_active_work_items_remaining = tpg->_work_items_active.count;
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl;
+#endif
}
}
workqueue_empty:
@@ -1089,7 +1196,7 @@ namespace detail
earliest_absolute = {};
}
}
- else if(now - self->last_did_work >= std::chrono::minutes(1))
+ else if(now - self->last_did_work >= std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)))
{
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
@@ -1104,14 +1211,14 @@ namespace detail
threadpool_sleeping_count.fetch_add(1, std::memory_order_release);
if(earliest_absolute != std::chrono::system_clock::time_point())
{
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
#endif
self->cond.wait_until(g, earliest_absolute);
}
else
{
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
#endif
self->cond.wait_for(g, duration);
@@ -1120,7 +1227,7 @@ namespace detail
_remove_from_list(threadpool_sleeping, self);
_append_to_list(threadpool_active, self);
threadpool_sleeping_count.fetch_sub(1, std::memory_order_release);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
#endif
g.unlock();
@@ -1135,7 +1242,7 @@ namespace detail
continue;
}
self->last_did_work = now;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
#endif
total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
@@ -1169,16 +1276,16 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake)
{
(void) g;
+ (void) defer_pool_wake;
if(workitem->_nextwork != -1)
{
// If no work item for now, or there is a delay, schedule a timer
- if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() ||
- workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ if(workitem->_nextwork == 0 || workitem->_has_timer_set())
{
assert(workitem->_internaltimerh != nullptr);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
dispatch_time_t when;
- if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ if(workitem->_has_timer_set_relative())
{
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count();
if(duration > 1000000000LL)
@@ -1191,7 +1298,7 @@ namespace detail
}
when = dispatch_time(DISPATCH_TIME_NOW, duration);
}
- else if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ else if(workitem->_has_timer_set_absolute())
{
deadline d(workitem->_timepoint2);
auto now = std::chrono::system_clock::now();
@@ -1210,7 +1317,7 @@ namespace detail
#elif defined(_WIN32)
LARGE_INTEGER li;
DWORD slop = 1000;
- if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ if(workitem->_has_timer_set_relative())
{
li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100;
if(li.QuadPart < 0)
@@ -1223,7 +1330,7 @@ namespace detail
}
li.QuadPart = -li.QuadPart; // negative is relative
}
- else if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ else if(workitem->_has_timer_set_absolute())
{
li = windows_nt_kernel::from_timepoint(workitem->_timepoint2);
}
@@ -1328,7 +1435,7 @@ namespace detail
auto uninit = make_scope_exit([&]() noexcept {
for(auto *i : work)
{
- _remove_from_list(group->_work_items_active, i);
+ _remove_from_list(!i->_has_timer_set() ? group->_work_items_active : group->_work_items_timer, i);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
i->_internalworkh = nullptr;
i->_internaltimerh = nullptr;
@@ -1368,13 +1475,24 @@ namespace detail
}
#endif
OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d));
- _append_to_list(group->_work_items_active, i);
+ if(!i->_has_timer_set())
+ {
+ _prepend_to_list(group->_work_items_active, i);
+ }
+ else
+ {
+ _prepend_to_list(group->_work_items_timer, i);
+ }
}
}
uninit.release();
{
for(auto *i : work)
{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ group->_newly_added_active_work_items++;
+ group->_active_work_items_remaining++;
+#endif
_submit_work_item(g, i, i != work.back());
}
}
@@ -1390,6 +1508,9 @@ namespace detail
{
(void) g;
// std::cout << "*** _work_item_done " << i << std::endl;
+ auto *parent = i->_parent.load(std::memory_order_relaxed);
+ _remove_from_list(!i->_has_timer_set() ? parent->_work_items_active : parent->_work_items_timer, i);
+ _append_to_list(parent->_work_items_done, i);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
i->_internaltimerh = nullptr;
i->_internalworkh = nullptr;
@@ -1418,11 +1539,8 @@ namespace detail
std::cout << "*** DTP sets done work item " << i << std::endl;
#endif
#endif
- _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i);
- _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i);
- if(i->_parent.load(std::memory_order_relaxed)->_work_items_active.count == 0)
+ if(parent->_work_items_timer.count == 0 && parent->_work_items_active.count == 0)
{
- auto *parent = i->_parent.load(std::memory_order_relaxed);
i = nullptr;
auto *v = parent->_work_items_done.front, *n = v;
for(; v != nullptr; v = n)
@@ -1542,7 +1660,7 @@ namespace detail
group->_waits.fetch_add(1, std::memory_order_release);
auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); });
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- while(group->_work_items_active.count > 0)
+ while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0)
{
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
dispatch_time_t timeout = DISPATCH_TIME_FOREVER;
@@ -1572,9 +1690,9 @@ namespace detail
// Is this a cancellation?
if(group->_stopping.load(std::memory_order_relaxed))
{
- while(group->_work_items_active.count > 0)
+ while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0)
{
- auto *i = group->_work_items_active.front;
+ auto *i = (group->_work_items_active.front != nullptr) ? group->_work_items_active.front : group->_work_items_timer.front;
if(nullptr != i->_internalworkh)
{
if(0 == i->_internalworkh_inuse)
@@ -1623,6 +1741,11 @@ namespace detail
}
i->_internalworkh_inuse = 0;
}
+ if(group->_work_items_timer.count > 0 && group->_work_items_timer.front == i)
+ {
+ // This item got cancelled before it started
+ _work_item_done(g, group->_work_items_timer.front);
+ }
if(group->_work_items_active.count > 0 && group->_work_items_active.front == i)
{
// This item got cancelled before it started
@@ -1633,9 +1756,9 @@ namespace detail
}
else if(!d)
{
- while(group->_work_items_active.count > 0)
+ while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0)
{
- auto *i = group->_work_items_active.front;
+ auto *i = (group->_work_items_active.front != nullptr) ? group->_work_items_active.front : group->_work_items_timer.front;
if(nullptr != i->_internalworkh)
{
if(0 == i->_internalworkh_inuse)
@@ -1688,7 +1811,7 @@ namespace detail
}
else
{
- while(group->_work_items_active.count > 0)
+ while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0)
{
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
g.unlock();
@@ -1704,6 +1827,16 @@ namespace detail
for(bool done = false; !done;)
{
done = true;
+ for(auto *p = group->_work_items_timer.front; p != nullptr; p = p->_next)
+ {
+ if(p->_internalworkh == nullptr)
+ {
+ _remove_from_list(group->_work_items_timer, p);
+ _append_to_list(group->_work_items_done, p);
+ done = false;
+ break;
+ }
+ }
for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next)
{
if(p->_internalworkh == nullptr)
@@ -1717,7 +1850,7 @@ namespace detail
}
}
#endif
- while(group->_work_items_active.count > 0)
+ while(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0)
{
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
g.unlock();
@@ -1726,7 +1859,7 @@ namespace detail
}
#endif
}
- if(group->_work_items_active.count > 0)
+ if(group->_work_items_timer.count > 0 || group->_work_items_active.count > 0)
{
return errc::timed_out;
}
@@ -1748,7 +1881,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ if(workitem->_has_timer_set_relative())
{
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
auto now = std::chrono::steady_clock::now();
@@ -1761,7 +1894,7 @@ namespace detail
#endif
workitem->_timepoint1 = {};
}
- if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ if(workitem->_has_timer_set_absolute())
{
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
auto now = std::chrono::system_clock::now();
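For reference, here is a minimal standalone sketch of the intrusive list handling behind the new _prepend_to_list() helper added above, and of why submit() now uses it: newly submitted work items are placed at the front of a group's list rather than appended to the back. The node and list types below are simplified stand-ins chosen for illustration only, not the LLFIO internals.

#include <cassert>
#include <cstddef>
#include <iostream>

// Simplified stand-ins for the intrusive list shape the thread pool uses:
// each element carries its own _prev/_next pointers; the list tracks front/back/count.
struct node
{
  node *_prev{nullptr}, *_next{nullptr};
  int id{0};
};
struct list_t
{
  size_t count{0};
  node *front{nullptr}, *back{nullptr};
};

// Mirrors the patch's _prepend_to_list(): a newly inserted item becomes the list front.
static void prepend_to_list(list_t &what, node *v)
{
  if(what.front == nullptr)
  {
    assert(what.back == nullptr);
    v->_next = v->_prev = nullptr;
    what.front = what.back = v;
  }
  else
  {
    v->_prev = nullptr;
    v->_next = what.front;
    what.front->_prev = v;
    what.front = v;
  }
  what.count++;
}

int main()
{
  list_t l;
  node a, b;
  a.id = 1;
  b.id = 2;
  prepend_to_list(l, &a);
  prepend_to_list(l, &b);  // b now precedes a
  for(node *p = l.front; p != nullptr; p = p->_next)
  {
    std::cout << p->id << "\n";  // prints 2 then 1
  }
  return 0;
}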
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index 748bd94c..6caf6bf3 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -1,5 +1,5 @@
/* Dynamic thread pool group
-(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
+(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
File Created: Dec 2020
@@ -168,6 +168,7 @@ allocations.
*/
class LLFIO_DECL dynamic_thread_pool_group
{
+ friend class dynamic_thread_pool_group_impl;
public:
//! An individual item of work within the work group.
class work_item
@@ -175,7 +176,8 @@ public:
friend struct detail::global_dynamic_thread_pool_impl;
friend class dynamic_thread_pool_group_impl;
std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
- void *_internalworkh{nullptr}, *_internaltimerh{nullptr};
+ void *_internalworkh{nullptr};
+ void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline
work_item *_prev{nullptr}, *_next{nullptr};
intptr_t _nextwork{-1};
std::chrono::steady_clock::time_point _timepoint1;
@@ -183,7 +185,9 @@ public:
int _internalworkh_inuse{0};
protected:
- void *_private{nullptr};
+ constexpr bool _has_timer_set_relative() const noexcept { return _timepoint1 != std::chrono::steady_clock::time_point(); }
+ constexpr bool _has_timer_set_absolute() const noexcept { return _timepoint2 != std::chrono::system_clock::time_point(); }
+ constexpr bool _has_timer_set() const noexcept { return _has_timer_set_relative() || _has_timer_set_absolute(); }
constexpr work_item() {}
work_item(const work_item &o) = delete;
@@ -197,7 +201,6 @@ public:
, _timepoint1(o._timepoint1)
, _timepoint2(o._timepoint2)
, _internalworkh_inuse(o._internalworkh_inuse)
- , _private(o._private)
{
assert(o._parent.load(std::memory_order_relaxed) == nullptr);
assert(o._internalworkh == nullptr);
@@ -210,7 +213,6 @@ public:
o._prev = o._next = nullptr;
o._nextwork = -1;
o._internalworkh_inuse = 0;
- o._private = nullptr;
}
work_item &operator=(const work_item &) = delete;
work_item &operator=(work_item &&) = delete;
@@ -459,6 +461,20 @@ public:
static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept;
//! Returns the work item the calling thread is running within, if any.
static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept;
+ /*! \brief Returns the number of milliseconds that a thread is without work before it is shut down.
+ Note that this will be zero everywhere except on Linux when our local thread pool
+ implementation is in use, because the system controls this value on Windows, Grand
+ Central Dispatch etc.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work() noexcept;
+ /*! \brief Sets the number of milliseconds that a thread is without work before it is shut down,
+ returning the value actually set.
+
+ Note that this will have no effect (and thus return zero) everywhere except on Linux
+ when our local thread pool implementation is in use, because the system controls this
+ value on Windows, Grand Central Dispatch etc.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work(uint32_t v) noexcept;
};
//! A unique ptr to a work group within the global dynamic thread pool.
using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>;
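A hedged usage sketch of the ms_sleep_for_more_work() knob declared above. The include path and the LLFIO_V2_NAMESPACE alias follow the library's usual conventions, but treat the snippet as illustrative rather than part of this patch; per the documentation comment, the setter only takes effect where the local (non-GCD, non-Windows) thread pool implementation is in use and returns 0 otherwise.

#include "llfio/llfio.hpp"

#include <cstdint>
#include <iostream>

namespace llfio = LLFIO_V2_NAMESPACE;

int main()
{
  using dtpg = llfio::dynamic_thread_pool_group;
  // How long an idle pool thread currently lingers before shutting itself down.
  std::cout << "idle thread timeout (ms): " << dtpg::ms_sleep_for_more_work() << std::endl;
  // Request a 5 second idle timeout; the value actually applied is returned
  // (0 where the platform, not LLFIO, controls this policy).
  std::uint32_t applied = dtpg::ms_sleep_for_more_work(5000);
  std::cout << "timeout now (ms): " << applied << std::endl;
  return 0;
}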
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 6e985fc8..dfc46e9a 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -199,6 +199,7 @@ static inline void TestDynamicThreadPoolGroupWorks()
reset(1);
submit();
check();
+ exit(0);
// Test 10 work items
reset(10);