diff options
author | François Laignel <francois@centricular.com> | 2023-09-27 19:32:54 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2023-10-04 19:09:11 +0300 |
commit | ffb7ea1885ac1cfa8761254512c46dc4fc890975 (patch) | |
tree | 9272fc97d63eac63582c4485a8c7c56a6af86478 /generic/threadshare/src | |
parent | fcd16bc070dd2460212e1b74931a53bbb173cc6c (diff) |
generic: threadshare: port to polling 3.1.0
Also use `rustix` & `std::ffi` instead of `libc`.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1350>
Diffstat (limited to 'generic/threadshare/src')
-rw-r--r-- | generic/threadshare/src/jitterbuffer/ffi.rs | 2 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/executor/async_wrapper.rs | 233 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/executor/mod.rs | 2 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/executor/reactor.rs | 136 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/executor/reactor/kqueue.rs | 108 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/executor/reactor/unix.rs | 61 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/executor/reactor/windows.rs | 60 | ||||
-rw-r--r-- | generic/threadshare/src/socket.rs | 56 |
8 files changed, 548 insertions, 110 deletions
diff --git a/generic/threadshare/src/jitterbuffer/ffi.rs b/generic/threadshare/src/jitterbuffer/ffi.rs index 531d6255d..4bde73ef3 100644 --- a/generic/threadshare/src/jitterbuffer/ffi.rs +++ b/generic/threadshare/src/jitterbuffer/ffi.rs @@ -21,7 +21,7 @@ use glib::ffi::{gboolean, gpointer, GList, GType}; use gst::glib; use gst::ffi::GstClockTime; -use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void}; +use std::ffi::{c_int, c_uint, c_ulonglong, c_ushort, c_void}; #[repr(C)] #[derive(Copy, Clone)] diff --git a/generic/threadshare/src/runtime/executor/async_wrapper.rs b/generic/threadshare/src/runtime/executor/async_wrapper.rs index f0e235f1a..0e23691ac 100644 --- a/generic/threadshare/src/runtime/executor/async_wrapper.rs +++ b/generic/threadshare/src/runtime/executor/async_wrapper.rs @@ -18,20 +18,20 @@ use std::task::{Context, Poll}; #[cfg(unix)] use std::{ - os::unix::io::{AsRawFd, RawFd}, + os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}, os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}, path::Path, }; #[cfg(windows)] -use std::os::windows::io::{AsRawSocket, RawSocket}; +use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket}; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; use crate::runtime::RUNTIME_CAT; use super::scheduler::{self, Scheduler}; -use super::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned}; +use super::{Reactor, Readable, ReadableOwned, Registration, Source, Writable, WritableOwned}; /// Async adapter for I/O types. /// @@ -103,11 +103,11 @@ pub struct Async<T: Send + 'static> { impl<T: Send + 'static> Unpin for Async<T> {} #[cfg(unix)] -impl<T: AsRawFd + Send + 'static> Async<T> { +impl<T: AsFd + Send + 'static> Async<T> { /// Creates an async I/O handle. /// /// This method will put the handle in non-blocking mode and register it in - /// [epoll]/[kqueue]/[event ports]/[wepoll]. + /// [epoll]/[kqueue]/[event ports]/[IOCP]. /// /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement /// `AsRawSocket`. @@ -115,22 +115,33 @@ impl<T: AsRawFd + Send + 'static> Async<T> { /// [epoll]: https://en.wikipedia.org/wiki/Epoll /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue /// [event ports]: https://illumos.org/man/port_create - /// [wepoll]: https://github.com/piscisaureus/wepoll + /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports pub fn new(io: T) -> io::Result<Async<T>> { - let fd = io.as_raw_fd(); - // Put the file descriptor in non-blocking mode. - unsafe { - let mut res = libc::fcntl(fd, libc::F_GETFL); - if res != -1 { - res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK); - } - if res == -1 { - return Err(io::Error::last_os_error()); + let fd = io.as_fd(); + cfg_if::cfg_if! { + // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux + // for now, as with the standard library, because it seems to behave + // differently depending on the platform. + // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d + // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80 + // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a + if #[cfg(target_os = "linux")] { + rustix::io::ioctl_fionbio(fd, true)?; + } else { + let previous = rustix::fs::fcntl_getfl(fd)?; + let new = previous | rustix::fs::OFlags::NONBLOCK; + if new != previous { + rustix::fs::fcntl_setfl(fd, new)?; + } } } - let source = Reactor::with_mut(|reactor| reactor.insert_io(fd))?; + // SAFETY: It is impossible to drop the I/O source while it is registered through + // this type. + let registration = unsafe { Registration::new(fd) }; + + let source = Reactor::with_mut(|reactor| reactor.insert_io(registration))?; Ok(Async { source, io: Some(io), @@ -144,16 +155,41 @@ impl<T: AsRawFd + Send + 'static> Async<T> { #[cfg(unix)] impl<T: AsRawFd + Send + 'static> AsRawFd for Async<T> { fn as_raw_fd(&self) -> RawFd { - self.source.raw + self.get_ref().as_raw_fd() + } +} + +#[cfg(unix)] +impl<T: AsFd + Send + 'static> AsFd for Async<T> { + fn as_fd(&self) -> BorrowedFd<'_> { + self.get_ref().as_fd() + } +} + +#[cfg(unix)] +impl<T: AsFd + From<OwnedFd> + Send + 'static> TryFrom<OwnedFd> for Async<T> { + type Error = io::Error; + + fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { + Async::new(value.into()) + } +} + +#[cfg(unix)] +impl<T: Into<OwnedFd> + Send + 'static> TryFrom<Async<T>> for OwnedFd { + type Error = io::Error; + + fn try_from(value: Async<T>) -> Result<Self, Self::Error> { + value.into_inner().map(Into::into) } } #[cfg(windows)] -impl<T: AsRawSocket + Send + 'static> Async<T> { +impl<T: AsSocket + Send + 'static> Async<T> { /// Creates an async I/O handle. /// /// This method will put the handle in non-blocking mode and register it in - /// [epoll]/[kqueue]/[event ports]/[wepoll]. + /// [epoll]/[kqueue]/[event ports]/[IOCP]. /// /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement /// `AsRawSocket`. @@ -161,27 +197,24 @@ impl<T: AsRawSocket + Send + 'static> Async<T> { /// [epoll]: https://en.wikipedia.org/wiki/Epoll /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue /// [event ports]: https://illumos.org/man/port_create - /// [wepoll]: https://github.com/piscisaureus/wepoll + /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports pub fn new(io: T) -> io::Result<Async<T>> { - let sock = io.as_raw_socket(); + let borrowed = io.as_socket(); // Put the socket in non-blocking mode. - unsafe { - use winapi::ctypes; - use winapi::um::winsock2; - - let mut nonblocking = true as ctypes::c_ulong; - let res = winsock2::ioctlsocket( - sock as winsock2::SOCKET, - winsock2::FIONBIO, - &mut nonblocking, - ); - if res != 0 { - return Err(io::Error::last_os_error()); - } - } - - let source = Reactor::with_mut(|reactor| reactor.insert_io(sock))?; + // + // Safety: We assume `as_raw_socket()` returns a valid fd. When we can + // depend on Rust >= 1.63, where `AsFd` is stabilized, and when + // `TimerFd` implements it, we can remove this unsafe and simplify this. + rustix::io::ioctl_fionbio(borrowed, true)?; + + // Create the registration. + // + // SAFETY: It is impossible to drop the I/O source while it is registered through + // this type. + let registration = unsafe { Registration::new(borrowed) }; + + let source = Reactor::with_mut(|reactor| reactor.insert_io(registration))?; Ok(Async { source, io: Some(io), @@ -195,7 +228,32 @@ impl<T: AsRawSocket + Send + 'static> Async<T> { #[cfg(windows)] impl<T: AsRawSocket + Send + 'static> AsRawSocket for Async<T> { fn as_raw_socket(&self) -> RawSocket { - self.source.raw + self.get_ref().as_raw_socket() + } +} + +#[cfg(windows)] +impl<T: AsSocket + Send + 'static> AsSocket for Async<T> { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.get_ref().as_socket() + } +} + +#[cfg(windows)] +impl<T: AsSocket + From<OwnedSocket> + Send + 'static> TryFrom<OwnedSocket> for Async<T> { + type Error = io::Error; + + fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> { + Async::new(value.into()) + } +} + +#[cfg(windows)] +impl<T: Into<OwnedSocket> + Send + 'static> TryFrom<Async<T>> for OwnedSocket { + type Error = io::Error; + + fn try_from(value: Async<T>) -> Result<Self, Self::Error> { + value.into_inner().map(Into::into) } } @@ -380,7 +438,12 @@ impl<T: Send + 'static> Drop for Async<T> { sched.spawn_and_unpark(async move { Reactor::with_mut(|reactor| { if let Err(err) = reactor.remove_io(&source) { - gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err); + gst::error!( + RUNTIME_CAT, + "Failed to remove fd {:?}: {}", + source.registration, + err + ); } }); drop(io); @@ -392,6 +455,94 @@ impl<T: Send + 'static> Drop for Async<T> { } } +/// Types whose I/O trait implementations do not drop the underlying I/O source. +/// +/// The resource contained inside of the [`Async`] cannot be invalidated. This invalidation can +/// happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved out +/// and dropped before the [`Async`]. Because of this, functions that grant mutable access to +/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped +/// and a dangling handle won't be left behind. +/// +/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those +/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the +/// source out while the method is being run. +/// +/// This trait is an antidote to this predicament. By implementing this trait, the user pledges +/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the +/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`]. +/// +/// # Safety +/// +/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits +/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`]. +/// +/// This trait is implemented by default on top of `libstd` types. In addition, it is implemented +/// for immutable reference types, as it is impossible to invalidate any outstanding references +/// while holding an immutable reference, even with interior mutability. As Rust's current pinning +/// system relies on similar guarantees, I believe that this approach is robust. +/// +/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html +/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html +/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html +/// +/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html +/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html +pub unsafe trait IoSafe {} + +/// Reference types can't be mutated. +/// +/// The worst thing that can happen is that external state is used to change what kind of pointer +/// `as_fd()` returns. For instance: +/// +/// ``` +/// # #[cfg(unix)] { +/// use std::cell::Cell; +/// use std::net::TcpStream; +/// use std::os::unix::io::{AsFd, BorrowedFd}; +/// +/// struct Bar { +/// flag: Cell<bool>, +/// a: TcpStream, +/// b: TcpStream +/// } +/// +/// impl AsFd for Bar { +/// fn as_fd(&self) -> BorrowedFd<'_> { +/// if self.flag.replace(!self.flag.get()) { +/// self.a.as_fd() +/// } else { +/// self.b.as_fd() +/// } +/// } +/// } +/// # } +/// ``` +/// +/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations +/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`. +unsafe impl<T: ?Sized> IoSafe for &T {} + +// Can be implemented on top of libstd types. +unsafe impl IoSafe for std::fs::File {} +unsafe impl IoSafe for std::io::Stderr {} +unsafe impl IoSafe for std::io::Stdin {} +unsafe impl IoSafe for std::io::Stdout {} +unsafe impl IoSafe for std::io::StderrLock<'_> {} +unsafe impl IoSafe for std::io::StdinLock<'_> {} +unsafe impl IoSafe for std::io::StdoutLock<'_> {} +unsafe impl IoSafe for std::net::TcpStream {} + +#[cfg(unix)] +unsafe impl IoSafe for std::os::unix::net::UnixStream {} + +unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {} +unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {} +unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {} +unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {} +unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {} +unsafe impl<T: Clone + IoSafe + ?Sized> IoSafe for std::borrow::Cow<'_, T> {} + impl<T: Read + Send + 'static> AsyncRead for Async<T> { fn poll_read( mut self: Pin<&mut Self>, @@ -422,6 +573,8 @@ impl<T: Read + Send + 'static> AsyncRead for Async<T> { } } +// Since this is through a reference, we can't mutate the inner I/O source. +// Therefore this is safe! impl<T: Send + 'static> AsyncRead for &Async<T> where for<'a> &'a T: Read, @@ -886,7 +1039,7 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re match socket.connect(&addr) { Ok(_) => {} #[cfg(unix)] - Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(err) if err.raw_os_error() == Some(rustix::io::Errno::INPROGRESS.raw_os_error()) => {} Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} Err(err) => return Err(err), } diff --git a/generic/threadshare/src/runtime/executor/mod.rs b/generic/threadshare/src/runtime/executor/mod.rs index f47a6491f..71b9bc403 100644 --- a/generic/threadshare/src/runtime/executor/mod.rs +++ b/generic/threadshare/src/runtime/executor/mod.rs @@ -30,7 +30,7 @@ mod join; pub use join::JoinHandle; pub mod reactor; -use reactor::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned}; +use reactor::{Reactor, Readable, ReadableOwned, Registration, Source, Writable, WritableOwned}; // We need the `Mutex<bool>` to work in pair with `Condvar`. #[allow(clippy::mutex_atomic)] diff --git a/generic/threadshare/src/runtime/executor/reactor.rs b/generic/threadshare/src/runtime/executor/reactor.rs index 86fa079bd..4e297385f 100644 --- a/generic/threadshare/src/runtime/executor/reactor.rs +++ b/generic/threadshare/src/runtime/executor/reactor.rs @@ -7,7 +7,7 @@ use concurrent_queue::ConcurrentQueue; use futures::ready; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use slab::Slab; use std::borrow::Borrow; @@ -18,10 +18,6 @@ use std::future::Future; use std::io; use std::marker::PhantomData; use std::mem; -#[cfg(unix)] -use std::os::unix::io::RawFd; -#[cfg(windows)] -use std::os::windows::io::RawSocket; use std::panic; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -29,6 +25,31 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; +// Choose the proper implementation of `Registration` based on the target platform. +cfg_if::cfg_if! { + if #[cfg(windows)] { + mod windows; + pub use windows::Registration; + } else if #[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + ))] { + mod kqueue; + pub use kqueue::Registration; + } else if #[cfg(unix)] { + mod unix; + pub use unix::Registration; + } else { + compile_error!("unsupported platform"); + } +} + use crate::runtime::{Async, RUNTIME_CAT}; const READ: usize = 0; @@ -71,7 +92,7 @@ pub(super) struct Reactor { /// Temporary storage for I/O events when polling the reactor. /// /// Holding a lock on this event list implies the exclusive right to poll I/O. - events: Vec<Event>, + events: Events, /// An ordered map of registered regular timers. /// @@ -106,7 +127,7 @@ impl Reactor { half_max_throttling: max_throttling / 2, wakers: Vec::new(), sources: Slab::new(), - events: Vec::new(), + events: Events::new(), timers: BTreeMap::new(), after_timers: BTreeMap::new(), timer_ops: ConcurrentQueue::bounded(1000), @@ -210,16 +231,12 @@ impl Reactor { } /// Registers an I/O source in the reactor. - pub fn insert_io( - &mut self, - #[cfg(unix)] raw: RawFd, - #[cfg(windows)] raw: RawSocket, - ) -> io::Result<Arc<Source>> { + pub fn insert_io(&mut self, raw: Registration) -> io::Result<Arc<Source>> { // Create an I/O source for this file descriptor. let source = { let key = self.sources.vacant_entry().key(); let source = Arc::new(Source { - raw, + registration: raw, key, state: Default::default(), }); @@ -228,11 +245,11 @@ impl Reactor { }; // Register the file descriptor. - if let Err(err) = self.poller.add(raw, Event::none(source.key)) { + if let Err(err) = source.registration.add(&self.poller, source.key) { gst::error!( crate::runtime::RUNTIME_CAT, - "Failed to register fd {}: {}", - source.raw, + "Failed to register fd {:?}: {}", + source.registration, err, ); self.sources.remove(source.key); @@ -245,7 +262,7 @@ impl Reactor { /// Deregisters an I/O source from the reactor. pub fn remove_io(&mut self, source: &Source) -> io::Result<()> { self.sources.remove(source.key); - self.poller.delete(source.raw) + source.registration.delete(&self.poller) } /// Registers a regular timer in the reactor. @@ -414,14 +431,16 @@ impl Reactor { // e.g. we were previously interested in both readability and writability, // but only one of them was emitted. if !state[READ].is_empty() || !state[WRITE].is_empty() { - self.poller.modify( - source.raw, - Event { - key: source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(source.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Register interest in this event. + source.registration.modify(&self.poller, event)?; } } } @@ -493,13 +512,8 @@ enum TimerOp { /// A registered source of I/O events. #[derive(Debug)] pub(super) struct Source { - /// Raw file descriptor on Unix platforms. - #[cfg(unix)] - pub(super) raw: RawFd, - - /// Raw socket handle on Windows. - #[cfg(windows)] - pub(super) raw: RawSocket, + /// This source's registration into the reactor. + pub(super) registration: Registration, /// The key of this source obtained during registration. key: usize, @@ -590,14 +604,15 @@ impl Source { // Update interest in this I/O handle. if was_empty { - reactor.poller.modify( - self.raw, - Event { - key: self.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + let event = { + let mut event = Event::none(self.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Register interest in it. + self.registration.modify(&reactor.poller, event)?; } Poll::Pending @@ -645,7 +660,11 @@ impl<T: Send + 'static> Future for Readable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { ready!(Pin::new(&mut self.0).poll(cx))?; - gst::trace!(RUNTIME_CAT, "readable: fd={}", self.0.handle.source.raw); + gst::trace!( + RUNTIME_CAT, + "readable: fd={:?}", + self.0.handle.source.registration + ); Poll::Ready(Ok(())) } } @@ -667,8 +686,8 @@ impl<T: Send + 'static> Future for ReadableOwned<T> { ready!(Pin::new(&mut self.0).poll(cx))?; gst::trace!( RUNTIME_CAT, - "readable_owned: fd={}", - self.0.handle.source.raw + "readable_owned: fd={:?}", + self.0.handle.source.registration ); Poll::Ready(Ok(())) } @@ -689,7 +708,11 @@ impl<T: Send + 'static> Future for Writable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { ready!(Pin::new(&mut self.0).poll(cx))?; - gst::trace!(RUNTIME_CAT, "writable: fd={}", self.0.handle.source.raw); + gst::trace!( + RUNTIME_CAT, + "writable: fd={:?}", + self.0.handle.source.registration + ); Poll::Ready(Ok(())) } } @@ -711,8 +734,8 @@ impl<T: Send + 'static> Future for WritableOwned<T> { ready!(Pin::new(&mut self.0).poll(cx))?; gst::trace!( RUNTIME_CAT, - "writable_owned: fd={}", - self.0.handle.source.raw + "writable_owned: fd={:?}", + self.0.handle.source.registration ); Poll::Ready(Ok(())) } @@ -780,14 +803,19 @@ impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> { // Update interest in this I/O handle. if was_empty { - reactor.poller.modify( - handle.borrow().source.raw, - Event { - key: handle.borrow().source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(handle.borrow().source.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + handle + .borrow() + .source + .registration + .modify(&reactor.poller, event)?; } Poll::Pending diff --git a/generic/threadshare/src/runtime/executor/reactor/kqueue.rs b/generic/threadshare/src/runtime/executor/reactor/kqueue.rs new file mode 100644 index 000000000..9f31af98b --- /dev/null +++ b/generic/threadshare/src/runtime/executor/reactor/kqueue.rs @@ -0,0 +1,108 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use crate::os::kqueue::Signal; + +use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; +use polling::{Event, PollMode, Poller}; + +use std::fmt; +use std::io::Result; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; +use std::process::Child; + +/// The raw registration into the reactor. +/// +/// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. +#[doc(hidden)] +pub enum Registration { + /// Raw file descriptor for readability/writability. + /// + /// + /// # Invariant + /// + /// This describes a valid file descriptor that has not been `close`d. It will not be + /// closed while this object is alive. + Fd(RawFd), + + /// Raw signal number for signal delivery. + Signal(Signal), + + /// Process for process termination. + Process(Child), +} + +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Fd(raw) => fmt::Debug::fmt(raw, f), + Self::Signal(signal) => fmt::Debug::fmt(signal, f), + Self::Process(process) => fmt::Debug::fmt(process, f), + } + } +} + +impl Registration { + /// Add this file descriptor into the reactor. + /// + /// # Safety + /// + /// The provided file descriptor must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new(f: impl AsFd) -> Self { + Self::Fd(f.as_fd().as_raw_fd()) + } + + /// Registers the object into the reactor. + #[inline] + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + match self { + Self::Fd(raw) => { + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(*raw, Event::none(token)) } + } + Self::Signal(signal) => { + poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) + } + Self::Process(process) => poller.add_filter( + unsafe { Process::new(process, ProcessOps::Exit) }, + token, + PollMode::Oneshot, + ), + } + } + + /// Re-registers the object into the reactor. + #[inline] + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + match self { + Self::Fd(raw) => { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(*raw) }; + poller.modify(fd, interest) + } + Self::Signal(signal) => { + poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) + } + Self::Process(process) => poller.modify_filter( + unsafe { Process::new(process, ProcessOps::Exit) }, + interest.key, + PollMode::Oneshot, + ), + } + } + + /// Deregisters the object from the reactor. + #[inline] + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + match self { + Self::Fd(raw) => { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(*raw) }; + poller.delete(fd) + } + Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), + Self::Process(process) => { + poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) }) + } + } + } +} diff --git a/generic/threadshare/src/runtime/executor/reactor/unix.rs b/generic/threadshare/src/runtime/executor/reactor/unix.rs new file mode 100644 index 000000000..b2f9b1b20 --- /dev/null +++ b/generic/threadshare/src/runtime/executor/reactor/unix.rs @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use polling::{Event, Poller}; + +use std::fmt; +use std::io::Result; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; + +/// The raw registration into the reactor. +#[doc(hidden)] +pub struct Registration { + /// Raw file descriptor on Unix. + /// + /// # Invariant + /// + /// This describes a valid file descriptor that has not been `close`d. It will not be + /// closed while this object is alive. + raw: RawFd, +} + +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.raw, f) + } +} + +impl Registration { + /// Add this file descriptor into the reactor. + /// + /// # Safety + /// + /// The provided file descriptor must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new(f: impl AsFd) -> Self { + Self { + raw: f.as_fd().as_raw_fd(), + } + } + + /// Registers the object into the reactor. + #[inline] + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(self.raw, Event::none(token)) } + } + + /// Re-registers the object into the reactor. + #[inline] + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(self.raw) }; + poller.modify(fd, interest) + } + + /// Deregisters the object from the reactor. + #[inline] + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(self.raw) }; + poller.delete(fd) + } +} diff --git a/generic/threadshare/src/runtime/executor/reactor/windows.rs b/generic/threadshare/src/runtime/executor/reactor/windows.rs new file mode 100644 index 000000000..1c92f00a6 --- /dev/null +++ b/generic/threadshare/src/runtime/executor/reactor/windows.rs @@ -0,0 +1,60 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use polling::{Event, Poller}; +use std::fmt; +use std::io::Result; +use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, RawSocket}; + +/// The raw registration into the reactor. +#[doc(hidden)] +pub struct Registration { + /// Raw socket handle on Windows. + /// + /// # Invariant + /// + /// This describes a valid socket that has not been `close`d. It will not be + /// closed while this object is alive. + raw: RawSocket, +} + +impl fmt::Debug for Registration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.raw, f) + } +} + +impl Registration { + /// Add this file descriptor into the reactor. + /// + /// # Safety + /// + /// The provided file descriptor must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new(f: impl AsSocket) -> Self { + Self { + raw: f.as_socket().as_raw_socket(), + } + } + + /// Registers the object into the reactor. + #[inline] + pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(self.raw, Event::none(token)) } + } + + /// Re-registers the object into the reactor. + #[inline] + pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) }; + poller.modify(fd, interest) + } + + /// Deregisters the object from the reactor. + #[inline] + pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) }; + poller.delete(fd) + } +} diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs index f52039fbd..702fa2836 100644 --- a/generic/threadshare/src/socket.rs +++ b/generic/threadshare/src/socket.rs @@ -25,8 +25,6 @@ use gst::prelude::*; use gst::glib::once_cell::sync::Lazy; -use gio::prelude::*; - use std::error; use std::fmt; use std::io; @@ -222,31 +220,57 @@ impl GioSocketWrapper { } } - #[cfg(unix)] - pub fn set_tos(&self, qos_dscp: i32) -> Result<(), glib::Error> { - use libc::{IPPROTO_IP, IPPROTO_IPV6, IPV6_TCLASS, IP_TOS}; + #[cfg(any( + bsd, + linux_like, + target_os = "aix", + target_os = "fuchsia", + target_os = "haiku", + target_env = "newlib" + ))] + pub fn set_tos(&self, qos_dscp: i32) -> rustix::io::Result<()> { + use gio::prelude::*; + use rustix::net::sockopt; let tos = (qos_dscp & 0x3f) << 2; let socket = self.as_socket(); - socket.set_option(IPPROTO_IP, IP_TOS, tos)?; + sockopt::set_ip_tos(socket, tos)?; if socket.family() == gio::SocketFamily::Ipv6 { - socket.set_option(IPPROTO_IPV6, IPV6_TCLASS, tos)?; + sockopt::set_ipv6_tclass(socket, tos)?; } Ok(()) } - #[cfg(not(unix))] - pub fn set_tos(&self, _qos_dscp: i32) -> Result<(), glib::Error> { + #[cfg(not(any( + bsd, + linux_like, + target_os = "aix", + target_os = "fuchsia", + target_os = "haiku", + target_env = "newlib" + )))] + pub fn set_tos(&self, _qos_dscp: i32) -> rustix::io::Result<()> { Ok(()) } - #[cfg(unix)] + #[cfg(not(windows))] pub fn get<T: FromRawFd>(&self) -> T { - unsafe { FromRawFd::from_raw_fd(libc::dup(gio::ffi::g_socket_get_fd(self.socket))) } + unsafe { + let borrowed = + rustix::fd::BorrowedFd::borrow_raw(gio::ffi::g_socket_get_fd(self.socket)); + + let dupped = rustix::io::dup(borrowed).unwrap(); + let res = FromRawFd::from_raw_fd(dupped.as_raw_fd()); + + // We transferred ownership to T so don't drop dupped + std::mem::forget(dupped); + + res + } } #[cfg(windows)] @@ -308,7 +332,7 @@ unsafe fn dup_socket(socket: usize) -> usize { pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::ErrorMessage> { #[cfg(unix)] unsafe { - let fd = libc::dup(socket.as_raw_fd()); + let dupped = rustix::io::dup(socket).unwrap(); // This is unsafe because it allows us to share the fd between the socket and the // GIO socket below, but safety of this is the job of the application @@ -319,14 +343,18 @@ pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::E } } - let fd = FdConverter(fd); + let fd = FdConverter(dupped.as_raw_fd()); - let gio_socket = gio::Socket::from_fd(fd).map_err(|err| { + let gio_socket = gio::Socket::from_fd(fd); + // We transferred ownership to gio_socket so don't drop dupped + std::mem::forget(dupped); + let gio_socket = gio_socket.map_err(|err| { gst::error_msg!( gst::ResourceError::OpenWrite, ["Failed to create wrapped GIO socket: {}", err] ) })?; + Ok(GioSocketWrapper::new(&gio_socket)) } #[cfg(windows)] |