Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-04-28 13:00:09 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-04-28 13:00:09 +0300
commit65693556c49ab446c25f5a9810f1a9d242790db6 (patch)
tree7145dc634817abf8bb926b25fac16915afb4a12a /test
parent720c2c5257220946e8679ffd43cafc4649adfe26 (diff)
Coroutines pipe test has been ported from resumable_io branch.
Diffstat (limited to 'test')
-rw-r--r--test/tests/pipe_handle.cpp184
1 files changed, 90 insertions, 94 deletions
diff --git a/test/tests/pipe_handle.cpp b/test/tests/pipe_handle.cpp
index 69727ff2..a56c1025 100644
--- a/test/tests/pipe_handle.cpp
+++ b/test/tests/pipe_handle.cpp
@@ -154,10 +154,10 @@ static inline void TestMultiplexedPipeHandle()
}
// Called if the read did not complete immediately
- virtual void read_initiated(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; }
+ virtual void read_initiated(llfio::io_multiplexer::io_operation_state::lock_guard & /*g*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; }
// Called when the read completes
- virtual bool read_completed(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result<llfio::pipe_handle::buffers_type> &&res) override
+ virtual bool read_completed(llfio::io_multiplexer::io_operation_state::lock_guard & /*g*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result<llfio::pipe_handle::buffers_type> &&res) override
{
if(is_initialised(former))
{
@@ -181,7 +181,7 @@ static inline void TestMultiplexedPipeHandle()
}
// Called when the state for the read can be disposed
- virtual void read_finished(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former) override
+ virtual void read_finished(llfio::io_multiplexer::io_operation_state::lock_guard & /*g*/, llfio::io_operation_state_type former) override
{
std::cout << " Pipe " << myindex << " read finishes" << std::endl;
BOOST_REQUIRE(former == llfio::io_operation_state_type::read_completed);
@@ -232,7 +232,7 @@ static inline void TestMultiplexedPipeHandle()
// Wait for all reads to finish
for(size_t n = 0; n < MAX_PIPES; n++)
{
- llfio::io_operation_state_type state ;
+ llfio::io_operation_state_type state;
while(!is_finished(state = async_reads[n].io_state->current_state()))
{
multiplexer->check_for_any_completed_io().value();
@@ -254,119 +254,115 @@ static inline void TestMultiplexedPipeHandle()
#endif
}
-#if LLFIO_ENABLE_COROUTINES && 0
+#if LLFIO_ENABLE_COROUTINES
static inline void TestCoroutinedPipeHandle()
{
static constexpr size_t MAX_PIPES = 70;
namespace llfio = LLFIO_V2_NAMESPACE;
- struct io_visitor
- {
- using container_type = std::unordered_set<llfio::io_awaitable<llfio::async_read, io_visitor> *>;
- container_type &c;
- void await_suspend(container_type::value_type i) { c.insert(i); }
- void await_resume(container_type::value_type i) { c.erase(i); }
- };
- io_visitor::container_type io_pending;
- struct coroutine
- {
- llfio::pipe_handle read_pipe, write_pipe;
- size_t received_for{0};
-
- explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w)
- : read_pipe(std::move(r))
- , write_pipe(std::move(w))
- {
- }
- llfio::eager<llfio::result<void>> operator()(io_visitor::container_type &io_pending)
+ auto test_multiplexer = [](llfio::io_multiplexer_ptr multiplexer) {
+ struct coroutine
{
- union {
- llfio::byte _buffer[sizeof(size_t)];
- size_t _index;
- };
- llfio::pipe_handle::buffer_type buffer;
- for(;;)
+ llfio::pipe_handle read_pipe, write_pipe;
+ size_t received_for{0};
+
+ explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w)
+ : read_pipe(std::move(r))
+ , write_pipe(std::move(w))
{
- buffer = {_buffer, sizeof(_buffer)};
- // This will never return if the coroutine gets cancelled
- auto r = co_await read_pipe.co_read(io_visitor{io_pending}, {{buffer}, 0});
- if(!r)
+ }
+ llfio::eager<llfio::result<void>> operator()()
+ {
+ union {
+ llfio::byte _buffer[sizeof(size_t)];
+ size_t _index;
+ };
+ llfio::pipe_handle::buffer_type buffer;
+ for(;;)
{
- co_return r.error();
+ buffer = {_buffer, sizeof(_buffer)};
+ // This will never return if the coroutine gets cancelled
+ auto r = co_await read_pipe.co_read({{&buffer, 1}, 0});
+ if(!r)
+ {
+ co_return r.error();
+ }
+ BOOST_CHECK(r.value().size() == 1);
+ BOOST_CHECK(r.value()[0].size() == sizeof(_buffer));
+ received_for+=_index;
}
- BOOST_CHECK(r.value().size() == 1);
- BOOST_CHECK(r.value()[0].size() == sizeof(_buffer));
- ++received_for;
}
+ };
+ std::vector<coroutine> coroutines;
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value();
+ ret.first.set_multiplexer(multiplexer.get()).value();
+ coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second)));
}
- };
- std::vector<coroutine> coroutines;
- auto multiplexer = llfio::this_thread::multiplexer();
- for(size_t n = 0; n < MAX_PIPES; n++)
- {
- auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value();
- ret.first.set_multiplexer(multiplexer).value();
- coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second)));
- }
- // Start the coroutines, all of whom will begin a read and then suspend
- std::vector<llfio::optional<llfio::eager<llfio::result<void>>>> states(MAX_PIPES);
- for(size_t n = 0; n < MAX_PIPES; n++)
- {
- states[n].emplace(coroutines[n](io_pending));
- }
- // Write to all the pipes, then pump coroutine resumption until all completions done
- for(size_t i = 0; i < 10; i++)
- {
- for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--)
+ // Start the coroutines, all of whom will begin a read and then suspend
+ std::vector<llfio::optional<llfio::eager<llfio::result<void>>>> states(MAX_PIPES);
+ for(size_t n = 0; n < MAX_PIPES; n++)
{
- coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value();
+ states[n].emplace(coroutines[n]());
}
- // Take a copy of all pending i/o
- std::vector<io_visitor::container_type::value_type> copy(io_pending.begin(), io_pending.end());
- for(;;)
+ // Write to all the pipes, then pump coroutine resumption until all completions done
+ size_t count = 0, failures = 0;
+ for(size_t i = 1; i <= 10; i++)
{
- // Manually check if an i/o completion is ready, avoiding any syscalls
- bool need_to_poll = true;
- for(auto it = copy.begin(); it != copy.end();)
+ std::cout << "\nWrite no " << i << std::endl;
+ for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--)
{
- if((*it)->await_ready())
- {
- need_to_poll = false;
- it = copy.erase(it);
- std::cout << "Completed an i/o without syscall" << std::endl;
- }
- else
- ++it;
+ coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value();
}
- if(need_to_poll)
+ // Take a copy of all pending i/o
+ for(;;)
{
// Have the kernel tell me when an i/o completion is ready
- auto r = multiplexer->complete_io();
- BOOST_CHECK(r.value() != 0);
- if(r.value() < 0)
+ auto r = multiplexer->check_for_any_completed_io();
+ if(r.value().initiated_ios_completed == 0)
{
- for(size_t n = 0; n < MAX_PIPES; n++)
- {
- BOOST_CHECK(coroutines[n].received_for == i + 1);
- }
break;
}
}
+ count += i;
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ if(coroutines[n].received_for != count)
+ {
+ std::cout << "Coroutine " << n << " has count " << coroutines[n].received_for << " instead of " << count << std::endl;
+ failures++;
+ }
+ BOOST_CHECK(coroutines[n].received_for == count);
+ }
+ BOOST_REQUIRE(failures == 0);
}
- }
- // Rethrow any failures
- for(size_t n = 0; n < MAX_PIPES; n++)
- {
- if(states[n]->await_ready())
+ // Rethrow any failures
+ for(size_t n = 0; n < MAX_PIPES; n++)
{
- states[n]->await_resume().value();
+ if(states[n]->await_ready())
+ {
+ states[n]->await_resume().value();
+ }
}
- }
- // Destruction of coroutines when they are suspended must work.
- // This will cancel any pending i/o and immediately exit the
- // coroutines
- states.clear();
- // Now clear all the coroutines
- coroutines.clear();
+ // Destruction of coroutines when they are suspended must work.
+ // This will cancel any pending i/o and immediately exit the
+ // coroutines
+ states.clear();
+ // Now clear all the coroutines
+ coroutines.clear();
+ };
+#ifdef _WIN32
+ std::cout << "\nSingle threaded IOCP, immediate completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(1, false).value());
+ std::cout << "\nSingle threaded IOCP, reactor completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(1, true).value());
+ std::cout << "\nMultithreaded IOCP, immediate completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(2, false).value());
+ std::cout << "\nMultithreaded IOCP, reactor completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(2, true).value());
+#else
+#error Not implemented yet
+#endif
}
#endif
#endif
@@ -375,7 +371,7 @@ KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, blocking, "Tests that bl
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, nonblocking, "Tests that nonblocking llfio::pipe_handle works as expected", TestNonBlockingPipeHandle())
#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, multiplexed, "Tests that multiplexed llfio::pipe_handle works as expected", TestMultiplexedPipeHandle())
-#if LLFIO_ENABLE_COROUTINES && 0
+#if LLFIO_ENABLE_COROUTINES
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, coroutined, "Tests that coroutined llfio::pipe_handle works as expected", TestCoroutinedPipeHandle())
#endif
#endif