Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ned14/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2022-04-17 00:39:15 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2022-04-17 00:39:15 +0300
commit621d1a7907e51ff06a52462a85d4bc8702bc1acb (patch)
treecc6cac2fad15033d72dd3ba9929927ff55f2bb4a
parentac897294b2e9a0383a754527efbbbcc30a582b21 (diff)
Fix installability.
-rw-r--r--.github/workflows/installability.yml2
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp90
-rw-r--r--test/tests/byte_socket_handle.cpp282
4 files changed, 220 insertions, 160 deletions
diff --git a/.github/workflows/installability.yml b/.github/workflows/installability.yml
index 2234f023..8fbe2289 100644
--- a/.github/workflows/installability.yml
+++ b/.github/workflows/installability.yml
@@ -34,7 +34,7 @@ jobs:
run: |
git config --global core.longpaths true
if [ "${{ matrix.configuration }}" = "status_code" ]; then
- export CMAKE_CONFIGURE_OPTIONS="-DLLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE=ON"
+ export CMAKE_CONFIGURE_OPTIONS="-DLLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE=ON;-DLLFIO_FORCE_OPENSSL_OFF=ON"
fi
git clone --depth 1 https://github.com/ned14/quickcpplib.git
pip install --user gitpython
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index aac0b15c..9e150cc6 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 5c6f7f3933de89fb4e4a9aa7df69da933a8f09aa
-#define LLFIO_PREVIOUS_COMMIT_DATE "2022-04-15 23:08:23 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 5c6f7f39
+#define LLFIO_PREVIOUS_COMMIT_REF ac897294b2e9a0383a754527efbbbcc30a582b21
+#define LLFIO_PREVIOUS_COMMIT_DATE "2022-04-16 19:00:45 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE ac897294
diff --git a/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp b/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp
index ffb72949..a1c22b46 100644
--- a/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp
+++ b/include/llfio/v2.0/detail/impl/tls_socket_sources/openssl.ipp
@@ -1048,35 +1048,75 @@ public:
virtual result<void> close() noexcept override
{
LLFIO_LOG_FUNCTION_CALL(this);
- _lock_holder.lock();
- auto unlock = make_scope_exit(
- [this]() noexcept
+ if(_v)
{
- if(_lock_holder.owns_lock())
+ _lock_holder.lock();
+ auto unlock = make_scope_exit(
+ [this]() noexcept
+ {
+ if(_lock_holder.owns_lock())
+ {
+ _lock_holder.unlock();
+ }
+ });
+ OUTCOME_TRY(_flush_towrite({}));
+ if(are_safety_barriers_issued() && is_writable())
{
_lock_holder.unlock();
+ auto r = shutdown(shutdown_write);
+ _lock_holder.lock();
+ if(r)
+ {
+ byte buffer[4096];
+ for(;;)
+ {
+ _lock_holder.unlock();
+ OUTCOME_TRY(auto readed, read(0, {{buffer}}));
+ _lock_holder.lock();
+ if(readed == 0)
+ {
+ break;
+ }
+ }
+ }
+ else if(r.error() != errc::not_connected)
+ {
+ OUTCOME_TRY(std::move(r));
+ }
+ }
+ if(_ssl_bio != nullptr)
+ {
+ BIO_free_all(_ssl_bio); // also frees _self_bio
+ _ssl_bio = nullptr;
+ }
+ if(_v.behaviour & native_handle_type::disposition::is_pointer)
+ {
+ tls_socket_handle::release();
+ }
+ else
+ {
+ if(!!(_v.behaviour & native_handle_type::disposition::_is_connected) && _still_connecting == 0)
+ {
+ _lock_holder.unlock();
+ auto r = tls_socket_handle::close();
+ if(!r)
+ {
+ char msg[1024];
+#ifdef _MSC_VER
+ sprintf_s(msg, "WARNING: openssl_socket_handle::close() underlying close() fails with %s", r.error().message().c_str());
+#else
+ sprintf(msg, "WARNING: openssl_socket_handle::close() underlying close() fails with %s", r.error().message().c_str());
+#endif
+ LLFIO_LOG_WARN(this, msg);
+ }
+ _lock_holder.lock();
+ }
+ }
+ for(size_t n = 0; n < BUFFERS_COUNT; n++)
+ {
+ _read_buffers_valid[n] = {};
+ _read_buffers[n].reset();
}
- });
- OUTCOME_TRY(_flush_towrite({}));
- if(_ssl_bio != nullptr)
- {
- BIO_free_all(_ssl_bio); // also frees _self_bio
- _ssl_bio = nullptr;
- }
- if(_v.behaviour & native_handle_type::disposition::is_pointer)
- {
- tls_socket_handle::release();
- }
- else
- {
- _lock_holder.unlock();
- OUTCOME_TRY(tls_socket_handle::close());
- _lock_holder.lock();
- }
- for(size_t n = 0; n < BUFFERS_COUNT; n++)
- {
- _read_buffers_valid[n] = {};
- _read_buffers[n].reset();
}
return success();
}
diff --git a/test/tests/byte_socket_handle.cpp b/test/tests/byte_socket_handle.cpp
index 785398fb..f2c601aa 100644
--- a/test/tests/byte_socket_handle.cpp
+++ b/test/tests/byte_socket_handle.cpp
@@ -375,7 +375,8 @@ static inline void TestSocketResolve()
static inline void TestBlockingSocketHandles()
{
namespace llfio = LLFIO_V2_NAMESPACE;
- auto serversocket = llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read).value();
+ auto serversocket =
+ llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read).value();
BOOST_REQUIRE(serversocket.is_valid());
BOOST_CHECK(serversocket.is_socket());
BOOST_CHECK(serversocket.is_readable());
@@ -389,7 +390,9 @@ static inline void TestBlockingSocketHandles()
<< std::endl;
return;
}
- auto readerthread = std::async([serversocket = std::move(serversocket)]() mutable {
+ auto readerthread = std::async(
+ [serversocket = std::move(serversocket)]() mutable
+ {
std::pair<llfio::byte_socket_handle, llfio::ip::address> s;
serversocket.read({s}).value(); // This immediately blocks in blocking mode
BOOST_REQUIRE(s.first.is_valid());
@@ -438,9 +441,10 @@ static inline void TestBlockingSocketHandles()
static inline void TestNonBlockingSocketHandles()
{
namespace llfio = LLFIO_V2_NAMESPACE;
- auto serversocket = llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read,
- llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable)
- .value();
+ auto serversocket =
+ llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::read,
+ llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable)
+ .value();
BOOST_REQUIRE(serversocket.is_valid());
BOOST_CHECK(serversocket.is_socket());
BOOST_CHECK(serversocket.is_readable());
@@ -507,7 +511,8 @@ static inline void TestMultiplexedSocketHandles()
{
static constexpr size_t MAX_SOCKETS = 64;
namespace llfio = LLFIO_V2_NAMESPACE;
- auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer) {
+ auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer)
+ {
std::vector<llfio::byte_socket_handle> read_sockets, write_sockets;
std::vector<size_t> received_for(MAX_SOCKETS);
struct checking_receiver final : public llfio::byte_io_multiplexer::io_operation_state_visitor
@@ -603,7 +608,7 @@ static inline void TestMultiplexedSocketHandles()
};
auto serversocket =
llfio::listening_byte_socket_handle::listening_byte_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::write,
- llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable)
+ llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable)
.value();
serversocket.bind(llfio::ip::address_v4::loopback()).value();
auto endpoint = serversocket.local_endpoint().value();
@@ -629,7 +634,9 @@ static inline void TestMultiplexedSocketHandles()
read_sockets.push_back(std::move(read_socket.first));
write_sockets.push_back(std::move(write_socket));
}
- auto writerthread = std::async([&] {
+ auto writerthread = std::async(
+ [&]
+ {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
for(size_t n = MAX_SOCKETS - 1; n < MAX_SOCKETS; n--)
{
@@ -692,7 +699,8 @@ static inline void TestCoroutinedSocketHandles()
{
static constexpr size_t MAX_SOCKETS = 70;
namespace llfio = LLFIO_V2_NAMESPACE;
- auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer) {
+ auto test_multiplexer = [](llfio::byte_io_multiplexer_ptr multiplexer)
+ {
struct coroutine
{
llfio::byte_socket_handle read_socket, write_socket;
@@ -727,8 +735,8 @@ static inline void TestCoroutinedSocketHandles()
}
};
auto serversocket =
- llfio::listening_byte_socket_handle::listening_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::write, llfio::byte_socket_handle::caching::all,
- llfio::byte_socket_handle::flag::multiplexable)
+ llfio::listening_byte_socket_handle::listening_socket(llfio::ip::family::v4, llfio::listening_byte_socket_handle::mode::write,
+ llfio::byte_socket_handle::caching::all, llfio::byte_socket_handle::flag::multiplexable)
.value();
serversocket.bind(llfio::ip::address_v4::loopback()).value();
auto endpoint = serversocket.local_endpoint().value();
@@ -844,126 +852,138 @@ static inline void TestPollingSocketHandles()
}
QUICKCPPLIB_NAMESPACE::algorithm::small_prng::random_shuffle(idxs.begin(), idxs.end());
std::mutex lock;
- std::atomic<size_t> currently_connecting{(size_t)-1};
- auto poll_listening_task = std::async(std::launch::async, [&] {
- std::vector<llfio::pollable_handle *> handles;
- std::vector<llfio::poll_what> what, out;
- for(size_t n = 0; n < MAX_SOCKETS; n++)
- {
- handles.push_back(&listening[n].first);
- what.push_back(llfio::poll_what::is_readable);
- out.push_back(llfio::poll_what::none);
- }
- for(;;)
- {
- int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value();
- bool done = true;
- for(size_t n = 0; n < MAX_SOCKETS; n++)
- {
- auto idx = idxs[n];
- if(handles[idx] != nullptr)
- {
- done = false;
- if(out[idx] & llfio::poll_what::is_readable)
- {
- {
- std::lock_guard<std::mutex> g(lock);
- std::cout << "Poll listening sees readable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx << ". Currently connecting is "
- << currently_connecting << std::endl;
- }
- BOOST_CHECK(currently_connecting == idx);
- std::pair<llfio::byte_socket_handle, llfio::ip::address> s;
- listening[idx].first.read({s}).value();
- handles[idx] = nullptr;
- ret--;
- }
- out[idx] = llfio::poll_what::none;
- }
- }
- BOOST_CHECK(ret == 0);
- if(done)
- {
- std::lock_guard<std::mutex> g(lock);
- std::cout << "Poll listening task exits." << std::endl;
- break;
- }
- }
- });
- auto poll_connecting_task = std::async(std::launch::async, [&] {
- std::vector<llfio::pollable_handle *> handles;
- std::vector<llfio::poll_what> what, out;
- for(size_t n = 0; n < MAX_SOCKETS; n++)
- {
- handles.push_back(&sockets[n]);
- what.push_back(llfio::poll_what::is_writable);
- out.push_back(llfio::poll_what::none);
- }
- for(;;)
- {
- int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value();
- bool done = true, saw_closed = false;
- size_t remaining = MAX_SOCKETS;
- for(size_t n = 0; n < MAX_SOCKETS; n++)
- {
- auto idx = idxs[n];
- if(handles[idx] != nullptr)
- {
- done = false;
- // On Linux, a new socket not yet connected MAY appear as both writable and hanged up,
- // so filter out the closed.
- if(!(out[idx] & llfio::poll_what::is_closed) || (remaining == 1 && currently_connecting == idx))
- {
- if(out[idx] & llfio::poll_what::is_writable)
- {
- {
- std::lock_guard<std::mutex> g(lock);
- std::cout << "Poll connect sees writable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx << ". Currently connecting is "
- << currently_connecting << std::endl;
- }
- BOOST_CHECK(currently_connecting == idx);
- handles[idx] = nullptr;
- ret--;
- }
- }
- else
- {
- saw_closed = true;
- }
- out[idx] = llfio::poll_what::none;
- }
- else
- {
- remaining--;
- }
- }
- if(!saw_closed)
- {
- BOOST_CHECK(ret == 0);
- }
- if(done)
- {
- std::lock_guard<std::mutex> g(lock);
- std::cout << "Poll connect task exits." << std::endl;
- break;
- }
- }
- });
- auto connect_task = std::async(std::launch::async, [&] {
- for(size_t n = 0; n < MAX_SOCKETS; n++)
- {
- auto idx = idxs[n];
- {
- std::lock_guard<std::mutex> g(lock);
- std::cout << "Connecting " << idx << " ... " << std::endl;
- }
- currently_connecting = idx;
- sockets[idx].connect(listening[idx].second).value();
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- std::this_thread::sleep_for(std::chrono::seconds(1));
- std::lock_guard<std::mutex> g(lock);
- std::cout << "Connecting task exits." << std::endl;
- });
+ std::atomic<size_t> currently_connecting{(size_t) -1};
+ auto poll_listening_task = std::async(std::launch::async,
+ [&]
+ {
+ std::vector<llfio::pollable_handle *> handles;
+ std::vector<llfio::poll_what> what, out;
+ for(size_t n = 0; n < MAX_SOCKETS; n++)
+ {
+ handles.push_back(&listening[n].first);
+ what.push_back(llfio::poll_what::is_readable);
+ out.push_back(llfio::poll_what::none);
+ }
+ for(;;)
+ {
+ int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value();
+ bool done = true;
+ for(size_t n = 0; n < MAX_SOCKETS; n++)
+ {
+ auto idx = idxs[n];
+ if(handles[idx] != nullptr)
+ {
+ done = false;
+ if(out[idx] & llfio::poll_what::is_readable)
+ {
+ {
+ std::lock_guard<std::mutex> g(lock);
+ std::cout << "Poll listening sees readable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx
+ << ". Currently connecting is " << currently_connecting << std::endl;
+ }
+ BOOST_CHECK(currently_connecting == idx);
+ std::pair<llfio::byte_socket_handle, llfio::ip::address> s;
+ listening[idx].first.read({s}).value();
+ llfio::byte buf;
+ llfio::byte_socket_handle::buffer_type b(&buf, 1);
+ s.first.read({{b}});
+ handles[idx] = nullptr;
+ ret--;
+ }
+ out[idx] = llfio::poll_what::none;
+ }
+ }
+ BOOST_CHECK(ret == 0);
+ if(done)
+ {
+ std::lock_guard<std::mutex> g(lock);
+ std::cout << "Poll listening task exits." << std::endl;
+ break;
+ }
+ }
+ });
+ auto poll_connecting_task = std::async(std::launch::async,
+ [&]
+ {
+ std::vector<llfio::pollable_handle *> handles;
+ std::vector<llfio::poll_what> what, out;
+ for(size_t n = 0; n < MAX_SOCKETS; n++)
+ {
+ handles.push_back(&sockets[n]);
+ what.push_back(llfio::poll_what::is_writable);
+ out.push_back(llfio::poll_what::none);
+ }
+ for(;;)
+ {
+ int ret = (int) llfio::poll(out, {handles}, what, std::chrono::seconds(30)).value();
+ bool done = true, saw_closed = false;
+ size_t remaining = MAX_SOCKETS;
+ for(size_t n = 0; n < MAX_SOCKETS; n++)
+ {
+ auto idx = idxs[n];
+ if(handles[idx] != nullptr)
+ {
+ done = false;
+ // On Linux, a new socket not yet connected MAY appear as both writable and hanged up,
+ // so filter out the closed.
+ if(!(out[idx] & llfio::poll_what::is_closed) || (remaining == 1 && currently_connecting == idx))
+ {
+ if(out[idx] & llfio::poll_what::is_writable)
+ {
+ {
+ std::lock_guard<std::mutex> g(lock);
+ std::cout << "Poll connect sees writable (raw = " << (int) (uint8_t) out[idx] << ") on socket " << idx
+ << ". Currently connecting is " << currently_connecting << std::endl;
+ }
+ BOOST_CHECK(currently_connecting == idx);
+ handles[idx] = nullptr;
+ ret--;
+ }
+ }
+ else
+ {
+ saw_closed = true;
+ }
+ out[idx] = llfio::poll_what::none;
+ }
+ else
+ {
+ remaining--;
+ }
+ }
+ if(!saw_closed)
+ {
+ BOOST_CHECK(ret == 0);
+ }
+ if(done)
+ {
+ std::lock_guard<std::mutex> g(lock);
+ std::cout << "Poll connect task exits." << std::endl;
+ break;
+ }
+ }
+ });
+ auto connect_task = std::async(std::launch::async,
+ [&]
+ {
+ for(size_t n = 0; n < MAX_SOCKETS; n++)
+ {
+ auto idx = idxs[n];
+ {
+ std::lock_guard<std::mutex> g(lock);
+ std::cout << "Connecting " << idx << " ... " << std::endl;
+ }
+ currently_connecting = idx;
+ sockets[idx].connect(listening[idx].second).value();
+ llfio::byte buf(llfio::to_byte(0));
+ llfio::byte_socket_handle::const_buffer_type b(&buf, 1);
+ sockets[idx].write({{&b, 1}});
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ std::lock_guard<std::mutex> g(lock);
+ std::cout << "Connecting task exits." << std::endl;
+ });
connect_task.get();
poll_listening_task.get();
poll_connecting_task.get();