diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-12-16 17:54:38 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-12-16 18:43:44 +0300 |
commit | 13b6f8fad4b9412787d6b64fa9f761b7e87dfe35 (patch) | |
tree | 298fbf7bff6bd57152fa2f703ab37a9606bddc72 /mux | |
parent | e344585d998b7d35e61a1a14d51f2fe8c1d8d1b8 (diff) |
fmp4mux: Skip gap buffers earlier to consider them for the sample durations and fragment start durations
Otherwise dropping the gap buffers would offset the timestamps of
following samples.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1016>
Diffstat (limited to 'mux')
-rw-r--r-- | mux/fmp4/src/fmp4mux/imp.rs | 275 |
1 files changed, 161 insertions, 114 deletions
diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 3351a510..6f4fdf21 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -179,8 +179,6 @@ struct Gop { /// Earliest PTS buffer position earliest_pts_position: gst::ClockTime, - /// Start DTS buffer position - start_dts_position: Option<gst::ClockTime>, /// Buffer, PTS running time, DTS running time buffers: Vec<GopBuffer>, @@ -763,7 +761,6 @@ impl FMP4Mux { let end_dts = end_dts.map(|v| v.positive().unwrap()); let pts_position = buffer.pts().unwrap(); - let dts_position = buffer.dts(); if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::debug!( @@ -778,11 +775,6 @@ impl FMP4Mux { let gop = Gop { start_pts: pts, start_dts: dts, - start_dts_position: if !delta_frames.requires_dts() { - None - } else { - dts_position - }, earliest_pts: pts, earliest_pts_position: pts_position, final_earliest_pts: !delta_frames.requires_dts(), @@ -1075,43 +1067,6 @@ impl FMP4Mux { assert!(fragment_end_pts.is_some()); - let first_gop = gops.first().unwrap(); - let last_gop = gops.last().unwrap(); - let earliest_pts = first_gop.earliest_pts; - let earliest_pts_position = first_gop.earliest_pts_position; - let start_dts = first_gop.start_dts; - let start_dts_position = first_gop.start_dts_position; - let end_pts = last_gop.end_pts; - let dts_offset = stream.dts_offset; - - if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { - min_earliest_pts = Some(earliest_pts); - } - if min_earliest_pts_position - .opt_gt(earliest_pts_position) - .unwrap_or(true) - { - min_earliest_pts_position = Some(earliest_pts_position); - } - if let Some(start_dts_position) = start_dts_position { - if min_start_dts_position - .opt_gt(start_dts_position) - .unwrap_or(true) - { - min_start_dts_position = Some(start_dts_position); - } - } - - gst::info!( - CAT, - obj: stream.sinkpad, - "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", - end_pts.saturating_sub(earliest_pts), - earliest_pts, - start_dts.display(), - dts_offset.display(), - ); - if let Some((prev_gop, first_gop)) = Option::zip( stream.queued_gops.iter().find(|gop| gop.final_end_pts), stream.queued_gops.back(), @@ -1133,70 +1088,175 @@ impl FMP4Mux { .unwrap_or(gst::ClockTime::ZERO) ); - let start_time = if !stream.delta_frames.requires_dts() { - earliest_pts - } else { - start_dts.unwrap() - }; + let last_gop = gops.last().unwrap(); + let end_pts = last_gop.end_pts; + let end_dts = last_gop.end_dts; + + // First flatten all GOPs into a single `Vec` + let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); + gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter())); + + // Then calculate durations for all of the buffers and get rid of any GAP buffers in + // the process. + // Also calculate the earliest PTS / start DTS here, which needs to consider GAP + // buffers too. + let mut buffers = VecDeque::with_capacity(gop_buffers.len()); + let mut earliest_pts = None; + let mut earliest_pts_position = None; + let mut start_dts = None; + let mut start_dts_position = None; + + let mut gop_buffers = gop_buffers.into_iter(); + while let Some(buffer) = gop_buffers.next() { + // If this is a GAP buffer then skip it. Its duration was already considered + // below for the non-GAP buffer preceding it, and if there was none then the + // fragment start would be adjusted accordingly for this stream. + if buffer.buffer.flags().contains(gst::BufferFlags::GAP) + && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + && buffer.buffer.size() == 0 + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Skipping gap buffer {buffer:?}", + ); + continue; + } - let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); + if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) { + earliest_pts = Some(buffer.pts); + } + if earliest_pts_position.map_or(true, |earliest_pts_position| { + buffer.buffer.pts().unwrap() < earliest_pts_position + }) { + earliest_pts_position = Some(buffer.buffer.pts().unwrap()); + } + if stream.delta_frames.requires_dts() && start_dts.is_none() { + start_dts = Some(buffer.dts.unwrap()); + } + if stream.delta_frames.requires_dts() && start_dts_position.is_none() { + start_dts_position = Some(buffer.buffer.dts().unwrap()); + } - for gop in gops { - let mut gop_buffers = gop.buffers.into_iter().peekable(); - while let Some(buffer) = gop_buffers.next() { - let timestamp = if !stream.delta_frames.requires_dts() { - buffer.pts - } else { - buffer.dts.unwrap() - }; + let timestamp = if !stream.delta_frames.requires_dts() { + buffer.pts + } else { + buffer.dts.unwrap() + }; - let end_timestamp = match gop_buffers.peek() { - Some(buffer) => { - if !stream.delta_frames.requires_dts() { - buffer.pts - } else { - buffer.dts.unwrap() - } + // Take as end timestamp the timestamp of the next non-GAP buffer + let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| { + !buf.buffer.flags().contains(gst::BufferFlags::GAP) + || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + || buf.buffer.size() != 0 + }) { + Some(buffer) => { + if !stream.delta_frames.requires_dts() { + buffer.pts + } else { + buffer.dts.unwrap() } - None => { - if !stream.delta_frames.requires_dts() { - gop.end_pts - } else { - gop.end_dts.unwrap() - } + } + None => { + if !stream.delta_frames.requires_dts() { + end_pts + } else { + end_dts.unwrap() } - }; + } + }; - // Timestamps are enforced to monotonically increase when queueing buffers - let duration = end_timestamp - .checked_sub(timestamp) - .expect("Timestamps going backwards"); + // Timestamps are enforced to monotonically increase when queueing buffers + let duration = end_timestamp + .checked_sub(timestamp) + .expect("Timestamps going backwards"); - let composition_time_offset = if !stream.delta_frames.requires_dts() { - None - } else { - let pts = buffer.pts; - let dts = buffer.dts.unwrap(); - - Some( - i64::try_from( - (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)) - .nseconds(), - ) - .map_err(|_| { - gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?, + let composition_time_offset = if !stream.delta_frames.requires_dts() { + None + } else { + let pts = buffer.pts; + let dts = buffer.dts.unwrap(); + + Some( + i64::try_from( + (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), ) - }; + .map_err(|_| { + gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); + gst::FlowError::Error + })?, + ) + }; - buffers.push_back(Buffer { - idx, - buffer: buffer.buffer, - timestamp, - duration, - composition_time_offset, - }); + buffers.push_back(Buffer { + idx, + buffer: buffer.buffer, + timestamp, + duration, + composition_time_offset, + }); + } + + if buffers.is_empty() { + gst::info!( + CAT, + obj: stream.sinkpad, + "Drained only gap buffers", + ); + + drained_streams.push(( + super::FragmentHeaderStream { + caps: stream.caps.clone(), + start_time: None, + delta_frames: stream.delta_frames, + trak_timescale: stream_settings.trak_timescale, + }, + VecDeque::new(), + )); + + continue; + } + + let earliest_pts = earliest_pts.unwrap(); + let earliest_pts_position = earliest_pts_position.unwrap(); + if stream.delta_frames.requires_dts() { + assert!(start_dts.is_some()); + assert!(start_dts_position.is_some()); + } + let start_dts = start_dts; + let start_dts_position = start_dts_position; + + gst::info!( + CAT, + obj: stream.sinkpad, + "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", + end_pts.saturating_sub(earliest_pts), + earliest_pts, + start_dts.display(), + stream.dts_offset.display(), + ); + + let start_time = if !stream.delta_frames.requires_dts() { + earliest_pts + } else { + start_dts.unwrap() + }; + + if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { + min_earliest_pts = Some(earliest_pts); + } + if min_earliest_pts_position + .opt_gt(earliest_pts_position) + .unwrap_or(true) + { + min_earliest_pts_position = Some(earliest_pts_position); + } + if let Some(start_dts_position) = start_dts_position { + if min_start_dts_position + .opt_gt(start_dts_position) + .unwrap_or(true) + { + min_start_dts_position = Some(start_dts_position); } } @@ -1309,26 +1369,13 @@ impl FMP4Mux { // Collect all buffers and their timing information that are to be drained right now. let ( - mut drained_streams, + drained_streams, min_earliest_pts_position, min_earliest_pts, min_start_dts_position, fragment_end_pts, ) = self.drain_buffers(state, settings, timeout, at_eos)?; - // Remove all GAP buffers before processing them further - for (stream, buffers) in &mut drained_streams { - buffers.retain(|buf| { - !buf.buffer.flags().contains(gst::BufferFlags::GAP) - || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) - || buf.buffer.size() != 0 - }); - - if buffers.is_empty() { - stream.start_time = None; - } - } - // Create header now if it was not created before and return the caps let mut caps = None; if state.stream_header.is_none() { |