diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-05-25 13:49:37 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-05-25 13:49:37 +0300 |
commit | 1380d0cbac91821488a1f0a494cb220ae864a252 (patch) | |
tree | 4d8c5cae473e45eda41986c17a49eb054f1a54f3 /test | |
parent | a93b8521afb5d858f423de7accdc6d9d4504e7cb (diff) |
Fix bug where native Linux dynamic_thread_pool_group would schedule delayed work items inverted to correctness.
Diffstat (limited to 'test')
-rw-r--r-- | test/tests/dynamic_thread_pool_group.cpp | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp index 0c2b6dad..fa641bb6 100644 --- a/test/tests/dynamic_thread_pool_group.cpp +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -26,6 +26,8 @@ Distributed under the Boost Software License, Version 1.0. #include "../test_kernel_decl.hpp" +#include "quickcpplib/algorithm/small_prng.hpp" + #include <cmath> // for sqrt static inline void TestDynamicThreadPoolGroupWorks() @@ -222,6 +224,98 @@ static inline void TestDynamicThreadPoolGroupWorks() check(); } +static inline void TestDynamicThreadPoolGroupWorkItemDelayWorks() +{ + static constexpr size_t WORKITEMS = 100; + namespace llfio = LLFIO_V2_NAMESPACE; + struct work_item; + struct shared_state_t + { + std::vector<std::pair<std::chrono::steady_clock::time_point, std::chrono::microseconds>> timepoints; + std::atomic<int> awaiting{0}, within_1ms{0}, within_10ms{0}, within_100ms{0}, over_100ms{0}; + std::atomic<bool> cancelling{false}; + } shared_state; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + shared_state_t *shared{nullptr}; + const size_t myidx; + + work_item() = default; + explicit work_item(shared_state_t *_shared, size_t _myidx) + : shared(_shared) + , myidx(_myidx) + { + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , shared(o.shared) + , myidx(o.myidx) + { + } + + virtual intptr_t next(llfio::deadline &d) noexcept override + { + if(shared->cancelling.load(std::memory_order_relaxed)) + { + return -1; + } + auto now = std::chrono::steady_clock::now(); + auto diff = std::chrono::milliseconds(QUICKCPPLIB_NAMESPACE::algorithm::small_prng::thread_local_prng()() / 524288); // up to 8.6 seconds + shared->timepoints[myidx].first = now + diff; // up to 17.2 seconds + shared->timepoints[myidx].second = diff; + d = diff; + shared->awaiting.fetch_add(1, std::memory_order_relaxed); + return 1; + } + virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override + { + auto now = std::chrono::steady_clock::now(); + auto &x = shared->timepoints[myidx]; + auto diff = now - x.first; + BOOST_CHECK(diff > -std::chrono::milliseconds(1)); // permit up to 1 millisecond early + if(diff > -std::chrono::milliseconds(1) && diff < std::chrono::milliseconds(1)) + { + shared->within_1ms.fetch_add(1, std::memory_order_relaxed); + } + else if(diff < std::chrono::milliseconds(10)) + { + shared->within_10ms.fetch_add(1, std::memory_order_relaxed); + } + else if(diff < std::chrono::milliseconds(100)) + { + shared->within_100ms.fetch_add(1, std::memory_order_relaxed); + } + else + { + shared->over_100ms.fetch_add(1, std::memory_order_relaxed); + } + shared->awaiting.fetch_sub(1, std::memory_order_relaxed); + return llfio::success(); + } + }; + std::vector<work_item> workitems; + workitems.reserve(WORKITEMS); + shared_state.timepoints.resize(WORKITEMS); + for(size_t n = 0; n < WORKITEMS; n++) + { + workitems.emplace_back(&shared_state, n); + } + auto tpg = llfio::make_dynamic_thread_pool_group().value(); + tpg->submit(llfio::span<work_item>(workitems)).value(); + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::cout << " Telling work items to cancel ..." << std::endl; + shared_state.cancelling = true; + tpg->wait().value(); + BOOST_CHECK(shared_state.awaiting == 0); + std::cout << " " << shared_state.within_1ms << " delayed work items were scheduled within 1ms of request." << std::endl; + std::cout << " " << shared_state.within_10ms << " delayed work items were scheduled within 10ms of request." << std::endl; + std::cout << " " << shared_state.within_100ms << " delayed work items were scheduled within 100ms of request." << std::endl; + std::cout << " " << shared_state.over_100ms << " delayed work items were scheduled over 100ms of request." << std::endl; + BOOST_CHECK(shared_state.within_1ms > 0); + BOOST_CHECK(shared_state.over_100ms < 10); +} + static inline void TestDynamicThreadPoolGroupNestingWorks() { if(std::thread::hardware_concurrency() < 4) @@ -477,6 +571,9 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks() KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", TestDynamicThreadPoolGroupWorks()) +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, delay, + "Tests that setting a delay in a llfio::dynamic_thread_pool_group::work_item works as expected", + TestDynamicThreadPoolGroupWorkItemDelayWorks()) KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", TestDynamicThreadPoolGroupNestingWorks()) KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item, |