Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/mux
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-12-16 17:54:38 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-12-16 18:43:44 +0300
commit13b6f8fad4b9412787d6b64fa9f761b7e87dfe35 (patch)
tree298fbf7bff6bd57152fa2f703ab37a9606bddc72 /mux
parente344585d998b7d35e61a1a14d51f2fe8c1d8d1b8 (diff)
fmp4mux: Skip gap buffers earlier to consider them for the sample durations and fragment start durations
Otherwise dropping the gap buffers would offset the timestamps of following samples. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1016>
Diffstat (limited to 'mux')
-rw-r--r--mux/fmp4/src/fmp4mux/imp.rs275
1 files changed, 161 insertions, 114 deletions
diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs
index 3351a510..6f4fdf21 100644
--- a/mux/fmp4/src/fmp4mux/imp.rs
+++ b/mux/fmp4/src/fmp4mux/imp.rs
@@ -179,8 +179,6 @@ struct Gop {
/// Earliest PTS buffer position
earliest_pts_position: gst::ClockTime,
- /// Start DTS buffer position
- start_dts_position: Option<gst::ClockTime>,
/// Buffer, PTS running time, DTS running time
buffers: Vec<GopBuffer>,
@@ -763,7 +761,6 @@ impl FMP4Mux {
let end_dts = end_dts.map(|v| v.positive().unwrap());
let pts_position = buffer.pts().unwrap();
- let dts_position = buffer.dts();
if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst::debug!(
@@ -778,11 +775,6 @@ impl FMP4Mux {
let gop = Gop {
start_pts: pts,
start_dts: dts,
- start_dts_position: if !delta_frames.requires_dts() {
- None
- } else {
- dts_position
- },
earliest_pts: pts,
earliest_pts_position: pts_position,
final_earliest_pts: !delta_frames.requires_dts(),
@@ -1075,43 +1067,6 @@ impl FMP4Mux {
assert!(fragment_end_pts.is_some());
- let first_gop = gops.first().unwrap();
- let last_gop = gops.last().unwrap();
- let earliest_pts = first_gop.earliest_pts;
- let earliest_pts_position = first_gop.earliest_pts_position;
- let start_dts = first_gop.start_dts;
- let start_dts_position = first_gop.start_dts_position;
- let end_pts = last_gop.end_pts;
- let dts_offset = stream.dts_offset;
-
- if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
- min_earliest_pts = Some(earliest_pts);
- }
- if min_earliest_pts_position
- .opt_gt(earliest_pts_position)
- .unwrap_or(true)
- {
- min_earliest_pts_position = Some(earliest_pts_position);
- }
- if let Some(start_dts_position) = start_dts_position {
- if min_start_dts_position
- .opt_gt(start_dts_position)
- .unwrap_or(true)
- {
- min_start_dts_position = Some(start_dts_position);
- }
- }
-
- gst::info!(
- CAT,
- obj: stream.sinkpad,
- "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
- end_pts.saturating_sub(earliest_pts),
- earliest_pts,
- start_dts.display(),
- dts_offset.display(),
- );
-
if let Some((prev_gop, first_gop)) = Option::zip(
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
stream.queued_gops.back(),
@@ -1133,70 +1088,175 @@ impl FMP4Mux {
.unwrap_or(gst::ClockTime::ZERO)
);
- let start_time = if !stream.delta_frames.requires_dts() {
- earliest_pts
- } else {
- start_dts.unwrap()
- };
+ let last_gop = gops.last().unwrap();
+ let end_pts = last_gop.end_pts;
+ let end_dts = last_gop.end_dts;
+
+ // First flatten all GOPs into a single `Vec`
+ let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+ gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter()));
+
+ // Then calculate durations for all of the buffers and get rid of any GAP buffers in
+ // the process.
+ // Also calculate the earliest PTS / start DTS here, which needs to consider GAP
+ // buffers too.
+ let mut buffers = VecDeque::with_capacity(gop_buffers.len());
+ let mut earliest_pts = None;
+ let mut earliest_pts_position = None;
+ let mut start_dts = None;
+ let mut start_dts_position = None;
+
+ let mut gop_buffers = gop_buffers.into_iter();
+ while let Some(buffer) = gop_buffers.next() {
+ // If this is a GAP buffer then skip it. Its duration was already considered
+ // below for the non-GAP buffer preceding it, and if there was none then the
+ // fragment start would be adjusted accordingly for this stream.
+ if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
+ && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
+ && buffer.buffer.size() == 0
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Skipping gap buffer {buffer:?}",
+ );
+ continue;
+ }
- let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+ if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) {
+ earliest_pts = Some(buffer.pts);
+ }
+ if earliest_pts_position.map_or(true, |earliest_pts_position| {
+ buffer.buffer.pts().unwrap() < earliest_pts_position
+ }) {
+ earliest_pts_position = Some(buffer.buffer.pts().unwrap());
+ }
+ if stream.delta_frames.requires_dts() && start_dts.is_none() {
+ start_dts = Some(buffer.dts.unwrap());
+ }
+ if stream.delta_frames.requires_dts() && start_dts_position.is_none() {
+ start_dts_position = Some(buffer.buffer.dts().unwrap());
+ }
- for gop in gops {
- let mut gop_buffers = gop.buffers.into_iter().peekable();
- while let Some(buffer) = gop_buffers.next() {
- let timestamp = if !stream.delta_frames.requires_dts() {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- };
+ let timestamp = if !stream.delta_frames.requires_dts() {
+ buffer.pts
+ } else {
+ buffer.dts.unwrap()
+ };
- let end_timestamp = match gop_buffers.peek() {
- Some(buffer) => {
- if !stream.delta_frames.requires_dts() {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- }
+ // Take as end timestamp the timestamp of the next non-GAP buffer
+ let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| {
+ !buf.buffer.flags().contains(gst::BufferFlags::GAP)
+ || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
+ || buf.buffer.size() != 0
+ }) {
+ Some(buffer) => {
+ if !stream.delta_frames.requires_dts() {
+ buffer.pts
+ } else {
+ buffer.dts.unwrap()
}
- None => {
- if !stream.delta_frames.requires_dts() {
- gop.end_pts
- } else {
- gop.end_dts.unwrap()
- }
+ }
+ None => {
+ if !stream.delta_frames.requires_dts() {
+ end_pts
+ } else {
+ end_dts.unwrap()
}
- };
+ }
+ };
- // Timestamps are enforced to monotonically increase when queueing buffers
- let duration = end_timestamp
- .checked_sub(timestamp)
- .expect("Timestamps going backwards");
+ // Timestamps are enforced to monotonically increase when queueing buffers
+ let duration = end_timestamp
+ .checked_sub(timestamp)
+ .expect("Timestamps going backwards");
- let composition_time_offset = if !stream.delta_frames.requires_dts() {
- None
- } else {
- let pts = buffer.pts;
- let dts = buffer.dts.unwrap();
-
- Some(
- i64::try_from(
- (gst::Signed::Positive(pts) - gst::Signed::Positive(dts))
- .nseconds(),
- )
- .map_err(|_| {
- gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
- gst::FlowError::Error
- })?,
+ let composition_time_offset = if !stream.delta_frames.requires_dts() {
+ None
+ } else {
+ let pts = buffer.pts;
+ let dts = buffer.dts.unwrap();
+
+ Some(
+ i64::try_from(
+ (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(),
)
- };
+ .map_err(|_| {
+ gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
+ gst::FlowError::Error
+ })?,
+ )
+ };
- buffers.push_back(Buffer {
- idx,
- buffer: buffer.buffer,
- timestamp,
- duration,
- composition_time_offset,
- });
+ buffers.push_back(Buffer {
+ idx,
+ buffer: buffer.buffer,
+ timestamp,
+ duration,
+ composition_time_offset,
+ });
+ }
+
+ if buffers.is_empty() {
+ gst::info!(
+ CAT,
+ obj: stream.sinkpad,
+ "Drained only gap buffers",
+ );
+
+ drained_streams.push((
+ super::FragmentHeaderStream {
+ caps: stream.caps.clone(),
+ start_time: None,
+ delta_frames: stream.delta_frames,
+ trak_timescale: stream_settings.trak_timescale,
+ },
+ VecDeque::new(),
+ ));
+
+ continue;
+ }
+
+ let earliest_pts = earliest_pts.unwrap();
+ let earliest_pts_position = earliest_pts_position.unwrap();
+ if stream.delta_frames.requires_dts() {
+ assert!(start_dts.is_some());
+ assert!(start_dts_position.is_some());
+ }
+ let start_dts = start_dts;
+ let start_dts_position = start_dts_position;
+
+ gst::info!(
+ CAT,
+ obj: stream.sinkpad,
+ "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
+ end_pts.saturating_sub(earliest_pts),
+ earliest_pts,
+ start_dts.display(),
+ stream.dts_offset.display(),
+ );
+
+ let start_time = if !stream.delta_frames.requires_dts() {
+ earliest_pts
+ } else {
+ start_dts.unwrap()
+ };
+
+ if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
+ min_earliest_pts = Some(earliest_pts);
+ }
+ if min_earliest_pts_position
+ .opt_gt(earliest_pts_position)
+ .unwrap_or(true)
+ {
+ min_earliest_pts_position = Some(earliest_pts_position);
+ }
+ if let Some(start_dts_position) = start_dts_position {
+ if min_start_dts_position
+ .opt_gt(start_dts_position)
+ .unwrap_or(true)
+ {
+ min_start_dts_position = Some(start_dts_position);
}
}
@@ -1309,26 +1369,13 @@ impl FMP4Mux {
// Collect all buffers and their timing information that are to be drained right now.
let (
- mut drained_streams,
+ drained_streams,
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
fragment_end_pts,
) = self.drain_buffers(state, settings, timeout, at_eos)?;
- // Remove all GAP buffers before processing them further
- for (stream, buffers) in &mut drained_streams {
- buffers.retain(|buf| {
- !buf.buffer.flags().contains(gst::BufferFlags::GAP)
- || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
- || buf.buffer.size() != 0
- });
-
- if buffers.is_empty() {
- stream.start_time = None;
- }
- }
-
// Create header now if it was not created before and return the caps
let mut caps = None;
if state.stream_header.is_none() {