github.com/windirstat/llfio.git
author    Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-02-19 15:01:16 +0300
committer Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-16 13:21:43 +0300
commit    bb4719390b20944395e6a62522d395f0228fa3da (patch)
tree      c991554ae754621ca4dc27935d231a05c0f58f5f /include
parent    5558cb7bc5f3a74589e579e8504f76cde117bb03 (diff)
Finally got a fully working native implementation on Linux. I had realised that attempting to fix this each morning before work meant never actually making progress, so I carved out four hours this morning without distraction, and boom, here we go.
I next need to simplify this implementation: it has become bloated with all the changes, and it could definitely be made much leaner.
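
The heart of the change, as the diff below shows, is dropping the per-level currentgroup iterator bookkeeping in favour of threading work items directly onto intrusive singly linked FIFO queues, one set per nesting level. A minimal sketch of that queue pattern, using illustrative names rather than the real LLFIO types:

// Minimal sketch of the pattern this patch adopts: an intrusive singly
// linked FIFO per nesting level. Names are illustrative, not the real
// LLFIO types.
#include <cassert>
#include <cstdio>

struct work_item
{
  work_item *next_scheduled{nullptr};  // link owned by the scheduler
  int id{0};
};

struct workqueue
{
  work_item *front{nullptr}, *back{nullptr};

  void append(work_item *p)  // O(1) enqueue at the tail
  {
    p->next_scheduled = nullptr;
    if(back == nullptr)
    {
      assert(front == nullptr);
      front = back = p;
      return;
    }
    back->next_scheduled = p;
    back = p;
  }

  work_item *pop()  // O(1) dequeue from the head; nullptr if empty
  {
    work_item *ret = front;
    if(ret == nullptr)
    {
      assert(back == nullptr);
      return nullptr;
    }
    front = ret->next_scheduled;
    if(front == nullptr)
    {
      assert(back == ret);
      back = nullptr;
    }
    ret->next_scheduled = nullptr;
    return ret;
  }
};

int main()
{
  work_item a, b;
  a.id = 1;
  b.id = 2;
  workqueue q;
  q.append(&a);
  q.append(&b);
  while(work_item *p = q.pop())
  {
    std::printf("item %d\n", p->id);  // prints 1 then 2
  }
  return 0;
}

Because the links live inside the items, the submit path never allocates; the trade-off is that an item can sit in only one scheduler queue at a time, which is why the patch also adds a dedicated _next_scheduled pointer alongside the existing _prev/_next group links.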
Diffstat (limited to 'include')
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 456
-rw-r--r--  include/llfio/v2.0/dynamic_thread_pool_group.hpp             |   7
2 files changed, 229 insertions, 234 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 07446ea0..df10efcb 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -68,17 +68,157 @@ LLFIO_V2_NAMESPACE_BEGIN
namespace detail
{
+ struct global_dynamic_thread_pool_impl_workqueue_item
+ {
+ std::unordered_set<dynamic_thread_pool_group_impl *> items;
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ dynamic_thread_pool_group::work_item *_next_active_front{nullptr}, *_next_timer_relative_front{nullptr}, *_next_timer_absolute_front{nullptr};
+ dynamic_thread_pool_group::work_item *_next_active_back{nullptr}, *_next_timer_relative_back{nullptr}, *_next_timer_absolute_back{nullptr};
+
+ dynamic_thread_pool_group::work_item *next_active()
+ {
+ auto *ret = _next_active_front;
+ if(ret == nullptr)
+ {
+ assert(_next_active_back == nullptr);
+ return nullptr;
+ }
+ _next_active_front = ret->_next_scheduled;
+ if(_next_active_front == nullptr)
+ {
+ assert(_next_active_back == ret);
+ _next_active_back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ return ret;
+ }
+ void append_active(dynamic_thread_pool_group::work_item *p)
+ {
+ if(_next_active_back == nullptr)
+ {
+ assert(_next_active_front == nullptr);
+ _next_active_front = _next_active_back = p;
+ return;
+ }
+ p->_next_scheduled = nullptr;
+ _next_active_back->_next_scheduled = p;
+ _next_active_back = p;
+ }
+ void prepend_active(dynamic_thread_pool_group::work_item *p)
+ {
+ if(_next_active_front == nullptr)
+ {
+ assert(_next_active_back == nullptr);
+ _next_active_front = _next_active_back = p;
+ return;
+ }
+ p->_next_scheduled = _next_active_front;
+ _next_active_front = p;
+ }
+
+ dynamic_thread_pool_group::work_item *next_timer(int which)
+ {
+ if(which == 0)
+ {
+ return nullptr;
+ }
+ auto *&front = (which == 1) ? _next_timer_relative_front : _next_timer_absolute_front;
+ auto *&back = (which == 1) ? _next_timer_relative_back : _next_timer_absolute_back;
+ auto *ret = front;
+ if(ret == nullptr)
+ {
+ assert(back == nullptr);
+ return nullptr;
+ }
+ front = ret->_next_scheduled;
+ if(front == nullptr)
+ {
+ assert(back == ret);
+ back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ return ret;
+ }
+ void append_timer(dynamic_thread_pool_group::work_item *i)
+ {
+ if(i->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ if(_next_timer_relative_front == nullptr)
+ {
+ _next_timer_relative_front = _next_timer_relative_back = i;
+ }
+ else
+ {
+ bool done = false;
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_relative_front; n != nullptr; p = n, n = n->_next_scheduled)
+ {
+ if(n->_timepoint1 <= i->_timepoint1)
+ {
+ if(p == nullptr)
+ {
+ i->_next_scheduled = n;
+ _next_timer_relative_front = i;
+ }
+ else
+ {
+ i->_next_scheduled = n;
+ p->_next_scheduled = i;
+ }
+ done = true;
+ break;
+ }
+ }
+ if(!done)
+ {
+ _next_timer_relative_back->_next_scheduled = i;
+ i->_next_scheduled = nullptr;
+ _next_timer_relative_back = i;
+ }
+ }
+ }
+ else
+ {
+ if(_next_timer_absolute_front == nullptr)
+ {
+ _next_timer_absolute_front = _next_timer_absolute_back = i;
+ }
+ else
+ {
+ bool done = false;
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_absolute_front; n != nullptr; p = n, n = n->_next_scheduled)
+ {
+ if(n->_timepoint2 <= i->_timepoint2)
+ {
+ if(p == nullptr)
+ {
+ i->_next_scheduled = n;
+ _next_timer_absolute_front = i;
+ }
+ else
+ {
+ i->_next_scheduled = n;
+ p->_next_scheduled = i;
+ }
+ done = true;
+ break;
+ }
+ }
+ if(!done)
+ {
+ _next_timer_absolute_back->_next_scheduled = i;
+ i->_next_scheduled = nullptr;
+ _next_timer_absolute_back = i;
+ }
+ }
+ }
+ }
+#endif
+ };
struct global_dynamic_thread_pool_impl
{
std::mutex workqueue_lock;
using _lock_guard = std::unique_lock<std::mutex>;
- struct workqueue_item
- {
- std::unordered_set<dynamic_thread_pool_group_impl *> items;
- std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup{items.begin()};
- dynamic_thread_pool_group_impl *_currentgroup{nullptr};
- };
- std::vector<workqueue_item> workqueue;
+ std::vector<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
using threadh_type = void *;
using grouph_type = dispatch_group_t;
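
append_timer above keeps the relative and absolute timer lists ordered using a linear walk-and-splice. Here is a generic sketch of sorted insertion into an intrusive singly linked list; the earliest-deadline-first comparison is my simplifying assumption, not a claim about the exact ordering the code above produces:

#include <chrono>

struct timer_item
{
  timer_item *next{nullptr};
  std::chrono::steady_clock::time_point deadline{};
};

struct timer_queue
{
  timer_item *front{nullptr}, *back{nullptr};

  // Walk with a trailing pointer and splice the new item in before the
  // first entry with a later deadline; append if none is later.
  void insert_sorted(timer_item *i)
  {
    timer_item *prev = nullptr;
    for(timer_item *n = front; n != nullptr; prev = n, n = n->next)
    {
      if(i->deadline < n->deadline)
      {
        i->next = n;
        if(prev == nullptr)
        {
          front = i;
        }
        else
        {
          prev->next = i;
        }
        return;
      }
    }
    // Empty list, or i has the latest deadline so far: append at the tail.
    i->next = nullptr;
    if(back == nullptr)
    {
      front = back = i;
    }
    else
    {
      back->next = i;
      back = i;
    }
  }
};

int main()
{
  using clock = std::chrono::steady_clock;
  timer_item a, b, c;
  const auto now = clock::now();
  a.deadline = now + std::chrono::seconds(3);
  b.deadline = now + std::chrono::seconds(1);
  c.deadline = now + std::chrono::seconds(2);
  timer_queue q;
  q.insert_sorted(&a);
  q.insert_sorted(&b);
  q.insert_sorted(&c);
  // Queue is now b -> c -> a, soonest deadline first.
  return q.front == &b ? 0 : 1;
}

O(n) insertion is a reasonable fit here because the worker loop only ever inspects the front of each timer list to learn the next pending deadline.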
@@ -715,7 +855,8 @@ namespace detail
return success();
}
- inline void _submit_work_item(_lock_guard &g, bool item_in_timer_list, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);
+ inline void _submit_work_item(_lock_guard &g, bool item_in_timer_list, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem,
+ bool defer_pool_wake);
inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;
@@ -796,31 +937,10 @@ public:
// Append this group to the global work queue at its nesting level
if(_nesting_level >= impl.workqueue.size())
{
- for(auto &i : impl.workqueue)
- {
- i._currentgroup = (i.currentgroup != i.items.end()) ? *i.currentgroup : nullptr;
- i.currentgroup = {};
- }
impl.workqueue.resize(_nesting_level + 1);
- for(auto &i : impl.workqueue)
- {
- if(i._currentgroup != nullptr)
- {
- i.currentgroup = i.items.find(i._currentgroup);
- }
- else
- {
- i.currentgroup = i.items.end();
- }
- i._currentgroup = nullptr;
- }
}
auto &wq = impl.workqueue[_nesting_level];
wq.items.insert(this);
- if(wq.items.size() == 1)
- {
- wq.currentgroup = wq.items.begin();
- }
return success();
}
catch(...)
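
The surviving context in this hunk is simple: a group registers itself in impl.workqueue[_nesting_level], growing the vector on demand, and the back of the vector (the deepest nesting level) is what the worker loop treats as the highest priority. A toy version of that registration step, with invented names:

#include <cstddef>
#include <unordered_set>
#include <vector>

struct group  // stand-in for dynamic_thread_pool_group_impl
{
};

struct workqueue_level
{
  std::unordered_set<group *> items;
};

// Register a group at its nesting level, growing the vector on demand;
// the back of the vector is what the scheduler scans first.
void register_group(std::vector<workqueue_level> &workqueue, group *g, std::size_t nesting_level)
{
  if(nesting_level >= workqueue.size())
  {
    workqueue.resize(nesting_level + 1);
  }
  workqueue[nesting_level].items.insert(g);
}

int main()
{
  std::vector<workqueue_level> workqueue;
  group g;
  register_group(workqueue, &g, 2);
  return workqueue.size() == 3 ? 0 : 1;
}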
@@ -850,13 +970,6 @@ public:
detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
assert(impl.workqueue.size() > _nesting_level);
auto &wq = impl.workqueue[_nesting_level];
- if(wq.items.end() != wq.currentgroup && *wq.currentgroup == this)
- {
- if(wq.items.end() == ++wq.currentgroup)
- {
- wq.currentgroup = wq.items.begin();
- }
- }
wq.items.erase(this);
while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
{
@@ -985,208 +1098,59 @@ namespace detail
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " begins." << std::endl;
#endif
- size_t workqueue_depth = workqueue.size() - 1;
while(self->state > 0)
{
- restart:
dynamic_thread_pool_group::work_item *workitem = nullptr;
- std::chrono::steady_clock::time_point earliest_duration;
- std::chrono::system_clock::time_point earliest_absolute;
- if(!workqueue.empty())
+ bool workitem_is_timer = false;
+ std::chrono::steady_clock::time_point now_steady, earliest_duration;
+ std::chrono::system_clock::time_point now_system, earliest_absolute;
+ for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it)
{
- auto wq_it = --workqueue.end();
- bool in_highest_priority_queue = true, started_from_top = false;
- if(workqueue.size() > 1 && workqueue_depth < workqueue.size() - 1)
- {
- wq_it = workqueue.begin() + workqueue_depth;
- in_highest_priority_queue = false;
- }
- auto *wq = &(*wq_it);
- if(in_highest_priority_queue)
- {
- if(wq->currentgroup == wq->items.end())
- {
- wq->currentgroup = wq->items.begin();
- started_from_top = true;
- }
- else if(wq->currentgroup == wq->items.begin())
- {
- started_from_top = true;
- }
- }
- for(;;)
+ auto &wq = *it;
+ if(wq._next_timer_relative_front != nullptr)
{
- if(workqueue_depth >= workqueue.size())
- {
- goto restart;
- }
- wq_it = workqueue.begin() + workqueue_depth;
- wq = &(*wq_it);
- dynamic_thread_pool_group_impl *tpg = *wq->currentgroup;
- _lock_guard gg(tpg->_lock); // lock group
- if(started_from_top)
- {
- if(tpg->_timer_work_items_remaining == 0 && tpg->_active_work_items_remaining == 0)
- {
- tpg->_timer_work_items_remaining = (size_t) -1;
- tpg->_active_work_items_remaining = (size_t) -1;
- }
- else
- {
- started_from_top = false;
- }
- }
- std::cout << "*** DTP " << self << " sees in group " << tpg << " in_highest_priority_queue = " << in_highest_priority_queue
- << " work items = " << tpg->_work_items_active.count << " timer items = " << tpg->_work_items_timer.count << std::endl;
- if(tpg->_work_items_timer.count > 0 && tpg->_work_items_timer.front->_internalworkh != nullptr)
- {
- tpg->_timer_work_items_remaining = 0; // this group is being fully executed right now
- }
- else if(tpg->_timer_work_items_remaining > tpg->_work_items_timer.count)
+ if(now_steady == std::chrono::steady_clock::time_point())
{
- tpg->_timer_work_items_remaining = tpg->_work_items_timer.count;
+ now_steady = std::chrono::steady_clock::now();
}
- // If we are not in the highest priority group, and there are no newly added
- // work items, don't execute work.
- if(tpg->_work_items_active.count > 0 &&
- (tpg->_work_items_active.front->_internalworkh != nullptr || (!in_highest_priority_queue && tpg->_newly_added_active_work_items == 0)))
+ if(wq._next_timer_relative_front->_timepoint1 <= now_steady)
{
- tpg->_active_work_items_remaining = 0; // this group is being fully executed right now
+ workitem = wq.next_timer(1);
+ workitem_is_timer = true;
+ break;
}
- else if(tpg->_active_work_items_remaining > tpg->_work_items_active.count)
+ if(earliest_duration == std::chrono::steady_clock::time_point() || wq._next_timer_relative_front->_timepoint1 < earliest_duration)
{
- tpg->_active_work_items_remaining = tpg->_work_items_active.count;
+ earliest_duration = wq._next_timer_relative_front->_timepoint1;
}
- std::cout << "*** DTP " << self << " sees in group " << tpg << " _active_work_items_remaining = " << tpg->_active_work_items_remaining
- << " _timer_work_items_remaining = " << tpg->_timer_work_items_remaining << std::endl;
- if(tpg->_timer_work_items_remaining > 0)
+ }
+ if(wq._next_timer_absolute_front != nullptr)
+ {
+ if(now_system == std::chrono::system_clock::time_point())
{
- auto *wi = tpg->_work_items_timer.front;
- assert(wi->_has_timer_set());
- _remove_from_list(tpg->_work_items_timer, wi);
- _append_to_list(tpg->_work_items_timer, wi);
- tpg->_timer_work_items_remaining--;
-
- bool invoketimer = false;
- if(wi->_has_timer_set_relative())
- {
- // Special constant for immediately rescheduled work items
- if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
- {
- invoketimer = true;
- }
- else if(earliest_duration == std::chrono::steady_clock::time_point() || wi->_timepoint1 < earliest_duration)
- {
- earliest_duration = wi->_timepoint1;
- if(wi->_timepoint1 <= std::chrono::steady_clock::now())
- {
- invoketimer = true;
- }
- }
- }
- if(wi->_has_timer_set_absolute() && (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute))
- {
- earliest_absolute = wi->_timepoint2;
- if(wi->_timepoint2 <= std::chrono::system_clock::now())
- {
- invoketimer = true;
- }
- }
- // If this work item's timer is due, execute immediately
- if(invoketimer)
- {
- wi->_internalworkh = self;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl;
-#endif
- gg.unlock();
- g.unlock();
- _timerthread(wi, nullptr);
- g.lock();
- goto restart;
- }
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl;
-#endif
- // Process all timers before considering work
- continue;
+ now_system = std::chrono::system_clock::now();
}
- if(tpg->_active_work_items_remaining > 0)
+ if(wq._next_timer_absolute_front->_timepoint2 <= now_system)
{
- auto *wi = tpg->_work_items_active.front;
- assert(!wi->_has_timer_set());
- _remove_from_list(tpg->_work_items_active, wi);
- _append_to_list(tpg->_work_items_active, wi);
- tpg->_active_work_items_remaining--;
-
- if(tpg->_newly_added_active_work_items > 0)
- {
- tpg->_newly_added_active_work_items--;
- }
- workitem = wi;
- workitem->_internalworkh = self;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
- << (workqueue.end() - wq_it - 1) << std::endl;
-#endif
- // Execute this workitem
+ workitem = wq.next_timer(2);
+ workitem_is_timer = true;
break;
}
-
- // Move to next group, which may be in a lower priority work queue
- assert(tpg->_active_work_items_remaining == 0);
- if(++wq->currentgroup == wq->items.end())
+ if(earliest_absolute == std::chrono::system_clock::time_point() || wq._next_timer_absolute_front->_timepoint2 < earliest_absolute)
{
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " reaches end of current group, started_from_top = " << started_from_top << std::endl;
-#endif
- gg.unlock(); // unlock group
- if(!started_from_top)
- {
- workqueue_depth = (size_t) -1;
- goto restart;
- }
- // Nothing for me to do in this workqueue
- if(wq_it == workqueue.begin())
- {
- // Reset all wq->currentgroup to begin() to ensure timers
- // etc in lower priority groups do get seen
- for(auto &i : workqueue)
- {
- if(i.currentgroup == i.items.end())
- {
- i.currentgroup = i.items.begin();
- }
- }
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl;
-#endif
- assert(workitem == nullptr);
- goto workqueue_empty;
- }
- --wq_it;
- workqueue_depth = wq_it - workqueue.begin();
- wq = &(*wq_it);
- tpg = *wq->currentgroup;
- gg = _lock_guard(tpg->_lock); // lock group
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg
- << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
-#endif
+ earliest_absolute = wq._next_timer_absolute_front->_timepoint2;
}
- tpg = *wq->currentgroup;
- tpg->_active_work_items_remaining = tpg->_work_items_active.count;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl;
-#endif
}
+ workitem = wq.next_active();
+ }
+ if(now_steady == std::chrono::steady_clock::time_point())
+ {
+ now_steady = std::chrono::steady_clock::now();
}
- workqueue_empty:
- auto now = std::chrono::steady_clock::now();
if(workitem == nullptr)
{
const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)));
- if(now - self->last_did_work >= max_sleep)
+ if(now_steady - self->last_did_work >= max_sleep)
{
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
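
The rewritten selection pass replaces the old currentgroup/workqueue_depth state machine with a single scan: iterate the levels from highest priority (the back of the vector) downwards, execute a due timer immediately, remember the earliest pending deadline so an idle thread knows how long to sleep, and otherwise pop the level's next active item. A condensed, self-contained sketch of that shape, with invented types and none of the locking, metrics or thread-retirement logic of the real loop:

#include <chrono>
#include <vector>

using clock_tp = std::chrono::steady_clock::time_point;

struct work_item
{
  work_item *next{nullptr};
  clock_tp deadline{};  // default-constructed means "no deadline"
};

struct level_queue
{
  work_item *timers{nullptr};  // deadline-ordered, soonest first
  work_item *active{nullptr};  // FIFO of ready items

  static work_item *pop(work_item *&head)
  {
    work_item *ret = head;
    if(ret != nullptr)
    {
      head = ret->next;
      ret->next = nullptr;
    }
    return ret;
  }
};

struct selection
{
  work_item *item{nullptr};
  bool is_timer{false};
  clock_tp earliest{};  // earliest pending deadline, bounds the idle sleep
};

// Scan from the highest priority level (back of the vector) downwards:
// run a due timer if there is one, otherwise note its deadline and try
// the level's active FIFO before moving to the next level down.
selection select(std::vector<level_queue> &levels)
{
  selection s;
  const clock_tp now = std::chrono::steady_clock::now();
  for(auto it = levels.rbegin(); it != levels.rend(); ++it)
  {
    if(it->timers != nullptr)
    {
      if(it->timers->deadline <= now)
      {
        s.item = level_queue::pop(it->timers);
        s.is_timer = true;  // dispatch via the timer path, not the worker path
        return s;
      }
      if(s.earliest == clock_tp() || it->timers->deadline < s.earliest)
      {
        s.earliest = it->timers->deadline;
      }
    }
    if((s.item = level_queue::pop(it->active)) != nullptr)
    {
      return s;
    }
  }
  return s;  // nothing runnable; caller sleeps until s.earliest or a cap
}

int main()
{
  std::vector<level_queue> levels(2);
  work_item w;
  levels[1].active = &w;  // back of the vector outranks the front
  return select(levels).item == &w ? 0 : 1;
}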
@@ -1197,14 +1161,18 @@ namespace detail
std::chrono::steady_clock::duration duration(max_sleep);
if(earliest_duration != std::chrono::steady_clock::time_point())
{
- if(now - earliest_duration < duration)
+ if(now_steady - earliest_duration < duration)
{
- duration = now - earliest_duration;
+ duration = now_steady - earliest_duration;
}
}
else if(earliest_absolute != std::chrono::system_clock::time_point())
{
- auto diff = std::chrono::system_clock::now() - earliest_absolute;
+ if(now_system == std::chrono::system_clock::time_point())
+ {
+ now_system = std::chrono::system_clock::now();
+ }
+ auto diff = now_system - earliest_absolute;
if(diff > duration)
{
earliest_absolute = {};
@@ -1216,14 +1184,14 @@ namespace detail
threadpool_sleeping_count.fetch_add(1, std::memory_order_release);
if(earliest_absolute != std::chrono::system_clock::time_point())
{
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
#endif
self->cond.wait_until(g, earliest_absolute);
}
else
{
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
#endif
self->cond.wait_for(g, duration);
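
The sleep itself (only the print guards change in this hunk) picks between an absolute-deadline wait and a bounded relative one. A minimal sketch of the same pattern with a std::condition_variable:

#include <chrono>
#include <condition_variable>
#include <mutex>

// Sleep until an absolute system_clock deadline if one is pending,
// otherwise for a bounded relative duration. A spurious wakeup simply
// re-enters the outer scheduling loop, so no predicate is needed here.
void idle_sleep(std::condition_variable &cond, std::unique_lock<std::mutex> &g,
                std::chrono::system_clock::time_point earliest_absolute,
                std::chrono::steady_clock::duration duration)
{
  if(earliest_absolute != std::chrono::system_clock::time_point())
  {
    cond.wait_until(g, earliest_absolute);
  }
  else
  {
    cond.wait_for(g, duration);
  }
}

int main()
{
  std::condition_variable cond;
  std::mutex m;
  std::unique_lock<std::mutex> g(m);
  // No pending absolute deadline: falls through to the relative wait.
  idle_sleep(cond, g, {}, std::chrono::milliseconds(1));
  return 0;
}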
@@ -1232,13 +1200,13 @@ namespace detail
_remove_from_list(threadpool_sleeping, self);
_append_to_list(threadpool_active, self);
threadpool_sleeping_count.fetch_sub(1, std::memory_order_release);
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
#endif
g.unlock();
try
{
- populate_threadmetrics(now);
+ populate_threadmetrics(now_steady);
}
catch(...)
{
@@ -1246,17 +1214,24 @@ namespace detail
g.lock();
continue;
}
- self->last_did_work = now;
-#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ self->last_did_work = now_steady;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
#endif
total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
g.unlock();
- _workerthread(workitem, nullptr);
+ if(workitem_is_timer)
+ {
+ _timerthread(workitem, nullptr);
+ }
+ else
+ {
+ _workerthread(workitem, nullptr);
+ }
// workitem->_internalworkh should be null, however workitem may also no longer exist
try
{
- if(populate_threadmetrics(now))
+ if(populate_threadmetrics(now_steady))
{
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
@@ -1278,10 +1253,11 @@ namespace detail
}
#endif
- inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool item_in_timer_list, dynamic_thread_pool_group::work_item *workitem,
- bool defer_pool_wake)
+ inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool item_in_timer_list, bool submit_into_highest_priority,
+ dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake)
{
(void) g;
+ (void) submit_into_highest_priority;
(void) defer_pool_wake;
if(workitem->_nextwork != -1)
{
@@ -1371,6 +1347,10 @@ namespace detail
ft.dwLowDateTime = li.LowPart;
// std::cout << "*** timer " << workitem << std::endl;
SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
+#else
+ _lock_guard gg(workqueue_lock);
+ auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
+ wq->append_timer(workitem);
#endif
}
else
@@ -1406,6 +1386,18 @@ namespace detail
SetThreadpoolCallbackPriority(parent->_grouph, priority);
// std::cout << "*** submit " << workitem << std::endl;
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
+#else
+ _lock_guard gg(workqueue_lock);
+ auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
+ // If the item came from a timer, prioritise execution
+ if(item_in_timer_list)
+ {
+ wq->prepend_active(workitem);
+ }
+ else
+ {
+ wq->append_active(workitem);
+ }
#endif
}
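
On the native path (neither GCD nor Windows), submission now reduces to picking a queue index and linking the item in, with items arriving from the timer path prepended so that already-expired work is not queued behind a backlog. A sketch of that decision, with invented names:

#include <cstddef>
#include <vector>

struct work_item
{
  work_item *next{nullptr};
};

struct level_queue
{
  work_item *front{nullptr}, *back{nullptr};

  void append(work_item *p)  // normal submissions keep FIFO order
  {
    p->next = nullptr;
    if(back == nullptr) { front = back = p; }
    else { back->next = p; back = p; }
  }
  void prepend(work_item *p)  // timer-origin items jump the queue
  {
    p->next = front;
    front = p;
    if(back == nullptr) { back = p; }
  }
};

// Mirror of the submit decision: fresh submissions go to the highest
// priority level, resubmissions back to the item's own nesting level.
void submit(std::vector<level_queue> &levels, work_item *w, std::size_t nesting_level,
            bool into_highest_priority, bool came_from_timer)
{
  level_queue &q = levels[into_highest_priority ? levels.size() - 1 : nesting_level];
  if(came_from_timer) { q.prepend(w); }
  else { q.append(w); }
}

int main()
{
  std::vector<level_queue> levels(2);
  work_item w;
  submit(levels, &w, 0, /*into_highest_priority=*/true, /*came_from_timer=*/false);
  return levels[1].front == &w ? 0 : 1;
}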
@@ -1526,7 +1518,7 @@ namespace detail
group->_newly_added_active_work_items++;
group->_active_work_items_remaining++;
#endif
- _submit_work_item(g, i->_has_timer_set(), i, i != work.back());
+ _submit_work_item(g, i->_has_timer_set(), true, i, i != work.back());
}
}
return success();
@@ -1942,10 +1934,10 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- _submit_work_item(g, false, workitem, false);
+ _submit_work_item(g, false, false, workitem, false);
return;
}
- _submit_work_item(g, true, workitem, false);
+ _submit_work_item(g, true, false, workitem, false);
}
// Worker thread entry point
@@ -2001,7 +1993,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- _submit_work_item(g, false, workitem, false);
+ _submit_work_item(g, false, false, workitem, false);
}
}
} // namespace detail
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index 6caf6bf3..2af85564 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -43,6 +43,7 @@ class io_handle;
namespace detail
{
struct global_dynamic_thread_pool_impl;
+ struct global_dynamic_thread_pool_impl_workqueue_item;
LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept;
} // namespace detail
@@ -174,11 +175,12 @@ public:
class work_item
{
friend struct detail::global_dynamic_thread_pool_impl;
+ friend struct detail::global_dynamic_thread_pool_impl_workqueue_item;
friend class dynamic_thread_pool_group_impl;
std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
void *_internalworkh{nullptr};
void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline
- work_item *_prev{nullptr}, *_next{nullptr};
+ work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
intptr_t _nextwork{-1};
std::chrono::steady_clock::time_point _timepoint1;
std::chrono::system_clock::time_point _timepoint2;
@@ -197,6 +199,7 @@ public:
, _internaltimerh(o._internaltimerh)
, _prev(o._prev)
, _next(o._next)
+ , _next_scheduled(o._next_scheduled)
, _nextwork(o._nextwork)
, _timepoint1(o._timepoint1)
, _timepoint2(o._timepoint2)
@@ -210,7 +213,7 @@ public:
LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
abort();
}
- o._prev = o._next = nullptr;
+ o._prev = o._next = o._next_scheduled = nullptr;
o._nextwork = -1;
o._internalworkh_inuse = 0;
}
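
The header side of the change gives each work_item a third link: _prev/_next keep it in its group's doubly linked membership list, while the new _next_scheduled threads it through exactly one scheduler queue. A toy illustration of one node sitting in two intrusive lists at once (hypothetical names):

struct node
{
  // Links for the owning group's doubly linked membership list.
  node *prev{nullptr}, *next{nullptr};
  // Separate link for the scheduler's singly linked run/timer queue;
  // independent links let one node belong to both lists simultaneously.
  node *next_scheduled{nullptr};
};

int main()
{
  node a, b;
  // Group membership list: a <-> b.
  a.next = &b;
  b.prev = &a;
  // Scheduler queue happens to run the other way: b -> a.
  b.next_scheduled = &a;
  return 0;
}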