From c9f1aea5f393dca136de2c2cf8d20759e234cf8a Mon Sep 17 00:00:00 2001
From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)"
Date: Tue, 19 Jan 2021 12:20:56 +0000
Subject: Implement thread enumeration for Linux native dynamic_thread_pool_group.

---
 include/llfio/revision.hpp                         |   6 +-
 .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 249 +++++++++++++++++++--
 2 files changed, 236 insertions(+), 19 deletions(-)

diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index c72e8041..9e7774c5 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
 // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF df3f68c97a25654d5fc659ada8a7bc04c7c80e84
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-13 12:32:47 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE df3f68c9
+#define LLFIO_PREVIOUS_COMMIT_REF ca8673c41b5a87fb834f9365c3dfbc6b668cd260
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-15 11:26:17 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE ca8673c4
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 4950bff1..e5c7fbb9 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -55,6 +55,9 @@ Distributed under the Boost Software License, Version 1.0.
 #if !defined(__linux__)
 #error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX.
 #endif
+#include <dirent.h> /* Defines DT_* constants */
+#include <sys/syscall.h>
+
 #include <condition_variable>
 #include <thread>
 #endif
@@ -121,19 +124,53 @@ namespace detail
     std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0};
 
     std::mutex threadmetrics_lock;
+    // Thread id held as fixed-width decimal text, right-aligned and left-padded with
+    // ASCII '0' so that memcmp() ordering matches numeric ordering of the ids.
+    struct threadmetrics_threadid
+    {
+      char text[12];  // enough for a UINT32_MAX in decimal
+      constexpr threadmetrics_threadid()
+          : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
+      {
+      }
+      threadmetrics_threadid(string_view sv)
+      {
+        memset(text, '0', sizeof(text));
+        assert(sv.size() <= sizeof(text));
+        if(sv.size() > sizeof(text))
+        {
+          abort();
+        }
+        memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size());
+      }
+      int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); }
+      bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; }
+      bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; }
+    };
     struct threadmetrics_item
     {
       threadmetrics_item *_prev{nullptr}, *_next{nullptr};
-      uint64_t threadid{0};
       std::chrono::steady_clock::time_point last_updated, blocked_since;  // latter set if thread seen no time
-      uint32_t diskfaults{0}, utime{0}, stime{0};                         // culmulative ticks spent in user and system for this thread
+      threadmetrics_threadid threadid;
+      uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1};  // cumulative ticks spent in user and system for this thread
+
+      explicit threadmetrics_item(threadmetrics_threadid v)
+          : threadid(v)
+      {
+      }
+      string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); }
     };
     struct threadmetrics_t
     {
       size_t count{0};
       threadmetrics_item *front{nullptr}, *back{nullptr};
       uint32_t blocked{0}, running{0};
-    } threadmetrics;
+    } threadmetrics_queue;  // items at end are least recently updated
+    std::vector<threadmetrics_item *> threadmetrics_sorted;
+    std::chrono::steady_clock::time_point threadmetrics_last_updated;
+#ifdef __linux__
+    int proc_self_task_fd{-1};
+#endif
 #endif
 
     std::mutex io_aware_work_item_handles_lock;
@@ -150,6 +187,9 @@ namespace detail
     global_dynamic_thread_pool_impl()
     {
       workqueue.reserve(4);  // preallocate 4 levels of nesting
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+      populate_threadmetrics(std::chrono::steady_clock::now());
+#endif
     }
 
     template <class T, class U> static void _append_to_list(T &what, U *v)
@@ -201,6 +241,148 @@ namespace detail
     }
 
 #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+#ifdef __linux__
+    // Refreshes the thread metrics bookkeeping from the entries of /proc/self/task.
+    //
+    // Returns immediately if the previous refresh was less than 100 ms before `now`.
+    // Otherwise the directory's thread ids are read and merged into
+    // threadmetrics_sorted (kept ordered by thread id) and threadmetrics_queue:
+    // ids no longer listed are unlinked and freed, newly listed ids are appended
+    // to the queue and counted as running.
+    //
+    // Throws via posix_error() if the directory cannot be opened, rewound or read.
+    void populate_threadmetrics(std::chrono::steady_clock::time_point now)
+    {
+      static thread_local std::vector<char> kernelbuffer(1024);
+      static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent));
+      using getdents64_t = int (*)(int, char *, unsigned int);
+      static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); });
+      using dirent = dirent64;
+      int bytes = 0;
+      {
+        _lock_guard g(threadmetrics_lock);
+        if(now - threadmetrics_last_updated < std::chrono::milliseconds(100))
+        {
+          return;
+        }
+        if(proc_self_task_fd < 0)
+        {
+          proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+          if(proc_self_task_fd < 0)
+          {
+            posix_error().throw_exception();
+          }
+        }
+        for(;;)
+        {
+          if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET))
+          {
+            posix_error().throw_exception();
+          }
+          bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size());
+          if(bytes < 0)
+          {
+            // A failed read must not fall through to parsing stale buffer contents
+            posix_error().throw_exception();
+          }
+          if(bytes < (int) kernelbuffer.size())
+          {
+            break;
+          }
+          // The buffer came back completely full, so double it and read again from the start
+          kernelbuffer.resize(kernelbuffer.size() << 1);
+        }
+      }
+      threadidsbuffer.clear();
+      // bytes is tested before each record is touched, so an empty read parses nothing
+      for(auto *dent = (dirent *) kernelbuffer.data(); bytes > 0; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen))
+      {
+        if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.')
+        {
+          size_t length = strchr(dent->d_name, 0) - dent->d_name;
+          threadidsbuffer.push_back(string_view(dent->d_name, length));
+        }
+        bytes -= dent->d_reclen;
+      }
+      std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
+      _lock_guard g(threadmetrics_lock);
+      auto d_it = threadmetrics_sorted.begin();
+      auto s_it = threadidsbuffer.begin();
+      auto remove_item = [&] {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+        std::cout << "Removing thread " << (*d_it)->threadid_name() << std::endl;
+#endif
+        if((*d_it)->blocked_since != std::chrono::steady_clock::time_point())
+        {
+          threadmetrics_queue.blocked--;
+        }
+        else
+        {
+          threadmetrics_queue.running--;
+        }
+        _remove_from_list(threadmetrics_queue, *d_it);
+        // The sorted vector's raw pointers own the items (assumes _remove_from_list
+        // only unlinks - confirm), so free it here or it leaks
+        delete *d_it;
+        d_it = threadmetrics_sorted.erase(d_it);
+      };
+      auto add_item = [&] {
+        auto p = std::make_unique<threadmetrics_item>(*s_it++);
+        // Insert BEFORE d_it (its id is larger, or it is end()) so the vector stays
+        // sorted, then step past the new element back onto what d_it referred to
+        d_it = threadmetrics_sorted.insert(d_it, p.get());
+        ++d_it;
+        _append_to_list(threadmetrics_queue, p.get());
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+        std::cout << "Adding thread " << p->threadid_name() << std::endl;
+#endif
+        p.release();
+        threadmetrics_queue.running++;
+      };
+      for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();)
+      {
+        auto c = (*d_it)->threadid.compare(*s_it);
+        // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl;
+        if(0 == c)
+        {
+          ++d_it;
+          ++s_it;
+          continue;
+        }
+        if(c < 0)
+        {
+          // d_it has gone away
+          remove_item();
+        }
+        if(c > 0)
+        {
+          // s_it is a new entry
+          add_item();
+        }
+      }
+      while(d_it != threadmetrics_sorted.end())
+      {
+        remove_item();
+      }
+      while(s_it != threadidsbuffer.end())
+      {
+        add_item();
+      }
+#if 0
+      std::cout << "Threadmetrics:";
+      for(auto *p : threadmetrics_sorted)
+      {
+        std::cout << "\n " << p->threadid_name();
+      }
+      std::cout << std::endl;
+#endif
+      assert(threadmetrics_sorted.size() == threadidsbuffer.size());
+      assert(std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
+                            [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }));
+      threadmetrics_last_updated = now;
+    }
+#endif
+
     inline void _execute_work(thread_t *self);
 
     void _add_thread(_lock_guard & /*unused*/)
@@ -252,22 +434,39 @@ namespace detail
 
     ~global_dynamic_thread_pool_impl()
     {
-      _lock_guard g(workqueue_lock);  // lock global
-      while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
       {
-        while(threadpool_sleeping.count > 0)
+        _lock_guard g(workqueue_lock);  // lock global
+        while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
         {
-          auto removed = _remove_thread(g, threadpool_sleeping);
-          assert(removed);
-          (void) removed;
-        }
-        if(threadpool_active.count > 0)
-        {
-          auto removed = _remove_thread(g, threadpool_active);
-          assert(removed);
-          (void) removed;
+          while(threadpool_sleeping.count > 0)
+          {
+            auto removed = _remove_thread(g, threadpool_sleeping);
+            assert(removed);
+            (void) removed;
+          }
+          if(threadpool_active.count > 0)
+          {
+            auto removed = _remove_thread(g, threadpool_active);
+            assert(removed);
+            (void) removed;
+          }
         }
       }
+      // The workqueue lock is released above; now free the thread metrics bookkeeping
+      _lock_guard g(threadmetrics_lock);
+      for(auto *p : threadmetrics_sorted)
+      {
+        delete p;
+      }
+      threadmetrics_sorted.clear();
+      threadmetrics_queue = {};
+#ifdef __linux__
+      if(proc_self_task_fd >= 0)  // 0 is a valid descriptor; -1 is the "not open" sentinel
+      {
+        ::close(proc_self_task_fd);
+        proc_self_task_fd = -1;
+      }
+#endif
     }
 #endif
 
@@ -743,6 +942,16 @@ namespace detail
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
         std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
 #endif
+        g.unlock();
+        try
+        {
+          populate_threadmetrics(now);
+        }
+        catch(...)
+        {
+          // Best effort only: a failure to refresh the metrics is ignored here
+        }
+        g.lock();
         continue;
       }
       self->last_did_work = now;
@@ -752,8 +961,16 @@ namespace detail
       total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
       g.unlock();
       _workerthread(workitem, nullptr);
-      g.lock();
       // workitem->_internalworkh should be null, however workitem may also no longer exist
+      try
+      {
+        populate_threadmetrics(now);
+      }
+      catch(...)
+      {
+        // Best effort only: a failure to refresh the metrics is ignored here
+      }
+      g.lock();
     }
     self->state -= 2;  // dead
     threadpool_threads.fetch_sub(1, std::memory_order_release);
-- 
cgit v1.2.3