Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/utils
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2020-07-02 12:23:48 +0300
committerSebastian Dröge <sebastian@centricular.com>2020-07-02 14:46:59 +0300
commit44ad0a2f52550630178ccce35753c1bd45863029 (patch)
treeb7e293883059dac180f9fc89b0bb9c7af88437d6 /utils
parentf7fc5bb0a3536eb9f02427177d58dea58673c5c1 (diff)
fallbacksrc: Differentiate between fallback and restart timeout
This also fixes the bug that the source wouldn't be restarted another time if we switched to the fallback stream before and didn't at least shortly switch to the normal stream. There was no timeout for this. Based on a patch by Mathieu Duponchelle <mathieu@centricular.com>
Diffstat (limited to 'utils')
-rw-r--r--utils/fallbackswitch/src/fallbacksrc.rs320
1 files changed, 196 insertions, 124 deletions
diff --git a/utils/fallbackswitch/src/fallbacksrc.rs b/utils/fallbackswitch/src/fallbacksrc.rs
index 2167223a2..26234d1f2 100644
--- a/utils/fallbackswitch/src/fallbacksrc.rs
+++ b/utils/fallbackswitch/src/fallbacksrc.rs
@@ -43,6 +43,7 @@ struct Settings {
source: Option<gst::Element>,
fallback_uri: Option<String>,
timeout: u64,
+ restart_timeout: u64,
retry_timeout: u64,
}
@@ -55,6 +56,7 @@ impl Default for Settings {
source: None,
fallback_uri: None,
timeout: 5 * gst::SECOND_VAL,
+ restart_timeout: 5 * gst::SECOND_VAL,
retry_timeout: 60 * gst::SECOND_VAL,
}
}
@@ -104,9 +106,8 @@ struct State {
source_is_live: bool,
source_pending_restart: bool,
- // For timing out the source if we have to wait some additional time
- // after fallbackswitch switched due to recent buffering activity
- source_pending_timeout: Option<gst::ClockId>,
+ // For timing out the source and shutting it down to restart it
+ source_restart_timeout: Option<gst::ClockId>,
// For restarting the source after shutting it down
source_pending_restart_timeout: Option<gst::ClockId>,
// For failing completely if we didn't recover after the retry timeout
@@ -143,7 +144,7 @@ enum Status {
Running,
}
-static PROPERTIES: [subclass::Property; 8] = [
+static PROPERTIES: [subclass::Property; 9] = [
subclass::Property("enable-audio", |name| {
glib::ParamSpec::boolean(
name,
@@ -194,6 +195,17 @@ static PROPERTIES: [subclass::Property; 8] = [
glib::ParamFlags::READWRITE,
)
}),
+ subclass::Property("restart-timeout", |name| {
+ glib::ParamSpec::uint64(
+ name,
+ "Timeout",
+ "Timeout for restarting an active source",
+ 0,
+ std::u64::MAX,
+ 5 * gst::SECOND_VAL,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
subclass::Property("retry-timeout", |name| {
glib::ParamSpec::uint64(
name,
@@ -342,6 +354,18 @@ impl ObjectImpl for FallbackSrc {
);
settings.timeout = new_value;
}
+ subclass::Property("restart-timeout", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let new_value = value.get_some().expect("type checked upstream");
+ gst_info!(
+ CAT,
+ obj: element,
+ "Changing Restart Timeout from {:?} to {:?}",
+ settings.restart_timeout,
+ new_value,
+ );
+ settings.restart_timeout = new_value;
+ }
subclass::Property("retry-timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get_some().expect("type checked upstream");
@@ -389,6 +413,10 @@ impl ObjectImpl for FallbackSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value())
}
+ subclass::Property("restart-timeout", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.restart_timeout.to_value())
+ }
subclass::Property("retry-timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.retry_timeout.to_value())
@@ -424,7 +452,7 @@ impl ObjectImpl for FallbackSrc {
}
if state.buffering_percent < 100
- || state.source_pending_timeout.is_some()
+ || state.source_restart_timeout.is_some()
|| state.streams.is_none()
|| (have_audio
&& state
@@ -606,6 +634,8 @@ impl FallbackSrc {
Ok(identity)
})
.expect("No clocksync or identity found");
+ let queue = gst::ElementFactory::make("queue", Some("fallback_queue"))
+ .expect("No queue found");
input
.add_many(&[
@@ -615,11 +645,18 @@ impl FallbackSrc {
&videoscale,
&imagefreeze,
&clocksync,
+ &queue,
])
.unwrap();
gst::Element::link_many(&[&filesrc, &typefind]).unwrap();
- gst::Element::link_many(&[&videoconvert, &videoscale, &imagefreeze, &clocksync])
- .unwrap();
+ gst::Element::link_many(&[
+ &videoconvert,
+ &videoscale,
+ &imagefreeze,
+ &clocksync,
+ &queue,
+ ])
+ .unwrap();
filesrc
.dynamic_cast_ref::<gst::URIHandler>()
@@ -708,7 +745,7 @@ impl FallbackSrc {
})
.unwrap();
- clocksync.get_static_pad("src").unwrap()
+ queue.get_static_pad("src").unwrap()
}
None => {
let videotestsrc =
@@ -893,7 +930,7 @@ impl FallbackSrc {
source,
source_is_live: false,
source_pending_restart: false,
- source_pending_timeout: None,
+ source_restart_timeout: None,
source_pending_restart_timeout: None,
source_retry_timeout: None,
video_stream,
@@ -954,7 +991,7 @@ impl FallbackSrc {
timeout.unschedule();
}
- if let Some(timeout) = state.source_pending_timeout.take() {
+ if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule();
}
@@ -1018,12 +1055,21 @@ impl FallbackSrc {
"Source changed state successfully: {:?}",
res
);
+
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().expect("no state");
+
// Remember if the source is live
if transition == gst::StateChange::ReadyToPaused {
- let mut state_guard = self.state.lock().unwrap();
- let state = state_guard.as_mut().expect("no state");
state.source_is_live = res == gst::StateChangeSuccess::NoPreroll;
}
+
+ if (state.source_is_live && transition == gst::StateChange::ReadyToPaused)
+ || (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
+ {
+ assert!(state.source_restart_timeout.is_none());
+ self.schedule_source_restart_timeout(element, state, 0.into());
+ }
}
}
@@ -1641,7 +1687,7 @@ impl FallbackSrc {
}
// Unschedule pending timeout, we're restarting now
- if let Some(timeout) = state.source_pending_timeout.take() {
+ if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule();
}
@@ -1754,6 +1800,11 @@ impl FallbackSrc {
state.source_pending_restart_timeout = None;
state.buffering_percent = 100;
state.last_buffering_update = None;
+
+ if let Some(timeout) = state.source_restart_timeout.take() {
+ gst_debug!(CAT, obj: element, "Unscheduling restart timeout");
+ timeout.unschedule();
+ }
drop(state_guard);
if let Some(old_source) = old_source {
@@ -1768,6 +1819,11 @@ impl FallbackSrc {
let mut state_guard = src.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state");
src.handle_source_error(element, state);
+ } else {
+ let mut state_guard = src.state.lock().unwrap();
+ let state = state_guard.as_mut().expect("no state");
+ assert!(state.source_restart_timeout.is_none());
+ src.schedule_source_restart_timeout(element, state, 0.into());
}
});
})
@@ -1777,6 +1833,122 @@ impl FallbackSrc {
}
#[allow(clippy::block_in_if_condition_stmt)]
+ fn schedule_source_restart_timeout(
+ &self,
+ element: &gst::Bin,
+ state: &mut State,
+ elapsed: gst::ClockTime,
+ ) {
+ let clock = gst::SystemClock::obtain();
+ let wait_time = clock.get_time()
+ + gst::ClockTime::from_nseconds(state.settings.restart_timeout)
+ - elapsed;
+ assert!(wait_time.is_some());
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Scheduling source restart timeout for {}",
+ wait_time,
+ );
+
+ let timeout = clock
+ .new_single_shot_id(wait_time)
+ .expect("can't create clock id");
+ let element_weak = element.downgrade();
+ timeout
+ .wait_async(move |_clock, _time, _id| {
+ let element = match element_weak.upgrade() {
+ None => return,
+ Some(element) => element,
+ };
+
+ element.call_async(move |element| {
+ let src = FallbackSrc::from_instance(element);
+
+ gst_debug!(CAT, obj: element, "Source restart timeout triggered");
+ let mut state_guard = src.state.lock().unwrap();
+ let state = match &mut *state_guard {
+ None => {
+ gst_debug!(CAT, obj: element, "Restarting source not needed anymore");
+ return;
+ }
+ Some(state) => state,
+ };
+
+ state.source_restart_timeout = None;
+
+ let mut have_audio = false;
+ let mut have_video = false;
+ if let Some(ref streams) = state.streams {
+ for stream in streams.iter() {
+ have_audio = have_audio
+ || stream.get_stream_type().contains(gst::StreamType::AUDIO);
+ have_video = have_video
+ || stream.get_stream_type().contains(gst::StreamType::VIDEO);
+ }
+ }
+
+ // If we have neither audio nor video (no streams yet), or active pad for the ones we have
+ // is the fallback pad then restart the source now.
+ if (!have_audio && !have_video)
+ || (have_audio
+ && state
+ .audio_stream
+ .as_ref()
+ .and_then(|s| {
+ s.switch
+ .get_property("active-pad")
+ .unwrap()
+ .get::<gst::Pad>()
+ .unwrap()
+ })
+ .map(|p| p.get_name() == "fallback_sink")
+ .unwrap_or(true))
+ || (have_video
+ && state
+ .video_stream
+ .as_ref()
+ .and_then(|s| {
+ s.switch
+ .get_property("active-pad")
+ .unwrap()
+ .get::<gst::Pad>()
+ .unwrap()
+ })
+ .map(|p| p.get_name() == "fallback_sink")
+ .unwrap_or(true))
+ {
+ // If we're not actively buffering right now let's restart the source
+ if state
+ .last_buffering_update
+ .map(|i| {
+ i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout)
+ })
+ .unwrap_or(state.buffering_percent == 100)
+ {
+ gst_debug!(CAT, obj: element, "Not buffering, restarting source");
+
+ src.handle_source_error(element, state);
+ } else {
+ gst_debug!(CAT, obj: element, "Buffering, restarting source later");
+ let elapsed = state
+ .last_buffering_update
+ .map(|i| i.elapsed().as_nanos() as u64)
+ .unwrap_or(0);
+
+ src.schedule_source_restart_timeout(element, state, elapsed.into());
+ }
+ } else {
+ gst_debug!(CAT, obj: element, "Restarting source not needed anymore");
+ }
+ });
+ })
+ .expect("Failed to wait async");
+
+ state.source_restart_timeout = Some(timeout);
+ }
+
+ #[allow(clippy::block_in_if_condition_stmt)]
fn handle_switch_active_pad_change(&self, element: &gst::Bin) {
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
@@ -1797,12 +1969,6 @@ impl FallbackSrc {
}
}
- // We will schedule a new one if needed below
- if let Some(timeout) = state.source_pending_timeout.take() {
- gst_debug!(CAT, obj: element, "Unscheduling pending timeout");
- timeout.unschedule();
- }
-
// If we have neither audio nor video (no streams yet), or active pad for the ones we have
// is the fallback pad then start the retry timeout unless it was started already.
// Otherwise cancel the retry timeout.
@@ -1835,114 +2001,20 @@ impl FallbackSrc {
.unwrap_or(true))
{
gst_warning!(CAT, obj: element, "Switched to fallback stream");
-
- // If we're not actively buffering right now let's restart the source
- if state
- .last_buffering_update
- .map(|i| i.elapsed() >= Duration::from_nanos(state.settings.timeout))
- .unwrap_or(state.buffering_percent == 100)
- {
- gst_debug!(CAT, obj: element, "Not buffering, restarting source");
- self.handle_source_error(element, state);
- } else {
- // Schedule another timeout after the last buffering activity
- let clock = gst::SystemClock::obtain();
- let diff = gst::ClockTime::from(
- state.settings.timeout.saturating_sub(
- state
- .last_buffering_update
- .map(|i| i.elapsed().as_nanos() as u64)
- .unwrap_or(state.settings.timeout),
- ),
- );
- let wait_time = clock.get_time() + diff;
- assert!(wait_time.is_some());
-
- gst_debug!(CAT, obj: element, "Starting pending timeout for {}", diff);
- let timeout = clock
- .new_single_shot_id(wait_time)
- .expect("can't create clock id");
-
- let element_weak = element.downgrade();
- timeout
- .wait_async(move |_clock, _time, _id| {
- let element = match element_weak.upgrade() {
- None => return,
- Some(element) => element,
- };
-
- element.call_async(|element| {
- let src = FallbackSrc::from_instance(element);
- src.handle_switch_active_pad_change(element);
- });
- })
- .expect("failed to wait async");
-
- state.source_pending_timeout = Some(timeout);
+ if state.source_restart_timeout.is_none() {
+ self.schedule_source_restart_timeout(element, state, 0.into());
+ }
+ } else {
+ gst_debug!(CAT, obj: element, "Switched to main stream");
+ if let Some(timeout) = state.source_retry_timeout.take() {
+ gst_debug!(CAT, obj: element, "Unscheduling retry timeout");
+ timeout.unschedule();
}
- if state.source_retry_timeout.is_none() {
- let clock = gst::SystemClock::obtain();
- let wait_time =
- clock.get_time() + gst::ClockTime::from(state.settings.retry_timeout);
- assert!(wait_time.is_some());
-
- gst_debug!(CAT, obj: element, "Starting retry timeout");
- let timeout = clock
- .new_single_shot_id(wait_time)
- .expect("can't create clock id");
-
- let element_weak = element.downgrade();
- timeout
- .wait_async(move |_clock, _time, _id| {
- let element = match element_weak.upgrade() {
- None => return,
- Some(element) => element,
- };
-
- element.call_async(|element| {
- let src = FallbackSrc::from_instance(element);
- let mut state_guard = src.state.lock().unwrap();
- let state = match &mut *state_guard {
- None => return,
- Some(ref mut state) => state,
- };
- if state.source_retry_timeout.take().is_none() {
- return;
- }
- if let Some(timeout) = state.source_pending_restart_timeout.take() {
- timeout.unschedule();
- }
- if let Some(timeout) = state.source_pending_timeout.take() {
- timeout.unschedule();
- }
- state.source_pending_restart = false;
- drop(state_guard);
-
- gst_element_warning!(
- element,
- gst::ResourceError::OpenRead,
- ["Failed to start playback"]
- );
- gst_warning!(CAT, obj: element, "Retry timeout, finishing");
-
- for pad in element.get_src_pads() {
- element.call_async(move |_element| {
- pad.push_event(gst::event::Eos::new());
- });
- }
- });
- })
- .expect("failed to wait async");
-
- state.source_retry_timeout = Some(timeout);
-
- drop(state_guard);
- element.notify("status");
+ if let Some(timeout) = state.source_restart_timeout.take() {
+ gst_debug!(CAT, obj: element, "Unscheduling restart timeout");
+ timeout.unschedule();
}
- } else if let Some(timeout) = state.source_retry_timeout.take() {
- gst_debug!(CAT, obj: element, "Unscheduling retry timeout");
- timeout.unschedule();
drop(state_guard);
element.notify("status");