Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuillaume Desmottes <guillaume.desmottes@onestream.live>2023-08-14 12:51:42 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-09-20 19:44:22 +0300
commit91d2406180ba8434cadafb2813753cfa74f3acc0 (patch)
treef831e09b54efb30e2d17a2640f6597c11ae5b9a1
parente1d35536adfa9049a9d58888855abc69597ac4bc (diff)
fallbackswitch: protect src pad stream lock using Cond0.10
Should prevent stream and State deadlocks, see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/202 Fix #202 Hopefully fix #192 as well. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1330>
-rw-r--r--utils/fallbackswitch/src/fallbackswitch/imp.rs67
1 files changed, 55 insertions, 12 deletions
diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs
index 3e76be4b4..2300e27a2 100644
--- a/utils/fallbackswitch/src/fallbackswitch/imp.rs
+++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs
@@ -14,7 +14,7 @@ use gst::{debug, log, trace};
use once_cell::sync::Lazy;
-use parking_lot::{Mutex, MutexGuard};
+use parking_lot::{Condvar, Mutex, MutexGuard};
use std::sync::atomic::{AtomicU32, Ordering};
const PROP_PRIORITY: &str = "priority";
@@ -87,6 +87,10 @@ struct State {
timeout_running_time: Option<gst::ClockTime>,
timeout_clock_id: Option<gst::ClockId>,
+
+ /// If the src pad is currently busy. Should be checked and waited on using `src_busy_cond`
+ /// before calling anything requiring the stream lock.
+ src_busy: bool,
}
impl Default for State {
@@ -102,6 +106,8 @@ impl Default for State {
timeout_running_time: None,
timeout_clock_id: None,
+
+ src_busy: false,
}
}
}
@@ -420,6 +426,7 @@ impl SinkState {
#[derive(Debug)]
pub struct FallbackSwitch {
state: Mutex<State>,
+ src_busy_cond: Condvar,
settings: Mutex<Settings>,
// Separated from the rest of the `state` because it can be
@@ -952,10 +959,12 @@ impl FallbackSwitch {
}
if switched_pad {
- let _ = pad.push_event(gst::event::Reconfigure::new());
- pad.sticky_events_foreach(|event| {
- self.src_pad.push_event(event.clone());
- std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
+ self.with_src_busy(|| {
+ let _ = pad.push_event(gst::event::Reconfigure::new());
+ pad.sticky_events_foreach(|event| {
+ self.src_pad.push_event(event.clone());
+ std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
+ });
});
self.obj().notify(PROP_ACTIVE_PAD);
@@ -987,11 +996,13 @@ impl FallbackSwitch {
let out_gap_event = builder.build();
- self.src_pad.push_event(out_gap_event);
+ self.with_src_busy(|| {
+ self.src_pad.push_event(out_gap_event);
+ });
Ok(gst::FlowSuccess::Ok)
} else {
- self.src_pad.push(buffer)
+ self.with_src_busy(|| self.src_pad.push(buffer))
}
}
@@ -1114,15 +1125,18 @@ impl FallbackSwitch {
drop(state);
if fwd_sticky {
- let _ = pad.push_event(gst::event::Reconfigure::new());
- pad.sticky_events_foreach(|event| {
- self.src_pad.push_event(event.clone());
- std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
+ self.with_src_busy(|| {
+ let _ = pad.push_event(gst::event::Reconfigure::new());
+ pad.sticky_events_foreach(|event| {
+ self.src_pad.push_event(event.clone());
+ std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
+ });
});
self.obj().notify(PROP_ACTIVE_PAD);
}
- self.src_pad.push_event(event)
+
+ self.with_src_busy(|| self.src_pad.push_event(event))
}
fn sink_query(&self, pad: &super::FallbackSwitchSinkPad, query: &mut gst::QueryRef) -> bool {
@@ -1237,6 +1251,34 @@ impl FallbackSwitch {
false
}
+
+ /// Wait until src_busy is not set and set it, execute
+ /// the closure, then unset it again and notify its Cond.
+ ///
+ /// The State lock is taken while modifying src_busy,
+ /// but not while executing the closure.
+ fn with_src_busy<F, R>(&self, func: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ {
+ let mut state = self.state.lock();
+ while state.src_busy {
+ self.src_busy_cond.wait(&mut state);
+ }
+ state.src_busy = true;
+ }
+
+ let ret = func();
+
+ {
+ let mut state = self.state.lock();
+ state.src_busy = false;
+ self.src_busy_cond.notify_one();
+ }
+
+ ret
+ }
}
#[glib::object_subclass]
@@ -1260,6 +1302,7 @@ impl ObjectSubclass for FallbackSwitch {
Self {
state: Mutex::new(State::default()),
+ src_busy_cond: Condvar::default(),
settings: Mutex::new(Settings::default()),
active_sinkpad: Mutex::new(None),
src_pad: srcpad,