diff options
author | François Laignel <fengalin@free.fr> | 2022-10-12 13:35:20 +0300 |
---|---|---|
committer | François Laignel <fengalin@free.fr> | 2022-10-12 13:35:20 +0300 |
commit | 2bffdec691c441c9effefa4f27f72d64681c0bf2 (patch) | |
tree | a5f1058c6a4a52f1c94f808f5a42f6933d336a18 /generic/threadshare/src/queue | |
parent | bc5b51687dacd2a1e4dadae8c4426a253a825ddf (diff) |
ts: better use of `imp` & `elem` args in `Pad{Sink,Src}Handler`s
This is a follow-up to commit 7ee4afac.
This commit cleans up the `Pad{Sink,Src}Handler` by
- Keeping arguments which are strictly necessary.
- Passing arguments by value for the trait functions which return
a `Future`. The arguments which were previously passed by reference
were `clone`d internally and then `clone`d again in most
implementations.
There are unfortunate differences in trait function signatures
between those which return a `Future` and the sync functions. This
is due to the requirement for the arguments to be moved to the
resulting `Future`, whereas sync functions can rely on references.
One particular notable difference is the use of the `imp` in sync
functions instead of the `elem` in functions returning a `Future`.
Because the `imp` is not guaranteed to implement `Clone`, we can't
move it to the resulting `Future`, so the `elem` is used.
Diffstat (limited to 'generic/threadshare/src/queue')
-rw-r--r-- | generic/threadshare/src/queue/imp.rs | 131 |
1 files changed, 44 insertions, 87 deletions
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index b3d1df622..32f6f4721 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -33,7 +33,7 @@ use std::time::Duration; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; +use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task}; use crate::dataqueue::{DataQueue, DataQueueItem}; @@ -84,57 +84,43 @@ impl PadSinkHandler for QueuePadSinkHandler { type ElementImpl = Queue; fn sink_chain( - &self, - pad: &PadSinkRef, - _queue: &Queue, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::Queue, buffer: gst::Buffer, ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::<super::Queue>().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - let queue = element.imp(); - queue.enqueue_item(DataQueueItem::Buffer(buffer)).await + let imp = elem.imp(); + imp.enqueue_item(DataQueueItem::Buffer(buffer)).await } .boxed() } fn sink_chain_list( - &self, - pad: &PadSinkRef, - _queue: &Queue, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::Queue, list: gst::BufferList, ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::<super::Queue>().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); - let queue = element.imp(); - queue.enqueue_item(DataQueueItem::BufferList(list)).await + let imp = elem.imp(); + imp.enqueue_item(DataQueueItem::BufferList(list)).await } .boxed() } - fn sink_event( - &self, - pad: &PadSinkRef, - queue: &Queue, - element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn sink_event(&self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool { gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); - if let EventView::FlushStart(..) = event.view() { - if let Err(err) = queue.task.flush_start().await_maybe_on_context() { + if let gst::EventView::FlushStart(..) = event.view() { + if let Err(err) = imp.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStart failed {:?}", err] @@ -144,31 +130,26 @@ impl PadSinkHandler for QueuePadSinkHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); - queue.src_pad.gst_pad().push_event(event) + imp.src_pad.gst_pad().push_event(event) } fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _queue: &Queue, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::Queue, event: gst::Event, ) -> BoxFuture<'static, bool> { - use gst::EventView; - - gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); - - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::<super::Queue>().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - let queue = element.imp(); + let pad = pad.upgrade().expect("PadSink no longer exists"); + gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + + let imp = elem.imp(); - if let EventView::FlushStop(..) = event.view() { - if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { + if let gst::EventView::FlushStop(..) = event.view() { + if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -178,21 +159,12 @@ impl PadSinkHandler for QueuePadSinkHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); - queue - .enqueue_item(DataQueueItem::Event(event)) - .await - .is_ok() + imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok() } .boxed() } - fn sink_query( - &self, - pad: &PadSinkRef, - queue: &Queue, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { + fn sink_query(&self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); if query.is_serialized() { @@ -201,7 +173,7 @@ impl PadSinkHandler for QueuePadSinkHandler { false } else { gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - queue.src_pad.gst_pad().peer_query(query) + imp.src_pad.gst_pad().peer_query(query) } } } @@ -212,28 +184,21 @@ struct QueuePadSrcHandler; impl PadSrcHandler for QueuePadSrcHandler { type ElementImpl = Queue; - fn src_event( - &self, - pad: &PadSrcRef, - queue: &Queue, - element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn src_event(&self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + use gst::EventView; match event.view() { EventView::FlushStart(..) => { - if let Err(err) = queue.task.flush_start().await_maybe_on_context() { + if let Err(err) = imp.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); } } EventView::FlushStop(..) => { - if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { + if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -245,23 +210,15 @@ impl PadSrcHandler for QueuePadSrcHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); - queue.sink_pad.gst_pad().push_event(event) + imp.sink_pad.gst_pad().push_event(event) } - fn src_query( - &self, - pad: &PadSrcRef, - queue: &Queue, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); - if let QueryViewMut::Scheduling(q) = query.view_mut() { + if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { let mut new_query = gst::query::Scheduling::new(); - let res = queue.sink_pad.gst_pad().peer_query(&mut new_query); + let res = imp.sink_pad.gst_pad().peer_query(&mut new_query); if !res { return res; } @@ -283,7 +240,7 @@ impl PadSrcHandler for QueuePadSrcHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - queue.sink_pad.gst_pad().peer_query(query) + imp.sink_pad.gst_pad().peer_query(query) } } |