github.com/windirstat/llfio.git
author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-04-15 13:37:07 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-04-15 13:37:07 +0300
commit     db383d69b71ae5dab99e8e90699629c1a684f4ee (patch)
tree       5cf5803690157aa8c2382257292b687064112165 /programs
parent     2040dcb520d92328a9318b948c380e1692f41249 (diff)
Merge https://github.com/ned14/llfio/pull/77 into my current working tree and push the bits which are safe to push for now.
Diffstat (limited to 'programs')
-rw-r--r--  programs/benchmark-io-congestion/main.cpp | 197
1 file changed, 161 insertions(+), 36 deletions(-)
diff --git a/programs/benchmark-io-congestion/main.cpp b/programs/benchmark-io-congestion/main.cpp
index e54cb25a..ea902938 100644
--- a/programs/benchmark-io-congestion/main.cpp
+++ b/programs/benchmark-io-congestion/main.cpp
@@ -25,11 +25,11 @@ Distributed under the Boost Software License, Version 1.0.
//! Seconds to run the benchmark
static constexpr unsigned BENCHMARK_DURATION = 10;
//! Maximum work items to create
-static constexpr unsigned MAX_WORK_ITEMS = 512;
+static constexpr unsigned MAX_WORK_ITEMS = 4096;
// Size of buffer to SHA256
-static constexpr unsigned SHA256_BUFFER_SIZE = 1024 * 1024; // 1Mb
+static constexpr unsigned SHA256_BUFFER_SIZE = 4 * 1024; // 4Kb
// Size of test file
-static constexpr unsigned long long TEST_FILE_SIZE = 4ULL * SHA256_BUFFER_SIZE * MAX_WORK_ITEMS; // 4Gb
+static constexpr unsigned long long TEST_FILE_SIZE = 4ULL * 1024 * 1024 * 1024; // 4Gb
#include "../../include/llfio/llfio.hpp"
@@ -58,6 +58,12 @@ static constexpr unsigned long long TEST_FILE_SIZE = 4ULL * SHA256_BUFFER_SIZE *
namespace llfio = LLFIO_V2_NAMESPACE;
+struct benchmark_results
+{
+ std::chrono::microseconds duration;
+ llfio::utils::process_memory_usage memory_usage;
+};
+
inline QUICKCPPLIB_NOINLINE void memcpy_s(llfio::byte *dest, const llfio::byte *s, size_t len)
{
#if defined(__SSE2__) || defined(_M_X64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 2)
@@ -119,14 +125,14 @@ inline QUICKCPPLIB_NOINLINE void memcpy_s(llfio::byte *dest, const llfio::byte *
}
}
-#if 0
-struct llfio_runner
+struct llfio_runner_unpaced
{
std::atomic<bool> cancel{false};
llfio::dynamic_thread_pool_group_ptr group = llfio::make_dynamic_thread_pool_group().value();
std::vector<llfio::dynamic_thread_pool_group::work_item *> workitems;
- ~llfio_runner()
+ llfio_runner_unpaced(llfio::io_handle * /*unused*/) {}
+ ~llfio_runner_unpaced()
{
for(auto *p : workitems)
{
@@ -137,9 +143,9 @@ struct llfio_runner
{
struct workitem final : public llfio::dynamic_thread_pool_group::work_item
{
- llfio_runner *parent;
+ llfio_runner_unpaced *parent;
F f;
- workitem(llfio_runner *_parent, F &&_f)
+ workitem(llfio_runner_unpaced *_parent, F &&_f)
: parent(_parent)
, f(std::move(_f))
{
@@ -153,18 +159,82 @@ struct llfio_runner
};
workitems.push_back(new workitem(this, std::move(f)));
}
- std::chrono::microseconds run(unsigned seconds)
+ benchmark_results run(unsigned seconds)
{
group->submit(workitems).value();
auto begin = std::chrono::steady_clock::now();
std::this_thread::sleep_for(std::chrono::seconds(seconds));
+ auto memusage = llfio::utils::current_process_memory_usage().value();
cancel.store(true, std::memory_order_release);
group->wait().value();
auto end = std::chrono::steady_clock::now();
- return std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
+ return {std::chrono::duration_cast<std::chrono::microseconds>(end - begin), memusage};
}
};
+
+struct llfio_runner_paced
+{
+ std::atomic<bool> cancel{false};
+ llfio::dynamic_thread_pool_group_ptr group = llfio::make_dynamic_thread_pool_group().value();
+ llfio::dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness awareness;
+ std::vector<llfio::dynamic_thread_pool_group::work_item *> workitems;
+ std::atomic<int64_t> last_pace{0};
+
+ llfio_runner_paced(llfio::io_handle *h)
+ : awareness{h, 1.0f /* 100% reads */}
+ {
+ }
+ ~llfio_runner_paced()
+ {
+ for(auto *p : workitems)
+ {
+ delete p;
+ }
+ }
+ template <class F> void add_workitem(F &&f)
+ {
+ struct workitem final : public llfio::dynamic_thread_pool_group::io_aware_work_item
+ {
+ llfio_runner_paced *parent;
+ F f;
+ workitem(llfio_runner_paced *_parent, F &&_f)
+ : llfio::dynamic_thread_pool_group::io_aware_work_item({&_parent->awareness, 1})
+ , parent(_parent)
+ , f(std::move(_f))
+ {
+ }
+ virtual intptr_t io_aware_next(llfio::deadline &d) noexcept override
+ {
+#if 1
+ auto last_pace = parent->last_pace.load(std::memory_order_relaxed);
+ if(last_pace != d.nsecs)
+ {
+ parent->last_pace.store(d.nsecs, std::memory_order_relaxed);
+ std::cout << "Pacing work by milliseconds " << (d.nsecs / 1000.0) << std::endl;
+ }
#endif
+ return parent->cancel.load(std::memory_order_relaxed) ? -1 : 1;
+ }
+ virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override
+ {
+ f();
+ return llfio::success();
+ }
+ };
+ workitems.push_back(new workitem(this, std::move(f)));
+ }
+ benchmark_results run(unsigned seconds)
+ {
+ group->submit(workitems).value();
+ auto begin = std::chrono::steady_clock::now();
+ std::this_thread::sleep_for(std::chrono::seconds(seconds));
+ auto memusage = llfio::utils::current_process_memory_usage().value();
+ cancel.store(true, std::memory_order_release);
+ group->wait().value();
+ auto end = std::chrono::steady_clock::now();
+ return {std::chrono::duration_cast<std::chrono::microseconds>(end - begin), memusage};
+ }
+};
#if ENABLE_ASIO
@@ -173,6 +243,7 @@ struct asio_runner
std::atomic<bool> cancel{false};
asio::io_context ctx;
+ asio_runner(llfio::io_handle * /*unused*/) {}
template <class F> struct C
{
asio_runner *parent;
@@ -192,7 +263,7 @@ struct asio_runner
}
};
template <class F> void add_workitem(F &&f) { ctx.post(C<F>(this, std::move(f))); }
- std::chrono::microseconds run(unsigned seconds)
+ benchmark_results run(unsigned seconds)
{
std::vector<std::thread> threads;
auto do_cleanup = [&]() noexcept {
@@ -205,21 +276,37 @@ struct asio_runner
auto cleanup = llfio::make_scope_exit(do_cleanup);
for(size_t n = 0; n < MAX_WORK_ITEMS; n++)
{
- threads.emplace_back([&] { ctx.run(); });
+ try
+ {
+ threads.emplace_back([&] { ctx.run(); });
+ }
+ catch(...)
+ {
+ std::cerr << "Thread creation failed at number " << n << std::endl;
+ throw;
+ }
}
auto begin = std::chrono::steady_clock::now();
std::this_thread::sleep_for(std::chrono::seconds(seconds));
+ auto memusage = llfio::utils::current_process_memory_usage().value();
cleanup.release();
do_cleanup();
auto end = std::chrono::steady_clock::now();
- return std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
+ return {std::chrono::duration_cast<std::chrono::microseconds>(end - begin), memusage};
}
};
#endif
-template <class Runner> void benchmark(llfio::span<llfio::byte> ioregion, const char *name)
+template <class Runner> void benchmark(llfio::mapped_file_handle &maph, const char *name)
{
- std::cout << "\nBenchmarking " << name << " ..." << std::endl;
+ if(name != nullptr)
+ {
+ std::cout << "\nBenchmarking " << name << " ..." << std::endl;
+ }
+ else
+ {
+ std::cout << "\nWarming up ..." << std::endl;
+ }
struct shared_t
{
llfio::span<llfio::byte> ioregion;
@@ -240,8 +327,12 @@ template <class Runner> void benchmark(llfio::span<llfio::byte> ioregion, const
{
shared->max_concurrency.store(concurrency, std::memory_order_relaxed);
}
+#if 1
+ auto offset = rand() % (TEST_FILE_SIZE - SHA256_BUFFER_SIZE - 1);
+#else
auto offset = rand() & (TEST_FILE_SIZE - 1);
offset &= ~(SHA256_BUFFER_SIZE - 1);
+#endif
hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::sha256_hash::hash(shared->ioregion.data() + offset, SHA256_BUFFER_SIZE);
count++;
shared->concurrency.fetch_sub(1, std::memory_order_relaxed);
@@ -253,38 +344,57 @@ template <class Runner> void benchmark(llfio::span<llfio::byte> ioregion, const
}
};
std::vector<worker> workers;
- std::vector<std::tuple<size_t, double, unsigned>> results;
+ struct result_t
+ {
+ size_t items;
+ double throughput;
+ size_t paged_in;
+ unsigned max_concurrency;
+ };
+ std::vector<result_t> results;
for(size_t items = 1; items <= MAX_WORK_ITEMS; items <<= 1)
{
- shared_t shared{ioregion};
+ if(name == nullptr && items != 16)
+ {
+ continue;
+ }
+ shared_t shared{maph.map().as_span()};
workers.clear();
for(uint32_t n = 0; n < items; n++)
{
workers.emplace_back(&shared, n);
}
- Runner runner;
+ Runner runner(&maph);
for(auto &i : workers)
{
runner.add_workitem([&] { i(); });
}
- auto duration = runner.run(BENCHMARK_DURATION);
+ auto out = runner.run(BENCHMARK_DURATION);
uint64_t total = 0;
for(auto &i : workers)
{
total += i.count;
}
- results.emplace_back(items, 1000000.0 * total / duration.count(), shared.max_concurrency);
- std::cout << " For " << std::get<0>(results.back()) << " work items got " << std::get<1>(results.back()) << " SHA256 hashes/sec with "
- << (items * SHA256_BUFFER_SIZE / 1024.0 / 1024.0) << " Mb working set and " << std::get<2>(results.back()) << " maximum concurrency."
- << std::endl;
+ results.push_back({items, 1000000.0 * total / out.duration.count(), out.memory_usage.total_address_space_paged_in, shared.max_concurrency});
+ std::cout << " For " << results.back().items << " work items got " << results.back().throughput << " SHA256 hashes/sec with "
+ << (results.back().items * SHA256_BUFFER_SIZE / 1024.0 / 1024.0) << " Mb working set, " << results.back().max_concurrency
+ << " maximum concurrency, and " << (results.back().paged_in / 1024.0 / 1024.0) << " Mb paged in." << std::endl;
+ // std::cout << " " << (out.memory_usage.total_address_space_in_use / 1024.0 / 1024.0) << ","
+ // << (out.memory_usage.total_address_space_paged_in / 1024.0 / 1024.0) << "," << (out.memory_usage.private_committed / 1024.0 / 1024.0) << ","
+ // << (out.memory_usage.private_paged_in / 1024.0 / 1024.0) << std::endl;
}
- std::ofstream out(std::string(name) + "_results.csv");
- out << R"("Work items","SHA256 hashes/sec","Working set","Max concurrency")";
- for(auto &i : results)
+ if(name != nullptr)
{
- out << "\n" << std::get<0>(i) << "," << std::get<1>(i) << "," << (std::get<0>(i) * SHA256_BUFFER_SIZE) << "," << std::get<2>(i);
+ std::ofstream out(std::string(name) + "_results.csv");
+ out << R"("Work items","SHA256 hashes/sec","Working set","Max concurrency","Paged in")";
+ for(auto &i : results)
+ {
+ out << "\n"
+ << i.items << "," << i.throughput << "," << (i.items * SHA256_BUFFER_SIZE / 1024.0 / 1024.0) << "," << i.max_concurrency << ","
+ << (i.paged_in / 1024.0 / 1024.0);
+ }
+ out << std::endl;
}
- out << std::endl;
}
int main(int argc, char *argv[])
@@ -308,35 +418,50 @@ int main(int argc, char *argv[])
{
fileh.close().value();
}
+#if 0
else
{
- std::cout << "Extending " << (TEST_FILE_SIZE / 1024.0 / 1024.0) << " Mb test file at " << fileh.current_path().value() << " ..." << std::endl;
+ std::cout << "Prefaulting " << (TEST_FILE_SIZE / 1024.0 / 1024.0) << " Mb test file at " << fileh.current_path().value() << " ..." << std::endl;
std::vector<llfio::byte> buffer(SHA256_BUFFER_SIZE);
for(size_t n = 0; n < TEST_FILE_SIZE; n += SHA256_BUFFER_SIZE)
{
memcpy_s(buffer.data(), fileh.address() + n, SHA256_BUFFER_SIZE);
}
}
+#endif
}
if(!fileh.is_valid())
{
fileh = llfio::mapped_file_handle::mapped_file(TEST_FILE_SIZE, where, "testfile", llfio::mapped_file_handle::mode::write,
- llfio::mapped_file_handle::creation::if_needed, llfio::mapped_file_handle::caching::reads_and_metadata)
+ llfio::mapped_file_handle::creation::always_new, llfio::mapped_file_handle::caching::reads_and_metadata)
.value();
std::cout << "Writing " << (TEST_FILE_SIZE / 1024.0 / 1024.0) << " Mb test file at " << fileh.current_path().value() << " ..." << std::endl;
fileh.truncate(TEST_FILE_SIZE).value();
memset(fileh.address(), 0xff, TEST_FILE_SIZE);
}
+ benchmark<llfio_runner_unpaced>(fileh, nullptr);
+
#if 0
- std::string llfio_name("llfio (");
- llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description());
- llfio_name.push_back(')');
- benchmark<llfio_runner>(llfio_name.c_str());
+ {
+ std::string llfio_name("llfio unpaced (");
+ llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description());
+ llfio_name.push_back(')');
+ benchmark<llfio_runner_unpaced>(fileh, llfio_name.c_str());
+ }
#endif
-#if ENABLE_ASIO
- benchmark<asio_runner>(fileh.map().as_span(), "asio");
+#if 1
+ {
+ std::string llfio_name("llfio paced (");
+ llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description());
+ llfio_name.push_back(')');
+ benchmark<llfio_runner_paced>(fileh, llfio_name.c_str());
+ }
+#endif
+
+#if 0 // ENABLE_ASIO
+ benchmark<asio_runner>(fileh, "asio");
#endif
std::cout << "\nReminder: you may wish to delete " << fileh.current_path().value() << std::endl;