Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>2023-10-25 01:38:28 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-11-10 18:47:41 +0300
commit98d839a9205b64af3c3e9618b9c9727527be9441 (patch)
treebc8472d887cdf2e3025982c0f80459a68f001652
parentf565875b6cce646febb25a43c5d900c9e062c244 (diff)
livesync: Handle flags and late buffer patching after queueing
This makes the chain function almost independent of the output state. We still do the early discard check with `buffer_is_backwards` so we don't try to queue buffers we can't use, allowing us to fast-forward upstream without blocking on the src task. Don't accept `LateOverThreshold` buffers when we have `pending_caps` or a `pending_segment`. We need to apply these first before we can sensibly patch buffers from the new stream. Deduplicate most of the output buffer patching code into a new `patch_output_buffer` method. For: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/450 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1387>
-rw-r--r--utils/livesync/src/livesync/imp.rs179
1 files changed, 95 insertions, 84 deletions
diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs
index b107fcf33..4dbbee5cd 100644
--- a/utils/livesync/src/livesync/imp.rs
+++ b/utils/livesync/src/livesync/imp.rs
@@ -507,6 +507,10 @@ impl State {
// Safe default
.unwrap_or(DEFAULT_DURATION);
}
+
+ fn pending_events(&self) -> bool {
+ self.pending_caps.is_some() || self.pending_segment.is_some()
+ }
}
impl LiveSync {
@@ -916,52 +920,13 @@ impl LiveSync {
buf_mut.set_pts(pts.map(|t| t + state.latency));
}
- if state
- .out_buffer
- .as_ref()
- .map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
- {
- // We are done bridging a gap, so mark it as DISCONT instead
- buf_mut.unset_flags(gst::BufferFlags::GAP);
- buf_mut.set_flags(gst::BufferFlags::DISCONT);
- }
-
- let mut timestamp = state.ts_range(buf_mut, segment);
+ let timestamp = state.ts_range(buf_mut, segment);
let lateness = self.buffer_is_backwards(&state, timestamp);
- match lateness {
- BufferLateness::OnTime => {}
-
- BufferLateness::LateUnderThreshold => {
- gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut);
- state.num_drop += 1;
- return Ok(gst::FlowSuccess::Ok);
- }
-
- BufferLateness::LateOverThreshold => {
- gst::debug!(CAT, imp: self, "Accepting late {:?}", buf_mut);
- let prev = state.out_buffer.as_ref().unwrap();
- let prev_duration = prev.duration().unwrap();
-
- if let Some(audio_info) = &state.in_audio_info {
- let mut map_info = buf_mut.map_writable().map_err(|e| {
- gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
- gst::FlowError::Error
- })?;
-
- audio_info
- .format_info()
- .fill_silence(map_info.as_mut_slice());
- } else {
- buf_mut.set_duration(Some(state.fallback_duration));
- }
-
- buf_mut.set_dts(prev.dts().map(|t| t + prev_duration));
- buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
- buf_mut.set_flags(gst::BufferFlags::GAP);
-
- timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap());
- }
+ if lateness == BufferLateness::LateUnderThreshold {
+ gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut);
+ state.num_drop += 1;
+ return Ok(gst::FlowSuccess::Ok);
}
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
@@ -1135,57 +1100,49 @@ impl LiveSync {
let mut segment = None;
match in_buffer {
- Some((buffer, lateness)) => {
+ Some((mut buffer, BufferLateness::OnTime)) => {
state.num_in += 1;
+ if state
+ .out_buffer
+ .as_ref()
+ .map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
+ {
+ // We are done bridging a gap, so mark it as DISCONT instead
+ let buf_mut = buffer.make_mut();
+ buf_mut.unset_flags(gst::BufferFlags::GAP);
+ buf_mut.set_flags(gst::BufferFlags::DISCONT);
+ }
+
state.out_buffer = Some(buffer);
state.out_timestamp = state.in_timestamp;
caps = state.pending_caps.take();
segment = state.pending_segment.take();
+ }
- if lateness != BufferLateness::OnTime {
- state.num_duplicate += 1;
- }
+ Some((buffer, BufferLateness::LateOverThreshold)) if !state.pending_events() => {
+ gst::debug!(CAT, imp: self, "Accepting late {:?}", buffer);
+ state.num_in += 1;
+
+ self.patch_output_buffer(&mut state, Some(buffer))?;
}
- None => {
- // Work around borrow checker
- let State {
- fallback_duration,
- out_buffer: ref mut buffer,
- out_audio_info: ref audio_info,
- ..
- } = *state;
- gst::debug!(CAT, imp: self, "Repeating {:?}", buffer);
-
- let buffer = buffer.as_mut().unwrap().make_mut();
- let prev_duration = buffer.duration().unwrap();
-
- if let Some(audio_info) = audio_info {
- if !buffer.flags().contains(gst::BufferFlags::GAP) {
- let mut map_info = buffer.map_writable().map_err(|e| {
- gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
- gst::FlowError::Error
- })?;
-
- audio_info
- .format_info()
- .fill_silence(map_info.as_mut_slice());
- }
- } else {
- buffer.set_duration(Some(fallback_duration));
- }
- buffer.set_dts(buffer.dts().map(|t| t + prev_duration));
- buffer.set_pts(buffer.pts().map(|t| t + prev_duration));
- buffer.set_flags(gst::BufferFlags::GAP);
- buffer.unset_flags(gst::BufferFlags::DISCONT);
+ Some((buffer, BufferLateness::LateOverThreshold)) => {
+ // Cannot accept late-over-threshold buffers while we have pending events
+ gst::debug!(CAT, imp: self, "Discarding late {:?}", buffer);
+ state.num_drop += 1;
- state.out_timestamp = state.ts_range(
- state.out_buffer.as_ref().unwrap(),
- state.out_segment.as_ref().unwrap(),
- );
- state.num_duplicate += 1;
+ self.patch_output_buffer(&mut state, None)?;
+ }
+
+ None => {
+ self.patch_output_buffer(&mut state, None)?;
+ }
+
+ Some((_, BufferLateness::LateUnderThreshold)) => {
+ // Is discarded before queueing
+ unreachable!();
}
}
@@ -1323,4 +1280,58 @@ impl LiveSync {
details: details
);
}
+
+ /// Patches the output buffer for repeating, setting out_buffer and out_timestamp
+ fn patch_output_buffer(
+ &self,
+ state: &mut State,
+ source: Option<gst::Buffer>,
+ ) -> Result<(), gst::FlowError> {
+ let out_buffer = state.out_buffer.as_mut().unwrap();
+
+ let duration = out_buffer.duration().unwrap();
+ let dts = out_buffer.dts().map(|t| t + duration);
+ let pts = out_buffer.pts().map(|t| t + duration);
+
+ if let Some(source) = source {
+ gst::debug!(
+ CAT,
+ imp: self,
+ "Repeating {:?} using {:?}",
+ out_buffer,
+ source
+ );
+ *out_buffer = source;
+ } else {
+ gst::debug!(CAT, imp: self, "Repeating {:?}", out_buffer);
+ }
+
+ let buffer = out_buffer.make_mut();
+
+ if let Some(audio_info) = &state.out_audio_info {
+ if !buffer.flags().contains(gst::BufferFlags::GAP) {
+ let mut map_info = buffer.map_writable().map_err(|e| {
+ gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
+ gst::FlowError::Error
+ })?;
+ audio_info
+ .format_info()
+ .fill_silence(map_info.as_mut_slice());
+ }
+ } else {
+ buffer.set_duration(state.fallback_duration);
+ }
+
+ buffer.set_dts(dts);
+ buffer.set_pts(pts);
+ buffer.set_flags(gst::BufferFlags::GAP);
+ buffer.unset_flags(gst::BufferFlags::DISCONT);
+
+ state.out_timestamp = state.ts_range(
+ state.out_buffer.as_ref().unwrap(),
+ state.out_segment.as_ref().unwrap(),
+ );
+ state.num_duplicate += 1;
+ Ok(())
+ }
}