Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathieu Duponchelle <mathieu@centricular.com>2021-05-28 02:43:50 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2021-06-30 03:46:46 +0300
commitef41adf776830eeb54ad21505fafb8ad7fc931e7 (patch)
tree9ba0ed49f660b3f24e5127a418cfd9e26ad58630
parent29052b1acbfb0a014767b59cf3e35d00c782c407 (diff)
fallbacksrc: implement manual unblocking feature
This enables a use case for preparing slow to start up sources ahead of time in a live cueing system, where a stream is scheduled to start at some point in the future, and the application wants to make sure it is ready for prime time by that time, instead of spinning it up at the last moment and waiting for the stream to actually come up. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/515>
-rw-r--r--utils/fallbackswitch/src/fallbacksrc/imp.rs114
1 files changed, 98 insertions, 16 deletions
diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs
index 744057231..a6e6a9b77 100644
--- a/utils/fallbackswitch/src/fallbacksrc/imp.rs
+++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs
@@ -80,6 +80,7 @@ struct Settings {
min_latency: gst::ClockTime,
buffer_duration: i64,
immediate_fallback: bool,
+ manual_unblock: bool,
}
impl Default for Settings {
@@ -97,6 +98,7 @@ impl Default for Settings {
min_latency: gst::ClockTime::ZERO,
buffer_duration: -1,
immediate_fallback: false,
+ manual_unblock: false,
}
}
}
@@ -171,6 +173,12 @@ struct State {
// Statistics
stats: Stats,
+
+ // When application is using the manual-unblock property
+ manually_blocked: bool,
+ // So that we don't schedule a restart when manually unblocking
+ // and our source hasn't reached the required state
+ schedule_restart_on_unblock: bool,
}
#[derive(Default)]
@@ -289,6 +297,13 @@ impl ObjectImpl for FallbackSrc {
glib::ParamFlags::READABLE,
),
glib::ParamSpec::new_boolean(
+ "manual-unblock",
+ "Manual unblock",
+ "When enabled, the application must call the unblock signal, except for live streams",
+ false,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_boolean(
"immediate-fallback",
"Immediate fallback",
"Forward the fallback streams immediately at startup, when the primary streams are slow to start up and immediate output is required",
@@ -453,6 +468,18 @@ impl ObjectImpl for FallbackSrc {
);
settings.immediate_fallback = new_value;
}
+ "manual-unblock" => {
+ let mut settings = self.settings.lock().unwrap();
+ let new_value = value.get().expect("type checked upstream");
+ gst_info!(
+ CAT,
+ obj: obj,
+ "Changing manual-unblock from {:?} to {:?}",
+ settings.manual_unblock,
+ new_value,
+ );
+ settings.manual_unblock = new_value;
+ }
_ => unimplemented!(),
}
}
@@ -563,28 +590,64 @@ impl ObjectImpl for FallbackSrc {
let settings = self.settings.lock().unwrap();
settings.immediate_fallback.to_value()
}
+ "manual-unblock" => {
+ let settings = self.settings.lock().unwrap();
+ settings.manual_unblock.to_value()
+ }
_ => unimplemented!(),
}
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
- vec![glib::subclass::Signal::builder(
- "update-uri",
- &[String::static_type().into()],
- String::static_type().into(),
- )
- .action()
- .class_handler(|_token, args| {
- // Simply return the input by default
- Some(args[1].clone())
- })
- .accumulator(|_hint, ret, value| {
- // First signal handler wins
- *ret = value.clone();
- false
- })
- .build()]
+ vec![
+ glib::subclass::Signal::builder(
+ "update-uri",
+ &[String::static_type().into()],
+ String::static_type().into(),
+ )
+ .action()
+ .class_handler(|_token, args| {
+ // Simply return the input by default
+ Some(args[1].clone())
+ })
+ .accumulator(|_hint, ret, value| {
+ // First signal handler wins
+ *ret = value.clone();
+ false
+ })
+ .build(),
+ glib::subclass::Signal::builder("unblock", &[], glib::types::Type::UNIT.into())
+ .action()
+ .class_handler(|_token, args| {
+ let element = args[0].get::<super::FallbackSrc>().expect("signal arg");
+ let src = FallbackSrc::from_instance(&element);
+ let mut state_guard = src.state.lock().unwrap();
+ let state = match &mut *state_guard {
+ None => {
+ return None;
+ }
+ Some(state) => state,
+ };
+
+ state.manually_blocked = false;
+
+ if state.schedule_restart_on_unblock
+ && src.have_fallback_activated(&element, state)
+ {
+ src.schedule_source_restart_timeout(
+ &element,
+ state,
+ gst::ClockTime::ZERO,
+ );
+ }
+
+ src.unblock_pads(&element, state);
+
+ None
+ })
+ .build(),
+ ]
});
SIGNALS.as_ref()
@@ -1007,6 +1070,8 @@ impl FallbackSrc {
None
};
+ let manually_blocked = settings.manual_unblock;
+
*state_guard = Some(State {
source,
source_is_live: false,
@@ -1022,6 +1087,8 @@ impl FallbackSrc {
settings,
configured_source,
stats: Stats::default(),
+ manually_blocked,
+ schedule_restart_on_unblock: false,
});
drop(state_guard);
@@ -1155,6 +1222,7 @@ impl FallbackSrc {
|| (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
{
assert!(state.source_restart_timeout.is_none());
+ state.schedule_restart_on_unblock = true;
self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
}
}
@@ -1493,6 +1561,11 @@ impl FallbackSrc {
}
fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) {
+ if state.manually_blocked {
+ gst_debug!(CAT, obj: element, "Not unblocking yet: manual unblock",);
+ return;
+ }
+
// Check if all streams are blocked and have a running time and we have
// 100% buffering
if state.stats.buffering_percent < 100 {
@@ -2153,6 +2226,15 @@ impl FallbackSrc {
return;
}
+ if state.manually_blocked {
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Not scheduling source restart timeout because we are manually blocked",
+ );
+ return;
+ }
+
let clock = gst::SystemClock::obtain();
let wait_time = clock.time().unwrap() + state.settings.restart_timeout - elapsed;
gst_debug!(