diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2020-11-14 20:09:42 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2020-11-15 19:25:42 +0300 |
commit | af0337c26c25cc782142f914dfa19383e51e1743 (patch) | |
tree | 60263c9ee978753df8cc5be55ed04ce3c9c65488 /generic/threadshare/src/udpsink | |
parent | f54f9f977ec2143028c6068ac43bff5da4bea2ee (diff) |
generic: Update for subclassing API changes
Diffstat (limited to 'generic/threadshare/src/udpsink')
-rw-r--r-- | generic/threadshare/src/udpsink/imp.rs | 1445 | ||||
-rw-r--r-- | generic/threadshare/src/udpsink/mod.rs | 39 |
2 files changed, 1484 insertions, 0 deletions
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs new file mode 100644 index 000000000..064d9f6c3 --- /dev/null +++ b/generic/threadshare/src/udpsink/imp.rs @@ -0,0 +1,1445 @@ +// Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use futures::channel::mpsc; +use futures::future::BoxFuture; +use futures::lock::Mutex; +use futures::prelude::*; + +use glib::glib_object_subclass; +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; + +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::EventView; +use gst::{ + gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace, + gst_warning, +}; + +use lazy_static::lazy_static; + +use crate::runtime::prelude::*; +use crate::runtime::{self, Context, PadSink, PadSinkRef, Task}; +use crate::socket::{wrap_socket, GioSocketWrapper}; + +use std::convert::TryInto; +use std::mem; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::string::ToString; +use std::sync::Mutex as StdMutex; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use std::u16; +use std::u8; + +const DEFAULT_HOST: Option<&str> = Some("127.0.0.1"); +const DEFAULT_PORT: i32 = 5004; +const DEFAULT_SYNC: bool = true; +const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0"; +const DEFAULT_BIND_PORT: i32 = 0; +const DEFAULT_BIND_ADDRESS_V6: &str = "::"; +const DEFAULT_BIND_PORT_V6: i32 = 0; +const DEFAULT_SOCKET: Option<GioSocketWrapper> = None; +const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None; +const DEFAULT_SOCKET_V6: Option<GioSocketWrapper> = None; +const DEFAULT_USED_SOCKET_V6: Option<GioSocketWrapper> = None; +const DEFAULT_AUTO_MULTICAST: bool = true; +const DEFAULT_LOOP: bool = true; +const DEFAULT_TTL: u32 = 64; +const DEFAULT_TTL_MC: u32 = 1; +const DEFAULT_QOS_DSCP: i32 = -1; +const DEFAULT_CLIENTS: &str = ""; +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: u32 = 0; + +#[derive(Debug, Clone)] +struct Settings { + sync: bool, + bind_address: String, + bind_port: i32, + bind_address_v6: String, + bind_port_v6: i32, + socket: Option<GioSocketWrapper>, + used_socket: Option<GioSocketWrapper>, + socket_v6: Option<GioSocketWrapper>, + used_socket_v6: Option<GioSocketWrapper>, + auto_multicast: bool, + multicast_loop: bool, + ttl: u32, + ttl_mc: u32, + qos_dscp: i32, + context: String, + context_wait: u32, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + sync: DEFAULT_SYNC, + bind_address: DEFAULT_BIND_ADDRESS.into(), + bind_port: DEFAULT_BIND_PORT, + bind_address_v6: DEFAULT_BIND_ADDRESS_V6.into(), + bind_port_v6: DEFAULT_BIND_PORT_V6, + socket: DEFAULT_SOCKET, + used_socket: DEFAULT_USED_SOCKET, + socket_v6: DEFAULT_SOCKET_V6, + used_socket_v6: DEFAULT_USED_SOCKET_V6, + auto_multicast: DEFAULT_AUTO_MULTICAST, + multicast_loop: DEFAULT_LOOP, + ttl: DEFAULT_TTL, + ttl_mc: DEFAULT_TTL_MC, + qos_dscp: DEFAULT_QOS_DSCP, + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + } + } +} + +lazy_static! { + static ref CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-udpsink", + gst::DebugColorFlags::empty(), + Some("Thread-sharing UDP sink"), + ); +} + +static PROPERTIES: [subclass::Property; 17] = [ + subclass::Property("sync", |name| { + glib::ParamSpec::boolean( + name, + "Sync", + "Sync on the clock", + DEFAULT_SYNC, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-address", |name| { + glib::ParamSpec::string( + name, + "Bind Address", + "Address to bind the socket to", + Some(DEFAULT_BIND_ADDRESS), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-port", |name| { + glib::ParamSpec::int( + name, + "Bind Port", + "Port to bind the socket to", + 0, + u16::MAX as i32, + DEFAULT_BIND_PORT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-address-v6", |name| { + glib::ParamSpec::string( + name, + "Bind Address V6", + "Address to bind the V6 socket to", + Some(DEFAULT_BIND_ADDRESS_V6), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-port-v6", |name| { + glib::ParamSpec::int( + name, + "Bind Port", + "Port to bind the V6 socket to", + 0, + u16::MAX as i32, + DEFAULT_BIND_PORT_V6, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("socket", |name| { + glib::ParamSpec::object( + name, + "Socket", + "Socket to use for UDP transmission. (None == allocate)", + gio::Socket::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("used-socket", |name| { + glib::ParamSpec::object( + name, + "Used Socket", + "Socket currently in use for UDP transmission. (None = no socket)", + gio::Socket::static_type(), + glib::ParamFlags::READABLE, + ) + }), + subclass::Property("socket-v6", |name| { + glib::ParamSpec::object( + name, + "Socket V6", + "IPV6 Socket to use for UDP transmission. (None == allocate)", + gio::Socket::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("used-socket-v6", |name| { + glib::ParamSpec::object( + name, + "Used Socket V6", + "V6 Socket currently in use for UDP transmission. (None = no socket)", + gio::Socket::static_type(), + glib::ParamFlags::READABLE, + ) + }), + subclass::Property("auto-multicast", |name| { + glib::ParamSpec::boolean( + name, + "Auto multicast", + "Automatically join/leave the multicast groups, FALSE means user has to do it himself", + DEFAULT_AUTO_MULTICAST, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("loop", |name| { + glib::ParamSpec::boolean( + name, + "Loop", + "Set the multicast loop parameter.", + DEFAULT_LOOP, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("ttl", |name| { + glib::ParamSpec::uint( + name, + "Time To Live", + "Used for setting the unicast TTL parameter", + 0, + u8::MAX as u32, + DEFAULT_TTL, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("ttl-mc", |name| { + glib::ParamSpec::uint( + name, + "Time To Live Multicast", + "Used for setting the multicast TTL parameter", + 0, + u8::MAX as u32, + DEFAULT_TTL_MC, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("qos-dscp", |name| { + glib::ParamSpec::int( + name, + "QoS DSCP", + "Quality of Service, differentiated services code point (-1 default)", + -1, + 63, + DEFAULT_QOS_DSCP, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("clients", |name| { + glib::ParamSpec::string( + name, + "Clients", + "A comma separated list of host:port pairs with destinations", + Some(DEFAULT_CLIENTS), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context", |name| { + glib::ParamSpec::string( + name, + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", |name| { + glib::ParamSpec::uint( + name, + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), +]; + +#[derive(Debug)] +enum TaskItem { + Buffer(gst::Buffer), + Event(gst::Event), +} + +#[derive(Debug)] +struct UdpSinkPadHandlerInner { + sync: bool, + segment: Option<gst::Segment>, + latency: gst::ClockTime, + socket: Arc<Mutex<Option<tokio::net::UdpSocket>>>, + socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>, + clients: Arc<Vec<SocketAddr>>, + clients_to_configure: Vec<SocketAddr>, + clients_to_unconfigure: Vec<SocketAddr>, + sender: Arc<Mutex<Option<mpsc::Sender<TaskItem>>>>, + settings: Arc<StdMutex<Settings>>, +} + +impl UdpSinkPadHandlerInner { + fn new(settings: Arc<StdMutex<Settings>>) -> Self { + UdpSinkPadHandlerInner { + sync: DEFAULT_SYNC, + segment: None, + latency: gst::CLOCK_TIME_NONE, + socket: Arc::new(Mutex::new(None)), + socket_v6: Arc::new(Mutex::new(None)), + clients: Arc::new(vec![SocketAddr::new( + DEFAULT_HOST.unwrap().parse().unwrap(), + DEFAULT_PORT as u16, + )]), + clients_to_configure: vec![], + clients_to_unconfigure: vec![], + sender: Arc::new(Mutex::new(None)), + settings, + } + } + + fn clear_clients( + &mut self, + gst_pad: &gst::Pad, + clients_to_add: impl Iterator<Item = SocketAddr>, + ) { + let old_clients = mem::replace(&mut *Arc::make_mut(&mut self.clients), vec![]); + + self.clients_to_configure = vec![]; + self.clients_to_unconfigure = vec![]; + + for addr in clients_to_add { + if !old_clients.contains(&addr) { + self.clients_to_unconfigure.push(addr); + } + self.add_client(gst_pad, addr); + } + } + + fn remove_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { + if !self.clients.contains(&addr) { + gst_warning!(CAT, obj: gst_pad, "Not removing unknown client {:?}", &addr); + return; + } + + gst_info!(CAT, obj: gst_pad, "Removing client {:?}", addr); + + Arc::make_mut(&mut self.clients).retain(|addr2| addr != *addr2); + + self.clients_to_unconfigure.push(addr); + self.clients_to_configure.retain(|addr2| addr != *addr2); + } + + fn add_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { + if self.clients.contains(&addr) { + gst_warning!(CAT, obj: gst_pad, "Not adding client {:?} again", &addr); + return; + } + + gst_info!(CAT, obj: gst_pad, "Adding client {:?}", addr); + + Arc::make_mut(&mut self.clients).push(addr); + + self.clients_to_configure.push(addr); + self.clients_to_unconfigure.retain(|addr2| addr != *addr2); + } +} + +#[derive(Debug)] +enum SocketQualified { + Ipv4(tokio::net::UdpSocket), + Ipv6(tokio::net::UdpSocket), +} + +#[derive(Clone, Debug)] +struct UdpSinkPadHandler(Arc<RwLock<UdpSinkPadHandlerInner>>); + +impl UdpSinkPadHandler { + fn new(settings: Arc<StdMutex<Settings>>) -> UdpSinkPadHandler { + Self(Arc::new(RwLock::new(UdpSinkPadHandlerInner::new(settings)))) + } + + fn set_latency(&self, latency: gst::ClockTime) { + self.0.write().unwrap().latency = latency; + } + + fn prepare(&self) { + let mut inner = self.0.write().unwrap(); + inner.clients_to_configure = inner.clients.to_vec(); + } + + fn prepare_socket(&self, socket: SocketQualified) { + let mut inner = self.0.write().unwrap(); + + match socket { + SocketQualified::Ipv4(socket) => inner.socket = Arc::new(Mutex::new(Some(socket))), + SocketQualified::Ipv6(socket) => inner.socket_v6 = Arc::new(Mutex::new(Some(socket))), + } + } + + fn unprepare(&self) { + let mut inner = self.0.write().unwrap(); + *inner = UdpSinkPadHandlerInner::new(Arc::clone(&inner.settings)) + } + + fn clear_clients(&self, gst_pad: &gst::Pad, clients_to_add: impl Iterator<Item = SocketAddr>) { + self.0 + .write() + .unwrap() + .clear_clients(gst_pad, clients_to_add); + } + + fn remove_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { + self.0.write().unwrap().remove_client(gst_pad, addr); + } + + fn add_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { + self.0.write().unwrap().add_client(gst_pad, addr); + } + + fn get_clients(&self) -> Vec<SocketAddr> { + (*self.0.read().unwrap().clients).clone() + } + + fn configure_client( + &self, + settings: &Settings, + socket: &mut Option<tokio::net::UdpSocket>, + socket_v6: &mut Option<tokio::net::UdpSocket>, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = socket.as_mut() { + if settings.auto_multicast { + socket + .join_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + if settings.multicast_loop { + socket.set_multicast_loop_v4(true).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop: {}", err] + ) + })?; + } + socket + .set_multicast_ttl_v4(settings.ttl_mc) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast ttl: {}", err] + ) + })?; + } + } + IpAddr::V6(addr) => { + if let Some(socket) = socket_v6.as_mut() { + if settings.auto_multicast { + socket.join_multicast_v6(&addr, 0).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + if settings.multicast_loop { + socket.set_multicast_loop_v6(true).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop: {}", err] + ) + })?; + } + /* FIXME no API for set_multicast_ttl_v6 ? */ + } + } + } + } else { + match client.ip() { + IpAddr::V4(_) => { + if let Some(socket) = socket.as_mut() { + socket.set_ttl(settings.ttl).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl: {}", err] + ) + })?; + } + } + IpAddr::V6(_) => { + if let Some(socket) = socket_v6.as_mut() { + socket.set_ttl(settings.ttl).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl: {}", err] + ) + })?; + } + } + } + } + + Ok(()) + } + + fn unconfigure_client( + &self, + settings: &Settings, + socket: &mut Option<tokio::net::UdpSocket>, + socket_v6: &mut Option<tokio::net::UdpSocket>, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = socket.as_mut() { + if settings.auto_multicast { + socket + .leave_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + } + } + IpAddr::V6(addr) => { + if let Some(socket) = socket_v6.as_mut() { + if settings.auto_multicast { + socket.leave_multicast_v6(&addr, 0).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + } + } + } + } + + Ok(()) + } + + async fn render( + &self, + element: &super::UdpSink, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let ( + do_sync, + rtime, + clients, + clients_to_configure, + clients_to_unconfigure, + socket, + socket_v6, + settings, + ) = { + let mut inner = self.0.write().unwrap(); + let do_sync = inner.sync; + let mut rtime: gst::ClockTime = 0.into(); + + if let Some(segment) = &inner.segment { + if let Some(segment) = segment.downcast_ref::<gst::format::Time>() { + rtime = segment.to_running_time(buffer.get_pts()); + if inner.latency.is_some() { + rtime += inner.latency; + } + } + } + + let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]); + let clients_to_unconfigure = mem::replace(&mut inner.clients_to_unconfigure, vec![]); + + let settings = inner.settings.lock().unwrap().clone(); + + ( + do_sync, + rtime, + Arc::clone(&inner.clients), + clients_to_configure, + clients_to_unconfigure, + Arc::clone(&inner.socket), + Arc::clone(&inner.socket_v6), + settings, + ) + }; + + let mut socket = socket.lock().await; + let mut socket_v6 = socket_v6.lock().await; + + if !clients_to_configure.is_empty() { + for client in &clients_to_configure { + self.configure_client(&settings, &mut socket, &mut socket_v6, &client) + .map_err(|err| { + gst_element_error!( + element, + gst::StreamError::Failed, + ["Failed to configure client {:?}: {}", client, err] + ); + + gst::FlowError::Error + })?; + } + } + + if !clients_to_unconfigure.is_empty() { + for client in &clients_to_unconfigure { + self.unconfigure_client(&settings, &mut socket, &mut socket_v6, &client) + .map_err(|err| { + gst_element_error!( + element, + gst::StreamError::Failed, + ["Failed to unconfigure client {:?}: {}", client, err] + ); + + gst::FlowError::Error + })?; + } + } + + if do_sync { + self.sync(&element, rtime).await; + } + + let data = buffer.map_readable().map_err(|_| { + gst_element_error!( + element, + gst::StreamError::Format, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + for client in clients.iter() { + let socket = match client.ip() { + IpAddr::V4(_) => &mut socket, + IpAddr::V6(_) => &mut socket_v6, + }; + + if let Some(socket) = socket.as_mut() { + gst_log!(CAT, obj: element, "Sending to {:?}", &client); + socket.send_to(&data, client).await.map_err(|err| { + gst_element_error!( + element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + gst::FlowError::Error + })?; + } else { + gst_element_error!( + element, + gst::StreamError::Failed, + ("I/O error"), + ["No socket available for sending to {}", client] + ); + return Err(gst::FlowError::Error); + } + } + + gst_log!( + CAT, + obj: element, + "Sent buffer {:?} to all clients", + &buffer + ); + + Ok(gst::FlowSuccess::Ok) + } + + /* Wait until specified time */ + async fn sync(&self, element: &super::UdpSink, running_time: gst::ClockTime) { + let now = element.get_current_running_time(); + + if let Some(delay) = running_time + .saturating_sub(now) + .and_then(|delay| delay.nseconds()) + { + runtime::time::delay_for(Duration::from_nanos(delay)).await; + } + } + + async fn handle_event(&self, element: &super::UdpSink, event: gst::Event) { + match event.view() { + EventView::Eos(_) => { + let _ = element.post_message(gst::message::Eos::builder().src(element).build()); + } + EventView::Segment(e) => { + self.0.write().unwrap().segment = Some(e.get_segment().clone()); + } + _ => (), + } + } +} + +impl PadSinkHandler for UdpSinkPadHandler { + type ElementImpl = UdpSink; + + fn sink_chain( + &self, + _pad: &PadSinkRef, + _udpsink: &UdpSink, + element: &gst::Element, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { + let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone().downcast::<super::UdpSink>().unwrap(); + + async move { + if let Some(sender) = sender.lock().await.as_mut() { + if sender.send(TaskItem::Buffer(buffer)).await.is_err() { + gst_debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); + } + } + Ok(gst::FlowSuccess::Ok) + } + .boxed() + } + + fn sink_chain_list( + &self, + _pad: &PadSinkRef, + _udpsink: &UdpSink, + element: &gst::Element, + list: gst::BufferList, + ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { + let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone().downcast::<super::UdpSink>().unwrap(); + + async move { + if let Some(sender) = sender.lock().await.as_mut() { + for buffer in list.iter_owned() { + if sender.send(TaskItem::Buffer(buffer)).await.is_err() { + gst_debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + .boxed() + } + + fn sink_event_serialized( + &self, + _pad: &PadSinkRef, + _udpsink: &UdpSink, + element: &gst::Element, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone().downcast::<super::UdpSink>().unwrap(); + + async move { + if let EventView::FlushStop(_) = event.view() { + let udpsink = UdpSink::from_instance(&element); + return udpsink.task.flush_stop().is_ok(); + } else if let Some(sender) = sender.lock().await.as_mut() { + if sender.send(TaskItem::Event(event)).await.is_err() { + gst_debug!(CAT, obj: &element, "Flushing"); + } + } + + true + } + .boxed() + } + + fn sink_event( + &self, + _pad: &PadSinkRef, + udpsink: &UdpSink, + _element: &gst::Element, + event: gst::Event, + ) -> bool { + if let EventView::FlushStart(..) = event.view() { + return udpsink.task.flush_start().is_ok(); + } + + true + } +} + +#[derive(Debug)] +struct UdpSinkTask { + element: super::UdpSink, + sink_pad_handler: UdpSinkPadHandler, + receiver: Option<mpsc::Receiver<TaskItem>>, +} + +impl UdpSinkTask { + fn new(element: &super::UdpSink, sink_pad_handler: &UdpSinkPadHandler) -> Self { + UdpSinkTask { + element: element.clone(), + sink_pad_handler: sink_pad_handler.clone(), + receiver: None, + } + } +} + +impl TaskImpl for UdpSinkTask { + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst_log!(CAT, obj: &self.element, "Starting task"); + + let (sender, receiver) = mpsc::channel(0); + + let mut sink_pad_handler = self.sink_pad_handler.0.write().unwrap(); + sink_pad_handler.sender = Arc::new(Mutex::new(Some(sender))); + + self.receiver = Some(receiver); + + gst_log!(CAT, obj: &self.element, "Task started"); + Ok(()) + } + .boxed() + } + + fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + match self.receiver.as_mut().unwrap().next().await { + Some(TaskItem::Buffer(buffer)) => { + match self.sink_pad_handler.render(&self.element, buffer).await { + Err(err) => { + gst_element_error!( + &self.element, + gst::StreamError::Failed, + ["Failed to render item, stopping task: {}", err] + ); + + Err(gst::FlowError::Error) + } + _ => Ok(()), + } + } + Some(TaskItem::Event(event)) => { + self.sink_pad_handler + .handle_event(&self.element, event) + .await; + Ok(()) + } + None => Err(gst::FlowError::Flushing), + } + } + .boxed() + } +} + +#[derive(Debug)] +enum SocketFamily { + Ipv4, + Ipv6, +} + +#[derive(Debug)] +pub struct UdpSink { + sink_pad: PadSink, + sink_pad_handler: UdpSinkPadHandler, + task: Task, + settings: Arc<StdMutex<Settings>>, +} + +impl UdpSink { + fn prepare_socket( + &self, + family: SocketFamily, + context: &Context, + element: &super::UdpSink, + ) -> Result<(), gst::ErrorMessage> { + let mut settings = self.settings.lock().unwrap(); + + let wrapped_socket = match family { + SocketFamily::Ipv4 => &settings.socket, + SocketFamily::Ipv6 => &settings.socket_v6, + }; + + let socket_qualified: SocketQualified; + + if let Some(ref wrapped_socket) = wrapped_socket { + use std::net::UdpSocket; + + let socket: UdpSocket; + + #[cfg(unix)] + { + socket = wrapped_socket.get() + } + #[cfg(windows)] + { + socket = wrapped_socket.get() + } + + let socket = context.enter(|| { + tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup socket for tokio: {}", err] + ) + }) + })?; + + match family { + SocketFamily::Ipv4 => { + settings.used_socket = Some(wrapped_socket.clone()); + socket_qualified = SocketQualified::Ipv4(socket); + } + SocketFamily::Ipv6 => { + settings.used_socket_v6 = Some(wrapped_socket.clone()); + socket_qualified = SocketQualified::Ipv6(socket); + } + } + } else { + let bind_addr = match family { + SocketFamily::Ipv4 => &settings.bind_address, + SocketFamily::Ipv6 => &settings.bind_address_v6, + }; + + let bind_addr: IpAddr = bind_addr.parse().map_err(|err| { + gst_error_msg!( + gst::ResourceError::Settings, + ["Invalid address '{}' set: {}", bind_addr, err] + ) + })?; + + let bind_port = match family { + SocketFamily::Ipv4 => settings.bind_port, + SocketFamily::Ipv6 => settings.bind_port_v6, + }; + + let saddr = SocketAddr::new(bind_addr, bind_port as u16); + gst_debug!(CAT, obj: element, "Binding to {:?}", saddr); + + let socket = match family { + SocketFamily::Ipv4 => socket2::Socket::new( + socket2::Domain::ipv4(), + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + ), + SocketFamily::Ipv6 => socket2::Socket::new( + socket2::Domain::ipv6(), + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + ), + }; + + let socket = match socket { + Ok(socket) => socket, + Err(err) => { + gst_warning!( + CAT, + obj: element, + "Failed to create {} socket: {}", + match family { + SocketFamily::Ipv4 => "IPv4", + SocketFamily::Ipv6 => "IPv6", + }, + err + ); + return Ok(()); + } + }; + + socket.bind(&saddr.into()).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to bind socket: {}", err] + ) + })?; + + let socket = context.enter(|| { + tokio::net::UdpSocket::from_std(socket.into()).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup socket for tokio: {}", err] + ) + }) + })?; + + let wrapper = wrap_socket(&socket)?; + + if settings.qos_dscp != -1 { + wrapper.set_tos(settings.qos_dscp).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set QoS DSCP: {}", err] + ) + })?; + } + + match family { + SocketFamily::Ipv4 => { + settings.used_socket = Some(wrapper); + socket_qualified = SocketQualified::Ipv4(socket) + } + SocketFamily::Ipv6 => { + settings.used_socket_v6 = Some(wrapper); + socket_qualified = SocketQualified::Ipv6(socket) + } + } + } + + self.sink_pad_handler.prepare_socket(socket_qualified); + + Ok(()) + } + + fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Preparing"); + + let context = { + let settings = self.settings.lock().unwrap(); + + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to acquire Context: {}", err] + ) + })? + }; + + self.sink_pad_handler.prepare(); + self.prepare_socket(SocketFamily::Ipv4, &context, element)?; + self.prepare_socket(SocketFamily::Ipv6, &context, element)?; + + self.task + .prepare(UdpSinkTask::new(&element, &self.sink_pad_handler), context) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + + gst_debug!(CAT, obj: element, "Started preparing"); + + Ok(()) + } + + fn unprepare(&self, element: &super::UdpSink) { + gst_debug!(CAT, obj: element, "Unpreparing"); + + self.task.unprepare().unwrap(); + self.sink_pad_handler.unprepare(); + + gst_debug!(CAT, obj: element, "Unprepared"); + } + + fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Stopping"); + self.task.stop()?; + gst_debug!(CAT, obj: element, "Stopped"); + Ok(()) + } + + fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Starting"); + self.task.start()?; + gst_debug!(CAT, obj: element, "Started"); + Ok(()) + } +} + +impl UdpSink { + fn clear_clients(&self, clients_to_add: impl Iterator<Item = SocketAddr>) { + self.sink_pad_handler + .clear_clients(&self.sink_pad.gst_pad(), clients_to_add); + } + + fn remove_client(&self, addr: SocketAddr) { + self.sink_pad_handler + .remove_client(&self.sink_pad.gst_pad(), addr); + } + + fn add_client(&self, addr: SocketAddr) { + self.sink_pad_handler + .add_client(&self.sink_pad.gst_pad(), addr); + } +} + +fn try_into_socket_addr(element: &super::UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> { + let addr: IpAddr = match host.parse() { + Err(err) => { + gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); + return Err(()); + } + Ok(addr) => addr, + }; + + let port: u16 = match port.try_into() { + Err(err) => { + gst_error!(CAT, obj: element, "Invalid port {}: {}", port, err); + return Err(()); + } + Ok(port) => port, + }; + + Ok(SocketAddr::new(addr, port)) +} + +impl ObjectSubclass for UdpSink { + const NAME: &'static str = "RsTsUdpSink"; + type Type = super::UdpSink; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct<Self>; + type Class = subclass::simple::ClassStruct<Self>; + + glib_object_subclass!(); + + fn class_init(klass: &mut Self::Class) { + klass.set_metadata( + "Thread-sharing UDP sink", + "Sink/Network", + "Thread-sharing UDP sink", + "Mathieu <mathieu@centricular.com>", + ); + + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + klass.add_signal_with_class_handler( + "add", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[String::static_type(), i32::static_type()], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::<super::UdpSink>() + .expect("signal arg") + .expect("missing signal arg"); + let host = args[1] + .get::<String>() + .expect("signal arg") + .expect("missing signal arg"); + let port = args[2] + .get::<i32>() + .expect("signal arg") + .expect("missing signal arg"); + + if let Ok(addr) = try_into_socket_addr(&element, &host, port) { + let udpsink = Self::from_instance(&element); + udpsink.add_client(addr); + } + + None + }, + ); + + klass.add_signal_with_class_handler( + "remove", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[String::static_type(), i32::static_type()], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::<super::UdpSink>() + .expect("signal arg") + .expect("missing signal arg"); + let host = args[1] + .get::<String>() + .expect("signal arg") + .expect("missing signal arg"); + let port = args[2] + .get::<i32>() + .expect("signal arg") + .expect("missing signal arg"); + + let udpsink = Self::from_instance(&element); + + if let Ok(addr) = try_into_socket_addr(&element, &host, port) { + udpsink.remove_client(addr); + } + + None + }, + ); + + klass.add_signal_with_class_handler( + "clear", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::<super::UdpSink>() + .expect("signal arg") + .expect("missing signal arg"); + + let udpsink = Self::from_instance(&element); + udpsink.clear_clients(std::iter::empty()); + + None + }, + ); + + klass.install_properties(&PROPERTIES); + } + + fn with_class(klass: &Self::Class) -> Self { + let settings = Arc::new(StdMutex::new(Settings::default())); + let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings)); + + Self { + sink_pad: PadSink::new( + gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")), + sink_pad_handler.clone(), + ), + sink_pad_handler, + task: Task::default(), + settings, + } + } +} + +impl ObjectImpl for UdpSink { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + + let mut settings = self.settings.lock().unwrap(); + match *prop { + subclass::Property("sync", ..) => { + settings.sync = value.get_some().expect("type checked upstream"); + } + subclass::Property("bind-address", ..) => { + settings.bind_address = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("bind-port", ..) => { + settings.bind_port = value.get_some().expect("type checked upstream"); + } + subclass::Property("bind-address-v6", ..) => { + settings.bind_address_v6 = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("bind-port-v6", ..) => { + settings.bind_port_v6 = value.get_some().expect("type checked upstream"); + } + subclass::Property("socket", ..) => { + settings.socket = value + .get::<gio::Socket>() + .expect("type checked upstream") + .map(|socket| GioSocketWrapper::new(&socket)); + } + subclass::Property("used-socket", ..) => { + unreachable!(); + } + subclass::Property("socket-v6", ..) => { + settings.socket_v6 = value + .get::<gio::Socket>() + .expect("type checked upstream") + .map(|socket| GioSocketWrapper::new(&socket)); + } + subclass::Property("used-socket-v6", ..) => { + unreachable!(); + } + subclass::Property("auto-multicast", ..) => { + settings.auto_multicast = value.get_some().expect("type checked upstream"); + } + subclass::Property("loop", ..) => { + settings.multicast_loop = value.get_some().expect("type checked upstream"); + } + subclass::Property("ttl", ..) => { + settings.ttl = value.get_some().expect("type checked upstream"); + } + subclass::Property("ttl-mc", ..) => { + settings.ttl_mc = value.get_some().expect("type checked upstream"); + } + subclass::Property("qos-dscp", ..) => { + settings.qos_dscp = value.get_some().expect("type checked upstream"); + } + subclass::Property("clients", ..) => { + let clients: String = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + + let clients_iter = clients.split(',').filter_map(|client| { + let rsplit: Vec<&str> = client.rsplitn(2, ':').collect(); + + if rsplit.len() == 2 { + rsplit[0] + .parse::<i32>() + .map_err(|err| { + gst_error!(CAT, obj: obj, "Invalid port {}: {}", rsplit[0], err); + }) + .and_then(|port| try_into_socket_addr(&obj, rsplit[1], port)) + .ok() + } else { + None + } + }); + drop(settings); + + self.clear_clients(clients_iter); + } + subclass::Property("context", ..) => { + settings.context = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("context-wait", ..) => { + settings.context_wait = value.get_some().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> { + let prop = &PROPERTIES[id]; + + let settings = self.settings.lock().unwrap(); + match *prop { + subclass::Property("sync", ..) => Ok(settings.sync.to_value()), + subclass::Property("bind-address", ..) => Ok(settings.bind_address.to_value()), + subclass::Property("bind-port", ..) => Ok(settings.bind_port.to_value()), + subclass::Property("bind-address-v6", ..) => Ok(settings.bind_address_v6.to_value()), + subclass::Property("bind-port-v6", ..) => Ok(settings.bind_port_v6.to_value()), + subclass::Property("socket", ..) => Ok(settings + .socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()), + subclass::Property("used-socket", ..) => Ok(settings + .used_socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()), + subclass::Property("socket-v6", ..) => Ok(settings + .socket_v6 + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()), + subclass::Property("used-socket-v6", ..) => Ok(settings + .used_socket_v6 + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()), + subclass::Property("auto-multicast", ..) => Ok(settings.sync.to_value()), + subclass::Property("loop", ..) => Ok(settings.multicast_loop.to_value()), + subclass::Property("ttl", ..) => Ok(settings.ttl.to_value()), + subclass::Property("ttl-mc", ..) => Ok(settings.ttl_mc.to_value()), + subclass::Property("qos-dscp", ..) => Ok(settings.qos_dscp.to_value()), + subclass::Property("clients", ..) => { + drop(settings); + + let clients: Vec<String> = self + .sink_pad_handler + .get_clients() + .iter() + .map(ToString::to_string) + .collect(); + + Ok(clients.join(",").to_value()) + } + subclass::Property("context", ..) => Ok(settings.context.to_value()), + subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(self.sink_pad.gst_pad()).unwrap(); + + crate::set_element_flags(obj, gst::ElementFlags::SINK); + } +} + +impl ElementImpl for UdpSink { + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { + element.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + self.start(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::PausedToReady => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element); + } + _ => (), + } + + self.parent_change_state(element, transition) + } + + fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { + match event.view() { + EventView::Latency(ev) => { + self.sink_pad_handler.set_latency(ev.get_latency()); + self.sink_pad.gst_pad().push_event(event) + } + EventView::Step(..) => false, + _ => self.sink_pad.gst_pad().push_event(event), + } + } +} diff --git a/generic/threadshare/src/udpsink/mod.rs b/generic/threadshare/src/udpsink/mod.rs new file mode 100644 index 000000000..da0729e93 --- /dev/null +++ b/generic/threadshare/src/udpsink/mod.rs @@ -0,0 +1,39 @@ +// Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::glib_wrapper; +use glib::prelude::*; + +mod imp; + +glib_wrapper! { + pub struct UdpSink(ObjectSubclass<imp::UdpSink>) @extends gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for UdpSink {} +unsafe impl Sync for UdpSink {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ts-udpsink", + gst::Rank::None, + UdpSink::static_type(), + ) +} |