diff options
Diffstat (limited to 'programs/benchmark-io-congestion/main.cpp')
-rw-r--r-- | programs/benchmark-io-congestion/main.cpp | 197 |
1 file changed, 161 insertions, 36 deletions
diff --git a/programs/benchmark-io-congestion/main.cpp b/programs/benchmark-io-congestion/main.cpp index e54cb25a..ea902938 100644 --- a/programs/benchmark-io-congestion/main.cpp +++ b/programs/benchmark-io-congestion/main.cpp @@ -25,11 +25,11 @@ Distributed under the Boost Software License, Version 1.0. //! Seconds to run the benchmark static constexpr unsigned BENCHMARK_DURATION = 10; //! Maximum work items to create -static constexpr unsigned MAX_WORK_ITEMS = 512; +static constexpr unsigned MAX_WORK_ITEMS = 4096; // Size of buffer to SHA256 -static constexpr unsigned SHA256_BUFFER_SIZE = 1024 * 1024; // 1Mb +static constexpr unsigned SHA256_BUFFER_SIZE = 4 * 1024; // 4Kb // Size of test file -static constexpr unsigned long long TEST_FILE_SIZE = 4ULL * SHA256_BUFFER_SIZE * MAX_WORK_ITEMS; // 4Gb +static constexpr unsigned long long TEST_FILE_SIZE = 4ULL * 1024 * 1024 * 1024; // 4Gb #include "../../include/llfio/llfio.hpp" @@ -58,6 +58,12 @@ static constexpr unsigned long long TEST_FILE_SIZE = 4ULL * SHA256_BUFFER_SIZE * namespace llfio = LLFIO_V2_NAMESPACE; +struct benchmark_results +{ + std::chrono::microseconds duration; + llfio::utils::process_memory_usage memory_usage; +}; + inline QUICKCPPLIB_NOINLINE void memcpy_s(llfio::byte *dest, const llfio::byte *s, size_t len) { #if defined(__SSE2__) || defined(_M_X64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 2) @@ -119,14 +125,14 @@ inline QUICKCPPLIB_NOINLINE void memcpy_s(llfio::byte *dest, const llfio::byte * } } -#if 0 -struct llfio_runner +struct llfio_runner_unpaced { std::atomic<bool> cancel{false}; llfio::dynamic_thread_pool_group_ptr group = llfio::make_dynamic_thread_pool_group().value(); std::vector<llfio::dynamic_thread_pool_group::work_item *> workitems; - ~llfio_runner() + llfio_runner_unpaced(llfio::io_handle * /*unused*/) {} + ~llfio_runner_unpaced() { for(auto *p : workitems) { @@ -137,9 +143,9 @@ struct llfio_runner { struct workitem final : public llfio::dynamic_thread_pool_group::work_item { - 
llfio_runner *parent; + llfio_runner_unpaced *parent; F f; - workitem(llfio_runner *_parent, F &&_f) + workitem(llfio_runner_unpaced *_parent, F &&_f) : parent(_parent) , f(std::move(_f)) { @@ -153,18 +159,82 @@ struct llfio_runner }; workitems.push_back(new workitem(this, std::move(f))); } - std::chrono::microseconds run(unsigned seconds) + benchmark_results run(unsigned seconds) { group->submit(workitems).value(); auto begin = std::chrono::steady_clock::now(); std::this_thread::sleep_for(std::chrono::seconds(seconds)); + auto memusage = llfio::utils::current_process_memory_usage().value(); cancel.store(true, std::memory_order_release); group->wait().value(); auto end = std::chrono::steady_clock::now(); - return std::chrono::duration_cast<std::chrono::microseconds>(end - begin); + return {std::chrono::duration_cast<std::chrono::microseconds>(end - begin), memusage}; } }; + +struct llfio_runner_paced +{ + std::atomic<bool> cancel{false}; + llfio::dynamic_thread_pool_group_ptr group = llfio::make_dynamic_thread_pool_group().value(); + llfio::dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness awareness; + std::vector<llfio::dynamic_thread_pool_group::work_item *> workitems; + std::atomic<int64_t> last_pace{0}; + + llfio_runner_paced(llfio::io_handle *h) + : awareness{h, 1.0f /* 100% reads */} + { + } + ~llfio_runner_paced() + { + for(auto *p : workitems) + { + delete p; + } + } + template <class F> void add_workitem(F &&f) + { + struct workitem final : public llfio::dynamic_thread_pool_group::io_aware_work_item + { + llfio_runner_paced *parent; + F f; + workitem(llfio_runner_paced *_parent, F &&_f) + : llfio::dynamic_thread_pool_group::io_aware_work_item({&_parent->awareness, 1}) + , parent(_parent) + , f(std::move(_f)) + { + } + virtual intptr_t io_aware_next(llfio::deadline &d) noexcept override + { +#if 1 + auto last_pace = parent->last_pace.load(std::memory_order_relaxed); + if(last_pace != d.nsecs) + { + parent->last_pace.store(d.nsecs, 
std::memory_order_relaxed); + std::cout << "Pacing work by milliseconds " << (d.nsecs / 1000.0) << std::endl; + } #endif + return parent->cancel.load(std::memory_order_relaxed) ? -1 : 1; + } + virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override + { + f(); + return llfio::success(); + } + }; + workitems.push_back(new workitem(this, std::move(f))); + } + benchmark_results run(unsigned seconds) + { + group->submit(workitems).value(); + auto begin = std::chrono::steady_clock::now(); + std::this_thread::sleep_for(std::chrono::seconds(seconds)); + auto memusage = llfio::utils::current_process_memory_usage().value(); + cancel.store(true, std::memory_order_release); + group->wait().value(); + auto end = std::chrono::steady_clock::now(); + return {std::chrono::duration_cast<std::chrono::microseconds>(end - begin), memusage}; + } +}; #if ENABLE_ASIO @@ -173,6 +243,7 @@ struct asio_runner std::atomic<bool> cancel{false}; asio::io_context ctx; + asio_runner(llfio::io_handle * /*unused*/) {} template <class F> struct C { asio_runner *parent; @@ -192,7 +263,7 @@ struct asio_runner } }; template <class F> void add_workitem(F &&f) { ctx.post(C<F>(this, std::move(f))); } - std::chrono::microseconds run(unsigned seconds) + benchmark_results run(unsigned seconds) { std::vector<std::thread> threads; auto do_cleanup = [&]() noexcept { @@ -205,21 +276,37 @@ struct asio_runner auto cleanup = llfio::make_scope_exit(do_cleanup); for(size_t n = 0; n < MAX_WORK_ITEMS; n++) { - threads.emplace_back([&] { ctx.run(); }); + try + { + threads.emplace_back([&] { ctx.run(); }); + } + catch(...) 
+ { + std::cerr << "Thread creation failed at number " << n << std::endl; + throw; + } } auto begin = std::chrono::steady_clock::now(); std::this_thread::sleep_for(std::chrono::seconds(seconds)); + auto memusage = llfio::utils::current_process_memory_usage().value(); cleanup.release(); do_cleanup(); auto end = std::chrono::steady_clock::now(); - return std::chrono::duration_cast<std::chrono::microseconds>(end - begin); + return {std::chrono::duration_cast<std::chrono::microseconds>(end - begin), memusage}; } }; #endif -template <class Runner> void benchmark(llfio::span<llfio::byte> ioregion, const char *name) +template <class Runner> void benchmark(llfio::mapped_file_handle &maph, const char *name) { - std::cout << "\nBenchmarking " << name << " ..." << std::endl; + if(name != nullptr) + { + std::cout << "\nBenchmarking " << name << " ..." << std::endl; + } + else + { + std::cout << "\nWarming up ..." << std::endl; + } struct shared_t { llfio::span<llfio::byte> ioregion; @@ -240,8 +327,12 @@ template <class Runner> void benchmark(llfio::span<llfio::byte> ioregion, const { shared->max_concurrency.store(concurrency, std::memory_order_relaxed); } +#if 1 + auto offset = rand() % (TEST_FILE_SIZE - SHA256_BUFFER_SIZE - 1); +#else auto offset = rand() & (TEST_FILE_SIZE - 1); offset &= ~(SHA256_BUFFER_SIZE - 1); +#endif hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::sha256_hash::hash(shared->ioregion.data() + offset, SHA256_BUFFER_SIZE); count++; shared->concurrency.fetch_sub(1, std::memory_order_relaxed); @@ -253,38 +344,57 @@ template <class Runner> void benchmark(llfio::span<llfio::byte> ioregion, const } }; std::vector<worker> workers; - std::vector<std::tuple<size_t, double, unsigned>> results; + struct result_t + { + size_t items; + double throughput; + size_t paged_in; + unsigned max_concurrency; + }; + std::vector<result_t> results; for(size_t items = 1; items <= MAX_WORK_ITEMS; items <<= 1) { - shared_t shared{ioregion}; + if(name == nullptr && items != 16) + { + 
continue; + } + shared_t shared{maph.map().as_span()}; workers.clear(); for(uint32_t n = 0; n < items; n++) { workers.emplace_back(&shared, n); } - Runner runner; + Runner runner(&maph); for(auto &i : workers) { runner.add_workitem([&] { i(); }); } - auto duration = runner.run(BENCHMARK_DURATION); + auto out = runner.run(BENCHMARK_DURATION); uint64_t total = 0; for(auto &i : workers) { total += i.count; } - results.emplace_back(items, 1000000.0 * total / duration.count(), shared.max_concurrency); - std::cout << " For " << std::get<0>(results.back()) << " work items got " << std::get<1>(results.back()) << " SHA256 hashes/sec with " - << (items * SHA256_BUFFER_SIZE / 1024.0 / 1024.0) << " Mb working set and " << std::get<2>(results.back()) << " maximum concurrency." - << std::endl; + results.push_back({items, 1000000.0 * total / out.duration.count(), out.memory_usage.total_address_space_paged_in, shared.max_concurrency}); + std::cout << " For " << results.back().items << " work items got " << results.back().throughput << " SHA256 hashes/sec with " + << (results.back().items * SHA256_BUFFER_SIZE / 1024.0 / 1024.0) << " Mb working set, " << results.back().max_concurrency + << " maximum concurrency, and " << (results.back().paged_in / 1024.0 / 1024.0) << " Mb paged in." 
<< std::endl; + // std::cout << " " << (out.memory_usage.total_address_space_in_use / 1024.0 / 1024.0) << "," + // << (out.memory_usage.total_address_space_paged_in / 1024.0 / 1024.0) << "," << (out.memory_usage.private_committed / 1024.0 / 1024.0) << "," + // << (out.memory_usage.private_paged_in / 1024.0 / 1024.0) << std::endl; } - std::ofstream out(std::string(name) + "_results.csv"); - out << R"("Work items","SHA256 hashes/sec","Working set","Max concurrency")"; - for(auto &i : results) + if(name != nullptr) { - out << "\n" << std::get<0>(i) << "," << std::get<1>(i) << "," << (std::get<0>(i) * SHA256_BUFFER_SIZE) << "," << std::get<2>(i); + std::ofstream out(std::string(name) + "_results.csv"); + out << R"("Work items","SHA256 hashes/sec","Working set","Max concurrency","Paged in")"; + for(auto &i : results) + { + out << "\n" + << i.items << "," << i.throughput << "," << (i.items * SHA256_BUFFER_SIZE / 1024.0 / 1024.0) << "," << i.max_concurrency << "," + << (i.paged_in / 1024.0 / 1024.0); + } + out << std::endl; } - out << std::endl; } int main(int argc, char *argv[]) @@ -308,35 +418,50 @@ int main(int argc, char *argv[]) { fileh.close().value(); } +#if 0 else { - std::cout << "Extending " << (TEST_FILE_SIZE / 1024.0 / 1024.0) << " Mb test file at " << fileh.current_path().value() << " ..." << std::endl; + std::cout << "Prefaulting " << (TEST_FILE_SIZE / 1024.0 / 1024.0) << " Mb test file at " << fileh.current_path().value() << " ..." 
<< std::endl; std::vector<llfio::byte> buffer(SHA256_BUFFER_SIZE); for(size_t n = 0; n < TEST_FILE_SIZE; n += SHA256_BUFFER_SIZE) { memcpy_s(buffer.data(), fileh.address() + n, SHA256_BUFFER_SIZE); } } +#endif } if(!fileh.is_valid()) { fileh = llfio::mapped_file_handle::mapped_file(TEST_FILE_SIZE, where, "testfile", llfio::mapped_file_handle::mode::write, - llfio::mapped_file_handle::creation::if_needed, llfio::mapped_file_handle::caching::reads_and_metadata) + llfio::mapped_file_handle::creation::always_new, llfio::mapped_file_handle::caching::reads_and_metadata) .value(); std::cout << "Writing " << (TEST_FILE_SIZE / 1024.0 / 1024.0) << " Mb test file at " << fileh.current_path().value() << " ..." << std::endl; fileh.truncate(TEST_FILE_SIZE).value(); memset(fileh.address(), 0xff, TEST_FILE_SIZE); } + benchmark<llfio_runner_unpaced>(fileh, nullptr); + #if 0 - std::string llfio_name("llfio ("); - llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description()); - llfio_name.push_back(')'); - benchmark<llfio_runner>(llfio_name.c_str()); + { + std::string llfio_name("llfio unpaced ("); + llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description()); + llfio_name.push_back(')'); + benchmark<llfio_runner_unpaced>(fileh, llfio_name.c_str()); + } #endif -#if ENABLE_ASIO - benchmark<asio_runner>(fileh.map().as_span(), "asio"); +#if 1 + { + std::string llfio_name("llfio paced ("); + llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description()); + llfio_name.push_back(')'); + benchmark<llfio_runner_paced>(fileh, llfio_name.c_str()); + } +#endif + +#if 0 // ENABLE_ASIO + benchmark<asio_runner>(fileh, "asio"); #endif std::cout << "\nReminder: you may wish to delete " << fileh.current_path().value() << std::endl; |