From d468e1e4a695902ce011a8fb9f35f0118b9d1e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 16 Oct 2023 19:16:52 +0300 Subject: Clean up usage of pad probes Part-of: --- utils/fallbackswitch/src/fallbacksrc/imp.rs | 162 +++++++++++++------------ utils/togglerecord/tests/tests.rs | 8 +- utils/uriplaylistbin/src/uriplaylistbin/imp.rs | 128 +++++++++---------- 3 files changed, 150 insertions(+), 148 deletions(-) (limited to 'utils') diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 745ad849..2001758f 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -1911,77 +1911,79 @@ impl FallbackSrc { let imp = element.imp(); - match info.data { - Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => { - gst::debug!( - CAT, - obj: element, - "Received EOS from {}source on pad {}", - if fallback_source { "fallback " } else { "" }, - pad.name() - ); + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; - let mut state_guard = imp.state.lock(); - let state = match &mut *state_guard { - None => { - return gst::PadProbeReturn::Ok; - } - Some(state) => state, - }; + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Ok; + } - if is_image { - gst::PadProbeReturn::Ok - } else if state.settings.restart_on_eos || fallback_source { - imp.handle_source_error(state, RetryReason::Eos, fallback_source); - drop(state_guard); - element.notify("statistics"); + gst::debug!( + CAT, + obj: element, + "Received EOS from {}source on pad {}", + if fallback_source { "fallback " } else { "" }, + pad.name() + ); - gst::PadProbeReturn::Drop - } else { - // Send EOS to all sinkpads of the fallbackswitch and also to the other - // stream's fallbackswitch if it doesn't have a main branch. - let mut sinkpads = vec![]; + let mut state_guard = imp.state.lock(); + let state = match &mut *state_guard { + None => { + return gst::PadProbeReturn::Ok; + } + Some(state) => state, + }; - if let Some(stream) = { - if is_video { - state.video_stream.as_ref() - } else { - state.audio_stream.as_ref() - } - } { - sinkpads - .extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad)); - } + if is_image { + gst::PadProbeReturn::Ok + } else if state.settings.restart_on_eos || fallback_source { + imp.handle_source_error(state, RetryReason::Eos, fallback_source); + drop(state_guard); + element.notify("statistics"); - if let Some(other_stream) = { - if is_video { - state.audio_stream.as_ref() - } else { - state.video_stream.as_ref() - } - } { - if other_stream.main_branch.is_none() { - sinkpads.extend( - other_stream - .switch - .sink_pads() - .into_iter() - .filter(|p| p != pad), - ); - } - } + gst::PadProbeReturn::Drop + } else { + // Send EOS to all sinkpads of the fallbackswitch and also to the other + // stream's fallbackswitch if it doesn't have a main branch. + let mut sinkpads = vec![]; - let event = ev.clone(); - element.call_async(move |_| { - for sinkpad in sinkpads { - sinkpad.send_event(event.clone()); - } - }); + if let Some(stream) = { + if is_video { + state.video_stream.as_ref() + } else { + state.audio_stream.as_ref() + } + } { + sinkpads.extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad)); + } - gst::PadProbeReturn::Ok + if let Some(other_stream) = { + if is_video { + state.audio_stream.as_ref() + } else { + state.video_stream.as_ref() + } + } { + if other_stream.main_branch.is_none() { + sinkpads.extend( + other_stream + .switch + .sink_pads() + .into_iter() + .filter(|p| p != pad), + ); } } - _ => gst::PadProbeReturn::Ok, + + let event = ev.clone(); + element.call_async(move |_| { + for sinkpad in sinkpads { + sinkpad.send_event(event.clone()); + } + }); + + gst::PadProbeReturn::Ok } }); @@ -2056,13 +2058,15 @@ impl FallbackSrc { let qos_probe_id = block_pad .add_probe(gst::PadProbeType::EVENT_UPSTREAM, |_pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - if let gst::EventView::Qos(_) = ev.view() { - return gst::PadProbeReturn::Drop; - } + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + + if ev.type_() != gst::EventType::Qos { + return gst::PadProbeReturn::Ok; } - gst::PadProbeReturn::Ok + gst::PadProbeReturn::Drop }) .unwrap(); @@ -2871,19 +2875,17 @@ impl FallbackSrc { // error. We don't need to remove these pad probes because restarting the source will also // remove/add the pads again. for pad in source.source.src_pads() { - pad.add_probe( - gst::PadProbeType::EVENT_DOWNSTREAM, - |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref event)) => { - if event.type_() == gst::EventType::Eos { - gst::PadProbeReturn::Drop - } else { - gst::PadProbeReturn::Ok - } - } - _ => unreachable!(), - }, - ) + pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |_pad, info| { + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Pass; + }; + + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Pass; + } + + gst::PadProbeReturn::Drop + }) .unwrap(); } diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index e59fcdfc..e3614fc1 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -66,14 +66,12 @@ fn setup_sender_receiver( sinkpad.add_probe( gst::PadProbeType::QUERY_UPSTREAM, move |_pad, probe_info| { - let query = match &mut probe_info.data { - Some(gst::PadProbeData::Query(q)) => q, - _ => unreachable!(), + let Some(query) = probe_info.query_mut() else { + unreachable!(); }; - use gst::QueryViewMut::*; match query.view_mut() { - Latency(q) => { + gst::QueryViewMut::Latency(q) => { q.set(live, gst::ClockTime::ZERO, None); gst::PadProbeReturn::Handled } diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs index d213b05c..c8f32f27 100644 --- a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs +++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs @@ -1286,35 +1286,36 @@ impl UriPlaylistBin { let src_pad_name = sync_sink.name().to_string().replace("sink", "src"); let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap(); sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| { - match info.data { - Some(gst::PadProbeData::Event(ref ev)) - if ev.type_() == gst::EventType::Eos => - { - let element = match element_weak.upgrade() { - Some(element) => element, - None => return gst::PadProbeReturn::Remove, - }; - let imp = element.imp(); - - let item = { - let mut state_guard = imp.state.lock().unwrap(); - let state = state_guard.as_mut().unwrap(); - state.waiting_for_ss_eos.as_ref().cloned() - }; - - if let Some(item) = item { - if item.dec_waiting_eos_ss() { - gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item"); - imp.handle_topology_change(); - gst::PadProbeReturn::Drop - } else { - gst::PadProbeReturn::Drop - } - } else { - gst::PadProbeReturn::Pass - } + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Pass; + }; + + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Pass; + } + + let element = match element_weak.upgrade() { + Some(element) => element, + None => return gst::PadProbeReturn::Remove, + }; + let imp = element.imp(); + + let item = { + let mut state_guard = imp.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + state.waiting_for_ss_eos.as_ref().cloned() + }; + + if let Some(item) = item { + if item.dec_waiting_eos_ss() { + gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item"); + imp.handle_topology_change(); + gst::PadProbeReturn::Drop + } else { + gst::PadProbeReturn::Drop } - _ => gst::PadProbeReturn::Pass, + } else { + gst::PadProbeReturn::Pass } }); @@ -1434,47 +1435,48 @@ impl UriPlaylistBin { gst::PadProbeReturn::Pass } else { - match info.data { - Some(gst::PadProbeData::Event(ref ev)) - if ev.type_() == gst::EventType::Eos => + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Pass; + }; + + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Pass; + } + + if item.dec_waiting_eos() { + // all the streams are eos, item is now done + gst::log!( + CAT, + obj: element, + "all streams of item #{} are eos", + item.index() + ); + + let imp = element.imp(); { - if item.dec_waiting_eos() { - // all the streams are eos, item is now done - gst::log!( - CAT, - obj: element, - "all streams of item #{} are eos", - item.index() - ); - - let imp = element.imp(); - { - let mut state_guard = imp.state.lock().unwrap(); - let state = state_guard.as_mut().unwrap(); - - let index = item.index(); - - let removed = state - .streaming - .iter() - .position(|i| i.index() == index) - .map(|e| state.streaming.remove(e)); - - if let Some(item) = removed { - item.set_done(); - state.done.push(item); - } - } + let mut state_guard = imp.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); - if let Err(e) = imp.start_next_item() { - imp.failed(e); - } + let index = item.index(); + + let removed = state + .streaming + .iter() + .position(|i| i.index() == index) + .map(|e| state.streaming.remove(e)); + + if let Some(item) = removed { + item.set_done(); + state.done.push(item); } + } - gst::PadProbeReturn::Remove + if let Err(e) = imp.start_next_item() { + imp.failed(e); } - _ => gst::PadProbeReturn::Pass, } + + gst::PadProbeReturn::Remove } }); -- cgit v1.2.3