diff options
author | François Laignel <fengalin@free.fr> | 2022-11-04 16:11:44 +0300 |
---|---|---|
committer | Sebastian Dröge <slomo@coaxion.net> | 2022-11-09 10:55:04 +0300 |
commit | e1afa43aa3d4f11d9a513966b0f0175eaab3924a (patch) | |
tree | 7ed047a763198d7fa47166943b6bac22844c8832 /generic/threadshare/src | |
parent | 29a490f6dc7b792df7ab45f6a79cbfbee694d332 (diff) |
ts/udpsink: handle items in the PadSinkHandler
... instead of forwarding them to a Task via a channel.
This improves CPU usage by 5% according to `udpsrc-benchmark-sender`
with the `tuning` feature using default audio test buffers and
400 streams on the same ts-context.
It is expected to improve latency significantly. This is inferred
from `ts-standalone`: latency shrinks from around 5ms to 1.5µs
using the `task` sink compared to the `async-mutex` sink.
The async Mutex is mandatory here as we need to hold the lock
across await points.
Diffstat (limited to 'generic/threadshare/src')
-rw-r--r-- | generic/threadshare/src/udpsink/imp.rs | 1088 |
1 files changed, 505 insertions, 583 deletions
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index cfb0763f3..770532356 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -19,7 +19,6 @@ use futures::future::BoxFuture; use futures::prelude::*; -use futures::stream::Peekable; use gst::glib; use gst::prelude::*; @@ -29,15 +28,14 @@ use gst::{element_error, error_msg}; use once_cell::sync::Lazy; +use crate::runtime::executor::block_on_or_add_sub_task; use crate::runtime::prelude::*; -use crate::runtime::{self, Async, Context, PadSink, Task}; +use crate::runtime::{self, Async, Context, PadSink}; use crate::socket::{wrap_socket, GioSocketWrapper}; use std::collections::BTreeSet; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; -use std::pin::Pin; -use std::sync::Mutex; -use std::task::Poll; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::u16; use std::u8; @@ -62,6 +60,25 @@ const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; +#[derive(Debug, Clone, Copy)] +struct SocketConf { + auto_multicast: bool, + multicast_loop: bool, + ttl: u32, + ttl_mc: u32, +} + +impl Default for SocketConf { + fn default() -> Self { + SocketConf { + auto_multicast: DEFAULT_AUTO_MULTICAST, + multicast_loop: DEFAULT_LOOP, + ttl: DEFAULT_TTL, + ttl_mc: DEFAULT_TTL_MC, + } + } +} + #[derive(Debug, Clone)] struct Settings { sync: bool, @@ -73,15 +90,10 @@ struct Settings { used_socket: Option<GioSocketWrapper>, socket_v6: Option<GioSocketWrapper>, used_socket_v6: Option<GioSocketWrapper>, - auto_multicast: bool, - multicast_loop: bool, - ttl: u32, - ttl_mc: u32, + socket_conf: SocketConf, qos_dscp: i32, context: String, context_wait: Duration, - clients: BTreeSet<SocketAddr>, - latency: Option<gst::ClockTime>, } impl Default for Settings { @@ -96,18 +108,10 @@ impl Default for Settings { used_socket: DEFAULT_USED_SOCKET, socket_v6: DEFAULT_SOCKET_V6, used_socket_v6: DEFAULT_USED_SOCKET_V6, - auto_multicast: DEFAULT_AUTO_MULTICAST, - multicast_loop: DEFAULT_LOOP, - ttl: DEFAULT_TTL, - ttl_mc: DEFAULT_TTL_MC, + socket_conf: SocketConf::default(), qos_dscp: DEFAULT_QOS_DSCP, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, - clients: BTreeSet::from([SocketAddr::new( - DEFAULT_HOST.unwrap().parse().unwrap(), - DEFAULT_PORT as u16, - )]), - latency: None, } } } @@ -120,14 +124,166 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { ) }); -#[derive(Debug)] -enum TaskItem { - Buffer(gst::Buffer), - Event(gst::Event), -} +#[derive(Clone, Debug, Default)] +struct UdpSinkPadHandler(Arc<futures::lock::Mutex<UdpSinkPadHandlerInner>>); + +impl UdpSinkPadHandler { + fn prepare( + &self, + _imp: &UdpSink, + socket: Option<Async<UdpSocket>>, + socket_v6: Option<Async<UdpSocket>>, + settings: &Settings, + ) -> Result<(), gst::ErrorMessage> { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + + inner.sync = settings.sync; + inner.socket_conf = settings.socket_conf; + inner.socket = socket; + inner.socket_v6 = socket_v6; + + for addr in inner.clients.iter() { + inner.configure_client(addr)?; + } + + Ok(()) + }) + } + + fn unprepare(&self) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + + for addr in inner.clients.iter() { + let _ = inner.unconfigure_client(addr); + } + + inner.socket = None; + inner.socket_v6 = None; + }) + } + + fn start(&self) { + futures::executor::block_on(async move { + self.0.lock().await.is_flushing = false; + }) + } + + fn stop(&self) { + futures::executor::block_on(async move { + self.0.lock().await.is_flushing = true; + }) + } + + fn set_sync(&self, sync: bool) { + futures::executor::block_on(async move { + self.0.lock().await.sync = sync; + }) + } + + fn set_latency(&self, latency: Option<gst::ClockTime>) { + futures::executor::block_on(async move { + self.0.lock().await.latency = latency; + }) + } + + fn set_socket_conf(&self, socket_conf: SocketConf) { + futures::executor::block_on(async move { + self.0.lock().await.socket_conf = socket_conf; + }) + } + + fn clients(&self) -> BTreeSet<SocketAddr> { + futures::executor::block_on(async move { self.0.lock().await.clients.clone() }) + } + + fn add_client(&self, imp: &UdpSink, addr: SocketAddr) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + if inner.clients.contains(&addr) { + gst::warning!(CAT, imp: imp, "Not adding client {addr:?} again"); + return; + } + + match inner.configure_client(&addr) { + Ok(()) => { + gst::info!(CAT, imp: imp, "Added client {addr:?}"); + inner.clients.insert(addr); + } + Err(err) => { + gst::error!(CAT, imp: imp, "Failed to add client {addr:?}: {err}"); + imp.obj().post_error_message(err); + } + } + }) + } + + fn remove_client(&self, imp: &UdpSink, addr: SocketAddr) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + if inner.clients.take(&addr).is_none() { + gst::warning!(CAT, imp: imp, "Not removing unknown client {addr:?}"); + return; + } -#[derive(Clone, Debug)] -struct UdpSinkPadHandler; + match inner.unconfigure_client(&addr) { + Ok(()) => { + gst::info!(CAT, imp: imp, "Removed client {addr:?}"); + } + Err(err) => { + gst::error!(CAT, imp: imp, "Failed to remove client {addr:?}: {err}"); + imp.obj().post_error_message(err); + } + } + }) + } + + fn replace_clients(&self, imp: &UdpSink, mut new_clients: BTreeSet<SocketAddr>) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + if new_clients.is_empty() { + gst::info!(CAT, imp: imp, "Clearing clients"); + } else { + gst::info!(CAT, imp: imp, "Replacing clients"); + } + + let old_clients = std::mem::take(&mut inner.clients); + + let mut res = Ok(()); + + for addr in old_clients.iter() { + if new_clients.take(addr).is_some() { + // client is already configured + inner.clients.insert(*addr); + } else if let Err(err) = inner.unconfigure_client(addr) { + gst::error!(CAT, imp: imp, "Failed to remove client {addr:?}: {err}"); + res = Err(err); + } else { + gst::info!(CAT, imp: imp, "Removed client {addr:?}"); + } + } + + for addr in new_clients.into_iter() { + if let Err(err) = inner.configure_client(&addr) { + gst::error!(CAT, imp: imp, "Failed to add client {addr:?}: {err}"); + res = Err(err); + } else { + gst::info!(CAT, imp: imp, "Added client {addr:?}"); + inner.clients.insert(addr); + } + } + + // FIXME: which error handling: + // - If at least one client could be configured, should we keep going? (current) + // - or, should we consider the preparation failed when the first client + // configuration fails? (previously) + if let Err(err) = res { + imp.obj().post_error_message(err); + } + }) + } +} impl PadSinkHandler for UdpSinkPadHandler { type ElementImpl = UdpSink; @@ -138,16 +294,7 @@ impl PadSinkHandler for UdpSinkPadHandler { elem: super::UdpSink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { - let sender = elem.imp().clone_item_sender(); - async move { - if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: elem, "Flushing"); - return Err(gst::FlowError::Flushing); - } - - Ok(gst::FlowSuccess::Ok) - } - .boxed() + async move { self.0.lock().await.handle_buffer(&elem, buffer).await }.boxed() } fn sink_chain_list( @@ -156,13 +303,10 @@ impl PadSinkHandler for UdpSinkPadHandler { elem: super::UdpSink, list: gst::BufferList, ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { - let sender = elem.imp().clone_item_sender(); async move { + let mut inner = self.0.lock().await; for buffer in list.iter_owned() { - if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: elem, "Flushing"); - return Err(gst::FlowError::Flushing); - } + inner.handle_buffer(&elem, buffer).await?; } Ok(gst::FlowSuccess::Ok) @@ -176,13 +320,23 @@ impl PadSinkHandler for UdpSinkPadHandler { elem: super::UdpSink, event: gst::Event, ) -> BoxFuture<'static, bool> { - let sender = elem.imp().clone_item_sender(); async move { - if let EventView::FlushStop(_) = event.view() { - let imp = elem.imp(); - return imp.task.flush_stop().await_maybe_on_context().is_ok(); - } else if sender.send_async(TaskItem::Event(event)).await.is_err() { - gst::debug!(CAT, obj: elem, "Flushing"); + gst::debug!(CAT, obj: elem, "Handling {event:?}"); + + match event.view() { + EventView::Eos(_) => { + let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build()); + } + EventView::Segment(e) => { + self.0.lock().await.segment = Some(e.segment().clone()); + } + EventView::FlushStop(_) => { + self.0.lock().await.is_flushing = false; + } + EventView::SinkMessage(e) => { + let _ = elem.post_message(e.message()); + } + _ => (), } true @@ -191,8 +345,12 @@ impl PadSinkHandler for UdpSinkPadHandler { } fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool { + gst::debug!(CAT, imp: imp, "Handling {event:?}"); + if let EventView::FlushStart(..) = event.view() { - return imp.task.flush_start().await_maybe_on_context().is_ok(); + block_on_or_add_sub_task(async move { + self.0.lock().await.is_flushing = true; + }); } true @@ -200,278 +358,43 @@ impl PadSinkHandler for UdpSinkPadHandler { } #[derive(Debug)] -enum Command { - AddClient(SocketAddr), - RemoveClient(SocketAddr), - ReplaceWithClients(BTreeSet<SocketAddr>), - SetLatency(Option<gst::ClockTime>), - SetSync(bool), -} - -struct UdpSinkTask { - element: super::UdpSink, - item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>, - cmd_receiver: flume::Receiver<Command>, - clients: BTreeSet<SocketAddr>, - socket: Option<Async<UdpSocket>>, - socket_v6: Option<Async<UdpSocket>>, +struct UdpSinkPadHandlerInner { + is_flushing: bool, sync: bool, latency: Option<gst::ClockTime>, + socket: Option<Async<UdpSocket>>, + socket_v6: Option<Async<UdpSocket>>, + clients: BTreeSet<SocketAddr>, + socket_conf: SocketConf, segment: Option<gst::Segment>, } -impl UdpSinkTask { - fn new( - element: &super::UdpSink, - item_receiver: flume::Receiver<TaskItem>, - cmd_receiver: flume::Receiver<Command>, - ) -> Self { - UdpSinkTask { - element: element.clone(), - item_receiver: item_receiver.into_stream().peekable(), - cmd_receiver, - clients: Default::default(), +impl Default for UdpSinkPadHandlerInner { + fn default() -> Self { + Self { + is_flushing: true, + sync: DEFAULT_SYNC, + latency: None, socket: None, socket_v6: None, - sync: false, - latency: None, + clients: BTreeSet::from([SocketAddr::new( + DEFAULT_HOST.unwrap().parse().unwrap(), + DEFAULT_PORT as u16, + )]), + socket_conf: Default::default(), segment: None, } } - - async fn flush(&mut self) { - // Purge the channel - while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {} - } - - fn process_command(&mut self, cmd: Command) { - use Command::*; - match cmd { - AddClient(client) => self.add_client(client), - RemoveClient(client) => self.remove_client(&client), - ReplaceWithClients(clients) => self.replace_with_clients(clients), - SetSync(sync) => self.sync = sync, - SetLatency(latency) => self.latency = latency, - } - } } /// Socket configuration. -impl UdpSinkTask { - fn prepare_socket( - &self, - settings: &mut Settings, - family: SocketFamily, - ) -> Result<Option<Async<UdpSocket>>, gst::ErrorMessage> { - let wrapped_socket = match family { - SocketFamily::Ipv4 => &settings.socket, - SocketFamily::Ipv6 => &settings.socket_v6, - }; - - if let Some(ref wrapped_socket) = wrapped_socket { - let socket: UdpSocket = wrapped_socket.get(); - let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to setup Async socket: {}", err] - ) - })?; - - match family { - SocketFamily::Ipv4 => { - settings.used_socket = Some(wrapped_socket.clone()); - } - SocketFamily::Ipv6 => { - settings.used_socket_v6 = Some(wrapped_socket.clone()); - } - } - - Ok(Some(socket)) - } else { - let bind_addr = match family { - SocketFamily::Ipv4 => &settings.bind_address, - SocketFamily::Ipv6 => &settings.bind_address_v6, - }; - - let bind_addr: IpAddr = bind_addr.parse().map_err(|err| { - error_msg!( - gst::ResourceError::Settings, - ["Invalid address '{}' set: {}", bind_addr, err] - ) - })?; - - let bind_port = match family { - SocketFamily::Ipv4 => settings.bind_port, - SocketFamily::Ipv6 => settings.bind_port_v6, - }; - - let saddr = SocketAddr::new(bind_addr, bind_port as u16); - gst::debug!(CAT, obj: self.element, "Binding to {:?}", saddr); - - let socket = match family { - SocketFamily::Ipv4 => socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - ), - SocketFamily::Ipv6 => socket2::Socket::new( - socket2::Domain::IPV6, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - ), - }; - - let socket = match socket { - Ok(socket) => socket, - Err(err) => { - gst::warning!( - CAT, - obj: self.element, - "Failed to create {} socket: {}", - match family { - SocketFamily::Ipv4 => "IPv4", - SocketFamily::Ipv6 => "IPv6", - }, - err - ); - return Ok(None); - } - }; - - socket.bind(&saddr.into()).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to bind socket: {}", err] - ) - })?; - - let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to setup Async socket: {}", err] - ) - })?; - - let wrapper = wrap_socket(&socket)?; - - if settings.qos_dscp != -1 { - wrapper.set_tos(settings.qos_dscp).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set QoS DSCP: {}", err] - ) - })?; - } - - match family { - SocketFamily::Ipv4 => { - settings.used_socket = Some(wrapper); - } - SocketFamily::Ipv6 => { - settings.used_socket_v6 = Some(wrapper); - } - } - - Ok(Some(socket)) - } - } - - fn add_client(&mut self, addr: SocketAddr) { - if self.clients.contains(&addr) { - gst::warning!(CAT, obj: self.element, "Not adding client {:?} again", &addr); - return; - } - - let udpsink = self.element.imp(); - let mut settings = udpsink.settings.lock().unwrap(); - match self.configure_client(&settings, &addr) { - Ok(()) => { - gst::info!(CAT, obj: self.element, "Added client {:?}", addr); - self.clients.insert(addr); - } - Err(err) => { - gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err); - settings.clients = self.clients.clone(); - self.element.post_error_message(err); - } - } - } - - fn remove_client(&mut self, addr: &SocketAddr) { - if self.clients.take(addr).is_none() { - gst::warning!(CAT, obj: self.element, "Not removing unknown client {:?}", &addr); - return; - } - - let udpsink = self.element.imp(); - let mut settings = udpsink.settings.lock().unwrap(); - match self.unconfigure_client(&settings, addr) { - Ok(()) => { - gst::info!(CAT, obj: self.element, "Removed client {:?}", addr); - } - Err(err) => { - gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err); - settings.clients = self.clients.clone(); - self.element.post_error_message(err); - } - } - } - - fn replace_with_clients(&mut self, mut clients_to_add: BTreeSet<SocketAddr>) { - if clients_to_add.is_empty() { - gst::info!(CAT, obj: self.element, "Clearing clients"); - } else { - gst::info!(CAT, obj: self.element, "Replacing clients"); - } - - let old_clients = std::mem::take(&mut self.clients); - - let mut res = Ok(()); - let udpsink = self.element.imp(); - let mut settings = udpsink.settings.lock().unwrap(); - - for addr in old_clients.iter() { - if clients_to_add.take(addr).is_some() { - // client is already configured - self.clients.insert(*addr); - } else if let Err(err) = self.unconfigure_client(&settings, addr) { - gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err); - res = Err(err); - } else { - gst::info!(CAT, obj: self.element, "Removed client {:?}", addr); - } - } - - for addr in clients_to_add.into_iter() { - if let Err(err) = self.configure_client(&settings, &addr) { - gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err); - res = Err(err); - } else { - gst::info!(CAT, obj: self.element, "Added client {:?}", addr); - self.clients.insert(addr); - } - } - - // FIXME: which error handling: - // - If at least one client could be configured, should we keep going? (current) - // - or, should we consider the preparation failed when the first client - // configuration fails? (previously) - if let Err(err) = res { - settings.clients = self.clients.clone(); - self.element.post_error_message(err); - } - } - - fn configure_client( - &self, - settings: &Settings, - client: &SocketAddr, - ) -> Result<(), gst::ErrorMessage> { +impl UdpSinkPadHandlerInner { + fn configure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> { if client.ip().is_multicast() { match client.ip() { IpAddr::V4(addr) => { if let Some(socket) = self.socket.as_ref() { - if settings.auto_multicast { + if self.socket_conf.auto_multicast { socket .as_ref() .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) @@ -486,7 +409,7 @@ impl UdpSinkTask { ) })?; } - if settings.multicast_loop { + if self.socket_conf.multicast_loop { socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, @@ -497,7 +420,7 @@ impl UdpSinkTask { socket .as_ref() - .set_multicast_ttl_v4(settings.ttl_mc) + .set_multicast_ttl_v4(self.socket_conf.ttl_mc) .map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, @@ -508,7 +431,7 @@ impl UdpSinkTask { } IpAddr::V6(addr) => { if let Some(socket) = self.socket_v6.as_ref() { - if settings.auto_multicast { + if self.socket_conf.auto_multicast { socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, @@ -516,7 +439,7 @@ impl UdpSinkTask { ) })?; } - if settings.multicast_loop { + if self.socket_conf.multicast_loop { socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, @@ -532,22 +455,28 @@ impl UdpSinkTask { match client.ip() { IpAddr::V4(_) => { if let Some(socket) = self.socket.as_ref() { - socket.as_ref().set_ttl(settings.ttl).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set unicast ttl for {:?}: {}", client, err] - ) - })?; + socket + .as_ref() + .set_ttl(self.socket_conf.ttl) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl for {:?}: {}", client, err] + ) + })?; } } IpAddr::V6(_) => { if let Some(socket) = self.socket_v6.as_ref() { - socket.as_ref().set_ttl(settings.ttl).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set unicast ttl for {:?}: {}", client, err] - ) - })?; + socket + .as_ref() + .set_ttl(self.socket_conf.ttl) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl for {:?}: {}", client, err] + ) + })?; } } } @@ -556,16 +485,12 @@ impl UdpSinkTask { Ok(()) } - fn unconfigure_client( - &self, - settings: &Settings, - client: &SocketAddr, - ) -> Result<(), gst::ErrorMessage> { + fn unconfigure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> { if client.ip().is_multicast() { match client.ip() { IpAddr::V4(addr) => { if let Some(socket) = self.socket.as_ref() { - if settings.auto_multicast { + if self.socket_conf.auto_multicast { socket .as_ref() .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) @@ -584,7 +509,7 @@ impl UdpSinkTask { } IpAddr::V6(addr) => { if let Some(socket) = self.socket_v6.as_ref() { - if settings.auto_multicast { + if self.socket_conf.auto_multicast { socket .as_ref() .leave_multicast_v6(&addr, 0) @@ -609,11 +534,15 @@ impl UdpSinkTask { } /// Buffer handling. -impl UdpSinkTask { - async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> { +impl UdpSinkPadHandlerInner { + async fn render( + &mut self, + elem: &super::UdpSink, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { let data = buffer.map_readable().map_err(|_| { - element_error!( - self.element, + gst::element_error!( + elem, gst::StreamError::Format, ["Failed to map buffer readable"] ); @@ -627,10 +556,10 @@ impl UdpSinkTask { }; if let Some(socket) = socket.as_mut() { - gst::log!(CAT, obj: self.element, "Sending to {:?}", &client); + gst::log!(CAT, obj: elem, "Sending to {client:?}"); socket.send_to(&data, *client).await.map_err(|err| { - element_error!( - self.element, + gst::element_error!( + elem, gst::StreamError::Failed, ("I/O error"), ["streaming stopped, I/O error {}", err] @@ -638,8 +567,8 @@ impl UdpSinkTask { gst::FlowError::Error })?; } else { - element_error!( - self.element, + gst::element_error!( + elem, gst::StreamError::Failed, ("I/O error"), ["No socket available for sending to {}", client] @@ -648,157 +577,60 @@ impl UdpSinkTask { } } - gst::log!( - CAT, - obj: self.element, - "Sent buffer {:?} to all clients", - &buffer - ); + gst::log!(CAT, obj: elem, "Sent buffer {buffer:?} to all clients"); - Ok(()) + Ok(gst::FlowSuccess::Ok) } /// Waits until specified time. - async fn sync(&self, running_time: gst::ClockTime) { - let now = self.element.current_running_time(); + async fn sync(&self, elem: &super::UdpSink, running_time: gst::ClockTime) { + let now = elem.current_running_time(); if let Ok(Some(delay)) = running_time.opt_checked_sub(now) { - gst::trace!(CAT, obj: self.element, "sync: waiting {}", delay); + gst::trace!(CAT, obj: elem, "sync: waiting {delay}"); runtime::timer::delay_for(delay.into()).await; } } -} - -impl TaskImpl for UdpSinkTask { - type Item = TaskItem; - fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst::info!(CAT, obj: self.element, "Preparing Task"); - assert!(self.clients.is_empty()); - let clients = { - let udpsink = self.element.imp(); - let mut settings = udpsink.settings.lock().unwrap(); - self.sync = settings.sync; - self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?; - self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?; - self.latency = settings.latency; - settings.clients.clone() - }; - - self.replace_with_clients(clients); + async fn handle_buffer( + &mut self, + elem: &super::UdpSink, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + if self.is_flushing { + gst::info!(CAT, obj: elem, "Discarding {buffer:?} (flushing)"); - Ok(()) + return Err(gst::FlowError::Flushing); } - .boxed() - } - fn unprepare(&mut self) -> BoxFuture<'_, ()> { - async move { - gst::info!(CAT, obj: self.element, "Unpreparing Task"); + if self.sync { + let rtime = self.segment.as_ref().and_then(|segment| { + segment + .downcast_ref::<gst::format::Time>() + .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency)) + }); - let udpsink = self.element.imp(); - let settings = udpsink.settings.lock().unwrap(); - for addr in self.clients.iter() { - let _ = self.unconfigure_client(&settings, addr); - } - } - .boxed() - } + if let Some(rtime) = rtime { + self.sync(elem, rtime).await; - fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> { - async move { - loop { - gst::info!(CAT, obj: self.element, "Awaiting next item or command"); - futures::select_biased! { - cmd = self.cmd_receiver.recv_async() => { - self.process_command(cmd.unwrap()); - } - item_opt = Pin::new(&mut self.item_receiver).peek() => { - // Check the peeked item in case we need to sync. - // The item will still be available in the channel - // in case this is cancelled by a state transition. - match item_opt { - Some(TaskItem::Buffer(buffer)) => { - if self.sync { - let rtime = self.segment.as_ref().and_then(|segment| { - segment - .downcast_ref::<gst::format::Time>() - .and_then(|segment| { - segment.to_running_time(buffer.pts()).opt_add(self.latency) - }) - }); - if let Some(rtime) = rtime { - // This can be cancelled by a state transition. - self.sync(rtime).await; - } - } - } - Some(_) => (), - None => { - panic!("Internal channel sender dropped while Task is Started"); - } - } + if self.is_flushing { + gst::info!(CAT, obj: elem, "Discarding {buffer:?} (flushing)"); - // An item was peeked above, we can now pop it without losing it. - return Ok(self.item_receiver.next().await.unwrap()); - } + return Err(gst::FlowError::Flushing); } } } - .boxed() - } - fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { - async move { - gst::info!(CAT, obj: self.element, "Handling {:?}", item); + gst::debug!(CAT, obj: elem, "Handling {buffer:?}"); - match item { - TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| { - element_error!( - &self.element, - gst::StreamError::Failed, - ["Failed to render item, stopping task: {}", err] - ); - gst::FlowError::Error - })?, - TaskItem::Event(event) => match event.view() { - EventView::Eos(_) => { - let _ = self - .element - .post_message(gst::message::Eos::builder().src(&self.element).build()); - } - EventView::Segment(e) => { - self.segment = Some(e.segment().clone()); - } - EventView::SinkMessage(e) => { - let _ = self.element.post_message(e.message()); - } - _ => (), - }, - } - - Ok(()) - } - .boxed() - } - - fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async { - gst::info!(CAT, obj: self.element, "Stopping Task"); - self.flush().await; - Ok(()) - } - .boxed() - } - - fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async { - gst::info!(CAT, obj: self.element, "Starting Task Flush"); - self.flush().await; - Ok(()) - } - .boxed() + self.render(elem, buffer).await.map_err(|err| { + element_error!( + elem, + gst::StreamError::Failed, + ["Failed to render item, stopping task: {}", err] + ); + gst::FlowError::Error + }) } } @@ -811,40 +643,153 @@ enum SocketFamily { #[derive(Debug)] pub struct UdpSink { sink_pad: PadSink, - task: Task, - item_sender: Mutex<Option<flume::Sender<TaskItem>>>, - cmd_sender: Mutex<Option<flume::Sender<Command>>>, + sink_pad_handler: UdpSinkPadHandler, settings: Mutex<Settings>, + ts_ctx: Mutex<Option<Context>>, } impl UdpSink { - #[track_caller] - fn clone_item_sender(&self) -> flume::Sender<TaskItem> { - self.item_sender.lock().unwrap().as_ref().unwrap().clone() - } + fn prepare_socket( + &self, + ts_ctx: &Context, + settings: &mut Settings, + family: SocketFamily, + ) -> Result<Option<Async<UdpSocket>>, gst::ErrorMessage> { + let wrapped_socket = match family { + SocketFamily::Ipv4 => &settings.socket, + SocketFamily::Ipv6 => &settings.socket_v6, + }; - fn prepare(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp: self, "Preparing"); + if let Some(ref wrapped_socket) = wrapped_socket { + let socket: UdpSocket = wrapped_socket.get(); + let socket = ts_ctx.enter(|| { + Async::<UdpSocket>::try_from(socket).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup Async socket: {}", err] + ) + }) + })?; - let context = { - let settings = self.settings.lock().unwrap(); + match family { + SocketFamily::Ipv4 => { + settings.used_socket = Some(wrapped_socket.clone()); + } + SocketFamily::Ipv6 => { + settings.used_socket_v6 = Some(wrapped_socket.clone()); + } + } + + Ok(Some(socket)) + } else { + let bind_addr = match family { + SocketFamily::Ipv4 => &settings.bind_address, + SocketFamily::Ipv6 => &settings.bind_address_v6, + }; - Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + let bind_addr: IpAddr = bind_addr.parse().map_err(|err| { + error_msg!( + gst::ResourceError::Settings, + ["Invalid address '{}' set: {}", bind_addr, err] + ) + })?; + + let bind_port = match family { + SocketFamily::Ipv4 => settings.bind_port, + SocketFamily::Ipv6 => settings.bind_port_v6, + }; + + let saddr = SocketAddr::new(bind_addr, bind_port as u16); + gst::debug!(CAT, imp: self, "Binding to {:?}", saddr); + + let socket = match family { + SocketFamily::Ipv4 => socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ), + SocketFamily::Ipv6 => socket2::Socket::new( + socket2::Domain::IPV6, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ), + }; + + let socket = match socket { + Ok(socket) => socket, + Err(err) => { + gst::warning!( + CAT, + imp: self, + "Failed to create {} socket: {}", + match family { + SocketFamily::Ipv4 => "IPv4", + SocketFamily::Ipv6 => "IPv6", + }, + err + ); + return Ok(None); + } + }; + + socket.bind(&saddr.into()).map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, - ["Failed to acquire Context: {}", err] + ["Failed to bind socket: {}", err] ) - })? - }; + })?; - // Enable backpressure for items - let (item_sender, item_receiver) = flume::bounded(0); - let (cmd_sender, cmd_receiver) = flume::unbounded(); - let task_impl = UdpSinkTask::new(&self.obj(), item_receiver, cmd_receiver); - self.task.prepare(task_impl, context).block_on()?; + let socket = ts_ctx.enter(|| { + Async::<UdpSocket>::try_from(socket).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup Async socket: {}", err] + ) + }) + })?; - *self.item_sender.lock().unwrap() = Some(item_sender); - *self.cmd_sender.lock().unwrap() = Some(cmd_sender); + let wrapper = wrap_socket(&socket)?; + + if settings.qos_dscp != -1 { + wrapper.set_tos(settings.qos_dscp).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set QoS DSCP: {}", err] + ) + })?; + } + + match family { + SocketFamily::Ipv4 => { + settings.used_socket = Some(wrapper); + } + SocketFamily::Ipv6 => { + settings.used_socket_v6 = Some(wrapper); + } + } + + Ok(Some(socket)) + } + } + + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); + + let mut settings = self.settings.lock().unwrap(); + + let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to acquire Context: {}", err] + ) + })?; + + let socket = self.prepare_socket(&ts_ctx, &mut settings, SocketFamily::Ipv4)?; + let socket_v6 = self.prepare_socket(&ts_ctx, &mut settings, SocketFamily::Ipv6)?; + + self.sink_pad_handler + .prepare(self, socket, socket_v6, &settings)?; + *self.ts_ctx.lock().unwrap() = Some(ts_ctx); gst::debug!(CAT, imp: self, "Started preparation"); @@ -853,75 +798,46 @@ impl UdpSink { fn unprepare(&self) { gst::debug!(CAT, imp: self, "Unpreparing"); - self.task.unprepare().block_on().unwrap(); + self.sink_pad_handler.unprepare(); + *self.ts_ctx.lock().unwrap() = None; gst::debug!(CAT, imp: self, "Unprepared"); } fn stop(&self) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, imp: self, "Stopping"); - self.task.stop().block_on()?; + self.sink_pad_handler.stop(); gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } fn start(&self) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, imp: self, "Starting"); - self.task.start().block_on()?; + self.sink_pad_handler.start(); gst::debug!(CAT, imp: self, "Started"); Ok(()) } - fn add_client(&self, settings: &mut Settings, client: SocketAddr) { - settings.clients.insert(client); - if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { - cmd_sender.send(Command::AddClient(client)).unwrap(); - } - } + fn try_into_socket_addr(&self, host: &str, port: i32) -> Result<SocketAddr, ()> { + let addr: IpAddr = match host.parse() { + Err(err) => { + gst::error!(CAT, imp: self, "Failed to parse host {}: {}", host, err); + return Err(()); + } + Ok(addr) => addr, + }; - fn remove_client(&self, settings: &mut Settings, client: SocketAddr) { - settings.clients.remove(&client); - if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { - cmd_sender.send(Command::RemoveClient(client)).unwrap(); - } - } + let port: u16 = match port.try_into() { + Err(err) => { + gst::error!(CAT, imp: self, "Invalid port {}: {}", port, err); + return Err(()); + } + Ok(port) => port, + }; - fn replace_with_clients( - &self, - settings: &mut Settings, - clients: impl IntoIterator<Item = SocketAddr>, - ) { - let clients = BTreeSet::<SocketAddr>::from_iter(clients); - if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { - settings.clients = clients.clone(); - cmd_sender - .send(Command::ReplaceWithClients(clients)) - .unwrap(); - } else { - settings.clients = clients; - } + Ok(SocketAddr::new(addr, port)) } } -fn try_into_socket_addr(imp: &UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> { - let addr: IpAddr = match host.parse() { - Err(err) => { - gst::error!(CAT, imp: imp, "Failed to parse host {}: {}", host, err); - return Err(()); - } - Ok(addr) => addr, - }; - - let port: u16 = match port.try_into() { - Err(err) => { - gst::error!(CAT, imp: imp, "Invalid port {}: {}", port, err); - return Err(()); - } - Ok(port) => port, - }; - - Ok(SocketAddr::new(addr, port)) -} - #[glib::object_subclass] impl ObjectSubclass for UdpSink { const NAME: &'static str = "GstTsUdpSink"; @@ -929,15 +845,15 @@ impl ObjectSubclass for UdpSink { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { + let sink_pad_handler = UdpSinkPadHandler::default(); Self { sink_pad: PadSink::new( gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), - UdpSinkPadHandler, + sink_pad_handler.clone(), ), - task: Task::default(), - item_sender: Default::default(), - cmd_sender: Default::default(), + sink_pad_handler, settings: Default::default(), + ts_ctx: Default::default(), } } } @@ -1051,14 +967,13 @@ impl ObjectImpl for UdpSink { .param_types([String::static_type(), i32::static_type()]) .action() .class_handler(|_, args| { - let element = args[0].get::<super::UdpSink>().expect("signal arg"); + let elem = args[0].get::<super::UdpSink>().expect("signal arg"); let host = args[1].get::<String>().expect("signal arg"); let port = args[2].get::<i32>().expect("signal arg"); - let udpsink = element.imp(); + let imp = elem.imp(); - if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) { - let mut settings = udpsink.settings.lock().unwrap(); - udpsink.add_client(&mut settings, addr); + if let Ok(addr) = imp.try_into_socket_addr(&host, port) { + imp.sink_pad_handler.add_client(imp, addr); } None @@ -1068,14 +983,13 @@ impl ObjectImpl for UdpSink { .param_types([String::static_type(), i32::static_type()]) .action() .class_handler(|_, args| { - let element = args[0].get::<super::UdpSink>().expect("signal arg"); + let elem = args[0].get::<super::UdpSink>().expect("signal arg"); let host = args[1].get::<String>().expect("signal arg"); let port = args[2].get::<i32>().expect("signal arg"); - let udpsink = element.imp(); + let imp = elem.imp(); - if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) { - let mut settings = udpsink.settings.lock().unwrap(); - udpsink.remove_client(&mut settings, addr); + if let Ok(addr) = imp.try_into_socket_addr(&host, port) { + imp.sink_pad_handler.remove_client(imp, addr); } None @@ -1084,11 +998,10 @@ impl ObjectImpl for UdpSink { glib::subclass::Signal::builder("clear") .action() .class_handler(|_, args| { - let element = args[0].get::<super::UdpSink>().expect("signal arg"); + let elem = args[0].get::<super::UdpSink>().expect("signal arg"); - let udpsink = element.imp(); - let mut settings = udpsink.settings.lock().unwrap(); - udpsink.replace_with_clients(&mut settings, BTreeSet::new()); + let imp = elem.imp(); + imp.sink_pad_handler.replace_clients(imp, BTreeSet::new()); None }) @@ -1105,9 +1018,7 @@ impl ObjectImpl for UdpSink { "sync" => { let sync = value.get().expect("type checked upstream"); settings.sync = sync; - if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { - cmd_sender.send(Command::SetSync(sync)).unwrap(); - } + self.sink_pad_handler.set_sync(sync); } "bind-address" => { settings.bind_address = value @@ -1146,16 +1057,20 @@ impl ObjectImpl for UdpSink { unreachable!(); } "auto-multicast" => { - settings.auto_multicast = value.get().expect("type checked upstream"); + settings.socket_conf.auto_multicast = value.get().expect("type checked upstream"); + self.sink_pad_handler.set_socket_conf(settings.socket_conf); } "loop" => { - settings.multicast_loop = value.get().expect("type checked upstream"); + settings.socket_conf.multicast_loop = value.get().expect("type checked upstream"); + self.sink_pad_handler.set_socket_conf(settings.socket_conf); } "ttl" => { - settings.ttl = value.get().expect("type checked upstream"); + settings.socket_conf.ttl = value.get().expect("type checked upstream"); + self.sink_pad_handler.set_socket_conf(settings.socket_conf); } "ttl-mc" => { - settings.ttl_mc = value.get().expect("type checked upstream"); + settings.socket_conf.ttl_mc = value.get().expect("type checked upstream"); + self.sink_pad_handler.set_socket_conf(settings.socket_conf); } "qos-dscp" => { settings.qos_dscp = value.get().expect("type checked upstream"); @@ -1167,22 +1082,33 @@ impl ObjectImpl for UdpSink { .unwrap_or_else(|| "".into()); let clients = clients.split(',').filter_map(|client| { - let rsplit: Vec<&str> = client.rsplitn(2, ':').collect(); - - if rsplit.len() == 2 { - rsplit[0] - .parse::<i32>() - .map_err(|err| { - gst::error!(CAT, imp: self, "Invalid port {}: {}", rsplit[0], err); - }) - .and_then(|port| try_into_socket_addr(self, rsplit[1], port)) - .ok() + let mut splited = client.splitn(2, ':'); + if let Some((addr, port)) = splited.next().zip(splited.next()) { + match port.parse::<i32>() { + Ok(port) => match self.try_into_socket_addr(addr, port) { + Ok(socket_addr) => Some(socket_addr), + Err(()) => { + gst::error!( + CAT, + imp: self, + "Invalid socket address {addr}:{port}" + ); + None + } + }, + Err(err) => { + gst::error!(CAT, imp: self, "Invalid port {err}"); + None + } + } } else { + gst::error!(CAT, imp: self, "Invalid client {client}"); None } }); - self.replace_with_clients(&mut settings, clients); + let clients = BTreeSet::from_iter(clients); + self.sink_pad_handler.replace_clients(self, clients); } "context" => { settings.context = value @@ -1227,14 +1153,13 @@ impl ObjectImpl for UdpSink { .as_ref() .map(GioSocketWrapper::as_socket) .to_value(), - "auto-multicast" => settings.sync.to_value(), - "loop" => settings.multicast_loop.to_value(), - "ttl" => settings.ttl.to_value(), - "ttl-mc" => settings.ttl_mc.to_value(), + "auto-multicast" => settings.socket_conf.auto_multicast.to_value(), + "loop" => settings.socket_conf.multicast_loop.to_value(), + "ttl" => settings.socket_conf.ttl.to_value(), + "ttl-mc" => settings.socket_conf.ttl_mc.to_value(), "qos-dscp" => settings.qos_dscp.to_value(), "clients" => { - let clients = settings.clients.clone(); - drop(settings); + let clients = self.sink_pad_handler.clients(); let clients: Vec<String> = clients.iter().map(ToString::to_string).collect(); clients.join(",").to_value() @@ -1320,10 +1245,7 @@ impl ElementImpl for UdpSink { match event.view() { EventView::Latency(ev) => { let latency = Some(ev.latency()); - if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { - cmd_sender.send(Command::SetLatency(latency)).unwrap(); - } - self.settings.lock().unwrap().latency = latency; + self.sink_pad_handler.set_latency(latency); self.sink_pad.gst_pad().push_event(event) } EventView::Step(..) => false, |