diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2022-04-17 00:39:15 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2022-04-17 00:39:15 +0300 |
commit | 621d1a7907e51ff06a52462a85d4bc8702bc1acb (patch) | |
tree | cc6cac2fad15033d72dd3ba9929927ff55f2bb4a | |
parent | ac897294b2e9a0383a754527efbbbcc30a582b21 (diff) |
Fix installability.
-rw-r--r-- | .github/workflows/installability.yml | 2 | ||||
-rw-r--r-- | include/llfio/revision.hpp | 6 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp | 90 | ||||
-rw-r--r-- | test/tests/byte_socket_handle.cpp | 282 |
4 files changed, 220 insertions, 160 deletions
diff --git a/.github/workflows/installability.yml b/.github/workflows/installability.yml index 2234f023..8fbe2289 100644 --- a/.github/workflows/installability.yml +++ b/.github/workflows/installability.yml @@ -34,7 +34,7 @@ jobs: run: | git config --global core.longpaths true if [ "${{ matrix.configuration }}" = "status_code" ]; then - export CMAKE_CONFIGURE_OPTIONS="-DLLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE=ON" + export CMAKE_CONFIGURE_OPTIONS="-DLLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE=ON;-DLLFIO_FORCE_OPENSSL_OFF=ON" fi git clone --depth 1 https://github.com/ned14/quickcpplib.git pip install --user gitpython diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index aac0b15c..9e150cc6 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 5c6f7f3933de89fb4e4a9aa7df69da933a8f09aa -#define LLFIO_PREVIOUS_COMMIT_DATE "2022-04-15 23:08:23 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 5c6f7f39 +#define LLFIO_PREVIOUS_COMMIT_REF ac897294b2e9a0383a754527efbbbcc30a582b21 +#define LLFIO_PREVIOUS_COMMIT_DATE "2022-04-16 19:00:45 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE ac897294 diff --git a/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp b/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp index ffb72949..a1c22b46 100644 --- a/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp +++ b/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp @@ -1048,35 +1048,75 @@ public: virtual result<void> close() noexcept override { LLFIO_LOG_FUNCTION_CALL(this); - _lock_holder.lock(); - auto unlock = make_scope_exit( - [this]() noexcept + if(_v) { - if(_lock_holder.owns_lock()) + _lock_holder.lock(); + auto unlock = make_scope_exit( + [this]() noexcept + { + if(_lock_holder.owns_lock()) + { + _lock_holder.unlock(); + } + }); + OUTCOME_TRY(_flush_towrite({})); + if(are_safety_barriers_issued() && is_writable()) { _lock_holder.unlock(); + auto r = shutdown(shutdown_write); + _lock_holder.lock(); + if(r) + { + byte buffer[4096]; + for(;;) + { + _lock_holder.unlock(); + OUTCOME_TRY(auto readed, read(0, {{buffer}})); + _lock_holder.lock(); + if(readed == 0) + { + break; + } + } + } + else if(r.error() != errc::not_connected) + { + OUTCOME_TRY(std::move(r)); + } + } + if(_ssl_bio != nullptr) + { + BIO_free_all(_ssl_bio); // also frees _self_bio + _ssl_bio = nullptr; + } + if(_v.behaviour & native_handle_type::disposition::is_pointer) + { + tls_socket_handle::release(); + } + else + { + if(!!(_v.behaviour & native_handle_type::disposition::_is_connected) && _still_connecting == 0) + { + _lock_holder.unlock(); + auto r = tls_socket_handle::close(); + if(!r) + { + char msg[1024]; +#ifdef _MSC_VER + sprintf_s(msg, "WARNING: openssl_socket_handle::close() underlying close() fails with %s", r.error().message().c_str()); +#else + sprintf(msg, "WARNING: openssl_socket_handle::close() underlying close() fails with %s", r.error().message().c_str()); +#endif + LLFIO_LOG_WARN(this, msg); + } + _lock_holder.lock(); + } + } + for(size_t n = 0; n < BUFFERS_COUNT; n++) + { + _read_buffers_valid[n] = {}; + _read_buffers[n].reset(); } - }); - OUTCOME_TRY(_flush_towrite({})); - if(_ssl_bio != nullptr) - { - BIO_free_all(_ssl_bio); // also frees _self_bio - _ssl_bio = nullptr; - } - if(_v.behaviour & native_handle_type::disposition::is_pointer) - { - tls_socket_handle::release(); - } - else - { - _lock_holder.unlock(); - OUTCOME_TRY(tls_socket_handle::close()); - _lock_holder.lock(); - } - for(size_t n = 0; n < BUFFERS_COUNT; n++) - { - _read_buffers_valid[n] = {}; - _read_buffers[n].reset(); } return success(); } diff --git a/test/tests/byte_socket_handle.cpp b/test/tests/byte_socket_handle.cpp index 785398fb..f2c601aa 100644 --- a/test/tests/byte_socket_handle.cpp +++ b/test/tests/byte_socket_handle.cpp @@ -375,7 +375,8 @@ static inline void TestSocketResolve() static inline void TestBlockingSocketHandles() { namespace llfio = LLFIO_V2_NAMESPACE; - auto serversocket = llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read).value(); + auto serversocket = + llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read).value(); BOOST_REQUIRE(serversocket.is_valid()); BOOST_CHECK(serversocket.is_socket()); BOOST_CHECK(serversocket.is_readable()); @@ -389,7 +390,9 @@ static inline void TestBlockingSocketHandles() << std::endl; return; } - auto readerthread = std::async([serversocket = std::move(serversocket)]() mutable { + auto readerthread = std::async( + [serversocket = std::move(serversocket)]() mutable + { std::pair<llfio::byte_socket_handle, llfio::ip::address> s; serversocket.read({s}).value(); // This immediately blocks in blocking mode BOOST_REQUIRE(s.first.is_valid()); @@ -438,9 +441,10 @@ static inline void TestBlockingSocketHandles() static inline void TestNonBlockingSocketHandles() { namespace llfio = LLFIO_V2_NAMESPACE; - auto serversocket = llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read, - llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable) - .value(); + auto serversocket = + llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read, + llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable) + .value(); BOOST_REQUIRE(serversocket.is_valid()); BOOST_CHECK(serversocket.is_socket()); BOOST_CHECK(serversocket.is_readable()); @@ -507,7 +511,8 @@ static inline void TestMultiplexedSocketHandles() { static constexpr size_t MAX_SOCKETS = 64; namespace llfio = LLFIO_V2_NAMESPACE; - auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer) { + auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer) + { std::vector<llfio::byte_socket_handle> read_sockets, write_sockets; std::vector<size_t> received_for(MAX_SOCKETS); struct checking_receiver final : public llfio::byte_io_multiplexer::io_operation_state_visitor @@ -603,7 +608,7 @@ static inline void TestMultiplexedSocketHandles() }; auto serversocket = llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::write, - llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable) + llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable) .value(); serversocket.bind(llfio::ip::address_v4::loopback()).value(); auto endpoint = serversocket.local_endpoint().value(); @@ -629,7 +634,9 @@ static inline void TestMultiplexedSocketHandles() read_sockets.push_back(std::move(read_socket.first)); write_sockets.push_back(std::move(write_socket)); } - auto writerthread = std::async([&] { + auto writerthread = std::async( + [&] + { std::this_thread::sleep_for(std::chrono::milliseconds(500)); for(size_t n = MAX_SOCKETS - 1; n < MAX_SOCKETS; n--) { @@ -692,7 +699,8 @@ static inline void TestCoroutinedSocketHandles() { static constexpr size_t MAX_SOCKETS = 70; namespace llfio = LLFIO_V2_NAMESPACE; - auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer) { + auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer) + { struct coroutine { llfio::byte_socket_handle read_socket, write_socket; @@ -727,8 +735,8 @@ static inline void TestCoroutinedSocketHandles() } }; auto serversocket = - llfio::listening_byte_socket_handle::listening_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::write, llfio::byte_socket_handle::caching::all, - llfio::byte_socket_handle::flag::multiplexable) + llfio::listening_byte_socket_handle::listening_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::write, + llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable) .value(); serversocket.bind(llfio::ip::address_v4::loopback()).value(); auto endpoint = serversocket.local_endpoint().value(); @@ -844,126 +852,138 @@ static inline void TestPollingSocketHandles() } QUICKCPPLIB_NAMESPACE::algorithm::small_prng::random_shuffle(idxs.begin(), idxs.end()); std::mutex lock; - std::atomic<size_t> currently_connecting{(size_t)-1}; - auto poll_listening_task = std::async(std::launch::async, [&] { - std::vector<llfio::pollable_handle *> handles; - std::vector<llfio::poll_what> what, out; - for(size_t n = 0; n < MAX_SOCKETS; n++) - { - handles.push_back(&listening[n].first); - what.push_back(llfio::poll_what::is_readable); - out.push_back(llfio::poll_what::none); - } - for(;;) - { - int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value(); - bool done = true; - for(size_t n = 0; n < MAX_SOCKETS; n++) - { - auto idx = idxs[n]; - if(handles[idx] != nullptr) - { - done = false; - if(out[idx] & llfio::poll_what::is_readable) - { - { - std::lock_guard<std::mutex> g(lock); - std::cout << "Poll listening sees readable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx << ". Currently connecting is " - << currently_connecting << std::endl; - } - BOOST_CHECK(currently_connecting == idx); - std::pair<llfio::byte_socket_handle, llfio::ip::address> s; - listening[idx].first.read({s}).value(); - handles[idx] = nullptr; - ret--; - } - out[idx] = llfio::poll_what::none; - } - } - BOOST_CHECK(ret == 0); - if(done) - { - std::lock_guard<std::mutex> g(lock); - std::cout << "Poll listening task exits." << std::endl; - break; - } - } - }); - auto poll_connecting_task = std::async(std::launch::async, [&] { - std::vector<llfio::pollable_handle *> handles; - std::vector<llfio::poll_what> what, out; - for(size_t n = 0; n < MAX_SOCKETS; n++) - { - handles.push_back(&sockets[n]); - what.push_back(llfio::poll_what::is_writable); - out.push_back(llfio::poll_what::none); - } - for(;;) - { - int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value(); - bool done = true, saw_closed = false; - size_t remaining = MAX_SOCKETS; - for(size_t n = 0; n < MAX_SOCKETS; n++) - { - auto idx = idxs[n]; - if(handles[idx] != nullptr) - { - done = false; - // On Linux, a new socket not yet connected MAY appear as both writable and hanged up, - // so filter out the closed. - if(!(out[idx] & llfio::poll_what::is_closed) || (remaining == 1 && currently_connecting == idx)) - { - if(out[idx] & llfio::poll_what::is_writable) - { - { - std::lock_guard<std::mutex> g(lock); - std::cout << "Poll connect sees writable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx << ". Currently connecting is " - << currently_connecting << std::endl; - } - BOOST_CHECK(currently_connecting == idx); - handles[idx] = nullptr; - ret--; - } - } - else - { - saw_closed = true; - } - out[idx] = llfio::poll_what::none; - } - else - { - remaining--; - } - } - if(!saw_closed) - { - BOOST_CHECK(ret == 0); - } - if(done) - { - std::lock_guard<std::mutex> g(lock); - std::cout << "Poll connect task exits." << std::endl; - break; - } - } - }); - auto connect_task = std::async(std::launch::async, [&] { - for(size_t n = 0; n < MAX_SOCKETS; n++) - { - auto idx = idxs[n]; - { - std::lock_guard<std::mutex> g(lock); - std::cout << "Connecting " << idx << " ... " << std::endl; - } - currently_connecting = idx; - sockets[idx].connect(listening[idx].second).value(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - std::lock_guard<std::mutex> g(lock); - std::cout << "Connecting task exits." << std::endl; - }); + std::atomic<size_t> currently_connecting{(size_t) -1}; + auto poll_listening_task = std::async(std::launch::async, + [&] + { + std::vector<llfio::pollable_handle *> handles; + std::vector<llfio::poll_what> what, out; + for(size_t n = 0; n < MAX_SOCKETS; n++) + { + handles.push_back(&listening[n].first); + what.push_back(llfio::poll_what::is_readable); + out.push_back(llfio::poll_what::none); + } + for(;;) + { + int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value(); + bool done = true; + for(size_t n = 0; n < MAX_SOCKETS; n++) + { + auto idx = idxs[n]; + if(handles[idx] != nullptr) + { + done = false; + if(out[idx] & llfio::poll_what::is_readable) + { + { + std::lock_guard<std::mutex> g(lock); + std::cout << "Poll listening sees readable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx + << ". Currently connecting is " << currently_connecting << std::endl; + } + BOOST_CHECK(currently_connecting == idx); + std::pair<llfio::byte_socket_handle, llfio::ip::address> s; + listening[idx].first.read({s}).value(); + llfio::byte buf; + llfio::byte_socket_handle::buffer_type b(&buf, 1); + s.first.read({{b}}); + handles[idx] = nullptr; + ret--; + } + out[idx] = llfio::poll_what::none; + } + } + BOOST_CHECK(ret == 0); + if(done) + { + std::lock_guard<std::mutex> g(lock); + std::cout << "Poll listening task exits." << std::endl; + break; + } + } + }); + auto poll_connecting_task = std::async(std::launch::async, + [&] + { + std::vector<llfio::pollable_handle *> handles; + std::vector<llfio::poll_what> what, out; + for(size_t n = 0; n < MAX_SOCKETS; n++) + { + handles.push_back(&sockets[n]); + what.push_back(llfio::poll_what::is_writable); + out.push_back(llfio::poll_what::none); + } + for(;;) + { + int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value(); + bool done = true, saw_closed = false; + size_t remaining = MAX_SOCKETS; + for(size_t n = 0; n < MAX_SOCKETS; n++) + { + auto idx = idxs[n]; + if(handles[idx] != nullptr) + { + done = false; + // On Linux, a new socket not yet connected MAY appear as both writable and hanged up, + // so filter out the closed. + if(!(out[idx] & llfio::poll_what::is_closed) || (remaining == 1 && currently_connecting == idx)) + { + if(out[idx] & llfio::poll_what::is_writable) + { + { + std::lock_guard<std::mutex> g(lock); + std::cout << "Poll connect sees writable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx + << ". Currently connecting is " << currently_connecting << std::endl; + } + BOOST_CHECK(currently_connecting == idx); + handles[idx] = nullptr; + ret--; + } + } + else + { + saw_closed = true; + } + out[idx] = llfio::poll_what::none; + } + else + { + remaining--; + } + } + if(!saw_closed) + { + BOOST_CHECK(ret == 0); + } + if(done) + { + std::lock_guard<std::mutex> g(lock); + std::cout << "Poll connect task exits." << std::endl; + break; + } + } + }); + auto connect_task = std::async(std::launch::async, + [&] + { + for(size_t n = 0; n < MAX_SOCKETS; n++) + { + auto idx = idxs[n]; + { + std::lock_guard<std::mutex> g(lock); + std::cout << "Connecting " << idx << " ... " << std::endl; + } + currently_connecting = idx; + sockets[idx].connect(listening[idx].second).value(); + llfio::byte buf(llfio::to_byte(0)); + llfio::byte_socket_handle::const_buffer_type b(&buf, 1); + sockets[idx].write({{&b, 1}}); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::lock_guard<std::mutex> g(lock); + std::cout << "Connecting task exits." << std::endl; + }); connect_task.get(); poll_listening_task.get(); poll_connecting_task.get(); |