Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/utils
diff options
context:
space:
mode:
authorJan Schmidt <jan@centricular.com>2020-09-24 23:09:01 +0300
committerSebastian Dröge <slomo@coaxion.net>2020-12-11 16:02:00 +0300
commitd5b648921cd286094a5a68ec30cbbf81a023bbec (patch)
tree6e1c83d189c68bbc1300c39d677593bb26344c93 /utils
parentc7fe08bf6da46c6e97e5a0741a1917f92d595576 (diff)
fallbackswitch: Add manual stream control mode
Add properties to report and notify on stream health changes, and a mode where the app can control the stream switching by setting the active-pad property manually. This is useful for modifying the policy of fallbackswitch stream choices, and to synchronise switching of multiple fallbackswitches
Diffstat (limited to 'utils')
-rw-r--r--utils/fallbackswitch/src/fallbackswitch/imp.rs646
1 files changed, 497 insertions, 149 deletions
diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs
index 68515ca93..00af5c22e 100644
--- a/utils/fallbackswitch/src/fallbackswitch/imp.rs
+++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs
@@ -36,12 +36,25 @@ use once_cell::sync::Lazy;
use std::sync::{Mutex, RwLock};
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
+#[repr(u32)]
+#[genum(type_name = "GstFallbackSwitchStreamHealth")]
+pub(crate) enum StreamHealth {
+ #[genum(name = "Data flow is inactive or late", nick = "inactive")]
+ Inactive = 0,
+ #[genum(name = "Data is currently flowing in the stream", nick = "present")]
+ Present = 1,
+}
+
pub struct FallbackSwitch {
- sinkpad: gst_base::AggregatorPad,
+ primary_sinkpad: gst_base::AggregatorPad,
+ primary_state: RwLock<PadInputState>,
+
fallback_sinkpad: RwLock<Option<gst_base::AggregatorPad>>,
+ fallback_state: RwLock<PadInputState>,
+
active_sinkpad: Mutex<Option<gst::Pad>>,
output_state: Mutex<OutputState>,
- pad_states: RwLock<PadStates>,
settings: Mutex<Settings>,
}
@@ -53,35 +66,48 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
-#[derive(Debug)]
-struct OutputState {
+#[derive(Debug, Default)]
+struct PadOutputState {
last_sinkpad_time: gst::ClockTime,
+ stream_health: StreamHealth,
}
-#[derive(Debug, Default)]
-struct PadStates {
- sinkpad: PadState,
- fallback_sinkpad: Option<PadState>,
+#[derive(Debug)]
+struct OutputState {
+ last_output_time: gst::ClockTime,
+ primary: PadOutputState,
+ fallback: PadOutputState,
}
#[derive(Debug, Default)]
-struct PadState {
+struct PadInputState {
caps: Option<gst::Caps>,
audio_info: Option<gst_audio::AudioInfo>,
video_info: Option<gst_video::VideoInfo>,
}
const DEFAULT_TIMEOUT: u64 = 5 * gst::SECOND_VAL;
+const DEFAULT_AUTO_SWITCH: bool = true;
+const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive;
#[derive(Debug, Clone)]
struct Settings {
timeout: gst::ClockTime,
+ auto_switch: bool,
+}
+
+impl Default for StreamHealth {
+ fn default() -> Self {
+ DEFAULT_STREAM_HEALTH
+ }
}
impl Default for OutputState {
fn default() -> Self {
OutputState {
- last_sinkpad_time: gst::CLOCK_TIME_NONE,
+ last_output_time: gst::CLOCK_TIME_NONE,
+ primary: PadOutputState::default(),
+ fallback: PadOutputState::default(),
}
}
}
@@ -90,11 +116,12 @@ impl Default for Settings {
fn default() -> Self {
Settings {
timeout: DEFAULT_TIMEOUT.into(),
+ auto_switch: DEFAULT_AUTO_SWITCH,
}
}
}
-static PROPERTIES: [subclass::Property; 2] = [
+static PROPERTIES: [subclass::Property; 5] = [
subclass::Property("timeout", |name| {
glib::ParamSpec::uint64(
name,
@@ -110,32 +137,171 @@ static PROPERTIES: [subclass::Property; 2] = [
glib::ParamSpec::object(
name,
"Active Pad",
- "Currently active pad",
+ "Currently active pad. Writes are ignored if auto-switch=true",
gst::Pad::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("auto-switch", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Automatically switch pads",
+ "Automatically switch pads (If true, prefer primary sink, otherwise manual selection via the active-pad property)",
+ DEFAULT_AUTO_SWITCH,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("primary-health", |name| {
+ glib::ParamSpec::enum_(
+ name,
+ "Primary stream state",
+ "Reports the health of the primary stream on the sink pad",
+ StreamHealth::static_type(),
+ DEFAULT_STREAM_HEALTH as i32,
+ glib::ParamFlags::READABLE,
+ )
+ }),
+ subclass::Property("fallback-health", |name| {
+ glib::ParamSpec::enum_(
+ name,
+ "Fallback stream state",
+ "Reports the health of the fallback stream on the fallback_sink pad",
+ StreamHealth::static_type(),
+ DEFAULT_STREAM_HEALTH as i32,
glib::ParamFlags::READABLE,
)
}),
];
impl FallbackSwitch {
+ fn drain_pad_to_time(
+ &self,
+ agg: &super::FallbackSwitch,
+ state: &mut OutputState,
+ pad: &gst_base::AggregatorPad,
+ target_running_time: gst::ClockTime,
+ ) -> Result<(), gst::FlowError> {
+ let segment = pad.get_segment();
+
+ /* No segment yet - no data */
+ if segment.get_format() == gst::Format::Undefined {
+ return Ok(());
+ }
+
+ let segment = segment.downcast::<gst::ClockTime>().map_err(|_| {
+ gst_error!(CAT, obj: agg, "Only TIME segments supported");
+ gst::FlowError::Error
+ })?;
+
+ let mut running_time = gst::ClockTime::none();
+
+ while let Some(buffer) = pad.peek_buffer() {
+ let pts = buffer.get_dts_or_pts();
+ let new_running_time = segment.to_running_time(pts);
+ if pts.is_none() || running_time <= target_running_time {
+ gst_debug!(CAT, obj: agg, "Dropping trailing buffer {:?}", buffer);
+ pad.drop_buffer();
+ running_time = new_running_time;
+ } else {
+ break;
+ }
+ }
+ if running_time != gst::ClockTime::none() {
+ if pad == &self.primary_sinkpad {
+ state.primary.last_sinkpad_time = running_time;
+ } else {
+ state.fallback.last_sinkpad_time = running_time;
+ }
+ }
+ Ok(())
+ }
+
+ fn get_health(
+ &self,
+ state: &OutputState,
+ settings: &Settings,
+ pad: &gst_base::AggregatorPad,
+ cur_running_time: gst::ClockTime,
+ ) -> StreamHealth {
+ let last_sinkpad_time = if pad == &self.primary_sinkpad {
+ state.primary.last_sinkpad_time
+ } else {
+ state.fallback.last_sinkpad_time
+ };
+
+ if last_sinkpad_time == gst::ClockTime::none() {
+ StreamHealth::Inactive
+ } else if cur_running_time != gst::ClockTime::none()
+ && cur_running_time < last_sinkpad_time + settings.timeout
+ {
+ StreamHealth::Present
+ } else {
+ StreamHealth::Inactive
+ }
+ }
+
+ fn check_health_changes(
+ &self,
+ state: &mut OutputState,
+ settings: &Settings,
+ preferred_pad: &gst_base::AggregatorPad,
+ backup_pad: &Option<&gst_base::AggregatorPad>,
+ cur_running_time: gst::ClockTime,
+ ) -> (bool, bool) {
+ let preferred_is_primary = preferred_pad == &self.primary_sinkpad;
+
+ let preferred_health = self.get_health(state, settings, preferred_pad, cur_running_time);
+ let backup_health = if let Some(pad) = backup_pad {
+ self.get_health(state, settings, pad, cur_running_time)
+ } else {
+ StreamHealth::Inactive
+ };
+
+ if preferred_is_primary {
+ let primary_changed = preferred_health != state.primary.stream_health;
+ let fallback_changed = backup_health != state.fallback.stream_health;
+
+ state.primary.stream_health = preferred_health;
+ state.fallback.stream_health = backup_health;
+
+ (primary_changed, fallback_changed)
+ } else {
+ let primary_changed = backup_health != state.primary.stream_health;
+ let fallback_changed = preferred_health != state.fallback.stream_health;
+
+ state.primary.stream_health = backup_health;
+ state.fallback.stream_health = preferred_health;
+
+ (primary_changed, fallback_changed)
+ }
+ }
+
+ #[allow(clippy::too_many_arguments)]
fn handle_main_buffer(
&self,
agg: &super::FallbackSwitch,
state: &mut OutputState,
settings: &Settings,
mut buffer: gst::Buffer,
- fallback_sinkpad: Option<&gst_base::AggregatorPad>,
+ preferred_pad: &gst_base::AggregatorPad,
+ backup_pad: &Option<&gst_base::AggregatorPad>,
+ cur_running_time: gst::ClockTime,
) -> Result<Option<(gst::Buffer, gst::Caps, bool)>, gst::FlowError> {
// If we got a buffer on the sinkpad just handle it
- gst_debug!(CAT, obj: agg, "Got buffer on sinkpad {:?}", buffer);
+ gst_debug!(
+ CAT,
+ obj: agg,
+ "Got buffer on pad {} - {:?}",
+ preferred_pad.get_name(),
+ buffer
+ );
if buffer.get_pts().is_none() {
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Err(gst::FlowError::Error);
}
- let segment = self
- .sinkpad
+ let segment = preferred_pad
.get_segment()
.downcast::<gst::ClockTime>()
.map_err(|_| {
@@ -144,6 +310,7 @@ impl FallbackSwitch {
})?;
let running_time = segment.to_running_time(buffer.get_dts_or_pts());
+
{
// FIXME: This will not work correctly for negative DTS
let buffer = buffer.make_mut();
@@ -151,19 +318,26 @@ impl FallbackSwitch {
buffer.set_dts(segment.to_running_time(buffer.get_dts()));
}
- let is_late = {
- let clock = agg.get_clock();
- let base_time = agg.get_base_time();
+ if preferred_pad == &self.primary_sinkpad {
+ state.primary.last_sinkpad_time = running_time;
+ } else {
+ state.fallback.last_sinkpad_time = running_time;
+ }
- if let Some(clock) = clock {
- let now = clock.get_time();
+ let is_late = {
+ if cur_running_time != gst::ClockTime::none() {
let latency = agg.get_latency();
-
if latency.is_some() {
- let deadline = base_time + running_time + latency + 40 * gst::MSECOND;
+ let deadline = running_time + latency + 40 * gst::MSECOND;
- if now > deadline {
- gst_debug!(CAT, obj: agg, "Buffer is too late: {} > {}", now, deadline);
+ if cur_running_time > deadline {
+ gst_debug!(
+ CAT,
+ obj: agg,
+ "Buffer is too late: {} > {}",
+ cur_running_time,
+ deadline
+ );
true
} else {
false
@@ -176,15 +350,17 @@ impl FallbackSwitch {
}
};
- if state.last_sinkpad_time.is_some()
+ if state.last_output_time.is_some()
&& is_late
- && state.last_sinkpad_time + settings.timeout <= running_time
+ && state.last_output_time + settings.timeout <= running_time
{
+ /* This buffer arrived too late - we either already switched
+ * to the other pad or there's no point outputting this anyway */
gst_debug!(
CAT,
obj: agg,
"Buffer is too late and timeout reached: {} + {} <= {}",
- state.last_sinkpad_time,
+ state.last_output_time,
settings.timeout,
running_time,
);
@@ -193,15 +369,18 @@ impl FallbackSwitch {
}
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
- let pad_change = active_sinkpad.as_ref() != Some(self.sinkpad.upcast_ref::<gst::Pad>());
+ let pad_change = settings.auto_switch
+ && active_sinkpad.as_ref() != Some(preferred_pad.upcast_ref::<gst::Pad>());
+
if pad_change {
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst_info!(
CAT,
obj: agg,
- "Can't change back to sinkpad, waiting for keyframe"
+ "Can't change back to sinkpad {}, waiting for keyframe",
+ preferred_pad.get_name()
);
- self.sinkpad.push_event(
+ preferred_pad.push_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
@@ -210,65 +389,41 @@ impl FallbackSwitch {
}
gst_info!(CAT, obj: agg, "Active pad changed to sinkpad");
- *active_sinkpad = Some(self.sinkpad.clone().upcast());
+ *active_sinkpad = Some(preferred_pad.clone().upcast());
}
drop(active_sinkpad);
- if !is_late || state.last_sinkpad_time.is_none() {
- state.last_sinkpad_time = running_time;
+ if !is_late || state.last_output_time.is_none() {
+ state.last_output_time = running_time;
}
+ let active_caps = if preferred_pad == &self.primary_sinkpad {
+ let pad_state = self.primary_state.read().unwrap();
+ pad_state.caps.as_ref().unwrap().clone()
+ } else {
+ let pad_state = self.fallback_state.read().unwrap();
+ pad_state.caps.as_ref().unwrap().clone()
+ };
+
// Drop all older buffers from the fallback sinkpad
- if let Some(fallback_sinkpad) = fallback_sinkpad {
- let segment = fallback_sinkpad.get_segment();
-
- // Might have no segment at all yet
- if segment.get_format() != gst::Format::Undefined {
- let fallback_segment = fallback_sinkpad
- .get_segment()
- .downcast::<gst::ClockTime>()
- .map_err(|_| {
- gst_error!(CAT, obj: agg, "Only TIME segments supported");
- gst::FlowError::Error
- })?;
-
- while let Some(fallback_buffer) = fallback_sinkpad.peek_buffer() {
- let fallback_pts = fallback_buffer.get_dts_or_pts();
- if fallback_pts.is_none()
- || fallback_segment.to_running_time(fallback_pts) <= state.last_sinkpad_time
- {
- gst_debug!(
- CAT,
- obj: agg,
- "Dropping fallback buffer {:?}",
- fallback_buffer
- );
- fallback_sinkpad.drop_buffer();
- } else {
- break;
- }
- }
- }
+ if let Some(backup_pad) = backup_pad {
+ self.drain_pad_to_time(&agg, state, &backup_pad, state.last_output_time)?;
}
- let pad_states = self.pad_states.read().unwrap();
- let active_caps = pad_states.sinkpad.caps.as_ref().unwrap().clone();
- drop(pad_states);
-
Ok(Some((buffer, active_caps, pad_change)))
}
- fn get_fallback_buffer(
+ fn get_backup_buffer(
&self,
agg: &super::FallbackSwitch,
state: &mut OutputState,
settings: &Settings,
- fallback_sinkpad: &gst_base::AggregatorPad,
+ backup_pad: &gst_base::AggregatorPad,
) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> {
// If we have a fallback sinkpad and timeout, try to get a fallback buffer from here
// and drop all too old buffers in the process
loop {
- let mut buffer = fallback_sinkpad
+ let mut buffer = backup_pad
.pop_buffer()
.ok_or(gst_base::AGGREGATOR_FLOW_NEED_DATA)?;
@@ -279,35 +434,40 @@ impl FallbackSwitch {
return Err(gst::FlowError::Error);
}
- let fallback_segment = fallback_sinkpad
+ let backup_segment = backup_pad
.get_segment()
.downcast::<gst::ClockTime>()
.map_err(|_| {
gst_error!(CAT, obj: agg, "Only TIME segments supported");
gst::FlowError::Error
})?;
- let running_time = fallback_segment.to_running_time(buffer.get_dts_or_pts());
+ let running_time = backup_segment.to_running_time(buffer.get_dts_or_pts());
{
// FIXME: This will not work correctly for negative DTS
let buffer = buffer.make_mut();
- buffer.set_pts(fallback_segment.to_running_time(buffer.get_pts()));
- buffer.set_dts(fallback_segment.to_running_time(buffer.get_dts()));
+ buffer.set_pts(backup_segment.to_running_time(buffer.get_pts()));
+ buffer.set_dts(backup_segment.to_running_time(buffer.get_dts()));
}
// If we never had a real buffer, initialize with the running time of the fallback
// sinkpad so that we still output fallback buffers after the timeout
- if state.last_sinkpad_time.is_none() {
- state.last_sinkpad_time = running_time;
+ if state.last_output_time.is_none() {
+ state.last_output_time = running_time;
+ }
+ if backup_pad == &self.primary_sinkpad {
+ state.primary.last_sinkpad_time = running_time;
+ } else {
+ state.fallback.last_sinkpad_time = running_time;
}
// Get the next one if this one is before the timeout
- if state.last_sinkpad_time + settings.timeout > running_time {
+ if state.last_output_time + settings.timeout > running_time {
gst_debug!(
CAT,
obj: agg,
"Timeout not reached yet: {} + {} > {}",
- state.last_sinkpad_time,
+ state.last_output_time,
settings.timeout,
running_time
);
@@ -319,22 +479,23 @@ impl FallbackSwitch {
CAT,
obj: agg,
"Timeout reached: {} + {} <= {}",
- state.last_sinkpad_time,
+ state.last_output_time,
settings.timeout,
running_time
);
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
- let pad_change =
- active_sinkpad.as_ref() != Some(fallback_sinkpad.upcast_ref::<gst::Pad>());
+ let pad_change = settings.auto_switch
+ && active_sinkpad.as_ref() != Some(backup_pad.upcast_ref::<gst::Pad>());
if pad_change {
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst_info!(
CAT,
obj: agg,
- "Can't change to fallback sinkpad yet, waiting for keyframe"
+ "Can't change to sinkpad {} yet, waiting for keyframe",
+ backup_pad.get_name()
);
- fallback_sinkpad.push_event(
+ backup_pad.push_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
@@ -343,57 +504,165 @@ impl FallbackSwitch {
}
gst_info!(CAT, obj: agg, "Active pad changed to fallback sinkpad");
- *active_sinkpad = Some(fallback_sinkpad.clone().upcast());
+ *active_sinkpad = Some(backup_pad.clone().upcast());
}
drop(active_sinkpad);
- let pad_states = self.pad_states.read().unwrap();
- let active_caps = match pad_states.fallback_sinkpad {
- None => {
- // This can happen if the pad is removed in the meantime,
- // not a problem really
- return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
- }
- Some(ref fallback_sinkpad) => fallback_sinkpad.caps.as_ref().unwrap().clone(),
+ let active_caps = if backup_pad == &self.primary_sinkpad {
+ let pad_state = self.primary_state.read().unwrap();
+ pad_state.caps.as_ref().unwrap().clone()
+ } else {
+ let pad_state = self.fallback_state.read().unwrap();
+ pad_state.caps.as_ref().unwrap().clone()
};
- drop(pad_states);
break Ok((buffer, active_caps, pad_change));
}
}
+ #[allow(clippy::type_complexity)]
fn get_next_buffer(
&self,
agg: &super::FallbackSwitch,
timeout: bool,
- ) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> {
+ ) -> (
+ Result<(gst::Buffer, gst::Caps, bool), gst::FlowError>,
+ (bool, bool),
+ ) {
let settings = self.settings.lock().unwrap().clone();
let mut state = self.output_state.lock().unwrap();
- let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
- if let Some(buffer) = self.sinkpad.pop_buffer() {
- if let Some(res) = self.handle_main_buffer(
+ if self.primary_sinkpad.is_eos() {
+ gst_log!(CAT, obj: agg, "Sinkpad is EOS");
+ return (Err(gst::FlowError::Eos), (false, false));
+ }
+
+ /* Choose which pad we check first */
+ let active_sinkpad = self.active_sinkpad.lock().unwrap();
+ let prefer_primary = settings.auto_switch
+ || active_sinkpad.is_none()
+ || active_sinkpad.as_ref() == Some(self.primary_sinkpad.upcast_ref::<gst::Pad>());
+ drop(active_sinkpad);
+
+ let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
+
+ let (preferred_pad, backup_pad) = if prefer_primary {
+ (&self.primary_sinkpad, fallback_sinkpad.as_ref())
+ } else {
+ (
+ fallback_sinkpad.as_ref().unwrap(),
+ Some(&self.primary_sinkpad),
+ )
+ };
+
+ let clock = agg.get_clock();
+ let base_time = agg.get_base_time();
+
+ let cur_running_time = if let Some(clock) = clock {
+ clock.get_time() - base_time
+ } else {
+ gst::ClockTime::none()
+ };
+
+ /* See if there's a buffer on the preferred pad and output that */
+ if let Some(buffer) = preferred_pad.pop_buffer() {
+ match self.handle_main_buffer(
agg,
&mut *state,
&settings,
buffer,
- fallback_sinkpad.as_ref(),
- )? {
- return Ok(res);
+ preferred_pad,
+ &backup_pad,
+ cur_running_time,
+ ) {
+ Ok(Some(res)) => {
+ return (
+ Ok(res),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &backup_pad,
+ cur_running_time,
+ ),
+ )
+ }
+ Err(e) => {
+ return (
+ Err(e),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &backup_pad,
+ cur_running_time,
+ ),
+ )
+ }
+ _ => (),
}
- } else if self.sinkpad.is_eos() {
- gst_log!(CAT, obj: agg, "Sinkpad is EOS");
- return Err(gst::FlowError::Eos);
}
- if let (false, Some(_)) = (timeout, &*fallback_sinkpad) {
- gst_debug!(CAT, obj: agg, "Have fallback sinkpad but no timeout yet");
+ /* If we can't auto-switch, then can't fetch anything from the backup pad */
+ if !settings.auto_switch {
+ /* Use a dummy drain_pad_to_time() call to update the last_sinkpad_time */
+ if let Some(backup_pad) = &backup_pad {
+ if let Err(e) = self.drain_pad_to_time(
+ &agg,
+ &mut *state,
+ &backup_pad,
+ gst::ClockTime::from_seconds(0),
+ ) {
+ return (
+ Err(e),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &Some(backup_pad),
+ cur_running_time,
+ ),
+ );
+ }
+ }
- Err(gst_base::AGGREGATOR_FLOW_NEED_DATA)
- } else if let (true, Some(fallback_sinkpad)) = (timeout, &*fallback_sinkpad) {
- self.get_fallback_buffer(agg, &mut *state, &settings, fallback_sinkpad)
+ return (
+ Err(gst_base::AGGREGATOR_FLOW_NEED_DATA),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &backup_pad,
+ cur_running_time,
+ ),
+ );
+ }
+
+ if let (false, Some(backup_pad)) = (timeout, &backup_pad) {
+ gst_debug!(CAT, obj: agg, "Have fallback sinkpad but no timeout yet");
+ (
+ Err(gst_base::AGGREGATOR_FLOW_NEED_DATA),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &Some(backup_pad),
+ cur_running_time,
+ ),
+ )
+ } else if let (true, Some(backup_pad)) = (timeout, &backup_pad) {
+ (
+ self.get_backup_buffer(agg, &mut *state, &settings, backup_pad),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &Some(backup_pad),
+ cur_running_time,
+ ),
+ )
} else {
// Otherwise there's not much we can do at this point
gst_debug!(
@@ -401,7 +670,16 @@ impl FallbackSwitch {
obj: agg,
"Got no buffer on sinkpad and have no fallback sinkpad"
);
- Err(gst_base::AGGREGATOR_FLOW_NEED_DATA)
+ (
+ Err(gst_base::AGGREGATOR_FLOW_NEED_DATA),
+ self.check_health_changes(
+ &mut *state,
+ &settings,
+ preferred_pad,
+ &backup_pad,
+ cur_running_time,
+ ),
+ )
}
}
}
@@ -421,11 +699,12 @@ impl ObjectSubclass for FallbackSwitch {
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink")).build();
Self {
- sinkpad,
+ primary_sinkpad: sinkpad,
+ primary_state: RwLock::new(PadInputState::default()),
fallback_sinkpad: RwLock::new(None),
+ fallback_state: RwLock::new(PadInputState::default()),
active_sinkpad: Mutex::new(None),
output_state: Mutex::new(OutputState::default()),
- pad_states: RwLock::new(PadStates::default()),
settings: Mutex::new(Settings::default()),
}
}
@@ -477,7 +756,7 @@ impl ObjectImpl for FallbackSwitch {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.primary_sinkpad).unwrap();
}
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
@@ -497,6 +776,29 @@ impl ObjectImpl for FallbackSwitch {
settings.timeout = timeout;
drop(settings);
}
+ subclass::Property("active-pad", ..) => {
+ let settings = self.settings.lock().unwrap();
+ if settings.auto_switch {
+ gst_warning!(
+ CAT,
+ obj: obj,
+ "active-pad property setting ignored, because auto-switch=true"
+ );
+ } else {
+ let active_pad = value.get::<gst::Pad>().expect("type checked upstream");
+ /* Trigger a pad switch if needed */
+ let mut cur_active_pad = self.active_sinkpad.lock().unwrap();
+ if *cur_active_pad != active_pad {
+ *cur_active_pad = active_pad;
+ }
+ drop(cur_active_pad);
+ }
+ drop(settings);
+ }
+ subclass::Property("auto-switch", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.auto_switch = value.get_some().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -513,6 +815,18 @@ impl ObjectImpl for FallbackSwitch {
let active_pad = self.active_sinkpad.lock().unwrap().clone();
active_pad.to_value()
}
+ subclass::Property("auto-switch", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.auto_switch.to_value())
+ }
+ subclass::Property("primary-health", ..) => {
+ let state = self.output_state.lock().unwrap();
+ Ok(state.primary.stream_health.to_value())
+ }
+ subclass::Property("fallback-health", ..) => {
+ let state = self.output_state.lock().unwrap();
+ Ok(state.fallback.stream_health.to_value())
+ }
_ => unimplemented!(),
}
}
@@ -545,9 +859,14 @@ impl ElementImpl for FallbackSwitch {
Some("fallback_sink"),
)
.build();
+
*fallback_sinkpad = Some(sinkpad.clone());
drop(fallback_sinkpad);
+ let mut state = self.output_state.lock().unwrap();
+ state.fallback = PadOutputState::default();
+ drop(state);
+
element.add_pad(&sinkpad).unwrap();
Some(sinkpad.upcast())
@@ -555,12 +874,9 @@ impl ElementImpl for FallbackSwitch {
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap();
- let mut pad_states = self.pad_states.write().unwrap();
if fallback_sinkpad.as_ref().map(|p| p.upcast_ref()) == Some(pad) {
*fallback_sinkpad = None;
- pad_states.fallback_sinkpad = None;
- drop(pad_states);
drop(fallback_sinkpad);
element.remove_pad(pad).unwrap();
gst_debug!(CAT, obj: element, "Removed fallback sinkpad {:?}", pad);
@@ -571,7 +887,9 @@ impl ElementImpl for FallbackSwitch {
impl AggregatorImpl for FallbackSwitch {
fn start(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.output_state.lock().unwrap() = OutputState::default();
- *self.pad_states.write().unwrap() = PadStates::default();
+
+ *self.primary_state.write().unwrap() = PadInputState::default();
+ *self.fallback_state.write().unwrap() = PadInputState::default();
Ok(())
}
@@ -625,19 +943,17 @@ impl AggregatorImpl for FallbackSwitch {
video_info = None;
}
- let new_pad_state = PadState {
+ let new_pad_state = PadInputState {
caps: Some(caps),
audio_info,
video_info,
};
- let mut pad_states = self.pad_states.write().unwrap();
- if agg_pad == &self.sinkpad {
- pad_states.sinkpad = new_pad_state;
+ if agg_pad == &self.primary_sinkpad {
+ *self.primary_state.write().unwrap() = new_pad_state;
} else if Some(agg_pad) == self.fallback_sinkpad.read().unwrap().as_ref() {
- pad_states.fallback_sinkpad = Some(new_pad_state);
+ *self.fallback_state.write().unwrap() = new_pad_state;
}
- drop(pad_states);
self.parent_sink_event(agg, agg_pad, event)
}
@@ -646,22 +962,46 @@ impl AggregatorImpl for FallbackSwitch {
}
fn get_next_time(&self, agg: &Self::Type) -> gst::ClockTime {
- // If we have a buffer on the sinkpad then the timeout is always going to be immediately,
- // i.e. 0. We want to output that buffer immediately, no matter what.
- //
- // Otherwise if we have a fallback sinkpad and it has a buffer, then the timeout is going
- // to be its running time. We will then either output the buffer or drop it, depending on
- // its distance from the last sinkpad time
- if self.sinkpad.peek_buffer().is_some() {
- gst_debug!(CAT, obj: agg, "Have buffer on sinkpad, immediate timeout");
+ /* At each iteration, we have a preferred pad and a backup pad. If autoswitch is true,
+ * the sinkpad is always preferred, otherwise it's the active sinkpad as set by the app.
+ * The backup pad is the other one (may be None if there's no fallback pad yet).
+ *
+ * If we have a buffer on the preferred pad then the timeout is always going to be immediately,
+ * i.e. 0. We want to output that buffer immediately, no matter what.
+ *
+ * Otherwise if we have a backup sinkpad and it has a buffer, then the timeout is going
+ * to be that buffer's running time. We will then either output the buffer or drop it, depending on
+ * its distance from the last output time
+ */
+ let settings = self.settings.lock().unwrap();
+ let active_sinkpad = self.active_sinkpad.lock().unwrap();
+ let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
+
+ let prefer_primary = settings.auto_switch
+ || active_sinkpad.is_none()
+ || active_sinkpad.as_ref() == Some(self.primary_sinkpad.upcast_ref::<gst::Pad>());
+
+ let (preferred_pad, backup_pad) = if prefer_primary {
+ (&self.primary_sinkpad, fallback_sinkpad.as_ref())
+ } else {
+ (
+ fallback_sinkpad.as_ref().unwrap(),
+ Some(&self.primary_sinkpad),
+ )
+ };
+
+ if preferred_pad.peek_buffer().is_some() {
+ gst_debug!(
+ CAT,
+ obj: agg,
+ "Have buffer on sinkpad {}, immediate timeout",
+ preferred_pad.get_name()
+ );
0.into()
- } else if self.sinkpad.is_eos() {
+ } else if self.primary_sinkpad.is_eos() {
gst_debug!(CAT, obj: agg, "Sinkpad is EOS, immediate timeout");
0.into()
- } else if let Some((buffer, fallback_sinkpad)) = self
- .fallback_sinkpad
- .read()
- .unwrap()
+ } else if let Some((buffer, backup_sinkpad)) = backup_pad
.as_ref()
.and_then(|p| p.peek_buffer().map(|buffer| (buffer, p)))
{
@@ -671,7 +1011,7 @@ impl AggregatorImpl for FallbackSwitch {
return 0.into();
}
- let segment = match fallback_sinkpad.get_segment().downcast::<gst::ClockTime>() {
+ let segment = match backup_sinkpad.get_segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
gst_error!(CAT, obj: agg, "Only TIME segments supported");
@@ -684,12 +1024,13 @@ impl AggregatorImpl for FallbackSwitch {
gst_debug!(
CAT,
obj: agg,
- "Have buffer on fallback sinkpad, timeout at {}",
+ "Have buffer on {} pad, timeout at {}",
+ backup_sinkpad.get_name(),
running_time
);
running_time
} else {
- gst_debug!(CAT, obj: agg, "Have no buffer at all yet");
+ gst_debug!(CAT, obj: agg, "No buffer available on either input");
gst::CLOCK_TIME_NONE
}
}
@@ -716,15 +1057,13 @@ impl AggregatorImpl for FallbackSwitch {
return Some(buffer);
}
- let pad_states = self.pad_states.read().unwrap();
- let pad_state = if agg_pad == &self.sinkpad {
- &pad_states.sinkpad
+ let primary_state = self.primary_state.read().unwrap();
+ let fallback_state = self.fallback_state.read().unwrap();
+
+ let pad_state = if agg_pad == &self.primary_sinkpad {
+ &primary_state
} else if Some(agg_pad) == self.fallback_sinkpad.read().unwrap().as_ref() {
- if let Some(ref pad_state) = pad_states.fallback_sinkpad {
- pad_state
- } else {
- return Some(buffer);
- }
+ &fallback_state
} else {
unreachable!()
};
@@ -798,7 +1137,17 @@ impl AggregatorImpl for FallbackSwitch {
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
- let (mut buffer, active_caps, pad_change) = self.get_next_buffer(agg, timeout)?;
+ let (res, (primary_health_change, fallback_health_change)) =
+ self.get_next_buffer(agg, timeout);
+
+ if primary_health_change {
+ agg.notify("primary-health");
+ }
+ if fallback_health_change {
+ agg.notify("fallback-health");
+ }
+
+ let (mut buffer, active_caps, pad_change) = res?;
let current_src_caps = agg.get_static_pad("src").unwrap().get_current_caps();
if Some(&active_caps) != current_src_caps.as_ref() {
@@ -816,7 +1165,6 @@ impl AggregatorImpl for FallbackSwitch {
agg.notify("active-pad");
buffer.make_mut().set_flags(gst::BufferFlags::DISCONT);
}
-
gst_debug!(CAT, obj: agg, "Finishing buffer {:?}", buffer);
agg.finish_buffer(buffer)
}