Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-10-24 14:15:13 +0300
committerFrançois Laignel <fengalin@free.fr>2022-10-24 14:15:13 +0300
commit5ca033049ea3e23c48a630cd449bf3fdd3e1cb85 (patch)
tree9aaf39f03b94023231bbb4e5de6482a9451fd4ff /generic
parent554ce7e7d607648264a3158f1d1e031460114992 (diff)
ts/pad: use `gst::Pad` in handlers trait functions...
... instead of the `Pad{Src,Sink}Ref` wrappers: - In practice, only the `gst::Pad` is useful in these functions. Some of these which need a `Pad{Src,Sink}Ref`, but it's the one for the opposite stream direction. In those cases, it is accessed via the element's implementation. - It saves a few `clone`s. - The implementations usually use the `gst::Pad` for logging. They no longer need to access it via `pad.gst_pad()`.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/examples/standalone/sink/imp.rs10
-rw-r--r--generic/threadshare/src/appsrc/imp.rs18
-rw-r--r--generic/threadshare/src/inputselector/imp.rs39
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs45
-rw-r--r--generic/threadshare/src/proxy/imp.rs56
-rw-r--r--generic/threadshare/src/queue/imp.rs57
-rw-r--r--generic/threadshare/src/runtime/pad.rs399
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs18
-rw-r--r--generic/threadshare/src/udpsink/imp.rs10
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs18
-rw-r--r--generic/threadshare/tests/pad.rs25
11 files changed, 332 insertions, 363 deletions
diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
index 5ef839c09..97a7dae26 100644
--- a/generic/threadshare/examples/standalone/sink/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -19,7 +19,7 @@ use gst::EventView;
use once_cell::sync::Lazy;
use gstthreadshare::runtime::prelude::*;
-use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, Task};
+use gstthreadshare::runtime::{Context, PadSink, Task};
use std::sync::Mutex;
use std::task::Poll;
@@ -77,7 +77,7 @@ impl PadSinkHandler for TestSinkPadHandler {
fn sink_chain(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::TestSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@@ -95,7 +95,7 @@ impl PadSinkHandler for TestSinkPadHandler {
fn sink_chain_list(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::TestSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@@ -115,7 +115,7 @@ impl PadSinkHandler for TestSinkPadHandler {
fn sink_event_serialized(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::TestSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@@ -133,7 +133,7 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
- fn sink_event(self, _pad: &PadSinkRef, imp: &TestSink, event: gst::Event) -> bool {
+ fn sink_event(self, _pad: &gst::Pad, imp: &TestSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
return imp.task.flush_start().await_maybe_on_context().is_ok();
}
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs
index ff4a8b98c..ceb79201c 100644
--- a/generic/threadshare/src/appsrc/imp.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -33,7 +33,7 @@ use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
-use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState};
+use crate::runtime::{Context, PadSrc, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@@ -82,8 +82,8 @@ struct AppSrcPadHandler;
impl PadSrcHandler for AppSrcPadHandler {
type ElementImpl = AppSrc;
- fn src_event(self, pad: &PadSrcRef, imp: &AppSrc, event: gst::Event) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &AppSrc, event: gst::Event) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() {
@@ -95,16 +95,16 @@ impl PadSrcHandler for AppSrcPadHandler {
};
if ret {
- gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
+ gst::log!(CAT, obj: pad, "Handled {:?}", event);
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
+ gst::log!(CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
}
- fn src_query(self, pad: &PadSrcRef, imp: &AppSrc, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, imp: &AppSrc, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@@ -136,9 +136,9 @@ impl PadSrcHandler for AppSrcPadHandler {
};
if ret {
- gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
+ gst::log!(CAT, obj: pad, "Handled {:?}", query);
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
+ gst::log!(CAT, obj: pad, "Didn't handle {:?}", query);
}
ret
}
diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs
index f8618dbba..0455f2271 100644
--- a/generic/threadshare/src/inputselector/imp.rs
+++ b/generic/threadshare/src/inputselector/imp.rs
@@ -33,7 +33,7 @@ use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
-use crate::runtime::{self, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef};
+use crate::runtime::{self, PadSink, PadSrc};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@@ -88,7 +88,7 @@ impl InputSelectorPadSinkHandler {
async fn handle_item(
&self,
- pad: &PadSinkRef<'_>,
+ pad: &gst::Pad,
elem: &super::InputSelector,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@@ -111,9 +111,9 @@ impl InputSelectorPadSinkHandler {
}
let is_active = {
- if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) {
+ if state.active_sinkpad.as_ref() == Some(pad) {
if inner.send_sticky || state.switched_pad {
- pad.gst_pad().sticky_events_foreach(|event| {
+ pad.sticky_events_foreach(|event| {
use std::ops::ControlFlow;
stickies.push(event.clone());
ControlFlow::Continue(gst::EventForeachAction::Keep)
@@ -140,7 +140,7 @@ impl InputSelectorPadSinkHandler {
}
if is_active {
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", buffer);
if switched_pad && !buffer.flags().contains(gst::BufferFlags::DISCONT) {
let buffer = buffer.make_mut();
@@ -159,26 +159,21 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
fn sink_chain(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::InputSelector,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- self.handle_item(&pad, &elem, buffer).await
- }
- .boxed()
+ async move { self.handle_item(&pad, &elem, buffer).await }.boxed()
}
fn sink_chain_list(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::InputSelector,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
+ gst::log!(CAT, obj: pad, "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
self.handle_item(&pad, &elem, buffer).await?;
@@ -191,7 +186,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
fn sink_event_serialized(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
_elem: super::InputSelector,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@@ -219,7 +214,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
.boxed()
}
- fn sink_event(self, _pad: &PadSinkRef, imp: &InputSelector, event: gst::Event) -> bool {
+ fn sink_event(self, _pad: &gst::Pad, imp: &InputSelector, event: gst::Event) -> bool {
/* Drop all events for now */
if let gst::EventView::FlushStart(..) = event.view() {
/* Unblock downstream */
@@ -234,15 +229,15 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
true
}
- fn sink_query(self, pad: &PadSinkRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
+ fn sink_query(self, pad: &gst::Pad, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling query {:?}", query);
if query.is_serialized() {
// FIXME: How can we do this (drops ALLOCATION and DRAIN)?
- gst::log!(CAT, obj: pad.gst_pad(), "Dropping serialized query {:?}", query);
+ gst::log!(CAT, obj: pad, "Dropping serialized query {:?}", query);
false
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
+ gst::log!(CAT, obj: pad, "Forwarding query {:?}", query);
imp.src_pad.gst_pad().peer_query(query)
}
}
@@ -254,8 +249,8 @@ struct InputSelectorPadSrcHandler;
impl PadSrcHandler for InputSelectorPadSrcHandler {
type ElementImpl = InputSelector;
- fn src_query(self, pad: &PadSrcRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
match query.view_mut() {
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
index 5dbd855f7..0b40eb02f 100644
--- a/generic/threadshare/src/jitterbuffer/imp.rs
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -36,7 +36,7 @@ use std::sync::Mutex as StdMutex;
use std::time::Duration;
use crate::runtime::prelude::*;
-use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
+use crate::runtime::{self, Context, PadSink, PadSrc, Task};
use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@@ -488,7 +488,7 @@ impl SinkHandler {
fn enqueue_item(
&self,
- pad: &gst::Pad,
+ pad: gst::Pad,
jb: &JitterBuffer,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@@ -501,7 +501,7 @@ impl SinkHandler {
// This is to avoid recursion with `store`, `reset` and `enqueue_item`
while let Some(buf) = buffers.pop_front() {
- if let Err(err) = self.store(&mut inner, pad, jb, buf) {
+ if let Err(err) = self.store(&mut inner, &pad, jb, buf) {
match err {
gst::FlowError::CustomError => {
for gap_packet in self.reset(&mut inner, jb) {
@@ -550,26 +550,25 @@ impl PadSinkHandler for SinkHandler {
fn sink_chain(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::JitterBuffer,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
- self.enqueue_item(pad.gst_pad(), elem.imp(), Some(buffer))
+ gst::debug!(CAT, obj: pad, "Handling {:?}", buffer);
+ self.enqueue_item(pad, elem.imp(), Some(buffer))
}
.boxed()
}
- fn sink_event(self, pad: &PadSinkRef, jb: &JitterBuffer, event: gst::Event) -> bool {
+ fn sink_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool {
use gst::EventView;
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
if let EventView::FlushStart(..) = event.view() {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
jb,
gst::StreamError::Failed,
@@ -580,20 +579,18 @@ impl PadSinkHandler for SinkHandler {
}
}
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", event);
jb.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::JitterBuffer,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
-
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
let jb = elem.imp();
@@ -606,7 +603,7 @@ impl PadSinkHandler for SinkHandler {
}
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_error!(
elem,
gst::StreamError::Failed,
@@ -629,7 +626,7 @@ impl PadSinkHandler for SinkHandler {
if forward {
// FIXME: These events should really be queued up and stay in order
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
+ gst::log!(CAT, obj: pad, "Forwarding serialized {:?}", event);
jb.src_pad.push_event(event).await
} else {
true
@@ -870,15 +867,15 @@ impl SrcHandler {
impl PadSrcHandler for SrcHandler {
type ElementImpl = JitterBuffer;
- fn src_event(self, pad: &PadSrcRef, jb: &JitterBuffer, event: gst::Event) -> bool {
+ fn src_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool {
use gst::EventView;
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
jb,
gst::StreamError::Failed,
@@ -890,7 +887,7 @@ impl PadSrcHandler for SrcHandler {
}
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
jb,
gst::StreamError::Failed,
@@ -903,14 +900,14 @@ impl PadSrcHandler for SrcHandler {
_ => (),
}
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", event);
jb.sink_pad.gst_pad().push_event(event)
}
- fn src_query(self, pad: &PadSrcRef, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool {
+ fn src_query(self, pad: &gst::Pad, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", query);
match query.view_mut() {
QueryViewMut::Latency(q) => {
diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs
index 7a94da0b0..04acca05a 100644
--- a/generic/threadshare/src/proxy/imp.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -34,9 +34,7 @@ use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
-use crate::runtime::{
- Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task,
-};
+use crate::runtime::{Context, PadSink, PadSinkWeak, PadSrc, PadSrcWeak, Task};
use crate::dataqueue::{DataQueue, DataQueueItem};
@@ -215,13 +213,12 @@ impl PadSinkHandler for ProxySinkPadHandler {
fn sink_chain(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::ProxySink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
+ gst::log!(SINK_CAT, obj: pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
@@ -230,21 +227,20 @@ impl PadSinkHandler for ProxySinkPadHandler {
fn sink_chain_list(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::ProxySink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list);
+ gst::log!(SINK_CAT, obj: pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
- fn sink_event(self, pad: &PadSinkRef, imp: &ProxySink, event: gst::Event) -> bool {
- gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
+ fn sink_event(self, pad: &gst::Pad, imp: &ProxySink, event: gst::Event) -> bool {
+ gst::debug!(SINK_CAT, obj: pad, "Handling non-serialized {:?}", event);
let src_pad = {
let proxy_ctx = imp.proxy_ctx.lock().unwrap();
@@ -262,23 +258,27 @@ impl PadSinkHandler for ProxySinkPadHandler {
}
if let Some(src_pad) = src_pad {
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
+ gst::log!(SINK_CAT, obj: pad, "Forwarding non-serialized {:?}", event);
src_pad.push_event(event)
} else {
- gst::error!(SINK_CAT, obj: pad.gst_pad(), "No src pad to forward non-serialized {:?} to", event);
+ gst::error!(
+ SINK_CAT,
+ obj: pad,
+ "No src pad to forward non-serialized {:?} to",
+ event
+ );
true
}
}
fn sink_event_serialized(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::ProxySink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+ gst::log!(SINK_CAT, obj: pad, "Handling serialized {:?}", event);
let imp = elem.imp();
@@ -291,7 +291,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
_ => (),
}
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
+ gst::log!(SINK_CAT, obj: pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
@@ -666,8 +666,8 @@ struct ProxySrcPadHandler;
impl PadSrcHandler for ProxySrcPadHandler {
type ElementImpl = ProxySrc;
- fn src_event(self, pad: &PadSrcRef, imp: &ProxySrc, event: gst::Event) -> bool {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &ProxySrc, event: gst::Event) -> bool {
+ gst::log!(SRC_CAT, obj: pad, "Handling {:?}", event);
let sink_pad = {
let proxy_ctx = imp.proxy_ctx.lock().unwrap();
@@ -684,7 +684,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
match event.view() {
EventView::FlushStart(..) => {
if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
- gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst::error!(SRC_CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@@ -696,7 +696,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
EventView::FlushStop(..) => {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
- gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst::error!(SRC_CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@@ -710,16 +710,16 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
if let Some(sink_pad) = sink_pad {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
+ gst::log!(SRC_CAT, obj: pad, "Forwarding {:?}", event);
sink_pad.push_event(event)
} else {
- gst::error!(SRC_CAT, obj: pad.gst_pad(), "No sink pad to forward {:?} to", event);
+ gst::error!(SRC_CAT, obj: pad, "No sink pad to forward {:?} to", event);
false
}
}
- fn src_query(self, pad: &PadSrcRef, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool {
+ gst::log!(SRC_CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@@ -733,7 +733,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
- let caps = if let Some(ref caps) = pad.gst_pad().current_caps() {
+ let caps = if let Some(ref caps) = pad.current_caps() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@@ -751,9 +751,9 @@ impl PadSrcHandler for ProxySrcPadHandler {
};
if ret {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", query);
+ gst::log!(SRC_CAT, obj: pad, "Handled {:?}", query);
} else {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
+ gst::log!(SRC_CAT, obj: pad, "Didn't handle {:?}", query);
}
ret
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs
index 2ca02f9cc..683066bcc 100644
--- a/generic/threadshare/src/queue/imp.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -33,7 +33,7 @@ use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
-use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
+use crate::runtime::{Context, PadSink, PadSrc, Task};
use crate::dataqueue::{DataQueue, DataQueueItem};
@@ -85,13 +85,12 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_chain(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::Queue,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
+ gst::log!(CAT, obj: pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
@@ -100,25 +99,24 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_chain_list(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::Queue,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
+ gst::log!(CAT, obj: pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
- fn sink_event(self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool {
- gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
+ fn sink_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool {
+ gst::debug!(CAT, obj: pad, "Handling non-serialized {:?}", event);
if let gst::EventView::FlushStart(..) = event.view() {
if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@@ -129,25 +127,24 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
}
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
+ gst::log!(CAT, obj: pad, "Forwarding non-serialized {:?}", event);
imp.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::Queue,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+ gst::log!(CAT, obj: pad, "Handling serialized {:?}", event);
let imp = elem.imp();
if let gst::EventView::FlushStop(..) = event.view() {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@@ -158,21 +155,21 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
}
- gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
+ gst::log!(CAT, obj: pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
}
- fn sink_query(self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn sink_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", query);
if query.is_serialized() {
// FIXME: How can we do this?
- gst::log!(CAT, obj: pad.gst_pad(), "Dropping serialized {:?}", query);
+ gst::log!(CAT, obj: pad, "Dropping serialized {:?}", query);
false
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", query);
imp.src_pad.gst_pad().peer_query(query)
}
}
@@ -184,19 +181,19 @@ struct QueuePadSrcHandler;
impl PadSrcHandler for QueuePadSrcHandler {
type ElementImpl = Queue;
- fn src_event(self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
match event.view() {
EventView::FlushStart(..) => {
if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
}
}
EventView::FlushStop(..) => {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
- gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@@ -209,12 +206,12 @@ impl PadSrcHandler for QueuePadSrcHandler {
_ => (),
}
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", event);
imp.sink_pad.gst_pad().push_event(event)
}
- fn src_query(self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", query);
if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
let mut new_query = gst::query::Scheduling::new();
@@ -223,7 +220,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
return res;
}
- gst::log!(CAT, obj: pad.gst_pad(), "Upstream returned {:?}", new_query);
+ gst::log!(CAT, obj: pad, "Upstream returned {:?}", new_query);
let (flags, min, max, align) = new_query.result();
q.set(flags, min, max, align);
@@ -235,11 +232,11 @@ impl PadSrcHandler for QueuePadSrcHandler {
.filter(|m| m != &gst::PadMode::Pull)
.collect::<Vec<_>>(),
);
- gst::log!(CAT, obj: pad.gst_pad(), "Returning {:?}", q.query_mut());
+ gst::log!(CAT, obj: pad, "Returning {:?}", q.query_mut());
return true;
}
- gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
+ gst::log!(CAT, obj: pad, "Forwarding {:?}", query);
imp.sink_pad.gst_pad().peer_query(query)
}
}
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs
index 857f15a12..865dbf84c 100644
--- a/generic/threadshare/src/runtime/pad.rs
+++ b/generic/threadshare/src/runtime/pad.rs
@@ -123,36 +123,28 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
fn src_activate(
self,
- pad: &PadSrcRef,
+ pad: &gst::Pad,
_imp: &Self::ElementImpl,
) -> Result<(), gst::LoggableError> {
- let gst_pad = pad.gst_pad();
- if gst_pad.is_active() {
+ if pad.is_active() {
gst::debug!(
RUNTIME_CAT,
- obj: gst_pad,
+ obj: pad,
"Already activated in {:?} mode ",
- gst_pad.mode()
+ pad.mode()
);
return Ok(());
}
- gst_pad
- .activate_mode(gst::PadMode::Push, true)
- .map_err(|err| {
- gst::error!(
- RUNTIME_CAT,
- obj: gst_pad,
- "Error in PadSrc activate: {:?}",
- err
- );
- gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
- })
+ pad.activate_mode(gst::PadMode::Push, true).map_err(|err| {
+ gst::error!(RUNTIME_CAT, obj: pad, "Error in PadSrc activate: {:?}", err);
+ gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
+ })
}
fn src_activatemode(
self,
- _pad: &PadSrcRef,
+ _pad: &gst::Pad,
_imp: &Self::ElementImpl,
_mode: gst::PadMode,
_active: bool,
@@ -160,8 +152,8 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
Ok(())
}
- fn src_event(self, pad: &PadSrcRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
+ gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@@ -170,12 +162,12 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
- gst::Pad::event_default(pad.gst_pad(), Some(element), event)
+ gst::Pad::event_default(pad, Some(element), event)
}
fn src_event_full(
self,
- pad: &PadSrcRef,
+ pad: &gst::Pad,
imp: &Self::ElementImpl,
event: gst::Event,
) -> Result<FlowSuccess, FlowError> {
@@ -185,19 +177,14 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
event_to_event_full(self.src_event(pad, imp, event), event_type)
}
- fn src_query(
- self,
- pad: &PadSrcRef,
- imp: &Self::ElementImpl,
- query: &mut gst::QueryRef,
- ) -> bool {
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool {
+ gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query);
if query.is_serialized() {
// FIXME serialized queries should be handled with the dataflow
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
false
} else {
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@@ -206,7 +193,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
- gst::Pad::query_default(pad.gst_pad(), Some(element), query)
+ gst::Pad::query_default(pad, Some(element), query)
}
}
}
@@ -230,11 +217,11 @@ impl PadSrcInner {
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
- gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
+ gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", buffer);
let success = self.gst_pad.push(buffer).map_err(|err| {
gst::error!(RUNTIME_CAT,
- obj: self.gst_pad(),
+ obj: self.gst_pad,
"Failed to push Buffer to PadSrc: {:?}",
err,
);
@@ -248,12 +235,12 @@ impl PadSrcInner {
}
pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
- gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
+ gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", list);
let success = self.gst_pad.push_list(list).map_err(|err| {
gst::error!(
RUNTIME_CAT,
- obj: self.gst_pad(),
+ obj: self.gst_pad,
"Failed to push BufferList to PadSrc: {:?}",
err,
);
@@ -269,7 +256,7 @@ impl PadSrcInner {
pub async fn push_event(&self, event: gst::Event) -> bool {
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", event);
- let was_handled = self.gst_pad().push_event(event);
+ let was_handled = self.gst_pad.push_event(event);
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Processing any pending sub tasks");
if Context::drain_sub_tasks().await.is_err() {
@@ -326,26 +313,6 @@ impl<'a> PadSrcRef<'a> {
pub fn downgrade(&self) -> PadSrcWeak {
PadSrcWeak(Arc::downgrade(&self.strong))
}
-
- fn activate_mode_hook(
- &self,
- mode: gst::PadMode,
- active: bool,
- ) -> Result<(), gst::LoggableError> {
- // Important: don't panic here as the hook is used without `catch_panic_pad_function`
- // in the default `activatemode` handling
- gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
-
- if mode == gst::PadMode::Pull {
- gst::error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSrc");
- return Err(gst::loggable_error!(
- RUNTIME_CAT,
- "Pull mode not supported by PadSrc"
- ));
- }
-
- Ok(())
- }
}
impl<'a> Deref for PadSrcRef<'a> {
@@ -384,19 +351,17 @@ impl PadSrc {
}
pub fn check_reconfigure(&self) -> bool {
- self.0.gst_pad().check_reconfigure()
+ self.0.gst_pad.check_reconfigure()
}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
// FIXME: Do this better
unsafe {
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
self.0
- .gst_pad()
+ .gst_pad
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@@ -406,16 +371,15 @@ impl PadSrc {
"Panic in PadSrc activate"
))
},
- move |imp| H::src_activate(handler, &PadSrcRef::new(inner_arc), imp),
+ move |imp| H::src_activate(handler, gst_pad, imp),
)
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@@ -426,9 +390,27 @@ impl PadSrc {
))
},
move |imp| {
- let this_ref = PadSrcRef::new(inner_arc);
- this_ref.activate_mode_hook(mode, active)?;
- H::src_activatemode(handler, &this_ref, imp, mode, active)
+ gst::log!(
+ RUNTIME_CAT,
+ obj: gst_pad,
+ "ActivateMode {:?}, {}",
+ mode,
+ active
+ );
+
+ if mode == gst::PadMode::Pull {
+ gst::error!(
+ RUNTIME_CAT,
+ obj: gst_pad,
+ "Pull mode not supported by PadSrc"
+ );
+ return Err(gst::loggable_error!(
+ RUNTIME_CAT,
+ "Pull mode not supported by PadSrc"
+ ));
+ }
+
+ H::src_activatemode(handler, gst_pad, imp, mode, active)
},
)
});
@@ -436,38 +418,38 @@ impl PadSrc {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `src_event` when necessary
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
- .set_event_full_function(move |_gst_pad, parent, event| {
+ self.0
+ .gst_pad
+ .set_event_full_function(move |gst_pad, parent, event| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
+ move |imp| H::src_event_full(handler, gst_pad, imp, event),
+ )
+ });
+
+ self.0
+ .gst_pad
+ .set_query_function(move |gst_pad, parent, query| {
+ let handler = handler.clone();
+ H::ElementImpl::catch_panic_pad_function(
+ parent,
+ || false,
move |imp| {
- H::src_event_full(handler, &PadSrcRef::new(inner_arc), imp, event)
+ if !query.is_serialized() {
+ H::src_query(handler, gst_pad, imp, query)
+ } else {
+ gst::fixme!(
+ RUNTIME_CAT,
+ obj: gst_pad,
+ "Serialized Query not supported"
+ );
+ false
+ }
},
)
});
-
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
- .set_query_function(move |_gst_pad, parent, query| {
- let handler = handler.clone();
- let inner_arc = inner_arc.clone();
- H::ElementImpl::catch_panic_pad_function(
- parent,
- || false,
- move |imp| {
- if !query.is_serialized() {
- H::src_query(handler, &PadSrcRef::new(inner_arc), imp, query)
- } else {
- gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
- false
- }
- },
- )
- });
}
}
}
@@ -476,19 +458,24 @@ impl Drop for PadSrc {
fn drop(&mut self) {
// FIXME: Do this better
unsafe {
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activate_function(move |_gst_pad, _parent| {
Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
});
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
});
- self.gst_pad()
+ self.0
+ .gst_pad
.set_event_function(move |_gst_pad, _parent, _event| false);
- self.gst_pad()
+ self.0
+ .gst_pad
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
- self.gst_pad()
+ self.0
+ .gst_pad
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}
@@ -514,36 +501,33 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_activate(
self,
- pad: &PadSinkRef,
+ pad: &gst::Pad,
_imp: &Self::ElementImpl,
) -> Result<(), gst::LoggableError> {
- let gst_pad = pad.gst_pad();
- if gst_pad.is_active() {
+ if pad.is_active() {
gst::debug!(
RUNTIME_CAT,
- obj: gst_pad,
+ obj: pad,
"Already activated in {:?} mode ",
- gst_pad.mode()
+ pad.mode()
);
return Ok(());
}
- gst_pad
- .activate_mode(gst::PadMode::Push, true)
- .map_err(|err| {
- gst::error!(
- RUNTIME_CAT,
- obj: gst_pad,
- "Error in PadSink activate: {:?}",
- err
- );
- gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
- })
+ pad.activate_mode(gst::PadMode::Push, true).map_err(|err| {
+ gst::error!(
+ RUNTIME_CAT,
+ obj: pad,
+ "Error in PadSink activate: {:?}",
+ err
+ );
+ gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
+ })
}
fn sink_activatemode(
self,
- _pad: &PadSinkRef,
+ _pad: &gst::Pad,
_imp: &Self::ElementImpl,
_mode: gst::PadMode,
_active: bool,
@@ -553,7 +537,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_chain(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
_elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer: gst::Buffer,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
@@ -562,16 +546,16 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_chain_list(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
_elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer_list: gst::BufferList,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed()
}
- fn sink_event(self, pad: &PadSinkRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
+ fn sink_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
assert!(!event.is_serialized());
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@@ -580,12 +564,12 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
- gst::Pad::event_default(pad.gst_pad(), Some(element), event)
+ gst::Pad::event_default(pad, Some(element), event)
}
fn sink_event_serialized(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@@ -597,17 +581,16 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
let element = unsafe { elem.unsafe_cast::<gst::Element>() };
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event);
- gst::Pad::event_default(pad.gst_pad(), Some(&element), event)
+ gst::Pad::event_default(&pad, Some(&element), event)
}
.boxed()
}
fn sink_event_full(
self,
- pad: &PadSinkRef,
+ pad: &gst::Pad,
imp: &Self::ElementImpl,
event: gst::Event,
) -> Result<FlowSuccess, FlowError> {
@@ -620,7 +603,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_event_full_serialized(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
@@ -636,17 +619,17 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_query(
self,
- pad: &PadSinkRef,
+ pad: &gst::Pad,
imp: &Self::ElementImpl,
query: &mut gst::QueryRef,
) -> bool {
if query.is_serialized() {
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Dropping {:?}", query);
+ gst::log!(RUNTIME_CAT, obj: pad, "Dropping {:?}", query);
// FIXME serialized queries should be handled with the dataflow
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
false
} else {
- gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@@ -655,7 +638,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
- gst::Pad::query_default(pad.gst_pad(), Some(element), query)
+ gst::Pad::query_default(pad, Some(element), query)
}
}
}
@@ -724,26 +707,6 @@ impl<'a> PadSinkRef<'a> {
pub fn downgrade(&self) -> PadSinkWeak {
PadSinkWeak(Arc::downgrade(&self.strong))
}
-
- fn activate_mode_hook(
- &self,
- mode: gst::PadMode,
- active: bool,
- ) -> Result<(), gst::LoggableError> {
- // Important: don't panic here as the hook is used without `catch_panic_pad_function`
- // in the default `activatemode` handling
- gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
-
- if mode == gst::PadMode::Pull {
- gst::error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSink");
- return Err(gst::loggable_error!(
- RUNTIME_CAT,
- "Pull mode not supported by PadSink"
- ));
- }
-
- Ok(())
- }
}
impl<'a> Deref for PadSinkRef<'a> {
@@ -794,12 +757,10 @@ impl PadSink {
{
unsafe {
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
-
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@@ -809,16 +770,15 @@ impl PadSink {
"Panic in PadSink activate"
))
},
- move |imp| H::sink_activate(handler, &PadSinkRef::new(inner_arc), imp),
+ move |imp| H::sink_activate(handler, gst_pad, imp),
)
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@@ -829,36 +789,53 @@ impl PadSink {
))
},
move |imp| {
- let this_ref = PadSinkRef::new(inner_arc);
- this_ref.activate_mode_hook(mode, active)?;
- H::sink_activatemode(handler, &this_ref, imp, mode, active)
+ gst::log!(
+ RUNTIME_CAT,
+ obj: gst_pad,
+ "ActivateMode {:?}, {}",
+ mode,
+ active
+ );
+
+ if mode == gst::PadMode::Pull {
+ gst::error!(
+ RUNTIME_CAT,
+ obj: gst_pad,
+ "Pull mode not supported by PadSink"
+ );
+ return Err(gst::loggable_error!(
+ RUNTIME_CAT,
+ "Pull mode not supported by PadSink"
+ ));
+ }
+
+ H::sink_activatemode(handler, gst_pad, imp, mode, active)
},
)
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
- .set_chain_function(move |_gst_pad, parent, buffer| {
+ self.0
+ .gst_pad
+ .set_chain_function(move |gst_pad, parent, buffer| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
- let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.obj().clone();
+ let gst_pad = gst_pad.clone();
if let Some((ctx, task_id)) = Context::current_task() {
let delayed_fut = async move {
- H::sink_chain(handler, this_weak, elem, buffer).await
+ H::sink_chain(handler, gst_pad, elem, buffer).await
};
let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok)
} else {
- let chain_fut = H::sink_chain(handler, this_weak, elem, buffer);
+ let chain_fut = H::sink_chain(handler, gst_pad, elem, buffer);
executor::block_on(chain_fut)
}
},
@@ -866,21 +843,20 @@ impl PadSink {
});
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
- .set_chain_list_function(move |_gst_pad, parent, list| {
+ self.0
+ .gst_pad
+ .set_chain_list_function(move |gst_pad, parent, list| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
- let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.obj().clone();
+ let gst_pad = gst_pad.clone();
if let Some((ctx, task_id)) = Context::current_task() {
let delayed_fut = async move {
- H::sink_chain_list(handler, this_weak, elem, list).await
+ H::sink_chain_list(handler, gst_pad, elem, list).await
};
let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
@@ -888,7 +864,7 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let chain_list_fut =
- H::sink_chain_list(handler, this_weak, elem, list);
+ H::sink_chain_list(handler, gst_pad, elem, list);
executor::block_on(chain_list_fut)
}
},
@@ -898,25 +874,22 @@ impl PadSink {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `sink_event` when necessary
let handler_clone = handler.clone();
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
- .set_event_full_function(move |_gst_pad, parent, event| {
+ self.0
+ .gst_pad
+ .set_event_full_function(move |gst_pad, parent, event| {
let handler = handler_clone.clone();
- let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
if event.is_serialized() {
- let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.obj().clone();
+ let gst_pad = gst_pad.clone();
if let Some((ctx, task_id)) = Context::current_task() {
let delayed_fut = async move {
- H::sink_event_full_serialized(
- handler, this_weak, elem, event,
- )
- .await
+ H::sink_event_full_serialized(handler, gst_pad, elem, event)
+ .await
};
let _ = ctx.add_sub_task(
task_id,
@@ -926,35 +899,38 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let event_fut = H::sink_event_full_serialized(
- handler, this_weak, elem, event,
+ handler, gst_pad, elem, event,
);
executor::block_on(event_fut)
}
} else {
- handler.sink_event_full(&PadSinkRef::new(inner_arc), imp, event)
+ handler.sink_event_full(gst_pad, imp, event)
}
},
)
});
- let inner_arc = Arc::clone(&self.0);
- self.gst_pad()
- .set_query_function(move |_gst_pad, parent, query| {
- let handler = handler.clone();
- let inner_arc = inner_arc.clone();
- H::ElementImpl::catch_panic_pad_function(
- parent,
- || false,
- move |imp| {
- if !query.is_serialized() {
- H::sink_query(handler, &PadSinkRef::new(inner_arc), imp, query)
- } else {
- gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
- false
- }
- },
- )
- });
+ self.0
+ .gst_pad
+ .set_query_function(move |gst_pad, parent, query| {
+ let handler = handler.clone();
+ H::ElementImpl::catch_panic_pad_function(
+ parent,
+ || false,
+ move |imp| {
+ if !query.is_serialized() {
+ H::sink_query(handler, gst_pad, imp, query)
+ } else {
+ gst::fixme!(
+ RUNTIME_CAT,
+ obj: gst_pad,
+ "Serialized Query not supported"
+ );
+ false
+ }
+ },
+ )
+ });
}
}
}
@@ -963,29 +939,36 @@ impl Drop for PadSink {
fn drop(&mut self) {
// FIXME: Do this better
unsafe {
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activate_function(move |_gst_pad, _parent| {
Err(gst::loggable_error!(
RUNTIME_CAT,
"PadSink no longer exists"
))
});
- self.gst_pad()
+ self.0
+ .gst_pad
.set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
Err(gst::loggable_error!(
RUNTIME_CAT,
"PadSink no longer exists"
))
});
- self.gst_pad()
+ self.0
+ .gst_pad
.set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing));
- self.gst_pad()
+ self.0
+ .gst_pad
.set_chain_list_function(move |_gst_pad, _parent, _list| Err(FlowError::Flushing));
- self.gst_pad()
+ self.0
+ .gst_pad
.set_event_function(move |_gst_pad, _parent, _event| false);
- self.gst_pad()
+ self.0
+ .gst_pad
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
- self.gst_pad()
+ self.0
+ .gst_pad
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index 96d105e3d..71e702b59 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -36,7 +36,7 @@ use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::task;
-use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState};
+use crate::runtime::{Context, PadSrc, Task, TaskState};
use crate::runtime::Async;
use crate::socket::{Socket, SocketError, SocketRead};
@@ -96,8 +96,8 @@ struct TcpClientSrcPadHandler;
impl PadSrcHandler for TcpClientSrcPadHandler {
type ElementImpl = TcpClientSrc;
- fn src_event(self, pad: &PadSrcRef, imp: &TcpClientSrc, event: gst::Event) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &TcpClientSrc, event: gst::Event) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() {
@@ -109,16 +109,16 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
};
if ret {
- gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
+ gst::log!(CAT, obj: pad, "Handled {:?}", event);
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
+ gst::log!(CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
}
- fn src_query(self, pad: &PadSrcRef, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@@ -150,9 +150,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
};
if ret {
- gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
+ gst::log!(CAT, obj: pad, "Handled {:?}", query);
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
+ gst::log!(CAT, obj: pad, "Didn't handle {:?}", query);
}
ret
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index 2b45f7ffb..f9186b33d 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -30,7 +30,7 @@ use gst::{element_error, error_msg};
use once_cell::sync::Lazy;
use crate::runtime::prelude::*;
-use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, PadSinkWeak, Task};
+use crate::runtime::{self, Async, Context, PadSink, Task};
use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet;
@@ -134,7 +134,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_chain(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::UdpSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@@ -152,7 +152,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_chain_list(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::UdpSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@@ -172,7 +172,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_event_serialized(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::UdpSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@@ -190,7 +190,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
.boxed()
}
- fn sink_event(self, _pad: &PadSinkRef, imp: &UdpSink, event: gst::Event) -> bool {
+ fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
return imp.task.flush_start().await_maybe_on_context().is_ok();
}
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index e9b6fcdc8..cfd95de8f 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -35,7 +35,7 @@ use std::time::Duration;
use std::u16;
use crate::runtime::prelude::*;
-use crate::runtime::{Async, Context, PadSrc, PadSrcRef, Task};
+use crate::runtime::{Async, Context, PadSrc, Task};
use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
@@ -113,8 +113,8 @@ struct UdpSrcPadHandler;
impl PadSrcHandler for UdpSrcPadHandler {
type ElementImpl = UdpSrc;
- fn src_event(self, pad: &PadSrcRef, imp: &UdpSrc, event: gst::Event) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &UdpSrc, event: gst::Event) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() {
@@ -126,16 +126,16 @@ impl PadSrcHandler for UdpSrcPadHandler {
};
if ret {
- gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
+ gst::log!(CAT, obj: pad, "Handled {:?}", event);
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
+ gst::log!(CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
}
- fn src_query(self, pad: &PadSrcRef, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool {
- gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
+ fn src_query(self, pad: &gst::Pad, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool {
+ gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@@ -167,9 +167,9 @@ impl PadSrcHandler for UdpSrcPadHandler {
};
if ret {
- gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
+ gst::log!(CAT, obj: pad, "Handled {:?}", query);
} else {
- gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
+ gst::log!(CAT, obj: pad, "Didn't handle {:?}", query);
}
ret
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 78f32fb9f..9c6257c77 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -36,9 +36,7 @@ use std::sync::Mutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
-use gstthreadshare::runtime::{
- Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task, TaskState,
-};
+use gstthreadshare::runtime::{Context, PadSink, PadSrc, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: Duration = Duration::from_millis(2);
@@ -89,8 +87,8 @@ mod imp_src {
impl PadSrcHandler for PadSrcTestHandler {
type ElementImpl = ElementSrcTest;
- fn src_event(self, pad: &PadSrcRef, imp: &ElementSrcTest, event: gst::Event) -> bool {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+ fn src_event(self, pad: &gst::Pad, imp: &ElementSrcTest, event: gst::Event) -> bool {
+ gst::log!(SRC_CAT, obj: pad, "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
@@ -102,9 +100,9 @@ mod imp_src {
};
if ret {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
+ gst::log!(SRC_CAT, obj: pad, "Handled {:?}", event);
} else {
- gst::log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
+ gst::log!(SRC_CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
@@ -441,7 +439,7 @@ mod imp_sink {
fn sink_chain(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::ElementSinkTest,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@@ -454,7 +452,7 @@ mod imp_sink {
fn sink_chain_list(
self,
- _pad: PadSinkWeak,
+ _pad: gst::Pad,
elem: super::ElementSinkTest,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@@ -465,8 +463,8 @@ mod imp_sink {
.boxed()
}
- fn sink_event(self, pad: &PadSinkRef, imp: &ElementSinkTest, event: gst::Event) -> bool {
- gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
+ fn sink_event(self, pad: &gst::Pad, imp: &ElementSinkTest, event: gst::Event) -> bool {
+ gst::debug!(SINK_CAT, obj: pad, "Handling non-serialized {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
@@ -479,13 +477,12 @@ mod imp_sink {
fn sink_event_serialized(
self,
- pad: PadSinkWeak,
+ pad: gst::Pad,
elem: super::ElementSinkTest,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
- let pad = pad.upgrade().expect("PadSink no longer exists");
- gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+ gst::log!(SINK_CAT, obj: pad, "Handling serialized {:?}", event);
let imp = elem.imp();
if let EventView::FlushStop(..) = event.view() {