diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-01-25 15:23:46 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-03-16 13:21:42 +0300 |
commit | 944f8f03360b2d12f04dff25a35e9ef0e1f15e7b (patch) | |
tree | 152d45d7c920225d4cebe44126fac52913ca24f4 | |
parent | eef0d398dcfd3b4068d6d330d0b0287102cd26c5 (diff) |
More improvements to dynamic scaling within native Linux implementation of dynamic_thread_pool_group, but still not working quite right yet.
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 214 |
1 files changed, 122 insertions, 92 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index bc9b540c..b0bbef9a 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -168,6 +168,7 @@ namespace detail std::chrono::steady_clock::time_point threadmetrics_last_updated; std::atomic<bool> update_threadmetrics_reentrancy{false}; #ifdef __linux__ + std::mutex proc_self_task_fd_lock; int proc_self_task_fd{-1}; #endif #endif @@ -386,8 +387,8 @@ namespace detail } } } - std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " " - << item->utime << " " << item->stime << std::endl; + // std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " " + // << item->utime << " " << item->stime << std::endl; item->diskfaults = (uint32_t) majflt; item->utime = (uint32_t) utime; item->stime = (uint32_t) stime; @@ -406,52 +407,57 @@ namespace detail return false; } size_t updated = 0; - while(updated++ < 10 && now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100)) + while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100) && updated++ < 10) { auto *p = threadmetrics_queue.front; update_item(p); _remove_from_list(threadmetrics_queue, p); _append_to_list(threadmetrics_queue, p); } - static const auto min_hardware_concurrency = std::thread::hardware_concurrency(); - static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1); - auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running, - (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) - - (ssize_t) threadpool_threads.load(std::memory_order_relaxed))); - auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency); - // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove - // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count - // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count - // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed) - // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl; - if(toadd > 0 || toremove > 0) - { - if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed)) - { - return false; - } - auto unupdate_threadmetrics_reentrancy = - make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); }); - g.unlock(); - _lock_guard gg(workqueue_lock); - toadd -= (ssize_t) threadpool_sleeping.count; - for(; toadd > 0; toadd--) - { - _add_thread(gg); - } - for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--) - { - if(!_remove_thread(gg, threadpool_sleeping)) + if(updated > 0) + { + static const auto min_hardware_concurrency = std::thread::hardware_concurrency(); + static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1); + auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running, + (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) - + (ssize_t) threadpool_threads.load(std::memory_order_relaxed))); + auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency); + // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove + // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count + // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count + // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed) + // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl; + if(toadd > 0 || toremove > 0) + { + if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed)) { - break; + return false; } + auto unupdate_threadmetrics_reentrancy = + make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); }); + g.unlock(); + _lock_guard gg(workqueue_lock); + toadd -= (ssize_t) threadpool_sleeping.count; + std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count << " toadd = " << toadd + << " toremove = " << toremove << std::endl; + for(; toadd > 0; toadd--) + { + _add_thread(gg); + } + for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--) + { + if(!_remove_thread(gg, threadpool_sleeping)) + { + break; + } + } + if(toremove > 0 && threadpool_active.count > 1) + { + // Kill myself, but not if I'm the final thread who needs to run timers + return true; + } + return false; } - if(toremove > 0 && threadpool_active.count > 1) - { - // Kill myself, but not if I'm the final thread who needs to run timers - return true; - } - return false; } return false; } @@ -471,6 +477,7 @@ namespace detail { return update_threadmetrics(std::move(g), now, nullptr); } + threadmetrics_last_updated = now; if(proc_self_task_fd < 0) { proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC); @@ -479,52 +486,70 @@ namespace detail posix_error().throw_exception(); } } + } + { + _lock_guard g(proc_self_task_fd_lock); + /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep + looping this until it stops telling obvious lies. + */ for(auto done = false; !done;) { if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET)) { posix_error().throw_exception(); } - bytes = 0; - for(;;) + auto _bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size()); + // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl; + if(_bytes < 0 && errno != EINVAL) { - int _bytes = getdents(proc_self_task_fd, kernelbuffer.data() + bytes, kernelbuffer.size() - bytes); - // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl; - if(_bytes == 0) - { - done = true; - break; - } - if(_bytes == -1 && errno == EINVAL) + posix_error().throw_exception(); + } + if(_bytes >= 0 && kernelbuffer.size() - (size_t) _bytes >= sizeof(dirent) + 16) + { + bytes = (size_t) _bytes; + threadidsbuffer.clear(); + for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen)) { - kernelbuffer.resize(kernelbuffer.size() << 1); - continue; + if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.') + { + size_t length = strchr(dent->d_name, 0) - dent->d_name; + threadidsbuffer.push_back(string_view(dent->d_name, length)); + } + if((bytes -= dent->d_reclen) <= 0) + { + break; + } } - if(_bytes < 0) + auto mythreadcount = threadpool_threads.load(std::memory_order_relaxed); + if(threadidsbuffer.size() >= mythreadcount) { - posix_error().throw_exception(); + // std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries, should be at least " << mythreadcount << std::endl; + std::sort(threadidsbuffer.begin(), threadidsbuffer.end()); + done = true; + break; } - bytes += _bytes; +#ifndef NDEBUG + std::cout << "NOTE: /proc returned " << threadidsbuffer.size() << " items when we know for a fact at least " << mythreadcount + << " threads exist, retrying!" << std::endl; +#endif + continue; } + kernelbuffer.resize(kernelbuffer.size() << 1); } } - threadidsbuffer.clear(); - for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen)) + threadmetrics_item *firstnewitem = nullptr; + _lock_guard g(threadmetrics_lock); +#if 0 { - if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.') - { - size_t length = strchr(dent->d_name, 0) - dent->d_name; - threadidsbuffer.push_back(string_view(dent->d_name, length)); - } - if((bytes -= dent->d_reclen) <= 0) + std::stringstream s; + s << "Parsed from /proc " << threadidsbuffer.size() << " entries (should be at least " << threadpool_threads.load(std::memory_order_relaxed) << "):"; + for(auto &i : threadidsbuffer) { - break; + s << " " << string_view(i.text, 12); } + std::cout << s.str() << std::endl; } - std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries." << std::endl; - std::sort(threadidsbuffer.begin(), threadidsbuffer.end()); - threadmetrics_item *firstnewitem = nullptr; - _lock_guard g(threadmetrics_lock); +#endif #if 0 { auto d_it = threadmetrics_sorted.begin(); @@ -571,6 +596,7 @@ namespace detail p.release(); threadmetrics_queue.running++; }; + // std::cout << "Compare" << std::endl; for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();) { auto c = (*d_it)->threadid.compare(*s_it); @@ -592,10 +618,12 @@ namespace detail add_item(); } } + // std::cout << "Tail dest" << std::endl; while(d_it != threadmetrics_sorted.end()) { remove_item(); } + // std::cout << "Tail source" << std::endl; while(s_it != threadidsbuffer.end()) { add_item(); @@ -617,7 +645,6 @@ namespace detail } #endif assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size()); - threadmetrics_last_updated = now; return update_threadmetrics(std::move(g), now, firstnewitem); } #endif @@ -668,7 +695,7 @@ namespace detail return success(); } - inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem); + inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake); inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept; @@ -1139,7 +1166,7 @@ namespace detail } #endif - inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) + inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake) { (void) g; if(workitem->_nextwork != -1) @@ -1249,31 +1276,34 @@ namespace detail std::cout << "*** DTP submits work item " << workitem << std::endl; #endif const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1; - const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed); - const auto threads = threadpool_threads.load(std::memory_order_relaxed); - if(sleeping_count > 0 || threads == 0) + if(!defer_pool_wake) { - g.unlock(); // unlock group + const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed); + const auto threads = threadpool_threads.load(std::memory_order_relaxed); + if(sleeping_count > 0 || threads == 0) { - _lock_guard gg(workqueue_lock); // lock global - if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) + g.unlock(); // unlock group { - _add_thread(gg); - } - else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) - { - // Try to wake the most recently slept first - auto *t = threadpool_sleeping.back; - auto now = std::chrono::steady_clock::now(); - for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) + _lock_guard gg(workqueue_lock); // lock global + if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) + { + _add_thread(gg); + } + else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) { - t->last_did_work = now; // prevent reap - t->cond.notify_one(); - t = t->_prev; + // Try to wake the most recently slept first + auto *t = threadpool_sleeping.back; + auto now = std::chrono::steady_clock::now(); + for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) + { + t->last_did_work = now; // prevent reap + t->cond.notify_one(); + t = t->_prev; + } } } + g.lock(); // lock group } - g.lock(); // lock group } #endif } @@ -1345,7 +1375,7 @@ namespace detail { for(auto *i : work) { - _submit_work_item(g, i); + _submit_work_item(g, i, i != work.back()); } } return success(); @@ -1479,7 +1509,7 @@ namespace detail _work_item_done(g, workitem); return; } - _submit_work_item(g, workitem); + _submit_work_item(g, workitem, false); } inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept @@ -1725,7 +1755,7 @@ namespace detail if(workitem->_timepoint1 - now > std::chrono::seconds(0)) { // Timer fired short, so schedule it again - _submit_work_item(g, workitem); + _submit_work_item(g, workitem, false); return; } #endif @@ -1738,7 +1768,7 @@ namespace detail if(workitem->_timepoint2 - now > std::chrono::seconds(0)) { // Timer fired short, so schedule it again - _submit_work_item(g, workitem); + _submit_work_item(g, workitem, false); return; } #endif |