Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-02-25 16:07:28 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:45 +0300
commit315522698efe62d1d97957e5a8fd992a7b6ecf10 (patch)
tree73f40af2ab336335a9547e4d4d66a3d05bf90d90 /include
parent0cf5eafa75c3cc66fd56a8a9a3dcc7089254846a (diff)
Lots of performance tuning of native Linux implementation, but still 5-10% slower than libdispatch.
Diffstat (limited to 'include')
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp182
1 file changed, 114 insertions, 68 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 41975697..c26a7945 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -336,9 +336,9 @@ namespace detail
struct global_dynamic_thread_pool_impl
{
std::mutex workqueue_lock;
- using _lock_guard = std::unique_lock<std::mutex>;
std::vector<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ using _lock_guard = std::unique_lock<std::mutex>;
using threadh_type = void *;
using grouph_type = dispatch_group_t;
static void _gcd_dispatch_callback(void *arg)
@@ -352,6 +352,7 @@ namespace detail
global_dynamic_thread_pool()._timerthread(workitem, nullptr);
}
#elif defined(_WIN32)
+ using _lock_guard = std::unique_lock<std::mutex>;
using threadh_type = PTP_CALLBACK_INSTANCE;
using grouph_type = PTP_CALLBACK_ENVIRON;
static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
@@ -365,6 +366,44 @@ namespace detail
global_dynamic_thread_pool()._timerthread(workitem, threadh);
}
#else
+ using _lock_guard = std::unique_lock<std::mutex>;
+#if 0
+ class _lock_guard
+ {
+ std::mutex &_l;
+ bool _is_locked{false};
+ _lock_guard(std::mutex &l)
+ : _l(l)
+ {
+ lock();
+ }
+ _lock_guard(const _lock_guard &) = delete;
+ _lock_guard(_lock_guard &&) = delete;
+ _lock_guard &operator=(const _lock_guard &) = delete;
+ _lock_guard &operator=(_lock_guard &&) = delete;
+ ~_lock_guard()
+ {
+ if(_is_locked)
+ {
+ unlock();
+ }
+ }
+ void lock()
+ {
+ assert(!_is_locked);
+ global_dynamic_thread_pool_threads_awaiting_mutex.fetch_add(1, std::memory_order_relaxed);
+ _l.lock();
+ _is_locked = true;
+ }
+ void unlock()
+ {
+ assert(_is_locked);
+ global_dynamic_thread_pool_threads_awaiting_mutex.fetch_sub(1, std::memory_order_relaxed);
+ _l.unlock();
+ _is_locked = false;
+ }
+ };
+#endif
using threadh_type = void *;
using grouph_type = void *;
struct thread_t
@@ -380,8 +419,8 @@ namespace detail
size_t count{0};
thread_t *front{nullptr}, *back{nullptr};
} threadpool_active, threadpool_sleeping;
- std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0};
- std::atomic<uint32_t> ms_sleep_for_more_work{30000}; // TODO put back to 60000
+ std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0};
+ std::atomic<uint32_t> ms_sleep_for_more_work{20000};
std::mutex threadmetrics_lock;
struct threadmetrics_threadid
@@ -426,7 +465,7 @@ namespace detail
} threadmetrics_queue; // items at front are least recently updated
std::vector<threadmetrics_item *> threadmetrics_sorted; // sorted by threadid
std::chrono::steady_clock::time_point threadmetrics_last_updated;
- std::atomic<bool> update_threadmetrics_reentrancy{false};
+ std::atomic<unsigned> populate_threadmetrics_reentrancy{0};
#ifdef __linux__
std::mutex proc_self_task_fd_lock;
int proc_self_task_fd{-1};
@@ -609,6 +648,7 @@ namespace detail
}
#ifdef __linux__
+ // You are guaranteed only one of these EVER executes at a time. Locking is probably overkill, but equally also probably harmless
bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
{
auto update_item = [&](threadmetrics_item *item) {
@@ -684,56 +724,46 @@ namespace detail
return false;
}
size_t updated = 0;
- while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100) && updated++ < 10)
+ while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(200) && updated++ < 4)
{
auto *p = threadmetrics_queue.front;
update_item(p);
_remove_from_list(threadmetrics_queue, p);
_append_to_list(threadmetrics_queue, p);
}
- if(updated > 0)
+ // if(updated > 0)
{
static const auto min_hardware_concurrency = std::thread::hardware_concurrency();
- static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1);
- auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running,
- (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) -
- (ssize_t) threadpool_threads.load(std::memory_order_relaxed)));
- auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency);
- // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove
- // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count
- // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count
- // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed)
- // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl;
+ static const auto max_hardware_concurrency = min_hardware_concurrency + 3;
+ auto threadmetrics_running = (ssize_t) threadmetrics_queue.running;
+ auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked;
+ g.unlock(); // drop threadmetrics_lock
+
+ _lock_guard gg(workqueue_lock);
+ // Adjust for the number of threads sleeping for more work
+ threadmetrics_running += threadpool_sleeping.count;
+ threadmetrics_blocked -= threadpool_sleeping.count;
+ if(threadmetrics_blocked < 0)
+ {
+ threadmetrics_blocked = 0;
+ }
+ const auto desired_concurrency = std::min((ssize_t) min_hardware_concurrency, (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed));
+ auto toadd = std::max((ssize_t) 0, std::min(desired_concurrency - threadmetrics_running, desired_concurrency - (ssize_t) threadpool_active.count));
+ auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency);
if(toadd > 0 || toremove > 0)
{
- if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed))
- {
- return false;
- }
- auto unupdate_threadmetrics_reentrancy =
- make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); });
- g.unlock();
- _lock_guard gg(workqueue_lock);
- toadd -= (ssize_t) threadpool_sleeping.count;
- std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count << " toadd = " << toadd
- << " toremove = " << toremove << std::endl;
- for(; toadd > 0; toadd--)
+ //std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
+ // << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd
+ // << " toremove = " << toremove << std::endl;
+ if(toadd > 0)
{
_add_thread(gg);
}
- for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--)
- {
- if(!_remove_thread(gg, threadpool_sleeping))
- {
- break;
- }
- }
if(toremove > 0 && threadpool_active.count > 1)
{
- // Kill myself, but not if I'm the final thread who needs to run timers
+ // Kill myself, but not if I'm the final thread who might need to run timers
return true;
}
- return false;
}
}
return false;
@@ -741,6 +771,12 @@ namespace detail
// Returns true if calling thread is to exit
bool populate_threadmetrics(std::chrono::steady_clock::time_point now)
{
+ if(populate_threadmetrics_reentrancy.exchange(1, std::memory_order_relaxed) == 1)
+ {
+ return false;
+ }
+ auto unpopulate_threadmetrics_reentrancy = make_scope_exit([this]() noexcept { populate_threadmetrics_reentrancy.store(0, std::memory_order_relaxed); });
+
static thread_local std::vector<char> kernelbuffer(1024);
static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent));
using getdents64_t = int (*)(int, char *, unsigned int);
@@ -749,9 +785,13 @@ namespace detail
size_t bytes = 0;
{
_lock_guard g(threadmetrics_lock);
- if(now - threadmetrics_last_updated < std::chrono::milliseconds(100) &&
+ if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) &&
threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed))
{
+ if(now - threadmetrics_last_updated < std::chrono::milliseconds(100))
+ {
+ return false;
+ }
return update_threadmetrics(std::move(g), now, nullptr);
}
threadmetrics_last_updated = now;
@@ -909,7 +949,7 @@ namespace detail
++s_it;
}
assert(threadmetrics_sorted.size() == threadidsbuffer.size());
-#if 1
+#if 0
if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
[](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }))
{
@@ -1236,6 +1276,7 @@ namespace detail
bool workitem_is_timer = false;
std::chrono::steady_clock::time_point now_steady, earliest_duration;
std::chrono::system_clock::time_point now_system, earliest_absolute;
+ // Start from highest priority work group, executing any timers due before selecting a work item
for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it)
{
auto &wq = *it;
@@ -1279,16 +1320,25 @@ namespace detail
{
now_steady = std::chrono::steady_clock::now();
}
+ // If there are no timers, and no work to do, time to either die or sleep
if(workitem == nullptr)
{
const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)));
if(now_steady - self->last_did_work >= max_sleep)
{
- _remove_from_list(threadpool_active, self);
- threadpool_threads.fetch_sub(1, std::memory_order_release);
- self->thread.detach();
- delete self;
- return;
+ // If there are any timers running, leave at least one worker thread
+ if(threadpool_active.count > 1 ||
+ (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point()))
+ {
+ _remove_from_list(threadpool_active, self);
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl;
+#endif
+ self->thread.detach();
+ delete self;
+ return;
+ }
}
std::chrono::steady_clock::duration duration(max_sleep);
if(earliest_duration != std::chrono::steady_clock::time_point())
@@ -1313,7 +1363,6 @@ namespace detail
_remove_from_list(threadpool_active, self);
_append_to_list(threadpool_sleeping, self);
self->state--;
- threadpool_sleeping_count.fetch_add(1, std::memory_order_release);
if(earliest_absolute != std::chrono::system_clock::time_point())
{
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
@@ -1331,7 +1380,6 @@ namespace detail
self->state++;
_remove_from_list(threadpool_sleeping, self);
_append_to_list(threadpool_active, self);
- threadpool_sleeping_count.fetch_sub(1, std::memory_order_release);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
#endif
@@ -1367,6 +1415,9 @@ namespace detail
{
_remove_from_list(threadpool_active, self);
threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl;
+#endif
self->thread.detach();
delete self;
return;
@@ -1379,8 +1430,8 @@ namespace detail
}
self->state -= 2; // dead
threadpool_threads.fetch_sub(1, std::memory_order_release);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << self << " exits, state = " << self->state << std::endl;
+#if 1 // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl;
#endif
}
#endif
@@ -1525,32 +1576,27 @@ namespace detail
const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1;
if(!defer_pool_wake)
{
- const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed);
- const auto threads = threadpool_threads.load(std::memory_order_relaxed);
- if(sleeping_count > 0 || threads == 0)
+ g.unlock(); // unlock group
{
- g.unlock(); // unlock group
+ _lock_guard gg(workqueue_lock); // lock global
+ if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
{
- _lock_guard gg(workqueue_lock); // lock global
- if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
- {
- _add_thread(gg);
- }
- else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
+ _add_thread(gg);
+ }
+ else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
+ {
+ // Try to wake the most recently slept first
+ auto *t = threadpool_sleeping.back;
+ auto now = std::chrono::steady_clock::now();
+ for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
{
- // Try to wake the most recently slept first
- auto *t = threadpool_sleeping.back;
- auto now = std::chrono::steady_clock::now();
- for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
- {
- t->last_did_work = now; // prevent reap
- t->cond.notify_one();
- t = t->_prev;
- }
+ t->last_did_work = now; // prevent reap
+ t->cond.notify_one();
+ t = t->_prev;
}
}
- g.lock(); // lock group
}
+ g.lock(); // lock group
}
#endif
}