diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2018-05-23 10:32:06 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2018-11-05 14:36:47 +0300 |
commit | 9adf6630737bbf14274886e9d370ef841853a901 (patch) | |
tree | 736a4c0765f3d5e4ad9ceac2b3a86d0d1eb6f8a4 | |
parent | c5d901609f5c345343673547f0cede664c5bd46f (diff) |
threadshare: Add property to udpsrc for allowing port/address reuse
-rw-r--r-- | gst-plugin-threadshare/Cargo.toml | 1 | ||||
-rw-r--r-- | gst-plugin-threadshare/src/iocontext.rs | 10 | ||||
-rw-r--r-- | gst-plugin-threadshare/src/lib.rs | 2 | ||||
-rw-r--r-- | gst-plugin-threadshare/src/udpsrc.rs | 62 |
4 files changed, 71 insertions, 4 deletions
diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 7ec7fe912..7e93d3114 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -20,6 +20,7 @@ futures = "0.1" lazy_static = "1.0" either = "1.0" rand = "0.4" +net2 = "0.2" [lib] name = "gstthreadshare" diff --git a/gst-plugin-threadshare/src/iocontext.rs b/gst-plugin-threadshare/src/iocontext.rs index 13c430412..53b4ea4f5 100644 --- a/gst-plugin-threadshare/src/iocontext.rs +++ b/gst-plugin-threadshare/src/iocontext.rs @@ -246,6 +246,7 @@ pub struct IOContext(Arc<IOContextInner>); struct IOContextInner { name: String, pool: Either<thread_pool::ThreadPool, IOContextExecutor>, + handle: reactor::Handle, // Only used for dropping _shutdown: IOContextShutdown, pending_futures: Mutex<( @@ -274,12 +275,12 @@ impl IOContext { let reactor = reactor::Reactor::new()?; + let handle = reactor.handle().clone(); let (pool, shutdown) = if n_threads >= 0 { - let handle = reactor.handle().clone(); - let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new())); let t1 = timers.clone(); + let handle = handle.clone(); let shutdown = IOContextRunner::start(name, wait, reactor); // FIXME: The executor threads are not throttled at all, only the reactor @@ -319,6 +320,7 @@ impl IOContext { let context = Arc::new(IOContextInner { name: name.into(), pool, + handle, _shutdown: shutdown, pending_futures: Mutex::new((0, HashMap::new())), }); @@ -338,6 +340,10 @@ impl IOContext { } } + pub fn handle(&self) -> &reactor::Handle { + &self.0.handle + } + pub fn acquire_pending_future_id(&self) -> PendingFutureId { let mut pending_futures = self.0.pending_futures.lock().unwrap(); let id = pending_futures.0; diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 9d7f6aa85..064a303d3 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -41,6 +41,8 @@ extern crate rand; #[macro_use] extern crate lazy_static; +extern crate net2; + mod iocontext; mod udpsocket; diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index f777cf455..d4d0aa79b 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -35,11 +35,14 @@ use either::Either; use rand; +use net2; + use iocontext::*; use udpsocket::*; const DEFAULT_ADDRESS: Option<&'static str> = Some("127.0.0.1"); const DEFAULT_PORT: u32 = 5000; +const DEFAULT_REUSE: bool = true; const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_MTU: u32 = 1500; const DEFAULT_CONTEXT: &'static str = ""; @@ -50,6 +53,7 @@ const DEFAULT_CONTEXT_WAIT: u32 = 0; struct Settings { address: Option<String>, port: u32, + reuse: bool, caps: Option<gst::Caps>, mtu: u32, context: String, @@ -62,6 +66,7 @@ impl Default for Settings { Settings { address: DEFAULT_ADDRESS.map(Into::into), port: DEFAULT_PORT, + reuse: DEFAULT_REUSE, caps: DEFAULT_CAPS, mtu: DEFAULT_MTU, context: DEFAULT_CONTEXT.into(), @@ -71,7 +76,7 @@ impl Default for Settings { } } -static PROPERTIES: [Property; 7] = [ +static PROPERTIES: [Property; 8] = [ Property::String( "address", "Address", @@ -87,6 +92,13 @@ static PROPERTIES: [Property; 7] = [ DEFAULT_PORT, PropertyMutability::ReadWrite, ), + Property::Boolean( + "reuse", + "Reuse", + "Allow reuse of the port", + DEFAULT_REUSE, + PropertyMutability::ReadWrite, + ), Property::Boxed( "caps", "Caps", @@ -442,6 +454,7 @@ impl UdpSrc { // TODO: TTL, multicast loopback, etc let socket = if addr.is_multicast() { + // TODO: Use ::unspecified() constructor once stable let bind_addr = if addr.is_ipv4() { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)) @@ -458,13 +471,50 @@ impl UdpSrc { addr ); - let socket = net::UdpSocket::bind(&saddr).map_err(|err| { + let builder = if bind_addr.is_ipv4() { + net2::UdpBuilder::new_v4() + } else { + net2::UdpBuilder::new_v6() + }.map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to create socket: {}", err] + ) + })?; + + builder.reuse_address(settings.reuse).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to set reuse_address: {}", err] + ) + })?; + + #[cfg(unix)] + { + use net2::unix::UnixUdpBuilderExt; + + builder.reuse_port(settings.reuse).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to set reuse_port: {}", err] + ) + })?; + } + + let socket = builder.bind(&saddr).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Failed to bind socket: {}", err] ) })?; + let socket = net::UdpSocket::from_std(socket, io_context.handle()).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + })?; + // TODO: Multicast interface configuration, going to be tricky match addr { IpAddr::V4(addr) => { @@ -637,6 +687,10 @@ impl ObjectImpl<Element> for UdpSrc { let mut settings = self.settings.lock().unwrap(); settings.port = value.get().unwrap(); } + Property::Boolean("reuse", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.reuse = value.get().unwrap(); + } Property::Boxed("caps", ..) => { let mut settings = self.settings.lock().unwrap(); settings.caps = value.get(); @@ -673,6 +727,10 @@ impl ObjectImpl<Element> for UdpSrc { let mut settings = self.settings.lock().unwrap(); Ok(settings.port.to_value()) } + Property::Boolean("reuse", ..) => { + let mut settings = self.settings.lock().unwrap(); + Ok(settings.reuse.to_value()) + } Property::Boxed("caps", ..) => { let mut settings = self.settings.lock().unwrap(); Ok(settings.caps.to_value()) |