Diffstat (limited to 'include/llfio/v2.0/dynamic_thread_pool_group.hpp')
 include/llfio/v2.0/dynamic_thread_pool_group.hpp | 530 ++++++++++++++++++++++
 1 file changed, 530 insertions(+), 0 deletions(-)
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
new file mode 100644
index 00000000..09828a6e
--- /dev/null
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -0,0 +1,530 @@
+/* Dynamic thread pool group
+(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
+
+#include "deadline.h"
+
+#include <memory> // for unique_ptr and shared_ptr
+
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4251) // dll interface
+#pragma warning(disable : 4275) // dll interface
+#endif
+
+LLFIO_V2_NAMESPACE_EXPORT_BEGIN
+
+class dynamic_thread_pool_group_impl;
+class io_handle;
+
+namespace detail
+{
+ struct global_dynamic_thread_pool_impl;
+ struct global_dynamic_thread_pool_impl_workqueue_item;
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept;
+} // namespace detail
+
+/*! \class dynamic_thread_pool_group
+\brief Work group within the global dynamic thread pool.
+
+Some operating systems provide a per-process global kernel thread pool capable of
+dynamically adjusting its kernel thread count according to how many of the threads
+in the pool are currently blocked. The platform will choose the exact strategy used,
+but as an example of a strategy, one might keep creating new kernel threads
+so long as the total number of threads currently running and not blocked on page
+faults, i/o or syscalls is below the hardware concurrency. Similarly, if more threads
+are running and not blocked than hardware concurrency, one might remove kernel
+threads from executing work. Such a strategy would dynamically increase
+concurrency until all CPUs are busy, but reduce concurrency if more work is
+being done than CPUs available.
+
+Such dynamic kernel thread pools are excellent for CPU bound processing: you
+simply fire and forget work into them. However, for i/o bound processing you
+must be careful, as there are gotchas. For non-seekable i/o, it is entirely
+possible that there could be 100k handles upon which we do i/o. Doing i/o on
+100k handles using a dynamic thread pool would in theory cause the creation
+of 100k kernel threads, which would not be wise. A much better solution is
+to use an `io_multiplexer` to await changes in large sets of i/o handles.
+
+For seekable i/o, the same problem applies, but worse again: an i/o bound problem
+would cause a rapid increase in the number of kernel threads, which by
+definition makes i/o even more congested. Basically the system runs off
+into pathological performance loss. You must therefore never naively do
+i/o bound work (e.g. with memory mapped files) from within a dynamic thread
+pool without employing some mechanism to force concurrency downwards if
+the backing storage is congested.
+
+## Work groups
+
+Instances of this class contain zero or more work items. Each work item
+is asked for its next item of work, and if an item of work is available,
+that item of work is executed by the global kernel thread pool at a time
+of its choosing. It is NEVER possible for any one work item to be executed
+concurrently with itself; each work item is always executed sequentially with
+respect to itself. The only concurrency possible is *across* work items.
+Therefore, if you want to execute the same piece of code concurrently,
+you need to submit a separate work item for each possible amount of
+concurrency (e.g. `std::thread::hardware_concurrency()`).
+
+You can have as many or as few items of work as you like. You can
+dynamically submit additional work items at any time, except when a group
+is currently in the process of being stopped. The group of work items can
+be waited upon to complete, after which the work group is reset as if
+freshly constructed. You can also stop executing all the work
+items in the group, even if they have not fully completed. If any work
+item returns a failure, this is equivalent to a `stop()`, and the next
+`wait()` will return that error.
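+
+As an example, here is a minimal sketch of a work item which executes one
+hundred pieces of work, with one work item submitted per CPU (`my_work_item`
+and its counting logic are illustrative, not part of LLFIO):
+
+```c++
+struct my_work_item final : public dynamic_thread_pool_group::work_item
+{
+  std::atomic<intptr_t> remaining{100};
+  intptr_t sum{0};  // operator() is never run concurrently with itself
+
+  // Return -1 when all work is done, otherwise a value identifying the work.
+  virtual intptr_t next(deadline & /*d*/) noexcept override
+  {
+    const intptr_t v = remaining.fetch_sub(1, std::memory_order_relaxed);
+    return (v > 0) ? v : -1;
+  }
+
+  // Perform the piece of work returned by next().
+  virtual result<void> operator()(intptr_t work) noexcept override
+  {
+    sum += work;
+    return success();
+  }
+};
+
+// One work item per CPU is how you express concurrency to this class.
+std::vector<my_work_item> wis(std::thread::hardware_concurrency());
+auto tpg = make_dynamic_thread_pool_group().value();
+tpg->submit(span<my_work_item>(wis.data(), wis.size())).value();
+tpg->wait().value();  // blocks; each wis[n].sum is now 5050
+```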
+
+Work items may create sub work groups as part of their
+operation. If they do so, the work items from such nested work groups are
+scheduled preferentially. This ensures good forward progress, so if you
+have 100 work items each of which does another 100 work items, you don't get
+10,000 slowly progressing work items. Rather, the work items in the first set
+progress slowly, whereas the work items in the second set progress quickly.
+
+`work_item::next()` may optionally set a deadline to delay when that work
+item ought to be processed again. Deadlines can be relative or absolute.
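+
+For example, a sketch of a `next()` which asks for its next piece of work to be
+executed no sooner than 100 milliseconds from now (the `count` member is
+illustrative):
+
+```c++
+virtual intptr_t next(deadline &d) noexcept override
+{
+  if(count <= 0)
+  {
+    return -1;                                   // no further work, ever
+  }
+  d = deadline(std::chrono::milliseconds(100));  // relative deadline
+  return count--;                                // the work to execute next
+}
+```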
+
+## C++ 23 Executors
+
+As elsewhere in LLFIO, being a low level facility, we don't implement
+https://wg21.link/P0443 Executors, but it is trivially easy to implement
+a dynamic equivalent to `std::static_thread_pool` using this class.
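+
+For example, a minimal sketch of a work item wrapping an arbitrary callable,
+from which such an executor could be built (the `function_work_item` type is
+illustrative, not part of LLFIO):
+
+```c++
+template <class F> struct function_work_item final : public dynamic_thread_pool_group::work_item
+{
+  F f;
+  bool invoked{false};
+
+  explicit function_work_item(F _f) : f(std::move(_f)) {}
+
+  virtual intptr_t next(deadline & /*d*/) noexcept override
+  {
+    return invoked ? -1 : (invoked = true, 1);  // exactly one piece of work
+  }
+  virtual result<void> operator()(intptr_t /*work*/) noexcept override
+  {
+    f();  // assumes f() does not throw
+    return success();
+  }
+};
+```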
+
+## Implementation notes
+
+### Microsoft Windows
+
+On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api).
+This is an IOCP-aware thread pool which will dynamically increase the number
+of kernel threads until none are blocked. If more kernel threads
+are running than twice the number of CPUs in the system, the number of kernel
+threads is dynamically reduced. The maximum number of kernel threads which
+will run simultaneously is 500. Note that the Win32 thread pool is shared
+across the process by multiple Windows facilities.
+
+Note that the Win32 thread pool has built-in support for IOCP, so if you
+have a custom i/o multiplexer, you can use the global Win32 thread pool
+to execute i/o completion handling. See `CreateThreadpoolIo()` for more.
+
+No dynamic memory allocation is performed by this implementation outside
+of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool
+API may perform dynamic memory allocation internally, but that is outside
+our control.
+
+Overhead of LLFIO above the Win32 thread pool API is very low, statistically
+unmeasurable.
+
+### POSIX
+
+If not on Linux, you will need libdispatch, which is detected by LLFIO's cmake
+during configuration. libdispatch is better known as
+Grand Central Dispatch, originally a Mac OS technology but since ported
+to a high quality kernel based implementation on recent FreeBSDs, and to
+a lower quality userspace based implementation on Linux. Generally
+libdispatch should be found automatically on Mac OS without additional
+effort; on FreeBSD it may need installing from ports; on Linux you would
+need to explicitly install `libdispatch-dev` or the equivalent. You can
+force the use of libdispatch in cmake by setting the cmake variable
+`LLFIO_USE_LIBDISPATCH` to `On`.
+
+Overhead of LLFIO above the libdispatch API is very low, statistically
+unmeasurable.
+
+### Linux
+
+On Linux only, we have a custom userspace implementation with superior performance.
+A similar strategy to Microsoft Windows' approach is used. We
+dynamically increase the number of kernel threads until none are sleeping
+awaiting i/o. If the number of running kernel threads exceeds the number of
+CPUs in the system by more than three, the number of kernel threads is
+dynamically reduced.
+Note that **all** the kernel threads for the current process are considered,
+not just the kernel threads created by this thread pool implementation.
+Therefore, if you have alternative thread pool implementations (e.g. OpenMP,
+`std::async`), those are also included in the dynamic adjustment.
+
+As this is wholly implemented by this library, dynamic memory allocation
+occurs in the initial `make_dynamic_thread_pool_group()` and per thread
+creation, but otherwise the implementation does not perform dynamic memory
+allocations.
+
+After multiple rewrites, eventually I got this custom userspace implementation
+to have superior performance to both ASIO and libdispatch. For larger work
+items the difference between all three is meaningless; however, for smaller
+work items I benchmarked this custom userspace implementation as beating
+(non-dynamic) ASIO by approx 29% and Linux libdispatch by approx 52% (note
+that Linux libdispatch appears to have a scale up bug when work items are
+small and few; it is often less than half the performance of LLFIO's custom
+implementation).
+*/
+class LLFIO_DECL dynamic_thread_pool_group
+{
+ friend class dynamic_thread_pool_group_impl;
+public:
+ //! An individual item of work within the work group.
+ class work_item
+ {
+ friend struct detail::global_dynamic_thread_pool_impl;
+ friend struct detail::global_dynamic_thread_pool_impl_workqueue_item;
+ friend class dynamic_thread_pool_group_impl;
+ std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
+ void *_internalworkh{nullptr};
+ void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline
+ work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
+ std::atomic<intptr_t> _nextwork{-1};
+ std::chrono::steady_clock::time_point _timepoint1;
+ std::chrono::system_clock::time_point _timepoint2;
+ int _internalworkh_inuse{0};
+
+ protected:
+ constexpr bool _has_timer_set_relative() const noexcept { return _timepoint1 != std::chrono::steady_clock::time_point(); }
+ constexpr bool _has_timer_set_absolute() const noexcept { return _timepoint2 != std::chrono::system_clock::time_point(); }
+ constexpr bool _has_timer_set() const noexcept { return _has_timer_set_relative() || _has_timer_set_absolute(); }
+
+ constexpr work_item() {}
+ work_item(const work_item &o) = delete;
+ work_item(work_item &&o) noexcept
+ : _parent(o._parent.load(std::memory_order_relaxed))
+ , _internalworkh(o._internalworkh)
+ , _internaltimerh(o._internaltimerh)
+ , _prev(o._prev)
+ , _next(o._next)
+ , _next_scheduled(o._next_scheduled)
+ , _nextwork(o._nextwork.load(std::memory_order_relaxed))
+ , _timepoint1(o._timepoint1)
+ , _timepoint2(o._timepoint2)
+ , _internalworkh_inuse(o._internalworkh_inuse)
+ {
+ assert(o._parent.load(std::memory_order_relaxed) == nullptr);
+ assert(o._internalworkh == nullptr);
+ assert(o._internaltimerh == nullptr);
+ if(o._parent.load(std::memory_order_relaxed) != nullptr || o._internalworkh != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
+ abort();
+ }
+ o._prev = o._next = o._next_scheduled = nullptr;
+ o._nextwork.store(-1, std::memory_order_relaxed);
+ o._internalworkh_inuse = 0;
+ }
+ work_item &operator=(const work_item &) = delete;
+ work_item &operator=(work_item &&) = delete;
+
+ public:
+ virtual ~work_item()
+ {
+ assert(_nextwork.load(std::memory_order_relaxed) == -1);
+ if(_nextwork.load(std::memory_order_relaxed) != -1)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
+ abort();
+ }
+ assert(_internalworkh == nullptr);
+ assert(_internaltimerh == nullptr);
+ assert(_parent == nullptr);
+ if(_internalworkh != nullptr || _parent != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!");
+ abort();
+ }
+ }
+
+ //! Returns the parent work group between successful submission and just before `group_complete()`.
+ dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent.load(std::memory_order_relaxed)); }
+
+ /*! Invoked by the i/o thread pool to determine if this work item
+ has more work to do.
+
+ \return If there is no work _currently_ available to do, but there
+ might be some later, you should return zero. You will be called again
+ later after other work has been done. If you return -1, you are
+ saying that no further work will be done, and the group need never
+ call you again. If you have more work you want to do, return any
+ other value.
+ \param d Optional delay before the next item of work ought to be
+ executed (return != 0), or `next()` ought to be called again to
+ determine the next item (return == 0). On entry `d` is set to no
+ delay, so if you don't modify it, the next item of work occurs
+ as soon as possible.
+
+ Note that this function is called from multiple kernel threads.
+ You must NOT do any significant work in this function.
+ In particular do NOT call any dynamic thread pool group function,
+ as you will experience deadlock.
+
+ `dynamic_thread_pool_group::current_work_item()` may have any
+ value during this call.
+ */
+ virtual intptr_t next(deadline &d) noexcept = 0;
+
+ /*! Invoked by the i/o thread pool to perform the next item of work.
+
+ \return Any failure causes all remaining work in this group to
+ be cancelled as soon as possible.
+ \param work The value returned by `next()`.
+
+ Note that this function is called from multiple kernel threads,
+ and may not be the kernel thread from which `next()`
+ was called.
+
+ `dynamic_thread_pool_group::current_work_item()` will always be
+ `this` during this call.
+ */
+ virtual result<void> operator()(intptr_t work) noexcept = 0;
+
+ /*! Invoked by the i/o thread pool when all work in this thread
+ pool group is complete.
+
+ `cancelled` indicates if this is an abnormal completion. If its
+    error compares equal to `errc::operation_canceled`, then `stop()`
+ was called.
+
+    Just before this is called for all work items submitted, the group
+    is reset to a fresh state, and `parent()` becomes null. You can resubmit
+ this work item, but do not submit other work items until their
+ `group_complete()` has been invoked.
+
+ Note that this function is called from multiple kernel threads.
+
+ `dynamic_thread_pool_group::current_work_item()` may have any
+ value during this call.
+ */
+ virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; }
+ };
+
+ /*! \class io_aware_work_item
+ \brief A work item which paces when it next executes according to i/o congestion.
+
+ Currently there is only a working implementation of this for the Microsoft Windows
+ and Linux platforms, due to lack of working `statfs_t::f_iosinprogress` on other
+ platforms. If retrieving that for a seekable handle does not work, the constructor
+ throws an exception.
+
+  For seekable handles, currently `reads`, `writes` and `barriers` are ignored.
+  We simply retrieve, periodically, `statfs_t::f_iosinprogress` and
+  `statfs_t::f_iosbusytime` for the storage devices backing the seekable handle.
+  If the recent averaged i/o busy time exceeds `max_iosbusytime` and the i/o in
+  progress exceeds `max_iosinprogress`, `next()` will start setting the default
+  deadline passed to `io_aware_next()`. Thereafter, every 1/10th of a second, if
+  `statfs_t::f_iosinprogress` is above `max_iosinprogress`, the deadline is
+  increased by 1/16th, whereas if it is below `min_iosinprogress`, the deadline
+  is decreased by 1/16th. The default deadline chosen is always the worst of all
+  the storage devices of all the handles. This reduces concurrency within the
+  kernel thread pool in order to reduce congestion on the storage devices. If at
+  any point `statfs_t::f_iosbusytime` drops below `max_iosbusytime` as averaged
+  across one second, and `statfs_t::f_iosinprogress` drops below
+  `min_iosinprogress`, the additional throttling is completely removed.
+  `io_aware_next()` can ignore the default deadline passed into it, and can set
+  any other deadline.
+
+ For non-seekable handles, the handle must have an i/o multiplexer set upon it, and on
+ Microsoft Windows, that i/o multiplexer must be utilising the IOCP instance of the
+ global Win32 thread pool. For each `reads`, `writes` and `barriers` which is non-zero,
+ a corresponding zero length i/o is constructed and initiated. When the i/o completes,
+ and all readable handles in the work item's set have data waiting to be read, and all
+ writable handles in the work item's set have space to allow writes, only then is the
+ work item invoked with the next piece of work.
+
+ \note Non-seekable handle support is not implemented yet.
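+
+  For example, a sketch of construction (`fh` is a `file_handle`, and
+  `my_io_work_item` is an illustrative type derived from this class which
+  implements `io_aware_next()`):
+
+  ```c++
+  dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness a;
+  a.h = &fh;      // the i/o handle this work item will use
+  a.reads = 2;    // normalised to 0.66 by the constructor
+  a.writes = 1;   // normalised to 0.33 by the constructor
+  my_io_work_item wi({&a, 1});  // paces work according to device congestion
+  ```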
+ */
+ class LLFIO_DECL io_aware_work_item : public work_item
+ {
+ public:
+ //! Maximum i/o busyness above which throttling is to begin.
+ float max_iosbusytime{0.95f};
+    //! Minimum i/o in progress to target if `max_iosbusytime` is exceeded. The default of 16 suits SSDs; you want around 4 for spinning rust or NV-RAM.
+ uint32_t min_iosinprogress{16};
+    //! Maximum i/o in progress to target if `max_iosbusytime` is exceeded. The default of 32 suits SSDs; you want around 8 for spinning rust or NV-RAM.
+#ifdef _WIN32
+ uint32_t max_iosinprogress{1}; // windows appears to do a lot of i/o coalescing
+#else
+ uint32_t max_iosinprogress{32};
+#endif
+ //! Information about an i/o handle this work item will use
+ struct io_handle_awareness
+ {
+ //! An i/o handle this work item will use
+ io_handle *h{nullptr};
+ //! The relative amount of reading done by this work item from the handle.
+ float reads{0};
+ //! The relative amount of writing done by this work item to the handle.
+ float writes{0};
+ //! The relative amount of write barriering done by this work item to the handle.
+ float barriers{0};
+
+ void *_internal{nullptr};
+ };
+
+ private:
+ const span<io_handle_awareness> _handles;
+
+ LLFIO_HEADERS_ONLY_VIRTUAL_SPEC intptr_t next(deadline &d) noexcept override final;
+
+ public:
+ constexpr io_aware_work_item() {}
+ /*! \brief Constructs a work item aware of i/o done to the handles in `hs`.
+
+ Note that the `reads`, `writes` and `barriers` are normalised to proportions
+ out of `1.0` by this constructor, so if for example you had `reads/writes/barriers = 200/100/0`,
+ after normalisation those become `0.66/0.33/0.0` such that the total is `1.0`.
+ If `reads/writes/barriers = 0/0/0` on entry, they are replaced with `0.5/0.5/0.0`.
+
+ Note that normalisation is across *all* i/o handles in the set, so three handles
+ each with `reads/writes/barriers = 200/100/0` on entry would have `0.22/0.11/0.0`
+ each after construction.
+ */
+ explicit LLFIO_HEADERS_ONLY_MEMFUNC_SPEC io_aware_work_item(span<io_handle_awareness> hs);
+ io_aware_work_item(io_aware_work_item &&o) noexcept
+ : work_item(std::move(o))
+ , _handles(o._handles)
+ {
+ }
+ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC ~io_aware_work_item();
+
+ //! The handles originally registered during construction.
+ span<io_handle_awareness> handles() const noexcept { return _handles; }
+
+ /*! \brief As for `work_item::next()`, but deadline may be extended to
+ reduce i/o congestion on the hardware devices to which the handles
+ refer.
+ */
+ virtual intptr_t io_aware_next(deadline &d) noexcept = 0;
+ };
+
+ virtual ~dynamic_thread_pool_group() {}
+
+ /*! \brief A textual description of the underlying implementation of
+ this dynamic thread pool group.
+
+ The current possible underlying implementations are:
+
+ - "Grand Central Dispatch" (Mac OS, FreeBSD, Linux)
+ - "Linux native" (Linux)
+ - "Win32 thread pool (Vista+)" (Windows)
+
+ Which one is chosen depends on what was detected at cmake configure time,
+ and possibly what the host OS running the program binary supports.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *implementation_description() noexcept;
+
+ /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later.
+
+ Note that if the group is currently stopping, you cannot submit more
+ work until the group has stopped. An error code comparing equal to
+ `errc::operation_canceled` is returned if you try.
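+
+  For example, a sketch of submitting several work items at once via the
+  templated overload (`tpg` is a `dynamic_thread_pool_group_ptr` and
+  `my_work_item` is illustrative):
+
+  ```c++
+  std::array<my_work_item, 4> wis;
+  tpg->submit(span<my_work_item>(wis.data(), wis.size())).value();
+  ```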
+ */
+ virtual result<void> submit(span<work_item *> work) noexcept = 0;
+ //! \overload
+ result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); }
+ //! \overload
+ LLFIO_TEMPLATE(class T)
+ LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value))
+ result<void> submit(span<T> wi) noexcept
+ {
+ auto *wis = (T **) alloca(sizeof(T *) * wi.size());
+ for(size_t n = 0; n < wi.size(); n++)
+ {
+ wis[n] = &wi[n];
+ }
+ return submit(span<work_item *>((work_item **) wis, wi.size()));
+ }
+
+ //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
+ virtual result<void> stop() noexcept = 0;
+
+ /*! \brief Threadsafe. True if a work item reported an error, or
+ `stop()` was called, but work items are still running.
+ */
+ virtual bool stopping() const noexcept = 0;
+
+ //! Threadsafe. True if all the work previously submitted is complete.
+ virtual bool stopped() const noexcept = 0;
+
+ //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item.
+ virtual result<void> wait(deadline d = {}) const noexcept = 0;
+ //! \overload
+ template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept
+ {
+ auto r = wait(duration);
+ if(!r && r.error() == errc::timed_out)
+ {
+ return false;
+ }
+ OUTCOME_TRY(std::move(r));
+ return true;
+ }
+ //! \overload
+ template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept
+ {
+ auto r = wait(timeout);
+ if(!r && r.error() == errc::timed_out)
+ {
+ return false;
+ }
+ OUTCOME_TRY(std::move(r));
+ return true;
+ }
+
+ //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item.
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept;
+ //! Returns the work item the calling thread is running within, if any.
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept;
+ /*! \brief Returns the number of milliseconds that a thread is without work before it is shut down.
+  Note that this will be zero everywhere except on Linux when using our native
+  thread pool implementation, because the system controls this value on Windows,
+  Grand Central Dispatch etc.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work() noexcept;
+ /*! \brief Sets the number of milliseconds that a thread is without work before it is shut down,
+ returning the value actually set.
+
+  Note that this will have no effect (and thus return zero) everywhere except
+  on Linux when using our native thread pool implementation, because the system
+  controls this value on Windows, Grand Central Dispatch etc.
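+
+  For example (a sketch; this only has an effect with the Linux native
+  implementation):
+
+  ```c++
+  // Ask idle pool threads to linger for two seconds awaiting more work,
+  // and observe what was actually set.
+  uint32_t actual = dynamic_thread_pool_group::ms_sleep_for_more_work(2000);
+  ```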
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work(uint32_t v) noexcept;
+};
+//! A unique ptr to a work group within the global dynamic thread pool.
+using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>;
+
+//! Creates a new work group within the global dynamic thread pool.
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept;
+
+// BEGIN make_free_functions.py
+// END make_free_functions.py
+
+LLFIO_V2_NAMESPACE_END
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS)
+#define LLFIO_INCLUDED_BY_HEADER 1
+#include "detail/impl/dynamic_thread_pool_group.ipp"
+#undef LLFIO_INCLUDED_BY_HEADER
+#endif
+
+#endif