Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp')
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp112
1 files changed, 42 insertions, 70 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 9dff9425..287683bf 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -188,7 +188,6 @@ Benchmarking llfio (Linux native) ...
*/
-
/* Windows 4Kb and 64Kb Win32 thread pool
Benchmarking llfio (Win32 thread pool (Vista+)) ...
@@ -1141,47 +1140,54 @@ namespace detail
result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
{
+ (void) grouph;
if(!d)
{
return errc::invalid_argument;
}
- workitem->_timepoint1 = {};
- workitem->_timepoint2 = {};
- assert(!workitem->_has_timer_set());
- if(workitem->_nextwork == 0 || d.nsecs > 0)
+ if(workitem->_nextwork.load(std::memory_order_acquire) == 0 || d.nsecs > 0)
{
if(d.nsecs > 0)
{
if(d.steady)
{
workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs);
+ workitem->_timepoint2 = {};
}
else
{
+ workitem->_timepoint1 = {};
workitem->_timepoint2 = d.to_time_point();
}
}
else
{
workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1));
+ workitem->_timepoint2 = {};
}
assert(workitem->_has_timer_set());
+#if defined(_WIN32)
if(nullptr == workitem->_internaltimerh)
{
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- (void) grouph;
- workitem->_internaltimerh = (void *) (uintptr_t) -1;
-#elif defined(_WIN32)
workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
if(nullptr == workitem->_internaltimerh)
{
return win32_error();
}
-#else
- (void) grouph;
- workitem->_internaltimerh = (void *) (uintptr_t) -1;
+ }
#endif
+ }
+ else
+ {
+ if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ workitem->_timepoint1 = {};
}
+ if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+ workitem->_timepoint2 = {};
+ }
+ assert(!workitem->_has_timer_set());
}
return success();
}
@@ -1296,6 +1302,7 @@ public:
LLFIO_LOG_FUNCTION_CALL(this);
(void) wait();
auto &impl = detail::global_dynamic_thread_pool();
+ // detail::dynamic_thread_pool_group_impl_guard g1(_lock);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
if(nullptr != _grouph)
{
@@ -1309,7 +1316,7 @@ public:
_grouph = nullptr;
}
#endif
- detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
+ detail::global_dynamic_thread_pool_impl::workqueue_guard g2(impl.workqueue_lock);
assert(impl.workqueue->nesting_level >= _nesting_level);
for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
{
@@ -1648,13 +1655,13 @@ namespace detail
{
(void) submit_into_highest_priority;
(void) defer_pool_wake;
- if(workitem->_nextwork != -1)
+ const auto nextwork = workitem->_nextwork.load(std::memory_order_acquire);
+ if(nextwork != -1)
{
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
// If no work item for now, or there is a delay, schedule a timer
- if(workitem->_nextwork == 0 || workitem->_has_timer_set())
+ if(nextwork == 0 || workitem->_has_timer_set())
{
- assert(workitem->_internaltimerh != nullptr);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
dispatch_time_t when;
if(workitem->_has_timer_set_relative())
@@ -1802,8 +1809,6 @@ namespace detail
}
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
- // Indicate that I can be executed again
- workitem->_internalworkh = nullptr;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP submits work item " << workitem << std::endl;
#endif
@@ -1854,10 +1859,7 @@ namespace detail
for(auto *i : work)
{
_remove_from_list(group->_work_items_active, i);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- i->_internalworkh = nullptr;
- i->_internaltimerh = nullptr;
-#elif defined(_WIN32)
+#if defined(_WIN32)
if(nullptr != i->_internaltimerh)
{
CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
@@ -1876,16 +1878,14 @@ namespace detail
{
deadline d(std::chrono::seconds(0));
i->_parent.store(group, std::memory_order_release);
- i->_nextwork = i->next(d);
- if(-1 == i->_nextwork)
+ i->_nextwork.store(i->next(d), std::memory_order_release);
+ if(-1 == i->_nextwork.load(std::memory_order_acquire))
{
_append_to_list(group->_work_items_done, i);
}
else
{
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- i->_internalworkh = (void *) (uintptr_t) -1;
-#elif defined(_WIN32)
+#if defined(_WIN32)
i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
if(nullptr == i->_internalworkh)
{
@@ -1908,6 +1908,7 @@ namespace detail
_submit_work_item(true, i, i != work.back());
}
}
+ g.lock();
return success();
}
catch(...)
@@ -1923,10 +1924,7 @@ namespace detail
auto *parent = i->_parent.load(std::memory_order_relaxed);
_remove_from_list(parent->_work_items_active, i);
_append_to_list(parent->_work_items_done, i);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- i->_internaltimerh = nullptr;
- i->_internalworkh = nullptr;
-#elif defined(_WIN32)
+#if defined(_WIN32)
if(i->_internalworkh_inuse > 0)
{
i->_internalworkh_inuse = 2;
@@ -1944,12 +1942,6 @@ namespace detail
i->_internalworkh = nullptr;
}
}
-#else
- i->_internaltimerh = nullptr;
- i->_internalworkh = nullptr;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP sets done work item " << i << std::endl;
-#endif
#endif
if(parent->_work_items_active.count == 0)
{
@@ -1958,14 +1950,13 @@ namespace detail
for(; v != nullptr; v = n)
{
v->_parent.store(nullptr, std::memory_order_release);
- v->_nextwork = -1;
+ v->_nextwork.store(-1, std::memory_order_release);
n = v->_next;
}
n = v = parent->_work_items_done.front;
parent->_work_items_done.front = parent->_work_items_done.back = nullptr;
parent->_work_items_done.count = 0;
parent->_stopping.store(false, std::memory_order_release);
- parent->_stopped.store(true, std::memory_order_release);
parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP executes group_complete for group " << parent << std::endl;
@@ -1975,6 +1966,7 @@ namespace detail
n = v->_next;
v->group_complete(parent->_abnormal_completion_cause);
}
+ parent->_stopped.store(true, std::memory_order_release);
parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl;
@@ -2207,26 +2199,6 @@ namespace detail
}
}
#else
-#if 0
- if(group->_stopping.load(std::memory_order_relaxed))
- {
- // Kill all work items not currently being executed immediately
- for(bool done = false; !done;)
- {
- done = true;
- for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next)
- {
- if(p->_internalworkh == nullptr)
- {
- _remove_from_list(group->_work_items_active, p);
- _append_to_list(group->_work_items_done, p);
- done = false;
- break;
- }
- }
- }
- }
-#endif
while(group->_work_items_active.count > 0)
{
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
@@ -2250,7 +2222,7 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
{
LLFIO_LOG_FUNCTION_CALL(this);
- assert(workitem->_nextwork != -1);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
assert(workitem->_has_timer_set());
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
// std::cout << "*** _timerthread " << workitem << std::endl;
@@ -2287,10 +2259,10 @@ namespace detail
workitem->_timepoint2 = {};
}
assert(!workitem->_has_timer_set());
- if(workitem->_nextwork == 0)
+ if(workitem->_nextwork.load(std::memory_order_acquire) == 0)
{
deadline d(std::chrono::seconds(0));
- workitem->_nextwork = workitem->next(d);
+ workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
if(!r2)
{
@@ -2299,7 +2271,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- if(-1 == workitem->_nextwork)
+ if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
{
dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
_work_item_done(g, workitem);
@@ -2317,10 +2289,10 @@ namespace detail
LLFIO_LOG_FUNCTION_CALL(this);
//{
// _lock_guard g(parent->_lock);
- // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl;
+ // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork.load(std::memory_order_relaxed) << std::endl;
//}
- assert(workitem->_nextwork != -1);
- assert(workitem->_nextwork != 0);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != 0);
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
if(parent->_stopping.load(std::memory_order_relaxed))
{
@@ -2333,8 +2305,8 @@ namespace detail
tls.workitem = workitem;
tls.current_callback_instance = selfthreadh;
tls.nesting_level = parent->_nesting_level + 1;
- auto r = (*workitem)(workitem->_nextwork);
- workitem->_nextwork = 0; // call next() next time
+ auto r = (*workitem)(workitem->_nextwork.load(std::memory_order_acquire));
+ workitem->_nextwork.store(0, std::memory_order_release); // call next() next time
tls = old_thread_local_state;
// std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
if(!r)
@@ -2352,7 +2324,7 @@ namespace detail
else
{
deadline d(std::chrono::seconds(0));
- workitem->_nextwork = workitem->next(d);
+ workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
if(!r2)
{
@@ -2361,7 +2333,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- if(-1 == workitem->_nextwork)
+ if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
{
dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
_work_item_done(g, workitem);