Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2020-05-26 12:30:14 +0300
committerSebastian Dröge <slomo@coaxion.net>2020-05-30 11:30:27 +0300
commitdfaf59a59ba1ef0669172210f29a154a4034e4c8 (patch)
treeac4f10f3f6b0fa41f472ad5315fcafd289019c6d /generic
parent13d3fd1cc86c919a7ccff44b4da52b00d3582f31 (diff)
threadshare: simplify Pad{Src,Sink} implementations
Pad{Src,Sink}[Ref] delegate some functions to their respective Pad{Src,Sink}Inner. Since they act as smart pointers, we can safely implement the Deref trait to simplify the implementations.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/runtime/pad.rs340
1 files changed, 130 insertions, 210 deletions
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs
index 026e6f0e5..d42663ef6 100644
--- a/generic/threadshare/src/runtime/pad.rs
+++ b/generic/threadshare/src/runtime/pad.rs
@@ -42,15 +42,13 @@
//! ╰─────────────────╯ ╭─>│ │╌╌╌>│ │─╮ │
//! ╭───────╯ │ │ ╰──┰──╯ ╰──┰──╯ ╰───────╮ │
//! ╭────────────╮ ╭────────╮ push* │ ┃ ┃ ╭─────────╮
-//! │ Pad Task ↺ │<──│ PadSrc │───────╯ ┃ ┃ │ PadSink │
+//! │ Pad Task ↺ │──>│ PadSrc │───────╯ ┃ ┃ │ PadSink │
//! ╰────────────╯ ╰────────╯ ┃ ┃ ╰─────────╯
-//! ━━━━━━━━━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
-//! ╰───────────────────╮ ╭─────────────────╯
-//! ╭──────────────╮
-//! │ PadContext │
-//! │╭────────────╮│
-//! ││ Context ↺ ││
-//! ╰╰────────────╯╯
+//! ━━━━━━━│━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
+//! ╰──────────────┴───────────────────╮ ╭─────────────────╯
+//! ╭────────────╮
+//! │ Context ↺ │
+//! ╰────────────╯
//! ```
//!
//! Asynchronous operations for both [`PadSrc`] in `Element A` and [`PadSink`] in `Element B` run on
@@ -78,6 +76,7 @@ use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error};
use gst::{FlowError, FlowSuccess};
use std::marker::PhantomData;
+use std::ops::Deref;
use std::sync::{Arc, Weak};
use super::executor::{block_on_or_add_sub_task, Context};
@@ -204,7 +203,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
}
#[derive(Debug)]
-struct PadSrcInner {
+pub struct PadSrcInner {
gst_pad: gst::Pad,
}
@@ -216,6 +215,66 @@ impl PadSrcInner {
PadSrcInner { gst_pad }
}
+
+ pub fn gst_pad(&self) -> &gst::Pad {
+ &self.gst_pad
+ }
+
+ pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
+ gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
+
+ let success = self.gst_pad.push(buffer).map_err(|err| {
+ gst_error!(RUNTIME_CAT,
+ obj: self.gst_pad(),
+ "Failed to push Buffer to PadSrc: {:?}",
+ err,
+ );
+ err
+ })?;
+
+ gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
+ while Context::current_has_sub_tasks() {
+ Context::drain_sub_tasks().await?;
+ }
+
+ Ok(success)
+ }
+
+ pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
+ gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
+
+ let success = self.gst_pad.push_list(list).map_err(|err| {
+ gst_error!(
+ RUNTIME_CAT,
+ obj: self.gst_pad(),
+ "Failed to push BufferList to PadSrc: {:?}",
+ err,
+ );
+ err
+ })?;
+
+ gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
+ while Context::current_has_sub_tasks() {
+ Context::drain_sub_tasks().await?;
+ }
+
+ Ok(success)
+ }
+
+ pub async fn push_event(&self, event: gst::Event) -> bool {
+ gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Pushing {:?}", event);
+
+ let was_handled = self.gst_pad().push_event(event);
+
+ gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
+ while Context::current_has_sub_tasks() {
+ if Context::drain_sub_tasks().await.is_err() {
+ return false;
+ }
+ }
+
+ was_handled
+ }
}
/// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s.
@@ -249,51 +308,20 @@ impl PadSrcWeak {
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSrcRef<'a> {
- strong: PadSrcStrong,
- phantom: PhantomData<&'a PadSrcStrong>,
+ strong: Arc<PadSrcInner>,
+ phantom: PhantomData<&'a Self>,
}
impl<'a> PadSrcRef<'a> {
fn new(inner_arc: Arc<PadSrcInner>) -> Self {
PadSrcRef {
- strong: PadSrcStrong(inner_arc),
+ strong: inner_arc,
phantom: PhantomData,
}
}
- pub fn gst_pad(&self) -> &gst::Pad {
- self.strong.gst_pad()
- }
-
- ///// Spawns `future` using current [`PadContext`].
- /////
- ///// # Panics
- /////
- ///// This function panics if the `PadSrc` is not prepared.
- /////
- ///// [`PadContext`]: ../struct.PadContext.html
- //pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
- //where
- // Fut: Future + Send + 'static,
- // Fut::Output: Send + 'static,
- //{
- // self.strong.spawn(future)
- //}
-
pub fn downgrade(&self) -> PadSrcWeak {
- self.strong.downgrade()
- }
-
- pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
- self.strong.push(buffer).await
- }
-
- pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
- self.strong.push_list(list).await
- }
-
- pub async fn push_event(&self, event: gst::Event) -> bool {
- self.strong.push_event(event).await
+ PadSrcWeak(Arc::downgrade(&self.strong))
}
fn activate_mode_hook(
@@ -317,94 +345,11 @@ impl<'a> PadSrcRef<'a> {
}
}
-#[derive(Debug)]
-struct PadSrcStrong(Arc<PadSrcInner>);
+impl<'a> Deref for PadSrcRef<'a> {
+ type Target = PadSrcInner;
-impl PadSrcStrong {
- fn new(gst_pad: gst::Pad) -> Self {
- PadSrcStrong(Arc::new(PadSrcInner::new(gst_pad)))
- }
-
- #[inline]
- fn gst_pad(&self) -> &gst::Pad {
- &self.0.gst_pad
- }
-
- //#[inline]
- //fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
- //where
- // Fut: Future + Send + 'static,
- // Fut::Output: Send + 'static,
- //{
- // let pad_ctx = self.pad_context_priv();
- // pad_ctx
- // .as_ref()
- // .expect("PadContext not initialized")
- // .spawn(future)
- //}
-
- #[inline]
- fn downgrade(&self) -> PadSrcWeak {
- PadSrcWeak(Arc::downgrade(&self.0))
- }
-
- #[inline]
- async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
- gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
-
- let success = self.gst_pad().push(buffer).map_err(|err| {
- gst_error!(RUNTIME_CAT,
- obj: self.gst_pad(),
- "Failed to push Buffer to PadSrc: {:?}",
- err,
- );
- err
- })?;
-
- gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks");
- while Context::current_has_sub_tasks() {
- Context::drain_sub_tasks().await?;
- }
-
- Ok(success)
- }
-
- #[inline]
- async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
- gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
-
- let success = self.gst_pad().push_list(list).map_err(|err| {
- gst_error!(
- RUNTIME_CAT,
- obj: self.gst_pad(),
- "Failed to push BufferList to PadSrc: {:?}",
- err,
- );
- err
- })?;
-
- gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks");
- while Context::current_has_sub_tasks() {
- Context::drain_sub_tasks().await?;
- }
-
- Ok(success)
- }
-
- #[inline]
- async fn push_event(&self, event: gst::Event) -> bool {
- gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", event);
-
- let was_handled = self.gst_pad().push_event(event);
-
- gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks");
- while Context::current_has_sub_tasks() {
- if Context::drain_sub_tasks().await.is_err() {
- return false;
- }
- }
-
- was_handled
+ fn deref(&self) -> &Self::Target {
+ &self.strong
}
}
@@ -417,36 +362,33 @@ impl PadSrcStrong {
/// [`downgrade`]: struct.PadSrc.html#method.downgrade
/// [`pad` module]: index.html
#[derive(Debug)]
-pub struct PadSrc(PadSrcStrong);
+pub struct PadSrc(Arc<PadSrcInner>);
impl PadSrc {
pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self {
- let this = PadSrc(PadSrcStrong::new(gst_pad));
+ let this = PadSrc(Arc::new(PadSrcInner::new(gst_pad)));
this.init_pad_functions(handler);
this
}
- pub fn as_ref(&self) -> PadSrcRef<'_> {
- PadSrcRef::new(Arc::clone(&(self.0).0))
- }
-
- pub fn gst_pad(&self) -> &gst::Pad {
- self.0.gst_pad()
+ pub fn downgrade(&self) -> PadSrcWeak {
+ PadSrcWeak(Arc::downgrade(&self.0))
}
- pub fn downgrade(&self) -> PadSrcWeak {
- self.0.downgrade()
+ pub fn as_ref(&self) -> PadSrcRef<'_> {
+ PadSrcRef::new(Arc::clone(&self.0))
}
pub fn check_reconfigure(&self) -> bool {
- self.gst_pad().check_reconfigure()
+ self.0.gst_pad().check_reconfigure()
}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
- self.gst_pad()
+ let inner_arc = Arc::clone(&self.0);
+ self.0
+ .gst_pad()
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
@@ -464,7 +406,7 @@ impl PadSrc {
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
@@ -489,7 +431,7 @@ impl PadSrc {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `src_event` when necessary
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
@@ -504,7 +446,7 @@ impl PadSrc {
)
});
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
@@ -524,18 +466,6 @@ impl PadSrc {
)
});
}
-
- pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
- self.0.push(buffer).await
- }
-
- pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
- self.0.push_list(list).await
- }
-
- pub async fn push_event(&self, event: gst::Event) -> bool {
- self.0.push_event(event).await
- }
}
impl Drop for PadSrc {
@@ -557,6 +487,14 @@ impl Drop for PadSrc {
}
}
+impl Deref for PadSrc {
+ type Target = PadSrcInner;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
/// A trait to define `handler`s for [`PadSink`] callbacks.
///
/// *See the [`pad` module] documentation for a description of the model.*
@@ -710,7 +648,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
}
#[derive(Debug)]
-struct PadSinkInner {
+pub struct PadSinkInner {
gst_pad: gst::Pad,
}
@@ -722,6 +660,10 @@ impl PadSinkInner {
PadSinkInner { gst_pad }
}
+
+ pub fn gst_pad(&self) -> &gst::Pad {
+ &self.gst_pad
+ }
}
/// A [`PadSink`] which can be moved in `Handler`s functions and `Future`s.
@@ -754,24 +696,20 @@ impl PadSinkWeak {
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSinkRef<'a> {
- strong: PadSinkStrong,
- phantom: PhantomData<&'a PadSrcStrong>,
+ strong: Arc<PadSinkInner>,
+ phantom: PhantomData<&'a Self>,
}
impl<'a> PadSinkRef<'a> {
fn new(inner_arc: Arc<PadSinkInner>) -> Self {
PadSinkRef {
- strong: PadSinkStrong(inner_arc),
+ strong: inner_arc,
phantom: PhantomData,
}
}
- pub fn gst_pad(&self) -> &gst::Pad {
- self.strong.gst_pad()
- }
-
pub fn downgrade(&self) -> PadSinkWeak {
- self.strong.downgrade()
+ PadSinkWeak(Arc::downgrade(&self.strong))
}
fn activate_mode_hook(
@@ -798,20 +736,7 @@ impl<'a> PadSinkRef<'a> {
&self,
fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
) -> Result<FlowSuccess, FlowError> {
- // First try to add it as a sub task to the current task, if any
if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) {
- // FIXME: update comments below
- // Not on a context thread: execute the Future immediately.
- //
- // - If there is no PadContext, we don't have any other options.
- // - If there is a PadContext, it means that we received it from
- // an upstream element, but there is at least one non-ts element
- // operating on another thread in between, so we can't take
- // advantage of the task queue.
- //
- // Note: we don't use `crate::runtime::executor::block_on` here
- // because `Context::is_context_thread()` is checked in the `if`
- // statement above.
block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
.unwrap_or(Ok(gst::FlowSuccess::Ok))
} else {
@@ -820,20 +745,11 @@ impl<'a> PadSinkRef<'a> {
}
}
-#[derive(Debug)]
-struct PadSinkStrong(Arc<PadSinkInner>);
-
-impl PadSinkStrong {
- fn new(gst_pad: gst::Pad) -> Self {
- PadSinkStrong(Arc::new(PadSinkInner::new(gst_pad)))
- }
-
- fn gst_pad(&self) -> &gst::Pad {
- &self.0.gst_pad
- }
+impl<'a> Deref for PadSinkRef<'a> {
+ type Target = PadSinkInner;
- fn downgrade(&self) -> PadSinkWeak {
- PadSinkWeak(Arc::downgrade(&self.0))
+ fn deref(&self) -> &Self::Target {
+ &self.strong
}
}
@@ -846,31 +762,27 @@ impl PadSinkStrong {
/// [`downgrade`]: struct.PadSink.html#method.downgrade
/// [`pad` module]: index.html
#[derive(Debug)]
-pub struct PadSink(PadSinkStrong);
+pub struct PadSink(Arc<PadSinkInner>);
impl PadSink {
pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
- let this = PadSink(PadSinkStrong::new(gst_pad));
+ let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
this.init_pad_functions(handler);
this
}
- pub fn as_ref(&self) -> PadSinkRef<'_> {
- PadSinkRef::new(Arc::clone(&(self.0).0))
- }
-
- pub fn gst_pad(&self) -> &gst::Pad {
- self.0.gst_pad()
+ pub fn downgrade(&self) -> PadSinkWeak {
+ PadSinkWeak(Arc::downgrade(&self.0))
}
- pub fn downgrade(&self) -> PadSinkWeak {
- self.0.downgrade()
+ pub fn as_ref(&self) -> PadSinkRef<'_> {
+ PadSinkRef::new(Arc::clone(&self.0))
}
fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
@@ -892,7 +804,7 @@ impl PadSink {
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
@@ -916,7 +828,7 @@ impl PadSink {
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_chain_function(move |_gst_pad, parent, buffer| {
let handler = handler_clone.clone();
@@ -949,7 +861,7 @@ impl PadSink {
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_chain_list_function(move |_gst_pad, parent, list| {
let handler = handler_clone.clone();
@@ -987,7 +899,7 @@ impl PadSink {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `sink_event` when necessary
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
@@ -1028,7 +940,7 @@ impl PadSink {
)
});
- let inner_arc = Arc::clone(&(self.0).0);
+ let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
@@ -1072,3 +984,11 @@ impl Drop for PadSink {
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}
+
+impl Deref for PadSink {
+ type Target = PadSinkInner;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}