github.com/windirstat/llfio.git
author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-10 14:39:22 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-16 13:21:45 +0300
commit     4550edca2ecb7c4660b8516ef2e9d14f5dd198a4 (patch)
tree       8eb13f8541d46920c793b6353fe2af2e90ea33b4
parent     315522698efe62d1d97957e5a8fd992a7b6ecf10 (diff)
Rework native Linux dynamic_thread_pool_group to no longer have a single mutex around work calculation and dispatch. Native Linux backend is currently not working.
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp  |  544
1 file changed, 361 insertions(+), 183 deletions(-)
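
The heart of the change: instead of one global mutex around work calculation and dispatch, active work is spread across several cacheline-separated sub-queues, each guarded by its own spinlock, which worker threads index by their own thread number and otherwise scan speculatively with try_lock. Below is a minimal self-contained sketch of that dequeue pattern; the names are illustrative and a std::mutex stands in for the spinlock, so this is a model of the approach, not LLFIO's actual API.

    #include <atomic>
    #include <cstddef>
    #include <mutex>

    struct work_item
    {
      work_item *next{nullptr};
    };

    // One sub-queue per cacheline: approximate count, a lock, and an intrusive list.
    struct alignas(64) subqueue
    {
      std::atomic<unsigned> count{0};
      std::mutex lock;  // stand-in for the spinlock used by the real code
      work_item *front{nullptr}, *back{nullptr};
    };

    static constexpr size_t NSUBQUEUES = 4;  // must be a power of two for the mask below
    static subqueue queues[NSUBQUEUES];

    // Pop from the caller's preferred sub-queue, else speculatively scan the others.
    work_item *next_active(size_t threadidx)
    {
      size_t idx = threadidx & (NSUBQUEUES - 1);
      const size_t original = idx;
      bool all_empty = true;
      for(;;)
      {
        subqueue &q = queues[idx];
        if(q.count.load(std::memory_order_relaxed) > 0)
        {
          all_empty = false;
          if(q.lock.try_lock())  // never block on a contended sub-queue
          {
            if(work_item *ret = q.front)
            {
              q.front = ret->next;
              if(q.front == nullptr)
              {
                q.back = nullptr;
              }
              q.count.fetch_sub(1, std::memory_order_relaxed);
              ret->next = nullptr;
              q.lock.unlock();
              return ret;
            }
            q.lock.unlock();
          }
        }
        if(++idx >= NSUBQUEUES)
        {
          idx = 0;
        }
        if(idx == original)
        {
          if(all_empty)
          {
            return nullptr;  // a full pass saw only (apparently) empty sub-queues
          }
          all_empty = true;
        }
      }
    }

A thread which finds every sub-queue apparently empty over a full pass returns nullptr, which is the signal the worker loop later uses to consider sleeping.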
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index c26a7945..c22cae35 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -27,7 +27,10 @@ Distributed under the Boost Software License, Version 1.0.
#include "../../file_handle.hpp"
#include "../../statfs.hpp"
+#include "quickcpplib/spinlock.hpp"
+
#include <atomic>
+#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
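
The new quickcpplib/spinlock.hpp include supplies the spinlock used throughout this rework. For reading the diff, all that matters is its mutex-compatible shape; a minimal stand-in over std::atomic_flag (an illustration, not QuickCppLib's implementation) would be:

    #include <atomic>

    // Minimal mutex-compatible spinlock with the lock()/try_lock()/unlock()
    // shape the code below relies on.
    class spinlock
    {
      std::atomic_flag _flag = ATOMIC_FLAG_INIT;

    public:
      void lock()
      {
        while(_flag.test_and_set(std::memory_order_acquire))
        {
          // spin
        }
      }
      bool try_lock() { return !_flag.test_and_set(std::memory_order_acquire); }
      void unlock() { _flag.clear(std::memory_order_release); }
    };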
@@ -183,100 +186,238 @@ Benchmarking llfio (Win32 thread pool (Vista+)) ...
For 1024 work items got 66416.2 SHA256 hashes/sec with 33 maximum concurrency.
*/
+/* The Win32 thread pool numbers match those for ASIO on Windows for 64Kb SHA256
+so the base implementation is probably good enough.
+
+1. Need multiple work queues, with speculative locking for insert/remove.
+
+2. Pumping timers must not happen inside the work queue loop:
+
+ - If there is a waiting thread, it can pump timers.
+
+ - Otherwise a separate timer thread would need to be launched.
+
+3. List counts for doubly linked lists need to be optionally atomic, so add a
+fake atomic type (a usage sketch follows this hunk).
+
+*/
+
LLFIO_V2_NAMESPACE_BEGIN
namespace detail
{
+ struct dynamic_thread_pool_group_impl_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
+ template <class T> class fake_atomic
+ {
+ T _v;
+
+ public:
+ constexpr fake_atomic(T v)
+ : _v(v)
+ {
+ }
+ T load(std::memory_order /*unused*/) const { return _v; }
+ void store(T v, std::memory_order /*unused*/) { _v = v; }
+ T fetch_add(T v, std::memory_order /*unused*/)
+ {
+ _v += v;
+ return _v - v;
+ }
+ T fetch_sub(T v, std::memory_order /*unused*/)
+ {
+ _v -= v;
+ return _v + v;
+ }
+ };
struct global_dynamic_thread_pool_impl_workqueue_item
{
- std::unordered_set<dynamic_thread_pool_group_impl *> items;
+ const size_t nesting_level;
+ std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> next;
+ std::unordered_set<dynamic_thread_pool_group_impl *> items; // Do NOT use without holding workqueue_lock
+
+ explicit global_dynamic_thread_pool_impl_workqueue_item(size_t _nesting_level, std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> &&preceding)
+ : nesting_level(_nesting_level)
+ , next(std::move(preceding)) // honour the rvalue parameter: move, don't copy
+ {
+ }
+
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
- dynamic_thread_pool_group::work_item *_next_active_front{nullptr}, *_next_timer_relative_front{nullptr}, *_next_timer_absolute_front{nullptr};
- dynamic_thread_pool_group::work_item *_next_active_back{nullptr}, *_next_timer_relative_back{nullptr}, *_next_timer_absolute_back{nullptr};
+ static constexpr unsigned TOTAL_NEXTACTIVES = 4;
+ struct next_active_base_t
+ {
+ std::atomic<unsigned> count{0};
+ QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned> lock;
+ dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
+
+ next_active_base_t() = default;
+ next_active_base_t(const next_active_base_t &o)
+ : count(o.count.load(std::memory_order_relaxed))
+ , front(o.front)
+ , back(o.back)
+ {
+ }
+ };
+ struct alignas(64) next_active_work_t : next_active_base_t
+ {
+ char _padding[64 - sizeof(next_active_base_t)]; // 40 bytes on typical LP64 targets
+ } next_actives[TOTAL_NEXTACTIVES];
+ static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline");
+ next_active_base_t next_timer_relative, next_timer_absolute;
- dynamic_thread_pool_group::work_item *next_active()
+ dynamic_thread_pool_group::work_item *next_active(size_t threadidx)
{
- auto *ret = _next_active_front;
- if(ret == nullptr)
+ threadidx &= (TOTAL_NEXTACTIVES - 1); // clamp to a valid queue index (TOTAL_NEXTACTIVES is a power of two)
+ const size_t original_threadidx = threadidx;
+ bool all_empty = true;
+ for(;;)
{
- assert(_next_active_back == nullptr);
- return nullptr;
+ next_active_base_t &x = next_actives[threadidx];
+ if(x.count.load(std::memory_order_relaxed) > 0)
+ {
+ all_empty = false;
+ if(x.lock.try_lock())
+ {
+ auto *ret = x.front;
+ if(ret != nullptr)
+ {
+ x.front = ret->_next_scheduled;
+ x.count.fetch_sub(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
+ {
+ assert(x.back == ret);
+ x.back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ return ret;
+ }
+ x.lock.unlock(); // locked this queue but found it empty; release and move on
+ }
+ }
+ if(++threadidx >= TOTAL_NEXTACTIVES)
+ {
+ threadidx = 0;
+ }
+ if(threadidx == original_threadidx)
+ {
+ if(all_empty)
+ {
+ return nullptr;
+ }
+ all_empty = true;
+ }
}
- _next_active_front = ret->_next_scheduled;
- if(_next_active_front == nullptr)
+ }
+
+ private:
+ // Returns the least-loaded queue with its lock HELD; the caller must unlock it.
+ next_active_base_t &_choose_next_active()
+ {
+ unsigned idx = 0, min_count = (unsigned) -1;
+ for(unsigned n = 0; n < TOTAL_NEXTACTIVES; n++)
{
- assert(_next_active_back == ret);
- _next_active_back = nullptr;
+ auto c = next_actives[n].count.load(std::memory_order_relaxed);
+ if(c < min_count)
+ {
+ idx = n;
+ min_count = c;
+ }
+ }
+ for(;;)
+ {
+ if(next_actives[idx].lock.try_lock())
+ {
+ return next_actives[idx];
+ }
+ if(++idx >= TOTAL_NEXTACTIVES)
+ {
+ idx = 0;
+ }
}
- ret->_next_scheduled = nullptr;
- return ret;
}
+
+ public:
void append_active(dynamic_thread_pool_group::work_item *p)
{
- if(_next_active_back == nullptr)
+ next_active_base_t &x = _choose_next_active();
+ x.count.fetch_add(1, std::memory_order_relaxed);
+ if(x.back == nullptr)
{
- assert(_next_active_front == nullptr);
- _next_active_front = _next_active_back = p;
+ assert(x.front == nullptr);
+ x.front = x.back = p;
+ x.lock.unlock();
return;
}
p->_next_scheduled = nullptr;
- _next_active_back->_next_scheduled = p;
- _next_active_back = p;
+ x.back->_next_scheduled = p;
+ x.back = p;
+ x.lock.unlock();
}
void prepend_active(dynamic_thread_pool_group::work_item *p)
{
- if(_next_active_front == nullptr)
+ next_active_base_t &x = _choose_next_active();
+ x.count.fetch_add(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
{
- assert(_next_active_back == nullptr);
- _next_active_front = _next_active_back = p;
+ assert(x.back == nullptr);
+ x.front = x.back = p;
+ x.lock.unlock();
return;
}
- p->_next_scheduled = _next_active_front;
- _next_active_front = p;
+ p->_next_scheduled = x.front;
+ x.front = p;
+ x.lock.unlock();
}
- dynamic_thread_pool_group::work_item *next_timer(int which)
+ // The selected timer list (x) must be LOCKED on entry; it is unlocked before returning
+ template <int which> dynamic_thread_pool_group::work_item *next_timer()
{
if(which == 0)
{
return nullptr;
}
- auto *&front = (which == 1) ? _next_timer_relative_front : _next_timer_absolute_front;
- auto *&back = (which == 1) ? _next_timer_relative_back : _next_timer_absolute_back;
- auto *ret = front;
+ next_active_base_t &x = (which == 1) ? next_timer_relative : next_timer_absolute;
+ // x.lock is already held by the caller
+ auto *ret = x.front;
if(ret == nullptr)
{
- assert(back == nullptr);
+ assert(x.back == nullptr);
+ x.lock.unlock();
return nullptr;
}
- front = ret->_next_scheduled;
- if(front == nullptr)
+ x.front = ret->_next_scheduled;
+ if(x.front == nullptr)
{
- assert(back == ret);
- back = nullptr;
+ assert(x.back == ret);
+ x.back = nullptr;
}
ret->_next_scheduled = nullptr;
+ x.count.fetch_sub(1, std::memory_order_relaxed);
+ x.lock.unlock();
return ret;
}
void append_timer(dynamic_thread_pool_group::work_item *i)
{
if(i->_timepoint1 != std::chrono::steady_clock::time_point())
{
- if(_next_timer_relative_front == nullptr)
+ next_timer_relative.lock.lock();
+ next_timer_relative.count.fetch_add(1, std::memory_order_relaxed);
+ if(next_timer_relative.front == nullptr)
{
- _next_timer_relative_front = _next_timer_relative_back = i;
+ next_timer_relative.front = next_timer_relative.back = i;
}
else
{
bool done = false;
- for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_relative_front; n != nullptr; p = n, n = n->_next_scheduled)
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled)
{
if(n->_timepoint1 <= i->_timepoint1)
{
if(p == nullptr)
{
i->_next_scheduled = n;
- _next_timer_relative_front = i;
+ next_timer_relative.front = i;
}
else
{
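
On plan item 3 above: fake_atomic mirrors the few std::atomic member signatures the list code uses while being a plain integer underneath, so a list that is only ever touched under a lock need not pay for real atomics. A hypothetical illustration of how the counter type can be swapped by template parameter (counted_list is an assumption for this sketch, not LLFIO's list type):

    #include <atomic>
    #include <cstddef>

    // Same member signatures as std::atomic for the operations a list count
    // needs, but a plain integer underneath.
    template <class T> class fake_atomic
    {
      T _v;

    public:
      constexpr fake_atomic(T v) : _v(v) {}
      T load(std::memory_order) const { return _v; }
      void store(T v, std::memory_order) { _v = v; }
      T fetch_add(T v, std::memory_order) { _v += v; return _v - v; }
      T fetch_sub(T v, std::memory_order) { _v -= v; return _v + v; }
    };

    // A hypothetical counted list can then pick its counter at compile time,
    // keeping one implementation for both the shared and the lock-guarded case.
    template <class T, class Count = std::atomic<size_t>> struct counted_list
    {
      Count count{0};
      T *front{nullptr}, *back{nullptr};
    };

    counted_list<int> cross_thread_list;                      // genuinely atomic count
    counted_list<int, fake_atomic<size_t>> lock_guarded_list; // zero-cost count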
@@ -289,29 +430,32 @@ namespace detail
}
if(!done)
{
- _next_timer_relative_back->_next_scheduled = i;
+ next_timer_relative.back->_next_scheduled = i;
i->_next_scheduled = nullptr;
- _next_timer_relative_back = i;
+ next_timer_relative.back = i;
}
}
+ next_timer_relative.lock.unlock();
}
else
{
- if(_next_timer_absolute_front == nullptr)
+ next_timer_absolute.lock.lock();
+ next_timer_absolute.count.fetch_add(1, std::memory_order_relaxed);
+ if(next_timer_absolute.front == nullptr)
{
- _next_timer_absolute_front = _next_timer_absolute_back = i;
+ next_timer_absolute.front = next_timer_absolute.back = i;
}
else
{
bool done = false;
- for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_absolute_front; n != nullptr; p = n, n = n->_next_scheduled)
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled)
{
if(n->_timepoint2 <= i->_timepoint2)
{
if(p == nullptr)
{
i->_next_scheduled = n;
- _next_timer_absolute_front = i;
+ next_timer_absolute.front = i;
}
else
{
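
Both append_timer paths maintain their list with a linear sorted insert so that the scheduler only has to inspect the front item to learn the next deadline. A generic earliest-deadline-first version of that operation, with illustrative names:

    #include <chrono>

    struct timer_item
    {
      std::chrono::steady_clock::time_point deadline;
      timer_item *next{nullptr};
    };
    struct timer_list
    {
      timer_item *front{nullptr}, *back{nullptr};
    };

    // Keep the earliest deadline at the front so the scheduler's "is the front
    // due?" check is O(1); insertion is a linear walk.
    void insert_sorted(timer_list &list, timer_item *i)
    {
      timer_item *prev = nullptr;
      for(timer_item *n = list.front; n != nullptr; prev = n, n = n->next)
      {
        if(i->deadline < n->deadline)  // i fires before n, so splice it in here
        {
          i->next = n;
          (prev ? prev->next : list.front) = i;
          return;
        }
      }
      i->next = nullptr;  // i fires last (or the list was empty): append
      (prev ? prev->next : list.front) = i;
      list.back = i;
    }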
@@ -324,21 +468,27 @@ namespace detail
}
if(!done)
{
- _next_timer_absolute_back->_next_scheduled = i;
+ next_timer_absolute.back->_next_scheduled = i;
i->_next_scheduled = nullptr;
- _next_timer_absolute_back = i;
+ next_timer_absolute.back = i;
}
}
+ next_timer_absolute.lock.unlock();
}
}
#endif
};
struct global_dynamic_thread_pool_impl
{
- std::mutex workqueue_lock;
- std::vector<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
+ using _spinlock_type = QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned>;
+
+ _spinlock_type workqueue_lock;
+ struct workqueue_guard : std::unique_lock<_spinlock_type>
+ {
+ using std::unique_lock<_spinlock_type>::unique_lock;
+ };
+ std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
- using _lock_guard = std::unique_lock<std::mutex>;
using threadh_type = void *;
using grouph_type = dispatch_group_t;
static void _gcd_dispatch_callback(void *arg)
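
The global workqueue is now a chain of per-nesting-level nodes linked by shared_ptr, guarded only at the head by the workqueue_lock spinlock. A worker can therefore pin the chain by copying the head under the lock and walk it mostly lock-free, re-taking the lock only to step between nodes (next pointers are mutated under the lock when levels are destroyed). A sketch of that traversal discipline, with std::mutex standing in for the spinlock:

    #include <cstddef>
    #include <memory>
    #include <mutex>

    struct level
    {
      const size_t nesting_level;
      std::shared_ptr<level> next;
      explicit level(size_t n, std::shared_ptr<level> &&tail)
          : nesting_level(n)
          , next(std::move(tail))
      {
      }
    };

    std::mutex list_lock;         // stand-in for the workqueue_lock spinlock
    std::shared_ptr<level> head;  // deepest nesting level first

    void visit_levels()
    {
      list_lock.lock();
      std::shared_ptr<level> cur = head;  // snapshot: the refcount pins this node
      list_lock.unlock();
      while(cur)
      {
        // ... examine cur->nesting_level, its timers and active queues, lock-free ...
        list_lock.lock();  // next pointers are edited under the lock, so re-take
        cur = cur->next;   // it briefly just to step along the chain
        list_lock.unlock();
      }
    }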
@@ -352,7 +502,6 @@ namespace detail
global_dynamic_thread_pool()._timerthread(workitem, nullptr);
}
#elif defined(_WIN32)
- using _lock_guard = std::unique_lock<std::mutex>;
using threadh_type = PTP_CALLBACK_INSTANCE;
using grouph_type = PTP_CALLBACK_ENVIRON;
static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
@@ -366,46 +515,13 @@ namespace detail
global_dynamic_thread_pool()._timerthread(workitem, threadh);
}
#else
- using _lock_guard = std::unique_lock<std::mutex>;
-#if 0
- class _lock_guard
- {
- std::mutex &_l;
- bool _is_locked{false};
- _lock_guard(std::mutex &l)
- : _l(l)
- {
- lock();
- }
- _lock_guard(const _lock_guard &) = delete;
- _lock_guard(_lock_guard &&) = delete;
- _lock_guard &operator=(const _lock_guard &) = delete;
- _lock_guard &operator=(_lock_guard &&) = delete;
- ~_lock_guard()
- {
- if(_is_locked)
- {
- unlock();
- }
- }
- void lock()
- {
- assert(!_is_locked);
- global_dynamic_thread_pool_threads_awaiting_mutex.fetch_add(1, std::memory_order_relaxed);
- _l.lock();
- _is_locked = true;
- }
- void unlock()
- {
- assert(_is_locked);
- global_dynamic_thread_pool_threads_awaiting_mutex.fetch_sub(1, std::memory_order_relaxed);
- _l.unlock();
- _is_locked = false;
- }
- };
-#endif
using threadh_type = void *;
using grouph_type = void *;
+ std::mutex threadpool_lock;
+ struct threadpool_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
struct thread_t
{
thread_t *_prev{nullptr}, *_next{nullptr};
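
A pattern used repeatedly from here on: each of the mutexes replacing the old single lock gets its own guard type derived from std::unique_lock. This is free at runtime, but it lets function signatures state which lock the caller must hold, so handing over the wrong guard fails to compile. A self-contained sketch:

    #include <mutex>

    std::mutex threadpool_lock, threadmetrics_lock;

    // Distinct guard types make "which lock must be held here?" part of the
    // function signature, enforced by the type system.
    struct threadpool_guard : std::unique_lock<std::mutex>
    {
      using std::unique_lock<std::mutex>::unique_lock;
    };
    struct threadmetrics_guard : std::unique_lock<std::mutex>
    {
      using std::unique_lock<std::mutex>::unique_lock;
    };

    void add_thread(threadpool_guard & /*held*/) { /* ... */ }

    void example()
    {
      threadpool_guard g(threadpool_lock);
      add_thread(g);  // compiles: the correct lock is held
      // threadmetrics_guard m(threadmetrics_lock);
      // add_thread(m);  // would not compile: wrong guard for this function
    }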
@@ -423,6 +539,10 @@ namespace detail
std::atomic<uint32_t> ms_sleep_for_more_work{20000};
std::mutex threadmetrics_lock;
+ struct threadmetrics_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
struct threadmetrics_threadid
{
char text[12]; // enough for a UINT32_MAX in decimal
@@ -473,6 +593,10 @@ namespace detail
#endif
std::mutex io_aware_work_item_handles_lock;
+ struct io_aware_work_item_handles_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
struct io_aware_work_item_statfs
{
size_t refcount{0};
@@ -485,7 +609,6 @@ namespace detail
global_dynamic_thread_pool_impl()
{
- workqueue.reserve(4); // preallocate 4 levels of nesting
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
populate_threadmetrics(std::chrono::steady_clock::now());
#endif
@@ -559,7 +682,7 @@ namespace detail
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
inline void _execute_work(thread_t *self);
- void _add_thread(_lock_guard & /*unused*/)
+ void _add_thread(threadpool_guard & /*unused*/)
{
thread_t *p = nullptr;
try
@@ -578,7 +701,7 @@ namespace detail
}
}
- bool _remove_thread(_lock_guard &g, threads_t &which)
+ bool _remove_thread(threadpool_guard &g, threads_t &which)
{
if(which.count == 0)
{
@@ -614,7 +737,7 @@ namespace detail
~global_dynamic_thread_pool_impl()
{
{
- _lock_guard g(workqueue_lock); // lock global
+ threadpool_guard g(threadpool_lock);
while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
{
while(threadpool_sleeping.count > 0)
@@ -631,7 +754,7 @@ namespace detail
}
}
}
- _lock_guard g(threadmetrics_lock);
+ threadmetrics_guard g(threadmetrics_lock);
for(auto *p : threadmetrics_sorted)
{
delete p;
@@ -649,7 +772,7 @@ namespace detail
#ifdef __linux__
// You are guaranteed only one of these EVER executes at a time. Locking is probably overkill, but also probably harmless
- bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
+ bool update_threadmetrics(threadmetrics_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
{
auto update_item = [&](threadmetrics_item *item) {
char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text;
@@ -739,7 +862,7 @@ namespace detail
auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked;
g.unlock(); // drop threadmetrics_lock
- _lock_guard gg(workqueue_lock);
+ threadpool_guard gg(threadpool_lock);
// Adjust for the number of threads sleeping for more work
threadmetrics_running += threadpool_sleeping.count;
threadmetrics_blocked -= threadpool_sleeping.count;
@@ -752,7 +875,7 @@ namespace detail
auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency);
if(toadd > 0 || toremove > 0)
{
- //std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
+ // std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
// << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd
// << " toremove = " << toremove << std::endl;
if(toadd > 0)
@@ -784,7 +907,7 @@ namespace detail
using dirent = dirent64;
size_t bytes = 0;
{
- _lock_guard g(threadmetrics_lock);
+ threadmetrics_guard g(threadmetrics_lock);
if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) &&
threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed))
{
@@ -805,7 +928,7 @@ namespace detail
}
}
{
- _lock_guard g(proc_self_task_fd_lock);
+ std::lock_guard<std::mutex> g(proc_self_task_fd_lock);
/* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep
looping this until it stops telling obvious lies.
*/
@@ -855,7 +978,7 @@ namespace detail
}
}
threadmetrics_item *firstnewitem = nullptr;
- _lock_guard g(threadmetrics_lock);
+ threadmetrics_guard g(threadmetrics_lock);
#if 0
{
std::stringstream s;
@@ -1015,14 +1138,16 @@ namespace detail
return success();
}
- inline void _submit_work_item(_lock_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);
+ inline void _submit_work_item(dynamic_thread_pool_group_impl_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem,
+ bool defer_pool_wake);
- inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;
+ inline result<void> submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+ span<dynamic_thread_pool_group::work_item *> work) noexcept;
- inline void _work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
+ inline void _work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
- inline result<void> stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
- inline result<void> wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;
+ inline result<void> stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
+ inline result<void> wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;
inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
@@ -1065,7 +1190,6 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
friend struct detail::global_dynamic_thread_pool_impl;
mutable std::mutex _lock;
- using _lock_guard = detail::global_dynamic_thread_pool_impl::_lock_guard;
size_t _nesting_level{0};
struct workitems_t
{
@@ -1105,14 +1229,13 @@ public:
#elif defined(_WIN32)
InitializeThreadpoolEnvironment(_grouph);
#endif
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
+ detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
// Append this group to the global work queue at its nesting level
- if(_nesting_level >= impl.workqueue.size())
+ if(!impl.workqueue || impl.workqueue->nesting_level < _nesting_level)
{
- impl.workqueue.resize(_nesting_level + 1);
+ impl.workqueue = std::make_shared<detail::global_dynamic_thread_pool_impl_workqueue_item>(_nesting_level, std::move(impl.workqueue));
}
- auto &wq = impl.workqueue[_nesting_level];
- wq.items.insert(this);
+ // Insert into the node matching this group's nesting level (mirrors the erase in the destructor)
+ for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
+ {
+ if(p->nesting_level == _nesting_level)
+ {
+ p->items.insert(this);
+ break;
+ }
+ }
return success();
}
catch(...)
@@ -1139,13 +1262,19 @@ public:
_grouph = nullptr;
}
#endif
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
- assert(impl.workqueue.size() > _nesting_level);
- auto &wq = impl.workqueue[_nesting_level];
- wq.items.erase(this);
- while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
+ detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
+ assert(impl.workqueue->nesting_level >= _nesting_level);
+ for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
{
- impl.workqueue.pop_back();
+ if(p->nesting_level == _nesting_level)
+ {
+ p->items.erase(this);
+ break;
+ }
+ }
+ while(impl.workqueue && impl.workqueue->items.empty())
+ {
+ impl.workqueue = std::move(impl.workqueue->next);
}
}
@@ -1167,7 +1296,7 @@ public:
}
_stopped.store(false, std::memory_order_release);
auto &impl = detail::global_dynamic_thread_pool();
- _lock_guard g(_lock); // lock group
+ detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group
if(_work_items_active.count == 0 && _work_items_done.count == 0)
{
_abnormal_completion_cause = success();
@@ -1188,7 +1317,7 @@ public:
return success();
}
auto &impl = detail::global_dynamic_thread_pool();
- _lock_guard g(_lock); // lock group
+ detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group
return impl.stop(g, this, errc::operation_canceled);
}
@@ -1204,7 +1333,7 @@ public:
return success();
}
auto &impl = detail::global_dynamic_thread_pool();
- _lock_guard g(_lock); // lock group
+ detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group
return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
}
};
@@ -1264,9 +1393,8 @@ namespace detail
{
pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
self->last_did_work = std::chrono::steady_clock::now();
- _lock_guard g(workqueue_lock); // lock global
- self->state++; // busy
- threadpool_threads.fetch_add(1, std::memory_order_release);
+ self->state++; // busy
+ const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " begins." << std::endl;
#endif
@@ -1277,44 +1405,66 @@ namespace detail
std::chrono::steady_clock::time_point now_steady, earliest_duration;
std::chrono::system_clock::time_point now_system, earliest_absolute;
// Start from highest priority work group, executing any timers due before selecting a work item
- for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it)
{
- auto &wq = *it;
- if(wq._next_timer_relative_front != nullptr)
+ workqueue_lock.lock();
+ auto lock_wq = workqueue;
+ workqueue_lock.unlock();
+ while(lock_wq)
{
- if(now_steady == std::chrono::steady_clock::time_point())
- {
- now_steady = std::chrono::steady_clock::now();
- }
- if(wq._next_timer_relative_front->_timepoint1 <= now_steady)
- {
- workitem = wq.next_timer(1);
- workitem_is_timer = true;
- break;
- }
- if(earliest_duration == std::chrono::steady_clock::time_point() || wq._next_timer_relative_front->_timepoint1 < earliest_duration)
+ auto &wq = *lock_wq;
+ if(wq.next_timer_relative.count.load(std::memory_order_relaxed) > 0)
{
- earliest_duration = wq._next_timer_relative_front->_timepoint1;
+ if(now_steady == std::chrono::steady_clock::time_point())
+ {
+ now_steady = std::chrono::steady_clock::now();
+ }
+ wq.next_timer_relative.lock.lock();
+ if(wq.next_timer_relative.front != nullptr)
+ {
+ if(wq.next_timer_relative.front->_timepoint1 <= now_steady)
+ {
+ workitem = wq.next_timer<1>(); // unlocks wq.next_timer_relative.lock
+ workitem_is_timer = true;
+ break;
+ }
+ if(earliest_duration == std::chrono::steady_clock::time_point() || wq.next_timer_relative.front->_timepoint1 < earliest_duration)
+ {
+ earliest_duration = wq.next_timer_relative.front->_timepoint1;
+ }
+ }
+ wq.next_timer_relative.lock.unlock();
}
- }
- if(wq._next_timer_absolute_front != nullptr)
- {
- if(now_system == std::chrono::system_clock::time_point())
+ if(wq.next_timer_absolute.count.load(std::memory_order_relaxed) > 0)
{
- now_system = std::chrono::system_clock::now();
+ if(now_system == std::chrono::system_clock::time_point())
+ {
+ now_system = std::chrono::system_clock::now();
+ }
+ wq.next_timer_absolute.lock.lock();
+ if(wq.next_timer_absolute.front != nullptr)
+ {
+ if(wq.next_timer_absolute.front->_timepoint2 <= now_system)
+ {
+ workitem = wq.next_timer<2>(); // unlocks wq.next_timer_absolute.lock
+ workitem_is_timer = true;
+ break;
+ }
+ if(earliest_absolute == std::chrono::system_clock::time_point() || wq.next_timer_absolute.front->_timepoint2 < earliest_absolute)
+ {
+ earliest_absolute = wq.next_timer_absolute.front->_timepoint2;
+ }
+ }
+ wq.next_timer_absolute.lock.unlock();
}
- if(wq._next_timer_absolute_front->_timepoint2 <= now_system)
+ workitem = wq.next_active(mythreadidx);
+ if(workitem != nullptr)
{
- workitem = wq.next_timer(2);
- workitem_is_timer = true;
break;
}
- if(earliest_absolute == std::chrono::system_clock::time_point() || wq._next_timer_absolute_front->_timepoint2 < earliest_absolute)
- {
- earliest_absolute = wq._next_timer_absolute_front->_timepoint2;
- }
+ workqueue_lock.lock();
+ lock_wq = lock_wq->next;
+ workqueue_lock.unlock();
}
- workitem = wq.next_active();
}
if(now_steady == std::chrono::steady_clock::time_point())
{
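
The scan above also records the earliest pending relative and absolute deadlines, which bound how long an idle worker may sleep (default-constructed time_points mean "no timer of that kind pending"). The exact sleep computation sits outside this hunk; the following is a plausible model of that bookkeeping under the stated convention, not the elided code itself:

    #include <chrono>

    using steady = std::chrono::steady_clock;
    using sys = std::chrono::system_clock;

    // Sleep up to max_sleep, but never past the earliest pending timer.
    steady::duration sleep_for(steady::time_point now_steady, sys::time_point now_system,
                               steady::time_point earliest_relative, sys::time_point earliest_absolute,
                               steady::duration max_sleep)
    {
      steady::duration d = max_sleep;
      if(earliest_relative != steady::time_point() && earliest_relative - now_steady < d)
      {
        d = earliest_relative - now_steady;
      }
      if(earliest_absolute != sys::time_point())
      {
        auto until = std::chrono::duration_cast<steady::duration>(earliest_absolute - now_system);
        if(until < d)
        {
          d = until;
        }
      }
      return d < steady::duration::zero() ? steady::duration::zero() : d;
    }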
@@ -1326,6 +1476,7 @@ namespace detail
const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)));
if(now_steady - self->last_did_work >= max_sleep)
{
+ threadpool_guard g(threadpool_lock);
// If there are any timers running, leave at least one worker thread
if(threadpool_active.count > 1 ||
(earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point()))
@@ -1360,6 +1511,7 @@ namespace detail
earliest_absolute = {};
}
}
+ threadpool_guard g(threadpool_lock);
_remove_from_list(threadpool_active, self);
_append_to_list(threadpool_sleeping, self);
self->state--;
@@ -1391,7 +1543,6 @@ namespace detail
catch(...)
{
}
- g.lock();
continue;
}
self->last_did_work = now_steady;
@@ -1399,7 +1550,6 @@ namespace detail
std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
#endif
total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
- g.unlock();
if(workitem_is_timer)
{
_timerthread(workitem, nullptr);
@@ -1413,6 +1563,7 @@ namespace detail
{
if(populate_threadmetrics(now_steady))
{
+ threadpool_guard g(threadpool_lock);
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
@@ -1426,7 +1577,6 @@ namespace detail
catch(...)
{
}
- g.lock();
}
self->state -= 2; // dead
threadpool_threads.fetch_sub(1, std::memory_order_release);
@@ -1436,7 +1586,7 @@ namespace detail
}
#endif
- inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool submit_into_highest_priority,
+ inline void global_dynamic_thread_pool_impl::_submit_work_item(dynamic_thread_pool_group_impl_guard &g, bool submit_into_highest_priority,
dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake)
{
(void) g;
@@ -1526,44 +1676,70 @@ namespace detail
// std::cout << "*** timer " << workitem << std::endl;
SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
#else
- _lock_guard gg(workqueue_lock);
- auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
- wq->append_timer(workitem);
+ workqueue_guard gg(workqueue_lock);
+ for(auto *p = workqueue.get(); p != nullptr; p = p->next.get())
+ {
+ if(p->nesting_level == parent->_nesting_level)
+ {
+ p->append_timer(workitem);
+ break;
+ }
+ }
#endif
}
else
{
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW;
- if(workqueue.size() - parent->_nesting_level == 1)
{
- priority = DISPATCH_QUEUE_PRIORITY_HIGH;
- }
- else if(workqueue.size() - parent->_nesting_level == 2)
- {
- priority = DISPATCH_QUEUE_PRIORITY_DEFAULT;
+ global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+ if(workqueue->nesting_level == parent->_nesting_level)
+ {
+ priority = DISPATCH_QUEUE_PRIORITY_HIGH;
+ }
+ else if(workqueue->nesting_level == parent->_nesting_level + 1)
+ {
+ priority = DISPATCH_QUEUE_PRIORITY_DEFAULT;
+ }
}
// std::cout << "*** submit " << workitem << std::endl;
dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback);
#elif defined(_WIN32)
// Set the priority of the group according to distance from the top
TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
- if(workqueue.size() - parent->_nesting_level == 1)
{
- priority = TP_CALLBACK_PRIORITY_HIGH;
- }
- else if(workqueue.size() - parent->_nesting_level == 2)
- {
- priority = TP_CALLBACK_PRIORITY_NORMAL;
+ global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+ if(workqueue->nesting_level == parent->_nesting_level)
+ {
+ priority = TP_CALLBACK_PRIORITY_HIGH;
+ }
+ else if(workqueue->nesting_level == parent->_nesting_level + 1)
+ {
+ priority = TP_CALLBACK_PRIORITY_NORMAL;
+ }
}
SetThreadpoolCallbackPriority(parent->_grouph, priority);
// std::cout << "*** submit " << workitem << std::endl;
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
#else
- _lock_guard gg(workqueue_lock);
- auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
- // TODO: It would be super nice if we prepended this instead if it came from a timer
- wq->append_active(workitem);
+ global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+ if(submit_into_highest_priority || workqueue->nesting_level == parent->_nesting_level)
+ {
+ // TODO: It would be super nice if we prepended this instead when it comes from a timer
+ workqueue->append_active(workitem);
+ }
+ else
+ {
+ for(auto *p = workqueue->next.get(); p != nullptr; p = p->next.get())
+ {
+ if(p->nesting_level == parent->_nesting_level)
+ {
+ // TODO: It would be super nice if we prepended this instead when it comes from a timer
+ p->append_active(workitem);
+ break;
+ }
+ }
+ }
#endif
}
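
For the GCD and Win32 backends above, the group's scheduling priority is derived from how far its nesting level sits from the head of the workqueue list, which always holds the deepest registered level. The mapping reduces to the following (the enum stands in for the DISPATCH_QUEUE_PRIORITY_* / TP_CALLBACK_PRIORITY_* constants):

    #include <cstddef>

    enum class pool_priority { high, normal, low };

    // head_level is the nesting level at the front of the workqueue list, which
    // is always the deepest level currently registered.
    pool_priority map_priority(size_t head_level, size_t group_level)
    {
      if(head_level == group_level)
      {
        return pool_priority::high;  // the group sits at the deepest level
      }
      if(head_level == group_level + 1)
      {
        return pool_priority::normal;  // one level shallower
      }
      return pool_priority::low;  // anything further away
    }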
@@ -1578,7 +1754,7 @@ namespace detail
{
g.unlock(); // unlock group
{
- _lock_guard gg(workqueue_lock); // lock global
+ threadpool_guard gg(threadpool_lock);
if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
{
_add_thread(gg);
@@ -1602,7 +1778,7 @@ namespace detail
}
}
- inline result<void> global_dynamic_thread_pool_impl::submit(_lock_guard &g, dynamic_thread_pool_group_impl *group,
+ inline result<void> global_dynamic_thread_pool_impl::submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
span<dynamic_thread_pool_group::work_item *> work) noexcept
{
try
@@ -1683,7 +1859,7 @@ namespace detail
}
}
- inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
+ inline void global_dynamic_thread_pool_impl::_work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
{
(void) g;
// std::cout << "*** _work_item_done " << i << std::endl;
@@ -1787,7 +1963,8 @@ namespace detail
}
}
- inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept
+ inline result<void> global_dynamic_thread_pool_impl::stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+ result<void> err) noexcept
{
(void) g;
if(group->_abnormal_completion_cause)
@@ -1799,7 +1976,8 @@ namespace detail
}
- inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept
+ inline result<void> global_dynamic_thread_pool_impl::wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group,
+ deadline d) noexcept
{
LLFIO_DEADLINE_TO_SLEEP_INIT(d);
if(!d || d.nsecs > 0)
@@ -2018,7 +2196,7 @@ namespace detail
assert(workitem->_nextwork != -1);
assert(workitem->_has_timer_set());
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
- _lock_guard g(parent->_lock); // lock group
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
// std::cout << "*** _timerthread " << workitem << std::endl;
if(parent->_stopping.load(std::memory_order_relaxed))
{
@@ -2087,7 +2265,7 @@ namespace detail
auto *parent = workitem->_parent.load(std::memory_order_relaxed);
if(parent->_stopping.load(std::memory_order_relaxed))
{
- _lock_guard g(parent->_lock); // lock group
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
_work_item_done(g, workitem);
return;
}
@@ -2099,7 +2277,7 @@ namespace detail
auto r = (*workitem)(workitem->_nextwork);
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
- _lock_guard g(parent->_lock); // lock group
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
// std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
if(!r)
{
@@ -2157,7 +2335,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::i
}
}
auto &impl = detail::global_dynamic_thread_pool();
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
for(auto &h : hs)
{
if(!h.h->is_seekable())
@@ -2195,7 +2373,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~
{
LLFIO_LOG_FUNCTION_CALL(this);
auto &impl = detail::global_dynamic_thread_pool();
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
for(auto &h : _handles)
{
@@ -2213,7 +2391,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_wor
{
auto &impl = detail::global_dynamic_thread_pool();
auto now = std::chrono::steady_clock::now();
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
for(auto &h : _handles)
{