Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-04-22 13:39:15 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-04-22 13:39:15 +0300
commitd3afbce5fe3f63b47dba9b14057d0cbd018e513e (patch)
tree8a25a155e413113dd6a6d09f4a30f622921e6d43 /test
parent181e7310b40b574ba39ab411b93f87ecac689ed9 (diff)
Port async pipe handle test over from resumable i/o branch
Diffstat (limited to 'test')
-rw-r--r--test/tests/pipe_handle.cpp284
1 files changed, 284 insertions, 0 deletions
diff --git a/test/tests/pipe_handle.cpp b/test/tests/pipe_handle.cpp
index b266af10..0f514859 100644
--- a/test/tests/pipe_handle.cpp
+++ b/test/tests/pipe_handle.cpp
@@ -25,6 +25,7 @@ Distributed under the Boost Software License, Version 1.0.
#include "../test_kernel_decl.hpp"
#include <future>
+#include <unordered_set>
static inline void TestBlockingPipeHandle()
{
@@ -93,5 +94,288 @@ static inline void TestNonBlockingPipeHandle()
reader.close().value();
}
+#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS
+static inline void TestMultiplexedPipeHandle()
+{
+ static constexpr size_t MAX_PIPES = 64;
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ auto test_multiplexer = [](llfio::io_multiplexer_ptr multiplexer) {
+ std::vector<llfio::pipe_handle> read_pipes, write_pipes;
+ std::vector<size_t> received_for(MAX_PIPES);
+ struct checking_receiver final : public llfio::io_multiplexer::io_operation_state_visitor
+ {
+ size_t myindex;
+ std::unique_ptr<llfio::byte[]> io_state_ptr;
+ std::vector<size_t> &received_for;
+ union {
+ llfio::byte _buffer[sizeof(size_t)];
+ size_t _index;
+ };
+ llfio::pipe_handle::buffer_type buffer;
+ llfio::io_multiplexer::io_operation_state *io_state{nullptr};
+
+ checking_receiver(size_t _myindex, llfio::io_multiplexer_ptr &multiplexer, std::vector<size_t> &r)
+ : myindex(_myindex)
+ , io_state_ptr(std::make_unique<llfio::byte[]>(multiplexer->io_state_requirements().first))
+ , received_for(r)
+ , buffer(_buffer, sizeof(_buffer))
+ {
+ memset(_buffer, 0, sizeof(_buffer));
+ }
+ checking_receiver(const checking_receiver &) = delete;
+ checking_receiver(checking_receiver &&o) = default;
+ checking_receiver &operator=(const checking_receiver &) = delete;
+ checking_receiver &operator=(checking_receiver &&) = default;
+ ~checking_receiver()
+ {
+ if(io_state != nullptr)
+ {
+ if(!is_finished(io_state->current_state()))
+ {
+ abort();
+ }
+ io_state->~io_operation_state();
+ io_state = nullptr;
+ }
+ }
+
+ // Initiated the read
+ llfio::result<void> read_begin(llfio::io_multiplexer_ptr &multiplexer, llfio::io_handle &h)
+ {
+ if(io_state != nullptr)
+ {
+ BOOST_REQUIRE(is_finished(io_state->current_state()));
+ io_state->~io_operation_state();
+ io_state = nullptr;
+ }
+ buffer = {_buffer, sizeof(_buffer)};
+ OUTCOME_TRY(s, multiplexer->init_io_operation({io_state_ptr.get(), 4096 /*lies*/}, &h, this, {}, {}, llfio::pipe_handle::io_request<llfio::pipe_handle::buffers_type>({&buffer, 1}, 0)));
+ io_state = s;
+ return llfio::success();
+ }
+
+ // Called if the read did not complete immediately
+ virtual void read_initiated(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; }
+
+ // Called when the read completes
+ virtual void read_completed(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result<llfio::pipe_handle::buffers_type> &&res) override
+ {
+ if(is_initialised(former))
+ {
+ std::cout << " Pipe " << myindex << " read completes immediately" << std::endl;
+ }
+ else
+ {
+ std::cout << " Pipe " << myindex << " read completes asynchronously" << std::endl;
+ }
+ BOOST_CHECK(res.has_value());
+ if(res)
+ {
+ BOOST_REQUIRE(res.value().size() == 1);
+ BOOST_CHECK(res.value()[0].data() == _buffer);
+ BOOST_CHECK(res.value()[0].size() == sizeof(size_t));
+ BOOST_REQUIRE(_index < MAX_PIPES);
+ BOOST_CHECK(_index == myindex);
+ received_for[_index]++;
+ }
+ }
+
+ // Called when the state for the read can be disposed
+ virtual void read_finished(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former) override
+ {
+ std::cout << " Pipe " << myindex << " read finishes" << std::endl;
+ BOOST_REQUIRE(former == llfio::io_operation_state_type::read_completed);
+ }
+ };
+ std::vector<checking_receiver> async_reads;
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value();
+ ret.first.set_multiplexer(multiplexer.get()).value();
+ async_reads.push_back(checking_receiver(n, multiplexer, received_for));
+ read_pipes.push_back(std::move(ret.first));
+ write_pipes.push_back(std::move(ret.second));
+ }
+ auto writerthread = std::async([&] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--)
+ {
+ auto r = write_pipes[n].write(0, {{(llfio::byte *) &n, sizeof(n)}});
+ if(!r)
+ {
+ abort();
+ }
+ }
+ });
+ // Start the pipe reads. They cannot move in memory until complete
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ async_reads[n].read_begin(multiplexer, read_pipes[n]).value();
+ }
+ // Wait for all reads to complete
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ // Spin until this i/o completes
+ for(;;)
+ {
+ auto state = multiplexer->check_io_operation(async_reads[n].io_state);
+ if(is_completed(state) || is_finished(state))
+ {
+ break;
+ }
+ }
+ }
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ BOOST_CHECK(received_for[n] == 1);
+ }
+ // Wait for all reads to finish
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ llfio::io_operation_state_type state ;
+ while(!is_finished(state = async_reads[n].io_state->current_state()))
+ {
+ multiplexer->check_for_any_completed_io().value();
+ }
+ }
+ writerthread.get();
+ };
+#ifdef _WIN32
+ std::cout << "\nSingle threaded IOCP, immediate completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(1, false).value());
+ std::cout << "\nSingle threaded IOCP, reactor completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(1, true).value());
+ std::cout << "\nMultithreaded IOCP, immediate completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(2, false).value());
+ std::cout << "\nMultithreaded IOCP, reactor completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(2, true).value());
+#else
+#error Not implemented yet
+#endif
+}
+
+#if LLFIO_ENABLE_COROUTINES && 0
+static inline void TestCoroutinedPipeHandle()
+{
+ static constexpr size_t MAX_PIPES = 70;
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ struct io_visitor
+ {
+ using container_type = std::unordered_set<llfio::io_awaitable<llfio::async_read, io_visitor> *>;
+ container_type &c;
+ void await_suspend(container_type::value_type i) { c.insert(i); }
+ void await_resume(container_type::value_type i) { c.erase(i); }
+ };
+ io_visitor::container_type io_pending;
+ struct coroutine
+ {
+ llfio::pipe_handle read_pipe, write_pipe;
+ size_t received_for{0};
+
+ explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w)
+ : read_pipe(std::move(r))
+ , write_pipe(std::move(w))
+ {
+ }
+ llfio::eager<llfio::result<void>> operator()(io_visitor::container_type &io_pending)
+ {
+ union {
+ llfio::byte _buffer[sizeof(size_t)];
+ size_t _index;
+ };
+ llfio::pipe_handle::buffer_type buffer;
+ for(;;)
+ {
+ buffer = {_buffer, sizeof(_buffer)};
+ // This will never return if the coroutine gets cancelled
+ auto r = co_await read_pipe.co_read(io_visitor{io_pending}, {{buffer}, 0});
+ if(!r)
+ {
+ co_return r.error();
+ }
+ BOOST_CHECK(r.value().size() == 1);
+ BOOST_CHECK(r.value()[0].size() == sizeof(_buffer));
+ ++received_for;
+ }
+ }
+ };
+ std::vector<coroutine> coroutines;
+ auto multiplexer = llfio::this_thread::multiplexer();
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value();
+ ret.first.set_multiplexer(multiplexer).value();
+ coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second)));
+ }
+ // Start the coroutines, all of whom will begin a read and then suspend
+ std::vector<llfio::optional<llfio::eager<llfio::result<void>>>> states(MAX_PIPES);
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ states[n].emplace(coroutines[n](io_pending));
+ }
+ // Write to all the pipes, then pump coroutine resumption until all completions done
+ for(size_t i = 0; i < 10; i++)
+ {
+ for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--)
+ {
+ coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value();
+ }
+ // Take a copy of all pending i/o
+ std::vector<io_visitor::container_type::value_type> copy(io_pending.begin(), io_pending.end());
+ for(;;)
+ {
+ // Manually check if an i/o completion is ready, avoiding any syscalls
+ bool need_to_poll = true;
+ for(auto it = copy.begin(); it != copy.end();)
+ {
+ if((*it)->await_ready())
+ {
+ need_to_poll = false;
+ it = copy.erase(it);
+ std::cout << "Completed an i/o without syscall" << std::endl;
+ }
+ else
+ ++it;
+ }
+ if(need_to_poll)
+ {
+ // Have the kernel tell me when an i/o completion is ready
+ auto r = multiplexer->complete_io();
+ BOOST_CHECK(r.value() != 0);
+ if(r.value() < 0)
+ {
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ BOOST_CHECK(coroutines[n].received_for == i + 1);
+ }
+ break;
+ }
+ }
+ }
+ }
+ // Rethrow any failures
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ if(states[n]->await_ready())
+ {
+ states[n]->await_resume().value();
+ }
+ }
+ // Destruction of coroutines when they are suspended must work.
+ // This will cancel any pending i/o and immediately exit the
+ // coroutines
+ states.clear();
+ // Now clear all the coroutines
+ coroutines.clear();
+}
+#endif
+#endif
+
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, blocking, "Tests that blocking llfio::pipe_handle works as expected", TestBlockingPipeHandle())
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, nonblocking, "Tests that nonblocking llfio::pipe_handle works as expected", TestNonBlockingPipeHandle())
+#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS
+KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, multiplexed, "Tests that multiplexed llfio::pipe_handle works as expected", TestMultiplexedPipeHandle())
+#if LLFIO_ENABLE_COROUTINES && 0
+KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, coroutined, "Tests that coroutined llfio::pipe_handle works as expected", TestCoroutinedPipeHandle())
+#endif
+#endif