Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-11-04 16:11:44 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-11-09 10:55:04 +0300
commite1afa43aa3d4f11d9a513966b0f0175eaab3924a (patch)
tree7ed047a763198d7fa47166943b6bac22844c8832 /generic/threadshare/src
parent29a490f6dc7b792df7ab45f6a79cbfbee694d332 (diff)
ts/udpsink: handle items in the PadSinkHandler
... instead of forwarding them to a Task via a channel. This improves CPU usage by 5% according to `udpsrc-benchmark-sender` with the `tuning` feature using default audio test buffers and 400 streams on the same ts-context. It is expected to improve latency significantly. This is inferred from `ts-standalone`: latency shrinks from around 5ms to 1.5µs using the `task` sink compared to the `async-mutex` sink. The async Mutex is mandatory here as we need to hold the lock across await points.
Diffstat (limited to 'generic/threadshare/src')
-rw-r--r--generic/threadshare/src/udpsink/imp.rs1088
1 files changed, 505 insertions, 583 deletions
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index cfb0763f3..770532356 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -19,7 +19,6 @@
use futures::future::BoxFuture;
use futures::prelude::*;
-use futures::stream::Peekable;
use gst::glib;
use gst::prelude::*;
@@ -29,15 +28,14 @@ use gst::{element_error, error_msg};
use once_cell::sync::Lazy;
+use crate::runtime::executor::block_on_or_add_sub_task;
use crate::runtime::prelude::*;
-use crate::runtime::{self, Async, Context, PadSink, Task};
+use crate::runtime::{self, Async, Context, PadSink};
use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
-use std::pin::Pin;
-use std::sync::Mutex;
-use std::task::Poll;
+use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::u16;
use std::u8;
@@ -62,6 +60,25 @@ const DEFAULT_CLIENTS: &str = "";
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
+#[derive(Debug, Clone, Copy)]
+struct SocketConf {
+ auto_multicast: bool,
+ multicast_loop: bool,
+ ttl: u32,
+ ttl_mc: u32,
+}
+
+impl Default for SocketConf {
+ fn default() -> Self {
+ SocketConf {
+ auto_multicast: DEFAULT_AUTO_MULTICAST,
+ multicast_loop: DEFAULT_LOOP,
+ ttl: DEFAULT_TTL,
+ ttl_mc: DEFAULT_TTL_MC,
+ }
+ }
+}
+
#[derive(Debug, Clone)]
struct Settings {
sync: bool,
@@ -73,15 +90,10 @@ struct Settings {
used_socket: Option<GioSocketWrapper>,
socket_v6: Option<GioSocketWrapper>,
used_socket_v6: Option<GioSocketWrapper>,
- auto_multicast: bool,
- multicast_loop: bool,
- ttl: u32,
- ttl_mc: u32,
+ socket_conf: SocketConf,
qos_dscp: i32,
context: String,
context_wait: Duration,
- clients: BTreeSet<SocketAddr>,
- latency: Option<gst::ClockTime>,
}
impl Default for Settings {
@@ -96,18 +108,10 @@ impl Default for Settings {
used_socket: DEFAULT_USED_SOCKET,
socket_v6: DEFAULT_SOCKET_V6,
used_socket_v6: DEFAULT_USED_SOCKET_V6,
- auto_multicast: DEFAULT_AUTO_MULTICAST,
- multicast_loop: DEFAULT_LOOP,
- ttl: DEFAULT_TTL,
- ttl_mc: DEFAULT_TTL_MC,
+ socket_conf: SocketConf::default(),
qos_dscp: DEFAULT_QOS_DSCP,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
- clients: BTreeSet::from([SocketAddr::new(
- DEFAULT_HOST.unwrap().parse().unwrap(),
- DEFAULT_PORT as u16,
- )]),
- latency: None,
}
}
}
@@ -120,14 +124,166 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
-#[derive(Debug)]
-enum TaskItem {
- Buffer(gst::Buffer),
- Event(gst::Event),
-}
+#[derive(Clone, Debug, Default)]
+struct UdpSinkPadHandler(Arc<futures::lock::Mutex<UdpSinkPadHandlerInner>>);
+
+impl UdpSinkPadHandler {
+ fn prepare(
+ &self,
+ _imp: &UdpSink,
+ socket: Option<Async<UdpSocket>>,
+ socket_v6: Option<Async<UdpSocket>>,
+ settings: &Settings,
+ ) -> Result<(), gst::ErrorMessage> {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+
+ inner.sync = settings.sync;
+ inner.socket_conf = settings.socket_conf;
+ inner.socket = socket;
+ inner.socket_v6 = socket_v6;
+
+ for addr in inner.clients.iter() {
+ inner.configure_client(addr)?;
+ }
+
+ Ok(())
+ })
+ }
+
+ fn unprepare(&self) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+
+ for addr in inner.clients.iter() {
+ let _ = inner.unconfigure_client(addr);
+ }
+
+ inner.socket = None;
+ inner.socket_v6 = None;
+ })
+ }
+
+ fn start(&self) {
+ futures::executor::block_on(async move {
+ self.0.lock().await.is_flushing = false;
+ })
+ }
+
+ fn stop(&self) {
+ futures::executor::block_on(async move {
+ self.0.lock().await.is_flushing = true;
+ })
+ }
+
+ fn set_sync(&self, sync: bool) {
+ futures::executor::block_on(async move {
+ self.0.lock().await.sync = sync;
+ })
+ }
+
+ fn set_latency(&self, latency: Option<gst::ClockTime>) {
+ futures::executor::block_on(async move {
+ self.0.lock().await.latency = latency;
+ })
+ }
+
+ fn set_socket_conf(&self, socket_conf: SocketConf) {
+ futures::executor::block_on(async move {
+ self.0.lock().await.socket_conf = socket_conf;
+ })
+ }
+
+ fn clients(&self) -> BTreeSet<SocketAddr> {
+ futures::executor::block_on(async move { self.0.lock().await.clients.clone() })
+ }
+
+ fn add_client(&self, imp: &UdpSink, addr: SocketAddr) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+ if inner.clients.contains(&addr) {
+ gst::warning!(CAT, imp: imp, "Not adding client {addr:?} again");
+ return;
+ }
+
+ match inner.configure_client(&addr) {
+ Ok(()) => {
+ gst::info!(CAT, imp: imp, "Added client {addr:?}");
+ inner.clients.insert(addr);
+ }
+ Err(err) => {
+ gst::error!(CAT, imp: imp, "Failed to add client {addr:?}: {err}");
+ imp.obj().post_error_message(err);
+ }
+ }
+ })
+ }
+
+ fn remove_client(&self, imp: &UdpSink, addr: SocketAddr) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+ if inner.clients.take(&addr).is_none() {
+ gst::warning!(CAT, imp: imp, "Not removing unknown client {addr:?}");
+ return;
+ }
-#[derive(Clone, Debug)]
-struct UdpSinkPadHandler;
+ match inner.unconfigure_client(&addr) {
+ Ok(()) => {
+ gst::info!(CAT, imp: imp, "Removed client {addr:?}");
+ }
+ Err(err) => {
+ gst::error!(CAT, imp: imp, "Failed to remove client {addr:?}: {err}");
+ imp.obj().post_error_message(err);
+ }
+ }
+ })
+ }
+
+ fn replace_clients(&self, imp: &UdpSink, mut new_clients: BTreeSet<SocketAddr>) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+ if new_clients.is_empty() {
+ gst::info!(CAT, imp: imp, "Clearing clients");
+ } else {
+ gst::info!(CAT, imp: imp, "Replacing clients");
+ }
+
+ let old_clients = std::mem::take(&mut inner.clients);
+
+ let mut res = Ok(());
+
+ for addr in old_clients.iter() {
+ if new_clients.take(addr).is_some() {
+ // client is already configured
+ inner.clients.insert(*addr);
+ } else if let Err(err) = inner.unconfigure_client(addr) {
+ gst::error!(CAT, imp: imp, "Failed to remove client {addr:?}: {err}");
+ res = Err(err);
+ } else {
+ gst::info!(CAT, imp: imp, "Removed client {addr:?}");
+ }
+ }
+
+ for addr in new_clients.into_iter() {
+ if let Err(err) = inner.configure_client(&addr) {
+ gst::error!(CAT, imp: imp, "Failed to add client {addr:?}: {err}");
+ res = Err(err);
+ } else {
+ gst::info!(CAT, imp: imp, "Added client {addr:?}");
+ inner.clients.insert(addr);
+ }
+ }
+
+ // FIXME: which error handling:
+ // - If at least one client could be configured, should we keep going? (current)
+ // - or, should we consider the preparation failed when the first client
+ // configuration fails? (previously)
+ if let Err(err) = res {
+ imp.obj().post_error_message(err);
+ }
+ })
+ }
+}
impl PadSinkHandler for UdpSinkPadHandler {
type ElementImpl = UdpSink;
@@ -138,16 +294,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
elem: super::UdpSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = elem.imp().clone_item_sender();
- async move {
- if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: elem, "Flushing");
- return Err(gst::FlowError::Flushing);
- }
-
- Ok(gst::FlowSuccess::Ok)
- }
- .boxed()
+ async move { self.0.lock().await.handle_buffer(&elem, buffer).await }.boxed()
}
fn sink_chain_list(
@@ -156,13 +303,10 @@ impl PadSinkHandler for UdpSinkPadHandler {
elem: super::UdpSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = elem.imp().clone_item_sender();
async move {
+ let mut inner = self.0.lock().await;
for buffer in list.iter_owned() {
- if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: elem, "Flushing");
- return Err(gst::FlowError::Flushing);
- }
+ inner.handle_buffer(&elem, buffer).await?;
}
Ok(gst::FlowSuccess::Ok)
@@ -176,13 +320,23 @@ impl PadSinkHandler for UdpSinkPadHandler {
elem: super::UdpSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- let sender = elem.imp().clone_item_sender();
async move {
- if let EventView::FlushStop(_) = event.view() {
- let imp = elem.imp();
- return imp.task.flush_stop().await_maybe_on_context().is_ok();
- } else if sender.send_async(TaskItem::Event(event)).await.is_err() {
- gst::debug!(CAT, obj: elem, "Flushing");
+ gst::debug!(CAT, obj: elem, "Handling {event:?}");
+
+ match event.view() {
+ EventView::Eos(_) => {
+ let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
+ }
+ EventView::Segment(e) => {
+ self.0.lock().await.segment = Some(e.segment().clone());
+ }
+ EventView::FlushStop(_) => {
+ self.0.lock().await.is_flushing = false;
+ }
+ EventView::SinkMessage(e) => {
+ let _ = elem.post_message(e.message());
+ }
+ _ => (),
}
true
@@ -191,8 +345,12 @@ impl PadSinkHandler for UdpSinkPadHandler {
}
fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool {
+ gst::debug!(CAT, imp: imp, "Handling {event:?}");
+
if let EventView::FlushStart(..) = event.view() {
- return imp.task.flush_start().await_maybe_on_context().is_ok();
+ block_on_or_add_sub_task(async move {
+ self.0.lock().await.is_flushing = true;
+ });
}
true
@@ -200,278 +358,43 @@ impl PadSinkHandler for UdpSinkPadHandler {
}
#[derive(Debug)]
-enum Command {
- AddClient(SocketAddr),
- RemoveClient(SocketAddr),
- ReplaceWithClients(BTreeSet<SocketAddr>),
- SetLatency(Option<gst::ClockTime>),
- SetSync(bool),
-}
-
-struct UdpSinkTask {
- element: super::UdpSink,
- item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>,
- cmd_receiver: flume::Receiver<Command>,
- clients: BTreeSet<SocketAddr>,
- socket: Option<Async<UdpSocket>>,
- socket_v6: Option<Async<UdpSocket>>,
+struct UdpSinkPadHandlerInner {
+ is_flushing: bool,
sync: bool,
latency: Option<gst::ClockTime>,
+ socket: Option<Async<UdpSocket>>,
+ socket_v6: Option<Async<UdpSocket>>,
+ clients: BTreeSet<SocketAddr>,
+ socket_conf: SocketConf,
segment: Option<gst::Segment>,
}
-impl UdpSinkTask {
- fn new(
- element: &super::UdpSink,
- item_receiver: flume::Receiver<TaskItem>,
- cmd_receiver: flume::Receiver<Command>,
- ) -> Self {
- UdpSinkTask {
- element: element.clone(),
- item_receiver: item_receiver.into_stream().peekable(),
- cmd_receiver,
- clients: Default::default(),
+impl Default for UdpSinkPadHandlerInner {
+ fn default() -> Self {
+ Self {
+ is_flushing: true,
+ sync: DEFAULT_SYNC,
+ latency: None,
socket: None,
socket_v6: None,
- sync: false,
- latency: None,
+ clients: BTreeSet::from([SocketAddr::new(
+ DEFAULT_HOST.unwrap().parse().unwrap(),
+ DEFAULT_PORT as u16,
+ )]),
+ socket_conf: Default::default(),
segment: None,
}
}
-
- async fn flush(&mut self) {
- // Purge the channel
- while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {}
- }
-
- fn process_command(&mut self, cmd: Command) {
- use Command::*;
- match cmd {
- AddClient(client) => self.add_client(client),
- RemoveClient(client) => self.remove_client(&client),
- ReplaceWithClients(clients) => self.replace_with_clients(clients),
- SetSync(sync) => self.sync = sync,
- SetLatency(latency) => self.latency = latency,
- }
- }
}
/// Socket configuration.
-impl UdpSinkTask {
- fn prepare_socket(
- &self,
- settings: &mut Settings,
- family: SocketFamily,
- ) -> Result<Option<Async<UdpSocket>>, gst::ErrorMessage> {
- let wrapped_socket = match family {
- SocketFamily::Ipv4 => &settings.socket,
- SocketFamily::Ipv6 => &settings.socket_v6,
- };
-
- if let Some(ref wrapped_socket) = wrapped_socket {
- let socket: UdpSocket = wrapped_socket.get();
- let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to setup Async socket: {}", err]
- )
- })?;
-
- match family {
- SocketFamily::Ipv4 => {
- settings.used_socket = Some(wrapped_socket.clone());
- }
- SocketFamily::Ipv6 => {
- settings.used_socket_v6 = Some(wrapped_socket.clone());
- }
- }
-
- Ok(Some(socket))
- } else {
- let bind_addr = match family {
- SocketFamily::Ipv4 => &settings.bind_address,
- SocketFamily::Ipv6 => &settings.bind_address_v6,
- };
-
- let bind_addr: IpAddr = bind_addr.parse().map_err(|err| {
- error_msg!(
- gst::ResourceError::Settings,
- ["Invalid address '{}' set: {}", bind_addr, err]
- )
- })?;
-
- let bind_port = match family {
- SocketFamily::Ipv4 => settings.bind_port,
- SocketFamily::Ipv6 => settings.bind_port_v6,
- };
-
- let saddr = SocketAddr::new(bind_addr, bind_port as u16);
- gst::debug!(CAT, obj: self.element, "Binding to {:?}", saddr);
-
- let socket = match family {
- SocketFamily::Ipv4 => socket2::Socket::new(
- socket2::Domain::IPV4,
- socket2::Type::DGRAM,
- Some(socket2::Protocol::UDP),
- ),
- SocketFamily::Ipv6 => socket2::Socket::new(
- socket2::Domain::IPV6,
- socket2::Type::DGRAM,
- Some(socket2::Protocol::UDP),
- ),
- };
-
- let socket = match socket {
- Ok(socket) => socket,
- Err(err) => {
- gst::warning!(
- CAT,
- obj: self.element,
- "Failed to create {} socket: {}",
- match family {
- SocketFamily::Ipv4 => "IPv4",
- SocketFamily::Ipv6 => "IPv6",
- },
- err
- );
- return Ok(None);
- }
- };
-
- socket.bind(&saddr.into()).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to bind socket: {}", err]
- )
- })?;
-
- let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to setup Async socket: {}", err]
- )
- })?;
-
- let wrapper = wrap_socket(&socket)?;
-
- if settings.qos_dscp != -1 {
- wrapper.set_tos(settings.qos_dscp).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to set QoS DSCP: {}", err]
- )
- })?;
- }
-
- match family {
- SocketFamily::Ipv4 => {
- settings.used_socket = Some(wrapper);
- }
- SocketFamily::Ipv6 => {
- settings.used_socket_v6 = Some(wrapper);
- }
- }
-
- Ok(Some(socket))
- }
- }
-
- fn add_client(&mut self, addr: SocketAddr) {
- if self.clients.contains(&addr) {
- gst::warning!(CAT, obj: self.element, "Not adding client {:?} again", &addr);
- return;
- }
-
- let udpsink = self.element.imp();
- let mut settings = udpsink.settings.lock().unwrap();
- match self.configure_client(&settings, &addr) {
- Ok(()) => {
- gst::info!(CAT, obj: self.element, "Added client {:?}", addr);
- self.clients.insert(addr);
- }
- Err(err) => {
- gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err);
- settings.clients = self.clients.clone();
- self.element.post_error_message(err);
- }
- }
- }
-
- fn remove_client(&mut self, addr: &SocketAddr) {
- if self.clients.take(addr).is_none() {
- gst::warning!(CAT, obj: self.element, "Not removing unknown client {:?}", &addr);
- return;
- }
-
- let udpsink = self.element.imp();
- let mut settings = udpsink.settings.lock().unwrap();
- match self.unconfigure_client(&settings, addr) {
- Ok(()) => {
- gst::info!(CAT, obj: self.element, "Removed client {:?}", addr);
- }
- Err(err) => {
- gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err);
- settings.clients = self.clients.clone();
- self.element.post_error_message(err);
- }
- }
- }
-
- fn replace_with_clients(&mut self, mut clients_to_add: BTreeSet<SocketAddr>) {
- if clients_to_add.is_empty() {
- gst::info!(CAT, obj: self.element, "Clearing clients");
- } else {
- gst::info!(CAT, obj: self.element, "Replacing clients");
- }
-
- let old_clients = std::mem::take(&mut self.clients);
-
- let mut res = Ok(());
- let udpsink = self.element.imp();
- let mut settings = udpsink.settings.lock().unwrap();
-
- for addr in old_clients.iter() {
- if clients_to_add.take(addr).is_some() {
- // client is already configured
- self.clients.insert(*addr);
- } else if let Err(err) = self.unconfigure_client(&settings, addr) {
- gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err);
- res = Err(err);
- } else {
- gst::info!(CAT, obj: self.element, "Removed client {:?}", addr);
- }
- }
-
- for addr in clients_to_add.into_iter() {
- if let Err(err) = self.configure_client(&settings, &addr) {
- gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err);
- res = Err(err);
- } else {
- gst::info!(CAT, obj: self.element, "Added client {:?}", addr);
- self.clients.insert(addr);
- }
- }
-
- // FIXME: which error handling:
- // - If at least one client could be configured, should we keep going? (current)
- // - or, should we consider the preparation failed when the first client
- // configuration fails? (previously)
- if let Err(err) = res {
- settings.clients = self.clients.clone();
- self.element.post_error_message(err);
- }
- }
-
- fn configure_client(
- &self,
- settings: &Settings,
- client: &SocketAddr,
- ) -> Result<(), gst::ErrorMessage> {
+impl UdpSinkPadHandlerInner {
+ fn configure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> {
if client.ip().is_multicast() {
match client.ip() {
IpAddr::V4(addr) => {
if let Some(socket) = self.socket.as_ref() {
- if settings.auto_multicast {
+ if self.socket_conf.auto_multicast {
socket
.as_ref()
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
@@ -486,7 +409,7 @@ impl UdpSinkTask {
)
})?;
}
- if settings.multicast_loop {
+ if self.socket_conf.multicast_loop {
socket.as_ref().set_multicast_loop_v4(true).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
@@ -497,7 +420,7 @@ impl UdpSinkTask {
socket
.as_ref()
- .set_multicast_ttl_v4(settings.ttl_mc)
+ .set_multicast_ttl_v4(self.socket_conf.ttl_mc)
.map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
@@ -508,7 +431,7 @@ impl UdpSinkTask {
}
IpAddr::V6(addr) => {
if let Some(socket) = self.socket_v6.as_ref() {
- if settings.auto_multicast {
+ if self.socket_conf.auto_multicast {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
@@ -516,7 +439,7 @@ impl UdpSinkTask {
)
})?;
}
- if settings.multicast_loop {
+ if self.socket_conf.multicast_loop {
socket.as_ref().set_multicast_loop_v6(true).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
@@ -532,22 +455,28 @@ impl UdpSinkTask {
match client.ip() {
IpAddr::V4(_) => {
if let Some(socket) = self.socket.as_ref() {
- socket.as_ref().set_ttl(settings.ttl).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to set unicast ttl for {:?}: {}", client, err]
- )
- })?;
+ socket
+ .as_ref()
+ .set_ttl(self.socket_conf.ttl)
+ .map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to set unicast ttl for {:?}: {}", client, err]
+ )
+ })?;
}
}
IpAddr::V6(_) => {
if let Some(socket) = self.socket_v6.as_ref() {
- socket.as_ref().set_ttl(settings.ttl).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to set unicast ttl for {:?}: {}", client, err]
- )
- })?;
+ socket
+ .as_ref()
+ .set_ttl(self.socket_conf.ttl)
+ .map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to set unicast ttl for {:?}: {}", client, err]
+ )
+ })?;
}
}
}
@@ -556,16 +485,12 @@ impl UdpSinkTask {
Ok(())
}
- fn unconfigure_client(
- &self,
- settings: &Settings,
- client: &SocketAddr,
- ) -> Result<(), gst::ErrorMessage> {
+ fn unconfigure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> {
if client.ip().is_multicast() {
match client.ip() {
IpAddr::V4(addr) => {
if let Some(socket) = self.socket.as_ref() {
- if settings.auto_multicast {
+ if self.socket_conf.auto_multicast {
socket
.as_ref()
.leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
@@ -584,7 +509,7 @@ impl UdpSinkTask {
}
IpAddr::V6(addr) => {
if let Some(socket) = self.socket_v6.as_ref() {
- if settings.auto_multicast {
+ if self.socket_conf.auto_multicast {
socket
.as_ref()
.leave_multicast_v6(&addr, 0)
@@ -609,11 +534,15 @@ impl UdpSinkTask {
}
/// Buffer handling.
-impl UdpSinkTask {
- async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
+impl UdpSinkPadHandlerInner {
+ async fn render(
+ &mut self,
+ elem: &super::UdpSink,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
let data = buffer.map_readable().map_err(|_| {
- element_error!(
- self.element,
+ gst::element_error!(
+ elem,
gst::StreamError::Format,
["Failed to map buffer readable"]
);
@@ -627,10 +556,10 @@ impl UdpSinkTask {
};
if let Some(socket) = socket.as_mut() {
- gst::log!(CAT, obj: self.element, "Sending to {:?}", &client);
+ gst::log!(CAT, obj: elem, "Sending to {client:?}");
socket.send_to(&data, *client).await.map_err(|err| {
- element_error!(
- self.element,
+ gst::element_error!(
+ elem,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
@@ -638,8 +567,8 @@ impl UdpSinkTask {
gst::FlowError::Error
})?;
} else {
- element_error!(
- self.element,
+ gst::element_error!(
+ elem,
gst::StreamError::Failed,
("I/O error"),
["No socket available for sending to {}", client]
@@ -648,157 +577,60 @@ impl UdpSinkTask {
}
}
- gst::log!(
- CAT,
- obj: self.element,
- "Sent buffer {:?} to all clients",
- &buffer
- );
+ gst::log!(CAT, obj: elem, "Sent buffer {buffer:?} to all clients");
- Ok(())
+ Ok(gst::FlowSuccess::Ok)
}
/// Waits until specified time.
- async fn sync(&self, running_time: gst::ClockTime) {
- let now = self.element.current_running_time();
+ async fn sync(&self, elem: &super::UdpSink, running_time: gst::ClockTime) {
+ let now = elem.current_running_time();
if let Ok(Some(delay)) = running_time.opt_checked_sub(now) {
- gst::trace!(CAT, obj: self.element, "sync: waiting {}", delay);
+ gst::trace!(CAT, obj: elem, "sync: waiting {delay}");
runtime::timer::delay_for(delay.into()).await;
}
}
-}
-
-impl TaskImpl for UdpSinkTask {
- type Item = TaskItem;
- fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst::info!(CAT, obj: self.element, "Preparing Task");
- assert!(self.clients.is_empty());
- let clients = {
- let udpsink = self.element.imp();
- let mut settings = udpsink.settings.lock().unwrap();
- self.sync = settings.sync;
- self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?;
- self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?;
- self.latency = settings.latency;
- settings.clients.clone()
- };
-
- self.replace_with_clients(clients);
+ async fn handle_buffer(
+ &mut self,
+ elem: &super::UdpSink,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ if self.is_flushing {
+ gst::info!(CAT, obj: elem, "Discarding {buffer:?} (flushing)");
- Ok(())
+ return Err(gst::FlowError::Flushing);
}
- .boxed()
- }
- fn unprepare(&mut self) -> BoxFuture<'_, ()> {
- async move {
- gst::info!(CAT, obj: self.element, "Unpreparing Task");
+ if self.sync {
+ let rtime = self.segment.as_ref().and_then(|segment| {
+ segment
+ .downcast_ref::<gst::format::Time>()
+ .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency))
+ });
- let udpsink = self.element.imp();
- let settings = udpsink.settings.lock().unwrap();
- for addr in self.clients.iter() {
- let _ = self.unconfigure_client(&settings, addr);
- }
- }
- .boxed()
- }
+ if let Some(rtime) = rtime {
+ self.sync(elem, rtime).await;
- fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
- async move {
- loop {
- gst::info!(CAT, obj: self.element, "Awaiting next item or command");
- futures::select_biased! {
- cmd = self.cmd_receiver.recv_async() => {
- self.process_command(cmd.unwrap());
- }
- item_opt = Pin::new(&mut self.item_receiver).peek() => {
- // Check the peeked item in case we need to sync.
- // The item will still be available in the channel
- // in case this is cancelled by a state transition.
- match item_opt {
- Some(TaskItem::Buffer(buffer)) => {
- if self.sync {
- let rtime = self.segment.as_ref().and_then(|segment| {
- segment
- .downcast_ref::<gst::format::Time>()
- .and_then(|segment| {
- segment.to_running_time(buffer.pts()).opt_add(self.latency)
- })
- });
- if let Some(rtime) = rtime {
- // This can be cancelled by a state transition.
- self.sync(rtime).await;
- }
- }
- }
- Some(_) => (),
- None => {
- panic!("Internal channel sender dropped while Task is Started");
- }
- }
+ if self.is_flushing {
+ gst::info!(CAT, obj: elem, "Discarding {buffer:?} (flushing)");
- // An item was peeked above, we can now pop it without losing it.
- return Ok(self.item_receiver.next().await.unwrap());
- }
+ return Err(gst::FlowError::Flushing);
}
}
}
- .boxed()
- }
- fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- gst::info!(CAT, obj: self.element, "Handling {:?}", item);
+ gst::debug!(CAT, obj: elem, "Handling {buffer:?}");
- match item {
- TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| {
- element_error!(
- &self.element,
- gst::StreamError::Failed,
- ["Failed to render item, stopping task: {}", err]
- );
- gst::FlowError::Error
- })?,
- TaskItem::Event(event) => match event.view() {
- EventView::Eos(_) => {
- let _ = self
- .element
- .post_message(gst::message::Eos::builder().src(&self.element).build());
- }
- EventView::Segment(e) => {
- self.segment = Some(e.segment().clone());
- }
- EventView::SinkMessage(e) => {
- let _ = self.element.post_message(e.message());
- }
- _ => (),
- },
- }
-
- Ok(())
- }
- .boxed()
- }
-
- fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async {
- gst::info!(CAT, obj: self.element, "Stopping Task");
- self.flush().await;
- Ok(())
- }
- .boxed()
- }
-
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async {
- gst::info!(CAT, obj: self.element, "Starting Task Flush");
- self.flush().await;
- Ok(())
- }
- .boxed()
+ self.render(elem, buffer).await.map_err(|err| {
+ element_error!(
+ elem,
+ gst::StreamError::Failed,
+ ["Failed to render item, stopping task: {}", err]
+ );
+ gst::FlowError::Error
+ })
}
}
@@ -811,40 +643,153 @@ enum SocketFamily {
#[derive(Debug)]
pub struct UdpSink {
sink_pad: PadSink,
- task: Task,
- item_sender: Mutex<Option<flume::Sender<TaskItem>>>,
- cmd_sender: Mutex<Option<flume::Sender<Command>>>,
+ sink_pad_handler: UdpSinkPadHandler,
settings: Mutex<Settings>,
+ ts_ctx: Mutex<Option<Context>>,
}
impl UdpSink {
- #[track_caller]
- fn clone_item_sender(&self) -> flume::Sender<TaskItem> {
- self.item_sender.lock().unwrap().as_ref().unwrap().clone()
- }
+ fn prepare_socket(
+ &self,
+ ts_ctx: &Context,
+ settings: &mut Settings,
+ family: SocketFamily,
+ ) -> Result<Option<Async<UdpSocket>>, gst::ErrorMessage> {
+ let wrapped_socket = match family {
+ SocketFamily::Ipv4 => &settings.socket,
+ SocketFamily::Ipv6 => &settings.socket_v6,
+ };
- fn prepare(&self) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, imp: self, "Preparing");
+ if let Some(ref wrapped_socket) = wrapped_socket {
+ let socket: UdpSocket = wrapped_socket.get();
+ let socket = ts_ctx.enter(|| {
+ Async::<UdpSocket>::try_from(socket).map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to setup Async socket: {}", err]
+ )
+ })
+ })?;
- let context = {
- let settings = self.settings.lock().unwrap();
+ match family {
+ SocketFamily::Ipv4 => {
+ settings.used_socket = Some(wrapped_socket.clone());
+ }
+ SocketFamily::Ipv6 => {
+ settings.used_socket_v6 = Some(wrapped_socket.clone());
+ }
+ }
+
+ Ok(Some(socket))
+ } else {
+ let bind_addr = match family {
+ SocketFamily::Ipv4 => &settings.bind_address,
+ SocketFamily::Ipv6 => &settings.bind_address_v6,
+ };
- Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ let bind_addr: IpAddr = bind_addr.parse().map_err(|err| {
+ error_msg!(
+ gst::ResourceError::Settings,
+ ["Invalid address '{}' set: {}", bind_addr, err]
+ )
+ })?;
+
+ let bind_port = match family {
+ SocketFamily::Ipv4 => settings.bind_port,
+ SocketFamily::Ipv6 => settings.bind_port_v6,
+ };
+
+ let saddr = SocketAddr::new(bind_addr, bind_port as u16);
+ gst::debug!(CAT, imp: self, "Binding to {:?}", saddr);
+
+ let socket = match family {
+ SocketFamily::Ipv4 => socket2::Socket::new(
+ socket2::Domain::IPV4,
+ socket2::Type::DGRAM,
+ Some(socket2::Protocol::UDP),
+ ),
+ SocketFamily::Ipv6 => socket2::Socket::new(
+ socket2::Domain::IPV6,
+ socket2::Type::DGRAM,
+ Some(socket2::Protocol::UDP),
+ ),
+ };
+
+ let socket = match socket {
+ Ok(socket) => socket,
+ Err(err) => {
+ gst::warning!(
+ CAT,
+ imp: self,
+ "Failed to create {} socket: {}",
+ match family {
+ SocketFamily::Ipv4 => "IPv4",
+ SocketFamily::Ipv6 => "IPv6",
+ },
+ err
+ );
+ return Ok(None);
+ }
+ };
+
+ socket.bind(&saddr.into()).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
- ["Failed to acquire Context: {}", err]
+ ["Failed to bind socket: {}", err]
)
- })?
- };
+ })?;
- // Enable backpressure for items
- let (item_sender, item_receiver) = flume::bounded(0);
- let (cmd_sender, cmd_receiver) = flume::unbounded();
- let task_impl = UdpSinkTask::new(&self.obj(), item_receiver, cmd_receiver);
- self.task.prepare(task_impl, context).block_on()?;
+ let socket = ts_ctx.enter(|| {
+ Async::<UdpSocket>::try_from(socket).map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to setup Async socket: {}", err]
+ )
+ })
+ })?;
- *self.item_sender.lock().unwrap() = Some(item_sender);
- *self.cmd_sender.lock().unwrap() = Some(cmd_sender);
+ let wrapper = wrap_socket(&socket)?;
+
+ if settings.qos_dscp != -1 {
+ wrapper.set_tos(settings.qos_dscp).map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to set QoS DSCP: {}", err]
+ )
+ })?;
+ }
+
+ match family {
+ SocketFamily::Ipv4 => {
+ settings.used_socket = Some(wrapper);
+ }
+ SocketFamily::Ipv6 => {
+ settings.used_socket_v6 = Some(wrapper);
+ }
+ }
+
+ Ok(Some(socket))
+ }
+ }
+
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
+
+ let mut settings = self.settings.lock().unwrap();
+
+ let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
+
+ let socket = self.prepare_socket(&ts_ctx, &mut settings, SocketFamily::Ipv4)?;
+ let socket_v6 = self.prepare_socket(&ts_ctx, &mut settings, SocketFamily::Ipv6)?;
+
+ self.sink_pad_handler
+ .prepare(self, socket, socket_v6, &settings)?;
+ *self.ts_ctx.lock().unwrap() = Some(ts_ctx);
gst::debug!(CAT, imp: self, "Started preparation");
@@ -853,75 +798,46 @@ impl UdpSink {
fn unprepare(&self) {
gst::debug!(CAT, imp: self, "Unpreparing");
- self.task.unprepare().block_on().unwrap();
+ self.sink_pad_handler.unprepare();
+ *self.ts_ctx.lock().unwrap() = None;
gst::debug!(CAT, imp: self, "Unprepared");
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp: self, "Stopping");
- self.task.stop().block_on()?;
+ self.sink_pad_handler.stop();
gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp: self, "Starting");
- self.task.start().block_on()?;
+ self.sink_pad_handler.start();
gst::debug!(CAT, imp: self, "Started");
Ok(())
}
- fn add_client(&self, settings: &mut Settings, client: SocketAddr) {
- settings.clients.insert(client);
- if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
- cmd_sender.send(Command::AddClient(client)).unwrap();
- }
- }
+ fn try_into_socket_addr(&self, host: &str, port: i32) -> Result<SocketAddr, ()> {
+ let addr: IpAddr = match host.parse() {
+ Err(err) => {
+ gst::error!(CAT, imp: self, "Failed to parse host {}: {}", host, err);
+ return Err(());
+ }
+ Ok(addr) => addr,
+ };
- fn remove_client(&self, settings: &mut Settings, client: SocketAddr) {
- settings.clients.remove(&client);
- if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
- cmd_sender.send(Command::RemoveClient(client)).unwrap();
- }
- }
+ let port: u16 = match port.try_into() {
+ Err(err) => {
+ gst::error!(CAT, imp: self, "Invalid port {}: {}", port, err);
+ return Err(());
+ }
+ Ok(port) => port,
+ };
- fn replace_with_clients(
- &self,
- settings: &mut Settings,
- clients: impl IntoIterator<Item = SocketAddr>,
- ) {
- let clients = BTreeSet::<SocketAddr>::from_iter(clients);
- if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
- settings.clients = clients.clone();
- cmd_sender
- .send(Command::ReplaceWithClients(clients))
- .unwrap();
- } else {
- settings.clients = clients;
- }
+ Ok(SocketAddr::new(addr, port))
}
}
-fn try_into_socket_addr(imp: &UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> {
- let addr: IpAddr = match host.parse() {
- Err(err) => {
- gst::error!(CAT, imp: imp, "Failed to parse host {}: {}", host, err);
- return Err(());
- }
- Ok(addr) => addr,
- };
-
- let port: u16 = match port.try_into() {
- Err(err) => {
- gst::error!(CAT, imp: imp, "Invalid port {}: {}", port, err);
- return Err(());
- }
- Ok(port) => port,
- };
-
- Ok(SocketAddr::new(addr, port))
-}
-
#[glib::object_subclass]
impl ObjectSubclass for UdpSink {
const NAME: &'static str = "GstTsUdpSink";
@@ -929,15 +845,15 @@ impl ObjectSubclass for UdpSink {
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
+ let sink_pad_handler = UdpSinkPadHandler::default();
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
- UdpSinkPadHandler,
+ sink_pad_handler.clone(),
),
- task: Task::default(),
- item_sender: Default::default(),
- cmd_sender: Default::default(),
+ sink_pad_handler,
settings: Default::default(),
+ ts_ctx: Default::default(),
}
}
}
@@ -1051,14 +967,13 @@ impl ObjectImpl for UdpSink {
.param_types([String::static_type(), i32::static_type()])
.action()
.class_handler(|_, args| {
- let element = args[0].get::<super::UdpSink>().expect("signal arg");
+ let elem = args[0].get::<super::UdpSink>().expect("signal arg");
let host = args[1].get::<String>().expect("signal arg");
let port = args[2].get::<i32>().expect("signal arg");
- let udpsink = element.imp();
+ let imp = elem.imp();
- if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) {
- let mut settings = udpsink.settings.lock().unwrap();
- udpsink.add_client(&mut settings, addr);
+ if let Ok(addr) = imp.try_into_socket_addr(&host, port) {
+ imp.sink_pad_handler.add_client(imp, addr);
}
None
@@ -1068,14 +983,13 @@ impl ObjectImpl for UdpSink {
.param_types([String::static_type(), i32::static_type()])
.action()
.class_handler(|_, args| {
- let element = args[0].get::<super::UdpSink>().expect("signal arg");
+ let elem = args[0].get::<super::UdpSink>().expect("signal arg");
let host = args[1].get::<String>().expect("signal arg");
let port = args[2].get::<i32>().expect("signal arg");
- let udpsink = element.imp();
+ let imp = elem.imp();
- if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) {
- let mut settings = udpsink.settings.lock().unwrap();
- udpsink.remove_client(&mut settings, addr);
+ if let Ok(addr) = imp.try_into_socket_addr(&host, port) {
+ imp.sink_pad_handler.remove_client(imp, addr);
}
None
@@ -1084,11 +998,10 @@ impl ObjectImpl for UdpSink {
glib::subclass::Signal::builder("clear")
.action()
.class_handler(|_, args| {
- let element = args[0].get::<super::UdpSink>().expect("signal arg");
+ let elem = args[0].get::<super::UdpSink>().expect("signal arg");
- let udpsink = element.imp();
- let mut settings = udpsink.settings.lock().unwrap();
- udpsink.replace_with_clients(&mut settings, BTreeSet::new());
+ let imp = elem.imp();
+ imp.sink_pad_handler.replace_clients(imp, BTreeSet::new());
None
})
@@ -1105,9 +1018,7 @@ impl ObjectImpl for UdpSink {
"sync" => {
let sync = value.get().expect("type checked upstream");
settings.sync = sync;
- if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
- cmd_sender.send(Command::SetSync(sync)).unwrap();
- }
+ self.sink_pad_handler.set_sync(sync);
}
"bind-address" => {
settings.bind_address = value
@@ -1146,16 +1057,20 @@ impl ObjectImpl for UdpSink {
unreachable!();
}
"auto-multicast" => {
- settings.auto_multicast = value.get().expect("type checked upstream");
+ settings.socket_conf.auto_multicast = value.get().expect("type checked upstream");
+ self.sink_pad_handler.set_socket_conf(settings.socket_conf);
}
"loop" => {
- settings.multicast_loop = value.get().expect("type checked upstream");
+ settings.socket_conf.multicast_loop = value.get().expect("type checked upstream");
+ self.sink_pad_handler.set_socket_conf(settings.socket_conf);
}
"ttl" => {
- settings.ttl = value.get().expect("type checked upstream");
+ settings.socket_conf.ttl = value.get().expect("type checked upstream");
+ self.sink_pad_handler.set_socket_conf(settings.socket_conf);
}
"ttl-mc" => {
- settings.ttl_mc = value.get().expect("type checked upstream");
+ settings.socket_conf.ttl_mc = value.get().expect("type checked upstream");
+ self.sink_pad_handler.set_socket_conf(settings.socket_conf);
}
"qos-dscp" => {
settings.qos_dscp = value.get().expect("type checked upstream");
@@ -1167,22 +1082,33 @@ impl ObjectImpl for UdpSink {
.unwrap_or_else(|| "".into());
let clients = clients.split(',').filter_map(|client| {
- let rsplit: Vec<&str> = client.rsplitn(2, ':').collect();
-
- if rsplit.len() == 2 {
- rsplit[0]
- .parse::<i32>()
- .map_err(|err| {
- gst::error!(CAT, imp: self, "Invalid port {}: {}", rsplit[0], err);
- })
- .and_then(|port| try_into_socket_addr(self, rsplit[1], port))
- .ok()
+ let mut splited = client.splitn(2, ':');
+ if let Some((addr, port)) = splited.next().zip(splited.next()) {
+ match port.parse::<i32>() {
+ Ok(port) => match self.try_into_socket_addr(addr, port) {
+ Ok(socket_addr) => Some(socket_addr),
+ Err(()) => {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Invalid socket address {addr}:{port}"
+ );
+ None
+ }
+ },
+ Err(err) => {
+ gst::error!(CAT, imp: self, "Invalid port {err}");
+ None
+ }
+ }
} else {
+ gst::error!(CAT, imp: self, "Invalid client {client}");
None
}
});
- self.replace_with_clients(&mut settings, clients);
+ let clients = BTreeSet::from_iter(clients);
+ self.sink_pad_handler.replace_clients(self, clients);
}
"context" => {
settings.context = value
@@ -1227,14 +1153,13 @@ impl ObjectImpl for UdpSink {
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value(),
- "auto-multicast" => settings.sync.to_value(),
- "loop" => settings.multicast_loop.to_value(),
- "ttl" => settings.ttl.to_value(),
- "ttl-mc" => settings.ttl_mc.to_value(),
+ "auto-multicast" => settings.socket_conf.auto_multicast.to_value(),
+ "loop" => settings.socket_conf.multicast_loop.to_value(),
+ "ttl" => settings.socket_conf.ttl.to_value(),
+ "ttl-mc" => settings.socket_conf.ttl_mc.to_value(),
"qos-dscp" => settings.qos_dscp.to_value(),
"clients" => {
- let clients = settings.clients.clone();
- drop(settings);
+ let clients = self.sink_pad_handler.clients();
let clients: Vec<String> = clients.iter().map(ToString::to_string).collect();
clients.join(",").to_value()
@@ -1320,10 +1245,7 @@ impl ElementImpl for UdpSink {
match event.view() {
EventView::Latency(ev) => {
let latency = Some(ev.latency());
- if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
- cmd_sender.send(Command::SetLatency(latency)).unwrap();
- }
- self.settings.lock().unwrap().latency = latency;
+ self.sink_pad_handler.set_latency(latency);
self.sink_pad.gst_pad().push_event(event)
}
EventView::Step(..) => false,