Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/utils
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-08-19 17:34:17 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-09-27 12:56:15 +0300
commit38753b08acc0fe045997cf1deede912e8c839bcb (patch)
tree5528af2a6c43ea4db1a28db38e824112fbd339a4 /utils
parent00411523d4d5fdec6e53b2a5d3048a684f58faca (diff)
fallbacksrc: Implement support for fallback streams
Diffstat (limited to 'utils')
-rw-r--r--utils/fallbackswitch/src/fallbacksrc/imp.rs2028
-rw-r--r--utils/fallbackswitch/src/fallbacksrc/mod.rs1
-rw-r--r--utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs464
-rw-r--r--utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs22
4 files changed, 1461 insertions, 1054 deletions
diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs
index 907c007eb..0dc74ff88 100644
--- a/utils/fallbackswitch/src/fallbacksrc/imp.rs
+++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs
@@ -11,13 +11,12 @@ use gst::prelude::*;
use gst::subclass::prelude::*;
use parking_lot::Mutex;
-use std::mem;
use std::time::Instant;
+use std::{cmp, mem};
use once_cell::sync::Lazy;
use super::custom_source::CustomSource;
-use super::video_fallback::VideoFallbackSource;
use super::{RetryReason, Status};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@@ -31,16 +30,22 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
#[derive(Debug, Clone)]
struct Stats {
num_retry: u64,
+ num_fallback_retry: u64,
last_retry_reason: RetryReason,
+ last_fallback_retry_reason: RetryReason,
buffering_percent: i32,
+ fallback_buffering_percent: i32,
}
impl Default for Stats {
fn default() -> Self {
Self {
num_retry: 0,
+ num_fallback_retry: 0,
last_retry_reason: RetryReason::None,
+ last_fallback_retry_reason: RetryReason::None,
buffering_percent: 100,
+ fallback_buffering_percent: 100,
}
}
}
@@ -49,8 +54,17 @@ impl Stats {
fn to_structure(&self) -> gst::Structure {
gst::Structure::builder("application/x-fallbacksrc-stats")
.field("num-retry", self.num_retry)
+ .field("num-fallback-retry", &self.num_fallback_retry)
.field("last-retry-reason", self.last_retry_reason)
+ .field(
+ "last-fallback-retry-reason",
+ self.last_fallback_retry_reason,
+ )
.field("buffering-percent", self.buffering_percent)
+ .field(
+ "fallback-buffering-percent",
+ self.fallback_buffering_percent,
+ )
.build()
}
}
@@ -113,47 +127,72 @@ struct Block {
running_time: Option<gst::ClockTime>,
}
-// Connects one source pad with fallbackswitch and the corresponding fallback input
-struct Stream {
- // Fallback input stream
- // for video: filesrc, decoder, converters, imagefreeze
- // for audio: live audiotestsrc, converters
- fallback_input: gst::Element,
-
- fallback_capsfilter: gst::Element,
-
- // source pad from source
- source_srcpad: Option<gst::Pad>,
+struct StreamBranch {
+ // source pad from actual source inside the source bin
+ source_srcpad: gst::Pad,
+ // blocking pad probe on the source pad of the source queue
source_srcpad_block: Option<Block>,
- // clocksync for source source pad
+ // other elements in the source bin before the ghostpad
+ // imagefreeze before the clocksync if this is a stillframe stream
+ imagefreeze: Option<gst::Element>,
clocksync: gst::Element,
+ converters: gst::Element,
+ queue: gst::Element,
+ // queue source pad, target pad of the source ghost pad
+ queue_srcpad: gst::Pad,
- // imagefreeze if this is an image stream
- imagefreeze: Option<gst::Element>,
+ // Request pad on the fallbackswitch
+ switch_pad: gst::Pad,
+}
- clocksync_queue: gst::Element,
- clocksync_queue_srcpad: gst::Pad,
+// Connects one source pad with fallbackswitch and the corresponding fallback input
+struct Stream {
+ // Main stream and fallback stream branches to the fallback switch
+ main_branch: Option<StreamBranch>,
+ // If this does not exist then the fallbackswitch is connected directly to the dummy
+ // audio/video sources
+ fallback_branch: Option<StreamBranch>,
// fallbackswitch
+ // fallbackswitch in the main bin, linked to the ghostpads above
switch: gst::Element,
- // output source pad, connected to switch
+ // output source pad on the main bin, switch source pad is ghostpad target
srcpad: gst::GhostPad,
+
+ // filter caps for the fallback/dummy streams
+ filter_caps: gst::Caps,
}
-struct State {
- // uridecodebin3 or custom source element
- source: gst::Element,
- source_is_live: bool,
- source_pending_restart: bool,
+struct SourceBin {
+ // uridecodebin3 or custom source element inside a bin.
+ //
+ // This bin would also contain imagefreeze, clocksync and queue elements as needed for the
+ // outputs and would be connected via ghost pads to the fallbackswitch elements.
+ source: gst::Bin,
+ pending_restart: bool,
+ is_live: bool,
+ is_image: bool,
// For timing out the source and shutting it down to restart it
- source_restart_timeout: Option<gst::SingleShotClockId>,
+ restart_timeout: Option<gst::SingleShotClockId>,
// For restarting the source after shutting it down
- source_pending_restart_timeout: Option<gst::SingleShotClockId>,
+ pending_restart_timeout: Option<gst::SingleShotClockId>,
// For failing completely if we didn't recover after the retry timeout
- source_retry_timeout: Option<gst::SingleShotClockId>,
+ retry_timeout: Option<gst::SingleShotClockId>,
+
+ // Stream collection posted by source
+ streams: Option<gst::StreamCollection>,
+}
+
+struct State {
+ source: SourceBin,
+ fallback_source: Option<SourceBin>,
+
+ // audio/video dummy source if the fallback source fails or is not started yet
+ audio_dummy_source: Option<gst::Bin>,
+ video_dummy_source: Option<gst::Bin>,
// All our output streams, selected by properties
video_stream: Option<Stream>,
@@ -161,9 +200,7 @@ struct State {
flow_combiner: gst_base::UniqueFlowCombiner,
last_buffering_update: Option<Instant>,
-
- // Stream collection posted by source
- streams: Option<gst::StreamCollection>,
+ fallback_last_buffering_update: Option<Instant>,
// Configure settings
settings: Settings,
@@ -177,8 +214,6 @@ struct State {
// So that we don't schedule a restart when manually unblocking
// and our source hasn't reached the required state
schedule_restart_on_unblock: bool,
-
- is_image: bool,
}
#[derive(Default)]
@@ -555,9 +590,9 @@ impl ObjectImpl for FallbackSrc {
};
// If any restarts/retries are pending, we're retrying
- if state.source_pending_restart
- || state.source_pending_restart_timeout.is_some()
- || state.source_retry_timeout.is_some()
+ if state.source.pending_restart
+ || state.source.pending_restart_timeout.is_some()
+ || state.source.retry_timeout.is_some()
{
return Status::Retrying.to_value();
}
@@ -566,7 +601,7 @@ impl ObjectImpl for FallbackSrc {
// streams there is no source pad yet, we're buffering
let mut have_audio = false;
let mut have_video = false;
- if let Some(ref streams) = state.streams {
+ if let Some(ref streams) = state.source.streams {
for stream in streams.iter() {
have_audio =
have_audio || stream.stream_type().contains(gst::StreamType::AUDIO);
@@ -576,19 +611,21 @@ impl ObjectImpl for FallbackSrc {
}
if state.stats.buffering_percent < 100
- || state.source_restart_timeout.is_some()
- || state.streams.is_none()
+ || state.source.restart_timeout.is_some()
+ || state.source.streams.is_none()
|| (have_audio
&& state
.audio_stream
.as_ref()
- .map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some())
+ .and_then(|s| s.main_branch.as_ref())
+ .map(|b| b.source_srcpad_block.is_some())
.unwrap_or(true))
|| (have_video
&& state
.video_stream
.as_ref()
- .map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some())
+ .and_then(|s| s.main_branch.as_ref())
+ .map(|b| b.source_srcpad_block.is_some())
.unwrap_or(true))
{
return Status::Buffering.to_value();
@@ -664,10 +701,11 @@ impl ObjectImpl for FallbackSrc {
&element,
state,
gst::ClockTime::ZERO,
+ false,
);
}
- src.unblock_pads(&element, state);
+ src.unblock_pads(&element, state, false);
None
})
@@ -697,7 +735,7 @@ impl ElementImpl for FallbackSrc {
gst::subclass::ElementMetadata::new(
"Fallback Source",
"Generic/Source",
- "Live source with uridecodebin3 or custom source, and fallback image stream",
+ "Live source with uridecodebin3 or custom source, and fallback stream",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
@@ -735,6 +773,8 @@ impl ElementImpl for FallbackSrc {
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::debug!(CAT, obj: element, "Changing state {:?}", transition);
+
match transition {
gst::StateChange::NullToReady => {
self.start(element)?;
@@ -742,11 +782,25 @@ impl ElementImpl for FallbackSrc {
_ => (),
}
- self.parent_change_state(element, transition)?;
+ self.parent_change_state(element, transition)
+ .map_err(|err| {
+ gst::error!(
+ CAT,
+ obj: element,
+ "Parent state change transition {:?} failed",
+ transition
+ );
+
+ err
+ })?;
// Change the source state manually here to be able to catch errors. State changes always
// happen from sink to source, so we do this after chaining up.
- self.change_source_state(element, transition);
+ self.change_source_state(element, transition, false);
+
+ // Change the fallback source state manually here to be able to catch errors. State changes always
+ // happen from sink to source, so we do this after chaining up.
+ self.change_source_state(element, transition, true);
// Ignore parent state change return to prevent spurious async/no-preroll return values
// due to core state change bugs
@@ -762,10 +816,14 @@ impl ElementImpl for FallbackSrc {
}
}
- fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
+ fn send_event(&self, element: &Self::Type, event: gst::Event) -> bool {
match event.view() {
gst::EventView::Eos(..) => {
- gst::debug!(CAT, "Handling element-level EOS, forwarding to all streams");
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Handling element-level EOS, forwarding to all streams"
+ );
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
@@ -776,34 +834,42 @@ impl ElementImpl for FallbackSrc {
};
// We don't want to hold the state lock while pushing out EOS
- let mut send_eos_elements: Vec<gst::Element> = vec![];
- let mut send_eos_pads: Vec<gst::Pad> = vec![];
+ let mut send_eos_elements = vec![];
+ let mut send_eos_pads = vec![];
- send_eos_elements.push(state.source.clone());
+ send_eos_elements.push(state.source.source.clone());
+
+ // Not strictly necessary as the switch will EOS when receiving
+ // EOS on its primary pad, just good form.
+ if let Some(ref source) = state.fallback_source {
+ send_eos_elements.push(source.source.clone());
+ }
+ if let Some(ref source) = state.audio_dummy_source {
+ send_eos_elements.push(source.clone());
+ }
+ if let Some(ref source) = state.video_dummy_source {
+ send_eos_elements.push(source.clone());
+ }
- for stream in [&mut state.video_stream, &mut state.audio_stream]
+ for branch in [&mut state.video_stream, &mut state.audio_stream]
.iter_mut()
.filter_map(|v| v.as_mut())
+ .flat_map(|s| [s.main_branch.as_mut(), s.fallback_branch.as_mut()])
+ .flatten()
{
- // Not strictly necessary as the switch will EOS when receiving
- // EOS on its primary pad, just good form.
- send_eos_elements.push(stream.fallback_input.clone());
// If our source hadn't been connected to the switch as a primary
// stream, we need to send EOS there ourselves
- if stream.source_srcpad.is_none() {
- let clocksync_queue_sinkpad =
- stream.clocksync_queue.static_pad("sink").unwrap();
- send_eos_pads.push(clocksync_queue_sinkpad.clone());
- }
+ let queue_sinkpad = branch.queue.static_pad("sink").unwrap();
+ send_eos_pads.push(queue_sinkpad.clone());
}
drop(state_guard);
- for elem in send_eos_elements.drain(..) {
+ for elem in send_eos_elements {
elem.send_event(event.clone());
}
- for pad in send_eos_pads.drain(..) {
+ for pad in send_eos_pads {
pad.send_event(event.clone());
}
@@ -841,12 +907,128 @@ impl BinImpl for FallbackSrc {
}
impl FallbackSrc {
+ fn create_dummy_audio_source(filter_caps: &gst::Caps, min_latency: gst::ClockTime) -> gst::Bin {
+ let bin = gst::Bin::new(None);
+
+ let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("audiosrc"))
+ .expect("No audiotestsrc found");
+
+ let audioconvert = gst::ElementFactory::make("audioconvert", Some("audio_audioconvert"))
+ .expect("No audioconvert found");
+
+ let audioresample = gst::ElementFactory::make("audioresample", Some("audio_audioresample"))
+ .expect("No audioresample found");
+
+ let capsfilter = gst::ElementFactory::make("capsfilter", Some("audio_capsfilter"))
+ .expect("No capsfilter found");
+
+ let queue = gst::ElementFactory::make("queue", None).expect("No queue found");
+
+ audiotestsrc.set_property_from_str("wave", "silence");
+ audiotestsrc.set_property("is-live", true);
+
+ capsfilter.set_property("caps", filter_caps);
+
+ queue.set_properties(&[
+ ("max-size-bytes", &0u32),
+ ("max-size-buffers", &0u32),
+ (
+ "max-size-time",
+ &(cmp::max(min_latency, gst::ClockTime::from_seconds(1))),
+ ),
+ ]);
+
+ bin.add_many(&[
+ &audiotestsrc,
+ &audioconvert,
+ &audioresample,
+ &capsfilter,
+ &queue,
+ ])
+ .unwrap();
+
+ gst::Element::link_many(&[
+ &audiotestsrc,
+ &audioconvert,
+ &audioresample,
+ &capsfilter,
+ &queue,
+ ])
+ .unwrap();
+
+ let ghostpad =
+ gst::GhostPad::with_target(Some("src"), &queue.static_pad("src").unwrap()).unwrap();
+ ghostpad.set_active(true).unwrap();
+ bin.add_pad(&ghostpad).unwrap();
+
+ bin
+ }
+
+ fn create_dummy_video_source(filter_caps: &gst::Caps, min_latency: gst::ClockTime) -> gst::Bin {
+ let bin = gst::Bin::new(None);
+
+ let videotestsrc = gst::ElementFactory::make("videotestsrc", Some("videosrc"))
+ .expect("No videotestsrc found");
+
+ let videoconvert = gst::ElementFactory::make("videoconvert", Some("video_videoconvert"))
+ .expect("No videoconvert found");
+
+ let videoscale = gst::ElementFactory::make("videoscale", Some("video_videoscale"))
+ .expect("No videoscale found");
+
+ let capsfilter = gst::ElementFactory::make("capsfilter", Some("video_capsfilter"))
+ .expect("No capsfilter found");
+
+ let queue = gst::ElementFactory::make("queue", None).expect("No queue found");
+
+ videotestsrc.set_property_from_str("pattern", "black");
+ videotestsrc.set_property("is-live", true);
+
+ capsfilter.set_property("caps", filter_caps);
+
+ queue.set_properties(&[
+ ("max-size-bytes", &0u32),
+ ("max-size-buffers", &0u32),
+ (
+ "max-size-time",
+ &(cmp::max(min_latency, gst::ClockTime::from_seconds(1))),
+ ),
+ ]);
+
+ bin.add_many(&[
+ &videotestsrc,
+ &videoconvert,
+ &videoscale,
+ &capsfilter,
+ &queue,
+ ])
+ .unwrap();
+
+ gst::Element::link_many(&[
+ &videotestsrc,
+ &videoconvert,
+ &videoscale,
+ &capsfilter,
+ &queue,
+ ])
+ .unwrap();
+
+ let ghostpad =
+ gst::GhostPad::with_target(Some("src"), &queue.static_pad("src").unwrap()).unwrap();
+ ghostpad.set_active(true).unwrap();
+ bin.add_pad(&ghostpad).unwrap();
+
+ bin
+ }
+
fn create_main_input(
&self,
element: &super::FallbackSrc,
source: &Source,
buffer_duration: i64,
- ) -> gst::Element {
+ ) -> SourceBin {
+ let bin = gst::Bin::new(None);
+
let source = match source {
Source::Uri(ref uri) => {
let source = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin"))
@@ -863,12 +1045,14 @@ impl FallbackSrc {
Source::Element(ref source) => CustomSource::new(source).upcast(),
};
+ bin.add(&source).unwrap();
+
// Handle any async state changes internally, they don't affect the pipeline because we
// convert everything to a live stream
- source.set_property("async-handling", true);
+ bin.set_property("async-handling", true);
// Don't let the bin handle state changes of the source. We want to do it manually to catch
// possible errors and retry, without causing the whole bin state change to fail
- source.set_locked_state(true);
+ bin.set_locked_state(true);
let element_weak = element.downgrade();
source.connect_pad_added(move |_, pad| {
@@ -878,7 +1062,7 @@ impl FallbackSrc {
};
let src = element.imp();
- if let Err(msg) = src.handle_source_pad_added(&element, pad) {
+ if let Err(msg) = src.handle_source_pad_added(&element, pad, false) {
element.post_error_message(msg);
}
});
@@ -890,42 +1074,88 @@ impl FallbackSrc {
};
let src = element.imp();
- src.handle_source_pad_removed(&element, pad);
+ src.handle_source_pad_removed(&element, pad, false);
});
- element.add_many(&[&source]).unwrap();
+ element.add(&bin).unwrap();
- source
+ SourceBin {
+ source: bin,
+ pending_restart: false,
+ is_live: false,
+ is_image: false,
+ restart_timeout: None,
+ pending_restart_timeout: None,
+ retry_timeout: None,
+ streams: None,
+ }
}
- fn create_fallback_video_input(
+ fn create_fallback_input(
&self,
- _element: &super::FallbackSrc,
- min_latency: gst::ClockTime,
+ element: &super::FallbackSrc,
fallback_uri: Option<&str>,
- ) -> gst::Element {
- VideoFallbackSource::new(fallback_uri, min_latency).upcast()
- }
+ buffer_duration: i64,
+ ) -> Option<SourceBin> {
+ let source: gst::Element = match fallback_uri {
+ Some(uri) => {
+ let dbin = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin"))
+ .expect("No uridecodebin3 found");
+ dbin.set_property("uri", uri);
+ dbin.set_property("use-buffering", true);
+ dbin.set_property("buffer-duration", buffer_duration);
- fn create_fallback_audio_input(&self, _element: &super::FallbackSrc) -> gst::Element {
- let input = gst::Bin::new(Some("fallback_audio"));
- let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("fallback_audiosrc"))
- .expect("No audiotestsrc found");
- input.add_many(&[&audiotestsrc]).unwrap();
+ dbin
+ }
+ None => return None,
+ };
- audiotestsrc.set_property_from_str("wave", "silence");
- audiotestsrc.set_property("is-live", true);
+ let bin = gst::Bin::new(None);
- let srcpad = audiotestsrc.static_pad("src").unwrap();
- input
- .add_pad(
- &gst::GhostPad::builder(Some("src"), gst::PadDirection::Src)
- .build_with_target(&srcpad)
- .unwrap(),
- )
- .unwrap();
+ bin.add(&source).unwrap();
+
+ let element_weak = element.downgrade();
+ source.connect_pad_added(move |_, pad| {
+ let element = match element_weak.upgrade() {
+ None => return,
+ Some(element) => element,
+ };
+ let src = FallbackSrc::from_instance(&element);
+
+ if let Err(msg) = src.handle_source_pad_added(&element, pad, true) {
+ element.post_error_message(msg);
+ }
+ });
+ let element_weak = element.downgrade();
+ source.connect_pad_removed(move |_, pad| {
+ let element = match element_weak.upgrade() {
+ None => return,
+ Some(element) => element,
+ };
+ let src = FallbackSrc::from_instance(&element);
+
+ src.handle_source_pad_removed(&element, pad, true);
+ });
+
+ // Handle any async state changes internally, they don't affect the pipeline because we
+ // convert everything to a live stream
+ bin.set_property("async-handling", true);
+ // Don't let the bin handle state changes of the dbin. We want to do it manually to catch
+ // possible errors and retry, without causing the whole bin state change to fail
+ bin.set_locked_state(true);
- input.upcast()
+ element.add(&bin).unwrap();
+
+ Some(SourceBin {
+ source: bin,
+ pending_restart: false,
+ is_live: false,
+ is_image: false,
+ restart_timeout: None,
+ pending_restart_timeout: None,
+ retry_timeout: None,
+ streams: None,
+ })
}
#[allow(clippy::too_many_arguments)]
@@ -935,65 +1165,23 @@ impl FallbackSrc {
timeout: gst::ClockTime,
min_latency: gst::ClockTime,
is_audio: bool,
- fallback_uri: Option<&str>,
immediate_fallback: bool,
- fallback_caps: &gst::Caps,
+ dummy_source: &gst::Bin,
+ filter_caps: &gst::Caps,
) -> Stream {
- let fallback_input = if is_audio {
- self.create_fallback_audio_input(element)
- } else {
- self.create_fallback_video_input(element, min_latency, fallback_uri)
- };
-
- let fallback_capsfilter =
- gst::ElementFactory::make("capsfilter", None).expect("No capsfilter found");
- fallback_capsfilter.set_property("caps", fallback_caps);
-
let switch =
gst::ElementFactory::make("fallbackswitch", None).expect("No fallbackswitch found");
- let clocksync = gst::ElementFactory::make("clocksync", None)
- .or_else(|_| -> Result<_, glib::BoolError> {
- let identity = gst::ElementFactory::make("identity", None)?;
- identity.set_property("sync", true);
- Ok(identity)
- })
- .expect("No clocksync or identity found");
-
- // Workaround for issues caused by https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
- let clocksync_queue = gst::ElementFactory::make("queue", None).expect("No queue found");
- clocksync_queue.set_properties(&[
- ("max-size-buffers", &0u32),
- ("max-size-bytes", &0u32),
- ("max-size-time", &gst::ClockTime::SECOND),
- ]);
- element
- .add_many(&[
- &fallback_input,
- &fallback_capsfilter,
- &switch,
- &clocksync_queue,
- &clocksync,
- ])
- .unwrap();
- fallback_input.link(&fallback_capsfilter).unwrap();
+ element.add(&switch).unwrap();
switch.set_property("timeout", timeout.nseconds());
switch.set_property("min-upstream-latency", min_latency.nseconds());
switch.set_property("immediate-fallback", immediate_fallback);
- gst::Element::link_pads(&clocksync_queue, Some("src"), &clocksync, Some("sink")).unwrap();
-
- let clocksync_srcpad = clocksync.static_pad("src").unwrap();
- let switch_mainsink = switch.request_pad_simple("sink_%u").unwrap();
- clocksync_srcpad.link(&switch_mainsink).unwrap();
- switch_mainsink.set_property("priority", 0u32);
- // clocksync_queue sink pad is not connected to anything yet at this point!
-
- let fallback_srcpad = fallback_capsfilter.static_pad("src").unwrap();
- let switch_fallbacksink = switch.request_pad_simple("sink_%u").unwrap();
- fallback_srcpad.link(&switch_fallbacksink).unwrap();
- switch_fallbacksink.set_property("priority", 1u32);
+ let dummy_srcpad = dummy_source.static_pad("src").unwrap();
+ let dummy_sinkpad = switch.request_pad_simple("sink_%u").unwrap();
+ dummy_srcpad.link(&dummy_sinkpad).unwrap();
+ dummy_sinkpad.set_property("priority", 2u32);
let element_weak = element.downgrade();
switch.connect_notify(Some("active-pad"), move |_switch, _pspec| {
@@ -1003,7 +1191,7 @@ impl FallbackSrc {
};
let src = element.imp();
- src.handle_switch_active_pad_change(&element);
+ src.handle_switch_active_pad_change(&element, is_audio);
});
let srcpad = switch.static_pad("src").unwrap();
@@ -1026,19 +1214,16 @@ impl FallbackSrc {
.build_with_target(&srcpad)
.unwrap();
+ let _ = ghostpad.set_active(true);
+
element.add_pad(&ghostpad).unwrap();
Stream {
- fallback_input,
- fallback_capsfilter,
- source_srcpad: None,
- source_srcpad_block: None,
- clocksync,
- imagefreeze: None,
- clocksync_queue_srcpad: clocksync_queue.static_pad("src").unwrap(),
- clocksync_queue,
+ main_branch: None,
+ fallback_branch: None,
switch,
srcpad: ghostpad.upcast(),
+ filter_caps: filter_caps.clone(),
}
}
@@ -1074,62 +1259,77 @@ impl FallbackSrc {
// Create main input
let source = self.create_main_input(element, &configured_source, settings.buffer_duration);
+ // Create fallback input
+ let fallback_source =
+ self.create_fallback_input(element, fallback_uri.as_deref(), settings.buffer_duration);
+
let mut flow_combiner = gst_base::UniqueFlowCombiner::new();
- // Create video stream
- let video_stream = if settings.enable_video {
+ // Create video stream and video dummy input
+ let (video_stream, video_dummy_source) = if settings.enable_video {
+ let video_dummy_source = Self::create_dummy_video_source(
+ &settings.fallback_video_caps,
+ settings.min_latency,
+ );
+ element.add(&video_dummy_source).unwrap();
+
let stream = self.create_stream(
element,
settings.timeout,
settings.min_latency,
false,
- fallback_uri.as_deref(),
settings.immediate_fallback,
+ &video_dummy_source,
&settings.fallback_video_caps,
);
flow_combiner.add_pad(&stream.srcpad);
- Some(stream)
+
+ (Some(stream), Some(video_dummy_source))
} else {
- None
+ (None, None)
};
- // Create audio stream
- let audio_stream = if settings.enable_audio {
+ // Create audio stream and out dummy input
+ let (audio_stream, audio_dummy_source) = if settings.enable_audio {
+ let audio_dummy_source = Self::create_dummy_audio_source(
+ &settings.fallback_audio_caps,
+ settings.min_latency,
+ );
+ element.add(&audio_dummy_source).unwrap();
+
let stream = self.create_stream(
element,
settings.timeout,
settings.min_latency,
true,
- None,
settings.immediate_fallback,
+ &audio_dummy_source,
&settings.fallback_audio_caps,
);
flow_combiner.add_pad(&stream.srcpad);
- Some(stream)
+
+ (Some(stream), Some(audio_dummy_source))
} else {
- None
+ (None, None)
};
let manually_blocked = settings.manual_unblock;
*state_guard = Some(State {
source,
- source_is_live: false,
- source_pending_restart: false,
- source_restart_timeout: None,
- source_pending_restart_timeout: None,
- source_retry_timeout: None,
+ fallback_source,
video_stream,
audio_stream,
+ audio_dummy_source,
+ video_dummy_source,
flow_combiner,
last_buffering_update: None,
- streams: None,
+ fallback_last_buffering_update: None,
settings,
configured_source,
stats: Stats::default(),
manually_blocked,
schedule_restart_on_unblock: false,
- is_image: false,
});
drop(state_guard);
@@ -1159,11 +1359,21 @@ impl FallbackSrc {
.iter()
.filter_map(|v| v.as_ref())
{
+ for branch in [&stream.main_branch, &stream.fallback_branch]
+ .iter()
+ .filter_map(|v| v.as_ref())
+ {
+ element.remove(&branch.queue).unwrap();
+ element.remove(&branch.converters).unwrap();
+ element.remove(&branch.clocksync).unwrap();
+ if let Some(ref imagefreeze) = branch.imagefreeze {
+ element.remove(imagefreeze).unwrap();
+ }
+ if branch.switch_pad.parent().as_ref() == Some(stream.switch.upcast_ref()) {
+ stream.switch.release_request_pad(&branch.switch_pad);
+ }
+ }
element.remove(&stream.switch).unwrap();
- element.remove(&stream.clocksync_queue).unwrap();
- element.remove(&stream.clocksync).unwrap();
- element.remove(&stream.fallback_capsfilter).unwrap();
- element.remove(&stream.fallback_input).unwrap();
let _ = stream.srcpad.set_target(None::<&gst::Pad>);
let _ = element.remove_pad(&stream.srcpad);
}
@@ -1173,59 +1383,100 @@ impl FallbackSrc {
if let Source::Element(ref source) = state.configured_source {
// Explicitly remove the source element from the CustomSource so that we can
// later create a new CustomSource and add it again there.
- if source.has_as_parent(&state.source) {
+ if source.has_as_parent(&state.source.source) {
let _ = source.set_state(gst::State::Null);
let _ = state
.source
+ .source
.downcast_ref::<gst::Bin>()
.unwrap()
.remove(source);
}
}
- element.remove(&state.source).unwrap();
- if let Some(timeout) = state.source_pending_restart_timeout.take() {
- timeout.unschedule();
- }
+ for source in [Some(&mut state.source), state.fallback_source.as_mut()]
+ .iter_mut()
+ .flatten()
+ {
+ element.remove(&source.source).unwrap();
- if let Some(timeout) = state.source_retry_timeout.take() {
- timeout.unschedule();
+ if let Some(timeout) = source.pending_restart_timeout.take() {
+ timeout.unschedule();
+ }
+
+ if let Some(timeout) = source.retry_timeout.take() {
+ timeout.unschedule();
+ }
+
+ if let Some(timeout) = source.restart_timeout.take() {
+ timeout.unschedule();
+ }
}
- if let Some(timeout) = state.source_restart_timeout.take() {
- timeout.unschedule();
+ for source in [
+ state.video_dummy_source.take(),
+ state.audio_dummy_source.take(),
+ ]
+ .iter()
+ .flatten()
+ {
+ let _ = source.set_state(gst::State::Null);
+ element.remove(source).unwrap();
}
gst::debug!(CAT, obj: element, "Stopped");
}
- fn change_source_state(&self, element: &super::FallbackSrc, transition: gst::StateChange) {
- gst::debug!(CAT, obj: element, "Changing source state: {:?}", transition);
+ fn change_source_state(
+ &self,
+ element: &super::FallbackSrc,
+ transition: gst::StateChange,
+ fallback_source: bool,
+ ) {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Changing {}source state: {:?}",
+ if fallback_source { "fallback " } else { "" },
+ transition
+ );
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
Some(state) => state,
None => return,
};
- if transition.current() <= transition.next() && state.source_pending_restart {
+ let mut source = if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source
+ } else {
+ return;
+ }
+ } else {
+ &mut state.source
+ };
+
+ if transition.current() <= transition.next() && source.pending_restart {
gst::debug!(
CAT,
obj: element,
- "Not starting source because pending restart"
+ "Not starting {}source because pending restart",
+ if fallback_source { "fallback " } else { "" }
);
return;
- } else if transition.next() <= gst::State::Ready && state.source_pending_restart {
+ } else if transition.next() <= gst::State::Ready && source.pending_restart {
gst::debug!(
CAT,
obj: element,
- "Unsetting pending restart because shutting down"
+ "Unsetting pending {}restart because shutting down",
+ if fallback_source { "fallback " } else { "" }
);
- state.source_pending_restart = false;
- if let Some(timeout) = state.source_pending_restart_timeout.take() {
+ source.pending_restart = false;
+ if let Some(timeout) = source.pending_restart_timeout.take() {
timeout.unschedule();
}
}
- let source = state.source.clone();
+ let source = source.source.clone();
drop(state_guard);
element.notify("status");
@@ -1233,13 +1484,23 @@ impl FallbackSrc {
let res = source.set_state(transition.next());
match res {
Err(_) => {
- gst::error!(CAT, obj: element, "Source failed to change state");
+ gst::error!(
+ CAT,
+ obj: element,
+ "{}source failed to change state",
+ if fallback_source { "fallback " } else { "" }
+ );
// Try again later if we're not shutting down
if transition != gst::StateChange::ReadyToNull {
let _ = source.set_state(gst::State::Null);
let mut state_guard = self.state.lock();
let state = state_guard.as_mut().expect("no state");
- self.handle_source_error(element, state, RetryReason::StateChangeFailure);
+ self.handle_source_error(
+ element,
+ state,
+ RetryReason::StateChangeFailure,
+ fallback_source,
+ );
drop(state_guard);
element.notify("statistics");
}
@@ -1248,24 +1509,57 @@ impl FallbackSrc {
gst::debug!(
CAT,
obj: element,
- "Source changed state successfully: {:?}",
+ "{}source changed state successfully: {:?}",
+ if fallback_source { "fallback " } else { "" },
res
);
let mut state_guard = self.state.lock();
let state = state_guard.as_mut().expect("no state");
+ let source = if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source
+ } else {
+ return;
+ }
+ } else {
+ &mut state.source
+ };
+
// Remember if the source is live
if transition == gst::StateChange::ReadyToPaused {
- state.source_is_live = res == gst::StateChangeSuccess::NoPreroll;
+ source.is_live = res == gst::StateChangeSuccess::NoPreroll;
}
- if (state.source_is_live && transition == gst::StateChange::ReadyToPaused)
- || (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
+ if (!source.is_live && transition == gst::StateChange::ReadyToPaused)
+ || (source.is_live && transition == gst::StateChange::PausedToPlaying)
+ {
+ if !fallback_source {
+ state.schedule_restart_on_unblock = true;
+ }
+ if source.restart_timeout.is_none() {
+ self.schedule_source_restart_timeout(
+ element,
+ state,
+ gst::ClockTime::ZERO,
+ fallback_source,
+ );
+ }
+ } else if (!source.is_live && transition == gst::StateChange::PausedToReady)
+ || (source.is_live && transition == gst::StateChange::PlayingToPaused)
{
- assert!(state.source_restart_timeout.is_none());
- state.schedule_restart_on_unblock = true;
- self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
+ if let Some(timeout) = source.pending_restart_timeout.take() {
+ timeout.unschedule();
+ }
+
+ if let Some(timeout) = source.retry_timeout.take() {
+ timeout.unschedule();
+ }
+
+ if let Some(timeout) = source.restart_timeout.take() {
+ timeout.unschedule();
+ }
}
}
}
@@ -1292,8 +1586,27 @@ impl FallbackSrc {
&self,
element: &super::FallbackSrc,
pad: &gst::Pad,
+ fallback_source: bool,
) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Pad {} added to source", pad.name(),);
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Pad {} added to {}source",
+ pad.name(),
+ if fallback_source { "fallback " } else { "" }
+ );
+
+ let mut is_image = false;
+
+ if let Some(ev) = pad.sticky_event::<gst::event::StreamStart>(0) {
+ let stream = ev.stream();
+
+ if let Some(caps) = stream.and_then(|s| s.caps()) {
+ if let Some(s) = caps.structure(0) {
+ is_image = s.name().starts_with("image/");
+ }
+ }
+ }
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
@@ -1303,39 +1616,35 @@ impl FallbackSrc {
Some(state) => state,
};
- let mut is_image = false;
-
- if let Some(ev) = pad.sticky_event::<gst::event::StreamStart>(0) {
- let stream = ev.stream();
-
- if let Some(stream) = stream {
- if let Some(caps) = stream.caps() {
- if let Some(s) = caps.structure(0) {
- is_image = s.name().starts_with("image/");
- }
- }
+ let source = if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source
+ } else {
+ return Ok(());
}
- }
+ } else {
+ &mut state.source
+ };
if is_image {
- if let Some(timeout) = state.source_pending_restart_timeout.take() {
+ if let Some(timeout) = source.pending_restart_timeout.take() {
timeout.unschedule();
}
- if let Some(timeout) = state.source_retry_timeout.take() {
+ if let Some(timeout) = source.retry_timeout.take() {
timeout.unschedule();
}
- if let Some(timeout) = state.source_restart_timeout.take() {
+ if let Some(timeout) = source.restart_timeout.take() {
timeout.unschedule();
}
}
- state.is_image |= is_image;
+ source.is_image |= is_image;
let (is_video, stream) = match pad.name() {
- x if x.starts_with("audio_") => (false, &mut state.audio_stream),
- x if x.starts_with("video_") => (true, &mut state.video_stream),
+ x if x.starts_with("audio") => (false, &mut state.audio_stream),
+ x if x.starts_with("video") => (true, &mut state.video_stream),
_ => {
let caps = match pad.current_caps().unwrap_or_else(|| pad.query_caps(None)) {
caps if !caps.is_any() && !caps.is_empty() => caps,
@@ -1357,34 +1666,151 @@ impl FallbackSrc {
let type_ = if is_video { "video" } else { "audio" };
- let stream = match stream {
+ let (branch_storage, filter_caps, switch) = match stream {
None => {
gst::debug!(CAT, obj: element, "No {} stream enabled", type_);
return Ok(());
}
Some(Stream {
- source_srcpad: Some(_),
+ ref mut main_branch,
+ ref switch,
+ ref filter_caps,
+ ..
+ }) if !fallback_source => {
+ if main_branch.is_some() {
+ gst::debug!(CAT, obj: element, "Already configured a {} stream", type_);
+ return Ok(());
+ }
+
+ (main_branch, filter_caps, switch)
+ }
+ Some(Stream {
+ ref mut fallback_branch,
+ ref switch,
+ ref filter_caps,
..
}) => {
- gst::debug!(CAT, obj: element, "Already configured a {} stream", type_);
- return Ok(());
+ if fallback_branch.is_some() {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Already configured a {} fallback stream",
+ type_
+ );
+ return Ok(());
+ }
+
+ (fallback_branch, filter_caps, switch)
}
- Some(ref mut stream) => stream,
};
- let sinkpad = if is_image {
- let imagefreeze = if let Some(ref imagefreeze) = stream.imagefreeze {
- imagefreeze
- } else {
- let imagefreeze =
- gst::ElementFactory::make("imagefreeze", None).expect("no imagefreeze found");
-
- gst::debug!(CAT, "image stream, inserting imagefreeze");
- element.add(&imagefreeze).unwrap();
- imagefreeze.set_property("is-live", true);
- stream.imagefreeze = Some(imagefreeze);
- stream.imagefreeze.as_ref().unwrap()
- };
+ let converters = if is_video {
+ let bin = gst::Bin::new(None);
+
+ let videoconvert =
+ gst::ElementFactory::make("videoconvert", Some("video_videoconvert"))
+ .expect("No videoconvert found");
+
+ let videoscale = gst::ElementFactory::make("videoscale", Some("video_videoscale"))
+ .expect("No videoscale found");
+
+ let capsfilter = gst::ElementFactory::make("capsfilter", Some("video_capsfilter"))
+ .expect("No capsfilter found");
+
+ capsfilter.set_property("caps", filter_caps);
+
+ bin.add_many(&[&videoconvert, &videoscale, &capsfilter])
+ .unwrap();
+
+ gst::Element::link_many(&[&videoconvert, &videoscale, &capsfilter]).unwrap();
+
+ let ghostpad =
+ gst::GhostPad::with_target(Some("sink"), &videoconvert.static_pad("sink").unwrap())
+ .unwrap();
+ ghostpad.set_active(true).unwrap();
+ bin.add_pad(&ghostpad).unwrap();
+
+ let ghostpad =
+ gst::GhostPad::with_target(Some("src"), &capsfilter.static_pad("src").unwrap())
+ .unwrap();
+ ghostpad.set_active(true).unwrap();
+ bin.add_pad(&ghostpad).unwrap();
+
+ bin.upcast()
+ } else {
+ let bin = gst::Bin::new(None);
+
+ let audioconvert =
+ gst::ElementFactory::make("audioconvert", Some("audio_audioconvert"))
+ .expect("No audioconvert found");
+
+ let audioresample =
+ gst::ElementFactory::make("audioresample", Some("audio_audioresample"))
+ .expect("No audioresample found");
+
+ let capsfilter = gst::ElementFactory::make("capsfilter", Some("audio_capsfilter"))
+ .expect("No capsfilter found");
+
+ capsfilter.set_property("caps", filter_caps);
+
+ bin.add_many(&[&audioconvert, &audioresample, &capsfilter])
+ .unwrap();
+
+ gst::Element::link_many(&[&audioconvert, &audioresample, &capsfilter]).unwrap();
+
+ let ghostpad =
+ gst::GhostPad::with_target(Some("sink"), &audioconvert.static_pad("sink").unwrap())
+ .unwrap();
+ ghostpad.set_active(true).unwrap();
+ bin.add_pad(&ghostpad).unwrap();
+
+ let ghostpad =
+ gst::GhostPad::with_target(Some("src"), &capsfilter.static_pad("src").unwrap())
+ .unwrap();
+ ghostpad.set_active(true).unwrap();
+ bin.add_pad(&ghostpad).unwrap();
+
+ bin.upcast()
+ };
+
+ let queue = gst::ElementFactory::make("queue", None).unwrap();
+ queue.set_properties(&[
+ ("max-size-bytes", &0u32),
+ ("max-size-buffers", &0u32),
+ (
+ "max-size-time",
+ &(cmp::max(state.settings.min_latency, gst::ClockTime::from_seconds(1))),
+ ),
+ ]);
+ let clocksync = gst::ElementFactory::make("clocksync", None).unwrap_or_else(|_| {
+ let identity = gst::ElementFactory::make("identity", None).unwrap();
+ identity.set_property("sync", true);
+ identity
+ });
+
+ source
+ .source
+ .add_many(&[&converters, &queue, &clocksync])
+ .unwrap();
+ converters.sync_state_with_parent().unwrap();
+ queue.sync_state_with_parent().unwrap();
+ clocksync.sync_state_with_parent().unwrap();
+
+ let sinkpad = converters.static_pad("sink").unwrap();
+ pad.link(&sinkpad).map_err(|err| {
+ gst::error!(CAT, obj: element, "Failed to link new source pad: {}", err);
+ gst::error_msg!(
+ gst::CoreError::Negotiation,
+ ["Failed to link new source pad: {}", err]
+ )
+ })?;
+
+ let imagefreeze = if is_image {
+ gst::debug!(CAT, obj: element, "Image stream, inserting imagefreeze");
+ let imagefreeze =
+ gst::ElementFactory::make("imagefreeze", None).expect("no imagefreeze found");
+ source.source.add(&imagefreeze).unwrap();
+ imagefreeze.set_property("is-live", true);
if imagefreeze.sync_state_with_parent().is_err() {
gst::error!(CAT, obj: element, "imagefreeze failed to change state",);
@@ -1393,30 +1819,23 @@ impl FallbackSrc {
["Failed to change imagefreeze state"]
));
}
- imagefreeze.link(&stream.clocksync_queue).unwrap();
- imagefreeze.static_pad("sink").unwrap()
+ converters.link(&imagefreeze).unwrap();
+ imagefreeze.link(&queue).unwrap();
+ Some(imagefreeze)
} else {
- if let Some(imagefreeze) = stream.imagefreeze.take() {
- imagefreeze.set_locked_state(true);
- let _ = imagefreeze.set_state(gst::State::Null);
- element.remove(&imagefreeze).unwrap();
- }
-
- stream.clocksync_queue.static_pad("sink").unwrap()
+ converters.link(&queue).unwrap();
+ None
};
- pad.link(&sinkpad).map_err(|err| {
- gst::error!(
- CAT,
- obj: element,
- "Failed to link source pad to clocksync: {}",
- err
- );
- gst::error_msg!(
- gst::CoreError::Negotiation,
- ["Failed to link source pad to clocksync: {}", err]
- )
- })?;
+ let ghostpad =
+ gst::GhostPad::with_target(Some(type_), &queue.static_pad("src").unwrap()).unwrap();
+ let _ = ghostpad.set_active(true);
+ source.source.add_pad(&ghostpad).unwrap();
+
+ // Link the new source pad in
+ let switch_pad = switch.request_pad_simple("sink_%u").unwrap();
+ switch_pad.set_property("priority", if fallback_source { 1u32 } else { 0u32 });
+ ghostpad.link(&switch_pad).unwrap();
let element_weak = element.downgrade();
pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |pad, info| {
@@ -1432,7 +1851,8 @@ impl FallbackSrc {
gst::debug!(
CAT,
obj: &element,
- "Received EOS from source on pad {}",
+ "Received EOS from {}source on pad {}",
+ if fallback_source { "fallback " } else { "" },
pad.name()
);
@@ -1447,7 +1867,7 @@ impl FallbackSrc {
if is_image {
gst::PadProbeReturn::Ok
} else if state.settings.restart_on_eos {
- src.handle_source_error(&element, state, RetryReason::Eos);
+ src.handle_source_error(&element, state, RetryReason::Eos, fallback_source);
drop(state_guard);
element.notify("statistics");
@@ -1460,13 +1880,10 @@ impl FallbackSrc {
state.video_stream.as_ref()
}
} {
- if other_stream.source_srcpad.is_none() {
- let fallback_input = &other_stream.fallback_input;
- let clocksync_queue_sinkpad =
- other_stream.clocksync_queue.static_pad("sink").unwrap();
- fallback_input.call_async(move |fallback_input| {
- fallback_input.send_event(gst::event::Eos::new());
- clocksync_queue_sinkpad.send_event(gst::event::Eos::new());
+ if other_stream.main_branch.is_none() {
+ let sinkpad = other_stream.switch.static_pad("sink").unwrap();
+ element.call_async(move |_| {
+ sinkpad.send_event(gst::event::Eos::new());
});
}
}
@@ -1478,9 +1895,20 @@ impl FallbackSrc {
}
});
- assert!(stream.source_srcpad_block.is_none());
- stream.source_srcpad = Some(pad.clone());
- stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
+ let queue_srcpad = queue.static_pad("src").unwrap();
+ let source_srcpad_block =
+ Some(self.add_pad_probe(element, pad, &queue_srcpad, fallback_source));
+
+ *branch_storage = Some(StreamBranch {
+ source_srcpad: pad.clone(),
+ source_srcpad_block,
+ imagefreeze,
+ clocksync,
+ converters,
+ queue,
+ queue_srcpad,
+ switch_pad,
+ });
drop(state_guard);
element.notify("status");
@@ -1488,7 +1916,13 @@ impl FallbackSrc {
Ok(())
}
- fn add_pad_probe(&self, element: &super::FallbackSrc, stream: &mut Stream) -> Block {
+ fn add_pad_probe(
+ &self,
+ element: &super::FallbackSrc,
+ pad: &gst::Pad,
+ block_pad: &gst::Pad,
+ fallback_source: bool,
+ ) -> Block {
// FIXME: Not literally correct as we add the probe to the queue source pad but that's only
// a workaround until
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
@@ -1496,13 +1930,14 @@ impl FallbackSrc {
gst::debug!(
CAT,
obj: element,
- "Adding probe to pad {}",
- stream.source_srcpad.as_ref().unwrap().name()
+ "Adding blocking probe to pad {} for pad {} (fallback: {})",
+ block_pad.name(),
+ pad.name(),
+ fallback_source,
);
let element_weak = element.downgrade();
- let probe_id = stream
- .clocksync_queue_srcpad
+ let probe_id = block_pad
.add_probe(
gst::PadProbeType::BLOCK
| gst::PadProbeType::BUFFER
@@ -1523,7 +1958,7 @@ impl FallbackSrc {
let src = element.imp();
- if let Err(msg) = src.handle_pad_blocked(&element, pad, pts) {
+ if let Err(msg) = src.handle_pad_blocked(&element, pad, pts, fallback_source) {
element.post_error_message(msg);
}
@@ -1533,7 +1968,7 @@ impl FallbackSrc {
.unwrap();
Block {
- pad: stream.clocksync_queue_srcpad.clone(),
+ pad: block_pad.clone(),
probe_id,
running_time: gst::ClockTime::NONE,
}
@@ -1544,6 +1979,7 @@ impl FallbackSrc {
element: &super::FallbackSrc,
pad: &gst::Pad,
pts: impl Into<Option<gst::ClockTime>>,
+ fallback_source: bool,
) -> Result<(), gst::ErrorMessage> {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
@@ -1553,57 +1989,100 @@ impl FallbackSrc {
Some(state) => state,
};
- // FIXME: Not literally correct as we added the probe to the queue source pad but that's only
- // a workaround until
- // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
- // is fixed.
+ let (branch, source) = match &mut *state {
+ State {
+ audio_stream:
+ Some(Stream {
+ main_branch: Some(ref mut branch),
+ ..
+ }),
+ ref source,
+ ..
+ } if !fallback_source && &branch.queue_srcpad == pad => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Called probe on pad {} for pad {} (fallback: {})",
+ pad.name(),
+ branch.source_srcpad.name(),
+ fallback_source
+ );
- let stream = if let Some(stream) = state
- .audio_stream
- .as_mut()
- .filter(|s| &s.clocksync_queue_srcpad == pad)
- {
- gst::debug!(
- CAT,
- obj: element,
- "Called probe on pad {}",
- stream.source_srcpad.as_ref().unwrap().name()
- );
- stream
- } else if let Some(stream) = state
- .video_stream
- .as_mut()
- .filter(|s| &s.clocksync_queue_srcpad == pad)
- {
- gst::debug!(
- CAT,
- obj: element,
- "Called probe on pad {}",
- stream.source_srcpad.as_ref().unwrap().name()
- );
- stream
- } else {
- unreachable!();
+ (branch, source)
+ }
+ State {
+ audio_stream:
+ Some(Stream {
+ fallback_branch: Some(ref mut branch),
+ ..
+ }),
+ fallback_source: Some(ref source),
+ ..
+ } if fallback_source && &branch.queue_srcpad == pad => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Called probe on pad {} for pad {} (fallback: {})",
+ pad.name(),
+ branch.source_srcpad.name(),
+ fallback_source
+ );
+
+ (branch, source)
+ }
+ State {
+ video_stream:
+ Some(Stream {
+ main_branch: Some(ref mut branch),
+ ..
+ }),
+ ref source,
+ ..
+ } if !fallback_source && &branch.queue_srcpad == pad => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Called probe on pad {} for pad {} (fallback: {})",
+ pad.name(),
+ branch.source_srcpad.name(),
+ fallback_source,
+ );
+
+ (branch, source)
+ }
+ State {
+ video_stream:
+ Some(Stream {
+ fallback_branch: Some(ref mut branch),
+ ..
+ }),
+ fallback_source: Some(ref source),
+ ..
+ } if fallback_source && &branch.queue_srcpad == pad => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Called probe on pad {} for pad {} (fallback: {})",
+ pad.name(),
+ branch.source_srcpad.name(),
+ fallback_source
+ );
+
+ (branch, source)
+ }
+ _ => unreachable!(),
};
// Directly unblock for live streams
- if state.source_is_live {
- for (source_srcpad, block) in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
- .iter_mut()
- .filter_map(|s| s.as_mut())
- .filter_map(|s| {
- if let Some(block) = s.source_srcpad_block.take() {
- Some((s.source_srcpad.as_ref().unwrap(), block))
- } else {
- None
- }
- })
- {
+ if source.is_live {
+ if let Some(block) = branch.source_srcpad_block.take() {
gst::debug!(
CAT,
obj: element,
- "Removing pad probe for pad {}",
- source_srcpad.name()
+ "Removing pad probe on pad {} for pad {} (fallback: {})",
+ pad.name(),
+ branch.source_srcpad.name(),
+ fallback_source,
);
block.pad.remove_probe(block.probe_id);
}
@@ -1617,7 +2096,7 @@ impl FallbackSrc {
}
// Update running time for this block
- let block = match stream.source_srcpad_block {
+ let block = match branch.source_srcpad_block {
Some(ref mut block) => block,
None => return Ok(()),
};
@@ -1655,7 +2134,7 @@ impl FallbackSrc {
block.running_time = running_time;
- self.unblock_pads(element, state);
+ self.unblock_pads(element, state, fallback_source);
drop(state_guard);
element.notify("status");
@@ -1663,15 +2142,25 @@ impl FallbackSrc {
Ok(())
}
- fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) {
- if state.manually_blocked {
+ fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State, fallback_source: bool) {
+ let current_running_time = match element.current_running_time() {
+ Some(current_running_time) => current_running_time,
+ None => {
+ gst::debug!(CAT, obj: element, "Waiting for current_running_time");
+ return;
+ }
+ };
+
+ if !fallback_source && state.manually_blocked {
gst::debug!(CAT, obj: element, "Not unblocking yet: manual unblock",);
return;
}
// Check if all streams are blocked and have a running time and we have
// 100% buffering
- if state.stats.buffering_percent < 100 {
+ if (fallback_source && state.stats.fallback_buffering_percent < 100)
+ || (!fallback_source && state.stats.buffering_percent < 100)
+ {
gst::debug!(
CAT,
obj: element,
@@ -1681,7 +2170,18 @@ impl FallbackSrc {
return;
}
- let streams = match state.streams {
+ let source = if fallback_source {
+ if let Some(ref source) = state.fallback_source {
+ source
+ } else {
+ // There are no blocked pads if there is no fallback source
+ return;
+ }
+ } else {
+ &state.source
+ };
+
+ let streams = match source.streams {
None => {
gst::debug!(CAT, obj: element, "Have no stream collection yet");
return;
@@ -1695,28 +2195,46 @@ impl FallbackSrc {
have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO);
}
- let want_audio = state.settings.enable_audio;
- let want_video = state.settings.enable_video;
+ // For the fallback source, if we have no audio/video then that's OK and we would continue
+ // using the corresponding dummy source
+ let want_audio = if fallback_source {
+ have_audio
+ } else {
+ state.settings.enable_audio
+ };
+ let want_video = if fallback_source {
+ have_video
+ } else {
+ state.settings.enable_video
+ };
+
+ // FIXME: All this surely can be simplified somehow
+ let mut audio_branch = state.audio_stream.as_mut().and_then(|s| {
+ if fallback_source {
+ s.fallback_branch.as_mut()
+ } else {
+ s.main_branch.as_mut()
+ }
+ });
+ let mut video_branch = state.video_stream.as_mut().and_then(|s| {
+ if fallback_source {
+ s.fallback_branch.as_mut()
+ } else {
+ s.main_branch.as_mut()
+ }
+ });
- let audio_running_time = state
- .audio_stream
+ let audio_running_time = audio_branch
.as_ref()
- .and_then(|s| s.source_srcpad_block.as_ref())
+ .and_then(|b| b.source_srcpad_block.as_ref())
.and_then(|b| b.running_time);
- let video_running_time = state
- .video_stream
+ let video_running_time = video_branch
.as_ref()
- .and_then(|s| s.source_srcpad_block.as_ref())
+ .and_then(|b| b.source_srcpad_block.as_ref())
.and_then(|b| b.running_time);
- let audio_srcpad = state
- .audio_stream
- .as_ref()
- .and_then(|s| s.source_srcpad.as_ref().cloned());
- let video_srcpad = state
- .video_stream
- .as_ref()
- .and_then(|s| s.source_srcpad.as_ref().cloned());
+ let audio_srcpad = audio_branch.as_ref().map(|b| b.source_srcpad.clone());
+ let video_srcpad = video_branch.as_ref().map(|b| b.source_srcpad.clone());
let audio_is_eos = audio_srcpad
.as_ref()
@@ -1731,15 +2249,6 @@ impl FallbackSrc {
// Also consider EOS, we'd never get a new running time after EOS so don't need to wait.
// FIXME: All this surely can be simplified somehow
- // FIXME I guess this could be moved up
- let current_running_time = match element.current_running_time() {
- Some(current_running_time) => current_running_time,
- None => {
- gst::debug!(CAT, obj: element, "Waiting for current_running_time");
- return;
- }
- };
-
if have_audio && want_audio && have_video && want_video {
if audio_running_time.is_none()
&& !audio_is_eos
@@ -1789,10 +2298,9 @@ impl FallbackSrc {
video_is_eos,
);
- if let Some(block) = state
- .audio_stream
+ if let Some(block) = audio_branch
.as_mut()
- .and_then(|s| s.source_srcpad_block.take())
+ .and_then(|b| b.source_srcpad_block.take())
{
if !audio_is_eos {
block.pad.set_offset(offset);
@@ -1800,10 +2308,9 @@ impl FallbackSrc {
block.pad.remove_probe(block.probe_id);
}
- if let Some(block) = state
- .video_stream
+ if let Some(block) = video_branch
.as_mut()
- .and_then(|s| s.source_srcpad_block.take())
+ .and_then(|b| b.source_srcpad_block.take())
{
if !video_is_eos {
block.pad.set_offset(offset);
@@ -1835,10 +2342,9 @@ impl FallbackSrc {
audio_is_eos
);
- if let Some(block) = state
- .audio_stream
+ if let Some(block) = audio_branch
.as_mut()
- .and_then(|s| s.source_srcpad_block.take())
+ .and_then(|b| b.source_srcpad_block.take())
{
if !audio_is_eos {
block.pad.set_offset(offset);
@@ -1870,10 +2376,9 @@ impl FallbackSrc {
video_is_eos
);
- if let Some(block) = state
- .video_stream
+ if let Some(block) = video_branch
.as_mut()
- .and_then(|s| s.source_srcpad_block.take())
+ .and_then(|b| b.source_srcpad_block.take())
{
if !video_is_eos {
block.pad.set_offset(offset);
@@ -1883,8 +2388,19 @@ impl FallbackSrc {
}
}
- fn handle_source_pad_removed(&self, element: &super::FallbackSrc, pad: &gst::Pad) {
- gst::debug!(CAT, obj: element, "Pad {} removed from source", pad.name());
+ fn handle_source_pad_removed(
+ &self,
+ element: &super::FallbackSrc,
+ pad: &gst::Pad,
+ fallback_source: bool,
+ ) {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Pad {} removed from {}source",
+ pad.name(),
+ if fallback_source { "fallback " } else { "" }
+ );
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
@@ -1894,28 +2410,96 @@ impl FallbackSrc {
Some(state) => state,
};
- // Don't have to do anything here other than forgetting about the pad. Unlinking will
- // automatically happen while the pad is being removed from source and thus leaves the
- // bin hierarchy
- let stream = if let Some(stream) = state
- .audio_stream
- .as_mut()
- .filter(|s| s.source_srcpad.as_ref() == Some(pad))
- {
- stream
- } else if let Some(stream) = state
- .video_stream
- .as_mut()
- .filter(|s| s.source_srcpad.as_ref() == Some(pad))
- {
- stream
- } else {
- return;
+ let (mut branch, is_video, source, switch) = match &mut *state {
+ State {
+ audio_stream:
+ Some(Stream {
+ ref mut main_branch,
+ ref switch,
+ ..
+ }),
+ ref source,
+ ..
+ } if !fallback_source
+ && main_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) =>
+ {
+ (main_branch.take().unwrap(), false, source, switch)
+ }
+ State {
+ audio_stream:
+ Some(Stream {
+ ref mut fallback_branch,
+ ref switch,
+ ..
+ }),
+ fallback_source: Some(ref source),
+ ..
+ } if fallback_source
+ && fallback_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) =>
+ {
+ (fallback_branch.take().unwrap(), false, source, switch)
+ }
+ State {
+ video_stream:
+ Some(Stream {
+ ref mut main_branch,
+ ref switch,
+ ..
+ }),
+ ref source,
+ ..
+ } if !fallback_source
+ && main_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) =>
+ {
+ (main_branch.take().unwrap(), true, source, switch)
+ }
+ State {
+ video_stream:
+ Some(Stream {
+ ref mut fallback_branch,
+ ref switch,
+ ..
+ }),
+ fallback_source: Some(ref source),
+ ..
+ } if fallback_source
+ && fallback_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) =>
+ {
+ (fallback_branch.take().unwrap(), true, source, switch)
+ }
+ _ => return,
};
- stream.source_srcpad = None;
+ branch.queue.set_locked_state(true);
+ let _ = branch.queue.set_state(gst::State::Null);
+ source.source.remove(&branch.queue).unwrap();
+
+ branch.converters.set_locked_state(true);
+ let _ = branch.converters.set_state(gst::State::Null);
+ source.source.remove(&branch.converters).unwrap();
+
+ branch.clocksync.set_locked_state(true);
+ let _ = branch.clocksync.set_state(gst::State::Null);
+ source.source.remove(&branch.clocksync).unwrap();
+
+ if let Some(imagefreeze) = branch.imagefreeze.take() {
+ imagefreeze.set_locked_state(true);
+ let _ = imagefreeze.set_state(gst::State::Null);
+ source.source.remove(&imagefreeze).unwrap();
+ }
+
+ if branch.switch_pad.parent().as_ref() == Some(switch.upcast_ref()) {
+ switch.release_request_pad(&branch.switch_pad);
+ }
+
+ let ghostpad = source
+ .source
+ .static_pad(if is_video { "video" } else { "audio" })
+ .unwrap();
+ let _ = ghostpad.set_active(false);
+ source.source.remove_pad(&ghostpad).unwrap();
- self.unblock_pads(element, state);
+ self.unblock_pads(element, state, fallback_source);
drop(state_guard);
element.notify("status");
@@ -1930,30 +2514,85 @@ impl FallbackSrc {
Some(state) => state,
};
- if state.source_pending_restart {
+ let src = match m.src() {
+ Some(src) => src,
+ None => return,
+ };
+
+ let fallback_source = if let Some(ref source) = state.fallback_source {
+ src.has_as_ancestor(&source.source)
+ } else if src.has_as_ancestor(&state.source.source) {
+ false
+ } else {
+ return;
+ };
+
+ let source = if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source
+ } else {
+ return;
+ }
+ } else {
+ &mut state.source
+ };
+
+ if source.pending_restart {
gst::debug!(CAT, obj: element, "Has pending restart");
return;
}
- gst::debug!(CAT, obj: element, "Got buffering {}%", m.percent());
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Got buffering {}% (fallback: {})",
+ m.percent(),
+ fallback_source
+ );
+
+ let buffering_percent = if fallback_source {
+ &mut state.stats.fallback_buffering_percent
+ } else {
+ &mut state.stats.buffering_percent
+ };
+ let last_buffering_update = if fallback_source {
+ &mut state.fallback_last_buffering_update
+ } else {
+ &mut state.last_buffering_update
+ };
- state.stats.buffering_percent = m.percent();
- if state.stats.buffering_percent < 100 {
- state.last_buffering_update = Some(Instant::now());
+ *buffering_percent = m.percent();
+ if *buffering_percent < 100 {
+ *last_buffering_update = Some(Instant::now());
// Block source pads if needed to pause
- if let Some(ref mut stream) = state.audio_stream {
- if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() {
- stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
- }
- }
- if let Some(ref mut stream) = state.video_stream {
- if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() {
- stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
+ for stream in [state.audio_stream.as_mut(), state.video_stream.as_mut()]
+ .iter_mut()
+ .flatten()
+ {
+ let branch = match stream {
+ Stream {
+ main_branch: Some(ref mut branch),
+ ..
+ } if !fallback_source => branch,
+ Stream {
+ fallback_branch: Some(ref mut branch),
+ ..
+ } if fallback_source => branch,
+ _ => continue,
+ };
+
+ if branch.source_srcpad_block.is_none() {
+ branch.source_srcpad_block = Some(self.add_pad_probe(
+ element,
+ &branch.source_srcpad,
+ &branch.queue_srcpad,
+ fallback_source,
+ ));
}
}
} else {
// Check if we can unblock now
- self.unblock_pads(element, state);
+ self.unblock_pads(element, state, fallback_source);
}
drop(state_guard);
@@ -1974,13 +2613,27 @@ impl FallbackSrc {
Some(state) => state,
};
+ let src = match m.src() {
+ Some(src) => src,
+ None => return,
+ };
+
+ let fallback_source = if let Some(ref source) = state.fallback_source {
+ src.has_as_ancestor(&source.source)
+ } else if src.has_as_ancestor(&state.source.source) {
+ false
+ } else {
+ return;
+ };
+
let streams = m.stream_collection();
gst::debug!(
CAT,
obj: element,
- "Got stream collection {:?}",
- streams.debug()
+ "Got stream collection {:?} (fallback: {})",
+ streams.debug(),
+ fallback_source,
);
let mut have_audio = false;
@@ -2006,20 +2659,38 @@ impl FallbackSrc {
);
}
- state.streams = Some(streams);
+ if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source.streams = Some(streams);
+ }
+ } else {
+ state.source.streams = Some(streams);
+ }
// This might not be the first stream collection and we might have some unblocked pads from
// before already, which would need to be blocked again now for keeping things in sync
- for stream in [&mut state.video_stream, &mut state.audio_stream]
+ for branch in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
.iter_mut()
- .filter_map(|v| v.as_mut())
+ .flatten()
+ .filter_map(|s| {
+ if fallback_source {
+ s.fallback_branch.as_mut()
+ } else {
+ s.main_branch.as_mut()
+ }
+ })
{
- if stream.source_srcpad.is_some() && stream.source_srcpad_block.is_none() {
- stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
+ if branch.source_srcpad_block.is_none() {
+ branch.source_srcpad_block = Some(self.add_pad_probe(
+ element,
+ &branch.source_srcpad,
+ &branch.queue_srcpad,
+ fallback_source,
+ ));
}
}
- self.unblock_pads(element, state);
+ self.unblock_pads(element, state, fallback_source);
drop(state_guard);
element.notify("status");
@@ -2046,41 +2717,21 @@ impl FallbackSrc {
src.path_string()
);
- if src == state.source || src.has_as_ancestor(&state.source) {
- self.handle_source_error(element, state, RetryReason::Error);
+ if src == state.source.source || src.has_as_ancestor(&state.source.source) {
+ self.handle_source_error(element, state, RetryReason::Error, false);
drop(state_guard);
element.notify("status");
element.notify("statistics");
return true;
}
- // Check if error is from video fallback input and if so, try another
- // fallback to videotestsrc
- if let Some(ref mut video_stream) = state.video_stream {
- if src == video_stream.fallback_input
- || src.has_as_ancestor(&video_stream.fallback_input)
- {
- gst::debug!(CAT, obj: element, "Got error from video fallback input");
-
- let prev_fallback_uri = video_stream
- .fallback_input
- .property::<Option<String>>("uri");
-
- // This means previously videotestsrc was configured
- // Something went wrong and there is no other way than to error out
- if prev_fallback_uri.is_none() {
- return false;
- }
-
- let fallback_input = &video_stream.fallback_input;
- fallback_input.call_async(|fallback_input| {
- // Re-run video fallback input with videotestsrc
- let _ = fallback_input.set_state(gst::State::Null);
- fallback_input.set_property("uri", None::<&str>);
- let _ = fallback_input.sync_state_with_parent();
- });
-
- return true;
+ // Check if error is from fallback input and if so, use a dummy fallback
+ if let Some(ref source) = state.fallback_source {
+ if src == source.source || src.has_as_ancestor(&source.source) {
+ self.handle_source_error(element, state, RetryReason::Error, true);
+ drop(state_guard);
+ element.notify("status");
+ element.notify("statistics");
}
}
@@ -2099,30 +2750,57 @@ impl FallbackSrc {
element: &super::FallbackSrc,
state: &mut State,
reason: RetryReason,
+ fallback_source: bool,
) {
- gst::debug!(CAT, obj: element, "Handling source error: {:?}", reason);
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Handling source error (fallback: {}): {:?}",
+ fallback_source,
+ reason
+ );
+
+ if fallback_source {
+ state.stats.last_fallback_retry_reason = reason;
+ } else {
+ state.stats.last_retry_reason = reason;
+ }
+
+ let source = if fallback_source {
+ state.fallback_source.as_mut().unwrap()
+ } else {
+ &mut state.source
+ };
- state.stats.last_retry_reason = reason;
- if state.source_pending_restart {
- gst::debug!(CAT, obj: element, "Source is already pending restart");
+ if source.pending_restart {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "{}source is already pending restart",
+ if fallback_source { "fallback " } else { "" }
+ );
return;
}
// Increase retry count only if there was no pending restart
- state.stats.num_retry += 1;
+ if fallback_source {
+ state.stats.num_fallback_retry += 1;
+ } else {
+ state.stats.num_retry += 1;
+ }
// Unschedule pending timeout, we're restarting now
- if let Some(timeout) = state.source_restart_timeout.take() {
+ if let Some(timeout) = source.restart_timeout.take() {
timeout.unschedule();
}
// Prevent state changes from changing the state in an uncoordinated way
- state.source_pending_restart = true;
+ source.pending_restart = true;
// Drop any EOS events from any source pads of the source that might happen because of the
// error. We don't need to remove these pad probes because restarting the source will also
// remove/add the pads again.
- for pad in state.source.src_pads() {
+ for pad in source.source.src_pads() {
pad.add_probe(
gst::PadProbeType::EVENT_DOWNSTREAM,
|_pad, info| match info.data {
@@ -2139,7 +2817,7 @@ impl FallbackSrc {
.unwrap();
}
- let source_weak = state.source.downgrade();
+ let source_weak = source.source.downgrade();
element.call_async(move |element| {
let src = element.imp();
@@ -2152,113 +2830,179 @@ impl FallbackSrc {
// source will deadlock on the probes.
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
- None
- | Some(State {
- source_pending_restart: false,
+ None => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
+ return;
+ }
+ Some(State {
+ source:
+ SourceBin {
+ pending_restart: false,
+ ..
+ },
+ ..
+ }) if !fallback_source => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
+ return;
+ }
+ Some(State {
+ fallback_source:
+ Some(SourceBin {
+ pending_restart: false,
+ ..
+ }),
..
- }) => {
- gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
+ }) if fallback_source => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
return;
}
Some(state) => state,
};
- for (source_srcpad_name, block) in
- [state.video_stream.as_mut(), state.audio_stream.as_mut()]
- .iter_mut()
- .filter_map(|s| s.as_mut())
- .filter_map(|s| {
- if let Some(block) = s.source_srcpad_block.take() {
- Some((s.source_srcpad.as_ref().map(|pad| pad.name()), block))
- } else {
- None
- }
- })
+ for (source_srcpad, block) in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
+ .iter_mut()
+ .flatten()
+ .filter_map(|s| {
+ if fallback_source {
+ s.fallback_branch.as_mut()
+ } else {
+ s.main_branch.as_mut()
+ }
+ })
+ .filter_map(|branch| {
+ if let Some(block) = branch.source_srcpad_block.take() {
+ Some((&branch.source_srcpad, block))
+ } else {
+ None
+ }
+ })
{
gst::debug!(
CAT,
obj: element,
"Removing pad probe for pad {}",
- source_srcpad_name.as_deref().unwrap_or("UNKNOWN")
+ source_srcpad.name()
);
block.pad.remove_probe(block.probe_id);
}
- let stream_sinkpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()]
+ let switch_sinkpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()]
.into_iter()
.flatten()
- .map(|s| {
- if let Some(ref imagefreeze) = s.imagefreeze {
- imagefreeze.static_pad("sink").unwrap()
+ .filter_map(|s| {
+ if fallback_source {
+ s.fallback_branch.as_ref()
} else {
- s.clocksync_queue.static_pad("sink").unwrap()
+ s.main_branch.as_ref()
}
})
- .collect::<Vec<_>>();
-
- let stream_srcpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()]
- .into_iter()
- .flatten()
- .map(|s| {
- let srcpad = s.srcpad.clone();
- let probe_id = srcpad
- .add_probe(
- gst::PadProbeType::EVENT_DOWNSTREAM | gst::PadProbeType::EVENT_FLUSH,
- move |_pad, info| match info.data {
- Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
- gst::EventView::FlushStart(_) => gst::PadProbeReturn::Drop,
- gst::EventView::FlushStop(_) => gst::PadProbeReturn::Drop,
- _ => gst::PadProbeReturn::Ok,
- },
- _ => gst::PadProbeReturn::Ok,
- },
- )
- .unwrap();
- (probe_id, srcpad)
- })
+ .map(|branch| branch.switch_pad.clone())
.collect::<Vec<_>>();
drop(state_guard);
gst::debug!(CAT, obj: element, "Flushing source");
- let _ = source.send_event(gst::event::FlushStart::builder().build());
-
- gst::debug!(CAT, obj: element, "Shutting down source");
- let _ = source.set_state(gst::State::Null);
-
- gst::debug!(CAT, obj: element, "Stop flushing downstream of source");
- for pad in stream_sinkpads {
- let _ = pad.send_event(gst::event::FlushStop::builder(true).build());
+ for pad in switch_sinkpads {
+ let _ = pad.push_event(gst::event::FlushStart::builder().build());
+ if let Some(switch) = pad.parent().map(|p| p.downcast::<gst::Element>().unwrap()) {
+ switch.release_request_pad(&pad);
+ }
}
- for (probe_id, pad) in stream_srcpads {
- pad.remove_probe(probe_id);
- }
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Shutting down {}source",
+ if fallback_source { "fallback " } else { "" }
+ );
+ let _ = source.set_state(gst::State::Null);
// Sleep for 1s before retrying
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
- None
- | Some(State {
- source_pending_restart: false,
+ None => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
+ return;
+ }
+ Some(State {
+ source:
+ SourceBin {
+ pending_restart: false,
+ ..
+ },
+ ..
+ }) if !fallback_source => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
+ return;
+ }
+ Some(State {
+ fallback_source:
+ Some(SourceBin {
+ pending_restart: false,
+ ..
+ }),
..
- }) => {
- gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
+ }) if fallback_source => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
return;
}
Some(state) => state,
};
- for stream in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
+ for branch in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
.iter_mut()
- .filter_map(|s| s.as_mut())
+ .flatten()
+ .filter_map(|s| {
+ if fallback_source {
+ s.fallback_branch.as_mut()
+ } else {
+ s.main_branch.as_mut()
+ }
+ })
{
- stream.source_srcpad_block = None;
- stream.source_srcpad = None;
+ branch.source_srcpad_block = None;
}
gst::debug!(CAT, obj: element, "Waiting for 1s before retrying");
let clock = gst::SystemClock::obtain();
let wait_time = clock.time().unwrap() + gst::ClockTime::SECOND;
- assert!(state.source_pending_restart_timeout.is_none());
+ if fallback_source {
+ assert!(state
+ .fallback_source
+ .as_ref()
+ .map(|s| s.pending_restart_timeout.is_none())
+ .unwrap_or(true));
+ } else {
+ assert!(state.source.pending_restart_timeout.is_none());
+ }
let timeout = clock.new_single_shot_id(wait_time);
let element_weak = element.downgrade();
@@ -2270,55 +3014,100 @@ impl FallbackSrc {
};
gst::debug!(CAT, obj: &element, "Woke up, retrying");
- element.call_async(|element| {
+ element.call_async(move |element| {
let src = element.imp();
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
- None
- | Some(State {
- source_pending_restart: false,
+ None => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
+ return;
+ }
+ Some(State {
+ source:
+ SourceBin {
+ pending_restart: false,
+ ..
+ },
+ ..
+ }) if !fallback_source => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
+ return;
+ }
+ Some(State {
+ fallback_source:
+ Some(SourceBin {
+ pending_restart: false,
+ ..
+ }),
..
- }) => {
+ }) if fallback_source => {
gst::debug!(
CAT,
obj: element,
- "Restarting source not needed anymore"
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
);
return;
}
Some(state) => state,
};
- let (source, old_source) = if let Source::Uri(..) = state.configured_source
- {
- // FIXME: Create a new uridecodebin3 because it currently is not reusable
- // See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746
- element.remove(&state.source).unwrap();
+ let (source, old_source) = if !fallback_source {
+ if let Source::Uri(..) = state.configured_source {
+ // FIXME: Create a new uridecodebin3 because it currently is not reusable
+ // See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746
+ element.remove(&state.source.source).unwrap();
- let source = src.create_main_input(
- element,
- &state.configured_source,
- state.settings.buffer_duration,
- );
+ let source = src.create_main_input(
+ element,
+ &state.configured_source,
+ state.settings.buffer_duration,
+ );
- (
- source.clone(),
- Some(mem::replace(&mut state.source, source)),
- )
+ (
+ source.source.clone(),
+ Some(mem::replace(&mut state.source, source)),
+ )
+ } else {
+ state.source.pending_restart = false;
+ state.source.pending_restart_timeout = None;
+ state.stats.buffering_percent = 100;
+ state.last_buffering_update = None;
+
+ if let Some(timeout) = state.source.restart_timeout.take() {
+ gst::debug!(CAT, obj: element, "Unscheduling restart timeout");
+ timeout.unschedule();
+ }
+
+ (state.source.source.clone(), None)
+ }
+ } else if let Some(ref mut source) = state.fallback_source {
+ source.pending_restart = false;
+ source.pending_restart_timeout = None;
+ state.stats.fallback_buffering_percent = 100;
+ state.fallback_last_buffering_update = None;
+
+ if let Some(timeout) = source.restart_timeout.take() {
+ gst::debug!(CAT, obj: element, "Unscheduling restart timeout");
+ timeout.unschedule();
+ }
+
+ (source.source.clone(), None)
} else {
- (state.source.clone(), None)
+ return;
};
- state.source_pending_restart = false;
- state.source_pending_restart_timeout = None;
- state.stats.buffering_percent = 100;
- state.last_buffering_update = None;
-
- if let Some(timeout) = state.source_restart_timeout.take() {
- gst::debug!(CAT, obj: element, "Unscheduling restart timeout");
- timeout.unschedule();
- }
drop(state_guard);
if let Some(old_source) = old_source {
@@ -2328,7 +3117,12 @@ impl FallbackSrc {
}
if source.sync_state_with_parent().is_err() {
- gst::error!(CAT, obj: element, "Source failed to change state");
+ gst::error!(
+ CAT,
+ obj: element,
+ "{}source failed to change state",
+ if fallback_source { "fallback " } else { "" }
+ );
let _ = source.set_state(gst::State::Null);
let mut state_guard = src.state.lock();
let state = state_guard.as_mut().expect("no state");
@@ -2336,23 +3130,39 @@ impl FallbackSrc {
element,
state,
RetryReason::StateChangeFailure,
+ fallback_source,
);
drop(state_guard);
element.notify("statistics");
} else {
let mut state_guard = src.state.lock();
let state = state_guard.as_mut().expect("no state");
- assert!(state.source_restart_timeout.is_none());
+ if fallback_source {
+ assert!(state
+ .fallback_source
+ .as_ref()
+ .map(|s| s.restart_timeout.is_none())
+ .unwrap_or(true));
+ } else {
+ assert!(state.source.restart_timeout.is_none());
+ }
src.schedule_source_restart_timeout(
element,
state,
gst::ClockTime::ZERO,
+ fallback_source,
);
}
});
})
.expect("Failed to wait async");
- state.source_pending_restart_timeout = Some(timeout);
+ if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source.pending_restart_timeout = Some(timeout);
+ }
+ } else {
+ state.source.pending_restart_timeout = Some(timeout);
+ }
});
}
@@ -2362,26 +3172,48 @@ impl FallbackSrc {
element: &super::FallbackSrc,
state: &mut State,
elapsed: gst::ClockTime,
+ fallback_source: bool,
) {
- if state.source_pending_restart {
+ if fallback_source {
+ gst::fixme!(
+ CAT,
+ obj: element,
+ "Restart timeout not implemented for fallback source"
+ );
+ return;
+ }
+
+ let source = if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source
+ } else {
+ return;
+ }
+ } else {
+ &mut state.source
+ };
+
+ if source.pending_restart {
gst::debug!(
CAT,
obj: element,
- "Not scheduling source restart timeout because source is pending restart already",
+ "Not scheduling {}source restart timeout because source is pending restart already",
+ if fallback_source { "fallback " } else { "" },
);
return;
}
- if state.is_image {
+ if source.is_image {
gst::debug!(
CAT,
obj: element,
- "Not scheduling source restart timeout because we are playing back an image",
+ "Not scheduling {}source restart timeout because we are playing back an image",
+ if fallback_source { "fallback " } else { "" },
);
return;
}
- if state.manually_blocked {
+ if !fallback_source && state.manually_blocked {
gst::debug!(
CAT,
obj: element,
@@ -2395,7 +3227,8 @@ impl FallbackSrc {
gst::debug!(
CAT,
obj: element,
- "Scheduling source restart timeout for {}",
+ "Scheduling {}source restart timeout for {}",
+ if fallback_source { "fallback " } else { "" },
wait_time,
);
@@ -2411,57 +3244,108 @@ impl FallbackSrc {
element.call_async(move |element| {
let src = element.imp();
- gst::debug!(CAT, obj: element, "Source restart timeout triggered");
+ gst::debug!(
+ CAT,
+ obj: element,
+ "{}source restart timeout triggered",
+ if fallback_source { "fallback " } else { "" }
+ );
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None => {
- gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
return;
}
Some(state) => state,
};
- state.source_restart_timeout = None;
+ let source = if fallback_source {
+ if let Some(ref mut source) = state.fallback_source {
+ source
+ } else {
+ return;
+ }
+ } else {
+ &mut state.source
+ };
+
+ source.restart_timeout = None;
// If we have the fallback activated then restart the source now.
- if src.have_fallback_activated(element, state) {
+ if fallback_source || src.have_fallback_activated(element, state) {
+ let (last_buffering_update, buffering_percent) = if fallback_source {
+ (
+ state.fallback_last_buffering_update,
+ state.stats.fallback_buffering_percent,
+ )
+ } else {
+ (state.last_buffering_update, state.stats.buffering_percent)
+ };
// If we're not actively buffering right now let's restart the source
- if state
- .last_buffering_update
+ if last_buffering_update
.map(|i| i.elapsed() >= state.settings.restart_timeout.into())
- .unwrap_or(state.stats.buffering_percent == 100)
+ .unwrap_or(buffering_percent == 100)
{
- gst::debug!(CAT, obj: element, "Not buffering, restarting source");
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Not buffering, restarting {}source",
+ if fallback_source { "fallback " } else { "" }
+ );
- src.handle_source_error(element, state, RetryReason::Timeout);
+ src.handle_source_error(
+ element,
+ state,
+ RetryReason::Timeout,
+ fallback_source,
+ );
drop(state_guard);
element.notify("statistics");
} else {
- gst::debug!(CAT, obj: element, "Buffering, restarting source later");
- let elapsed = state
- .last_buffering_update
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Buffering, restarting {}source later",
+ if fallback_source { "fallback " } else { "" }
+ );
+ let elapsed = last_buffering_update
.and_then(|last_buffering_update| {
gst::ClockTime::try_from(last_buffering_update.elapsed()).ok()
})
.unwrap_or(gst::ClockTime::ZERO);
- src.schedule_source_restart_timeout(element, state, elapsed);
+ src.schedule_source_restart_timeout(
+ element,
+ state,
+ elapsed,
+ fallback_source,
+ );
}
} else {
- gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Restarting {}source not needed anymore",
+ if fallback_source { "fallback " } else { "" }
+ );
}
});
})
.expect("Failed to wait async");
- state.source_restart_timeout = Some(timeout);
+ source.restart_timeout = Some(timeout);
}
#[allow(clippy::blocks_in_if_conditions)]
fn have_fallback_activated(&self, _element: &super::FallbackSrc, state: &State) -> bool {
let mut have_audio = false;
let mut have_video = false;
- if let Some(ref streams) = state.streams {
+ if let Some(ref streams) = state.source.streams {
for stream in streams.iter() {
have_audio = have_audio || stream.stream_type().contains(gst::StreamType::AUDIO);
have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO);
@@ -2489,7 +3373,7 @@ impl FallbackSrc {
.unwrap_or(true))
}
- fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc) {
+ fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc, is_audio: bool) {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
@@ -2501,18 +3385,28 @@ impl FallbackSrc {
// If we have the fallback activated then start the retry timeout unless it was started
// already. Otherwise cancel the retry timeout.
if self.have_fallback_activated(element, state) {
- gst::warning!(CAT, obj: element, "Switched to fallback stream");
- if state.source_restart_timeout.is_none() {
- self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
+ gst::warning!(
+ CAT,
+ obj: element,
+ "Switched to {} fallback stream",
+ if is_audio { "audio" } else { "video " }
+ );
+ if state.source.restart_timeout.is_none() {
+ self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO, false);
}
} else {
- gst::debug!(CAT, obj: element, "Switched to main stream");
- if let Some(timeout) = state.source_retry_timeout.take() {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Switched to {} main stream",
+ if is_audio { "audio" } else { "video" }
+ );
+ if let Some(timeout) = state.source.retry_timeout.take() {
gst::debug!(CAT, obj: element, "Unscheduling retry timeout");
timeout.unschedule();
}
- if let Some(timeout) = state.source_restart_timeout.take() {
+ if let Some(timeout) = state.source.restart_timeout.take() {
gst::debug!(CAT, obj: element, "Unscheduling restart timeout");
timeout.unschedule();
}
diff --git a/utils/fallbackswitch/src/fallbacksrc/mod.rs b/utils/fallbackswitch/src/fallbacksrc/mod.rs
index 7bb80af86..bbc2380a9 100644
--- a/utils/fallbackswitch/src/fallbacksrc/mod.rs
+++ b/utils/fallbackswitch/src/fallbacksrc/mod.rs
@@ -11,7 +11,6 @@ use gst::prelude::*;
mod custom_source;
mod imp;
-mod video_fallback;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs
deleted file mode 100644
index 30eec5b8a..000000000
--- a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs
+++ /dev/null
@@ -1,464 +0,0 @@
-// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
-// Copyright (C) 2020 Seungha Yang <seungha@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-use gst::glib;
-use gst::prelude::*;
-use gst::subclass::prelude::*;
-
-use std::sync::{atomic::AtomicBool, atomic::Ordering, Mutex};
-
-use once_cell::sync::Lazy;
-
-static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
- gst::DebugCategory::new(
- "fallbacksrc-video-source",
- gst::DebugColorFlags::empty(),
- Some("Fallback Video Source Bin"),
- )
-});
-
-#[derive(Debug, Clone)]
-struct Settings {
- uri: Option<String>,
- min_latency: gst::ClockTime,
-}
-
-impl Default for Settings {
- fn default() -> Self {
- Settings {
- uri: None,
- min_latency: gst::ClockTime::ZERO,
- }
- }
-}
-
-struct State {
- source: gst::Element,
-}
-
-pub struct VideoFallbackSource {
- srcpad: gst::GhostPad,
- got_error: AtomicBool,
-
- state: Mutex<Option<State>>,
- settings: Mutex<Settings>,
-}
-
-#[glib::object_subclass]
-impl ObjectSubclass for VideoFallbackSource {
- const NAME: &'static str = "FallbackSrcVideoFallbackSource";
- type Type = super::VideoFallbackSource;
- type ParentType = gst::Bin;
-
- fn with_class(klass: &Self::Class) -> Self {
- let templ = klass.pad_template("src").unwrap();
- let srcpad = gst::GhostPad::builder_with_template(&templ, Some(&templ.name())).build();
-
- Self {
- srcpad,
- got_error: AtomicBool::new(false),
- state: Mutex::new(None),
- settings: Mutex::new(Settings::default()),
- }
- }
-}
-
-impl ObjectImpl for VideoFallbackSource {
- fn properties() -> &'static [glib::ParamSpec] {
- static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
- vec![
- glib::ParamSpecString::builder("uri")
- .nick("URI")
- .blurb("URI to use for video in case the main stream doesn't work")
- .build(),
- glib::ParamSpecUInt64::builder("min-latency")
- .nick("Minimum Latency")
- .blurb("Minimum Latency")
- .build(),
- ]
- });
-
- PROPERTIES.as_ref()
- }
-
- fn set_property(
- &self,
- obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
- match pspec.name() {
- "uri" => {
- let mut settings = self.settings.lock().unwrap();
- let new_value = value.get().expect("type checked upstream");
- gst::info!(
- CAT,
- obj: obj,
- "Changing URI from {:?} to {:?}",
- settings.uri,
- new_value,
- );
- settings.uri = new_value;
- }
- "min-latency" => {
- let mut settings = self.settings.lock().unwrap();
- let new_value = value.get().expect("type checked upstream");
- gst::info!(
- CAT,
- obj: obj,
- "Changing Minimum Latency from {} to {}",
- settings.min_latency,
- new_value,
- );
- settings.min_latency = new_value;
- }
- _ => unreachable!(),
- }
- }
-
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
- match pspec.name() {
- "uri" => {
- let settings = self.settings.lock().unwrap();
- settings.uri.to_value()
- }
- "min-latency" => {
- let settings = self.settings.lock().unwrap();
- settings.min_latency.to_value()
- }
- _ => unimplemented!(),
- }
- }
-
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
-
- obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
- obj.set_element_flags(gst::ElementFlags::SOURCE);
- obj.add_pad(&self.srcpad).unwrap();
- }
-}
-
-impl GstObjectImpl for VideoFallbackSource {}
-
-impl ElementImpl for VideoFallbackSource {
- fn pad_templates() -> &'static [gst::PadTemplate] {
- static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &gst::Caps::new_any(),
- )
- .unwrap();
-
- vec![src_pad_template]
- });
-
- PAD_TEMPLATES.as_ref()
- }
-
- #[allow(clippy::single_match)]
- fn change_state(
- &self,
- element: &Self::Type,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- match transition {
- gst::StateChange::NullToReady => {
- self.start(element)?;
- }
- _ => (),
- }
-
- self.parent_change_state(element, transition)?;
-
- match transition {
- gst::StateChange::ReadyToNull => {
- self.stop(element);
- }
- _ => (),
- }
-
- Ok(gst::StateChangeSuccess::Success)
- }
-}
-
-impl BinImpl for VideoFallbackSource {
- #[allow(clippy::single_match)]
- fn handle_message(&self, bin: &Self::Type, msg: gst::Message) {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::Error(err) => {
- if self
- .got_error
- .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
- .is_err()
- {
- gst::warning!(CAT, obj: bin, "Got error {:?}", err);
- self.parent_handle_message(bin, msg)
- } else {
- // Suppress error message if we posted error previously.
- // Otherwise parent fallbacksrc would be confused by
- // multiple error message.
- gst::debug!(CAT, obj: bin, "Ignore error {:?}", err);
- }
- }
- _ => self.parent_handle_message(bin, msg),
- }
- }
-}
-
-impl VideoFallbackSource {
- fn file_src_for_uri(
- &self,
- element: &super::VideoFallbackSource,
- uri: Option<&str>,
- ) -> Option<gst::Element> {
- uri?;
-
- let uri = uri.unwrap();
- let filesrc = gst::ElementFactory::make("filesrc", Some("fallback_filesrc"))
- .expect("No filesrc found");
-
- if let Err(err) = filesrc
- .dynamic_cast_ref::<gst::URIHandler>()
- .unwrap()
- .set_uri(uri)
- {
- gst::warning!(CAT, obj: element, "Failed to set URI: {}", err);
- return None;
- }
-
- if filesrc.set_state(gst::State::Ready).is_err() {
- gst::warning!(CAT, obj: element, "Couldn't set state READY");
- let _ = filesrc.set_state(gst::State::Null);
- return None;
- }
-
- // To invoke GstBaseSrc::start() method, activate pad manually.
- // filesrc will check whether given file is readable or not
- // via open() and fstat() in there.
- let pad = filesrc.static_pad("src").unwrap();
- if pad.set_active(true).is_err() {
- gst::warning!(CAT, obj: element, "Couldn't active pad");
- let _ = filesrc.set_state(gst::State::Null);
- return None;
- }
-
- Some(filesrc)
- }
-
- fn create_source(
- &self,
- element: &super::VideoFallbackSource,
- min_latency: gst::ClockTime,
- uri: Option<&str>,
- ) -> gst::Element {
- gst::debug!(CAT, obj: element, "Creating source with uri {:?}", uri);
-
- let source = gst::Bin::new(None);
- let filesrc = self.file_src_for_uri(element, uri);
-
- let srcpad = match filesrc {
- Some(filesrc) => {
- let typefind = gst::ElementFactory::make("typefind", Some("fallback_typefind"))
- .expect("No typefind found");
- let videoconvert =
- gst::ElementFactory::make("videoconvert", Some("fallback_videoconvert"))
- .expect("No videoconvert found");
- let videoscale =
- gst::ElementFactory::make("videoscale", Some("fallback_videoscale"))
- .expect("No videoscale found");
- let imagefreeze =
- gst::ElementFactory::make("imagefreeze", Some("fallback_imagefreeze"))
- .expect("No imagefreeze found");
- let clocksync = gst::ElementFactory::make("clocksync", Some("fallback_clocksync"))
- .or_else(|_| -> Result<_, glib::BoolError> {
- let identity =
- gst::ElementFactory::make("identity", Some("fallback_clocksync"))?;
- identity.set_property("sync", true);
- Ok(identity)
- })
- .expect("No clocksync or identity found");
- let queue = gst::ElementFactory::make("queue", Some("fallback_queue"))
- .expect("No queue found");
- queue.set_properties(&[
- ("max-size-buffers", &0u32),
- ("max-size-bytes", &0u32),
- (
- "max-size-time",
- &min_latency.max(5 * gst::ClockTime::SECOND).nseconds(),
- ),
- ]);
-
- source
- .add_many(&[
- &filesrc,
- &typefind,
- &videoconvert,
- &videoscale,
- &imagefreeze,
- &clocksync,
- &queue,
- ])
- .unwrap();
- gst::Element::link_many(&[&filesrc, &typefind]).unwrap();
- gst::Element::link_many(&[
- &videoconvert,
- &videoscale,
- &imagefreeze,
- &clocksync,
- &queue,
- ])
- .unwrap();
-
- if imagefreeze.try_set_property("is-live", true).is_err() {
- gst::error!(
- CAT,
- obj: element,
- "imagefreeze does not support live mode, this will probably misbehave"
- );
- gst::element_warning!(
- element,
- gst::LibraryError::Settings,
- ["imagefreeze does not support live mode, this will probably misbehave"]
- );
- }
-
- let element_weak = element.downgrade();
- let source_weak = source.downgrade();
- let videoconvert_weak = videoconvert.downgrade();
- typefind.connect("have-type", false, move |args| {
- let typefind = args[0].get::<gst::Element>().unwrap();
- let _probability = args[1].get::<u32>().unwrap();
- let caps = args[2].get::<gst::Caps>().unwrap();
-
- let element = match element_weak.upgrade() {
- Some(element) => element,
- None => return None,
- };
-
- let source = match source_weak.upgrade() {
- Some(element) => element,
- None => return None,
- };
-
- let videoconvert = match videoconvert_weak.upgrade() {
- Some(element) => element,
- None => return None,
- };
-
- let s = caps.structure(0).unwrap();
- let decoder;
- if s.name() == "image/jpeg" {
- decoder = gst::ElementFactory::make("jpegdec", Some("decoder"))
- .expect("jpegdec not found");
- } else if s.name() == "image/png" {
- decoder = gst::ElementFactory::make("pngdec", Some("decoder"))
- .expect("pngdec not found");
- } else {
- gst::error!(CAT, obj: &element, "Unsupported caps {}", caps);
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Unsupported caps {}", caps]
- );
- return None;
- }
-
- source.add(&decoder).unwrap();
- decoder.sync_state_with_parent().unwrap();
- if let Err(_err) =
- gst::Element::link_many(&[&typefind, &decoder, &videoconvert])
- {
- gst::error!(CAT, obj: &element, "Can't link fallback image decoder");
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Can't link fallback image decoder"]
- );
- return None;
- }
-
- None
- });
-
- queue.static_pad("src").unwrap()
- }
- None => {
- let videotestsrc =
- gst::ElementFactory::make("videotestsrc", Some("fallback_videosrc"))
- .expect("No videotestsrc found");
- source.add_many(&[&videotestsrc]).unwrap();
-
- videotestsrc.set_property_from_str("pattern", "black");
- videotestsrc.set_property("is-live", true);
-
- videotestsrc.static_pad("src").unwrap()
- }
- };
-
- source
- .add_pad(
- &gst::GhostPad::builder(Some("src"), gst::PadDirection::Src)
- .build_with_target(&srcpad)
- .unwrap(),
- )
- .unwrap();
-
- source.upcast()
- }
-
- fn start(
- &self,
- element: &super::VideoFallbackSource,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::debug!(CAT, obj: element, "Starting");
-
- let mut state_guard = self.state.lock().unwrap();
- if state_guard.is_some() {
- gst::error!(CAT, obj: element, "State struct wasn't cleared");
- return Err(gst::StateChangeError);
- }
-
- let settings = self.settings.lock().unwrap().clone();
- let uri = &settings.uri;
- let source = self.create_source(element, settings.min_latency, uri.as_deref());
-
- element.add(&source).unwrap();
-
- let srcpad = source.static_pad("src").unwrap();
- let _ = self.srcpad.set_target(Some(&srcpad));
-
- *state_guard = Some(State { source });
-
- Ok(gst::StateChangeSuccess::Success)
- }
-
- fn stop(&self, element: &super::VideoFallbackSource) {
- gst::debug!(CAT, obj: element, "Stopping");
-
- let mut state_guard = self.state.lock().unwrap();
- let state = match state_guard.take() {
- Some(state) => state,
- None => return,
- };
-
- drop(state_guard);
-
- let _ = state.source.set_state(gst::State::Null);
- let _ = self.srcpad.set_target(None::<&gst::Pad>);
- element.remove(&state.source).unwrap();
- self.got_error.store(false, Ordering::Relaxed);
- gst::debug!(CAT, obj: element, "Stopped");
- }
-}
diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs
deleted file mode 100644
index 0dad207c4..000000000
--- a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
-// Copyright (C) 2020 Seungha Yang <seungha@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-use gst::glib;
-
-mod imp;
-
-glib::wrapper! {
- pub struct VideoFallbackSource(ObjectSubclass<imp::VideoFallbackSource>) @extends gst::Bin, gst::Element, gst::Object;
-}
-
-impl VideoFallbackSource {
- pub fn new(uri: Option<&str>, min_latency: gst::ClockTime) -> VideoFallbackSource {
- glib::Object::new(&[("uri", &uri), ("min-latency", &min_latency.nseconds())]).unwrap()
- }
-}