github.com/windirstat/llfio.git
author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2020-11-28 20:58:08 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2020-11-28 20:58:08 +0300
commit     7c2fd4d1070394326b39323e9beb53823336eca9 (patch)
tree       09315c68b40fe039383e1ecd6f5dcbe03f50d51e
parent     0e304a916da69b0367407bea496edd6477fed16d (diff)
Finally got back onto doing some work on the test io_uring multiplexer.
-rw-r--r--  include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp  388
-rw-r--r--  include/llfio/v2.0/io_multiplexer.hpp  2
2 files changed, 222 insertions, 168 deletions
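
For context before the diff: io_uring is driven through three syscalls (io_uring_setup, io_uring_enter, io_uring_register) plus two shared-memory rings mmapped from the ring fd. A minimal, illustrative sketch of the setup step follows; it uses the raw syscall (no glibc wrapper existed for it in 2020), and sys_io_uring_setup is a hypothetical helper name, not LLFIO's:

    #include <linux/io_uring.h>  // io_uring_params and the IORING_* ABI
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <cstring>

    // Hypothetical helper: invoke the raw io_uring_setup syscall.
    static int sys_io_uring_setup(unsigned entries, io_uring_params *p)
    {
      return (int) syscall(__NR_io_uring_setup, entries, p);
    }

    int make_ring()
    {
      io_uring_params params;
      memset(&params, 0, sizeof(params));
      int fd = sys_io_uring_setup(64, &params);  // kernel fills in ring geometry
      // The SQ and CQ rings are then mmapped from fd, as init() in the diff does.
      return fd;  // -1 on failure, errno set
    }
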
diff --git a/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp b/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp
index 0988410a..afb71d90 100644
--- a/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp
@@ -87,9 +87,18 @@ namespace test
- Two io_uring instances are used, one for seekable i/o, the other for non-seekable
i/o. This prevents writes to seekable handles blocking until non-seekable i/o completes.
+
+ Todo list:
+
+ - Timeouts implementation
+
+ - Registered i/o buffers
+
*/
template <bool is_threadsafe> class linux_io_uring_multiplexer final : public io_multiplexer_impl<is_threadsafe>
{
+ friend LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_linux_io_uring(size_t threads, bool is_polling) noexcept;
+
using _base = io_multiplexer_impl<is_threadsafe>;
using _multiplexer_lock_guard = typename _base::_lock_guard;
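
The comment above documents the two-instance design; a standalone sketch of the routing policy it implies (types simplified, not LLFIO's actual interface):

    // Sketch: one ring per seekability class, so a pipe or socket write that
    // blocks indefinitely can never stall i/o to regular files behind it.
    struct ring_pair
    {
      struct ring { int fd{-1}; } nonseekable, seekable;
      ring &route(bool handle_is_seekable) noexcept
      {
        return handle_is_seekable ? seekable : nonseekable;
      }
    };
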
@@ -428,39 +437,37 @@ namespace test
return _to;
}
};
- int _wakecount{0};
- size_t threads{0};
- bool is_polling{false};
- struct _nonseekable_t // fd is kept in this->_v.fd
+
+ const bool _is_polling{false};
+ bool _have_ioring_register_files_update{true}; // track if this Linux kernel implements IORING_REGISTER_FILES_UPDATE
+ int _seekable_iouring_fd{-1};
+ struct _submission_completion_t
{
struct submission_t
{
- std::atomic<uint32_t> *head{nullptr}, *tail{nullptr}, *flags{nullptr}, *dropped{nullptr}, *array{nullptr};
+ std::atomic<uint32_t> *head{nullptr}, *tail{nullptr}, *flags{nullptr}, *dropped{nullptr};
uint32_t ring_mask{0}, ring_entries{0};
- span<uint32_t> queue;
+ span<uint32_t> region; // refers to the mmapped region, used to munmap on close
span<_io_uring_sqe> entries;
+ span<uint32_t> array;
} submission;
struct completion_t
{
std::atomic<uint32_t> *head{nullptr}, *tail{nullptr}, *overflow{nullptr};
uint32_t ring_mask{0}, ring_entries{0};
- span<uint32_t> queue;
+ span<uint32_t> region; // refers to the mmapped region, used to munmap on close
span<_io_uring_cqe> entries;
} completion;
- } _nonseekable;
- struct _seekable_t : _nonseekable_t
- {
- int fd{-1};
- } _seekable;
+ } _nonseekable, _seekable;
struct _registered_fd
{
- int fd{-1};
+ const int fd{-1};
struct queue_t
{
_io_uring_operation_state *first{nullptr}, *last{nullptr};
};
// contains initiated i/o not yet submitted to io_uring. state->submitted_to_iouring will be false.
- queue_t queued_reads, queued_writes_or_barriers;
+ queue_t enqueued_io;
// contains initiated i/o submitted to io_uring. state->submitted_to_iouring will be true.
struct
{
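
A note on the ring_mask/ring_entries pairs above: io_uring ring sizes are always powers of two, so slot indices wrap by masking rather than a modulo. A standalone illustration:

    #include <cassert>
    #include <cstdint>

    // Sketch: power-of-two ring indexing as used by both queues above.
    uint32_t slot_for(uint32_t tail, uint32_t ring_entries)
    {
      assert((ring_entries & (ring_entries - 1)) == 0);  // power of two
      const uint32_t ring_mask = ring_entries - 1;
      return tail & ring_mask;  // same as tail % ring_entries, but branch-free
    }
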
@@ -471,24 +478,26 @@ namespace test
_io_uring_operation_state *write_or_barrier{nullptr};
} inprogress;
- constexpr _registered_fd() {}
- explicit _registered_fd(int _fd)
- : fd(_fd)
+ explicit _registered_fd(io_handle &h)
+ : fd(h.native_handle().fd)
{
}
bool operator<(const _registered_fd &o) const noexcept { return fd < o.fd; }
};
- std::vector<_registered_fd> _registered_fds{4};
- bool _have_ioring_register_files_update{true};
+ /* If this were a real, and not a test, implementation this would be a hash table,
+ and there would be separate tables for registered fds with i/o pending and fds
+ needing submission. This would avoid a linear scan of the whole table per i/o pump.
+ */
+ std::vector<_registered_fd> _registered_fds; // ordered by fd so can be binary searched
std::vector<registered_buffer_type> _registered_buffers;
- std::vector<_registered_fd>::iterator _find_fd(int fd) const
+ typename std::vector<_registered_fd>::iterator _find_fd(int fd) const
{
auto ret = std::lower_bound(_registered_fds.begin(), _registered_fds.end(), fd);
assert(fd == ret->fd);
return ret;
}
- static void _enqueue_to(_registered_fd::queue_t &queue, _io_uring_operation_state *state)
+ static void _enqueue_to(typename _registered_fd::queue_t &queue, _io_uring_operation_state *state)
{
assert(state->prev == nullptr);
assert(state->next == nullptr);
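
The comment in the hunk above says a production implementation would keep registered fds in a hash table rather than a sorted vector. A sketch of that alternative shape (names assumed, not LLFIO's):

    #include <unordered_map>

    struct registered_fd_state { /* queued and in-progress i/o */ };

    // Sketch: O(1) average lookup instead of the binary search in _find_fd();
    // separate maps for "has i/o in progress" and "needs submission" would
    // also remove the linear scan over the whole table per pump.
    std::unordered_map<int, registered_fd_state> registered_fds;

    registered_fd_state *find_fd(int fd)
    {
      auto it = registered_fds.find(fd);
      return it == registered_fds.end() ? nullptr : &it->second;
    }
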
@@ -506,8 +515,9 @@ namespace test
queue.last = state;
}
}
- static void _dequeue_from(_registered_fd::queue_t &queue, _io_uring_operation_state *state)
+ static _io_uring_operation_state *_dequeue_from(typename _registered_fd::queue_t &queue, _io_uring_operation_state *state)
{
+ _io_uring_operation_state *ret = queue.first;
if(state->prev == nullptr)
{
assert(queue.first == this);
@@ -515,9 +525,9 @@ namespace test
}
else
{
- state->prev->next = next;
+ state->prev->next = state->next;
}
- if(next == nullptr)
+ if(state->next == nullptr)
{
assert(queue.last == this);
queue.last = state->prev;
@@ -527,19 +537,25 @@ namespace test
state->next->prev = state->prev;
}
state->next = state->prev = nullptr;
+ return ret;
}
void _pump(_multiplexer_lock_guard &g)
{
// Drain completions first
- auto drain_completions = [&](_nonseekable_t &inst) {
+ auto drain_completions = [&](_submission_completion_t &inst) {
for(;;)
{
- uint32_t head = inst.completion.head->load(std::memory_order_acquire);
+ const uint32_t head = inst.completion.head->load(std::memory_order_acquire);
if(head == inst.completion.tail->load(std::memory_order_relaxed))
{
break;
}
- _io_uring_cqe *cqe = &inst.completion.entries[head & inst.completion.ring_mask];
+ const _io_uring_cqe *cqe = &inst.completion.entries[head & inst.completion.ring_mask];
+ if(cqe->user_data == nullptr)
+ {
+ // A wakeup
+ continue;
+ }
auto *state = (_io_uring_operation_state *) (uintptr_t) cqe->user_data;
assert(state->submitted_to_iouring);
assert(is_initiated(state->state));
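
The two hunks above fix the unlink in _dequeue_from() to reach every neighbour through state's own pointers (the old code read stale next/prev names; note also that the surviving asserts compare against this inside a static function, where state is what's meant). A standalone sketch of the corrected intrusive doubly linked queue removal:

    #include <cassert>

    struct node { node *prev{nullptr}, *next{nullptr}; };
    struct queue { node *first{nullptr}, *last{nullptr}; };

    // Sketch of the fixed unlink: all neighbour pointers go via state.
    void dequeue(queue &q, node *state)
    {
      if(state->prev == nullptr)
      {
        assert(q.first == state);
        q.first = state->next;
      }
      else
      {
        state->prev->next = state->next;
      }
      if(state->next == nullptr)
      {
        assert(q.last == state);
        q.last = state->prev;
      }
      else
      {
        state->next->prev = state->prev;
      }
      state->next = state->prev = nullptr;
    }
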
@@ -648,86 +664,116 @@ namespace test
break;
}
}
+ inst.completion.head->fetch_add(1, std::memory_order_release);
}
};
drain_completions(_nonseekable);
- drain_completions(_seekable);
-
- // For all registered fds without an inprogress operation and a non-empty queue,
- // submit i/o
- const uint32_t index = queue.submission.tail & queue.submission.ring_mask;
- _io_uring_sqe *sqe = &queue.submission.entries[index];
- auto submit = [&] {
- queue.submission.array[index] = index;
- queue.tail++;
- WHAT AM I DOING HERE ? ;
- };
- switch(s)
- {
- case io_operation_state_type::read_initialised:
+ if(-1 != _seekable_iouring_fd)
{
- io_handle::io_result<io_handle::buffers_type> ret(state->payload.noncompleted.params.read.reqs.buffers);
-
- /* Try to eagerly complete the i/o now, if so ... */
- if(false /* completed immediately */)
- {
- state->read_completed(std::move(ret).value());
- if(!_disable_immediate_completions /* state is no longer in use by anyone else */)
- {
- state->read_finished();
- return io_operation_state_type::read_finished;
- }
- return io_operation_state_type::read_completed;
- }
- /* Otherwise the i/o has been initiated and will complete at some later point */
- state->read_initiated();
- _multiplexer_lock_guard g(this->_lock);
- _insert(state);
- return io_operation_state_type::read_initiated;
+ drain_completions(_seekable);
}
- case io_operation_state_type::write_initialised:
- {
- io_handle::io_result<io_handle::const_buffers_type> ret(state->payload.noncompleted.params.write.reqs.buffers);
- if(false /* completed immediately */)
+
+ auto enqueue_submissions = [&](_submission_completion_t &inst) {
+ for(auto &rfd : _registered_fds)
{
- state->write_completed(std::move(ret).value());
- if(!_disable_immediate_completions /* state is no longer in use by anyone else */)
+ if(rfd.inprogress.reads.first == nullptr && rfd.inprogress.write_or_barrier == nullptr)
{
- state->write_or_barrier_finished();
- return io_operation_state_type::write_or_barrier_finished;
+ bool enqueue_more = true;
+ while(rfd.enqueued_io.first != nullptr && enqueue_more)
+ {
+ // This registered fd has no i/o in progress but does have i/o enqueued
+ auto *state = _dequeue_from(rfd.enqueued_io);
+ assert(!state->submitted_to_iouring);
+ assert(is_initiated(state->state));
+
+ const uint32_t tail = inst.submission.tail->load(std::memory_order_acquire);
+ if(tail == inst.submission.head->load(std::memory_order_relaxed))
+ {
+ break;
+ }
+ const uint32_t sqeidx = tail & inst.submission.ring_mask;
+ _io_uring_sqe *sqe = &inst.submission.entries[sqeidx];
+ auto s = state->current_state();
+ bool enqueue_more = false;
+ memset(sqe, 0, sizeof(_io_uring_sqe));
+ sqe->fd = state->fd;
+ sqe->user_data = state;
+ switch(s)
+ {
+ default:
+ abort();
+ case io_operation_state_type::read_initialised:
+ {
+ sqe->opcode = _IORING_OP_READV; // TODO: _IORING_OP_READ_FIXED into registered i/o buffers
+ sqe->off = state->noncompleted.params.read.reqs.offset;
+ sqe->addr = state->noncompleted.params.read.reqs.buffers.data();
+ sqe->len = state->noncompleted.params.read.reqs.buffers.size();
+ _enqueue_to(rfd.inprogress.reads, state);
+ // If this is a read upon a seekable handle, keep enqueuing
+ enqueue_more = state->is_seekable;
+ break;
+ }
+ case io_operation_state_type::write_initialised:
+ {
+ sqe->opcode = _IORING_OP_WRITEV; // TODO: _IORING_OP_WRITE_FIXED into registered i/o buffers
+ if(state->is_seekable)
+ {
+ sqe->flags = _IOSQE_IO_DRAIN; // Drain all preceding reads before doing the write, and don't start anything new until this completes
+ }
+ sqe->off = state->noncompleted.params.write.reqs.offset;
+ sqe->addr = state->noncompleted.params.write.reqs.buffers.data();
+ sqe->len = state->noncompleted.params.write.reqs.buffers.size();
+ rfd.inprogress.write_or_barrier = state;
+ break;
+ }
+ case io_operation_state_type::barrier_initialised:
+ {
+ // Drain all preceding writes before doing the barrier, and don't start anything new until this completes
+ sqe->flags = _IOSQE_IO_DRAIN;
+ if(state->noncompleted.params.barrier.kind <= barrier_kind::wait_data_only)
+ {
+ // Linux has a lovely dedicated syscall giving us exactly what we need here
+ sqe->opcode = _IORING_OP_SYNC_FILE_RANGE;
+ sqe->off = state->noncompleted.params.write.reqs.offset;
+ // empty buffers means bytes = 0 which means sync entire file
+ for(const auto &req : state->noncompleted.params.write.reqs.buffers)
+ {
+ sqe->len += req.size();
+ }
+ sqe->sync_range_flags = SYNC_FILE_RANGE_WRITE; // start writing all dirty pages in range now
+ if(state->noncompleted.params.barrier.kind == barrier_kind::wait_data_only)
+ {
+ sqe->sync_range_flags |= SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WAIT_AFTER; // block until they're on storage
+ }
+ }
+ else
+ {
+ sqe->opcode = _IORING_OP_FSYNC;
+ }
+ rfd.inprogress.write_or_barrier = state;
+ break;
+ }
+ }
+ inst.submission.array[sqeidx] = sqeidx;
+ inst.submission.tail->fetch_add(1, std::memory_order_release);
+ state->submitted_to_iouring = true;
+ }
}
- return io_operation_state_type::write_or_barrier_completed;
}
- state->write_initiated();
- _multiplexer_lock_guard g(this->_lock);
- _insert(state);
- return io_operation_state_type::write_initiated;
- }
- case io_operation_state_type::barrier_initialised:
+ };
+ enqueue_submissions(_nonseekable);
+ if(-1 != _seekable_iouring_fd)
{
- io_handle::io_result<io_handle::const_buffers_type> ret(state->payload.noncompleted.params.write.reqs.buffers);
- if(false /* completed immediately */)
- {
- state->barrier_completed(std::move(ret).value());
- if(!_disable_immediate_completions /* state is no longer in use by anyone else */)
- {
- state->write_or_barrier_finished();
- return io_operation_state_type::write_or_barrier_finished;
- }
- return io_operation_state_type::write_or_barrier_completed;
- }
- state->write_initiated();
- _multiplexer_lock_guard g(this->_lock);
- _insert(state);
- return io_operation_state_type::barrier_initiated;
- }
- default:
- break;
+ enqueue_submissions(_seekable);
}
}
public:
- constexpr linux_io_uring_multiplexer() {}
+ constexpr explicit linux_io_uring_multiplexer(bool is_polling)
+ : _is_polling(is_polling)
+ {
+ _registered_fds.reserve(4);
+ }
linux_io_uring_multiplexer(const linux_io_uring_multiplexer &) = delete;
linux_io_uring_multiplexer(linux_io_uring_multiplexer &&) = delete;
linux_io_uring_multiplexer &operator=(const linux_io_uring_multiplexer &) = delete;
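
Distilled from the drain_completions lambda above: the userspace consumer owns the CQ head, the kernel owns the tail, and an acquire/release pair orders CQE reads against the kernel's writes. A batched variant of the same protocol (the code above instead releases head once per CQE):

    #include <atomic>
    #include <cstdint>

    struct cqe { uint64_t user_data; int32_t res; uint32_t flags; };

    // Sketch: drain every available completion from a CQ ring.
    void drain(std::atomic<uint32_t> &head, std::atomic<uint32_t> &tail,
               const cqe *entries, uint32_t ring_mask)
    {
      uint32_t h = head.load(std::memory_order_relaxed);
      const uint32_t t = tail.load(std::memory_order_acquire);  // see kernel writes
      while(h != t)
      {
        const cqe &c = entries[h & ring_mask];
        // ... complete the operation identified by c.user_data ...
        ++h;
      }
      head.store(h, std::memory_order_release);  // hand the slots back
    }
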
@@ -739,24 +785,24 @@ namespace test
(void) linux_io_uring_multiplexer::close();
}
}
- static result<int> init(_nonseekable_t &out, size_t threads, bool is_polling)
+ result<void> init(bool is_seekable, _submission_completion_t &out)
{
int fd = -1;
_io_uring_params params;
memset(&params, 0, sizeof(params));
- if(is_polling)
+ if(_is_polling)
{
// We don't implement IORING_SETUP_IOPOLL, it is for O_DIRECT files only in any case
params.flags |= _IORING_SETUP_SQPOLL;
params.sq_thread_idle = 100; // 100 milliseconds
- if(threads == 1)
+ if(!is_threadsafe)
{
// Pin kernel submission polling thread to same CPU as I am pinned to, if I am pinned
cpu_set_t affinity;
CPU_ZERO(&affinity);
- if(-1 != sched_get_affinity(0, sizeof(affinity), &affinity) && CPU_COUNT(&affinity) == 1)
+ if(-1 != ::sched_getaffinity(0, sizeof(affinity), &affinity) && CPU_COUNT(&affinity) == 1)
{
- for(size_t n = 0; n < CPU_SET_SIZE; n++)
+ for(size_t n = 0; n < CPU_SETSIZE; n++)
{
if(CPU_ISSET(n, &affinity))
{
@@ -781,14 +827,15 @@ namespace test
{
return posix_error();
}
- out.submission.queue = {(uint32_t *) p, params.sq_entries};
- out.head = &out.submission.queue[params.sq_off.head / sizeof(uint32_t)];
- out.tail = &out.submission.queue[params.sq_off.tail / sizeof(uint32_t)];
- out.ring_mask = out.submission.queue[params.sq_off.ring_mask / sizeof(uint32_t)];
- out.ring_entries = out.submission.queue[params.sq_off.ring_entries / sizeof(uint32_t)];
- out.flags = out.submission.queue[params.sq_off.flags / sizeof(uint32_t)];
- out.dropped = &out.submission.queue[params.sq_off.dropped / sizeof(uint32_t)];
- out.array = &out.submission.queue[params.sq_off.array / sizeof(uint32_t)];
+ out.submission.region = {(uint32_t *) p, (params.sq_off.array + params.sq_entries * sizeof(uint32_t)) / sizeof(uint32_t)};
+ out.head = &out.submission.region[params.sq_off.head / sizeof(uint32_t)];
+ out.tail = &out.submission.region[params.sq_off.tail / sizeof(uint32_t)];
+ out.ring_mask = out.submission.region[params.sq_off.ring_mask / sizeof(uint32_t)];
+ out.ring_entries = out.submission.region[params.sq_off.ring_entries / sizeof(uint32_t)];
+ out.flags = out.submission.region[params.sq_off.flags / sizeof(uint32_t)];
+ out.dropped = &out.submission.region[params.sq_off.dropped / sizeof(uint32_t)];
+ out.array = {
+ &out.submission.region[params.sq_off.array / sizeof(uint32_t)], params.sq_entries};
}
{
auto *p = ::mmap(nullptr, params.sq_entries * sizeof(_io_uring_sqe), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, _IORING_OFF_SQES);
@@ -804,16 +851,24 @@ namespace test
{
return posix_error();
}
- out.completion.queue = {(uint32_t *) p, params.cq_entries};
- out.head = &out.completion.queue[params.cq_off.head / sizeof(uint32_t)];
- out.tail = &out.completion.queue[params.cq_off.tail / sizeof(uint32_t)];
- out.ring_mask = out.completion.queue[params.cq_off.ring_mask / sizeof(uint32_t)];
- out.ring_entries = out.completion.queue[params.cq_off.ring_entries / sizeof(uint32_t)];
- out.overflow = out.completion.queue[params.cq_off.overflow / sizeof(uint32_t)];
- out.completion.entries = {(_io_uring_cqe *) &out.completion.queue[params.cq_off.cqes / sizeof(uint32_t)], params.cq_entries};
+ out.completion.region = {(uint32_t *) p, (params.cq_off.cqes + params.cq_entries * sizeof(_io_uring_cqe)) / sizeof(uint32_t)};
+ out.head = &out.completion.region[params.cq_off.head / sizeof(uint32_t)];
+ out.tail = &out.completion.region[params.cq_off.tail / sizeof(uint32_t)];
+ out.ring_mask = out.completion.region[params.cq_off.ring_mask / sizeof(uint32_t)];
+ out.ring_entries = out.completion.region[params.cq_off.ring_entries / sizeof(uint32_t)];
+ out.overflow = out.completion.region[params.cq_off.overflow / sizeof(uint32_t)];
+ out.completion.entries = {(_io_uring_cqe *) &out.completion.region[params.cq_off.cqes / sizeof(uint32_t)], params.cq_entries};
+ }
+ if(is_seekable)
+ {
+ _seekable_iouring_fd = fd;
}
- this->_v.behaviour |= native_handle_type::disposition::multiplexer;
- return fd;
+ else
+ {
+ this->_v.fd = fd;
+ this->_v.behaviour |= native_handle_type::disposition::multiplexer;
+ }
+ return success();
}
// These functions are inherited from handle
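
For reference, the three mmaps init() performs above and how their lengths derive from io_uring_params; a condensed sketch of the kernel ABI (pre-5.4 kernels, which this code targets, need separate SQ and CQ ring mappings):

    #include <linux/io_uring.h>
    #include <sys/mman.h>
    #include <cstddef>
    #include <cstdint>

    // Sketch: the three regions an io_uring fd exposes.
    void map_rings(int ringfd, const io_uring_params &p,
                   void *&sq_ring, void *&sqes, void *&cq_ring)
    {
      const size_t sq_len = p.sq_off.array + p.sq_entries * sizeof(uint32_t);
      const size_t sqes_len = p.sq_entries * sizeof(io_uring_sqe);
      const size_t cq_len = p.cq_off.cqes + p.cq_entries * sizeof(io_uring_cqe);
      sq_ring = ::mmap(nullptr, sq_len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ringfd, IORING_OFF_SQ_RING);
      sqes = ::mmap(nullptr, sqes_len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ringfd, IORING_OFF_SQES);
      cq_ring = ::mmap(nullptr, cq_len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ringfd, IORING_OFF_CQ_RING);
      // caller must check each result against MAP_FAILED
    }
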
@@ -821,13 +876,13 @@ namespace test
virtual result<void> close() noexcept override
{
auto do_close = [this](auto &s) {
- if(!s.submission.queue.empty())
+ if(!s.submission.region.empty())
{
- if(-1 == ::munmap(s.submission.queue.data(), s.submission.queue.size_bytes()))
+ if(-1 == ::munmap(s.submission.region.data(), s.submission.region.size_bytes()))
{
return posix_error();
}
- s.submission.queue = {};
+ s.submission.region = {};
}
if(!s.submission.entries.empty())
{
@@ -837,26 +892,27 @@ namespace test
}
s.submission.entries = {};
}
- if(!s.completion.queue.empty())
+ if(!s.completion.region.empty())
{
- if(-1 == ::munmap(s.completion.queue.data(), s.completion.queue.size_bytes()))
+ if(-1 == ::munmap(s.completion.region.data(), s.completion.region.size_bytes()))
{
return posix_error();
}
- s.completion.queue = {};
+ s.completion.region = {};
}
};
- do_close(_seekable);
- if(-1 != _seekable.fd)
+ if(-1 != _seekable_iouring_fd)
{
- if(-1 == ::close(_seekable.fd))
+ do_close(_seekable);
+ if(-1 == ::close(_seekable_iouring_fd))
{
return posix_error();
}
+ _seekable_iouring_fd = -1;
}
do_close(_nonseekable);
OUTCOME_TRY(_base::close());
- _known_fds.clear();
+ _registered_fds.clear();
_registered_buffers.clear();
return success();
}
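
Mirroring the close() above: each ring owns up to three mappings plus, for the seekable instance, its own fd. A condensed, standalone sketch of the teardown, zeroing each handle for idempotence just as the spans are reset above:

    #include <sys/mman.h>
    #include <unistd.h>
    #include <cstddef>

    // Sketch: tear down one ring's mappings, then its fd.
    int close_ring(void *&sq_ring, size_t sq_len, void *&sqes, size_t sqes_len,
                   void *&cq_ring, size_t cq_len, int &ringfd)
    {
      if(sq_ring != nullptr && -1 == ::munmap(sq_ring, sq_len)) return -1;
      sq_ring = nullptr;
      if(sqes != nullptr && -1 == ::munmap(sqes, sqes_len)) return -1;
      sqes = nullptr;
      if(cq_ring != nullptr && -1 == ::munmap(cq_ring, cq_len)) return -1;
      cq_ring = nullptr;
      if(ringfd != -1 && -1 == ::close(ringfd)) return -1;
      ringfd = -1;
      return 0;
    }
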
@@ -869,61 +925,50 @@ namespace test
return ret;
}
- result<void> _recalculate_registered_fds(int index, int fd) noexcept
+ result<void> _change_fd_registration(bool is_seekable, int fd, bool enable) noexcept
{
if(_have_ioring_register_files_update)
{
- int32_t newvalue = fd;
+ int32_t newvalue = enable ? fd : -1;
_io_uring_files_update upd;
memset(&upd, 0, sizeof(upd));
- upd.offset = index;
+ upd.offset = fd;
upd.fds = (__aligned_u64) &newvalue;
- if(_io_uring_register(this->_v.fd, _IORING_REGISTER_FILES_UPDATE, &upd, 1) >= 0) // Linux kernel 5.5 onwards
+ if(_io_uring_register(is_seekable ? _seekable_iouring_fd : this->_v.fd, _IORING_REGISTER_FILES_UPDATE, &upd, 1) >= 0) // Linux kernel 5.5 onwards
{
return success();
}
+ // Failed, so disable ever calling this again
_have_ioring_register_files_update = false;
}
-#if 0 // Hangs the ring until it empties, which locks this implementation
- // Fall back to the old inefficient API
- std::vector<int32_t> map(_known_fds.back() + 1, -1);
- for(auto fd : _known_fds)
- {
- map[fd] = fd;
- }
- (void) _io_uring_register(this->_v.fd, _IORING_UNREGISTER_FILES, nullptr, 0);
- if(_io_uring_register(this->_v.fd, _IORING_REGISTER_FILES, map.data(), map.size()) < 0)
- {
- return posix_error();
- }
-#endif
return success();
}
virtual result<uint8_t> do_io_handle_register(io_handle *h) noexcept override // linear complexity to total handles registered
{
_multiplexer_lock_guard g(this->_lock);
- if(_registered_fds.size() >= 64)
+ if(h->is_seekable() && -1 == _seekable_iouring_fd)
{
- return errc::resource_unavailable_try_again; // This test multiplexer can't handle more than 64 handles
+ // Create the seekable io_uring ring
+ OUTCOME_TRY(init(true, _seekable));
}
- int toinsert = h->native_handle().fd;
- _registered_fds.push_back(); // capacity expansion
- _registered_fds.pop_back();
+ _registered_fd toinsert(h);
_registered_fds.insert(std::lower_bound(_registered_fds.begin(), _registered_fds.end(), toinsert), toinsert);
- return _recalculate_registered_fds(toinsert, toinsert);
+ return _change_fd_registration(h->is_seekable(), toinsert.fd, true);
}
virtual result<void> do_io_handle_deregister(io_handle *h) noexcept override
{
_multiplexer_lock_guard g(this->_lock);
- int toremove = h->native_handle().fd;
+ int fd = h->native_handle().fd;
auto it = _find_fd(fd);
- assert(it->first == nullptr);
- if(it->first != nullptr)
+ assert(it->inprogress.reads.first == nullptr);
+ assert(it->inprogress.write_or_barrier == nullptr);
+ if(it->inprogress.reads.first != nullptr || it->inprogress.write_or_barrier != nullptr)
{
+ // Can't deregister a handle with i/o in progress
return errc::operation_in_progress;
}
_registered_fds.erase(it);
- return _recalculate_registered_fds(toremove, -1);
+ return _change_fd_registration(h->is_seekable(), fd, false);
}
virtual size_t do_io_handle_max_buffers(const io_handle * /*unused*/) const noexcept override { return IOV_MAX; }
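
IORING_REGISTER_FILES_UPDATE (Linux 5.5+), used by _change_fd_registration() above, patches single slots in a file table previously registered with IORING_REGISTER_FILES; a table must exist before updates can land. A sketch of that sequence via the raw syscall (helper name assumed):

    #include <linux/io_uring.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <cstdint>
    #include <vector>

    static int sys_io_uring_register(int fd, unsigned opcode, const void *arg, unsigned nr_args)
    {
      return (int) syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
    }

    // Sketch: register a sparse 64-slot table once, then update one slot,
    // without quiescing the ring (the removed IORING_REGISTER_FILES-only
    // fallback above hung the ring until it emptied).
    int register_then_update(int ringfd, int newfd, unsigned slot)
    {
      std::vector<int32_t> table(64, -1);  // -1 == empty slot
      if(sys_io_uring_register(ringfd, IORING_REGISTER_FILES, table.data(), (unsigned) table.size()) < 0)
        return -1;
      io_uring_files_update upd{};
      upd.offset = slot;
      upd.fds = (uint64_t)(uintptr_t) &newfd;
      return sys_io_uring_register(ringfd, IORING_REGISTER_FILES_UPDATE, &upd, 1);
    }
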
@@ -997,7 +1042,6 @@ namespace test
assert(false);
return s;
}
- bool use_write_barrier_queue = false;
switch(s)
{
case io_operation_state_type::read_initialised:
@@ -1008,13 +1052,11 @@ namespace test
case io_operation_state_type::write_initialised:
{
state->write_initiated();
- use_write_barrier_queue = true;
break;
}
case io_operation_state_type::barrier_initialised:
{
state->barrier_initiated();
- use_write_barrier_queue = true;
break;
}
}
@@ -1024,7 +1066,7 @@ namespace test
_multiplexer_lock_guard g(this->_lock);
auto it = _find_fd(state->fd);
assert(it != _registered_fds.end());
- _enqueue_to(use_write_barrier_queue ? it->queued_writes_or_barriers : it->queued_reads, state);
+ _enqueue_to(it->enqueued_io, state);
return state->state;
}
@@ -1035,7 +1077,7 @@ namespace test
virtual result<void> flush_inited_io_operations() noexcept override
{
_multiplexer_lock_guard g(this->_lock);
- _pump(g);
+ TODO mark all submitted i/os as initiated;
return success();
}
@@ -1115,11 +1157,11 @@ namespace test
while(max_completions > 0)
{
_multiplexer_lock_guard g(this->_lock);
- // If another kernel thread woke me, exit the loop
- if(_wakecount > 0)
+ TODO if there is a timeout, submit a timeout before this;
+ auto ret = io_uring_enter(_v.fd, 0, 1, _IORING_ENTER_GETEVENTS);
+ if(ret < 0)
{
- --_wakecount;
- break;
+ return posix_error();
}
auto *c = _first;
if(c != nullptr)
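
The wait above relies on io_uring_enter blocking with IORING_ENTER_GETEVENTS until min_complete completions are available. A sketch of the raw call (no glibc wrapper in 2020; helper name assumed):

    #include <linux/io_uring.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // Sketch: submit `to_submit` pending SQEs and block for at least one CQE.
    static int wait_for_one(int ringfd, unsigned to_submit)
    {
      return (int) syscall(__NR_io_uring_enter, ringfd, to_submit,
                           1 /* min_complete */, IORING_ENTER_GETEVENTS,
                           nullptr /* sigset */, 0 /* sigset size */);
    }
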
@@ -1165,23 +1207,35 @@ namespace test
virtual result<void> wake_check_for_any_completed_io() noexcept override
{
_multiplexer_lock_guard g(this->_lock);
- ++_wakecount;
+ // Post a null SQE, it'll break out any waits
+ const uint32_t tail = inst.submission.tail->load(std::memory_order_acquire);
+ if(tail == inst.submission.head->load(std::memory_order_relaxed))
+ {
+ return errc::resource_unavailable_try_again; // SQE ring is full
+ }
+ const uint32_t sqeidx = tail & inst.submission.ring_mask;
+ _io_uring_sqe *sqe = &inst.submission.entries[sqeidx];
+ memset(sqe, 0, sizeof(_io_uring_sqe));
+ sqe->opcode = _IORING_OP_NOP;
+ inst.submission.array[sqeidx] = sqeidx;
+ inst.submission.tail->fetch_add(1, std::memory_order_release);
return success();
}
};
- LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_linux_io_uring_multiplexer(size_t threads, bool is_polling) noexcept
+ LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_linux_io_uring(size_t threads, bool is_polling) noexcept
{
try
{
if(1 == threads)
{
- auto ret = std::make_unique<linux_io_uring_multiplexer<false>>();
- OUTCOME_TRY(ret->init(1, is_polling));
+ // Make non locking edition
+ auto ret = std::make_unique<linux_io_uring_multiplexer<false>>(is_polling);
+ OUTCOME_TRY(ret->init(false, ret->_nonseekable));
return io_multiplexer_ptr(ret.release());
}
- auto ret = std::make_unique<linux_io_uring_multiplexer<true>>();
- OUTCOME_TRY(ret->init(threads, is_polling));
+ auto ret = std::make_unique<linux_io_uring_multiplexer<true>>(is_polling);
+ OUTCOME_TRY(ret->init(false, ret->_nonseekable));
return io_multiplexer_ptr(ret.release());
}
catch(...)
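
A hedged usage sketch of the renamed factory (the header change below exposes the declaration; the include path, namespace spelling, and error propagation via Outcome's result<> follow LLFIO conventions and are assumptions here):

    #include <llfio/llfio.hpp>
    #include <utility>

    namespace llfio = LLFIO_V2_NAMESPACE;

    // Sketch: construct the single-threaded, non-polling edition.
    llfio::result<llfio::io_multiplexer_ptr> make_test_multiplexer() noexcept
    {
      auto r = llfio::test::multiplexer_linux_io_uring(1 /* threads */, false /* is_polling */);
      if(!r)
      {
        return std::move(r).error();
      }
      return std::move(r).value();
    }
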
diff --git a/include/llfio/v2.0/io_multiplexer.hpp b/include/llfio/v2.0/io_multiplexer.hpp
index 48008286..070cf0c1 100644
--- a/include/llfio/v2.0/io_multiplexer.hpp
+++ b/include/llfio/v2.0/io_multiplexer.hpp
@@ -1406,7 +1406,7 @@ namespace test
#if defined(__linux__) || DOXYGEN_IS_IN_THE_HOUSE
// LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_linux_epoll(size_t threads) noexcept;
-// LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_linux_io_uring() noexcept;
+ LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_linux_io_uring(size_t threads, bool is_polling) noexcept;
#endif
#if(defined(__FreeBSD__) || defined(__APPLE__)) || DOXYGEN_IS_IN_THE_HOUSE
// LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_bsd_kqueue(size_t threads) noexcept;