github.com/windirstat/llfio.git

author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>    2021-01-19 15:20:56 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>    2021-03-16 13:21:41 +0300
commit     c9f1aea5f393dca136de2c2cf8d20759e234cf8a (patch)
tree       5e10ec73e8bfca7431ee08aca033971672693b0a
parent     73f5571337a4760d526ea378d96f5b93693beb17 (diff)

Implement thread enumeration for Linux native dynamic_thread_pool_group.

-rw-r--r--  include/llfio/revision.hpp                                      6
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp  225
2 files changed, 212 insertions, 19 deletions
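
As the commit message says, this change makes the Linux-native dynamic_thread_pool_group enumerate the threads of its own process, which it does by reading the /proc/self/task directory directly with the getdents64 syscall. As a primer before the diff itself, here is a minimal, self-contained sketch of that enumeration technique; the buffer size, the plain read loop and the main() wrapper are illustrative only, and the real implementation below instead caches the directory fd and grows a single buffer until the whole directory fits in one read.

// Minimal sketch: list the thread ids of the current process by walking
// /proc/self/task with the raw getdents64 syscall. Assumes glibc/g++ on
// Linux (which defines _GNU_SOURCE, making struct dirent64 visible).
#include <dirent.h>      /* DT_* constants, struct dirent64 */
#include <fcntl.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cstdio>
#include <string>
#include <vector>

int main()
{
  int fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
  if(fd < 0)
  {
    perror("open");
    return 1;
  }
  std::vector<char> buffer(4096);
  std::vector<std::string> tids;
  for(;;)
  {
    // getdents64 fills the buffer with dirent64 records and returns the number
    // of bytes written, 0 at end of directory, -1 on error.
    long bytes = ::syscall(SYS_getdents64, fd, buffer.data(), buffer.size());
    if(bytes < 0)
    {
      perror("getdents64");
      return 1;
    }
    if(bytes == 0)
    {
      break;
    }
    for(long offset = 0; offset < bytes;)
    {
      auto *dent = reinterpret_cast<dirent64 *>(buffer.data() + offset);
      // Every subdirectory of /proc/self/task other than "." and ".." is a thread id.
      if(dent->d_type == DT_DIR && dent->d_name[0] != '.')
      {
        tids.emplace_back(dent->d_name);
      }
      offset += dent->d_reclen;
    }
  }
  ::close(fd);
  for(auto &t : tids)
  {
    std::printf("thread %s\n", t.c_str());
  }
  return 0;
}
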
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index c72e8041..9e7774c5 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF df3f68c97a25654d5fc659ada8a7bc04c7c80e84
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-13 12:32:47 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE df3f68c9
+#define LLFIO_PREVIOUS_COMMIT_REF ca8673c41b5a87fb834f9365c3dfbc6b668cd260
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-15 11:26:17 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE ca8673c4
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 4950bff1..e5c7fbb9 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -55,6 +55,9 @@ Distributed under the Boost Software License, Version 1.0.
#if !defined(__linux__)
#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX.
#endif
+#include <dirent.h> /* Defines DT_* constants */
+#include <sys/syscall.h>
+
#include <condition_variable>
#include <thread>
#endif
@@ -121,19 +124,51 @@ namespace detail
std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0};
std::mutex threadmetrics_lock;
+ struct threadmetrics_threadid
+ {
+ char text[12]; // enough for a UINT32_MAX in decimal
+ constexpr threadmetrics_threadid()
+ : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
+ {
+ }
+ threadmetrics_threadid(string_view sv)
+ {
+ memset(text, '0', sizeof(text));
+ assert(sv.size() <= sizeof(text));
+ if(sv.size() > sizeof(text))
+ {
+ abort();
+ }
+ memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size());
+ }
+ int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); }
+ bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; }
+ bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; }
+ };
struct threadmetrics_item
{
threadmetrics_item *_prev{nullptr}, *_next{nullptr};
- uint64_t threadid{0};
std::chrono::steady_clock::time_point last_updated, blocked_since; // latter set if thread seen no time
- uint32_t diskfaults{0}, utime{0}, stime{0}; // culmulative ticks spent in user and system for this thread
+ threadmetrics_threadid threadid;
+ uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1}; // culmulative ticks spent in user and system for this thread
+
+ explicit threadmetrics_item(threadmetrics_threadid v)
+ : threadid(v)
+ {
+ }
+ string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); }
};
struct threadmetrics_t
{
size_t count{0};
threadmetrics_item *front{nullptr}, *back{nullptr};
uint32_t blocked{0}, running{0};
- } threadmetrics;
+ } threadmetrics_queue; // items at end are least recently updated
+ std::vector<threadmetrics_item *> threadmetrics_sorted;
+ std::chrono::steady_clock::time_point threadmetrics_last_updated;
+#ifdef __linux__
+ int proc_self_task_fd{-1};
+#endif
#endif
std::mutex io_aware_work_item_handles_lock;
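
The threadmetrics_threadid struct added above stores each thread id as a fixed 12-byte decimal string, left-padded with '0', so that a plain memcmp() yields the same ordering as numeric comparison; that is what lets the ids live in a sorted std::vector and be merged against a freshly enumerated, sorted id list further down. A small standalone illustration of the padding trick follows (the tid_key type is hypothetical, not LLFIO code):

#include <cassert>
#include <cstring>
#include <string_view>

// Fixed-width decimal key: pad on the left with '0' so that memcmp() ordering
// matches numeric ordering (e.g. "000000000998" sorts before "000000001001").
struct tid_key
{
  char text[12]{};
  explicit tid_key(std::string_view sv)
  {
    memset(text, '0', sizeof(text));
    assert(sv.size() <= sizeof(text));
    memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size());
  }
  bool operator<(const tid_key &o) const noexcept { return memcmp(text, o.text, sizeof(text)) < 0; }
};

int main()
{
  // Lexicographically "998" > "1001", but the padded keys preserve numeric order.
  assert(tid_key("998") < tid_key("1001"));
  assert(!(tid_key("1001") < tid_key("998")));
}
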
@@ -150,6 +185,9 @@ namespace detail
global_dynamic_thread_pool_impl()
{
workqueue.reserve(4); // preallocate 4 levels of nesting
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ populate_threadmetrics(std::chrono::steady_clock::now());
+#endif
}
template <class T, class U> static void _append_to_list(T &what, U *v)
@@ -201,6 +239,129 @@ namespace detail
}
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+#ifdef __linux__
+ void populate_threadmetrics(std::chrono::steady_clock::time_point now)
+ {
+ static thread_local std::vector<char> kernelbuffer(1024);
+ static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent));
+ using getdents64_t = int (*)(int, char *, unsigned int);
+ static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); });
+ using dirent = dirent64;
+ int bytes = 0;
+ {
+ _lock_guard g(threadmetrics_lock);
+ if(now - threadmetrics_last_updated < std::chrono::milliseconds(100))
+ {
+ return;
+ }
+ if(proc_self_task_fd < 0)
+ {
+ proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+ if(proc_self_task_fd < 0)
+ {
+ posix_error().throw_exception();
+ }
+ }
+ for(;;)
+ {
+ if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET))
+ {
+ posix_error().throw_exception();
+ }
+ bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size());
+ if(bytes < (int) kernelbuffer.size())
+ {
+ break;
+ }
+ kernelbuffer.resize(kernelbuffer.size() << 1);
+ }
+ }
+ threadidsbuffer.clear();
+ for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen))
+ {
+ if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.')
+ {
+ size_t length = strchr(dent->d_name, 0) - dent->d_name;
+ threadidsbuffer.push_back(string_view(dent->d_name, length));
+ }
+ if((bytes -= dent->d_reclen) <= 0)
+ {
+ break;
+ }
+ }
+ std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
+ _lock_guard g(threadmetrics_lock);
+ auto d_it = threadmetrics_sorted.begin();
+ auto s_it = threadidsbuffer.begin();
+ auto remove_item = [&] {
+ std::cout << "Removing thread " << (*d_it)->threadid_name() << std::endl;
+ if((*d_it)->blocked_since != std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.blocked--;
+ }
+ else
+ {
+ threadmetrics_queue.running--;
+ }
+ _remove_from_list(threadmetrics_queue, *d_it);
+ d_it = threadmetrics_sorted.erase(d_it);
+ };
+ auto add_item = [&] {
+ auto p = std::make_unique<threadmetrics_item>(*s_it++);
+ if(d_it != threadmetrics_sorted.end())
+ {
+ ++d_it;
+ }
+ d_it = threadmetrics_sorted.insert(d_it, p.get());
+ _append_to_list(threadmetrics_queue, p.get());
+ std::cout << "Adding thread " << p->threadid_name() << std::endl;
+ p.release();
+ threadmetrics_queue.running++;
+ };
+ for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();)
+ {
+ auto c = (*d_it)->threadid.compare(*s_it);
+ // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl;
+ if(0 == c)
+ {
+ ++d_it;
+ ++s_it;
+ continue;
+ }
+ if(c < 0)
+ {
+ // d_it has gone away
+ remove_item();
+ }
+ if(c > 0)
+ {
+ // s_it is a new entry
+ add_item();
+ }
+ }
+ while(d_it != threadmetrics_sorted.end())
+ {
+ remove_item();
+ }
+ while(s_it != threadidsbuffer.end())
+ {
+ add_item();
+ }
+#if 0
+ std::cout << "Threadmetrics:";
+ for(auto *p : threadmetrics_sorted)
+ {
+ std::cout << "\n " << p->threadid_name();
+ }
+ std::cout << std::endl;
+#endif
+ assert(threadmetrics_sorted.size() == threadidsbuffer.size());
+ assert(std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
+ [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }));
+ threadmetrics_last_updated = now;
+ }
+#endif
+
inline void _execute_work(thread_t *self);
void _add_thread(_lock_guard & /*unused*/)
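
populate_threadmetrics() above reconciles two sorted sequences with a classic two-iterator merge: the cached threadmetrics_sorted vector on one side, the freshly read and freshly sorted thread-id buffer on the other, erasing entries whose thread has vanished and inserting entries for threads that are new. The sketch below isolates that merge on plain std::vector<std::string>; the names and the printf reporting are illustrative, not LLFIO code.

#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>

// Bring `cached` (already sorted) in line with `fresh` (sorted here), reporting
// which ids disappeared and which appeared, in a single linear pass.
void reconcile(std::vector<std::string> &cached, std::vector<std::string> fresh)
{
  std::sort(fresh.begin(), fresh.end());
  auto d_it = cached.begin();
  auto s_it = fresh.begin();
  while(d_it != cached.end() && s_it != fresh.end())
  {
    int c = d_it->compare(*s_it);
    if(c == 0)
    {
      ++d_it;  // present in both, keep it
      ++s_it;
      continue;
    }
    if(c < 0)
    {
      std::printf("thread %s exited\n", d_it->c_str());
      d_it = cached.erase(d_it);  // cached id no longer enumerated
    }
    else
    {
      std::printf("thread %s appeared\n", s_it->c_str());
      d_it = cached.insert(d_it, *s_it++);  // new id, insert keeping sort order
      ++d_it;
    }
  }
  while(d_it != cached.end())
  {
    std::printf("thread %s exited\n", d_it->c_str());
    d_it = cached.erase(d_it);
  }
  for(; s_it != fresh.end(); ++s_it)
  {
    std::printf("thread %s appeared\n", s_it->c_str());
    cached.push_back(*s_it);
  }
}

int main()
{
  std::vector<std::string> cached{"100", "101", "103"};
  reconcile(cached, {"101", "102", "103", "104"});  // 100 exited; 102 and 104 appeared
}
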
@@ -252,22 +413,38 @@ namespace detail
~global_dynamic_thread_pool_impl()
{
- _lock_guard g(workqueue_lock); // lock global
- while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
{
- while(threadpool_sleeping.count > 0)
+ _lock_guard g(workqueue_lock); // lock global
+ while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
{
- auto removed = _remove_thread(g, threadpool_sleeping);
- assert(removed);
- (void) removed;
- }
- if(threadpool_active.count > 0)
- {
- auto removed = _remove_thread(g, threadpool_active);
- assert(removed);
- (void) removed;
+ while(threadpool_sleeping.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_sleeping);
+ assert(removed);
+ (void) removed;
+ }
+ if(threadpool_active.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_active);
+ assert(removed);
+ (void) removed;
+ }
}
}
+ _lock_guard g(threadmetrics_lock);
+ for(auto *p : threadmetrics_sorted)
+ {
+ delete p;
+ }
+ threadmetrics_sorted.clear();
+ threadmetrics_queue = {};
+#ifdef __linux__
+ if(proc_self_task_fd > 0)
+ {
+ ::close(proc_self_task_fd);
+ proc_self_task_fd = -1;
+ }
+#endif
}
#endif
@@ -743,6 +920,15 @@ namespace detail
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
#endif
+ g.unlock();
+ try
+ {
+ populate_threadmetrics(now);
+ }
+ catch(...)
+ {
+ }
+ g.lock();
continue;
}
self->last_did_work = now;
@@ -752,8 +938,15 @@ namespace detail
total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
g.unlock();
_workerthread(workitem, nullptr);
- g.lock();
// workitem->_internalworkh should be null, however workitem may also no longer exist
+ try
+ {
+ populate_threadmetrics(now);
+ }
+ catch(...)
+ {
+ }
+ g.lock();
}
self->state -= 2; // dead
threadpool_threads.fetch_sub(1, std::memory_order_release);
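
The final two hunks hook the metrics refresh into the worker loop: the queue lock is released before populate_threadmetrics() (which opens files, allocates and may throw) and reacquired afterwards, and any exception is swallowed so a metrics failure can never take down a worker thread. A minimal sketch of that unlock-call-relock pattern, assuming a std::unique_lock guard and a hypothetical refresh_metrics() stand-in:

#include <cstdio>
#include <mutex>

std::mutex queue_lock;

// Hypothetical stand-in for populate_threadmetrics(): may block or throw.
void refresh_metrics()
{
  std::puts("refreshing thread metrics");
}

void worker_iteration(std::unique_lock<std::mutex> &g)
{
  // g is held on entry. Drop it so the potentially slow, potentially throwing
  // refresh does not serialise every worker behind the queue lock.
  g.unlock();
  try
  {
    refresh_metrics();
  }
  catch(...)
  {
    // Metrics are best effort; a failure must never escape into the worker loop.
  }
  g.lock();  // reacquire before touching shared queue state again
}

int main()
{
  std::unique_lock<std::mutex> g(queue_lock);
  worker_iteration(g);
}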