Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-10-12 13:35:20 +0300
committerFrançois Laignel <fengalin@free.fr>2022-10-12 13:35:20 +0300
commit2bffdec691c441c9effefa4f27f72d64681c0bf2 (patch)
treea5f1058c6a4a52f1c94f808f5a42f6933d336a18 /generic
parentbc5b51687dacd2a1e4dadae8c4426a253a825ddf (diff)
ts: better use of `imp` & `elem` args in `Pad{Sink,Src}Handler`s
This is a follow-up to commit 7ee4afac. This commit cleans up the `Pad{Sink,Src}Handler` by - Keeping arguments which are strictly necessary. - Passing arguments by value for the trait functions which return a `Future`. The arguments which were previously passed by reference were `clone`d internally and then `clone`d again in most implementations. There are unfortunate differences in trait function signatures between those which return a `Future` and the sync functions. This is due to the requirement for the arguments to be moved to the resulting `Future`, whereas sync functions can rely on references. One particular notable difference is the use of the `imp` in sync functions instead of the `elem` in functions returning a `Future`. Because the `imp` is not guaranteed to implement `Clone`, we can't move it to the resulting `Future`, so the `elem` is used.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/examples/standalone/sink/imp.rs59
-rw-r--r--generic/threadshare/src/appsrc/imp.rs44
-rw-r--r--generic/threadshare/src/inputselector/imp.rs92
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs131
-rw-r--r--generic/threadshare/src/proxy/imp.rs116
-rw-r--r--generic/threadshare/src/queue/imp.rs131
-rw-r--r--generic/threadshare/src/runtime/pad.rs287
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs38
-rw-r--r--generic/threadshare/src/udpsink/imp.rs55
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs28
-rw-r--r--generic/threadshare/tests/pad.rs92
11 files changed, 369 insertions, 704 deletions
diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
index 62ea73b60..7991dd2af 100644
--- a/generic/threadshare/examples/standalone/sink/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -19,7 +19,7 @@ use gst::EventView;
use once_cell::sync::Lazy;
use gstthreadshare::runtime::prelude::*;
-use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, Task};
+use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, Task};
use std::sync::Mutex;
use std::task::Poll;
@@ -76,18 +76,15 @@ impl PadSinkHandler for TestSinkPadHandler {
type ElementImpl = TestSink;
fn sink_chain(
- &self,
- _pad: &PadSinkRef,
- test_sink: &TestSink,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::TestSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = test_sink.clone_item_sender();
- let element = element.clone().downcast::<super::TestSink>().unwrap();
-
+ let sender = elem.imp().clone_item_sender();
async move {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: &element, "Flushing");
+ gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
@@ -97,19 +94,16 @@ impl PadSinkHandler for TestSinkPadHandler {
}
fn sink_chain_list(
- &self,
- _pad: &PadSinkRef,
- test_sink: &TestSink,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::TestSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = test_sink.clone_item_sender();
- let element = element.clone().downcast::<super::TestSink>().unwrap();
-
+ let sender = elem.imp().clone_item_sender();
async move {
for buffer in list.iter_owned() {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: &element, "Flushing");
+ gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
}
@@ -120,21 +114,18 @@ impl PadSinkHandler for TestSinkPadHandler {
}
fn sink_event_serialized(
- &self,
- _pad: &PadSinkRef,
- test_sink: &TestSink,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::TestSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- let sender = test_sink.clone_item_sender();
- let element = element.clone().downcast::<super::TestSink>().unwrap();
-
+ let sender = elem.imp().clone_item_sender();
async move {
if let EventView::FlushStop(_) = event.view() {
- let test_sink = element.imp();
- return test_sink.task.flush_stop().await_maybe_on_context().is_ok();
+ let imp = elem.imp();
+ return imp.task.flush_stop().await_maybe_on_context().is_ok();
} else if sender.send_async(StreamItem::Event(event)).await.is_err() {
- gst::debug!(CAT, obj: &element, "Flushing");
+ gst::debug!(CAT, obj: &elem, "Flushing");
}
true
@@ -142,19 +133,9 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
- fn sink_event(
- &self,
- _pad: &PadSinkRef,
- test_sink: &TestSink,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, _pad: &PadSinkRef, imp: &TestSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
- return test_sink
- .task
- .flush_start()
- .await_maybe_on_context()
- .is_ok();
+ return imp.task.flush_start().await_maybe_on_context().is_ok();
}
true
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs
index 8eee681ae..b5636e08d 100644
--- a/generic/threadshare/src/appsrc/imp.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -82,20 +82,13 @@ struct AppSrcPadHandler;
impl PadSrcHandler for AppSrcPadHandler {
type ElementImpl = AppSrc;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- appsrc: &AppSrc,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn src_event(&self, pad: &PadSrcRef, imp: &AppSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ use gst::EventView;
let ret = match event.view() {
- EventView::FlushStart(..) => appsrc.task.flush_start().await_maybe_on_context().is_ok(),
- EventView::FlushStop(..) => appsrc.task.flush_stop().await_maybe_on_context().is_ok(),
+ EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(),
+ EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@@ -110,16 +103,10 @@ impl PadSrcHandler for AppSrcPadHandler {
ret
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- appsrc: &AppSrc,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
+ fn src_query(&self, pad: &PadSrcRef, imp: &AppSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+
+ use gst::QueryViewMut;
let ret = match query.view_mut() {
QueryViewMut::Latency(q) => {
q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
@@ -131,7 +118,7 @@ impl PadSrcHandler for AppSrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
- let caps = if let Some(caps) = appsrc.configured_caps.lock().unwrap().as_ref() {
+ let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@@ -328,8 +315,9 @@ impl AppSrc {
let do_timestamp = self.settings.lock().unwrap().do_timestamp;
if do_timestamp {
- if let Some(clock) = self.instance().clock() {
- let base_time = self.instance().base_time();
+ let elem = self.instance();
+ if let Some(clock) = elem.clock() {
+ let base_time = elem.base_time();
let now = clock.time();
let buffer = buffer.make_mut();
@@ -499,11 +487,10 @@ impl ObjectImpl for AppSrc {
.return_type::<bool>()
.action()
.class_handler(|_, args| {
- let element = args[0].get::<super::AppSrc>().expect("signal arg");
+ let elem = args[0].get::<super::AppSrc>().expect("signal arg");
let buffer = args[1].get::<gst::Buffer>().expect("signal arg");
- let appsrc = element.imp();
- Some(appsrc.push_buffer(buffer).to_value())
+ Some(elem.imp().push_buffer(buffer).to_value())
})
.build(),
/**
@@ -516,10 +503,9 @@ impl ObjectImpl for AppSrc {
.return_type::<bool>()
.action()
.class_handler(|_, args| {
- let element = args[0].get::<super::AppSrc>().expect("signal arg");
- let appsrc = element.imp();
+ let elem = args[0].get::<super::AppSrc>().expect("signal arg");
- Some(appsrc.end_of_stream().to_value())
+ Some(elem.imp().end_of_stream().to_value())
})
.build(),
]
diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs
index 4175f4cff..d29320b3e 100644
--- a/generic/threadshare/src/inputselector/imp.rs
+++ b/generic/threadshare/src/inputselector/imp.rs
@@ -33,7 +33,7 @@ use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
-use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef};
+use crate::runtime::{self, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@@ -75,14 +75,10 @@ struct InputSelectorPadSinkHandler(Arc<Mutex<InputSelectorPadSinkHandlerInner>>)
impl InputSelectorPadSinkHandler {
/* Wait until specified time */
- async fn sync(
- &self,
- element: &super::InputSelector,
- running_time: impl Into<Option<gst::ClockTime>>,
- ) {
- let now = element.current_running_time();
+ async fn sync(&self, elem: &super::InputSelector, running_time: Option<gst::ClockTime>) {
+ let now = elem.current_running_time();
- match running_time.into().opt_checked_sub(now) {
+ match running_time.opt_checked_sub(now) {
Ok(Some(delay)) => {
runtime::timer::delay_for(delay.into()).await;
}
@@ -92,11 +88,11 @@ impl InputSelectorPadSinkHandler {
async fn handle_item(
&self,
- element: &super::InputSelector,
pad: &PadSinkRef<'_>,
+ elem: &super::InputSelector,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
- let inputselector = element.imp();
+ let inputselector = elem.imp();
let (stickies, is_active, sync_future, switched_pad) = {
let mut state = inputselector.state.lock().unwrap();
@@ -108,7 +104,7 @@ impl InputSelectorPadSinkHandler {
if let Some(segment) = &inner.segment {
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
let rtime = segment.to_running_time(buffer.pts());
- let (sync_fut, abort_handle) = abortable(self.sync(element, rtime));
+ let (sync_fut, abort_handle) = abortable(self.sync(elem, rtime));
inner.abort_handle = Some(abort_handle);
sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing));
}
@@ -162,38 +158,30 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
type ElementImpl = InputSelector;
fn sink_chain(
- &self,
- pad: &PadSinkRef,
- _inputselector: &InputSelector,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::InputSelector,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let this = self.clone();
- let element = element.clone().downcast::<super::InputSelector>().unwrap();
- let pad_weak = pad.downgrade();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
- this.handle_item(&element, &pad, buffer).await
+ let pad = pad.upgrade().expect("PadSink no longer exists");
+ self.handle_item(&pad, &elem, buffer).await
}
.boxed()
}
fn sink_chain_list(
- &self,
- pad: &PadSinkRef,
- _inputselector: &InputSelector,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::InputSelector,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let this = self.clone();
- let element = element.clone().downcast::<super::InputSelector>().unwrap();
- let pad_weak = pad.downgrade();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
- this.handle_item(&element, &pad, buffer).await?;
+ self.handle_item(&pad, &elem, buffer).await?;
}
Ok(gst::FlowSuccess::Ok)
@@ -202,16 +190,13 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
}
fn sink_event_serialized(
- &self,
- _pad: &PadSinkRef,
- _inputselector: &InputSelector,
- _element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ _elem: super::InputSelector,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- let this = self.clone();
-
async move {
- let mut inner = this.0.lock().unwrap();
+ let mut inner = self.0.lock().unwrap();
// Remember the segment for later use
if let gst::EventView::Segment(e) = event.view() {
@@ -234,17 +219,11 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
.boxed()
}
- fn sink_event(
- &self,
- _pad: &PadSinkRef,
- inputselector: &InputSelector,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, _pad: &PadSinkRef, imp: &InputSelector, event: gst::Event) -> bool {
/* Drop all events for now */
if let gst::EventView::FlushStart(..) = event.view() {
/* Unblock downstream */
- inputselector.src_pad.gst_pad().push_event(event.clone());
+ imp.src_pad.gst_pad().push_event(event.clone());
let mut inner = self.0.lock().unwrap();
@@ -255,13 +234,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
true
}
- fn sink_query(
- &self,
- pad: &PadSinkRef,
- inputselector: &InputSelector,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
+ fn sink_query(&self, pad: &PadSinkRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
if query.is_serialized() {
@@ -270,7 +243,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
false
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
- inputselector.src_pad.gst_pad().peer_query(query)
+ imp.src_pad.gst_pad().peer_query(query)
}
}
}
@@ -281,24 +254,17 @@ struct InputSelectorPadSrcHandler;
impl PadSrcHandler for InputSelectorPadSrcHandler {
type ElementImpl = InputSelector;
- fn src_query(
- &self,
- pad: &PadSrcRef,
- inputselector: &InputSelector,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
+ fn src_query(&self, pad: &PadSrcRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ use gst::QueryViewMut;
match query.view_mut() {
QueryViewMut::Latency(q) => {
let mut ret = true;
let mut min_latency = gst::ClockTime::ZERO;
let mut max_latency = gst::ClockTime::NONE;
let pads = {
- let pads = inputselector.pads.lock().unwrap();
+ let pads = imp.pads.lock().unwrap();
pads.sink_pads
.iter()
.map(|p| p.0.clone())
@@ -325,7 +291,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
}
_ => {
let sinkpad = {
- let state = inputselector.state.lock().unwrap();
+ let state = imp.state.lock().unwrap();
state.active_sinkpad.clone()
};
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
index f2ec87fa4..44cd903bc 100644
--- a/generic/threadshare/src/jitterbuffer/imp.rs
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -36,7 +36,7 @@ use std::sync::Mutex as StdMutex;
use std::time::Duration;
use crate::runtime::prelude::*;
-use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
+use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@@ -143,14 +143,10 @@ impl SinkHandler {
}
// For resetting if seqnum discontinuities
- fn reset(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- element: &super::JitterBuffer,
- ) -> BTreeSet<GapPacket> {
- gst::info!(CAT, obj: element, "Resetting");
+ fn reset(&self, inner: &mut SinkHandlerInner, jb: &JitterBuffer) -> BTreeSet<GapPacket> {
+ gst::info!(CAT, imp: jb, "Resetting");
+ let mut state = jb.state.lock().unwrap();
state.jbuf.flush();
state.jbuf.reset_skew();
state.discont = true;
@@ -174,23 +170,23 @@ impl SinkHandler {
&self,
inner: &mut SinkHandlerInner,
state: &mut State,
- element: &super::JitterBuffer,
+ jb: &JitterBuffer,
caps: &gst::Caps,
pt: u8,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let s = caps.structure(0).ok_or(gst::FlowError::Error)?;
- gst::debug!(CAT, obj: element, "Parsing {:?}", caps);
+ gst::debug!(CAT, imp: jb, "Parsing {:?}", caps);
let payload = s.get::<i32>("payload").map_err(|err| {
- gst::debug!(CAT, obj: element, "Caps 'payload': {}", err);
+ gst::debug!(CAT, imp: jb, "Caps 'payload': {}", err);
gst::FlowError::Error
})?;
if pt != 0 && payload as u8 != pt {
gst::debug!(
CAT,
- obj: element,
+ imp: jb,
"Caps 'payload' ({}) doesn't match payload type ({})",
payload,
pt
@@ -200,12 +196,12 @@ impl SinkHandler {
inner.last_pt = Some(pt);
let clock_rate = s.get::<i32>("clock-rate").map_err(|err| {
- gst::debug!(CAT, obj: element, "Caps 'clock-rate': {}", err);
+ gst::debug!(CAT, imp: jb, "Caps 'clock-rate': {}", err);
gst::FlowError::Error
})?;
if clock_rate <= 0 {
- gst::debug!(CAT, obj: element, "Caps 'clock-rate' <= 0");
+ gst::debug!(CAT, imp: jb, "Caps 'clock-rate' <= 0");
return Err(gst::FlowError::Error);
}
state.clock_rate = Some(clock_rate as u32);
@@ -253,7 +249,7 @@ impl SinkHandler {
fn handle_big_gap_buffer(
&self,
inner: &mut SinkHandlerInner,
- element: &super::JitterBuffer,
+ jb: &JitterBuffer,
buffer: gst::Buffer,
pt: u8,
) -> bool {
@@ -262,7 +258,7 @@ impl SinkHandler {
gst::debug!(
CAT,
- obj: element,
+ imp: jb,
"Handling big gap, gap packets length: {}",
gap_packets_length
);
@@ -276,7 +272,7 @@ impl SinkHandler {
for gap_packet in inner.gap_packets.iter() {
gst::log!(
CAT,
- obj: element,
+ imp: jb,
"Looking at gap packet with seq {}",
gap_packet.seq,
);
@@ -296,7 +292,7 @@ impl SinkHandler {
}
}
- gst::debug!(CAT, obj: element, "all consecutive: {}", all_consecutive);
+ gst::debug!(CAT, imp: jb, "all consecutive: {}", all_consecutive);
if all_consecutive && gap_packets_length > 3 {
reset = true;
@@ -312,10 +308,9 @@ impl SinkHandler {
&self,
inner: &mut SinkHandlerInner,
pad: &gst::Pad,
- element: &super::JitterBuffer,
+ jb: &JitterBuffer,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
- let jb = element.imp();
let mut state = jb.state.lock().unwrap();
let (max_misorder_time, max_dropout_time) = {
@@ -339,13 +334,15 @@ impl SinkHandler {
gst::log!(
CAT,
- obj: element,
+ imp: jb,
"Storing buffer, seq: {}, rtptime: {}, pt: {}",
seq,
rtptime,
pt
);
+ let element = jb.instance();
+
if dts.is_none() {
dts = pts;
} else if pts.is_none() {
@@ -374,7 +371,7 @@ impl SinkHandler {
if let Some(caps) = pad.current_caps() {
/* Ignore errors at this point, as we want to emit request-pt-map */
- let _ = self.parse_caps(inner, &mut state, element, &caps, pt);
+ let _ = self.parse_caps(inner, &mut state, jb, &caps, pt);
}
}
@@ -388,7 +385,7 @@ impl SinkHandler {
gst::FlowError::Error
})?;
let mut state = jb.state.lock().unwrap();
- self.parse_caps(inner, &mut state, element, &caps, pt)?;
+ self.parse_caps(inner, &mut state, jb, &caps, pt)?;
state
} else {
state
@@ -407,7 +404,7 @@ impl SinkHandler {
if pts.is_none() {
gst::debug!(
CAT,
- obj: element,
+ imp: jb,
"cannot calculate a valid pts for #{}, discard",
seq
);
@@ -420,7 +417,7 @@ impl SinkHandler {
self.calculate_packet_spacing(inner, &mut state, rtptime, pts);
} else {
if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) {
- let reset = self.handle_big_gap_buffer(inner, element, buffer, pt);
+ let reset = self.handle_big_gap_buffer(inner, jb, buffer, pt);
if reset {
// Handle reset in `enqueue_item` to avoid recursion
return Err(gst::FlowError::CustomError);
@@ -440,7 +437,7 @@ impl SinkHandler {
if gap <= 0 {
state.stats.num_late += 1;
- gst::debug!(CAT, obj: element, "Dropping late {}", seq);
+ gst::debug!(CAT, imp: jb, "Dropping late {}", seq);
return Ok(gst::FlowSuccess::Ok);
}
}
@@ -492,7 +489,7 @@ impl SinkHandler {
fn enqueue_item(
&self,
pad: &gst::Pad,
- element: &super::JitterBuffer,
+ jb: &JitterBuffer,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut inner = self.0.lock().unwrap();
@@ -504,12 +501,10 @@ impl SinkHandler {
// This is to avoid recursion with `store`, `reset` and `enqueue_item`
while let Some(buf) = buffers.pop_front() {
- if let Err(err) = self.store(&mut inner, pad, element, buf) {
+ if let Err(err) = self.store(&mut inner, pad, jb, buf) {
match err {
gst::FlowError::CustomError => {
- let jb = element.imp();
- let mut state = jb.state.lock().unwrap();
- for gap_packet in self.reset(&mut inner, &mut state, element) {
+ for gap_packet in self.reset(&mut inner, jb) {
buffers.push_back(gap_packet.buffer);
}
}
@@ -518,7 +513,6 @@ impl SinkHandler {
}
}
- let jb = element.imp();
let mut state = jb.state.lock().unwrap();
let (latency, context_wait) = {
@@ -529,7 +523,7 @@ impl SinkHandler {
// Reschedule if needed
let (_, next_wakeup) =
jb.src_pad_handler
- .next_wakeup(element, &state, latency, context_wait);
+ .next_wakeup(&jb.instance(), &state, latency, context_wait);
if let Some((next_wakeup, _)) = next_wakeup {
if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
if previous_next_wakeup.is_none()
@@ -555,32 +549,20 @@ impl PadSinkHandler for SinkHandler {
type ElementImpl = JitterBuffer;
fn sink_chain(
- &self,
- pad: &PadSinkRef,
- _jb: &JitterBuffer,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::JitterBuffer,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::JitterBuffer>().unwrap();
- let this = self.clone();
-
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
-
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
- this.enqueue_item(pad.gst_pad(), &element, Some(buffer))
+ self.enqueue_item(pad.gst_pad(), elem.imp(), Some(buffer))
}
.boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, pad: &PadSinkRef, jb: &JitterBuffer, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
@@ -588,8 +570,8 @@ impl PadSinkHandler for SinkHandler {
if let EventView::FlushStart(..) = event.view() {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ jb,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStart failed {:?}", err]
@@ -603,25 +585,20 @@ impl PadSinkHandler for SinkHandler {
}
fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _jb: &JitterBuffer,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::JitterBuffer,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- use gst::EventView;
-
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::JitterBuffer>().unwrap();
-
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
- let jb = element.imp();
+ let jb = elem.imp();
let mut forward = true;
+ use gst::EventView;
match event.view() {
EventView::Segment(e) => {
let mut state = jb.state.lock().unwrap();
@@ -631,7 +608,7 @@ impl PadSinkHandler for SinkHandler {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::element_error!(
- element,
+ elem,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
@@ -893,13 +870,7 @@ impl SrcHandler {
impl PadSrcHandler for SrcHandler {
type ElementImpl = JitterBuffer;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn src_event(&self, pad: &PadSrcRef, jb: &JitterBuffer, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
@@ -908,8 +879,8 @@ impl PadSrcHandler for SrcHandler {
EventView::FlushStart(..) => {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ jb,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStart failed {:?}", err]
@@ -920,8 +891,8 @@ impl PadSrcHandler for SrcHandler {
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ jb,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
@@ -936,13 +907,7 @@ impl PadSrcHandler for SrcHandler {
jb.sink_pad.gst_pad().push_event(event)
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- jb: &JitterBuffer,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
+ fn src_query(&self, pad: &PadSrcRef, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs
index e7ce6b303..9cad80789 100644
--- a/generic/threadshare/src/proxy/imp.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -214,57 +214,40 @@ impl PadSinkHandler for ProxySinkPadHandler {
type ElementImpl = ProxySink;
fn sink_chain(
- &self,
- pad: &PadSinkRef,
- _proxysink: &ProxySink,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::ProxySink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::ProxySink>().unwrap();
-
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
- let proxysink = element.imp();
- proxysink.enqueue_item(DataQueueItem::Buffer(buffer)).await
+ let imp = elem.imp();
+ imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
.boxed()
}
fn sink_chain_list(
- &self,
- pad: &PadSinkRef,
- _proxysink: &ProxySink,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::ProxySink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list);
- let proxysink = element.imp();
- proxysink
- .enqueue_item(DataQueueItem::BufferList(list))
- .await
+ let imp = elem.imp();
+ imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- proxysink: &ProxySink,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn sink_event(&self, pad: &PadSinkRef, imp: &ProxySink, event: gst::Event) -> bool {
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
let src_pad = {
- let proxy_ctx = proxysink.proxy_ctx.lock().unwrap();
+ let proxy_ctx = imp.proxy_ctx.lock().unwrap();
PROXY_SRC_PADS
.lock()
@@ -274,8 +257,8 @@ impl PadSinkHandler for ProxySinkPadHandler {
.map(|src_pad| src_pad.gst_pad().clone())
};
- if let EventView::FlushStart(..) = event.view() {
- proxysink.stop();
+ if let gst::EventView::FlushStart(..) = event.view() {
+ imp.stop();
}
if let Some(src_pad) = src_pad {
@@ -288,36 +271,28 @@ impl PadSinkHandler for ProxySinkPadHandler {
}
fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _proxysink: &ProxySink,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::ProxySink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- use gst::EventView;
-
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
-
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
- let proxysink = element.imp();
+ let pad = pad.upgrade().expect("PadSink no longer exists");
+ gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+
+ let imp = elem.imp();
+ use gst::EventView;
match event.view() {
EventView::Eos(..) => {
- let _ =
- element.post_message(gst::message::Eos::builder().src(&element).build());
+ let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
}
- EventView::FlushStop(..) => proxysink.start(),
+ EventView::FlushStop(..) => imp.start(),
_ => (),
}
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
- proxysink
- .enqueue_item(DataQueueItem::Event(event))
- .await
- .is_ok()
+ imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
}
@@ -691,19 +666,11 @@ struct ProxySrcPadHandler;
impl PadSrcHandler for ProxySrcPadHandler {
type ElementImpl = ProxySrc;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- proxysrc: &ProxySrc,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn src_event(&self, pad: &PadSrcRef, imp: &ProxySrc, event: gst::Event) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let sink_pad = {
- let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
+ let proxy_ctx = imp.proxy_ctx.lock().unwrap();
PROXY_SINK_PADS
.lock()
@@ -713,12 +680,13 @@ impl PadSrcHandler for ProxySrcPadHandler {
.map(|sink_pad| sink_pad.gst_pad().clone())
};
+ use gst::EventView;
match event.view() {
EventView::FlushStart(..) => {
- if let Err(err) = proxysrc.task.flush_start().await_maybe_on_context() {
+ if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStart failed {:?}", err]
@@ -727,10 +695,10 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
}
EventView::FlushStop(..) => {
- if let Err(err) = proxysrc.task.flush_stop().await_maybe_on_context() {
+ if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
@@ -750,16 +718,10 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- _proxysrc: &ProxySrc,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
+ fn src_query(&self, pad: &PadSrcRef, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+
+ use gst::QueryViewMut;
let ret = match query.view_mut() {
QueryViewMut::Latency(q) => {
q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs
index b3d1df622..32f6f4721 100644
--- a/generic/threadshare/src/queue/imp.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -33,7 +33,7 @@ use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
-use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
+use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
use crate::dataqueue::{DataQueue, DataQueueItem};
@@ -84,57 +84,43 @@ impl PadSinkHandler for QueuePadSinkHandler {
type ElementImpl = Queue;
fn sink_chain(
- &self,
- pad: &PadSinkRef,
- _queue: &Queue,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::Queue,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::Queue>().unwrap();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
- let queue = element.imp();
- queue.enqueue_item(DataQueueItem::Buffer(buffer)).await
+ let imp = elem.imp();
+ imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
.boxed()
}
fn sink_chain_list(
- &self,
- pad: &PadSinkRef,
- _queue: &Queue,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::Queue,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::Queue>().unwrap();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
- let queue = element.imp();
- queue.enqueue_item(DataQueueItem::BufferList(list)).await
+ let imp = elem.imp();
+ imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- queue: &Queue,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn sink_event(&self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool {
gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
- if let EventView::FlushStart(..) = event.view() {
- if let Err(err) = queue.task.flush_start().await_maybe_on_context() {
+ if let gst::EventView::FlushStart(..) = event.view() {
+ if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStart failed {:?}", err]
@@ -144,31 +130,26 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
- queue.src_pad.gst_pad().push_event(event)
+ imp.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _queue: &Queue,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::Queue,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- use gst::EventView;
-
- gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
-
- let pad_weak = pad.downgrade();
- let element = element.clone().downcast::<super::Queue>().unwrap();
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
- let queue = element.imp();
+ let pad = pad.upgrade().expect("PadSink no longer exists");
+ gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+
+ let imp = elem.imp();
- if let EventView::FlushStop(..) = event.view() {
- if let Err(err) = queue.task.flush_stop().await_maybe_on_context() {
+ if let gst::EventView::FlushStop(..) = event.view() {
+ if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
@@ -178,21 +159,12 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
- queue
- .enqueue_item(DataQueueItem::Event(event))
- .await
- .is_ok()
+ imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
}
- fn sink_query(
- &self,
- pad: &PadSinkRef,
- queue: &Queue,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
+ fn sink_query(&self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if query.is_serialized() {
@@ -201,7 +173,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
false
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
- queue.src_pad.gst_pad().peer_query(query)
+ imp.src_pad.gst_pad().peer_query(query)
}
}
}
@@ -212,28 +184,21 @@ struct QueuePadSrcHandler;
impl PadSrcHandler for QueuePadSrcHandler {
type ElementImpl = Queue;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- queue: &Queue,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn src_event(&self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ use gst::EventView;
match event.view() {
EventView::FlushStart(..) => {
- if let Err(err) = queue.task.flush_start().await_maybe_on_context() {
+ if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
}
}
EventView::FlushStop(..) => {
- if let Err(err) = queue.task.flush_stop().await_maybe_on_context() {
+ if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
@@ -245,23 +210,15 @@ impl PadSrcHandler for QueuePadSrcHandler {
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
- queue.sink_pad.gst_pad().push_event(event)
+ imp.sink_pad.gst_pad().push_event(event)
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- queue: &Queue,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
+ fn src_query(&self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
- if let QueryViewMut::Scheduling(q) = query.view_mut() {
+ if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
let mut new_query = gst::query::Scheduling::new();
- let res = queue.sink_pad.gst_pad().peer_query(&mut new_query);
+ let res = imp.sink_pad.gst_pad().peer_query(&mut new_query);
if !res {
return res;
}
@@ -283,7 +240,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
- queue.sink_pad.gst_pad().peer_query(query)
+ imp.sink_pad.gst_pad().peer_query(query)
}
}
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs
index 7b1b1496e..28afb7807 100644
--- a/generic/threadshare/src/runtime/pad.rs
+++ b/generic/threadshare/src/runtime/pad.rs
@@ -118,13 +118,13 @@ fn event_to_event_full_serialized(
/// [`PadSrc`]: struct.PadSrc.html
/// [`pad` module]: index.html
pub trait PadSrcHandler: Clone + Send + Sync + 'static {
+ // FIXME we should use a GAT here: ObjectSubclass<Type: IsA<gst::Element> + Send>
type ElementImpl: ElementImpl + ObjectSubclass;
fn src_activate(
&self,
pad: &PadSrcRef,
_imp: &Self::ElementImpl,
- _element: &gst::Element,
) -> Result<(), gst::LoggableError> {
let gst_pad = pad.gst_pad();
if gst_pad.is_active() {
@@ -154,21 +154,22 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
&self,
_pad: &PadSrcRef,
_imp: &Self::ElementImpl,
- _element: &gst::Element,
_mode: gst::PadMode,
_active: bool,
) -> Result<(), gst::LoggableError> {
Ok(())
}
- fn src_event(
- &self,
- pad: &PadSrcRef,
- _imp: &Self::ElementImpl,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn src_event(&self, pad: &PadSrcRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ let elem = imp.instance();
+ // FIXME with GAT on `Self::ElementImpl`, we should be able to
+ // use `.upcast::<gst::Element>()`
+ //
+ // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
+ let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
+
pad.gst_pad().event_default(Some(element), event)
}
@@ -176,20 +177,18 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
&self,
pad: &PadSrcRef,
imp: &Self::ElementImpl,
- element: &gst::Element,
event: gst::Event,
) -> Result<FlowSuccess, FlowError> {
// default is to dispatch to `src_event`
// (as implemented in `gst_pad_send_event_unchecked`)
let event_type = event.type_();
- event_to_event_full(self.src_event(pad, imp, element, event), event_type)
+ event_to_event_full(self.src_event(pad, imp, event), event_type)
}
fn src_query(
&self,
pad: &PadSrcRef,
- _imp: &Self::ElementImpl,
- element: &gst::Element,
+ imp: &Self::ElementImpl,
query: &mut gst::QueryRef,
) -> bool {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
@@ -198,6 +197,15 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
false
} else {
+ gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+
+ let elem = imp.instance();
+ // FIXME with GAT on `Self::ElementImpl`, we should be able to
+ // use `.upcast::<gst::Element>()`
+ //
+ // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
+ let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
+
pad.gst_pad().query_default(Some(element), query)
}
}
@@ -398,15 +406,7 @@ impl PadSrc {
"Panic in PadSrc activate"
))
},
- move |imp| {
- let this_ref = PadSrcRef::new(inner_arc);
- let element = imp.instance();
- handler.src_activate(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- )
- },
+ move |imp| handler.src_activate(&PadSrcRef::new(inner_arc), imp),
)
});
@@ -427,15 +427,8 @@ impl PadSrc {
},
move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
- let element = imp.instance();
this_ref.activate_mode_hook(mode, active)?;
- handler.src_activatemode(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- mode,
- active,
- )
+ handler.src_activatemode(&this_ref, imp, mode, active)
},
)
});
@@ -451,16 +444,7 @@ impl PadSrc {
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
- move |imp| {
- let this_ref = PadSrcRef::new(inner_arc);
- let element = imp.instance();
- handler.src_event_full(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- event,
- )
- },
+ move |imp| handler.src_event_full(&PadSrcRef::new(inner_arc), imp, event),
)
});
@@ -473,12 +457,10 @@ impl PadSrc {
parent,
|| false,
move |imp| {
- let this_ref = PadSrcRef::new(inner_arc);
- let element = imp.instance();
if !query.is_serialized() {
- handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
+ handler.src_query(&PadSrcRef::new(inner_arc), imp, query)
} else {
- gst::fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
+ gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
false
}
},
@@ -525,15 +507,13 @@ impl Deref for PadSrc {
/// [`PadSink`]: struct.PadSink.html
/// [`pad` module]: index.html
pub trait PadSinkHandler: Clone + Send + Sync + 'static {
+ // FIXME we should use a GAT here: ObjectSubclass<Type: IsA<gst::Element> + Send>
type ElementImpl: ElementImpl + ObjectSubclass;
- // FIXME: Once associated type bounds are stable we should use ObjectSubclass::Type below
- // instead of &gst::Element
fn sink_activate(
&self,
pad: &PadSinkRef,
_imp: &Self::ElementImpl,
- _element: &gst::Element,
) -> Result<(), gst::LoggableError> {
let gst_pad = pad.gst_pad();
if gst_pad.is_active() {
@@ -563,7 +543,6 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
&self,
_pad: &PadSinkRef,
_imp: &Self::ElementImpl,
- _element: &gst::Element,
_mode: gst::PadMode,
_active: bool,
) -> Result<(), gst::LoggableError> {
@@ -571,50 +550,52 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
}
fn sink_chain(
- &self,
- _pad: &PadSinkRef,
- _imp: &Self::ElementImpl,
- _element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ _elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer: gst::Buffer,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed()
}
fn sink_chain_list(
- &self,
- _pad: &PadSinkRef,
- _imp: &Self::ElementImpl,
- _element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ _elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer_list: gst::BufferList,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- _imp: &Self::ElementImpl,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, pad: &PadSinkRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
assert!(!event.is_serialized());
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ let elem = imp.instance();
+ // FIXME with GAT on `Self::ElementImpl`, we should be able to
+ // use `.upcast::<gst::Element>()`
+ //
+ // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
+ let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
+
pad.gst_pad().event_default(Some(element), event)
}
fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _imp: &Self::ElementImpl,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, bool> {
assert!(event.is_serialized());
- let pad_weak = pad.downgrade();
- let element = element.clone();
+ // FIXME with GAT on `Self::ElementImpl`, we should be able to
+ // use `.upcast::<gst::Element>()`
+ //
+ // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
+ let element = unsafe { elem.unsafe_cast::<gst::Element>() };
async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+ let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
pad.gst_pad().event_default(Some(&element), event)
@@ -626,21 +607,19 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
&self,
pad: &PadSinkRef,
imp: &Self::ElementImpl,
- element: &gst::Element,
event: gst::Event,
) -> Result<FlowSuccess, FlowError> {
assert!(!event.is_serialized());
// default is to dispatch to `sink_event`
// (as implemented in `gst_pad_send_event_unchecked`)
let event_type = event.type_();
- event_to_event_full(self.sink_event(pad, imp, element, event), event_type)
+ event_to_event_full(self.sink_event(pad, imp, event), event_type)
}
fn sink_event_full_serialized(
- &self,
- pad: &PadSinkRef,
- imp: &Self::ElementImpl,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
assert!(event.is_serialized());
@@ -648,7 +627,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// (as implemented in `gst_pad_send_event_unchecked`)
let event_type = event.type_();
event_to_event_full_serialized(
- self.sink_event_serialized(pad, imp, element, event),
+ Self::sink_event_serialized(self, pad, elem, event),
event_type,
)
}
@@ -656,8 +635,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_query(
&self,
pad: &PadSinkRef,
- _imp: &Self::ElementImpl,
- element: &gst::Element,
+ imp: &Self::ElementImpl,
query: &mut gst::QueryRef,
) -> bool {
if query.is_serialized() {
@@ -667,6 +645,14 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
false
} else {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+
+ let elem = imp.instance();
+ // FIXME with GAT on `Self::ElementImpl`, we should be able to
+ // use `.upcast::<gst::Element>()`
+ //
+ // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
+ let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
+
pad.gst_pad().query_default(Some(element), query)
}
}
@@ -778,13 +764,6 @@ impl<'a> Deref for PadSinkRef<'a> {
pub struct PadSink(Arc<PadSinkInner>);
impl PadSink {
- pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
- let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
- this.init_pad_functions(handler);
-
- this
- }
-
pub fn downgrade(&self) -> PadSinkWeak {
PadSinkWeak(Arc::downgrade(&self.0))
}
@@ -792,9 +771,25 @@ impl PadSink {
pub fn as_ref(&self) -> PadSinkRef<'_> {
PadSinkRef::new(Arc::clone(&self.0))
}
+}
- fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
- // FIXME: Do this better
+impl PadSink {
+ pub fn new<H>(gst_pad: gst::Pad, handler: H) -> Self
+ where
+ H: PadSinkHandler,
+ <H::ElementImpl as ObjectSubclass>::Type: IsA<gst::Element> + Send,
+ {
+ let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
+ this.init_pad_functions(handler);
+
+ this
+ }
+
+ fn init_pad_functions<H>(&self, handler: H)
+ where
+ H: PadSinkHandler,
+ <H::ElementImpl as ObjectSubclass>::Type: IsA<gst::Element> + Send,
+ {
unsafe {
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
@@ -802,6 +797,7 @@ impl PadSink {
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
+
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@@ -811,15 +807,7 @@ impl PadSink {
"Panic in PadSink activate"
))
},
- move |imp| {
- let this_ref = PadSinkRef::new(inner_arc);
- let element = imp.instance();
- handler.sink_activate(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- )
- },
+ move |imp| handler.sink_activate(&PadSinkRef::new(inner_arc), imp),
)
});
@@ -840,16 +828,8 @@ impl PadSink {
},
move |imp| {
let this_ref = PadSinkRef::new(inner_arc);
- let element = imp.instance();
this_ref.activate_mode_hook(mode, active)?;
-
- handler.sink_activatemode(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- mode,
- active,
- )
+ handler.sink_activatemode(&this_ref, imp, mode, active)
},
)
});
@@ -864,32 +844,19 @@ impl PadSink {
parent,
|| Err(FlowError::Error),
move |imp| {
- let element = imp.instance();
+ let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
+ let elem = imp.instance().clone();
+
if let Some((ctx, task_id)) = Context::current_task() {
- let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
- let handler = handler.clone();
- let element =
- element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
- let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance(
- element.unsafe_cast_ref(),
- );
- let this_ref =
- this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
- handler.sink_chain(&this_ref, imp, &element, buffer).await
+ H::sink_chain(handler, this_weak, elem, buffer).await
};
let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok)
} else {
- let this_ref = PadSinkRef::new(inner_arc);
- let chain_fut = handler.sink_chain(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- buffer,
- );
+ let chain_fut = H::sink_chain(handler, this_weak, elem, buffer);
executor::block_on(chain_fut)
}
},
@@ -906,34 +873,20 @@ impl PadSink {
parent,
|| Err(FlowError::Error),
move |imp| {
- let element = imp.instance();
+ let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
+ let elem = imp.instance().clone();
+
if let Some((ctx, task_id)) = Context::current_task() {
- let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
- let handler = handler.clone();
- let element =
- element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
- let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance(
- element.unsafe_cast_ref(),
- );
- let this_ref =
- this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
- handler
- .sink_chain_list(&this_ref, imp, &element, list)
- .await
+ H::sink_chain_list(handler, this_weak, elem, list).await
};
let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok)
} else {
- let this_ref = PadSinkRef::new(inner_arc);
- let chain_list_fut = handler.sink_chain_list(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- list,
- );
+ let chain_list_fut =
+ H::sink_chain_list(handler, this_weak, elem, list);
executor::block_on(chain_list_fut)
}
},
@@ -952,26 +905,16 @@ impl PadSink {
parent,
|| Err(FlowError::Error),
move |imp| {
- let element = imp.instance();
if event.is_serialized() {
+ let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
+ let elem = imp.instance().clone();
+
if let Some((ctx, task_id)) = Context::current_task() {
- let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
- let handler = handler.clone();
- let element =
- element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
- let imp =
- <H::ElementImpl as ObjectSubclassExt>::from_instance(
- element.unsafe_cast_ref(),
- );
- let this_ref =
- this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
-
- handler
- .sink_event_full_serialized(
- &this_ref, imp, &element, event,
- )
- .await
+ H::sink_event_full_serialized(
+ handler, this_weak, elem, event,
+ )
+ .await
};
let _ = ctx.add_sub_task(
task_id,
@@ -980,23 +923,13 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
- let this_ref = PadSinkRef::new(inner_arc);
- let event_fut = handler.sink_event_full_serialized(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- event,
+ let event_fut = H::sink_event_full_serialized(
+ handler, this_weak, elem, event,
);
executor::block_on(event_fut)
}
} else {
- let this_ref = PadSinkRef::new(inner_arc);
- handler.sink_event_full(
- &this_ref,
- imp,
- element.dynamic_cast_ref::<gst::Element>().unwrap(),
- event,
- )
+ handler.sink_event_full(&PadSinkRef::new(inner_arc), imp, event)
}
},
)
@@ -1011,12 +944,10 @@ impl PadSink {
parent,
|| false,
move |imp| {
- let this_ref = PadSinkRef::new(inner_arc);
- let element = imp.instance();
if !query.is_serialized() {
- handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
+ handler.sink_query(&PadSinkRef::new(inner_arc), imp, query)
} else {
- gst::fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
+ gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
false
}
},
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index 41cdaced3..f616407d3 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -96,28 +96,13 @@ struct TcpClientSrcPadHandler;
impl PadSrcHandler for TcpClientSrcPadHandler {
type ElementImpl = TcpClientSrc;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- tcpclientsrc: &TcpClientSrc,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn src_event(&self, pad: &PadSrcRef, imp: &TcpClientSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ use gst::EventView;
let ret = match event.view() {
- EventView::FlushStart(..) => tcpclientsrc
- .task
- .flush_start()
- .await_maybe_on_context()
- .is_ok(),
- EventView::FlushStop(..) => tcpclientsrc
- .task
- .flush_stop()
- .await_maybe_on_context()
- .is_ok(),
+ EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(),
+ EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@@ -132,16 +117,10 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
ret
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- tcpclientsrc: &TcpClientSrc,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
+ fn src_query(&self, pad: &PadSrcRef, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+
+ use gst::QueryViewMut;
let ret = match query.view_mut() {
QueryViewMut::Latency(q) => {
q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
@@ -153,8 +132,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
- let caps = if let Some(caps) = tcpclientsrc.configured_caps.lock().unwrap().as_ref()
- {
+ let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index b99e432c3..efb0da6ed 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -30,7 +30,7 @@ use gst::{element_error, error_msg};
use once_cell::sync::Lazy;
use crate::runtime::prelude::*;
-use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, Task};
+use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, PadSinkWeak, Task};
use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet;
@@ -133,18 +133,15 @@ impl PadSinkHandler for UdpSinkPadHandler {
type ElementImpl = UdpSink;
fn sink_chain(
- &self,
- _pad: &PadSinkRef,
- udpsink: &UdpSink,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::UdpSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = udpsink.clone_item_sender();
- let element = element.clone().downcast::<super::UdpSink>().unwrap();
-
+ let sender = elem.imp().clone_item_sender();
async move {
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: &element, "Flushing");
+ gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
@@ -154,19 +151,16 @@ impl PadSinkHandler for UdpSinkPadHandler {
}
fn sink_chain_list(
- &self,
- _pad: &PadSinkRef,
- udpsink: &UdpSink,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::UdpSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = udpsink.clone_item_sender();
- let element = element.clone().downcast::<super::UdpSink>().unwrap();
-
+ let sender = elem.imp().clone_item_sender();
async move {
for buffer in list.iter_owned() {
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: &element, "Flushing");
+ gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
}
@@ -177,21 +171,18 @@ impl PadSinkHandler for UdpSinkPadHandler {
}
fn sink_event_serialized(
- &self,
- _pad: &PadSinkRef,
- udpsink: &UdpSink,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::UdpSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- let sender = udpsink.clone_item_sender();
- let element = element.clone().downcast::<super::UdpSink>().unwrap();
-
+ let sender = elem.imp().clone_item_sender();
async move {
if let EventView::FlushStop(_) = event.view() {
- let udpsink = element.imp();
- return udpsink.task.flush_stop().await_maybe_on_context().is_ok();
+ let imp = elem.imp();
+ return imp.task.flush_stop().await_maybe_on_context().is_ok();
} else if sender.send_async(TaskItem::Event(event)).await.is_err() {
- gst::debug!(CAT, obj: &element, "Flushing");
+ gst::debug!(CAT, obj: &elem, "Flushing");
}
true
@@ -199,15 +190,9 @@ impl PadSinkHandler for UdpSinkPadHandler {
.boxed()
}
- fn sink_event(
- &self,
- _pad: &PadSinkRef,
- udpsink: &UdpSink,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, _pad: &PadSinkRef, imp: &UdpSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
- return udpsink.task.flush_start().await_maybe_on_context().is_ok();
+ return imp.task.flush_start().await_maybe_on_context().is_ok();
}
true
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index 5c7b79bf6..f6105d0b8 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -113,20 +113,13 @@ struct UdpSrcPadHandler;
impl PadSrcHandler for UdpSrcPadHandler {
type ElementImpl = UdpSrc;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- udpsrc: &UdpSrc,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
+ fn src_event(&self, pad: &PadSrcRef, imp: &UdpSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ use gst::EventView;
let ret = match event.view() {
- EventView::FlushStart(..) => udpsrc.task.flush_start().await_maybe_on_context().is_ok(),
- EventView::FlushStop(..) => udpsrc.task.flush_stop().await_maybe_on_context().is_ok(),
+ EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(),
+ EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@@ -141,17 +134,10 @@ impl PadSrcHandler for UdpSrcPadHandler {
ret
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- udpsrc: &UdpSrc,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
+ fn src_query(&self, pad: &PadSrcRef, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ use gst::QueryViewMut;
let ret = match query.view_mut() {
QueryViewMut::Latency(q) => {
q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
@@ -163,7 +149,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
- let caps = if let Some(caps) = udpsrc.configured_caps.lock().unwrap().as_ref() {
+ let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 312d28208..e1ca62f32 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -36,7 +36,9 @@ use std::sync::Mutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
-use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task, TaskState};
+use gstthreadshare::runtime::{
+ Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task, TaskState,
+};
const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: Duration = Duration::from_millis(2);
@@ -87,27 +89,15 @@ mod imp_src {
impl PadSrcHandler for PadSrcTestHandler {
type ElementImpl = ElementSrcTest;
- fn src_event(
- &self,
- pad: &PadSrcRef,
- elem_src_test: &ElementSrcTest,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn src_event(&self, pad: &PadSrcRef, imp: &ElementSrcTest, event: gst::Event) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
- EventView::FlushStart(..) => elem_src_test
- .task
- .flush_start()
- .await_maybe_on_context()
- .is_ok(),
+ EventView::FlushStart(..) => {
+ imp.task.flush_start().await_maybe_on_context().is_ok()
+ }
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
- EventView::FlushStop(..) => elem_src_test
- .task
- .flush_stop()
- .await_maybe_on_context()
- .is_ok(),
+ EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
_ => false,
};
@@ -337,6 +327,7 @@ mod imp_src {
let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
@@ -449,53 +440,37 @@ mod imp_sink {
type ElementImpl = ElementSinkTest;
fn sink_chain(
- &self,
- _pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::ElementSinkTest,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let element = element
- .clone()
- .downcast::<super::ElementSinkTest>()
- .unwrap();
async move {
- let elem_sink_test = element.imp();
- elem_sink_test.forward_item(Item::Buffer(buffer)).await
+ let imp = elem.imp();
+ imp.forward_item(Item::Buffer(buffer)).await
}
.boxed()
}
fn sink_chain_list(
- &self,
- _pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
+ self,
+ _pad: PadSinkWeak,
+ elem: super::ElementSinkTest,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let element = element
- .clone()
- .downcast::<super::ElementSinkTest>()
- .unwrap();
async move {
- let elem_sink_test = element.imp();
- elem_sink_test.forward_item(Item::BufferList(list)).await
+ let imp = elem.imp();
+ imp.forward_item(Item::BufferList(list)).await
}
.boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- elem_sink_test: &ElementSinkTest,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, pad: &PadSinkRef, imp: &ElementSinkTest, event: gst::Event) -> bool {
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
- elem_sink_test.stop();
+ imp.stop();
true
}
_ => false,
@@ -503,29 +478,21 @@ mod imp_sink {
}
fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
+ self,
+ pad: PadSinkWeak,
+ elem: super::ElementSinkTest,
event: gst::Event,
) -> BoxFuture<'static, bool> {
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
-
- let element = element
- .clone()
- .downcast::<super::ElementSinkTest>()
- .unwrap();
async move {
- let elem_sink_test = element.imp();
+ let pad = pad.upgrade().expect("PadSink no longer exists");
+ gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+ let imp = elem.imp();
if let EventView::FlushStop(..) = event.view() {
- elem_sink_test.start();
+ imp.start();
}
- elem_sink_test
- .forward_item(Item::Event(event))
- .await
- .is_ok()
+ imp.forward_item(Item::Event(event)).await.is_ok()
}
.boxed()
}
@@ -652,6 +619,7 @@ mod imp_sink {
let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::SINK);
}
}