Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-12-22 22:34:56 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:38 +0300
commit82fcea61c21b31ce325dc47401808dd45d4ef42c (patch)
treecbc729475a11da4f0119ebf6db1802509049b725
parent2e729a6c6d6d6fef94c6c9fa48826ec313a46fd9 (diff)
wip dynamic_thread_pool_group, so far has Windows support only. Note that LLFIO now hard requires Windows 7 rather than Vista.
-rw-r--r--CMakeLists.txt3
-rw-r--r--cmake/headers.cmake2
-rw-r--r--cmake/tests.cmake1
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp728
-rw-r--r--include/llfio/v2.0/dynamic_thread_pool_group.hpp311
-rw-r--r--include/llfio/v2.0/llfio.hpp2
-rw-r--r--include/llfio/v2.0/logging.hpp31
-rw-r--r--include/llfio/v2.0/status_code.hpp6
9 files changed, 1081 insertions, 9 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dec1ade8..ccf2387d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -293,7 +293,8 @@ if(LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE)
all_compile_definitions(PUBLIC LLFIO_EXPERIMENTAL_STATUS_CODE=1)
endif()
if(WIN32)
- all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7
+ all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7
+ target_compile_definitions(llfio_hl INTERFACE _WIN32_WINNT=0x601) ## Target Win7
if(NOT LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE)
target_link_libraries(llfio_hl INTERFACE ntkernel-error-category::hl)
target_link_libraries(llfio_dl PUBLIC ntkernel-error-category::dl)
diff --git a/cmake/headers.cmake b/cmake/headers.cmake
index af4511e6..88c7deb9 100644
--- a/cmake/headers.cmake
+++ b/cmake/headers.cmake
@@ -29,6 +29,7 @@ set(llfio_HEADERS
"include/llfio/v2.0/detail/impl/cached_parent_handle_adapter.ipp"
"include/llfio/v2.0/detail/impl/clone.ipp"
"include/llfio/v2.0/detail/impl/config.ipp"
+ "include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp"
"include/llfio/v2.0/detail/impl/fast_random_file_handle.ipp"
"include/llfio/v2.0/detail/impl/io_multiplexer.ipp"
"include/llfio/v2.0/detail/impl/path_discovery.ipp"
@@ -77,6 +78,7 @@ set(llfio_HEADERS
"include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp"
"include/llfio/v2.0/detail/impl/windows/utils.ipp"
"include/llfio/v2.0/directory_handle.hpp"
+ "include/llfio/v2.0/dynamic_thread_pool_group.hpp"
"include/llfio/v2.0/fast_random_file_handle.hpp"
"include/llfio/v2.0/file_handle.hpp"
"include/llfio/v2.0/fs_handle.hpp"
diff --git a/cmake/tests.cmake b/cmake/tests.cmake
index 47e3ba9e..de7d54d4 100644
--- a/cmake/tests.cmake
+++ b/cmake/tests.cmake
@@ -7,6 +7,7 @@ set(llfio_TESTS
"test/tests/directory_handle_create_close/runner.cpp"
"test/tests/directory_handle_enumerate/kernel_directory_handle_enumerate.cpp.hpp"
"test/tests/directory_handle_enumerate/runner.cpp"
+ "test/tests/dynamic_thread_pool_group.cpp"
"test/tests/fast_random_file_handle.cpp"
"test/tests/file_handle_create_close/kernel_file_handle.cpp.hpp"
"test/tests/file_handle_create_close/runner.cpp"
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 9e73339c..4fec5a76 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 540cff7aa6d51cdad25c50857a4c3f523368cd33
-#define LLFIO_PREVIOUS_COMMIT_DATE "2020-12-17 13:41:30 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 540cff7a
+#define LLFIO_PREVIOUS_COMMIT_REF 4fa0e9bf0835d9d4d9dd8b0f93e6c650650fc0af
+#define LLFIO_PREVIOUS_COMMIT_DATE "2020-12-21 16:27:04 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 4fa0e9bf
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
new file mode 100644
index 00000000..c44dcde4
--- /dev/null
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -0,0 +1,728 @@
+/* Dynamic thread pool group
+(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#include "../../dynamic_thread_pool_group.hpp"
+
+#include <atomic>
+#include <mutex>
+#include <unordered_set>
+#include <vector>
+
+#ifdef _WIN32
+#include "windows/import.hpp"
+#include <threadpoolapiset.h>
+#else
+#include <pthread>
+#endif
+
+LLFIO_V2_NAMESPACE_BEGIN
+
+namespace detail
+{
+ struct global_dynamic_thread_pool_impl
+ {
+#ifdef _WIN32
+ using threadh_type = PTP_CALLBACK_INSTANCE;
+ using grouph_type = PTP_CALLBACK_ENVIRON;
+ static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
+ global_dynamic_thread_pool()._workerthread(workitem, threadh);
+ }
+ static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
+ global_dynamic_thread_pool()._timerthread(workitem, threadh);
+ }
+#else
+#endif
+
+ std::mutex workqueue_lock;
+ using _lock_guard = std::unique_lock<std::mutex>;
+ struct workqueue_item
+ {
+ std::unordered_set<dynamic_thread_pool_group_impl *> items;
+ };
+ std::vector<workqueue_item> workqueue;
+
+ global_dynamic_thread_pool_impl()
+ {
+ workqueue.reserve(4); // preallocate 4 levels of nesting
+ }
+
+ template <class T, class U> static void _append_to_list(T &what, U *v)
+ {
+ if(what.front == nullptr)
+ {
+ assert(what.back == nullptr);
+ v->_next = v->_prev = nullptr;
+ what.front = what.back = v;
+ }
+ else
+ {
+ v->_next = nullptr;
+ v->_prev = what.back;
+ what.back->_next = v;
+ what.back = v;
+ }
+ what.count++;
+ }
+ template <class T, class U> static void _remove_from_list(T &what, U *v)
+ {
+ if(v->_prev == nullptr && v->_next == nullptr)
+ {
+ assert(what.front == v);
+ assert(what.back == v);
+ what.front = what.back = nullptr;
+ }
+ else if(v->_prev == nullptr)
+ {
+ assert(what.front == v);
+ v->_next->_prev = nullptr;
+ what.front = v->_next;
+ v->_next = v->_prev = nullptr;
+ }
+ else if(v->_next == nullptr)
+ {
+ assert(what.back == v);
+ v->_prev->_next = nullptr;
+ what.back = v->_prev;
+ v->_next = v->_prev = nullptr;
+ }
+ else
+ {
+ v->_next->_prev = v->_prev;
+ v->_prev->_next = v->_next;
+ v->_next = v->_prev = nullptr;
+ }
+ what.count--;
+ }
+
+ result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
+ {
+ if(!d)
+ {
+ return errc::invalid_argument;
+ }
+ if(workitem->_nextwork == 0 || d.nsecs > 0)
+ {
+ if(nullptr == workitem->_internaltimerh)
+ {
+ workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
+ if(nullptr == workitem->_internaltimerh)
+ {
+ return win32_error();
+ }
+ }
+ if(d.nsecs > 0)
+ {
+ if(d.steady)
+ {
+ workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs);
+ }
+ else
+ {
+ workitem->_timepoint2 = d.to_time_point();
+ }
+ }
+ }
+ return success();
+ }
+
+ inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem);
+
+ inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;
+
+ inline void _work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
+ inline void _work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
+
+ inline result<void> stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
+ inline result<void> wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept;
+
+ inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
+ inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
+ };
+ struct global_dynamic_thread_pool_impl_thread_local_state_t
+ {
+ dynamic_thread_pool_group::work_item *workitem{nullptr};
+ global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr};
+ size_t nesting_level{0};
+ };
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t &global_dynamic_thread_pool_thread_local_state() noexcept
+ {
+ static thread_local global_dynamic_thread_pool_impl_thread_local_state_t tls;
+ return tls;
+ }
+
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept
+ {
+ static global_dynamic_thread_pool_impl impl;
+ return impl;
+ }
+} // namespace detail
+
+
+class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
+{
+ friend struct detail::global_dynamic_thread_pool_impl;
+
+ mutable std::mutex _lock;
+ using _lock_guard = detail::global_dynamic_thread_pool_impl::_lock_guard;
+ size_t _nesting_level{0};
+ struct workitems_t
+ {
+ size_t count{0};
+ dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
+ } _work_items_active, _work_items_done, _work_items_delayed;
+ std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
+ result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion
+
+#ifdef _WIN32
+ TP_CALLBACK_ENVIRON _callbackenviron;
+ PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
+#endif
+
+public:
+ result<void> init()
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ try
+ {
+ auto &impl = detail::global_dynamic_thread_pool();
+ _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
+#ifdef _WIN32
+ InitializeThreadpoolEnvironment(_grouph);
+#else
+#endif
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
+ // Append this group to the global work queue at its nesting level
+ if(_nesting_level >= impl.workqueue.size())
+ {
+ impl.workqueue.resize(_nesting_level + 1);
+ }
+ impl.workqueue[_nesting_level].items.insert(this);
+ return success();
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+ }
+
+ virtual ~dynamic_thread_pool_group_impl()
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ (void) wait();
+ auto &impl = detail::global_dynamic_thread_pool();
+#ifdef _WIN32
+ if(nullptr != _grouph)
+ {
+ DestroyThreadpoolEnvironment(_grouph);
+ _grouph = nullptr;
+ }
+#else
+#endif
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
+ assert(impl.workqueue.size() > _nesting_level);
+ impl.workqueue[_nesting_level].items.erase(this);
+ while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
+ {
+ impl.workqueue.pop_back();
+ }
+ }
+
+ virtual result<void> submit(span<work_item *> work) noexcept override
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ if(_stopping.load(std::memory_order_relaxed))
+ {
+ return errc::operation_canceled;
+ }
+ _stopped.store(false, std::memory_order_release);
+ if(_completing.load(std::memory_order_relaxed))
+ {
+ for(auto *i : work)
+ {
+ i->_parent = this;
+ detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i);
+ }
+ return success();
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ _lock_guard g(_lock);
+ if(_work_items_active.count == 0 && _work_items_done.count == 0)
+ {
+ _abnormal_completion_cause = success();
+ }
+ OUTCOME_TRY(impl.submit(g, this, work));
+ if(_work_items_active.count == 0)
+ {
+ _stopped.store(true, std::memory_order_release);
+ }
+ return success();
+ }
+
+ virtual result<void> stop() noexcept override
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ if(_stopped.load(std::memory_order_relaxed))
+ {
+ return success();
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ _lock_guard g(_lock);
+ return impl.stop(g, this, errc::operation_canceled);
+ }
+
+ virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); }
+
+ virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); }
+
+ virtual result<void> wait(deadline d = {}) const noexcept override
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ if(_stopped.load(std::memory_order_relaxed))
+ {
+ return success();
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ _lock_guard g(_lock);
+ return impl.wait(g, true, this, d);
+ }
+};
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t dynamic_thread_pool_group::current_nesting_level() noexcept
+{
+ return detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_thread_pool_group::current_work_item() noexcept
+{
+ return detail::global_dynamic_thread_pool_thread_local_state().workitem;
+}
+
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept
+{
+ try
+ {
+ auto ret = std::make_unique<dynamic_thread_pool_group_impl>();
+ OUTCOME_TRY(ret->init());
+ return dynamic_thread_pool_group_ptr(std::move(ret));
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+}
+
+namespace detail
+{
+ inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
+ {
+ (void) g;
+ if(workitem->_nextwork != -1 && !workitem->_parent->_stopping.load(std::memory_order_relaxed))
+ {
+ // If no work item for now, or there is a delay, schedule a timer
+ if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() ||
+ workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+ assert(workitem->_internaltimerh != nullptr);
+#ifdef _WIN32
+ LARGE_INTEGER li;
+ if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100;
+ if(li.QuadPart < 0)
+ {
+ li.QuadPart = 0;
+ }
+ li.QuadPart = -li.QuadPart; // negative is relative
+ }
+ else if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+ li = windows_nt_kernel::from_timepoint(workitem->_timepoint2);
+ }
+ else
+ {
+ li.QuadPart = -1; // smallest possible non immediate duration from now
+ }
+ FILETIME ft;
+ ft.dwHighDateTime = (DWORD) li.HighPart;
+ ft.dwLowDateTime = li.LowPart;
+ // std::cout << "*** timer " << workitem << std::endl;
+ SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, 1000);
+#else
+#endif
+ }
+ else
+ {
+#ifdef _WIN32
+ // Set the priority of the group according to distance from the top
+ TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
+ if(workqueue.size() - workitem->_parent->_nesting_level == 1)
+ {
+ priority = TP_CALLBACK_PRIORITY_HIGH;
+ }
+ else if(workqueue.size() - workitem->_parent->_nesting_level == 2)
+ {
+ priority = TP_CALLBACK_PRIORITY_NORMAL;
+ }
+ SetThreadpoolCallbackPriority(workitem->_parent->_grouph, priority);
+ // std::cout << "*** submit " << workitem << std::endl;
+ SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
+#else
+#endif
+ }
+ }
+ }
+
+ inline result<void> global_dynamic_thread_pool_impl::submit(_lock_guard &g, dynamic_thread_pool_group_impl *group,
+ span<dynamic_thread_pool_group::work_item *> work) noexcept
+ {
+ try
+ {
+ if(work.empty())
+ {
+ return success();
+ }
+ for(auto *i : work)
+ {
+ if(i->_parent != nullptr)
+ {
+ return errc::address_in_use;
+ }
+ }
+ auto uninit = make_scope_exit([&]() noexcept {
+ for(auto *i : work)
+ {
+ _remove_from_list(group->_work_items_active, i);
+#ifdef _WIN32
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+#else
+#endif
+ i->_parent = nullptr;
+ }
+ });
+ for(auto *i : work)
+ {
+ deadline d(std::chrono::seconds(0));
+ i->_parent = group;
+ i->_nextwork = i->next(d);
+ if(-1 == i->_nextwork)
+ {
+ _append_to_list(group->_work_items_done, i);
+ }
+ else
+ {
+#ifdef _WIN32
+ i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
+ if(nullptr == i->_internalworkh)
+ {
+ return win32_error();
+ }
+#else
+#endif
+ OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d));
+ _append_to_list(group->_work_items_active, i);
+ }
+ }
+ uninit.release();
+ {
+ for(auto *i : work)
+ {
+ _submit_work_item(g, i);
+ }
+ }
+ return success();
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+ }
+
+ inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
+ {
+ (void) g;
+#ifdef _WIN32
+ if(1 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 2;
+ }
+ else
+ {
+ if(i->_internaltimerh != nullptr)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ if(i->_internalworkh != nullptr)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ }
+#else
+#endif
+ _remove_from_list(i->_parent->_work_items_active, i);
+ _append_to_list(i->_parent->_work_items_done, i);
+ if(i->_parent->_work_items_active.count == 0)
+ {
+ auto *parent = i->_parent;
+ i = nullptr;
+ auto *v = parent->_work_items_done.front, *n = v;
+ for(; v != nullptr; v = n)
+ {
+ v->_parent = nullptr;
+ n = v->_next;
+ }
+ n = v = parent->_work_items_done.front;
+ parent->_work_items_done.front = parent->_work_items_done.back = nullptr;
+ parent->_work_items_done.count = 0;
+ parent->_stopping.store(false, std::memory_order_release);
+ parent->_stopped.store(true, std::memory_order_release);
+ parent->_completing.store(true, std::memory_order_release);
+ for(; v != nullptr; v = n)
+ {
+ n = v->_next;
+ v->group_complete(parent->_abnormal_completion_cause);
+ }
+ parent->_completing.store(false, std::memory_order_release);
+ // Did a least one group_complete() submit more work to myself?
+ while(parent->_work_items_delayed.count > 0)
+ {
+ i = parent->_work_items_delayed.front;
+ _remove_from_list(parent->_work_items_delayed, i);
+ auto r = submit(g, parent, {&i, 1});
+ if(!r)
+ {
+ parent->_work_items_delayed = {};
+ (void) stop(g, parent, std::move(r));
+ break;
+ }
+ }
+ }
+ }
+ inline void global_dynamic_thread_pool_impl::_work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) noexcept
+ {
+ assert(workitem->_nextwork != -1);
+ if(workitem->_nextwork == 0)
+ {
+ deadline d(std::chrono::seconds(0));
+ workitem->_nextwork = workitem->next(d);
+ auto r = _prepare_work_item_delay(workitem, workitem->_parent->_grouph, d);
+ if(!r)
+ {
+ (void) stop(g, workitem->_parent, std::move(r));
+ _work_item_done(g, workitem);
+ return;
+ }
+ }
+ if(-1 == workitem->_nextwork)
+ {
+ _work_item_done(g, workitem);
+ return;
+ }
+ _submit_work_item(g, workitem);
+ }
+
+ inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept
+ {
+ (void) g;
+ if(group->_abnormal_completion_cause)
+ {
+ group->_abnormal_completion_cause = std::move(err);
+ }
+ group->_stopping.store(true, std::memory_order_release);
+ return success();
+ }
+
+ inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept
+ {
+ LLFIO_DEADLINE_TO_SLEEP_INIT(d);
+ if(!d || d.nsecs > 0)
+ {
+ auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
+#ifdef _WIN32
+ if(tls.current_callback_instance != nullptr)
+ {
+ // I am being called from within a thread worker. Tell
+ // the thread pool that I am not going to exit promptly.
+ CallbackMayRunLong(tls.current_callback_instance);
+ }
+#endif
+ // Is this a cancellation?
+ if(group->_stopping.load(std::memory_order_relaxed))
+ {
+ while(group->_work_items_active.count > 0)
+ {
+#ifdef _WIN32
+ auto *i = group->_work_items_active.front;
+ if(nullptr != i->_internalworkh)
+ {
+ i->_internalworkh_inuse = 1;
+ g.unlock();
+ WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ i->_internalworkh_inuse = 1;
+ g.unlock();
+ WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internalworkh = nullptr;
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ if(group->_work_items_active.count > 0 && group->_work_items_active.front == i)
+ {
+ // This item got cancelled before it started
+ _work_item_done(g, group->_work_items_active.front);
+ }
+#else
+#endif
+ }
+ assert(!group->_stopping.load(std::memory_order_relaxed));
+ }
+ else if(!d)
+ {
+ while(group->_work_items_active.count > 0)
+ {
+#ifdef _WIN32
+ auto *i = group->_work_items_active.front;
+ if(nullptr != i->_internalworkh)
+ {
+ i->_internalworkh_inuse = 1;
+ g.unlock();
+ WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ i->_internalworkh_inuse = 1;
+ g.unlock();
+ WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internalworkh = nullptr;
+ }
+ i->_internalworkh_inuse = 0;
+ }
+#else
+#endif
+ }
+ }
+ else
+ {
+ while(group->_work_items_active.count > 0)
+ {
+ LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ }
+ }
+ if(group->_work_items_active.count > 0)
+ {
+ return errc::timed_out;
+ }
+ if(reap)
+ {
+ return std::move(group->_abnormal_completion_cause);
+ }
+ return success();
+ }
+
+ inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ // std::cout << "*** _timerthread " << workitem << std::endl;
+ _lock_guard g(workitem->_parent->_lock);
+ _work_item_next(g, workitem);
+ }
+
+ // Worker thread entry point
+ inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh)
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ // std::cout << "*** _workerthread " << workitem << " with work " << workitem->_nextwork << std::endl;
+ assert(workitem->_nextwork != -1);
+ assert(workitem->_nextwork != 0);
+ if(workitem->_parent->_stopped.load(std::memory_order_relaxed))
+ {
+ _lock_guard g(workitem->_parent->_lock);
+ _work_item_done(g, workitem);
+ return;
+ }
+ auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
+ auto old_thread_local_state = tls;
+ tls.workitem = workitem;
+ tls.current_callback_instance = selfthreadh;
+ tls.nesting_level++;
+ auto r = (*workitem)(workitem->_nextwork);
+ workitem->_nextwork = 0; // call next() next time
+ tls = old_thread_local_state;
+ _lock_guard g(workitem->_parent->_lock);
+ if(!r)
+ {
+ (void) stop(g, workitem->_parent, std::move(r));
+ _work_item_done(g, workitem);
+ workitem = nullptr;
+ }
+ else
+ {
+ _work_item_next(g, workitem);
+ }
+ }
+} // namespace detail
+
+LLFIO_V2_NAMESPACE_END
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
new file mode 100644
index 00000000..c216f023
--- /dev/null
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -0,0 +1,311 @@
+/* Dynamic thread pool group
+(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
+
+#include "deadline.h"
+
+#include <memory> // for unique_ptr and shared_ptr
+
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4251) // dll interface
+#endif
+
+LLFIO_V2_NAMESPACE_EXPORT_BEGIN
+
+class dynamic_thread_pool_group_impl;
+
+namespace detail
+{
+ struct global_dynamic_thread_pool_impl;
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept;
+} // namespace detail
+
+/*! \class dynamic_thread_pool_group
+\brief Work group within the global dynamic thread pool.
+
+Some operating systems provide a per-process global kernel thread pool capable of
+dynamically adjusting its kernel thread count to how many of the threads in
+the pool are currently blocked. The platform will choose the exact strategy used,
+but as an example of a strategy, one might keep creating new kernel threads
+so long as the total threads currently running and not blocked on page faults,
+i/o or syscalls, is below the hardware concurrency. Similarly, if more threads
+are running and not blocked than hardware concurrency, one might remove kernel
+threads from executing work. Such a strategy would dynamically increase
+concurrency until all CPUs are busy, but reduce concurrency if more work is
+being done than CPUs available.
+
+Such dynamic kernel thread pools are excellent for CPU bound processing, you
+simply fire and forget work into them. However, for i/o bound processing, you
+must be careful as there are gotchas. For non-seekable i/o, it is very possible
+that there could be 100k handles upon which we do i/o. Doing i/o on
+100k handles using a dynamic thread pool would in theory cause the creation
+of 100k kernel threads, which would not be wise. A much better solution is
+to use an `io_multiplexer` to await changes in large sets of i/o handles.
+
+For seekable i/o, the same problem applies, but worse again: an i/o bound problem
+would cause a rapid increase in the number of kernel threads, which by
+definition makes i/o even more congested. Basically the system runs off
+into pathological performance loss. You must therefore never naively do
+i/o bound work (e.g. with memory mapped files) from within a dynamic thread
+pool without employing some mechanism to force concurrency downwards if
+the backing storage is congested.
+
+## Work groups
+
+Instances of this class contain zero or more work items. Each work item
+is asked for its next item of work, and if an item of work is available,
+that item of work is executed by the global kernel thread pool at a time
+of its choosing. It is NEVER possible that any one work item is concurrently
+executed at a time, each work item is always sequentially executed with
+respect to itself. The only concurrency possible is *across* work items.
+Therefore, if you want to execute the same piece of code concurrently,
+you need to submit a separate work item for each possible amount of
+concurrency (e.g. `std::thread::hardware_concurrency()`).
+
+You can have as many or as few items of work as you like. You can
+dynamically submit additional work items at any time. The group of work items can
+be waited upon to complete, after which the work group becomes reset as
+if back to freshly constructed. You can also stop executing all the work
+items in the group, even if they have not fully completed. If any work
+item returns a failure, this equals a `stop()`, and the next `wait()` will
+return that error.
+
+Work items may create sub work groups as part of their
+operation. If they do so, the work items from such nested work groups are
+scheduled preferentially. This ensures good forward progress, so if you
+have 100 work items each of which do another 100 work items, you don't get
+10,000 slowly progressing work. Rather, the work items in the first set
+progress slowly, whereas the work items in the second set progress quickly.
+
+`work_item::next()` may optionally set a deadline to delay when that work
+item ought to be processed again. Deadlines can be relative or absolute.
+
+## C++ 23 Executors
+
+As with elsewhere in LLFIO, as a low level facility, we don't implement
+https://wg21.link/P0443 Executors, but it is trivially easy to implement
+a dynamic equivalent to `std::static_thread_pool` using this class.
+
+## Implementation notes
+
+### Microsoft Windows
+
+On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api).
+This is an IOCP-aware thread pool which will dynamically increase the number
+of kernel threads until none are blocked. If more kernel threads
+are running than twice the number of CPUs in the system, the number of kernel
+threads is dynamically reduced. The maximum number of kernel threads which
+will run simultaneously is 500. Note that the Win32 thread pool is shared
+across the process by multiple Windows facilities.
+
+Note that the Win32 thread pool has built in support for IOCP, so if you
+have a custom i/o multiplexer, you can use the global Win32 thread pool
+to execute i/o completions handling. See `CreateThreadpoolIo()` for more.
+
+No dynamic memory allocation is performed by this implementation outside
+of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool
+API may perform dynamic memory allocation internally, but that is outside
+our control.
+
+### Linux
+
+On Linux, a similar strategy to Microsoft Windows' approach is used. We
+dynamically increase the number of kernel threads until none are sleeping
+awaiting i/o. If more kernel threads are running than 1.5x the number of
+CPUs in the system, the number of kernel threads is dynamically reduced.
+For portability, we also gate the maximum number of kernel threads to 500.
+Note that **all** the kernel threads for the current process are considered,
+not just the kernel threads created by this thread pool implementation.
+Therefore, if you have alternative thread pool implementations (e.g. OpenMP,
+`std::async`), those are also included in the dynamic adjustment.
+
+As this is wholly implemented by this library, dynamic memory allocation
+occurs in the initial `make_dynamic_thread_pool_group()`, but otherwise
+the implementation does not perform dynamic memory allocations.
+*/
+class LLFIO_DECL dynamic_thread_pool_group
+{
+public:
+ //! An individual item of work within the work group.
+ class work_item
+ {
+ friend struct detail::global_dynamic_thread_pool_impl;
+ friend class dynamic_thread_pool_group_impl;
+ dynamic_thread_pool_group_impl *_parent{nullptr};
+ void *_internalworkh{nullptr}, *_internaltimerh{nullptr};
+ work_item *_prev{nullptr}, *_next{nullptr};
+ intptr_t _nextwork{0};
+ std::chrono::steady_clock::time_point _timepoint1;
+ std::chrono::system_clock::time_point _timepoint2;
+ int _internalworkh_inuse{0};
+
+ protected:
+ work_item() = default;
+ work_item(const work_item &) = default;
+ work_item(work_item &&) = default;
+ work_item &operator=(const work_item &) = default;
+ work_item &operator=(work_item &&) = default;
+
+ public:
+ virtual ~work_item() {}
+
+ //! Returns the parent work group between successful submission and just before `group_complete()`.
+ dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); }
+
+ /*! Invoked by the i/o thread pool to determine if this work item
+ has more work to do.
+
+ \return If there is no work _currently_ available to do, but there
+ might be some later, you should return zero. You will be called again
+ later after other work has been done. If you return -1, you are
+ saying that no further work will be done, and the group need never
+ call you again. If you have more work you want to do, return any
+ other value.
+ \param d Optional delay before the next item of work ought to be
+ executed (return != 0), or `next()` ought to be called again to
+ determine the next item (return == 0). On entry `d` is set to no
+ delay, so if you don't modify it, the next item of work occurs
+ as soon as possible.
+
+ Note that this function is called from multiple kernel threads.
+ You must NOT do any significant work in this function.
+ In particular do NOT call any dynamic thread pool group function,
+ as you will experience deadlock.
+
+ `dynamic_thread_pool_group::current_work_item()` may have any
+ value during this call.
+ */
+ virtual intptr_t next(deadline &d) noexcept = 0;
+
+ /*! Invoked by the i/o thread pool to perform the next item of work.
+
+ \return Any failure causes all remaining work in this group to
+ be cancelled as soon as possible.
+ \param work The value returned by `next()`.
+
+ Note that this function is called from multiple kernel threads,
+ and may not be the kernel thread from which `next()`
+ was called.
+
+ `dynamic_thread_pool_group::current_work_item()` will always be
+ `this` during this call.
+ */
+ virtual result<void> operator()(intptr_t work) noexcept = 0;
+
+ /*! Invoked by the i/o thread pool when all work in this thread
+ pool group is complete.
+
+ `cancelled` indicates if this is an abnormal completion. If its
+ error compares equal to `errc::operation_cancelled`, then `stop()`
+ was called.
+
+ Just before this is called for all work items submitted, the group
+ becomes reset to fresh, and `parent()` becomes null. You can resubmit
+ this work item, but do not submit other work items until their
+ `group_complete()` has been invoked.
+
+ Note that this function is called from multiple kernel threads.
+
+ `dynamic_thread_pool_group::current_work_item()` may have any
+ value during this call.
+ */
+ virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; }
+ };
+
+ virtual ~dynamic_thread_pool_group() {}
+
+ /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later.
+
+ Note that if the group is currently stopping, you cannot submit more
+ work until the group has stopped. An error code comparing equal to
+ `errc::operation_canceled` is returned if you try.
+ */
+ virtual result<void> submit(span<work_item *> work) noexcept = 0;
+
+ //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
+ virtual result<void> stop() noexcept = 0;
+
+ /*! \brief Threadsafe. True if a work item reported an error, or
+ `stop()` was called, but work items are still running.
+ */
+ virtual bool stopping() const noexcept = 0;
+
+ //! Threadsafe. True if all the work previously submitted is complete.
+ virtual bool stopped() const noexcept = 0;
+
+ //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item.
+ virtual result<void> wait(deadline d = {}) const noexcept = 0;
+ //! \overload
+ template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept
+ {
+ auto r = wait(duration);
+ if(!r && r.error() == errc::timed_out)
+ {
+ return false;
+ }
+ OUTCOME_TRY(std::move(r));
+ return true;
+ }
+ //! \overload
+ template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept
+ {
+ auto r = wait(timeout);
+ if(!r && r.error() == errc::timed_out)
+ {
+ return false;
+ }
+ OUTCOME_TRY(std::move(r));
+ return true;
+ }
+
+ //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item.
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept;
+ //! Returns the work item the calling thread is running within, if any.
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept;
+};
+//! A unique ptr to a work group within the global dynamic thread pool.
+using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>;
+
+//! Creates a new work group within the global dynamic thread pool.
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept;
+
+// BEGIN make_free_functions.py
+// END make_free_functions.py
+
+LLFIO_V2_NAMESPACE_END
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS)
+#define LLFIO_INCLUDED_BY_HEADER 1
+#include "detail/impl/dynamic_thread_pool_group.ipp"
+#undef LLFIO_INCLUDED_BY_HEADER
+#endif
+
+#endif
diff --git a/include/llfio/v2.0/llfio.hpp b/include/llfio/v2.0/llfio.hpp
index 9cbe624d..5c0874de 100644
--- a/include/llfio/v2.0/llfio.hpp
+++ b/include/llfio/v2.0/llfio.hpp
@@ -63,9 +63,9 @@ import LLFIO_MODULE_NAME;
#include "utils.hpp"
#include "directory_handle.hpp"
+#include "dynamic_thread_pool_group.hpp"
#include "fast_random_file_handle.hpp"
#include "file_handle.hpp"
-//#include "io_thread_pool_group.hpp"
#include "process_handle.hpp"
#include "statfs.hpp"
#ifdef LLFIO_INCLUDE_STORAGE_PROFILE
diff --git a/include/llfio/v2.0/logging.hpp b/include/llfio/v2.0/logging.hpp
index 1dff759a..7ef30363 100644
--- a/include/llfio/v2.0/logging.hpp
+++ b/include/llfio/v2.0/logging.hpp
@@ -77,7 +77,7 @@ public:
{
reinterpret_cast<log_level &>(detail::thread_local_log_level()) = n;
}
- ~log_level_guard() {reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; }
+ ~log_level_guard() { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; }
};
// Infrastructure for recording the current path for when failure occurs
@@ -262,8 +262,9 @@ namespace detail
return span<char>(buffer, length);
}
// Strips a __PRETTY_FUNCTION__ of all instances of ::LLFIO_V2_NAMESPACE:: and ::LLFIO_V2_NAMESPACE::
- inline void strip_pretty_function(char *out, size_t bytes, const char *in)
+ inline void strip_pretty_function(char *_out, size_t bytes, const char *in)
{
+ char *out = _out;
const span<char> remove1 = llfio_namespace_string();
const span<char> remove2 = outcome_namespace_string();
for(--bytes; bytes && *in; --bytes)
@@ -272,6 +273,32 @@ namespace detail
in += remove1.size();
if(!strncmp(in, remove2.data(), remove2.size()))
in += remove2.size();
+ if(!strncmp(in, "basic_result<", 13))
+ {
+ int count = 13;
+ for(--bytes; bytes && *in && count; --bytes, --count)
+ {
+ *out++ = *in++;
+ }
+ if(!*in || bytes ==0)
+ {
+ break;
+ }
+ count = 1;
+ while(*in && count > 0)
+ {
+ if(*in == '<')
+ {
+ count++;
+ }
+ else if(*in == '>')
+ {
+ count--;
+ }
+ in++;
+ }
+ in--;
+ }
*out++ = *in++;
}
*out = 0;
diff --git a/include/llfio/v2.0/status_code.hpp b/include/llfio/v2.0/status_code.hpp
index 80fd905d..03eb0f67 100644
--- a/include/llfio/v2.0/status_code.hpp
+++ b/include/llfio/v2.0/status_code.hpp
@@ -25,8 +25,6 @@ Distributed under the Boost Software License, Version 1.0.
#ifndef LLFIO_STATUS_CODE_HPP
#define LLFIO_STATUS_CODE_HPP
-#include "logging.hpp"
-
/* The SG14 status code implementation is quite profoundly different to the
error code implementation. In the error code implementation, std::error_code
is fixed by the standard library, so we wrap it with extra metadata into
@@ -68,6 +66,8 @@ as that (a) enables safe header only LLFIO on Windows (b) produces better codege
#error LLFIO needs Outcome v2.2 or higher
#endif
+#include "logging.hpp"
+
LLFIO_V2_NAMESPACE_BEGIN
#ifndef LLFIO_DISABLE_PATHS_IN_FAILURE_INFO
@@ -351,6 +351,8 @@ LLFIO_V2_NAMESPACE_END
#error LLFIO needs Outcome v2.2 or higher
#endif
+#include "logging.hpp"
+
LLFIO_V2_NAMESPACE_BEGIN
namespace detail