Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Schmidt <jan@centricular.com>2023-04-12 19:30:54 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-07-19 09:37:38 +0300
commit081a90fefc385026ce20bf945aca63389a7dac2b (patch)
tree1a421a960cce814586cbb87f2987dce0798760fd
parent4b0330f6805063a4c19ced86b8c40b59670fdb79 (diff)
fallbackswitch: Fix pad health calculation and notifies
Change the pad health calculation to consider a pad 'healthy' if it has received data within the last 'timeout' window. Previously, inactive pads were constantly flip-flopping between healthy and not healthy depending on whether they were slightly ahead of or behind the active pad running_time. When the health status of a pad changes, make sure to always notify the property, so that applications that are manually controlling the active pad can make their switching decisions. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1285>
-rw-r--r--utils/fallbackswitch/src/fallbackswitch/imp.rs219
-rw-r--r--utils/fallbackswitch/tests/fallbackswitch.rs5
2 files changed, 191 insertions, 33 deletions
diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs
index 35b1d4601..ee472c435 100644
--- a/utils/fallbackswitch/src/fallbackswitch/imp.rs
+++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs
@@ -77,7 +77,7 @@ struct State {
output_running_time: Option<gst::ClockTime>,
- timeout_running_time: gst::ClockTime,
+ timeout_running_time: Option<gst::ClockTime>,
timeout_clock_id: Option<gst::ClockId>,
}
@@ -92,7 +92,7 @@ impl Default for State {
output_running_time: None,
- timeout_running_time: gst::ClockTime::ZERO,
+ timeout_running_time: None,
timeout_clock_id: None,
}
}
@@ -359,13 +359,52 @@ impl SinkState {
Some(clock_id)
}
- fn is_healthy(&self, state: &State, settings: &Settings) -> bool {
- match self.current_running_time {
- Some(current_running_time) => {
- current_running_time >= state.timeout_running_time.saturating_sub(settings.timeout)
- && current_running_time <= state.timeout_running_time
+ fn is_healthy(
+ &self,
+ pad: &super::FallbackSwitchSinkPad,
+ state: &State,
+ settings: &Settings,
+ now_running_time: Option<gst::ClockTime>,
+ ) -> bool {
+ /* The pad is healthy if it has received data within the
+ * last 'timeout' duration, which means the pad's current_running_time+timeout
+ * is later than 'now' according to the passed in running time, but not later
+ * than the timeout_running_time that would mean we time out before outputting
+ * that buffer */
+ match (
+ self.current_running_time,
+ now_running_time,
+ state.timeout_running_time,
+ ) {
+ (Some(pad_running_time), Some(now_running_time), Some(global_timeout_running_time)) => {
+ let timeout_running_time = pad_running_time.saturating_add(settings.timeout);
+ log!(
+ CAT,
+ obj: pad,
+ "pad_running_time {} timeout_running_time {} now_running_time {}",
+ pad_running_time,
+ timeout_running_time,
+ now_running_time,
+ );
+
+ timeout_running_time > now_running_time // Must be > not >=
+ && pad_running_time <= global_timeout_running_time
+ }
+ (Some(pad_running_time), Some(now_running_time), None) => {
+ let timeout_running_time = pad_running_time.saturating_add(settings.timeout);
+ log!(
+ CAT,
+ obj: pad,
+ "pad_running_time {} timeout_running_time {} now_running_time {}",
+ pad_running_time,
+ timeout_running_time,
+ now_running_time,
+ );
+
+ timeout_running_time > now_running_time // Must be > not >=
}
- None => false,
+ (Some(_input_running_time), None, _) => true,
+ (None, _, _) => false,
}
}
}
@@ -411,13 +450,22 @@ impl FallbackSwitch {
);
/* Advance the output running time to this timeout */
- state.output_running_time = Some(state.timeout_running_time);
+ state.output_running_time = state.timeout_running_time;
+
+ if !settings.auto_switch {
+ /* If auto-switching is disabled, don't check for a new
+ * pad */
+ state.timed_out = true;
+ return;
+ }
let active_sinkpad = self.active_sinkpad.lock().clone();
let mut best_priority = 0u32;
let mut best_pad = None;
+ let now_running_time = state.timeout_running_time;
+
for pad in self.obj().sink_pads() {
/* Don't consider the active sinkpad */
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
@@ -430,7 +478,7 @@ impl FallbackSwitch {
#[allow(clippy::collapsible_if)]
/* If this pad has data that arrived within the 'timeout' window
* before the timeout fired, we can switch to it */
- if pad_state.is_healthy(state, settings) {
+ if pad_state.is_healthy(pad, state, settings, now_running_time) {
if best_pad.is_none() || pad_settings.priority < best_priority {
best_pad = Some(pad.clone());
best_priority = pad_settings.priority;
@@ -451,7 +499,8 @@ impl FallbackSwitch {
}
}
- fn on_timeout(&self, clock_id: &gst::ClockId, settings: &Settings) {
+ fn on_timeout(&self, clock_id: &gst::ClockId) {
+ let settings = self.settings.lock().clone();
let mut state = self.state.lock();
if state.timeout_clock_id.as_ref() != Some(clock_id) {
@@ -463,7 +512,12 @@ impl FallbackSwitch {
// Ensure sink_chain on an inactive pad can schedule another timeout
state.timeout_clock_id = None;
- self.handle_timeout(&mut state, settings);
+ self.handle_timeout(&mut state, &settings);
+ let changed = self.update_health_statuses(&state, &settings);
+ drop(state);
+ for pad in changed {
+ pad.notify(PROP_IS_HEALTHY);
+ }
}
fn cancel_waits(&self) {
@@ -480,29 +534,29 @@ impl FallbackSwitch {
state: &mut State,
settings: &Settings,
running_time: gst::ClockTime,
- ) {
+ ) -> bool {
state.cancel_timeout();
let clock = match self.obj().clock() {
- None => return,
+ None => return false,
Some(clock) => clock,
};
let base_time = match self.obj().base_time() {
Some(base_time) => base_time,
- None => return,
+ None => return false,
};
let timeout_running_time = running_time
.saturating_add(state.upstream_latency + settings.timeout + settings.latency);
let wait_until = timeout_running_time + base_time;
- state.timeout_running_time = timeout_running_time;
+ state.timeout_running_time = Some(timeout_running_time);
/* If we're already running behind, fire the timeout immediately */
let now = clock.time();
if now.map_or(false, |now| wait_until <= now) {
self.handle_timeout(state, settings);
- return;
+ return true;
}
debug!(CAT, imp: self, "Scheduling timeout for {}", wait_until);
@@ -518,10 +572,41 @@ impl FallbackSwitch {
None => return,
Some(imp) => imp,
};
- let settings = imp.settings.lock().clone();
- imp.on_timeout(clock_id, &settings);
+ imp.on_timeout(clock_id);
})
.expect("Failed to wait async");
+ false
+ }
+
+ fn update_health_statuses(
+ &self,
+ state: &State,
+ settings: &Settings,
+ ) -> Vec<super::FallbackSwitchSinkPad> {
+ let mut changed = Vec::<super::FallbackSwitchSinkPad>::new();
+
+ /* Iterate over sink pads and update their is_healthy status,
+ * returning a Vec of pads whose health changed and need notifying */
+ for pad in self.obj().sink_pads() {
+ let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
+ let pad_imp = pad.imp();
+ let mut pad_state = pad_imp.state.lock();
+
+ /* If this pad has data that arrived within the 'timeout' window
+ * before the timeout fired, we can switch to it */
+ let is_healthy = pad_state.is_healthy(pad, state, settings, state.output_running_time);
+ let health_changed = is_healthy != pad_state.is_healthy;
+ pad_state.is_healthy = is_healthy;
+
+ drop(pad_state);
+
+ if health_changed {
+ log!(CAT, obj: pad, "Health changed to {}", is_healthy);
+ changed.push(pad.clone());
+ }
+ }
+
+ changed
}
fn sink_activatemode(
@@ -600,7 +685,7 @@ impl FallbackSwitch {
* - sleep until the buffer running time, then check if we're still active
*/
- /* First see if we should become the active pad */
+ /* see if we should become the active pad */
let active_sinkpad = self.active_sinkpad.lock().clone();
let mut is_active = active_sinkpad.as_ref() == Some(pad);
if !is_active && settings.auto_switch {
@@ -633,6 +718,35 @@ impl FallbackSwitch {
let raw_pad = !matches!(pad_state.caps_info, CapsInfo::None);
let (start_running_time, end_running_time) = pad_state.get_sync_time(&buffer);
+ if let Some(running_time) = start_running_time {
+ pad_state.current_running_time = Some(running_time);
+ }
+
+ /* Update pad is-healthy state if necessary and notify
+ * if it changes, as that might affect which pad is
+ * active */
+ let is_healthy = pad_state.is_healthy(pad, &state, &settings, state.output_running_time);
+ let health_changed = is_healthy != pad_state.is_healthy;
+ pad_state.is_healthy = is_healthy;
+
+ /* Need to drop state locks before notifying */
+ let (mut state, mut pad_state) = if health_changed {
+ drop(pad_state);
+ drop(state);
+ log!(CAT, obj: pad, "Health changed to {}", is_healthy);
+ pad.notify(PROP_IS_HEALTHY);
+
+ if !settings.auto_switch {
+ /* Re-check if this is the active sinkpad */
+ let active_sinkpad = self.active_sinkpad.lock().clone();
+ is_active = active_sinkpad.as_ref() == Some(pad);
+ }
+
+ (self.state.lock(), pad_imp.state.lock())
+ } else {
+ (state, pad_state)
+ };
+
log!(
CAT,
obj: pad,
@@ -651,16 +765,18 @@ impl FallbackSwitch {
start_running_time,
state.upstream_latency + settings.latency,
)
- } else if end_running_time.map_or(false, |end_running_time| {
- end_running_time < state.timeout_running_time
- }) {
+ } else if state.timeout_running_time.is_some()
+ && end_running_time.map_or(false, |end_running_time| {
+ end_running_time < state.timeout_running_time.unwrap()
+ })
+ {
if raw_pad {
log!(
CAT,
obj: pad,
"Dropping trailing raw {:?} before timeout {}",
buffer,
- state.timeout_running_time
+ state.timeout_running_time.unwrap()
);
return Ok(gst::FlowSuccess::Ok);
} else {
@@ -669,7 +785,7 @@ impl FallbackSwitch {
obj: pad,
"Not dropping trailing non-raw {:?} before timeout {}",
buffer,
- state.timeout_running_time
+ state.timeout_running_time.unwrap()
);
None
@@ -683,17 +799,16 @@ impl FallbackSwitch {
)
};
- if let Some(running_time) = start_running_time {
- pad_state.current_running_time = Some(running_time);
- }
drop(pad_state);
+ let mut update_all_pad_health = false;
+
/* Before sleeping, ensure there is a timeout to switch active pads,
* in case the initial active pad never receives a buffer */
if let Some(running_time) = start_running_time {
if state.timeout_clock_id.is_none() && !is_active {
// May change active pad immediately
- self.schedule_timeout(&mut state, &settings, running_time);
+ update_all_pad_health = self.schedule_timeout(&mut state, &settings, running_time);
is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
}
}
@@ -748,7 +863,8 @@ impl FallbackSwitch {
if let Some(end_running_time) = end_running_time {
// May change active pad immediately
- self.schedule_timeout(&mut state, &settings, end_running_time);
+ update_all_pad_health |=
+ self.schedule_timeout(&mut state, &settings, end_running_time);
is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
} else {
state.cancel_timeout();
@@ -758,11 +874,37 @@ impl FallbackSwitch {
if let Some(running_time) = end_running_time {
pad_state.current_running_time = Some(running_time);
}
- pad_state.is_healthy = pad_state.is_healthy(&state, &settings);
+ let is_healthy = pad_state.is_healthy(pad, &state, &settings, state.output_running_time);
+ let health_changed = is_healthy != pad_state.is_healthy;
+ if health_changed {
+ log!(CAT, obj: pad, "Health changed to {}", is_healthy);
+ }
+ pad_state.is_healthy = is_healthy;
drop(pad_state);
+ /* If the schedule_timeout() calls above said the timeout happened,
+ * we should update the health of all pads here */
+ let mut state = if update_all_pad_health {
+ let changed_health_pads = self.update_health_statuses(&state, &settings);
+ drop(state);
+
+ for pad in changed_health_pads {
+ pad.notify(PROP_IS_HEALTHY);
+ }
+
+ self.state.lock()
+ } else {
+ state
+ };
+
if !is_active {
log!(CAT, obj: pad, "Dropping {:?} on inactive pad", buffer);
+
+ drop(state);
+ if health_changed {
+ pad.notify(PROP_IS_HEALTHY);
+ }
+
return Ok(gst::FlowSuccess::Ok);
}
@@ -772,15 +914,30 @@ impl FallbackSwitch {
is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
if !is_active {
log!(CAT, obj: pad, "Dropping {:?} on inactive pad", buffer);
+
+ drop(state);
+ if health_changed {
+ pad.notify(PROP_IS_HEALTHY);
+ }
return Ok(gst::FlowSuccess::Ok);
}
+ /* Update the health status for all pads, since we're the active pad */
+ let changed_health_pads = self.update_health_statuses(&state, &settings);
+
let switched_pad = state.switched_pad;
let discont_pending = state.discont_pending;
state.switched_pad = false;
state.discont_pending = false;
drop(state);
+ if health_changed {
+ pad.notify(PROP_IS_HEALTHY);
+ }
+ for pad in changed_health_pads {
+ pad.notify(PROP_IS_HEALTHY);
+ }
+
if switched_pad {
let _ = pad.push_event(gst::event::Reconfigure::new());
pad.sticky_events_foreach(|event| {
diff --git a/utils/fallbackswitch/tests/fallbackswitch.rs b/utils/fallbackswitch/tests/fallbackswitch.rs
index 8257c73d6..564f7af6e 100644
--- a/utils/fallbackswitch/tests/fallbackswitch.rs
+++ b/utils/fallbackswitch/tests/fallbackswitch.rs
@@ -279,12 +279,13 @@ fn test_long_drop_and_recover(live: bool) {
let buffer = pull_buffer(&pipeline);
assert_fallback_buffer!(buffer, Some(4.seconds()));
- // Produce a sixth frame from the normal source
+ // Produce a sixth frame from the normal source, which
+ // will make it healthy again
push_buffer(&pipeline, 5.seconds());
set_time(&pipeline, 5.seconds() + 10.mseconds());
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, Some(5.seconds()));
- assert!(!mainsink.property::<bool>("is-healthy"));
+ assert!(mainsink.property::<bool>("is-healthy"));
drop(mainsink);
drop(switch);