diff options
Diffstat (limited to 'include/llfio/v2.0/dynamic_thread_pool_group.hpp')
-rw-r--r-- | include/llfio/v2.0/dynamic_thread_pool_group.hpp | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp new file mode 100644 index 00000000..c216f023 --- /dev/null +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -0,0 +1,311 @@ +/* Dynamic thread pool group +(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H + +#include "deadline.h" + +#include <memory> // for unique_ptr and shared_ptr + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4251) // dll interface +#endif + +LLFIO_V2_NAMESPACE_EXPORT_BEGIN + +class dynamic_thread_pool_group_impl; + +namespace detail +{ + struct global_dynamic_thread_pool_impl; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept; +} // namespace detail + +/*! \class dynamic_thread_pool_group +\brief Work group within the global dynamic thread pool. + +Some operating systems provide a per-process global kernel thread pool capable of +dynamically adjusting its kernel thread count to how many of the threads in +the pool are currently blocked. The platform will choose the exact strategy used, +but as an example of a strategy, one might keep creating new kernel threads +so long as the total threads currently running and not blocked on page faults, +i/o or syscalls, is below the hardware concurrency. Similarly, if more threads +are running and not blocked than hardware concurrency, one might remove kernel +threads from executing work. Such a strategy would dynamically increase +concurrency until all CPUs are busy, but reduce concurrency if more work is +being done than CPUs available. + +Such dynamic kernel thread pools are excellent for CPU bound processing, you +simply fire and forget work into them. However, for i/o bound processing, you +must be careful as there are gotchas. For non-seekable i/o, it is very possible +that there could be 100k handles upon which we do i/o. Doing i/o on +100k handles using a dynamic thread pool would in theory cause the creation +of 100k kernel threads, which would not be wise. A much better solution is +to use an `io_multiplexer` to await changes in large sets of i/o handles. + +For seekable i/o, the same problem applies, but worse again: an i/o bound problem +would cause a rapid increase in the number of kernel threads, which by +definition makes i/o even more congested. Basically the system runs off +into pathological performance loss. You must therefore never naively do +i/o bound work (e.g. with memory mapped files) from within a dynamic thread +pool without employing some mechanism to force concurrency downwards if +the backing storage is congested. + +## Work groups + +Instances of this class contain zero or more work items. Each work item +is asked for its next item of work, and if an item of work is available, +that item of work is executed by the global kernel thread pool at a time +of its choosing. It is NEVER possible that any one work item is concurrently +executed at a time, each work item is always sequentially executed with +respect to itself. The only concurrency possible is *across* work items. +Therefore, if you want to execute the same piece of code concurrently, +you need to submit a separate work item for each possible amount of +concurrency (e.g. `std::thread::hardware_concurrency()`). + +You can have as many or as few items of work as you like. You can +dynamically submit additional work items at any time. The group of work items can +be waited upon to complete, after which the work group becomes reset as +if back to freshly constructed. You can also stop executing all the work +items in the group, even if they have not fully completed. If any work +item returns a failure, this equals a `stop()`, and the next `wait()` will +return that error. + +Work items may create sub work groups as part of their +operation. If they do so, the work items from such nested work groups are +scheduled preferentially. This ensures good forward progress, so if you +have 100 work items each of which do another 100 work items, you don't get +10,000 slowly progressing work. Rather, the work items in the first set +progress slowly, whereas the work items in the second set progress quickly. + +`work_item::next()` may optionally set a deadline to delay when that work +item ought to be processed again. Deadlines can be relative or absolute. + +## C++ 23 Executors + +As with elsewhere in LLFIO, as a low level facility, we don't implement +https://wg21.link/P0443 Executors, but it is trivially easy to implement +a dynamic equivalent to `std::static_thread_pool` using this class. + +## Implementation notes + +### Microsoft Windows + +On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api). +This is an IOCP-aware thread pool which will dynamically increase the number +of kernel threads until none are blocked. If more kernel threads +are running than twice the number of CPUs in the system, the number of kernel +threads is dynamically reduced. The maximum number of kernel threads which +will run simultaneously is 500. Note that the Win32 thread pool is shared +across the process by multiple Windows facilities. + +Note that the Win32 thread pool has built in support for IOCP, so if you +have a custom i/o multiplexer, you can use the global Win32 thread pool +to execute i/o completions handling. See `CreateThreadpoolIo()` for more. + +No dynamic memory allocation is performed by this implementation outside +of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool +API may perform dynamic memory allocation internally, but that is outside +our control. + +### Linux + +On Linux, a similar strategy to Microsoft Windows' approach is used. We +dynamically increase the number of kernel threads until none are sleeping +awaiting i/o. If more kernel threads are running than 1.5x the number of +CPUs in the system, the number of kernel threads is dynamically reduced. +For portability, we also gate the maximum number of kernel threads to 500. +Note that **all** the kernel threads for the current process are considered, +not just the kernel threads created by this thread pool implementation. +Therefore, if you have alternative thread pool implementations (e.g. OpenMP, +`std::async`), those are also included in the dynamic adjustment. + +As this is wholly implemented by this library, dynamic memory allocation +occurs in the initial `make_dynamic_thread_pool_group()`, but otherwise +the implementation does not perform dynamic memory allocations. +*/ +class LLFIO_DECL dynamic_thread_pool_group +{ +public: + //! An individual item of work within the work group. + class work_item + { + friend struct detail::global_dynamic_thread_pool_impl; + friend class dynamic_thread_pool_group_impl; + dynamic_thread_pool_group_impl *_parent{nullptr}; + void *_internalworkh{nullptr}, *_internaltimerh{nullptr}; + work_item *_prev{nullptr}, *_next{nullptr}; + intptr_t _nextwork{0}; + std::chrono::steady_clock::time_point _timepoint1; + std::chrono::system_clock::time_point _timepoint2; + int _internalworkh_inuse{0}; + + protected: + work_item() = default; + work_item(const work_item &) = default; + work_item(work_item &&) = default; + work_item &operator=(const work_item &) = default; + work_item &operator=(work_item &&) = default; + + public: + virtual ~work_item() {} + + //! Returns the parent work group between successful submission and just before `group_complete()`. + dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); } + + /*! Invoked by the i/o thread pool to determine if this work item + has more work to do. + + \return If there is no work _currently_ available to do, but there + might be some later, you should return zero. You will be called again + later after other work has been done. If you return -1, you are + saying that no further work will be done, and the group need never + call you again. If you have more work you want to do, return any + other value. + \param d Optional delay before the next item of work ought to be + executed (return != 0), or `next()` ought to be called again to + determine the next item (return == 0). On entry `d` is set to no + delay, so if you don't modify it, the next item of work occurs + as soon as possible. + + Note that this function is called from multiple kernel threads. + You must NOT do any significant work in this function. + In particular do NOT call any dynamic thread pool group function, + as you will experience deadlock. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. + */ + virtual intptr_t next(deadline &d) noexcept = 0; + + /*! Invoked by the i/o thread pool to perform the next item of work. + + \return Any failure causes all remaining work in this group to + be cancelled as soon as possible. + \param work The value returned by `next()`. + + Note that this function is called from multiple kernel threads, + and may not be the kernel thread from which `next()` + was called. + + `dynamic_thread_pool_group::current_work_item()` will always be + `this` during this call. + */ + virtual result<void> operator()(intptr_t work) noexcept = 0; + + /*! Invoked by the i/o thread pool when all work in this thread + pool group is complete. + + `cancelled` indicates if this is an abnormal completion. If its + error compares equal to `errc::operation_cancelled`, then `stop()` + was called. + + Just before this is called for all work items submitted, the group + becomes reset to fresh, and `parent()` becomes null. You can resubmit + this work item, but do not submit other work items until their + `group_complete()` has been invoked. + + Note that this function is called from multiple kernel threads. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. + */ + virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; } + }; + + virtual ~dynamic_thread_pool_group() {} + + /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later. + + Note that if the group is currently stopping, you cannot submit more + work until the group has stopped. An error code comparing equal to + `errc::operation_canceled` is returned if you try. + */ + virtual result<void> submit(span<work_item *> work) noexcept = 0; + + //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block). + virtual result<void> stop() noexcept = 0; + + /*! \brief Threadsafe. True if a work item reported an error, or + `stop()` was called, but work items are still running. + */ + virtual bool stopping() const noexcept = 0; + + //! Threadsafe. True if all the work previously submitted is complete. + virtual bool stopped() const noexcept = 0; + + //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item. + virtual result<void> wait(deadline d = {}) const noexcept = 0; + //! \overload + template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept + { + auto r = wait(duration); + if(!r && r.error() == errc::timed_out) + { + return false; + } + OUTCOME_TRY(std::move(r)); + return true; + } + //! \overload + template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept + { + auto r = wait(timeout); + if(!r && r.error() == errc::timed_out) + { + return false; + } + OUTCOME_TRY(std::move(r)); + return true; + } + + //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item. + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept; + //! Returns the work item the calling thread is running within, if any. + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept; +}; +//! A unique ptr to a work group within the global dynamic thread pool. +using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>; + +//! Creates a new work group within the global dynamic thread pool. +LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept; + +// BEGIN make_free_functions.py +// END make_free_functions.py + +LLFIO_V2_NAMESPACE_END + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS) +#define LLFIO_INCLUDED_BY_HEADER 1 +#include "detail/impl/dynamic_thread_pool_group.ipp" +#undef LLFIO_INCLUDED_BY_HEADER +#endif + +#endif |