Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-06-24 12:48:04 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-07-09 20:03:21 +0300
commita1b89c1fb96ca84899e599aad4d81c5ccc4d293a (patch)
treefc828ee2c94145b185e24470a291dee21c605fbc /generic/threadshare/src/udpsrc
parent885d3de7bbca3abe76eae38619b97523959ece35 (diff)
ts/udpsrc: reduce sync primitives in async hot path
- Moved UdpSrcPadHandlerState and related funtions to UdpSrcTask. - Moved Socket preparation in UdpSrcTask. No longer need for Context::enter.
Diffstat (limited to 'generic/threadshare/src/udpsrc')
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs707
1 files changed, 324 insertions, 383 deletions
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index cd5cf5fe1..df887616d 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -18,7 +18,6 @@
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture;
-use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
use gst::glib;
@@ -31,13 +30,12 @@ use once_cell::sync::Lazy;
use std::i32;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
-use std::sync::Arc;
-use std::sync::Mutex as StdMutex;
+use std::sync::Mutex;
use std::time::Duration;
use std::u16;
use crate::runtime::prelude::*;
-use crate::runtime::{Async, Context, PadSrc, PadSrcRef, PadSrcWeak, Task};
+use crate::runtime::{Async, Context, PadSrc, PadSrcRef, Task};
use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
@@ -109,91 +107,8 @@ impl SocketRead for UdpReader {
}
}
-#[derive(Debug)]
-struct UdpSrcPadHandlerState {
- retrieve_sender_address: bool,
- need_initial_events: bool,
- need_segment: bool,
- caps: Option<gst::Caps>,
-}
-
-impl Default for UdpSrcPadHandlerState {
- fn default() -> Self {
- UdpSrcPadHandlerState {
- retrieve_sender_address: true,
- need_initial_events: true,
- need_segment: true,
- caps: None,
- }
- }
-}
-
-#[derive(Debug, Default)]
-struct UdpSrcPadHandlerInner {
- state: FutMutex<UdpSrcPadHandlerState>,
- configured_caps: StdMutex<Option<gst::Caps>>,
-}
-
-#[derive(Clone, Debug, Default)]
-struct UdpSrcPadHandler(Arc<UdpSrcPadHandlerInner>);
-
-impl UdpSrcPadHandler {
- fn prepare(&self, caps: Option<gst::Caps>, retrieve_sender_address: bool) {
- let mut state = self.0.state.try_lock().expect("State locked elsewhere");
-
- state.caps = caps;
- state.retrieve_sender_address = retrieve_sender_address;
- }
-
- async fn reset_state(&self) {
- *self.0.state.lock().await = Default::default();
- }
-
- async fn set_need_segment(&self) {
- self.0.state.lock().await.need_segment = true;
- }
-
- async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::UdpSrc) {
- let mut state = self.0.state.lock().await;
- if state.need_initial_events {
- gst::debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
-
- let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
- let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
- .group_id(gst::GroupId::next())
- .build();
- pad.push_event(stream_start_evt).await;
-
- if let Some(ref caps) = state.caps {
- pad.push_event(gst::event::Caps::new(caps)).await;
- *self.0.configured_caps.lock().unwrap() = Some(caps.clone());
- }
-
- state.need_initial_events = false;
- }
-
- if state.need_segment {
- let segment_evt =
- gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
- pad.push_event(segment_evt).await;
-
- state.need_segment = false;
- }
- }
-
- async fn push_buffer(
- &self,
- pad: &PadSrcRef<'_>,
- element: &super::UdpSrc,
- buffer: gst::Buffer,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
-
- self.push_prelude(pad, element).await;
-
- pad.push(buffer).await
- }
-}
+#[derive(Clone, Debug)]
+struct UdpSrcPadHandler;
impl PadSrcHandler for UdpSrcPadHandler {
type ElementImpl = UdpSrc;
@@ -229,7 +144,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
fn src_query(
&self,
pad: &PadSrcRef,
- _udpsrc: &UdpSrc,
+ udpsrc: &UdpSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
@@ -248,7 +163,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
- let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() {
+ let caps = if let Some(caps) = udpsrc.configured_caps.lock().unwrap().as_ref() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@@ -277,32 +192,296 @@ impl PadSrcHandler for UdpSrcPadHandler {
struct UdpSrcTask {
element: super::UdpSrc,
- src_pad: PadSrcWeak,
- src_pad_handler: UdpSrcPadHandler,
- socket: Socket<UdpReader>,
+ socket: Option<Socket<UdpReader>>,
+ retrieve_sender_address: bool,
+ need_initial_events: bool,
+ need_segment: bool,
}
impl UdpSrcTask {
- fn new(
- element: &super::UdpSrc,
- src_pad: &PadSrc,
- src_pad_handler: &UdpSrcPadHandler,
- socket: Socket<UdpReader>,
- ) -> Self {
+ fn new(element: super::UdpSrc) -> Self {
UdpSrcTask {
- element: element.clone(),
- src_pad: src_pad.downgrade(),
- src_pad_handler: src_pad_handler.clone(),
- socket,
+ element,
+ socket: None,
+ retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS,
+ need_initial_events: true,
+ need_segment: true,
+ }
+ }
+
+ async fn push_buffer(
+ &mut self,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
+ let udpsrc = self.element.imp();
+
+ if self.need_initial_events {
+ gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+
+ let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
+ let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
+ .group_id(gst::GroupId::next())
+ .build();
+ udpsrc.src_pad.push_event(stream_start_evt).await;
+
+ let caps = udpsrc.settings.lock().unwrap().caps.clone();
+ if let Some(caps) = caps {
+ udpsrc
+ .src_pad
+ .push_event(gst::event::Caps::new(&caps))
+ .await;
+ *udpsrc.configured_caps.lock().unwrap() = Some(caps);
+ }
+
+ self.need_initial_events = false;
+ }
+
+ if self.need_segment {
+ let segment_evt =
+ gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
+ udpsrc.src_pad.push_event(segment_evt).await;
+
+ self.need_segment = false;
}
+
+ let res = udpsrc.src_pad.push(buffer).await;
+ match res {
+ Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
+ Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
+ Err(gst::FlowError::Eos) => {
+ gst::debug!(CAT, obj: &self.element, "EOS");
+ udpsrc.src_pad.push_event(gst::event::Eos::new()).await;
+ }
+ Err(err) => {
+ gst::error!(CAT, obj: &self.element, "Got error {}", err);
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+ }
+ }
+
+ res
}
}
impl TaskImpl for UdpSrcTask {
+ fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ let udpsrc = self.element.imp();
+ let mut settings = udpsrc.settings.lock().unwrap();
+
+ gst::debug!(CAT, obj: &self.element, "Preparing Task");
+
+ self.retrieve_sender_address = settings.retrieve_sender_address;
+
+ let socket = if let Some(ref wrapped_socket) = settings.socket {
+ let socket: UdpSocket;
+
+ #[cfg(unix)]
+ {
+ socket = wrapped_socket.get()
+ }
+ #[cfg(windows)]
+ {
+ socket = wrapped_socket.get()
+ }
+
+ let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to setup Async socket: {}", err]
+ )
+ })?;
+
+ settings.used_socket = Some(wrapped_socket.clone());
+
+ socket
+ } else {
+ let addr: IpAddr = match settings.address {
+ None => {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["No address set"]
+ ));
+ }
+ Some(ref addr) => match addr.parse() {
+ Err(err) => {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["Invalid address '{}' set: {}", addr, err]
+ ));
+ }
+ Ok(addr) => addr,
+ },
+ };
+ let port = settings.port;
+
+ // TODO: TTL, multicast loopback, etc
+ let saddr = if addr.is_multicast() {
+ let bind_addr = if addr.is_ipv4() {
+ IpAddr::V4(Ipv4Addr::UNSPECIFIED)
+ } else {
+ IpAddr::V6(Ipv6Addr::UNSPECIFIED)
+ };
+
+ let saddr = SocketAddr::new(bind_addr, port as u16);
+ gst::debug!(
+ CAT,
+ obj: &self.element,
+ "Binding to {:?} for multicast group {:?}",
+ saddr,
+ addr
+ );
+
+ saddr
+ } else {
+ let saddr = SocketAddr::new(addr, port as u16);
+ gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr);
+
+ saddr
+ };
+
+ let socket = if addr.is_ipv4() {
+ socket2::Socket::new(
+ socket2::Domain::IPV4,
+ socket2::Type::DGRAM,
+ Some(socket2::Protocol::UDP),
+ )
+ } else {
+ socket2::Socket::new(
+ socket2::Domain::IPV6,
+ socket2::Type::DGRAM,
+ Some(socket2::Protocol::UDP),
+ )
+ }
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to create socket: {}", err]
+ )
+ })?;
+
+ socket.set_reuse_address(settings.reuse).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to set reuse_address: {}", err]
+ )
+ })?;
+
+ #[cfg(unix)]
+ {
+ socket.set_reuse_port(settings.reuse).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to set reuse_port: {}", err]
+ )
+ })?;
+ }
+
+ socket.bind(&saddr.into()).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to bind socket: {}", err]
+ )
+ })?;
+
+ let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to setup Async socket: {}", err]
+ )
+ })?;
+
+ if addr.is_multicast() {
+ // TODO: Multicast interface configuration, going to be tricky
+ match addr {
+ IpAddr::V4(addr) => {
+ socket
+ .as_ref()
+ .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to join multicast group: {}", err]
+ )
+ })?;
+ }
+ IpAddr::V6(addr) => {
+ socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to join multicast group: {}", err]
+ )
+ })?;
+ }
+ }
+ }
+
+ settings.used_socket = Some(wrap_socket(&socket)?);
+
+ socket
+ };
+
+ let port: i32 = socket.as_ref().local_addr().unwrap().port().into();
+ if settings.port != port {
+ settings.port = port;
+ drop(settings);
+ self.element.notify("port");
+
+ settings = udpsrc.settings.lock().unwrap();
+ };
+
+ let buffer_pool = gst::BufferPool::new();
+ let mut config = buffer_pool.config();
+ config.set_params(None, settings.mtu, 0, 0);
+ buffer_pool.set_config(config).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["Failed to configure buffer pool {:?}", err]
+ )
+ })?;
+
+ self.socket = Some(
+ Socket::try_new(
+ self.element.clone().upcast(),
+ buffer_pool,
+ UdpReader::new(socket),
+ )
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to prepare socket {:?}", err]
+ )
+ })?,
+ );
+
+ self.element.notify("used-socket");
+
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn unprepare(&mut self) -> BoxFuture<'_, ()> {
+ async move {
+ gst::debug!(CAT, obj: &self.element, "Unpreparing Task");
+ let udpsrc = self.element.imp();
+ udpsrc.settings.lock().unwrap().used_socket = None;
+ self.element.notify("used-socket");
+ }
+ .boxed()
+ }
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Starting task");
self.socket
+ .as_mut()
+ .unwrap()
.set_clock(self.element.clock(), self.element.base_time());
gst::log!(CAT, obj: &self.element, "Task started");
Ok(())
@@ -312,47 +491,36 @@ impl TaskImpl for UdpSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- let item = self.socket.next().await;
-
- let (mut buffer, saddr) = match item {
- Some(Ok((buffer, saddr))) => (buffer, saddr),
- Some(Err(err)) => {
- gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
- match err {
- SocketError::Gst(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason {}", err]
- );
- }
- SocketError::Io(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("I/O error"),
- ["streaming stopped, I/O error {}", err]
- );
- }
+ let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
+ gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
+ gst::FlowError::Flushing
+ })?;
+
+ let (mut buffer, saddr) = item.map_err(|err| {
+ gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
+ match err {
+ SocketError::Gst(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+ }
+ SocketError::Io(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("I/O error"),
+ ["streaming stopped, I/O error {}", err]
+ );
}
- return Err(gst::FlowError::Error);
- }
- None => {
- gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
- return Err(gst::FlowError::Flushing);
}
- };
+ gst::FlowError::Error
+ })?;
if let Some(saddr) = saddr {
- if self
- .src_pad_handler
- .0
- .state
- .lock()
- .await
- .retrieve_sender_address
- {
+ if self.retrieve_sender_address {
NetAddressMeta::add(
buffer.get_mut().unwrap(),
&gio::InetSocketAddress::from(saddr),
@@ -360,30 +528,7 @@ impl TaskImpl for UdpSrcTask {
}
}
- let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
- let res = self
- .src_pad_handler
- .push_buffer(&pad, &self.element, buffer)
- .await;
- match res {
- Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
- Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
- Err(gst::FlowError::Eos) => {
- gst::debug!(CAT, obj: &self.element, "EOS");
- pad.push_event(gst::event::Eos::new()).await;
- }
- Err(err) => {
- gst::error!(CAT, obj: &self.element, "Got error {}", err);
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason {}", err]
- );
- }
- }
-
- res.map(drop)
+ self.push_buffer(buffer).await.map(drop)
}
.boxed()
}
@@ -391,7 +536,8 @@ impl TaskImpl for UdpSrcTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");
- self.src_pad_handler.reset_state().await;
+ self.need_initial_events = true;
+ self.need_segment = true;
gst::log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
@@ -401,7 +547,7 @@ impl TaskImpl for UdpSrcTask {
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task flush");
- self.src_pad_handler.set_need_segment().await;
+ self.need_segment = true;
gst::log!(CAT, obj: &self.element, "Stopped task flush");
Ok(())
}
@@ -411,9 +557,9 @@ impl TaskImpl for UdpSrcTask {
pub struct UdpSrc {
src_pad: PadSrc,
- src_pad_handler: UdpSrcPadHandler,
task: Task,
- settings: StdMutex<Settings>,
+ configured_caps: Mutex<Option<gst::Caps>>,
+ settings: Mutex<Settings>,
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@@ -426,219 +572,21 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
impl UdpSrc {
fn prepare(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
- let mut settings_guard = self.settings.lock().unwrap();
-
gst::debug!(CAT, obj: element, "Preparing");
- let context = Context::acquire(&settings_guard.context, settings_guard.context_wait)
- .map_err(|err| {
+ let settings = self.settings.lock().unwrap();
+ let context =
+ Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
+ drop(settings);
- let socket = if let Some(ref wrapped_socket) = settings_guard.socket {
- let socket: UdpSocket;
-
- #[cfg(unix)]
- {
- socket = wrapped_socket.get()
- }
- #[cfg(windows)]
- {
- socket = wrapped_socket.get()
- }
-
- let socket = context.enter(|| {
- Async::<UdpSocket>::try_from(socket).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to setup Async socket: {}", err]
- )
- })
- })?;
-
- settings_guard.used_socket = Some(wrapped_socket.clone());
-
- socket
- } else {
- let addr: IpAddr = match settings_guard.address {
- None => {
- return Err(gst::error_msg!(
- gst::ResourceError::Settings,
- ["No address set"]
- ));
- }
- Some(ref addr) => match addr.parse() {
- Err(err) => {
- return Err(gst::error_msg!(
- gst::ResourceError::Settings,
- ["Invalid address '{}' set: {}", addr, err]
- ));
- }
- Ok(addr) => addr,
- },
- };
- let port = settings_guard.port;
-
- // TODO: TTL, multicast loopback, etc
- let saddr = if addr.is_multicast() {
- let bind_addr = if addr.is_ipv4() {
- IpAddr::V4(Ipv4Addr::UNSPECIFIED)
- } else {
- IpAddr::V6(Ipv6Addr::UNSPECIFIED)
- };
-
- let saddr = SocketAddr::new(bind_addr, port as u16);
- gst::debug!(
- CAT,
- obj: element,
- "Binding to {:?} for multicast group {:?}",
- saddr,
- addr
- );
-
- saddr
- } else {
- let saddr = SocketAddr::new(addr, port as u16);
- gst::debug!(CAT, obj: element, "Binding to {:?}", saddr);
-
- saddr
- };
-
- let socket = if addr.is_ipv4() {
- socket2::Socket::new(
- socket2::Domain::IPV4,
- socket2::Type::DGRAM,
- Some(socket2::Protocol::UDP),
- )
- } else {
- socket2::Socket::new(
- socket2::Domain::IPV6,
- socket2::Type::DGRAM,
- Some(socket2::Protocol::UDP),
- )
- }
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to create socket: {}", err]
- )
- })?;
-
- socket
- .set_reuse_address(settings_guard.reuse)
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to set reuse_address: {}", err]
- )
- })?;
-
- #[cfg(unix)]
- {
- socket.set_reuse_port(settings_guard.reuse).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to set reuse_port: {}", err]
- )
- })?;
- }
-
- socket.bind(&saddr.into()).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to bind socket: {}", err]
- )
- })?;
-
- let socket = context.enter(|| {
- Async::<UdpSocket>::try_from(socket).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to setup Async socket: {}", err]
- )
- })
- })?;
-
- if addr.is_multicast() {
- // TODO: Multicast interface configuration, going to be tricky
- match addr {
- IpAddr::V4(addr) => {
- socket
- .as_ref()
- .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to join multicast group: {}", err]
- )
- })?;
- }
- IpAddr::V6(addr) => {
- socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to join multicast group: {}", err]
- )
- })?;
- }
- }
- }
-
- settings_guard.used_socket = Some(wrap_socket(&socket)?);
-
- socket
- };
-
- let port: i32 = socket.as_ref().local_addr().unwrap().port().into();
- let settings = if settings_guard.port != port {
- settings_guard.port = port;
- let settings = settings_guard.clone();
- drop(settings_guard);
- element.notify("port");
-
- settings
- } else {
- let settings = settings_guard.clone();
- drop(settings_guard);
-
- settings
- };
-
- let buffer_pool = gst::BufferPool::new();
- let mut config = buffer_pool.config();
- config.set_params(None, settings.mtu, 0, 0);
- buffer_pool.set_config(config).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Settings,
- ["Failed to configure buffer pool {:?}", err]
- )
- })?;
-
- let socket = Socket::try_new(
- element.clone().upcast(),
- buffer_pool,
- UdpReader::new(socket),
- )
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to prepare socket {:?}", err]
- )
- })?;
-
- element.notify("used-socket");
-
- self.src_pad_handler
- .prepare(settings.caps, settings.retrieve_sender_address);
-
+ *self.configured_caps.lock().unwrap() = None;
self.task
- .prepare(
- UdpSrcTask::new(element, &self.src_pad, &self.src_pad_handler, socket),
- context,
- )
+ .prepare(UdpSrcTask::new(element.clone()), context)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
@@ -653,12 +601,7 @@ impl UdpSrc {
fn unprepare(&self, element: &super::UdpSrc) {
gst::debug!(CAT, obj: element, "Unpreparing");
-
- self.settings.lock().unwrap().used_socket = None;
- element.notify("used-socket");
-
self.task.unprepare().unwrap();
-
gst::debug!(CAT, obj: element, "Unprepared");
}
@@ -691,16 +634,14 @@ impl ObjectSubclass for UdpSrc {
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
- let src_pad_handler = UdpSrcPadHandler::default();
-
Self {
src_pad: PadSrc::new(
gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")),
- src_pad_handler.clone(),
+ UdpSrcPadHandler,
),
- src_pad_handler,
task: Task::default(),
- settings: StdMutex::new(Settings::default()),
+ configured_caps: Default::default(),
+ settings: Default::default(),
}
}
}