Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'test/tests/dynamic_thread_pool_group.cpp')
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp97
1 files changed, 97 insertions, 0 deletions
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 0c2b6dad..fa641bb6 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -26,6 +26,8 @@ Distributed under the Boost Software License, Version 1.0.
#include "../test_kernel_decl.hpp"
+#include "quickcpplib/algorithm/small_prng.hpp"
+
#include <cmath> // for sqrt
static inline void TestDynamicThreadPoolGroupWorks()
@@ -222,6 +224,98 @@ static inline void TestDynamicThreadPoolGroupWorks()
check();
}
+static inline void TestDynamicThreadPoolGroupWorkItemDelayWorks()
+{
+ static constexpr size_t WORKITEMS = 100;
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ struct work_item;
+ struct shared_state_t
+ {
+ std::vector<std::pair<std::chrono::steady_clock::time_point, std::chrono::microseconds>> timepoints;
+ std::atomic<int> awaiting{0}, within_1ms{0}, within_10ms{0}, within_100ms{0}, over_100ms{0};
+ std::atomic<bool> cancelling{false};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ shared_state_t *shared{nullptr};
+ const size_t myidx;
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared, size_t _myidx)
+ : shared(_shared)
+ , myidx(_myidx)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared(o.shared)
+ , myidx(o.myidx)
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline &d) noexcept override
+ {
+ if(shared->cancelling.load(std::memory_order_relaxed))
+ {
+ return -1;
+ }
+ auto now = std::chrono::steady_clock::now();
+ auto diff = std::chrono::milliseconds(QUICKCPPLIB_NAMESPACE::algorithm::small_prng::thread_local_prng()() / 524288); // up to 8.6 seconds
+ shared->timepoints[myidx].first = now + diff; // up to 17.2 seconds
+ shared->timepoints[myidx].second = diff;
+ d = diff;
+ shared->awaiting.fetch_add(1, std::memory_order_relaxed);
+ return 1;
+ }
+ virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override
+ {
+ auto now = std::chrono::steady_clock::now();
+ auto &x = shared->timepoints[myidx];
+ auto diff = now - x.first;
+ BOOST_CHECK(diff > -std::chrono::milliseconds(1)); // permit up to 1 millisecond early
+ if(diff > -std::chrono::milliseconds(1) && diff < std::chrono::milliseconds(1))
+ {
+ shared->within_1ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ else if(diff < std::chrono::milliseconds(10))
+ {
+ shared->within_10ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ else if(diff < std::chrono::milliseconds(100))
+ {
+ shared->within_100ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ else
+ {
+ shared->over_100ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ shared->awaiting.fetch_sub(1, std::memory_order_relaxed);
+ return llfio::success();
+ }
+ };
+ std::vector<work_item> workitems;
+ workitems.reserve(WORKITEMS);
+ shared_state.timepoints.resize(WORKITEMS);
+ for(size_t n = 0; n < WORKITEMS; n++)
+ {
+ workitems.emplace_back(&shared_state, n);
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+ std::cout << " Telling work items to cancel ..." << std::endl;
+ shared_state.cancelling = true;
+ tpg->wait().value();
+ BOOST_CHECK(shared_state.awaiting == 0);
+ std::cout << " " << shared_state.within_1ms << " delayed work items were scheduled within 1ms of request." << std::endl;
+ std::cout << " " << shared_state.within_10ms << " delayed work items were scheduled within 10ms of request." << std::endl;
+ std::cout << " " << shared_state.within_100ms << " delayed work items were scheduled within 100ms of request." << std::endl;
+ std::cout << " " << shared_state.over_100ms << " delayed work items were scheduled over 100ms of request." << std::endl;
+ BOOST_CHECK(shared_state.within_1ms > 0);
+ BOOST_CHECK(shared_state.over_100ms < 10);
+}
+
static inline void TestDynamicThreadPoolGroupNestingWorks()
{
if(std::thread::hardware_concurrency() < 4)
@@ -477,6 +571,9 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
TestDynamicThreadPoolGroupWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, delay,
+ "Tests that setting a delay in a llfio::dynamic_thread_pool_group::work_item works as expected",
+ TestDynamicThreadPoolGroupWorkItemDelayWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
TestDynamicThreadPoolGroupNestingWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,