Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/utils
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2023-10-16 19:16:52 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-10-17 08:44:06 +0300
commitd468e1e4a695902ce011a8fb9f35f0118b9d1e87 (patch)
tree34c64e66c9040feee349703d8d12c92e2a942324 /utils
parent50dd519c4f99e162fc1bca3b9d85eb056a2fa78f (diff)
Clean up usage of pad probes
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1361>
Diffstat (limited to 'utils')
-rw-r--r--utils/fallbackswitch/src/fallbacksrc/imp.rs162
-rw-r--r--utils/togglerecord/tests/tests.rs8
-rw-r--r--utils/uriplaylistbin/src/uriplaylistbin/imp.rs128
3 files changed, 150 insertions, 148 deletions
diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs
index 745ad8490..2001758fe 100644
--- a/utils/fallbackswitch/src/fallbacksrc/imp.rs
+++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs
@@ -1911,77 +1911,79 @@ impl FallbackSrc {
let imp = element.imp();
- match info.data {
- Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => {
- gst::debug!(
- CAT,
- obj: element,
- "Received EOS from {}source on pad {}",
- if fallback_source { "fallback " } else { "" },
- pad.name()
- );
+ let Some(ev) = info.event() else {
+ return gst::PadProbeReturn::Ok;
+ };
- let mut state_guard = imp.state.lock();
- let state = match &mut *state_guard {
- None => {
- return gst::PadProbeReturn::Ok;
- }
- Some(state) => state,
- };
+ if ev.type_() != gst::EventType::Eos {
+ return gst::PadProbeReturn::Ok;
+ }
- if is_image {
- gst::PadProbeReturn::Ok
- } else if state.settings.restart_on_eos || fallback_source {
- imp.handle_source_error(state, RetryReason::Eos, fallback_source);
- drop(state_guard);
- element.notify("statistics");
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Received EOS from {}source on pad {}",
+ if fallback_source { "fallback " } else { "" },
+ pad.name()
+ );
- gst::PadProbeReturn::Drop
- } else {
- // Send EOS to all sinkpads of the fallbackswitch and also to the other
- // stream's fallbackswitch if it doesn't have a main branch.
- let mut sinkpads = vec![];
+ let mut state_guard = imp.state.lock();
+ let state = match &mut *state_guard {
+ None => {
+ return gst::PadProbeReturn::Ok;
+ }
+ Some(state) => state,
+ };
- if let Some(stream) = {
- if is_video {
- state.video_stream.as_ref()
- } else {
- state.audio_stream.as_ref()
- }
- } {
- sinkpads
- .extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad));
- }
+ if is_image {
+ gst::PadProbeReturn::Ok
+ } else if state.settings.restart_on_eos || fallback_source {
+ imp.handle_source_error(state, RetryReason::Eos, fallback_source);
+ drop(state_guard);
+ element.notify("statistics");
- if let Some(other_stream) = {
- if is_video {
- state.audio_stream.as_ref()
- } else {
- state.video_stream.as_ref()
- }
- } {
- if other_stream.main_branch.is_none() {
- sinkpads.extend(
- other_stream
- .switch
- .sink_pads()
- .into_iter()
- .filter(|p| p != pad),
- );
- }
- }
+ gst::PadProbeReturn::Drop
+ } else {
+ // Send EOS to all sinkpads of the fallbackswitch and also to the other
+ // stream's fallbackswitch if it doesn't have a main branch.
+ let mut sinkpads = vec![];
- let event = ev.clone();
- element.call_async(move |_| {
- for sinkpad in sinkpads {
- sinkpad.send_event(event.clone());
- }
- });
+ if let Some(stream) = {
+ if is_video {
+ state.video_stream.as_ref()
+ } else {
+ state.audio_stream.as_ref()
+ }
+ } {
+ sinkpads.extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad));
+ }
- gst::PadProbeReturn::Ok
+ if let Some(other_stream) = {
+ if is_video {
+ state.audio_stream.as_ref()
+ } else {
+ state.video_stream.as_ref()
+ }
+ } {
+ if other_stream.main_branch.is_none() {
+ sinkpads.extend(
+ other_stream
+ .switch
+ .sink_pads()
+ .into_iter()
+ .filter(|p| p != pad),
+ );
}
}
- _ => gst::PadProbeReturn::Ok,
+
+ let event = ev.clone();
+ element.call_async(move |_| {
+ for sinkpad in sinkpads {
+ sinkpad.send_event(event.clone());
+ }
+ });
+
+ gst::PadProbeReturn::Ok
}
});
@@ -2056,13 +2058,15 @@ impl FallbackSrc {
let qos_probe_id = block_pad
.add_probe(gst::PadProbeType::EVENT_UPSTREAM, |_pad, info| {
- if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
- if let gst::EventView::Qos(_) = ev.view() {
- return gst::PadProbeReturn::Drop;
- }
+ let Some(ev) = info.event() else {
+ return gst::PadProbeReturn::Ok;
+ };
+
+ if ev.type_() != gst::EventType::Qos {
+ return gst::PadProbeReturn::Ok;
}
- gst::PadProbeReturn::Ok
+ gst::PadProbeReturn::Drop
})
.unwrap();
@@ -2871,19 +2875,17 @@ impl FallbackSrc {
// error. We don't need to remove these pad probes because restarting the source will also
// remove/add the pads again.
for pad in source.source.src_pads() {
- pad.add_probe(
- gst::PadProbeType::EVENT_DOWNSTREAM,
- |_pad, info| match info.data {
- Some(gst::PadProbeData::Event(ref event)) => {
- if event.type_() == gst::EventType::Eos {
- gst::PadProbeReturn::Drop
- } else {
- gst::PadProbeReturn::Ok
- }
- }
- _ => unreachable!(),
- },
- )
+ pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |_pad, info| {
+ let Some(ev) = info.event() else {
+ return gst::PadProbeReturn::Pass;
+ };
+
+ if ev.type_() != gst::EventType::Eos {
+ return gst::PadProbeReturn::Pass;
+ }
+
+ gst::PadProbeReturn::Drop
+ })
.unwrap();
}
diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs
index e59fcdfcc..e3614fc15 100644
--- a/utils/togglerecord/tests/tests.rs
+++ b/utils/togglerecord/tests/tests.rs
@@ -66,14 +66,12 @@ fn setup_sender_receiver(
sinkpad.add_probe(
gst::PadProbeType::QUERY_UPSTREAM,
move |_pad, probe_info| {
- let query = match &mut probe_info.data {
- Some(gst::PadProbeData::Query(q)) => q,
- _ => unreachable!(),
+ let Some(query) = probe_info.query_mut() else {
+ unreachable!();
};
- use gst::QueryViewMut::*;
match query.view_mut() {
- Latency(q) => {
+ gst::QueryViewMut::Latency(q) => {
q.set(live, gst::ClockTime::ZERO, None);
gst::PadProbeReturn::Handled
}
diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs
index d213b05ca..c8f32f272 100644
--- a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs
+++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs
@@ -1286,35 +1286,36 @@ impl UriPlaylistBin {
let src_pad_name = sync_sink.name().to_string().replace("sink", "src");
let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap();
sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| {
- match info.data {
- Some(gst::PadProbeData::Event(ref ev))
- if ev.type_() == gst::EventType::Eos =>
- {
- let element = match element_weak.upgrade() {
- Some(element) => element,
- None => return gst::PadProbeReturn::Remove,
- };
- let imp = element.imp();
-
- let item = {
- let mut state_guard = imp.state.lock().unwrap();
- let state = state_guard.as_mut().unwrap();
- state.waiting_for_ss_eos.as_ref().cloned()
- };
-
- if let Some(item) = item {
- if item.dec_waiting_eos_ss() {
- gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item");
- imp.handle_topology_change();
- gst::PadProbeReturn::Drop
- } else {
- gst::PadProbeReturn::Drop
- }
- } else {
- gst::PadProbeReturn::Pass
- }
+ let Some(ev) = info.event() else {
+ return gst::PadProbeReturn::Pass;
+ };
+
+ if ev.type_() != gst::EventType::Eos {
+ return gst::PadProbeReturn::Pass;
+ }
+
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => return gst::PadProbeReturn::Remove,
+ };
+ let imp = element.imp();
+
+ let item = {
+ let mut state_guard = imp.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+ state.waiting_for_ss_eos.as_ref().cloned()
+ };
+
+ if let Some(item) = item {
+ if item.dec_waiting_eos_ss() {
+ gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item");
+ imp.handle_topology_change();
+ gst::PadProbeReturn::Drop
+ } else {
+ gst::PadProbeReturn::Drop
}
- _ => gst::PadProbeReturn::Pass,
+ } else {
+ gst::PadProbeReturn::Pass
}
});
@@ -1434,47 +1435,48 @@ impl UriPlaylistBin {
gst::PadProbeReturn::Pass
} else {
- match info.data {
- Some(gst::PadProbeData::Event(ref ev))
- if ev.type_() == gst::EventType::Eos =>
+ let Some(ev) = info.event() else {
+ return gst::PadProbeReturn::Pass;
+ };
+
+ if ev.type_() != gst::EventType::Eos {
+ return gst::PadProbeReturn::Pass;
+ }
+
+ if item.dec_waiting_eos() {
+ // all the streams are eos, item is now done
+ gst::log!(
+ CAT,
+ obj: element,
+ "all streams of item #{} are eos",
+ item.index()
+ );
+
+ let imp = element.imp();
{
- if item.dec_waiting_eos() {
- // all the streams are eos, item is now done
- gst::log!(
- CAT,
- obj: element,
- "all streams of item #{} are eos",
- item.index()
- );
-
- let imp = element.imp();
- {
- let mut state_guard = imp.state.lock().unwrap();
- let state = state_guard.as_mut().unwrap();
-
- let index = item.index();
-
- let removed = state
- .streaming
- .iter()
- .position(|i| i.index() == index)
- .map(|e| state.streaming.remove(e));
-
- if let Some(item) = removed {
- item.set_done();
- state.done.push(item);
- }
- }
+ let mut state_guard = imp.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
- if let Err(e) = imp.start_next_item() {
- imp.failed(e);
- }
+ let index = item.index();
+
+ let removed = state
+ .streaming
+ .iter()
+ .position(|i| i.index() == index)
+ .map(|e| state.streaming.remove(e));
+
+ if let Some(item) = removed {
+ item.set_done();
+ state.done.push(item);
}
+ }
- gst::PadProbeReturn::Remove
+ if let Err(e) = imp.start_next_item() {
+ imp.failed(e);
}
- _ => gst::PadProbeReturn::Pass,
}
+
+ gst::PadProbeReturn::Remove
}
});