diff options
author | François Laignel <fengalin@free.fr> | 2020-05-26 12:30:14 +0300 |
---|---|---|
committer | Sebastian Dröge <slomo@coaxion.net> | 2020-05-30 11:30:27 +0300 |
commit | dfaf59a59ba1ef0669172210f29a154a4034e4c8 (patch) | |
tree | ac4f10f3f6b0fa41f472ad5315fcafd289019c6d /generic/threadshare | |
parent | 13d3fd1cc86c919a7ccff44b4da52b00d3582f31 (diff) |
threadshare: simplify Pad{Src,Sink} implementations
Pad{Src,Sink}[Ref] delegate some functions to their respective
Pad{Src,Sink}Inner. Since they act as smart pointers, we can
safely implement the Deref trait to simplify the implementations.
Diffstat (limited to 'generic/threadshare')
-rw-r--r-- | generic/threadshare/src/runtime/pad.rs | 340 |
1 files changed, 130 insertions, 210 deletions
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs index 026e6f0e5..d42663ef6 100644 --- a/generic/threadshare/src/runtime/pad.rs +++ b/generic/threadshare/src/runtime/pad.rs @@ -42,15 +42,13 @@ //! ╰─────────────────╯ ╭─>│ │╌╌╌>│ │─╮ │ //! ╭───────╯ │ │ ╰──┰──╯ ╰──┰──╯ ╰───────╮ │ //! ╭────────────╮ ╭────────╮ push* │ ┃ ┃ ╭─────────╮ -//! │ Pad Task ↺ │<──│ PadSrc │───────╯ ┃ ┃ │ PadSink │ +//! │ Pad Task ↺ │──>│ PadSrc │───────╯ ┃ ┃ │ PadSink │ //! ╰────────────╯ ╰────────╯ ┃ ┃ ╰─────────╯ -//! ━━━━━━━━━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━ -//! ╰───────────────────╮ ╭─────────────────╯ -//! ╭──────────────╮ -//! │ PadContext │ -//! │╭────────────╮│ -//! ││ Context ↺ ││ -//! ╰╰────────────╯╯ +//! ━━━━━━━│━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━ +//! ╰──────────────┴───────────────────╮ ╭─────────────────╯ +//! ╭────────────╮ +//! │ Context ↺ │ +//! ╰────────────╯ //! ``` //! //! Asynchronous operations for both [`PadSrc`] in `Element A` and [`PadSink`] in `Element B` run on @@ -78,6 +76,7 @@ use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error}; use gst::{FlowError, FlowSuccess}; use std::marker::PhantomData; +use std::ops::Deref; use std::sync::{Arc, Weak}; use super::executor::{block_on_or_add_sub_task, Context}; @@ -204,7 +203,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { } #[derive(Debug)] -struct PadSrcInner { +pub struct PadSrcInner { gst_pad: gst::Pad, } @@ -216,6 +215,66 @@ impl PadSrcInner { PadSrcInner { gst_pad } } + + pub fn gst_pad(&self) -> &gst::Pad { + &self.gst_pad + } + + pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> { + gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer); + + let success = self.gst_pad.push(buffer).map_err(|err| { + gst_error!(RUNTIME_CAT, + obj: self.gst_pad(), + "Failed to push Buffer to PadSrc: {:?}", + err, + ); + err + })?; + + gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); + while Context::current_has_sub_tasks() { + Context::drain_sub_tasks().await?; + } + + Ok(success) + } + + pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> { + gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list); + + let success = self.gst_pad.push_list(list).map_err(|err| { + gst_error!( + RUNTIME_CAT, + obj: self.gst_pad(), + "Failed to push BufferList to PadSrc: {:?}", + err, + ); + err + })?; + + gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); + while Context::current_has_sub_tasks() { + Context::drain_sub_tasks().await?; + } + + Ok(success) + } + + pub async fn push_event(&self, event: gst::Event) -> bool { + gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Pushing {:?}", event); + + let was_handled = self.gst_pad().push_event(event); + + gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); + while Context::current_has_sub_tasks() { + if Context::drain_sub_tasks().await.is_err() { + return false; + } + } + + was_handled + } } /// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s. @@ -249,51 +308,20 @@ impl PadSrcWeak { /// [`pad` module]: index.html #[derive(Debug)] pub struct PadSrcRef<'a> { - strong: PadSrcStrong, - phantom: PhantomData<&'a PadSrcStrong>, + strong: Arc<PadSrcInner>, + phantom: PhantomData<&'a Self>, } impl<'a> PadSrcRef<'a> { fn new(inner_arc: Arc<PadSrcInner>) -> Self { PadSrcRef { - strong: PadSrcStrong(inner_arc), + strong: inner_arc, phantom: PhantomData, } } - pub fn gst_pad(&self) -> &gst::Pad { - self.strong.gst_pad() - } - - ///// Spawns `future` using current [`PadContext`]. - ///// - ///// # Panics - ///// - ///// This function panics if the `PadSrc` is not prepared. - ///// - ///// [`PadContext`]: ../struct.PadContext.html - //pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output> - //where - // Fut: Future + Send + 'static, - // Fut::Output: Send + 'static, - //{ - // self.strong.spawn(future) - //} - pub fn downgrade(&self) -> PadSrcWeak { - self.strong.downgrade() - } - - pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> { - self.strong.push(buffer).await - } - - pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> { - self.strong.push_list(list).await - } - - pub async fn push_event(&self, event: gst::Event) -> bool { - self.strong.push_event(event).await + PadSrcWeak(Arc::downgrade(&self.strong)) } fn activate_mode_hook( @@ -317,94 +345,11 @@ impl<'a> PadSrcRef<'a> { } } -#[derive(Debug)] -struct PadSrcStrong(Arc<PadSrcInner>); +impl<'a> Deref for PadSrcRef<'a> { + type Target = PadSrcInner; -impl PadSrcStrong { - fn new(gst_pad: gst::Pad) -> Self { - PadSrcStrong(Arc::new(PadSrcInner::new(gst_pad))) - } - - #[inline] - fn gst_pad(&self) -> &gst::Pad { - &self.0.gst_pad - } - - //#[inline] - //fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output> - //where - // Fut: Future + Send + 'static, - // Fut::Output: Send + 'static, - //{ - // let pad_ctx = self.pad_context_priv(); - // pad_ctx - // .as_ref() - // .expect("PadContext not initialized") - // .spawn(future) - //} - - #[inline] - fn downgrade(&self) -> PadSrcWeak { - PadSrcWeak(Arc::downgrade(&self.0)) - } - - #[inline] - async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> { - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer); - - let success = self.gst_pad().push(buffer).map_err(|err| { - gst_error!(RUNTIME_CAT, - obj: self.gst_pad(), - "Failed to push Buffer to PadSrc: {:?}", - err, - ); - err - })?; - - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - Context::drain_sub_tasks().await?; - } - - Ok(success) - } - - #[inline] - async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> { - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list); - - let success = self.gst_pad().push_list(list).map_err(|err| { - gst_error!( - RUNTIME_CAT, - obj: self.gst_pad(), - "Failed to push BufferList to PadSrc: {:?}", - err, - ); - err - })?; - - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - Context::drain_sub_tasks().await?; - } - - Ok(success) - } - - #[inline] - async fn push_event(&self, event: gst::Event) -> bool { - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", event); - - let was_handled = self.gst_pad().push_event(event); - - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - if Context::drain_sub_tasks().await.is_err() { - return false; - } - } - - was_handled + fn deref(&self) -> &Self::Target { + &self.strong } } @@ -417,36 +362,33 @@ impl PadSrcStrong { /// [`downgrade`]: struct.PadSrc.html#method.downgrade /// [`pad` module]: index.html #[derive(Debug)] -pub struct PadSrc(PadSrcStrong); +pub struct PadSrc(Arc<PadSrcInner>); impl PadSrc { pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self { - let this = PadSrc(PadSrcStrong::new(gst_pad)); + let this = PadSrc(Arc::new(PadSrcInner::new(gst_pad))); this.init_pad_functions(handler); this } - pub fn as_ref(&self) -> PadSrcRef<'_> { - PadSrcRef::new(Arc::clone(&(self.0).0)) - } - - pub fn gst_pad(&self) -> &gst::Pad { - self.0.gst_pad() + pub fn downgrade(&self) -> PadSrcWeak { + PadSrcWeak(Arc::downgrade(&self.0)) } - pub fn downgrade(&self) -> PadSrcWeak { - self.0.downgrade() + pub fn as_ref(&self) -> PadSrcRef<'_> { + PadSrcRef::new(Arc::clone(&self.0)) } pub fn check_reconfigure(&self) -> bool { - self.gst_pad().check_reconfigure() + self.0.gst_pad().check_reconfigure() } fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) { let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); - self.gst_pad() + let inner_arc = Arc::clone(&self.0); + self.0 + .gst_pad() .set_activate_function(move |gst_pad, parent| { let handler = handler_clone.clone(); let inner_arc = inner_arc.clone(); @@ -464,7 +406,7 @@ impl PadSrc { }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_activatemode_function(move |gst_pad, parent, mode, active| { let handler = handler_clone.clone(); @@ -489,7 +431,7 @@ impl PadSrc { // No need to `set_event_function` since `set_event_full_function` // overrides it and dispatches to `src_event` when necessary let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_event_full_function(move |_gst_pad, parent, event| { let handler = handler_clone.clone(); @@ -504,7 +446,7 @@ impl PadSrc { ) }); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_query_function(move |_gst_pad, parent, query| { let handler = handler.clone(); @@ -524,18 +466,6 @@ impl PadSrc { ) }); } - - pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> { - self.0.push(buffer).await - } - - pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> { - self.0.push_list(list).await - } - - pub async fn push_event(&self, event: gst::Event) -> bool { - self.0.push_event(event).await - } } impl Drop for PadSrc { @@ -557,6 +487,14 @@ impl Drop for PadSrc { } } +impl Deref for PadSrc { + type Target = PadSrcInner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// A trait to define `handler`s for [`PadSink`] callbacks. /// /// *See the [`pad` module] documentation for a description of the model.* @@ -710,7 +648,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { } #[derive(Debug)] -struct PadSinkInner { +pub struct PadSinkInner { gst_pad: gst::Pad, } @@ -722,6 +660,10 @@ impl PadSinkInner { PadSinkInner { gst_pad } } + + pub fn gst_pad(&self) -> &gst::Pad { + &self.gst_pad + } } /// A [`PadSink`] which can be moved in `Handler`s functions and `Future`s. @@ -754,24 +696,20 @@ impl PadSinkWeak { /// [`pad` module]: index.html #[derive(Debug)] pub struct PadSinkRef<'a> { - strong: PadSinkStrong, - phantom: PhantomData<&'a PadSrcStrong>, + strong: Arc<PadSinkInner>, + phantom: PhantomData<&'a Self>, } impl<'a> PadSinkRef<'a> { fn new(inner_arc: Arc<PadSinkInner>) -> Self { PadSinkRef { - strong: PadSinkStrong(inner_arc), + strong: inner_arc, phantom: PhantomData, } } - pub fn gst_pad(&self) -> &gst::Pad { - self.strong.gst_pad() - } - pub fn downgrade(&self) -> PadSinkWeak { - self.strong.downgrade() + PadSinkWeak(Arc::downgrade(&self.strong)) } fn activate_mode_hook( @@ -798,20 +736,7 @@ impl<'a> PadSinkRef<'a> { &self, fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static, ) -> Result<FlowSuccess, FlowError> { - // First try to add it as a sub task to the current task, if any if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) { - // FIXME: update comments below - // Not on a context thread: execute the Future immediately. - // - // - If there is no PadContext, we don't have any other options. - // - If there is a PadContext, it means that we received it from - // an upstream element, but there is at least one non-ts element - // operating on another thread in between, so we can't take - // advantage of the task queue. - // - // Note: we don't use `crate::runtime::executor::block_on` here - // because `Context::is_context_thread()` is checked in the `if` - // statement above. block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok))) .unwrap_or(Ok(gst::FlowSuccess::Ok)) } else { @@ -820,20 +745,11 @@ impl<'a> PadSinkRef<'a> { } } -#[derive(Debug)] -struct PadSinkStrong(Arc<PadSinkInner>); - -impl PadSinkStrong { - fn new(gst_pad: gst::Pad) -> Self { - PadSinkStrong(Arc::new(PadSinkInner::new(gst_pad))) - } - - fn gst_pad(&self) -> &gst::Pad { - &self.0.gst_pad - } +impl<'a> Deref for PadSinkRef<'a> { + type Target = PadSinkInner; - fn downgrade(&self) -> PadSinkWeak { - PadSinkWeak(Arc::downgrade(&self.0)) + fn deref(&self) -> &Self::Target { + &self.strong } } @@ -846,31 +762,27 @@ impl PadSinkStrong { /// [`downgrade`]: struct.PadSink.html#method.downgrade /// [`pad` module]: index.html #[derive(Debug)] -pub struct PadSink(PadSinkStrong); +pub struct PadSink(Arc<PadSinkInner>); impl PadSink { pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self { - let this = PadSink(PadSinkStrong::new(gst_pad)); + let this = PadSink(Arc::new(PadSinkInner::new(gst_pad))); this.init_pad_functions(handler); this } - pub fn as_ref(&self) -> PadSinkRef<'_> { - PadSinkRef::new(Arc::clone(&(self.0).0)) - } - - pub fn gst_pad(&self) -> &gst::Pad { - self.0.gst_pad() + pub fn downgrade(&self) -> PadSinkWeak { + PadSinkWeak(Arc::downgrade(&self.0)) } - pub fn downgrade(&self) -> PadSinkWeak { - self.0.downgrade() + pub fn as_ref(&self) -> PadSinkRef<'_> { + PadSinkRef::new(Arc::clone(&self.0)) } fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) { let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_activate_function(move |gst_pad, parent| { let handler = handler_clone.clone(); @@ -892,7 +804,7 @@ impl PadSink { }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_activatemode_function(move |gst_pad, parent, mode, active| { let handler = handler_clone.clone(); @@ -916,7 +828,7 @@ impl PadSink { }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_chain_function(move |_gst_pad, parent, buffer| { let handler = handler_clone.clone(); @@ -949,7 +861,7 @@ impl PadSink { }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_chain_list_function(move |_gst_pad, parent, list| { let handler = handler_clone.clone(); @@ -987,7 +899,7 @@ impl PadSink { // No need to `set_event_function` since `set_event_full_function` // overrides it and dispatches to `sink_event` when necessary let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_event_full_function(move |_gst_pad, parent, event| { let handler = handler_clone.clone(); @@ -1028,7 +940,7 @@ impl PadSink { ) }); - let inner_arc = Arc::clone(&(self.0).0); + let inner_arc = Arc::clone(&self.0); self.gst_pad() .set_query_function(move |_gst_pad, parent, query| { let handler = handler.clone(); @@ -1072,3 +984,11 @@ impl Drop for PadSink { .set_query_function(move |_gst_pad, _parent, _query| false); } } + +impl Deref for PadSink { + type Target = PadSinkInner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} |