Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-01-13 15:32:41 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:40 +0300
commit476b74799c4504a248d2fe63ae060f8eead21c47 (patch)
tree5721bd90024dadfff03c50047c4565c83026ea95
parent29f262d1b4d918ce1b96b8ab0536ae9fc343e6b3 (diff)
wip native linux threadpool implementation for dynamic_thread_pool_group.
-rw-r--r--CMakeLists.txt7
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp536
-rw-r--r--include/llfio/v2.0/detail/impl/posix/statfs.ipp4
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp18
5 files changed, 524 insertions, 47 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0a82bd45..11750c94 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -291,18 +291,19 @@ if(NOT LLFIO_DISABLE_LIBDISPATCH)
check_cxx_source_compiles("
#include <dispatch/dispatch.h>
int main() {
- dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
- return 0;
+ return dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) != nullptr;
}
" LLFIO_HAS_LIBDISPATCH_${postfix})
endfunction()
check_have_libdispatch(BUILTIN)
if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN)
- check_have_libdispatch(WITH_LIBDISPATCH)
+ check_have_libdispatch(WITH_LIBDISPATCH dispatch)
if(LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH)
all_link_libraries(PUBLIC dispatch)
endif()
endif()
+else()
+ all_compile_definitions(PUBLIC LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD=0)
endif()
if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN AND NOT LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH AND (CMAKE_SYSTEM_NAME MATCHES "FreeBSD" OR APPLE))
indented_message(FATAL_ERROR "FATAL: Grand Central Dispatch as libdispatch was not found on this FreeBSD or Mac OS system. libdispatch is required for LLFIO to build on those systems.")
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index c56a2976..e5f28dcc 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 96bff4fddf442a2526cda850876b82c191e4030a
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-01 18:00:23 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 96bff4fd
+#define LLFIO_PREVIOUS_COMMIT_REF 45112b3cffebb5f8409c0edfc8c8879a0aeaf516
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-02 17:33:42 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45112b3c
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 57e1e525..a04aef3c 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -47,13 +47,19 @@ Distributed under the Boost Software License, Version 1.0.
#if __has_include(<dispatch/dispatch.h>)
#include <dispatch/dispatch.h>
#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
-#else
-#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 0
-#error Right now dynamic_thread_pool_group requires libdispatch to be available on POSIX. It should get auto discovered if installed, which is the default on BSD and Mac OS. Try installing libdispatch-dev if on Linux.
#endif
#endif
#endif
#endif
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+#if !defined(__linux__)
+#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX.
+#endif
+#include <condition_variable>
+#include <thread>
+#endif
+
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0
LLFIO_V2_NAMESPACE_BEGIN
@@ -61,6 +67,15 @@ namespace detail
{
struct global_dynamic_thread_pool_impl
{
+ std::mutex workqueue_lock;
+ using _lock_guard = std::unique_lock<std::mutex>;
+ struct workqueue_item
+ {
+ std::unordered_set<dynamic_thread_pool_group_impl *> items;
+ std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup;
+ size_t currentgroupremaining{0};
+ };
+ std::vector<workqueue_item> workqueue;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
using threadh_type = void *;
using grouph_type = dispatch_group_t;
@@ -87,15 +102,39 @@ namespace detail
auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
global_dynamic_thread_pool()._timerthread(workitem, threadh);
}
-#endif
+#else
+ using threadh_type = void *;
+ using grouph_type = void *;
+ struct thread_t
+ {
+ thread_t *_prev{nullptr}, *_next{nullptr};
+ std::thread thread;
+ std::condition_variable cond;
+ std::chrono::steady_clock::time_point last_did_work;
+ int state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy
+ };
+ struct threads_t
+ {
+ size_t count{0};
+ thread_t *front{nullptr}, *back{nullptr};
+ } threadpool_active, threadpool_sleeping;
+ std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0};
- std::mutex workqueue_lock;
- using _lock_guard = std::unique_lock<std::mutex>;
- struct workqueue_item
+ std::mutex threadmetrics_lock;
+ struct threadmetrics_item
{
- std::unordered_set<dynamic_thread_pool_group_impl *> items;
+ threadmetrics_item *_prev{nullptr}, *_next{nullptr};
+ uint64_t threadid{0};
+ std::chrono::steady_clock::time_point last_updated, blocked_since; // latter set if thread seen no time
+ uint32_t diskfaults{0}, utime{0}, stime{0}; // culmulative ticks spent in user and system for this thread
};
- std::vector<workqueue_item> workqueue;
+ struct threadmetrics_t
+ {
+ size_t count{0};
+ threadmetrics_item *front{nullptr}, *back{nullptr};
+ uint32_t blocked{0}, running{0};
+ } threadmetrics;
+#endif
std::mutex io_aware_work_item_handles_lock;
struct io_aware_work_item_statfs
@@ -161,6 +200,77 @@ namespace detail
what.count--;
}
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ inline void _execute_work(thread_t *self);
+
+ void _add_thread(_lock_guard & /*unused*/)
+ {
+ thread_t *p = nullptr;
+ try
+ {
+ p = new thread_t;
+ _append_to_list(threadpool_active, p);
+ p->thread = std::thread([this, p] { _execute_work(p); });
+ }
+ catch(...)
+ {
+ if(p != nullptr)
+ {
+ _remove_from_list(threadpool_active, p);
+ }
+ // drop failure
+ }
+ }
+
+ bool _remove_thread(_lock_guard &g, threads_t &which)
+ {
+ if(which.count == 0)
+ {
+ return false;
+ }
+ // Threads which went to sleep the longest ago are at the front
+ auto *t = which.front;
+ assert(t->state == 0);
+ t->state--;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << t << " is told to quit" << std::endl;
+#endif
+ do
+ {
+ g.unlock();
+ t->cond.notify_one();
+ g.lock();
+ } while(t->state >= -1);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << t << " has quit, deleting" << std::endl;
+#endif
+ _remove_from_list(threadpool_active, t);
+ t->thread.join();
+ delete t;
+ return true;
+ }
+
+ ~global_dynamic_thread_pool_impl()
+ {
+ _lock_guard g(workqueue_lock); // lock global
+ while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
+ {
+ while(threadpool_sleeping.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_sleeping);
+ assert(removed);
+ (void) removed;
+ }
+ if(threadpool_active.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_active);
+ assert(removed);
+ (void) removed;
+ }
+ }
+ }
+#endif
+
result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
{
if(!d)
@@ -182,6 +292,9 @@ namespace detail
{
return win32_error();
}
+#else
+ (void) grouph;
+ workitem->_internaltimerh = (void *) (uintptr_t) -1;
#endif
}
if(d.nsecs > 0)
@@ -195,6 +308,10 @@ namespace detail
workitem->_timepoint2 = d.to_time_point();
}
}
+ else
+ {
+ workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1));
+ }
}
return success();
}
@@ -207,7 +324,7 @@ namespace detail
inline void _work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
inline result<void> stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
- inline result<void> wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept;
+ inline result<void> wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;
inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
@@ -245,6 +362,7 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
} _work_items_active, _work_items_done, _work_items_delayed;
std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
+ std::atomic<int> _waits{0};
result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
@@ -252,6 +370,8 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
#elif defined(_WIN32)
TP_CALLBACK_ENVIRON _callbackenviron;
PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
+#else
+ void *_grouph{nullptr};
#endif
public:
@@ -271,13 +391,19 @@ public:
#elif defined(_WIN32)
InitializeThreadpoolEnvironment(_grouph);
#endif
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
// Append this group to the global work queue at its nesting level
if(_nesting_level >= impl.workqueue.size())
{
impl.workqueue.resize(_nesting_level + 1);
}
- impl.workqueue[_nesting_level].items.insert(this);
+ auto &wq = impl.workqueue[_nesting_level];
+ wq.items.insert(this);
+ if(wq.items.size() == 1)
+ {
+ wq.currentgroup = wq.items.begin();
+ wq.currentgroupremaining = _work_items_active.count;
+ }
return success();
}
catch(...)
@@ -304,9 +430,21 @@ public:
_grouph = nullptr;
}
#endif
- detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global
assert(impl.workqueue.size() > _nesting_level);
- impl.workqueue[_nesting_level].items.erase(this);
+ auto &wq = impl.workqueue[_nesting_level];
+ if(*wq.currentgroup == this)
+ {
+ if(wq.items.end() == ++wq.currentgroup)
+ {
+ wq.currentgroup = wq.items.begin();
+ }
+ if(!wq.items.empty())
+ {
+ wq.currentgroupremaining = (*wq.currentgroup)->_work_items_active.count;
+ }
+ }
+ wq.items.erase(this);
while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
{
impl.workqueue.pop_back();
@@ -320,7 +458,6 @@ public:
{
return errc::operation_canceled;
}
- _stopped.store(false, std::memory_order_release);
if(_completing.load(std::memory_order_relaxed))
{
for(auto *i : work)
@@ -330,8 +467,9 @@ public:
}
return success();
}
+ _stopped.store(false, std::memory_order_release);
auto &impl = detail::global_dynamic_thread_pool();
- _lock_guard g(_lock);
+ _lock_guard g(_lock); // lock group
if(_work_items_active.count == 0 && _work_items_done.count == 0)
{
_abnormal_completion_cause = success();
@@ -352,7 +490,7 @@ public:
return success();
}
auto &impl = detail::global_dynamic_thread_pool();
- _lock_guard g(_lock);
+ _lock_guard g(_lock); // lock group
return impl.stop(g, this, errc::operation_canceled);
}
@@ -368,8 +506,8 @@ public:
return success();
}
auto &impl = detail::global_dynamic_thread_pool();
- _lock_guard g(_lock);
- return impl.wait(g, true, this, d);
+ _lock_guard g(_lock); // lock group
+ return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
}
};
@@ -399,6 +537,221 @@ LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_
namespace detail
{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self)
+ {
+ pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
+ self->last_did_work = std::chrono::steady_clock::now();
+ _lock_guard g(workqueue_lock); // lock global
+ self->state++; // busy
+ threadpool_threads.fetch_add(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " begins." << std::endl;
+#endif
+ while(self->state > 0)
+ {
+ restart:
+ dynamic_thread_pool_group::work_item *workitem = nullptr;
+ std::chrono::steady_clock::time_point earliest_duration;
+ std::chrono::system_clock::time_point earliest_absolute;
+ if(!workqueue.empty())
+ {
+ auto wq = --workqueue.end();
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " restarts from top of work queue" << std::endl;
+#endif
+ for(;;)
+ {
+ dynamic_thread_pool_group_impl *tpg = *wq->currentgroup;
+ _lock_guard gg(tpg->_lock); // lock group
+ if(wq->currentgroupremaining > tpg->_work_items_active.count)
+ {
+ wq->currentgroupremaining = tpg->_work_items_active.count;
+ }
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " sees " << wq->currentgroupremaining << " items remaining in group " << tpg << std::endl;
+#endif
+ if(wq->currentgroupremaining > 0 && tpg->_work_items_active.front->_internalworkh == nullptr)
+ {
+ auto *wi = tpg->_work_items_active.front;
+ _remove_from_list(tpg->_work_items_active, wi);
+ _append_to_list(tpg->_work_items_active, wi);
+ wq->currentgroupremaining--;
+ if(wi->_internaltimerh == nullptr)
+ {
+ workitem = wi;
+ workitem->_internalworkh = self;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
+ << (workqueue.end() - wq - 1) << std::endl;
+#endif
+ break;
+ }
+ bool invoketimer = false;
+ if(wi->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ // Special constant for immediately rescheduled work items
+ if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
+ {
+ invoketimer = true;
+ }
+ else if(earliest_duration == std::chrono::steady_clock::time_point() || wi->_timepoint1 < earliest_duration)
+ {
+ earliest_duration = wi->_timepoint1;
+ if(wi->_timepoint1 <= std::chrono::steady_clock::now())
+ {
+ invoketimer = true;
+ }
+ }
+ }
+ if(wi->_timepoint2 != std::chrono::system_clock::time_point() &&
+ (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute))
+ {
+ earliest_absolute = wi->_timepoint2;
+ if(wi->_timepoint2 <= std::chrono::system_clock::now())
+ {
+ invoketimer = true;
+ }
+ }
+ if(invoketimer)
+ {
+ wi->_internalworkh = self;
+ wi->_internaltimerh = nullptr;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl;
+#endif
+ gg.unlock();
+ g.unlock();
+ _timerthread(wi, nullptr);
+ g.lock();
+ // wi->_internalworkh should be null, however wi may also no longer exist
+ goto restart;
+ }
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl;
+#endif
+ }
+ else
+ {
+ auto startinggroup = wq->currentgroup;
+ do
+ {
+ gg.unlock(); // unlock group
+ if(++wq->currentgroup == wq->items.end())
+ {
+ wq->currentgroup = wq->items.begin();
+ }
+ tpg = *wq->currentgroup;
+ gg = _lock_guard(tpg->_lock); // lock group
+ if(startinggroup == wq->currentgroup)
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq - 1) << " examining " << tpg
+ << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
+#endif
+ if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr)
+ {
+ // Nothing for me to do in this workqueue
+ if(wq == workqueue.begin())
+ {
+ assert(workitem == nullptr);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl;
+#endif
+ goto workqueue_empty;
+ }
+ gg.unlock(); // unlock group
+ --wq;
+ tpg = *wq->currentgroup;
+ startinggroup = wq->currentgroup;
+ gg = _lock_guard(tpg->_lock); // lock group
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq - 1) << std::endl;
+#endif
+ continue;
+ }
+ }
+ } while(tpg->_work_items_active.count == 0);
+ wq->currentgroupremaining = tpg->_work_items_active.count;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl;
+#endif
+ }
+ }
+ }
+ workqueue_empty:
+ auto now = std::chrono::steady_clock::now();
+ if(workitem == nullptr)
+ {
+ std::chrono::steady_clock::duration duration(std::chrono::minutes(1));
+ if(earliest_duration != std::chrono::steady_clock::time_point())
+ {
+ if(now - earliest_duration < duration)
+ {
+ duration = now - earliest_duration;
+ }
+ }
+ else if(earliest_absolute != std::chrono::system_clock::time_point())
+ {
+ auto diff = std::chrono::system_clock::now() - earliest_absolute;
+ if(diff > duration)
+ {
+ earliest_absolute = {};
+ }
+ }
+ else if(now - self->last_did_work >= std::chrono::minutes(1))
+ {
+ _remove_from_list(threadpool_active, self);
+ self->thread.detach();
+ delete self;
+ return;
+ }
+ self->last_did_work = now;
+ _remove_from_list(threadpool_active, self);
+ _append_to_list(threadpool_sleeping, self);
+ self->state--;
+ threadpool_sleeping_count.fetch_add(1, std::memory_order_release);
+ if(earliest_absolute != std::chrono::system_clock::time_point())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
+#endif
+ self->cond.wait_until(g, earliest_absolute);
+ }
+ else
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
+#endif
+ self->cond.wait_for(g, duration);
+ }
+ self->state++;
+ _remove_from_list(threadpool_sleeping, self);
+ _append_to_list(threadpool_active, self);
+ threadpool_sleeping_count.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
+#endif
+ continue;
+ }
+ self->last_did_work = now;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
+#endif
+ total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
+ g.unlock();
+ _workerthread(workitem, nullptr);
+ g.lock();
+ // workitem->_internalworkh should be null, however workitem may also no longer exist
+ }
+ self->state -= 2; // dead
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits, state = " << self->state << std::endl;
+#endif
+ }
+#endif
+
inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
{
(void) g;
@@ -502,6 +855,43 @@ namespace detail
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
#endif
}
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ // Indicate that I can be executed again
+ workitem->_internalworkh = nullptr;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP submits work item " << workitem << std::endl;
+#endif
+ const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1;
+ const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed);
+ const auto threads = threadpool_threads.load(std::memory_order_relaxed);
+ if(sleeping_count > 0 || threads == 0)
+ {
+ g.unlock(); // unlock group
+ {
+ _lock_guard gg(workqueue_lock); // lock global
+ if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
+ {
+ _add_thread(gg);
+ _add_thread(gg);
+ _add_thread(gg);
+ _add_thread(gg);
+ }
+ else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
+ {
+ // Try to wake the most recently slept first
+ auto *t = threadpool_sleeping.back;
+ auto now = std::chrono::steady_clock::now();
+ for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
+ {
+ t->last_did_work = now; // prevent reap
+ t->cond.notify_one();
+ t = t->_prev;
+ }
+ }
+ }
+ g.lock(); // lock group
+ }
+#endif
}
}
@@ -607,6 +997,12 @@ namespace detail
i->_internalworkh = nullptr;
}
}
+#else
+ i->_internaltimerh = nullptr;
+ i->_internalworkh = nullptr;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP sets done work item " << i << std::endl;
+#endif
#endif
_remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i);
_append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i);
@@ -626,24 +1022,55 @@ namespace detail
parent->_work_items_done.count = 0;
parent->_stopping.store(false, std::memory_order_release);
parent->_stopped.store(true, std::memory_order_release);
- parent->_completing.store(true, std::memory_order_release);
+ parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP executes group_complete for group " << parent << std::endl;
+#endif
for(; v != nullptr; v = n)
{
n = v->_next;
v->group_complete(parent->_abnormal_completion_cause);
}
- parent->_completing.store(false, std::memory_order_release);
- // Did a least one group_complete() submit more work to myself?
- while(parent->_work_items_delayed.count > 0)
+ parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl;
+#endif
+ if(parent->_work_items_delayed.count > 0)
{
- i = parent->_work_items_delayed.front;
- _remove_from_list(parent->_work_items_delayed, i);
- auto r = submit(g, parent, {&i, 1});
- if(!r)
+ /* If there are waits on this group to complete, forward progress those now.
+ */
+ while(parent->_waits.load(std::memory_order_relaxed) > 0)
{
- parent->_work_items_delayed = {};
- (void) stop(g, parent, std::move(r));
- break;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl;
+#endif
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ // Now submit all delayed work
+ while(parent->_work_items_delayed.count > 0)
+ {
+ i = parent->_work_items_delayed.front;
+ _remove_from_list(parent->_work_items_delayed, i);
+ auto r = submit(g, parent, {&i, 1});
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error ";
+ if(r)
+ {
+ std::cout << "none" << std::endl;
+ }
+ else
+ {
+ std::cout << r.error().message() << std::endl;
+ }
+#endif
+ if(!r)
+ {
+ parent->_work_items_delayed = {};
+ (void) stop(g, parent, std::move(r));
+ break;
+ }
}
}
}
@@ -682,11 +1109,24 @@ namespace detail
return success();
}
- inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept
+
+ inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept
{
LLFIO_DEADLINE_TO_SLEEP_INIT(d);
if(!d || d.nsecs > 0)
{
+ /* To ensure forward progress, we need to gate new waits during delayed work submission.
+ Otherwise waits may never exit if the window where _work_items_active.count == 0 is
+ missed.
+ */
+ while(group->_work_items_delayed.count > 0)
+ {
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ group->_waits.fetch_add(1, std::memory_order_release);
+ auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); });
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
while(group->_work_items_active.count > 0)
{
@@ -842,6 +1282,34 @@ namespace detail
g.lock();
}
}
+#else
+#if 0
+ if(group->_stopping.load(std::memory_order_relaxed))
+ {
+ // Kill all work items not currently being executed immediately
+ for(bool done = false; !done;)
+ {
+ done = true;
+ for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next)
+ {
+ if(p->_internalworkh == nullptr)
+ {
+ _remove_from_list(group->_work_items_active, p);
+ _append_to_list(group->_work_items_done, p);
+ done = false;
+ break;
+ }
+ }
+ }
+ }
+#endif
+ while(group->_work_items_active.count > 0)
+ {
+ LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
#endif
}
if(group->_work_items_active.count > 0)
@@ -859,7 +1327,7 @@ namespace detail
{
LLFIO_LOG_FUNCTION_CALL(this);
assert(workitem->_nextwork != -1);
- _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group
// std::cout << "*** _timerthread " << workitem << std::endl;
if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
{
@@ -907,7 +1375,7 @@ namespace detail
assert(workitem->_nextwork != 0);
if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
{
- _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group
_work_item_done(g, workitem);
return;
}
@@ -919,7 +1387,7 @@ namespace detail
auto r = (*workitem)(workitem->_nextwork);
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
- _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group
// std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
if(!r)
{
diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
index 97aff0f0..092ca9f2 100644
--- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
@@ -1,5 +1,5 @@
/* Information about the volume storing a file
-(C) 2016-2017 Niall Douglas <http://www.nedproductions.biz/> (5 commits)
+(C) 2016-2020 Niall Douglas <http://www.nedproductions.biz/> (5 commits)
File Created: Jan 2016
@@ -483,7 +483,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
On Mac OS, getting the current i/o wait time appears to be privileged only?
*/
#endif
- return {-1, _allbits1_float};
+ return {-1, detail::constexpr_float_allbits_set_nan()};
}
catch(...)
{
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 58d670c4..a40d3b05 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -146,7 +146,12 @@ static inline void TestDynamicThreadPoolGroupWorks()
BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
};
auto check = [&] {
- shared_state.tpg->wait().value();
+ auto r = shared_state.tpg->wait();
+ if(!r)
+ {
+ std::cerr << "ERROR: wait() reports failure " << r.error().message() << std::endl;
+ r.value();
+ }
BOOST_CHECK(!shared_state.tpg->stopping());
BOOST_CHECK(shared_state.tpg->stopped());
BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
@@ -186,6 +191,9 @@ static inline void TestDynamicThreadPoolGroupWorks()
}
std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl;
};
+ auto print_exception_throw = llfio::make_scope_fail([]() noexcept {
+ std::cout << "NOTE: Exception throw occurred!" << std::endl;
+ });
// Test a single work item
reset(1);
@@ -430,9 +438,9 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
BOOST_CHECK(paced > 0);
}
-KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
- TestDynamicThreadPoolGroupWorks())
+//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
+// TestDynamicThreadPoolGroupWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
TestDynamicThreadPoolGroupNestingWorks())
-KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
- "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())
+//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
+// "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())