diff options
author | Mathieu Duponchelle <mathieu@centricular.com> | 2021-05-28 02:43:50 +0300 |
---|---|---|
committer | GStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org> | 2021-06-30 03:46:46 +0300 |
commit | ef41adf776830eeb54ad21505fafb8ad7fc931e7 (patch) | |
tree | 9ba0ed49f660b3f24e5127a418cfd9e26ad58630 | |
parent | 29052b1acbfb0a014767b59cf3e35d00c782c407 (diff) |
fallbacksrc: implement manual unblocking feature
This enables a use case for preparing slow to start up sources
ahead of time in a live cueing system, where a stream is scheduled
to start at some point in the future, and the application wants to
make sure it is ready for prime time by that time, instead of
spinning it up at the last moment and waiting for the stream to
actually come up.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/515>
-rw-r--r-- | utils/fallbackswitch/src/fallbacksrc/imp.rs | 114 |
1 files changed, 98 insertions, 16 deletions
diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 744057231..a6e6a9b77 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -80,6 +80,7 @@ struct Settings { min_latency: gst::ClockTime, buffer_duration: i64, immediate_fallback: bool, + manual_unblock: bool, } impl Default for Settings { @@ -97,6 +98,7 @@ impl Default for Settings { min_latency: gst::ClockTime::ZERO, buffer_duration: -1, immediate_fallback: false, + manual_unblock: false, } } } @@ -171,6 +173,12 @@ struct State { // Statistics stats: Stats, + + // When application is using the manual-unblock property + manually_blocked: bool, + // So that we don't schedule a restart when manually unblocking + // and our source hasn't reached the required state + schedule_restart_on_unblock: bool, } #[derive(Default)] @@ -289,6 +297,13 @@ impl ObjectImpl for FallbackSrc { glib::ParamFlags::READABLE, ), glib::ParamSpec::new_boolean( + "manual-unblock", + "Manual unblock", + "When enabled, the application must call the unblock signal, except for live streams", + false, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_boolean( "immediate-fallback", "Immediate fallback", "Forward the fallback streams immediately at startup, when the primary streams are slow to start up and immediate output is required", @@ -453,6 +468,18 @@ impl ObjectImpl for FallbackSrc { ); settings.immediate_fallback = new_value; } + "manual-unblock" => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst_info!( + CAT, + obj: obj, + "Changing manual-unblock from {:?} to {:?}", + settings.manual_unblock, + new_value, + ); + settings.manual_unblock = new_value; + } _ => unimplemented!(), } } @@ -563,28 +590,64 @@ impl ObjectImpl for FallbackSrc { let settings = self.settings.lock().unwrap(); settings.immediate_fallback.to_value() } + "manual-unblock" => { + let settings = self.settings.lock().unwrap(); + settings.manual_unblock.to_value() + } _ => unimplemented!(), } } fn signals() -> &'static [glib::subclass::Signal] { static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| { - vec![glib::subclass::Signal::builder( - "update-uri", - &[String::static_type().into()], - String::static_type().into(), - ) - .action() - .class_handler(|_token, args| { - // Simply return the input by default - Some(args[1].clone()) - }) - .accumulator(|_hint, ret, value| { - // First signal handler wins - *ret = value.clone(); - false - }) - .build()] + vec![ + glib::subclass::Signal::builder( + "update-uri", + &[String::static_type().into()], + String::static_type().into(), + ) + .action() + .class_handler(|_token, args| { + // Simply return the input by default + Some(args[1].clone()) + }) + .accumulator(|_hint, ret, value| { + // First signal handler wins + *ret = value.clone(); + false + }) + .build(), + glib::subclass::Signal::builder("unblock", &[], glib::types::Type::UNIT.into()) + .action() + .class_handler(|_token, args| { + let element = args[0].get::<super::FallbackSrc>().expect("signal arg"); + let src = FallbackSrc::from_instance(&element); + let mut state_guard = src.state.lock().unwrap(); + let state = match &mut *state_guard { + None => { + return None; + } + Some(state) => state, + }; + + state.manually_blocked = false; + + if state.schedule_restart_on_unblock + && src.have_fallback_activated(&element, state) + { + src.schedule_source_restart_timeout( + &element, + state, + gst::ClockTime::ZERO, + ); + } + + src.unblock_pads(&element, state); + + None + }) + .build(), + ] }); SIGNALS.as_ref() @@ -1007,6 +1070,8 @@ impl FallbackSrc { None }; + let manually_blocked = settings.manual_unblock; + *state_guard = Some(State { source, source_is_live: false, @@ -1022,6 +1087,8 @@ impl FallbackSrc { settings, configured_source, stats: Stats::default(), + manually_blocked, + schedule_restart_on_unblock: false, }); drop(state_guard); @@ -1155,6 +1222,7 @@ impl FallbackSrc { || (!state.source_is_live && transition == gst::StateChange::PausedToPlaying) { assert!(state.source_restart_timeout.is_none()); + state.schedule_restart_on_unblock = true; self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO); } } @@ -1493,6 +1561,11 @@ impl FallbackSrc { } fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) { + if state.manually_blocked { + gst_debug!(CAT, obj: element, "Not unblocking yet: manual unblock",); + return; + } + // Check if all streams are blocked and have a running time and we have // 100% buffering if state.stats.buffering_percent < 100 { @@ -2153,6 +2226,15 @@ impl FallbackSrc { return; } + if state.manually_blocked { + gst_debug!( + CAT, + obj: element, + "Not scheduling source restart timeout because we are manually blocked", + ); + return; + } + let clock = gst::SystemClock::obtain(); let wait_time = clock.time().unwrap() + state.settings.restart_timeout - elapsed; gst_debug!( |