From 7c2fd4d1070394326b39323e9beb53823336eca9 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Sat, 28 Nov 2020 17:58:08 +0000 Subject: Finally got back onto doing some work on the test io_uring multiplexer. --- .../impl/posix/test/io_uring_multiplexer.ipp | 388 ++++++++++++--------- include/llfio/v2.0/io_multiplexer.hpp | 2 +- 2 files changed, 222 insertions(+), 168 deletions(-) diff --git a/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp b/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp index 0988410a..afb71d90 100644 --- a/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp +++ b/include/llfio/v2.0/detail/impl/posix/test/io_uring_multiplexer.ipp @@ -87,9 +87,18 @@ namespace test - Two io_uring instances are used, one for seekable i/o, the other for non-seekable i/o. This prevents writes to seekable handles blocking until non-seekable i/o completes. + + Todo list: + + - Timeouts implementation + + - Registered i/o buffers + */ template class linux_io_uring_multiplexer final : public io_multiplexer_impl { + friend LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_linux_io_uring(size_t threads, bool is_polling) noexcept; + using _base = io_multiplexer_impl; using _multiplexer_lock_guard = typename _base::_lock_guard; @@ -428,39 +437,37 @@ namespace test return _to; } }; - int _wakecount{0}; - size_t threads{0}; - bool is_polling{false}; - struct _nonseekable_t // fd is kept in this->_v.fd + + const bool _is_polling{false}; + bool _have_ioring_register_files_update{true}; // track if this Linux kernel implements IORING_REGISTER_FILES_UPDATE + int _seekable_iouring_fd{-1}; + struct _submission_completion_t { struct submission_t { - std::atomic *head{nullptr}, *tail{nullptr}, *flags{nullptr}, *dropped{nullptr}, *array{nullptr}; + std::atomic *head{nullptr}, *tail{nullptr}, *flags{nullptr}, *dropped{nullptr}; uint32_t ring_mask{0}, ring_entries{0}; - span queue; + span region; // refers to the mmapped region, used to munmap on close span<_io_uring_sqe> entries; + span array; } submission; struct completion_t { std::atomic *head{nullptr}, *tail{nullptr}, *overflow{nullptr}; uint32_t ring_mask{0}, ring_entries{0}; - span queue; + span region; // refers to the mmapped region, used to munmap on close span<_io_uring_cqe> entries; } completion; - } _nonseekable; - struct _seekable_t : _nonseekable_t - { - int fd{-1}; - } _seekable; + } _nonseekable, _seekable; struct _registered_fd { - int fd{-1}; + const int fd{-1}; struct queue_t { _io_uring_operation_state *first{nullptr}, *last{nullptr}; }; // contains initiated i/o not yet submitted to io_uring. state->submitted_to_iouring will be false. - queue_t queued_reads, queued_writes_or_barriers; + queue_t enqueued_io; // contains initiated i/o submitted to io_uring. state->submitted_to_iouring will be true. struct { @@ -471,24 +478,26 @@ namespace test _io_uring_operation_state *write_or_barrier{nullptr}; } inprogress; - constexpr _registered_fd() {} - explicit _registered_fd(int _fd) - : fd(_fd) + explicit _registered_fd(io_handle &h) + : fd(h.native_handle().fd) { } bool operator<(const _registered_fd &o) const noexcept { return fd < o.fd; } }; - std::vector<_registered_fd> _registered_fds{4}; - bool _have_ioring_register_files_update{true}; + /* If this were a real, and not test, implementation this would be a hash table + and there would be separate tables for registered fds with i/o pending and + needing submitted. This would avoid a linear scan of the whole table per i/o pump. + */ + std::vector<_registered_fd> _registered_fds; // ordered by fd so can be binary searched std::vector _registered_buffers; - std::vector<_registered_fd>::iterator _find_fd(int fd) const + typename std::vector<_registered_fd>::iterator _find_fd(int fd) const { auto ret = std::lower_bound(_registered_fds.begin(), _registered_fds.end(), fd); assert(fd == ret->fd); return ret; } - static void _enqueue_to(_registered_fd::queue_t &queue, _io_uring_operation_state *state) + static void _enqueue_to(typename _registered_fd::queue_t &queue, _io_uring_operation_state *state) { assert(state->prev == nullptr); assert(state->next == nullptr); @@ -506,8 +515,9 @@ namespace test queue.last = state; } } - static void _dequeue_from(_registered_fd::queue_t &queue, _io_uring_operation_state *state) + static _io_uring_operation_state *_dequeue_from(typename _registered_fd::queue_t &queue, _io_uring_operation_state *state) { + _io_uring_operation_state *ret = queue.first; if(state->prev == nullptr) { assert(queue.first == this); @@ -515,9 +525,9 @@ namespace test } else { - state->prev->next = next; + state->prev->next = state->next; } - if(next == nullptr) + if(state->next == nullptr) { assert(queue.last == this); queue.last = state->prev; @@ -527,19 +537,25 @@ namespace test state->next->prev = state->prev; } state->next = state->prev = nullptr; + return ret; } void _pump(_multiplexer_lock_guard &g) { // Drain completions first - auto drain_completions = [&](_nonseekable_t &inst) { + auto drain_completions = [&](_submission_completion_t &inst) { for(;;) { - uint32_t head = inst.completion.head->load(std::memory_order_acquire); + const uint32_t head = inst.completion.head->load(std::memory_order_acquire); if(head == inst.completion.tail->load(std::memory_order_relaxed)) { break; } - _io_uring_cqe *cqe = &inst.completion.entries[head & inst.completion.ring_mask]; + const _io_uring_cqe *cqe = &inst.completion.entries[head & inst.completion.ring_mask]; + if(cqe->user_data == nullptr) + { + // A wakeup + continue; + } auto *state = (_io_uring_operation_state *) (uintptr_t) cqe->user_data; assert(state->submitted_to_iouring); assert(is_initiated(state->state)); @@ -648,86 +664,116 @@ namespace test break; } } + inst.completion.head->fetch_add(1, std::memory_order_release); } }; drain_completions(_nonseekable); - drain_completions(_seekable); - - // For all registered fds without an inprogress operation and a non-empty queue, - // submit i/o - const uint32_t index = queue.submission.tail & queue.submission.ring_mask; - _io_uring_sqe *sqe = &queue.submission.entries[index]; - auto submit = [&] { - queue.submission.array[index] = index; - queue.tail++; - WHAT AM I DOING HERE ? ; - }; - switch(s) - { - case io_operation_state_type::read_initialised: + if(-1 != _seekable_iouring_fd) { - io_handle::io_result ret(state->payload.noncompleted.params.read.reqs.buffers); - - /* Try to eagerly complete the i/o now, if so ... */ - if(false /* completed immediately */) - { - state->read_completed(std::move(ret).value()); - if(!_disable_immediate_completions /* state is no longer in use by anyone else */) - { - state->read_finished(); - return io_operation_state_type::read_finished; - } - return io_operation_state_type::read_completed; - } - /* Otherwise the i/o has been initiated and will complete at some later point */ - state->read_initiated(); - _multiplexer_lock_guard g(this->_lock); - _insert(state); - return io_operation_state_type::read_initiated; + drain_completions(_seekable); } - case io_operation_state_type::write_initialised: - { - io_handle::io_result ret(state->payload.noncompleted.params.write.reqs.buffers); - if(false /* completed immediately */) + + auto enqueue_submissions = [&](_submission_completion_t &inst) { + for(auto &rfd : _registered_fds) { - state->write_completed(std::move(ret).value()); - if(!_disable_immediate_completions /* state is no longer in use by anyone else */) + if(rfd.inprogress.reads.first == nullptr && rfd.inprogress.write_or_barrier == nullptr) { - state->write_or_barrier_finished(); - return io_operation_state_type::write_or_barrier_finished; + bool enqueue_more = true; + while(rfd.enqueued_io.first != nullptr && enqueue_more) + { + // This registered fd has no i/o in progress but does have i/o enqueued + auto *state = _dequeue_from(rfd.enqueued_io); + assert(!state->submitted_to_iouring); + assert(is_initiated(state->state)); + + const uint32_t tail = inst.submission.tail->load(std::memory_order_acquire); + if(tail == inst.submission.head->load(std::memory_order_relaxed)) + { + break; + } + const uint32_t sqeidx = tail & inst.submission.ring_mask; + _io_uring_sqe *sqe = &inst.submission.entries[sqeidx]; + auto s = state->current_state(); + bool enqueue_more = false; + memset(sqe, 0, sizeof(_io_uring_sqe)); + sqe->fd = state->fd; + sqe->user_data = state; + switch(s) + { + default: + abort(); + case io_operation_state_type::read_initialised: + { + sqe->opcode = _IORING_OP_READV; // TODO: _IORING_OP_READ_FIXED into registered i/o buffers + sqe->off = state->noncompleted.params.read.reqs.offset; + sqe->addr = state->noncompleted.params.read.reqs.buffers.data(); + sqe->len = state->noncompleted.params.read.reqs.buffers.size(); + _enqueue_to(rfd.inprogress.reads, state); + // If this is a read upon a seekable handle, keep enqueuing + enqueue_more = state->is_seekable; + break; + } + case io_operation_state_type::write_initialised: + { + sqe->opcode = _IORING_OP_WRITEV; // TODO: _IORING_OP_WRITE_FIXED into registered i/o buffers + if(state->is_seekable) + { + sqe->flags = _IOSQE_IO_DRAIN; // Drain all preceding reads before doing the write, and don't start anything new until this completes + } + sqe->off = state->noncompleted.params.write.reqs.offset; + sqe->addr = state->noncompleted.params.write.reqs.buffers.data(); + sqe->len = state->noncompleted.params.write.reqs.buffers.size(); + rfd.inprogress.write_or_barrier = state; + break; + } + case io_operation_state_type::barrier_initialised: + { + // Drain all preceding writes before doing the barrier, and don't start anything new until this completes + sqe->flags = _IOSQE_IO_DRAIN; + if(state->noncompleted.params.barrier.kind <= barrier_kind::wait_data_only) + { + // Linux has a lovely dedicated syscall giving us exactly what we need here + sqe->opcode = _IORING_OP_SYNC_FILE_RANGE; + sqe->off = state->noncompleted.params.write.reqs.offset; + // empty buffers means bytes = 0 which means sync entire file + for(const auto &req : state->noncompleted.params.write.reqs.buffers) + { + sqe->len += req.size(); + } + sqe->sync_range_flags = SYNC_FILE_RANGE_WRITE; // start writing all dirty pages in range now + if(state->noncompleted.params.barrier.kind == barrier_kind::wait_data_only) + { + sqe->sync_range_flags |= SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WAIT_AFTER; // block until they're on storage + } + } + else + { + sqe->opcode = _IORING_OP_FSYNC; + } + rfd.inprogress.write_or_barrier = state; + break; + } + } + inst.submission.array[sqeidx] = sqeidx; + inst.submission.tail->fetch_add(1, std::memory_order_release); + state->submitted_to_iouring = true; + } } - return io_operation_state_type::write_or_barrier_completed; } - state->write_initiated(); - _multiplexer_lock_guard g(this->_lock); - _insert(state); - return io_operation_state_type::write_initiated; - } - case io_operation_state_type::barrier_initialised: + }; + enqueue_submissions(_nonseekable); + if(-1 != _seekable_iouring_fd) { - io_handle::io_result ret(state->payload.noncompleted.params.write.reqs.buffers); - if(false /* completed immediately */) - { - state->barrier_completed(std::move(ret).value()); - if(!_disable_immediate_completions /* state is no longer in use by anyone else */) - { - state->write_or_barrier_finished(); - return io_operation_state_type::write_or_barrier_finished; - } - return io_operation_state_type::write_or_barrier_completed; - } - state->write_initiated(); - _multiplexer_lock_guard g(this->_lock); - _insert(state); - return io_operation_state_type::barrier_initiated; - } - default: - break; + enqueue_submissions(_seekable); } } public: - constexpr linux_io_uring_multiplexer() {} + constexpr explicit linux_io_uring_multiplexer(bool is_polling) + : _is_polling(is_polling) + { + _registered_fds.reserve(4); + } linux_io_uring_multiplexer(const linux_io_uring_multiplexer &) = delete; linux_io_uring_multiplexer(linux_io_uring_multiplexer &&) = delete; linux_io_uring_multiplexer &operator=(const linux_io_uring_multiplexer &) = delete; @@ -739,24 +785,24 @@ namespace test (void) linux_io_uring_multiplexer::close(); } } - static result init(_nonseekable_t &out, size_t threads, bool is_polling) + result init(bool is_seekable, _submission_completion_t &out) { int fd = -1; _io_uring_params params; memset(¶ms, 0, sizeof(params)); - if(is_polling) + if(_is_polling) { // We don't implement IORING_SETUP_IOPOLL, it is for O_DIRECT files only in any case params.flags |= _IORING_SETUP_SQPOLL; params.sq_thread_idle = 100; // 100 milliseconds - if(threads == 1) + if(!is_threadsafe) { // Pin kernel submission polling thread to same CPU as I am pinned to, if I am pinned cpu_set_t affinity; CPU_ZERO(&affinity); - if(-1 != sched_get_affinity(0, sizeof(affinity), &affinity) && CPU_COUNT(&affinity) == 1) + if(-1 != ::sched_getaffinity(0, sizeof(affinity), &affinity) && CPU_COUNT(&affinity) == 1) { - for(size_t n = 0; n < CPU_SET_SIZE; n++) + for(size_t n = 0; n < CPU_SETSIZE; n++) { if(CPU_ISSET(n, &affinity)) { @@ -781,14 +827,15 @@ namespace test { return posix_error(); } - out.submission.queue = {(uint32_t *) p, params.sq_entries}; - out.head = &out.submission.queue[params.sq_off.head / sizeof(uint32_t)]; - out.tail = &out.submission.queue[params.sq_off.tail / sizeof(uint32_t)]; - out.ring_mask = out.submission.queue[params.sq_off.ring_mask / sizeof(uint32_t)]; - out.ring_entries = out.submission.queue[params.sq_off.ring_entries / sizeof(uint32_t)]; - out.flags = out.submission.queue[params.sq_off.flags / sizeof(uint32_t)]; - out.dropped = &out.submission.queue[params.sq_off.dropped / sizeof(uint32_t)]; - out.array = &out.submission.queue[params.sq_off.array / sizeof(uint32_t)]; + out.submission.region = {(uint32_t *) p, (params.sq_off.array + params.sq_entries * sizeof(uint32_t)) / sizeof(uint32_t)}; + out.head = &out.submission.region[params.sq_off.head / sizeof(uint32_t)]; + out.tail = &out.submission.region[params.sq_off.tail / sizeof(uint32_t)]; + out.ring_mask = out.submission.region[params.sq_off.ring_mask / sizeof(uint32_t)]; + out.ring_entries = out.submission.region[params.sq_off.ring_entries / sizeof(uint32_t)]; + out.flags = out.submission.region[params.sq_off.flags / sizeof(uint32_t)]; + out.dropped = &out.submission.region[params.sq_off.dropped / sizeof(uint32_t)]; + out.array = { + &out.submission.region[params.sq_off.array / sizeof(uint32_t)], params.sq_entries}; } { auto *p = ::mmap(nullptr, params.sq_entries * sizeof(_io_uring_sqe), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, _IORING_OFF_SQES); @@ -804,16 +851,24 @@ namespace test { return posix_error(); } - out.completion.queue = {(uint32_t *) p, params.cq_entries}; - out.head = &out.completion.queue[params.cq_off.head / sizeof(uint32_t)]; - out.tail = &out.completion.queue[params.cq_off.tail / sizeof(uint32_t)]; - out.ring_mask = out.completion.queue[params.cq_off.ring_mask / sizeof(uint32_t)]; - out.ring_entries = out.completion.queue[params.cq_off.ring_entries / sizeof(uint32_t)]; - out.overflow = out.completion.queue[params.cq_off.overflow / sizeof(uint32_t)]; - out.completion.entries = {(_io_uring_cqe *) &out.completion.queue[params.cq_off.cqes / sizeof(uint32_t)], params.cq_entries}; + out.completion.region = {(uint32_t *) p, (params.cq_off.cqes + params.cq_entries * sizeof(_io_uring_cqe)) / sizeof(uint32_t)}; + out.head = &out.completion.region[params.cq_off.head / sizeof(uint32_t)]; + out.tail = &out.completion.region[params.cq_off.tail / sizeof(uint32_t)]; + out.ring_mask = out.completion.region[params.cq_off.ring_mask / sizeof(uint32_t)]; + out.ring_entries = out.completion.region[params.cq_off.ring_entries / sizeof(uint32_t)]; + out.overflow = out.completion.region[params.cq_off.overflow / sizeof(uint32_t)]; + out.completion.entries = {(_io_uring_cqe *) &out.completion.region[params.cq_off.cqes / sizeof(uint32_t)], params.cq_entries}; + } + if(is_seekable) + { + _seekable_iouring_fd = fd; } - this->_v.behaviour |= native_handle_type::disposition::multiplexer; - return fd; + else + { + this->_v.fd = fd; + this->_v.behaviour |= native_handle_type::disposition::multiplexer; + } + return success(); } // These functions are inherited from handle @@ -821,13 +876,13 @@ namespace test virtual result close() noexcept override { auto do_close = [this](auto &s) { - if(!s.submission.queue.empty()) + if(!s.submission.region.empty()) { - if(-1 == ::munmap(s.submission.queue.data(), s.submission.queue.size_bytes())) + if(-1 == ::munmap(s.submission.region.data(), s.submission.region.size_bytes())) { return posix_error(); } - s.submission.queue = {}; + s.submission.region = {}; } if(!s.submission.entries.empty()) { @@ -837,26 +892,27 @@ namespace test } s.submission.entries = {}; } - if(!s.completion.queue.empty()) + if(!s.completion.region.empty()) { - if(-1 == ::munmap(s.completion.queue.data(), s.completion.queue.size_bytes())) + if(-1 == ::munmap(s.completion.region.data(), s.completion.region.size_bytes())) { return posix_error(); } - s.completion.queue = {}; + s.completion.region = {}; } }; - do_close(_seekable); - if(-1 != _seekable.fd) + if(-1 != _seekable_iouring_fd) { - if(-1 == ::close(_seekable.fd)) + do_close(_seekable); + if(-1 == ::close(_seekable_iouring_fd)) { return posix_error(); } + _seekable_iouring_fd = -1; } do_close(_nonseekable); OUTCOME_TRY(_base::close()); - _known_fds.clear(); + _registered_fds.clear(); _registered_buffers.clear(); return success(); } @@ -869,61 +925,50 @@ namespace test return ret; } - result _recalculate_registered_fds(int index, int fd) noexcept + result _change_fd_registration(bool is_seekable, int fd, bool enable) noexcept { if(_have_ioring_register_files_update) { - int32_t newvalue = fd; + int32_t newvalue = enable ? fd : -1; _io_uring_files_update upd; memset(&upd, 0, sizeof(upd)); - upd.offset = index; + upd.offset = fd; upd.fds = (__aligned_u64) &newvalue; - if(_io_uring_register(this->_v.fd, _IORING_REGISTER_FILES_UPDATE, &upd, 1) >= 0) // Linux kernel 5.5 onwards + if(_io_uring_register(is_seekable ? _seekable_iouring_fd : this->_v.fd, _IORING_REGISTER_FILES_UPDATE, &upd, 1) >= 0) // Linux kernel 5.5 onwards { return success(); } + // Failed, so disable ever calling this again _have_ioring_register_files_update = false; } -#if 0 // Hangs the ring until it empties, which locks this implementation - // Fall back to the old inefficient API - std::vector map(_known_fds.back() + 1, -1); - for(auto fd : _known_fds) - { - map[fd] = fd; - } - (void) _io_uring_register(this->_v.fd, _IORING_UNREGISTER_FILES, nullptr, 0); - if(_io_uring_register(this->_v.fd, _IORING_REGISTER_FILES, map.data(), map.size()) < 0) - { - return posix_error(); - } -#endif return success(); } virtual result do_io_handle_register(io_handle *h) noexcept override // linear complexity to total handles registered { _multiplexer_lock_guard g(this->_lock); - if(_registered_fds.size() >= 64) + if(h->is_seekable() && -1 == _seekable_iouring_fd) { - return errc::resource_unavailable_try_again; // This test multiplexer can't handle more than 64 handles + // Create the seekable io_uring ring + OUTCOME_TRY(init(true, _seekable)); } - int toinsert = h->native_handle().fd; - _registered_fds.push_back(); // capacity expansion - _registered_fds.pop_back(); + _registered_fd toinsert(h); _registered_fds.insert(std::lower_bound(_registered_fds.begin(), _registered_fds.end(), toinsert), toinsert); - return _recalculate_registered_fds(toinsert, toinsert); + return _change_fd_registration(h->is_seekable(), toinsert.fd, true); } virtual result do_io_handle_deregister(io_handle *h) noexcept override { _multiplexer_lock_guard g(this->_lock); - int toremove = h->native_handle().fd; + int fd = h->native_handle().fd; auto it = _find_fd(fd); - assert(it->first == nullptr); - if(it->first != nullptr) + assert(it->inprogress.reads.first == nullptr); + assert(it->inprogress.write_or_barrier == nullptr); + if(it->inprogress.reads.first != nullptr || it->inprogress.write_or_barrier != nullptr) { + // Can't deregister a handle with i/o in progress return errc::operation_in_progress; } _registered_fds.erase(it); - return _recalculate_registered_fds(toremove, -1); + return _change_fd_registration(h->is_seekable(), fd, false); } virtual size_t do_io_handle_max_buffers(const io_handle * /*unused*/) const noexcept override { return IOV_MAX; } @@ -997,7 +1042,6 @@ namespace test assert(false); return s; } - bool use_write_barrier_queue = false; switch(s) { case io_operation_state_type::read_initialised: @@ -1008,13 +1052,11 @@ namespace test case io_operation_state_type::write_initialised: { state->write_initiated(); - use_write_barrier_queue = true; break; } case io_operation_state_type::barrier_initialised: { state->barrier_initiated(); - use_write_barrier_queue = true; break; } } @@ -1024,7 +1066,7 @@ namespace test _multiplexer_lock_guard g(this->_lock); auto it = _find_fd(state->fd); assert(it != _registered_fds.end()); - _enqueue_to(use_write_barrier_queue ? it->queued_writes_or_barriers : it->queued_reads, state); + _enqueue_to(it->enqueued_io, state); return state->state; } @@ -1035,7 +1077,7 @@ namespace test virtual result flush_inited_io_operations() noexcept override { _multiplexer_lock_guard g(this->_lock); - _pump(g); + TODO mark all submitted i / os as initiated; return success(); } @@ -1115,11 +1157,11 @@ namespace test while(max_completions > 0) { _multiplexer_lock_guard g(this->_lock); - // If another kernel thread woke me, exit the loop - if(_wakecount > 0) + TODO if there is a timeout, submit a timeout before this; + auto ret = io_uring_enter(_v.fd, 0, 1, _IORING_ENTER_GETEVENTS); + if(ret< 0) { - --_wakecount; - break; + return posix_error(); } auto *c = _first; if(c != nullptr) @@ -1165,23 +1207,35 @@ namespace test virtual result wake_check_for_any_completed_io() noexcept override { _multiplexer_lock_guard g(this->_lock); - ++_wakecount; + // Post a null SQE, it'll break out any waits + const uint32_t tail = inst.submission.tail->load(std::memory_order_acquire); + if(tail == inst.submission.head->load(std::memory_order_relaxed)) + { + return errc::resource_unavailable_try_again; // SQE ring is full + } + const uint32_t sqeidx = tail & inst.submission.ring_mask; + _io_uring_sqe *sqe = &inst.submission.entries[sqeidx]; + memset(sqe, 0, sizeof(_io_uring_sqe)); + sqe->opcode = _IORING_OP_NOP; + inst.submission.array[sqeidx] = sqeidx; + inst.submission.tail->fetch_add(1, std::memory_order_release); return success(); } }; - LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_linux_io_uring_multiplexer(size_t threads, bool is_polling) noexcept + LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_linux_io_uring(size_t threads, bool is_polling) noexcept { try { if(1 == threads) { - auto ret = std::make_unique>(); - OUTCOME_TRY(ret->init(1, is_polling)); + // Make non locking edition + auto ret = std::make_unique>(is_polling); + OUTCOME_TRY(ret->init(false, ret->_nonseekable)); return io_multiplexer_ptr(ret.release()); } - auto ret = std::make_unique>(); - OUTCOME_TRY(ret->init(threads, is_polling)); + auto ret = std::make_unique>(is_polling); + OUTCOME_TRY(ret->init(false, ret->_nonseekable)); return io_multiplexer_ptr(ret.release()); } catch(...) diff --git a/include/llfio/v2.0/io_multiplexer.hpp b/include/llfio/v2.0/io_multiplexer.hpp index 48008286..070cf0c1 100644 --- a/include/llfio/v2.0/io_multiplexer.hpp +++ b/include/llfio/v2.0/io_multiplexer.hpp @@ -1406,7 +1406,7 @@ namespace test #if defined(__linux__) || DOXYGEN_IS_IN_THE_HOUSE // LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_linux_epoll(size_t threads) noexcept; -// LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_linux_io_uring() noexcept; + LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_linux_io_uring(size_t threads, bool is_polling) noexcept; #endif #if(defined(__FreeBSD__) || defined(__APPLE__)) || DOXYGEN_IS_IN_THE_HOUSE // LLFIO_HEADERS_ONLY_FUNC_SPEC result multiplexer_bsd_kqueue(size_t threads) noexcept; -- cgit v1.2.3