Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-17 15:23:58 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-17 15:23:58 +0300
commitc66ba10ca949bdcce5f1029634f46b3db092a378 (patch)
treee672081eba86c4edf1db01238317b4379e0352b3
parent67226948b9f00aebbf33c232d10c417ba1abb289 (diff)
Fix all tsan failures for both native and libdispatch backends of dynamic_thread_pool_group. If this passes CI, it's ready for merge.
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp112
-rw-r--r--include/llfio/v2.0/dynamic_thread_pool_group.hpp10
3 files changed, 50 insertions, 78 deletions
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index f0eeb8c2..2cd9f8b5 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 45c0392e65da7e53984b3b734fbb810bb38561c8
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-11 11:47:09 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45c0392e
+#define LLFIO_PREVIOUS_COMMIT_REF 67226948b9f00aebbf33c232d10c417ba1abb289
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-16 12:31:40 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 67226948
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 9dff9425..287683bf 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -188,7 +188,6 @@ Benchmarking llfio (Linux native) ...
*/
-
/* Windows 4Kb and 64Kb Win32 thread pool
Benchmarking llfio (Win32 thread pool (Vista+)) ...
@@ -1141,47 +1140,54 @@ namespace detail
result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
{
+ (void) grouph;
if(!d)
{
return errc::invalid_argument;
}
- workitem->_timepoint1 = {};
- workitem->_timepoint2 = {};
- assert(!workitem->_has_timer_set());
- if(workitem->_nextwork == 0 || d.nsecs > 0)
+ if(workitem->_nextwork.load(std::memory_order_acquire) == 0 || d.nsecs > 0)
{
if(d.nsecs > 0)
{
if(d.steady)
{
workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs);
+ workitem->_timepoint2 = {};
}
else
{
+ workitem->_timepoint1 = {};
workitem->_timepoint2 = d.to_time_point();
}
}
else
{
workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1));
+ workitem->_timepoint2 = {};
}
assert(workitem->_has_timer_set());
+#if defined(_WIN32)
if(nullptr == workitem->_internaltimerh)
{
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- (void) grouph;
- workitem->_internaltimerh = (void *) (uintptr_t) -1;
-#elif defined(_WIN32)
workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
if(nullptr == workitem->_internaltimerh)
{
return win32_error();
}
-#else
- (void) grouph;
- workitem->_internaltimerh = (void *) (uintptr_t) -1;
+ }
#endif
+ }
+ else
+ {
+ if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ workitem->_timepoint1 = {};
}
+ if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+ workitem->_timepoint2 = {};
+ }
+ assert(!workitem->_has_timer_set());
}
return success();
}
@@ -1296,6 +1302,7 @@ public:
LLFIO_LOG_FUNCTION_CALL(this);
(void) wait();
auto &impl = detail::global_dynamic_thread_pool();
+ // detail::dynamic_thread_pool_group_impl_guard g1(_lock);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
if(nullptr != _grouph)
{
@@ -1309,7 +1316,7 @@ public:
_grouph = nullptr;
}
#endif
- detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
+ detail::global_dynamic_thread_pool_impl::workqueue_guard g2(impl.workqueue_lock);
assert(impl.workqueue->nesting_level >= _nesting_level);
for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
{
@@ -1648,13 +1655,13 @@ namespace detail
{
(void) submit_into_highest_priority;
(void) defer_pool_wake;
- if(workitem->_nextwork != -1)
+ const auto nextwork = workitem->_nextwork.load(std::memory_order_acquire);
+ if(nextwork != -1)
{
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
// If no work item for now, or there is a delay, schedule a timer
- if(workitem->_nextwork == 0 || workitem->_has_timer_set())
+ if(nextwork == 0 || workitem->_has_timer_set())
{
- assert(workitem->_internaltimerh != nullptr);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
dispatch_time_t when;
if(workitem->_has_timer_set_relative())
@@ -1802,8 +1809,6 @@ namespace detail
}
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
- // Indicate that I can be executed again
- workitem->_internalworkh = nullptr;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP submits work item " << workitem << std::endl;
#endif
@@ -1854,10 +1859,7 @@ namespace detail
for(auto *i : work)
{
_remove_from_list(group->_work_items_active, i);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- i->_internalworkh = nullptr;
- i->_internaltimerh = nullptr;
-#elif defined(_WIN32)
+#if defined(_WIN32)
if(nullptr != i->_internaltimerh)
{
CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
@@ -1876,16 +1878,14 @@ namespace detail
{
deadline d(std::chrono::seconds(0));
i->_parent.store(group, std::memory_order_release);
- i->_nextwork = i->next(d);
- if(-1 == i->_nextwork)
+ i->_nextwork.store(i->next(d), std::memory_order_release);
+ if(-1 == i->_nextwork.load(std::memory_order_acquire))
{
_append_to_list(group->_work_items_done, i);
}
else
{
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- i->_internalworkh = (void *) (uintptr_t) -1;
-#elif defined(_WIN32)
+#if defined(_WIN32)
i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
if(nullptr == i->_internalworkh)
{
@@ -1908,6 +1908,7 @@ namespace detail
_submit_work_item(true, i, i != work.back());
}
}
+ g.lock();
return success();
}
catch(...)
@@ -1923,10 +1924,7 @@ namespace detail
auto *parent = i->_parent.load(std::memory_order_relaxed);
_remove_from_list(parent->_work_items_active, i);
_append_to_list(parent->_work_items_done, i);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- i->_internaltimerh = nullptr;
- i->_internalworkh = nullptr;
-#elif defined(_WIN32)
+#if defined(_WIN32)
if(i->_internalworkh_inuse > 0)
{
i->_internalworkh_inuse = 2;
@@ -1944,12 +1942,6 @@ namespace detail
i->_internalworkh = nullptr;
}
}
-#else
- i->_internaltimerh = nullptr;
- i->_internalworkh = nullptr;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP sets done work item " << i << std::endl;
-#endif
#endif
if(parent->_work_items_active.count == 0)
{
@@ -1958,14 +1950,13 @@ namespace detail
for(; v != nullptr; v = n)
{
v->_parent.store(nullptr, std::memory_order_release);
- v->_nextwork = -1;
+ v->_nextwork.store(-1, std::memory_order_release);
n = v->_next;
}
n = v = parent->_work_items_done.front;
parent->_work_items_done.front = parent->_work_items_done.back = nullptr;
parent->_work_items_done.count = 0;
parent->_stopping.store(false, std::memory_order_release);
- parent->_stopped.store(true, std::memory_order_release);
parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP executes group_complete for group " << parent << std::endl;
@@ -1975,6 +1966,7 @@ namespace detail
n = v->_next;
v->group_complete(parent->_abnormal_completion_cause);
}
+ parent->_stopped.store(true, std::memory_order_release);
parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl;
@@ -2207,26 +2199,6 @@ namespace detail
}
}
#else
-#if 0
- if(group->_stopping.load(std::memory_order_relaxed))
- {
- // Kill all work items not currently being executed immediately
- for(bool done = false; !done;)
- {
- done = true;
- for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next)
- {
- if(p->_internalworkh == nullptr)
- {
- _remove_from_list(group->_work_items_active, p);
- _append_to_list(group->_work_items_done, p);
- done = false;
- break;
- }
- }
- }
- }
-#endif
while(group->_work_items_active.count > 0)
{
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
@@ -2250,7 +2222,7 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
{
LLFIO_LOG_FUNCTION_CALL(this);
- assert(workitem->_nextwork != -1);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
assert(workitem->_has_timer_set());
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
// std::cout << "*** _timerthread " << workitem << std::endl;
@@ -2287,10 +2259,10 @@ namespace detail
workitem->_timepoint2 = {};
}
assert(!workitem->_has_timer_set());
- if(workitem->_nextwork == 0)
+ if(workitem->_nextwork.load(std::memory_order_acquire) == 0)
{
deadline d(std::chrono::seconds(0));
- workitem->_nextwork = workitem->next(d);
+ workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
if(!r2)
{
@@ -2299,7 +2271,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- if(-1 == workitem->_nextwork)
+ if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
{
dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
_work_item_done(g, workitem);
@@ -2317,10 +2289,10 @@ namespace detail
LLFIO_LOG_FUNCTION_CALL(this);
//{
// _lock_guard g(parent->_lock);
- // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl;
+ // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork.load(std::memory_order_relaxed) << std::endl;
//}
- assert(workitem->_nextwork != -1);
- assert(workitem->_nextwork != 0);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != 0);
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
if(parent->_stopping.load(std::memory_order_relaxed))
{
@@ -2333,8 +2305,8 @@ namespace detail
tls.workitem = workitem;
tls.current_callback_instance = selfthreadh;
tls.nesting_level = parent->_nesting_level + 1;
- auto r = (*workitem)(workitem->_nextwork);
- workitem->_nextwork = 0; // call next() next time
+ auto r = (*workitem)(workitem->_nextwork.load(std::memory_order_acquire));
+ workitem->_nextwork.store(0, std::memory_order_release); // call next() next time
tls = old_thread_local_state;
// std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
if(!r)
@@ -2352,7 +2324,7 @@ namespace detail
else
{
deadline d(std::chrono::seconds(0));
- workitem->_nextwork = workitem->next(d);
+ workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
if(!r2)
{
@@ -2361,7 +2333,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- if(-1 == workitem->_nextwork)
+ if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
{
dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
_work_item_done(g, workitem);
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index 2f89ce98..09828a6e 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -194,7 +194,7 @@ public:
void *_internalworkh{nullptr};
void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline
work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
- intptr_t _nextwork{-1};
+ std::atomic<intptr_t> _nextwork{-1};
std::chrono::steady_clock::time_point _timepoint1;
std::chrono::system_clock::time_point _timepoint2;
int _internalworkh_inuse{0};
@@ -213,7 +213,7 @@ public:
, _prev(o._prev)
, _next(o._next)
, _next_scheduled(o._next_scheduled)
- , _nextwork(o._nextwork)
+ , _nextwork(o._nextwork.load(std::memory_order_relaxed))
, _timepoint1(o._timepoint1)
, _timepoint2(o._timepoint2)
, _internalworkh_inuse(o._internalworkh_inuse)
@@ -227,7 +227,7 @@ public:
abort();
}
o._prev = o._next = o._next_scheduled = nullptr;
- o._nextwork = -1;
+ o._nextwork.store(-1, std::memory_order_relaxed);
o._internalworkh_inuse = 0;
}
work_item &operator=(const work_item &) = delete;
@@ -236,8 +236,8 @@ public:
public:
virtual ~work_item()
{
- assert(_nextwork == -1);
- if(_nextwork != -1)
+ assert(_nextwork.load(std::memory_order_relaxed) == -1);
+ if(_nextwork.load(std::memory_order_relaxed) != -1)
{
LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
abort();