Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-05-25 13:49:37 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-05-25 13:49:37 +0300
commit1380d0cbac91821488a1f0a494cb220ae864a252 (patch)
tree4d8c5cae473e45eda41986c17a49eb054f1a54f3
parenta93b8521afb5d858f423de7accdc6d9d4504e7cb (diff)
Fix bug where native Linux dynamic_thread_pool_group would schedule delayed work items inverted to correctness.
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp27
-rw-r--r--include/llfio/v2.0/detail/impl/reduce.ipp6
-rw-r--r--programs/benchmark-io-congestion/main.cpp2
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp97
5 files changed, 124 insertions, 14 deletions
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 8d06f55a..424376d1 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF b1a36308e99c5477fc3791de6f9fff993099c69f
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-04-29 16:33:49 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE b1a36308
+#define LLFIO_PREVIOUS_COMMIT_REF a93b8521afb5d858f423de7accdc6d9d4504e7cb
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-05-11 18:16:38 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE a93b8521
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 08e6762e..4192306c 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -458,7 +458,7 @@ namespace detail
bool done = false;
for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled)
{
- if(n->_timepoint1 <= i->_timepoint1)
+ if(n->_timepoint1 > i->_timepoint1)
{
if(p == nullptr)
{
@@ -480,6 +480,21 @@ namespace detail
i->_next_scheduled = nullptr;
next_timer_relative.back = i;
}
+#if 0
+ {
+ auto now = std::chrono::steady_clock::now();
+ std::cout << "\n";
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled)
+ {
+ if(p != nullptr)
+ {
+ assert(n->_timepoint1 >= p->_timepoint1);
+ }
+ std::cout << "\nRelative timer: " << std::chrono::duration_cast<std::chrono::milliseconds>(n->_timepoint1 - now).count();
+ }
+ std::cout << std::endl;
+ }
+#endif
}
next_timer_relative.lock.unlock();
}
@@ -496,7 +511,7 @@ namespace detail
bool done = false;
for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled)
{
- if(n->_timepoint2 <= i->_timepoint2)
+ if(n->_timepoint2 > i->_timepoint2)
{
if(p == nullptr)
{
@@ -1151,8 +1166,12 @@ namespace detail
{
if(d.steady)
{
- workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs);
- workitem->_timepoint2 = {};
+ std::chrono::microseconds diff(d.nsecs / 1000);
+ if(diff > std::chrono::microseconds(0))
+ {
+ workitem->_timepoint1 = std::chrono::steady_clock::now() + diff;
+ workitem->_timepoint2 = {};
+ }
}
else
{
diff --git a/include/llfio/v2.0/detail/impl/reduce.ipp b/include/llfio/v2.0/detail/impl/reduce.ipp
index 5ce27e3b..1ed837cd 100644
--- a/include/llfio/v2.0/detail/impl/reduce.ipp
+++ b/include/llfio/v2.0/detail/impl/reduce.ipp
@@ -64,8 +64,6 @@ namespace algorithm
0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/;
const DWORD deletedir_ntflags =
0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x01 /*FILE_DIRECTORY_FILE*/;
- const DWORD renamefile_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/;
- const DWORD renamedir_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x01 /*FILE_DIRECTORY_FILE*/;
IO_STATUS_BLOCK isb = make_iostatus();
path_view::c_str<> zpath(leafname, path_view::zero_terminated);
UNICODE_STRING _path{};
@@ -172,10 +170,6 @@ namespace algorithm
using namespace windows_nt_kernel;
const DWORD access = SYNCHRONIZE | DELETE;
const DWORD fileshare = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
- const DWORD deletefile_ntflags =
- 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/;
- const DWORD deletedir_ntflags =
- 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x00001000 /*FILE_DELETE_ON_CLOSE*/ | 0x01 /*FILE_DIRECTORY_FILE*/;
const DWORD renamefile_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x040 /*FILE_NON_DIRECTORY_FILE*/;
const DWORD renamedir_ntflags = 0x20 /*FILE_SYNCHRONOUS_IO_NONALERT*/ | 0x00200000 /*FILE_OPEN_REPARSE_POINT*/ | 0x01 /*FILE_DIRECTORY_FILE*/;
IO_STATUS_BLOCK isb = make_iostatus();
diff --git a/programs/benchmark-io-congestion/main.cpp b/programs/benchmark-io-congestion/main.cpp
index ea902938..f7e929a5 100644
--- a/programs/benchmark-io-congestion/main.cpp
+++ b/programs/benchmark-io-congestion/main.cpp
@@ -210,7 +210,7 @@ struct llfio_runner_paced
if(last_pace != d.nsecs)
{
parent->last_pace.store(d.nsecs, std::memory_order_relaxed);
- std::cout << "Pacing work by milliseconds " << (d.nsecs / 1000.0) << std::endl;
+ std::cout << "Pacing work by milliseconds " << (d.nsecs / 1000000.0) << std::endl;
}
#endif
return parent->cancel.load(std::memory_order_relaxed) ? -1 : 1;
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 0c2b6dad..fa641bb6 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -26,6 +26,8 @@ Distributed under the Boost Software License, Version 1.0.
#include "../test_kernel_decl.hpp"
+#include "quickcpplib/algorithm/small_prng.hpp"
+
#include <cmath> // for sqrt
static inline void TestDynamicThreadPoolGroupWorks()
@@ -222,6 +224,98 @@ static inline void TestDynamicThreadPoolGroupWorks()
check();
}
+static inline void TestDynamicThreadPoolGroupWorkItemDelayWorks()
+{
+ static constexpr size_t WORKITEMS = 100;
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ struct work_item;
+ struct shared_state_t
+ {
+ std::vector<std::pair<std::chrono::steady_clock::time_point, std::chrono::microseconds>> timepoints;
+ std::atomic<int> awaiting{0}, within_1ms{0}, within_10ms{0}, within_100ms{0}, over_100ms{0};
+ std::atomic<bool> cancelling{false};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ shared_state_t *shared{nullptr};
+ const size_t myidx;
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared, size_t _myidx)
+ : shared(_shared)
+ , myidx(_myidx)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared(o.shared)
+ , myidx(o.myidx)
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline &d) noexcept override
+ {
+ if(shared->cancelling.load(std::memory_order_relaxed))
+ {
+ return -1;
+ }
+ auto now = std::chrono::steady_clock::now();
+ auto diff = std::chrono::milliseconds(QUICKCPPLIB_NAMESPACE::algorithm::small_prng::thread_local_prng()() / 524288); // up to 8.6 seconds
+ shared->timepoints[myidx].first = now + diff; // up to 17.2 seconds
+ shared->timepoints[myidx].second = diff;
+ d = diff;
+ shared->awaiting.fetch_add(1, std::memory_order_relaxed);
+ return 1;
+ }
+ virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override
+ {
+ auto now = std::chrono::steady_clock::now();
+ auto &x = shared->timepoints[myidx];
+ auto diff = now - x.first;
+ BOOST_CHECK(diff > -std::chrono::milliseconds(1)); // permit up to 1 millisecond early
+ if(diff > -std::chrono::milliseconds(1) && diff < std::chrono::milliseconds(1))
+ {
+ shared->within_1ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ else if(diff < std::chrono::milliseconds(10))
+ {
+ shared->within_10ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ else if(diff < std::chrono::milliseconds(100))
+ {
+ shared->within_100ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ else
+ {
+ shared->over_100ms.fetch_add(1, std::memory_order_relaxed);
+ }
+ shared->awaiting.fetch_sub(1, std::memory_order_relaxed);
+ return llfio::success();
+ }
+ };
+ std::vector<work_item> workitems;
+ workitems.reserve(WORKITEMS);
+ shared_state.timepoints.resize(WORKITEMS);
+ for(size_t n = 0; n < WORKITEMS; n++)
+ {
+ workitems.emplace_back(&shared_state, n);
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+ std::cout << " Telling work items to cancel ..." << std::endl;
+ shared_state.cancelling = true;
+ tpg->wait().value();
+ BOOST_CHECK(shared_state.awaiting == 0);
+ std::cout << " " << shared_state.within_1ms << " delayed work items were scheduled within 1ms of request." << std::endl;
+ std::cout << " " << shared_state.within_10ms << " delayed work items were scheduled within 10ms of request." << std::endl;
+ std::cout << " " << shared_state.within_100ms << " delayed work items were scheduled within 100ms of request." << std::endl;
+ std::cout << " " << shared_state.over_100ms << " delayed work items were scheduled over 100ms of request." << std::endl;
+ BOOST_CHECK(shared_state.within_1ms > 0);
+ BOOST_CHECK(shared_state.over_100ms < 10);
+}
+
static inline void TestDynamicThreadPoolGroupNestingWorks()
{
if(std::thread::hardware_concurrency() < 4)
@@ -477,6 +571,9 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
TestDynamicThreadPoolGroupWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, delay,
+ "Tests that setting a delay in a llfio::dynamic_thread_pool_group::work_item works as expected",
+ TestDynamicThreadPoolGroupWorkItemDelayWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
TestDynamicThreadPoolGroupNestingWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,