From 1380d0cbac91821488a1f0a494cb220ae864a252 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Tue, 25 May 2021 11:49:37 +0100 Subject: Fix bug where native Linux dynamic_thread_pool_group would schedule delayed work items inverted to correctness. --- include/llfio/revision.hpp | 6 +- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 27 +++++- include/llfio/v2.0/detail/impl/reduce.ipp | 6 -- programs/benchmark-io-congestion/main.cpp | 2 +- test/tests/dynamic_thread_pool_group.cpp | 97 ++++++++++++++++++++++ 5 files changed, 124 insertions(+), 14 deletions(-) diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 8d06f55a..424376d1 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF b1a36308e99c5477fc3791de6f9fff993099c69f -#define LLFIO_PREVIOUS_COMMIT_DATE "2021-04-29 16:33:49 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE b1a36308 +#define LLFIO_PREVIOUS_COMMIT_REF a93b8521afb5d858f423de7accdc6d9d4504e7cb +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-05-11 18:16:38 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE a93b8521 diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index 08e6762e..4192306c 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -458,7 +458,7 @@ namespace detail bool done = false; for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled) { - if(n->_timepoint1 <= i->_timepoint1) + if(n->_timepoint1 > i->_timepoint1) { if(p == nullptr) { @@ -480,6 +480,21 @@ namespace detail i->_next_scheduled = nullptr; next_timer_relative.back = i; } +#if 0 + { + auto now = std::chrono::steady_clock::now(); + std::cout << "\n"; + for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled) + { + if(p != nullptr) + { + assert(n->_timepoint1 >= p->_timepoint1); + } + std::cout << "\nRelative timer: " << std::chrono::duration_cast(n->_timepoint1 - now).count(); + } + std::cout << std::endl; + } +#endif } next_timer_relative.lock.unlock(); } @@ -496,7 +511,7 @@ namespace detail bool done = false; for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled) { - if(n->_timepoint2 <= i->_timepoint2) + if(n->_timepoint2 > i->_timepoint2) { if(p == nullptr) { @@ -1151,8 +1166,12 @@ namespace detail { if(d.steady) { - workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); - workitem->_timepoint2 = {}; + std::chrono::microseconds diff(d.nsecs / 1000); + if(diff > std::chrono::microseconds(0)) + { + workitem->_timepoint1 = std::chrono::steady_clock::now() + diff; + workitem->_timepoint2 = {}; + } } else { diff --git a/include/llfio/v2.0/detail/impl/reduce.ipp b/include/llfio/v2.0/detail/impl/reduce.ipp index 5ce27e3b..1ed837cd 100644 --- a/include/llfio/v2.0/detail/impl/reduce.ipp +++ b/include/llfio/v2.0/detail/impl/reduce.ipp @@ -64,8 +64,6 @@ namespace algorithm 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/; const DWORD deletedir_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x01 /*FILE_DIRECTORY_FILE*/; - const DWORD renamefile_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/; - const DWORD renamedir_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x01 /*FILE_DIRECTORY_FILE*/; IO_STATUS_BLOCK isb = make_iostatus(); path_view::c_str<> zpath(leafname, path_view::zero_terminated); UNICODE_STRING _path{}; @@ -172,10 +170,6 @@ namespace algorithm using namespace windows_nt_kernel; const DWORD access = SYNCHRONIZE | DELETE; const DWORD fileshare = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; - const DWORD deletefile_ntflags = - 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/; - const DWORD deletedir_ntflags = - 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x01 /*FILE_DIRECTORY_FILE*/; const DWORD renamefile_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/; const DWORD renamedir_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x01 /*FILE_DIRECTORY_FILE*/; IO_STATUS_BLOCK isb = make_iostatus(); diff --git a/programs/benchmark-io-congestion/main.cpp b/programs/benchmark-io-congestion/main.cpp index ea902938..f7e929a5 100644 --- a/programs/benchmark-io-congestion/main.cpp +++ b/programs/benchmark-io-congestion/main.cpp @@ -210,7 +210,7 @@ struct llfio_runner_paced if(last_pace != d.nsecs) { parent->last_pace.store(d.nsecs, std::memory_order_relaxed); - std::cout << "Pacing work by milliseconds " << (d.nsecs / 1000.0) << std::endl; + std::cout << "Pacing work by milliseconds " << (d.nsecs / 1000000.0) << std::endl; } #endif return parent->cancel.load(std::memory_order_relaxed) ? -1 : 1; diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp index 0c2b6dad..fa641bb6 100644 --- a/test/tests/dynamic_thread_pool_group.cpp +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -26,6 +26,8 @@ Distributed under the Boost Software License, Version 1.0. #include "../test_kernel_decl.hpp" +#include "quickcpplib/algorithm/small_prng.hpp" + #include // for sqrt static inline void TestDynamicThreadPoolGroupWorks() @@ -222,6 +224,98 @@ static inline void TestDynamicThreadPoolGroupWorks() check(); } +static inline void TestDynamicThreadPoolGroupWorkItemDelayWorks() +{ + static constexpr size_t WORKITEMS = 100; + namespace llfio = LLFIO_V2_NAMESPACE; + struct work_item; + struct shared_state_t + { + std::vector> timepoints; + std::atomic awaiting{0}, within_1ms{0}, within_10ms{0}, within_100ms{0}, over_100ms{0}; + std::atomic cancelling{false}; + } shared_state; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + shared_state_t *shared{nullptr}; + const size_t myidx; + + work_item() = default; + explicit work_item(shared_state_t *_shared, size_t _myidx) + : shared(_shared) + , myidx(_myidx) + { + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , shared(o.shared) + , myidx(o.myidx) + { + } + + virtual intptr_t next(llfio::deadline &d) noexcept override + { + if(shared->cancelling.load(std::memory_order_relaxed)) + { + return -1; + } + auto now = std::chrono::steady_clock::now(); + auto diff = std::chrono::milliseconds(QUICKCPPLIB_NAMESPACE::algorithm::small_prng::thread_local_prng()() / 524288); // up to 8.6 seconds + shared->timepoints[myidx].first = now + diff; // up to 17.2 seconds + shared->timepoints[myidx].second = diff; + d = diff; + shared->awaiting.fetch_add(1, std::memory_order_relaxed); + return 1; + } + virtual llfio::result operator()(intptr_t /*unused*/) noexcept override + { + auto now = std::chrono::steady_clock::now(); + auto &x = shared->timepoints[myidx]; + auto diff = now - x.first; + BOOST_CHECK(diff > -std::chrono::milliseconds(1)); // permit up to 1 millisecond early + if(diff > -std::chrono::milliseconds(1) && diff < std::chrono::milliseconds(1)) + { + shared->within_1ms.fetch_add(1, std::memory_order_relaxed); + } + else if(diff < std::chrono::milliseconds(10)) + { + shared->within_10ms.fetch_add(1, std::memory_order_relaxed); + } + else if(diff < std::chrono::milliseconds(100)) + { + shared->within_100ms.fetch_add(1, std::memory_order_relaxed); + } + else + { + shared->over_100ms.fetch_add(1, std::memory_order_relaxed); + } + shared->awaiting.fetch_sub(1, std::memory_order_relaxed); + return llfio::success(); + } + }; + std::vector workitems; + workitems.reserve(WORKITEMS); + shared_state.timepoints.resize(WORKITEMS); + for(size_t n = 0; n < WORKITEMS; n++) + { + workitems.emplace_back(&shared_state, n); + } + auto tpg = llfio::make_dynamic_thread_pool_group().value(); + tpg->submit(llfio::span(workitems)).value(); + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::cout << " Telling work items to cancel ..." << std::endl; + shared_state.cancelling = true; + tpg->wait().value(); + BOOST_CHECK(shared_state.awaiting == 0); + std::cout << " " << shared_state.within_1ms << " delayed work items were scheduled within 1ms of request." << std::endl; + std::cout << " " << shared_state.within_10ms << " delayed work items were scheduled within 10ms of request." << std::endl; + std::cout << " " << shared_state.within_100ms << " delayed work items were scheduled within 100ms of request." << std::endl; + std::cout << " " << shared_state.over_100ms << " delayed work items were scheduled over 100ms of request." << std::endl; + BOOST_CHECK(shared_state.within_1ms > 0); + BOOST_CHECK(shared_state.over_100ms < 10); +} + static inline void TestDynamicThreadPoolGroupNestingWorks() { if(std::thread::hardware_concurrency() < 4) @@ -477,6 +571,9 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks() KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", TestDynamicThreadPoolGroupWorks()) +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, delay, + "Tests that setting a delay in a llfio::dynamic_thread_pool_group::work_item works as expected", + TestDynamicThreadPoolGroupWorkItemDelayWorks()) KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", TestDynamicThreadPoolGroupNestingWorks()) KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item, -- cgit v1.2.3