diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2018-11-13 15:13:23 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2018-11-13 15:13:23 +0300 |
commit | ab08cbd412bcf61c959db9120dfa5b88aa3299d2 (patch) | |
tree | f047de5eb9ba25bc52867025a9a7728239e629b9 | |
parent | 4fb18382c28f01af06a6205b155137420c852c2e (diff) |
ts-udpsrc: Implement socket and used-socket properties like in udpsrc
-rw-r--r-- | gst-plugin-threadshare/Cargo.toml | 3 | ||||
-rw-r--r-- | gst-plugin-threadshare/src/lib.rs | 3 | ||||
-rw-r--r-- | gst-plugin-threadshare/src/udpsrc.rs | 370 |
3 files changed, 292 insertions, 84 deletions
diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 770929341..df87781c6 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -6,8 +6,11 @@ license = "LGPL-2.1+" [dependencies] glib-sys = { git = "https://github.com/gtk-rs/sys" } +gobject-sys = { git = "https://github.com/gtk-rs/sys" } +gio-sys = { git = "https://github.com/gtk-rs/sys" } gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" } glib = { git = "https://github.com/gtk-rs/glib" } +gio = { git = "https://github.com/gtk-rs/gio" } gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" } gst-plugin = { path = "../gst-plugin" } diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 459421df7..446e4703b 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -17,9 +17,12 @@ #![crate_type = "cdylib"] +extern crate gio_sys as gio_ffi; extern crate glib_sys as glib_ffi; +extern crate gobject_sys as gobject_ffi; extern crate gstreamer_sys as gst_ffi; +extern crate gio; extern crate glib; extern crate gobject_subclass; #[macro_use] diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 3fa3b964a..1707320e0 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -20,6 +20,11 @@ use glib::prelude::*; use gst; use gst::prelude::*; +use gio; + +use gio_ffi; +use gobject_ffi; + use gobject_subclass::object::*; use gst_plugin::element::*; @@ -38,6 +43,12 @@ use rand; use net2; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawFd, RawSocket}; + use iocontext::*; use socket::*; @@ -46,9 +57,69 @@ const DEFAULT_PORT: u32 = 5000; const DEFAULT_REUSE: bool = true; const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_MTU: u32 = 1500; +const DEFAULT_SOCKET: Option<GioSocketWrapper> = None; +const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_CONTEXT: &'static str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; +// Send/Sync struct for passing around a gio::Socket +// and getting the raw fd from it +// +// gio::Socket is not Send/Sync as it's generally unsafe +// to access it from multiple threads. Getting the underlying raw +// fd is safe though, as is receiving/sending from two different threads +#[derive(Debug)] +struct GioSocketWrapper { + socket: *mut gio_ffi::GSocket, +} + +unsafe impl Send for GioSocketWrapper {} +unsafe impl Sync for GioSocketWrapper {} + +impl GioSocketWrapper { + fn new(socket: &gio::Socket) -> Self { + use glib::translate::*; + + Self { + socket: socket.to_glib_full(), + } + } + + fn as_socket(&self) -> gio::Socket { + unsafe { + use glib::translate::*; + + from_glib_none(self.socket) + } + } + + #[cfg(unix)] + fn get<T: FromRawFd>(&self) -> T { + unsafe { FromRawFd::from_raw_fd(gio_ffi::g_socket_get_fd(self.socket)) } + } + + #[cfg(windows)] + fn get<T: FromRawSocket>(&self) -> T { + unsafe { FromRawSocket::from_raw_socket(ffi::g_socket_get_fd(self.socket) as _) } + } +} + +impl Clone for GioSocketWrapper { + fn clone(&self) -> Self { + Self { + socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ }, + } + } +} + +impl Drop for GioSocketWrapper { + fn drop(&mut self) { + unsafe { + gobject_ffi::g_object_unref(self.socket as *mut _); + } + } +} + #[derive(Debug, Clone)] struct Settings { address: Option<String>, @@ -56,6 +127,8 @@ struct Settings { reuse: bool, caps: Option<gst::Caps>, mtu: u32, + socket: Option<GioSocketWrapper>, + used_socket: Option<GioSocketWrapper>, context: String, context_wait: u32, } @@ -68,13 +141,15 @@ impl Default for Settings { reuse: DEFAULT_REUSE, caps: DEFAULT_CAPS, mtu: DEFAULT_MTU, + socket: DEFAULT_SOCKET, + used_socket: DEFAULT_USED_SOCKET, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, } } } -static PROPERTIES: [Property; 7] = [ +static PROPERTIES: [Property; 9] = [ Property::String( "address", "Address", @@ -112,6 +187,20 @@ static PROPERTIES: [Property; 7] = [ DEFAULT_MTU, PropertyMutability::ReadWrite, ), + Property::Object( + "socket", + "Socket", + "Socket to use for UDP reception. (None == allocate)", + gio::Socket::static_type, + PropertyMutability::ReadWrite, + ), + Property::Object( + "used-socket", + "Used Socket", + "Socket currently in use for UDP reception. (None = no socket)", + gio::Socket::static_type, + PropertyMutability::Readable, + ), Property::String( "context", "Context", @@ -438,120 +527,203 @@ impl UdpSrc { ) })?; - let addr: IpAddr = match settings.address { - None => { - return Err(gst_error_msg!( - gst::ResourceError::Settings, - ["No address set"] - )) + let socket = if let Some(ref wrapped_socket) = settings.socket { + use std::net::UdpSocket; + + let mut socket: UdpSocket; + + #[cfg(unix)] + { + socket = wrapped_socket.get() } - Some(ref addr) => match addr.parse() { - Err(err) => { + #[cfg(windows)] + { + socket = wrapped_socket.get() + } + + let socket = + net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + })?; + + self.settings.lock().unwrap().used_socket = Some(wrapped_socket.clone()); + + socket + } else { + let addr: IpAddr = match settings.address { + None => { return Err(gst_error_msg!( gst::ResourceError::Settings, - ["Invalid address '{}' set: {}", addr, err] + ["No address set"] )) } - Ok(addr) => addr, - }, - }; - let port = settings.port; - - // TODO: TTL, multicast loopback, etc - let saddr = if addr.is_multicast() { - // TODO: Use ::unspecified() constructor once stable - let bind_addr = if addr.is_ipv4() { - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)) - } else { - IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)) + Some(ref addr) => match addr.parse() { + Err(err) => { + return Err(gst_error_msg!( + gst::ResourceError::Settings, + ["Invalid address '{}' set: {}", addr, err] + )) + } + Ok(addr) => addr, + }, }; + let port = settings.port; - let saddr = SocketAddr::new(bind_addr, port as u16); - gst_debug!( - self.cat, - obj: element, - "Binding to {:?} for multicast group {:?}", - saddr, - addr - ); - - saddr - } else { - let saddr = SocketAddr::new(addr, port as u16); - gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr); + // TODO: TTL, multicast loopback, etc + let saddr = if addr.is_multicast() { + // TODO: Use ::unspecified() constructor once stable + let bind_addr = if addr.is_ipv4() { + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)) + } else { + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)) + }; - saddr - }; + let saddr = SocketAddr::new(bind_addr, port as u16); + gst_debug!( + self.cat, + obj: element, + "Binding to {:?} for multicast group {:?}", + saddr, + addr + ); - let builder = if addr.is_ipv4() { - net2::UdpBuilder::new_v4() - } else { - net2::UdpBuilder::new_v6() - } - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to create socket: {}", err] - ) - })?; + saddr + } else { + let saddr = SocketAddr::new(addr, port as u16); + gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr); - builder.reuse_address(settings.reuse).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to set reuse_address: {}", err] - ) - })?; + saddr + }; - #[cfg(unix)] - { - use net2::unix::UnixUdpBuilderExt; + let builder = if addr.is_ipv4() { + net2::UdpBuilder::new_v4() + } else { + net2::UdpBuilder::new_v6() + } + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to create socket: {}", err] + ) + })?; - builder.reuse_port(settings.reuse).map_err(|err| { + builder.reuse_address(settings.reuse).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, - ["Failed to set reuse_port: {}", err] + ["Failed to set reuse_address: {}", err] ) })?; - } - let socket = builder.bind(&saddr).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to bind socket: {}", err] - ) - })?; + #[cfg(unix)] + { + use net2::unix::UnixUdpBuilderExt; + + builder.reuse_port(settings.reuse).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to set reuse_port: {}", err] + ) + })?; + } - let socket = - net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| { + let socket = builder.bind(&saddr).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, - ["Failed to setup socket for tokio: {}", err] + ["Failed to bind socket: {}", err] ) })?; - if addr.is_multicast() { - // TODO: Multicast interface configuration, going to be tricky - match addr { - IpAddr::V4(addr) => { - socket - .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { + let socket = + net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + })?; + + if addr.is_multicast() { + // TODO: Multicast interface configuration, going to be tricky + match addr { + IpAddr::V4(addr) => { + socket + .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to join multicast group: {}", err] + ) + })?; + } + IpAddr::V6(addr) => { + socket.join_multicast_v6(&addr, 0).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Failed to join multicast group: {}", err] ) })?; + } } - IpAddr::V6(addr) => { - socket.join_multicast_v6(&addr, 0).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to join multicast group: {}", err] - ) - })?; + } + + // Store the socket as used-socket in the settings + #[cfg(unix)] + { + let fd = socket.as_raw_fd(); + + // This is technically unsafe because it allows + // us to share the fd between the socket and the + // GIO socket below, but safety of this is the + // job of the application + struct FdConverter(RawFd); + impl IntoRawFd for FdConverter { + fn into_raw_fd(self) -> RawFd { + self.0 + } } + + let fd = FdConverter(fd); + + let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to create wrapped GIO socket: {}", err] + ) + })?;; + let wrapper = GioSocketWrapper::new(&gio_socket); + self.settings.lock().unwrap().used_socket = Some(wrapper); } - } + #[cfg(windows)] + { + let fd = socket.as_raw_socket(); + + // This is technically unsafe because it allows + // us to share the fd between the socket and the + // GIO socket below, but safety of this is the + // job of the application + struct SocketConverter(RawSocket); + impl IntoRawSocket for SocketConverter { + fn into_raw_socket(self) -> RawSocket { + self.0 + } + } + + let fd = SocketConverter(fd); + + let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to create wrapped GIO socket: {}", err] + ) + })?;; + let wrapper = GioSocketWrapper::new(&gio_socket); + self.settings.lock().unwrap().used_socket = Some(wrapper); + } + + socket + }; let buffer_pool = gst::BufferPool::new(); let mut config = buffer_pool.get_config(); @@ -615,6 +787,9 @@ impl UdpSrc { state.pending_future_id = Some(pending_future_id); gst_debug!(self.cat, obj: element, "Prepared"); + drop(state); + + element.notify("used-socket"); Ok(()) } @@ -622,6 +797,8 @@ impl UdpSrc { fn unprepare(&self, element: &Element) -> Result<(), ()> { gst_debug!(self.cat, obj: element, "Unpreparing"); + self.settings.lock().unwrap().used_socket = None; + // FIXME: The IO Context has to be alive longer than the queue, // otherwise the queue can't finish any remaining work let (mut socket, io_context) = { @@ -701,6 +878,15 @@ impl ObjectImpl<Element> for UdpSrc { let mut settings = self.settings.lock().unwrap(); settings.mtu = value.get().unwrap(); } + Property::Object("socket", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.socket = value + .get::<gio::Socket>() + .map(|socket| GioSocketWrapper::new(&socket)); + } + Property::Object("used-socket", ..) => { + unreachable!(); + } Property::String("context", ..) => { let mut settings = self.settings.lock().unwrap(); settings.context = value.get().unwrap_or_else(|| "".into()); @@ -737,6 +923,22 @@ impl ObjectImpl<Element> for UdpSrc { let mut settings = self.settings.lock().unwrap(); Ok(settings.mtu.to_value()) } + Property::Object("socket", ..) => { + let mut settings = self.settings.lock().unwrap(); + Ok(settings + .socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()) + } + Property::Object("used-socket", ..) => { + let mut settings = self.settings.lock().unwrap(); + Ok(settings + .used_socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()) + } Property::String("context", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) |