Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> 2021-01-25 15:23:46 +0300
committer: Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> 2021-03-16 13:21:42 +0300
commit: 944f8f03360b2d12f04dff25a35e9ef0e1f15e7b (patch)
tree: 152d45d7c920225d4cebe44126fac52913ca24f4
parent: eef0d398dcfd3b4068d6d330d0b0287102cd26c5 (diff)
More improvements to dynamic scaling within native Linux implementation of dynamic_thread_pool_group, but still not working quite right yet.
-rw-r--r-- include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp 214
1 file changed, 122 insertions, 92 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index bc9b540c..b0bbef9a 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -168,6 +168,7 @@ namespace detail
std::chrono::steady_clock::time_point threadmetrics_last_updated;
std::atomic<bool> update_threadmetrics_reentrancy{false};
#ifdef __linux__
+ std::mutex proc_self_task_fd_lock;
int proc_self_task_fd{-1};
#endif
#endif
@@ -386,8 +387,8 @@ namespace detail
}
}
}
- std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " "
- << item->utime << " " << item->stime << std::endl;
+ // std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " "
+ // << item->utime << " " << item->stime << std::endl;
item->diskfaults = (uint32_t) majflt;
item->utime = (uint32_t) utime;
item->stime = (uint32_t) stime;
@@ -406,52 +407,57 @@ namespace detail
return false;
}
size_t updated = 0;
- while(updated++ < 10 && now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100))
+ while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100) && updated++ < 10)
{
auto *p = threadmetrics_queue.front;
update_item(p);
_remove_from_list(threadmetrics_queue, p);
_append_to_list(threadmetrics_queue, p);
}
- static const auto min_hardware_concurrency = std::thread::hardware_concurrency();
- static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1);
- auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running,
- (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) -
- (ssize_t) threadpool_threads.load(std::memory_order_relaxed)));
- auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency);
- // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove
- // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count
- // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count
- // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed)
- // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl;
- if(toadd > 0 || toremove > 0)
- {
- if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed))
- {
- return false;
- }
- auto unupdate_threadmetrics_reentrancy =
- make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); });
- g.unlock();
- _lock_guard gg(workqueue_lock);
- toadd -= (ssize_t) threadpool_sleeping.count;
- for(; toadd > 0; toadd--)
- {
- _add_thread(gg);
- }
- for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--)
- {
- if(!_remove_thread(gg, threadpool_sleeping))
+ if(updated > 0)
+ {
+ static const auto min_hardware_concurrency = std::thread::hardware_concurrency();
+ static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1);
+ auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running,
+ (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) -
+ (ssize_t) threadpool_threads.load(std::memory_order_relaxed)));
+ auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency);
+ // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove
+ // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count
+ // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count
+ // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed)
+ // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl;
+ if(toadd > 0 || toremove > 0)
+ {
+ if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed))
{
- break;
+ return false;
}
+ auto unupdate_threadmetrics_reentrancy =
+ make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); });
+ g.unlock();
+ _lock_guard gg(workqueue_lock);
+ toadd -= (ssize_t) threadpool_sleeping.count;
+ std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count << " toadd = " << toadd
+ << " toremove = " << toremove << std::endl;
+ for(; toadd > 0; toadd--)
+ {
+ _add_thread(gg);
+ }
+ for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--)
+ {
+ if(!_remove_thread(gg, threadpool_sleeping))
+ {
+ break;
+ }
+ }
+ if(toremove > 0 && threadpool_active.count > 1)
+ {
+ // Kill myself, but not if I'm the final thread who needs to run timers
+ return true;
+ }
+ return false;
}
- if(toremove > 0 && threadpool_active.count > 1)
- {
- // Kill myself, but not if I'm the final thread who needs to run timers
- return true;
- }
- return false;
}
return false;
}
@@ -471,6 +477,7 @@ namespace detail
{
return update_threadmetrics(std::move(g), now, nullptr);
}
+ threadmetrics_last_updated = now;
if(proc_self_task_fd < 0)
{
proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
@@ -479,52 +486,70 @@ namespace detail
posix_error().throw_exception();
}
}
+ }
+ {
+ _lock_guard g(proc_self_task_fd_lock);
+ /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep
+ looping this until it stops telling obvious lies.
+ */
for(auto done = false; !done;)
{
if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET))
{
posix_error().throw_exception();
}
- bytes = 0;
- for(;;)
+ auto _bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size());
+ // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl;
+ if(_bytes < 0 && errno != EINVAL)
{
- int _bytes = getdents(proc_self_task_fd, kernelbuffer.data() + bytes, kernelbuffer.size() - bytes);
- // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl;
- if(_bytes == 0)
- {
- done = true;
- break;
- }
- if(_bytes == -1 && errno == EINVAL)
+ posix_error().throw_exception();
+ }
+ if(_bytes >= 0 && kernelbuffer.size() - (size_t) _bytes >= sizeof(dirent) + 16)
+ {
+ bytes = (size_t) _bytes;
+ threadidsbuffer.clear();
+ for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen))
{
- kernelbuffer.resize(kernelbuffer.size() << 1);
- continue;
+ if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.')
+ {
+ size_t length = strchr(dent->d_name, 0) - dent->d_name;
+ threadidsbuffer.push_back(string_view(dent->d_name, length));
+ }
+ if((bytes -= dent->d_reclen) <= 0)
+ {
+ break;
+ }
}
- if(_bytes < 0)
+ auto mythreadcount = threadpool_threads.load(std::memory_order_relaxed);
+ if(threadidsbuffer.size() >= mythreadcount)
{
- posix_error().throw_exception();
+ // std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries, should be at least " << mythreadcount << std::endl;
+ std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
+ done = true;
+ break;
}
- bytes += _bytes;
+#ifndef NDEBUG
+ std::cout << "NOTE: /proc returned " << threadidsbuffer.size() << " items when we know for a fact at least " << mythreadcount
+ << " threads exist, retrying!" << std::endl;
+#endif
+ continue;
}
+ kernelbuffer.resize(kernelbuffer.size() << 1);
}
}
- threadidsbuffer.clear();
- for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen))
+ threadmetrics_item *firstnewitem = nullptr;
+ _lock_guard g(threadmetrics_lock);
+#if 0
{
- if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.')
- {
- size_t length = strchr(dent->d_name, 0) - dent->d_name;
- threadidsbuffer.push_back(string_view(dent->d_name, length));
- }
- if((bytes -= dent->d_reclen) <= 0)
+ std::stringstream s;
+ s << "Parsed from /proc " << threadidsbuffer.size() << " entries (should be at least " << threadpool_threads.load(std::memory_order_relaxed) << "):";
+ for(auto &i : threadidsbuffer)
{
- break;
+ s << " " << string_view(i.text, 12);
}
+ std::cout << s.str() << std::endl;
}
- std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries." << std::endl;
- std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
- threadmetrics_item *firstnewitem = nullptr;
- _lock_guard g(threadmetrics_lock);
+#endif
#if 0
{
auto d_it = threadmetrics_sorted.begin();
@@ -571,6 +596,7 @@ namespace detail
p.release();
threadmetrics_queue.running++;
};
+ // std::cout << "Compare" << std::endl;
for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();)
{
auto c = (*d_it)->threadid.compare(*s_it);
@@ -592,10 +618,12 @@ namespace detail
add_item();
}
}
+ // std::cout << "Tail dest" << std::endl;
while(d_it != threadmetrics_sorted.end())
{
remove_item();
}
+ // std::cout << "Tail source" << std::endl;
while(s_it != threadidsbuffer.end())
{
add_item();
@@ -617,7 +645,6 @@ namespace detail
}
#endif
assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size());
- threadmetrics_last_updated = now;
return update_threadmetrics(std::move(g), now, firstnewitem);
}
#endif
@@ -668,7 +695,7 @@ namespace detail
return success();
}
- inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem);
+ inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);
inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;
@@ -1139,7 +1166,7 @@ namespace detail
}
#endif
- inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
+ inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake)
{
(void) g;
if(workitem->_nextwork != -1)
@@ -1249,31 +1276,34 @@ namespace detail
std::cout << "*** DTP submits work item " << workitem << std::endl;
#endif
const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1;
- const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed);
- const auto threads = threadpool_threads.load(std::memory_order_relaxed);
- if(sleeping_count > 0 || threads == 0)
+ if(!defer_pool_wake)
{
- g.unlock(); // unlock group
+ const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed);
+ const auto threads = threadpool_threads.load(std::memory_order_relaxed);
+ if(sleeping_count > 0 || threads == 0)
{
- _lock_guard gg(workqueue_lock); // lock global
- if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
+ g.unlock(); // unlock group
{
- _add_thread(gg);
- }
- else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
- {
- // Try to wake the most recently slept first
- auto *t = threadpool_sleeping.back;
- auto now = std::chrono::steady_clock::now();
- for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
+ _lock_guard gg(workqueue_lock); // lock global
+ if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
+ {
+ _add_thread(gg);
+ }
+ else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
{
- t->last_did_work = now; // prevent reap
- t->cond.notify_one();
- t = t->_prev;
+ // Try to wake the most recently slept first
+ auto *t = threadpool_sleeping.back;
+ auto now = std::chrono::steady_clock::now();
+ for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
+ {
+ t->last_did_work = now; // prevent reap
+ t->cond.notify_one();
+ t = t->_prev;
+ }
}
}
+ g.lock(); // lock group
}
- g.lock(); // lock group
}
#endif
}
@@ -1345,7 +1375,7 @@ namespace detail
{
for(auto *i : work)
{
- _submit_work_item(g, i);
+ _submit_work_item(g, i, i != work.back());
}
}
return success();
@@ -1479,7 +1509,7 @@ namespace detail
_work_item_done(g, workitem);
return;
}
- _submit_work_item(g, workitem);
+ _submit_work_item(g, workitem, false);
}
inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept
@@ -1725,7 +1755,7 @@ namespace detail
if(workitem->_timepoint1 - now > std::chrono::seconds(0))
{
// Timer fired short, so schedule it again
- _submit_work_item(g, workitem);
+ _submit_work_item(g, workitem, false);
return;
}
#endif
@@ -1738,7 +1768,7 @@ namespace detail
if(workitem->_timepoint2 - now > std::chrono::seconds(0))
{
// Timer fired short, so schedule it again
- _submit_work_item(g, workitem);
+ _submit_work_item(g, workitem, false);
return;
}
#endif