Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-09-28 19:35:47 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-09-29 10:40:41 +0300
commitb63627025e0dc334d6052aefff564d792d8079f9 (patch)
tree78b4670dce97d154e1d6a1ec01fd48fba1eecc3c /generic
parent4ba4b002353dc0808262296bf5ee4739f0a2512c (diff)
fmp4mux: Split huge drain function into separate functions
Diffstat (limited to 'generic')
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs794
1 files changed, 436 insertions, 358 deletions
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 2e9ff88fb..0b2cf53c9 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -672,30 +672,29 @@ impl FMP4Mux {
))
}
- fn drain(
+ #[allow(clippy::type_complexity)]
+ fn drain_buffers(
&self,
- element: &super::FMP4Mux,
+ _element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
timeout: bool,
at_eos: bool,
- ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
- let class = element.class();
-
- if at_eos {
- gst::info!(CAT, obj: element, "Draining at EOS");
- } else if timeout {
- gst::info!(CAT, obj: element, "Draining at timeout");
- } else {
- for stream in &state.streams {
- if !stream.fragment_filled && !stream.sinkpad.is_eos() {
- return Ok((None, None));
- }
- }
- }
-
- let mut drain_buffers = Vec::with_capacity(state.streams.len());
- let mut streams = Vec::with_capacity(state.streams.len());
+ ) -> Result<
+ (
+ Vec<(
+ gst::Caps,
+ Option<super::FragmentTimingInfo>,
+ VecDeque<Buffer>,
+ )>,
+ Option<gst::ClockTime>,
+ Option<gst::ClockTime>,
+ Option<gst::ClockTime>,
+ Option<gst::ClockTime>,
+ ),
+ gst::FlowError,
+ > {
+ let mut drained_streams = Vec::with_capacity(state.streams.len());
let mut min_earliest_pts_position = None;
let mut min_earliest_pts = None;
@@ -740,192 +739,202 @@ impl FMP4Mux {
"Draining no buffers",
);
- streams.push((stream.caps.clone(), None));
- drain_buffers.push(VecDeque::new());
- } else {
- let first_gop = gops.first().unwrap();
- let last_gop = gops.last().unwrap();
- let earliest_pts = first_gop.earliest_pts;
- let earliest_pts_position = first_gop.earliest_pts_position;
- let start_dts = first_gop.start_dts;
- let start_dts_position = first_gop.start_dts_position;
- let end_pts = last_gop.end_pts;
- let dts_offset = stream.dts_offset;
-
- if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
- min_earliest_pts = Some(earliest_pts);
- }
- if min_earliest_pts_position
- .opt_gt(earliest_pts_position)
+ drained_streams.push((stream.caps.clone(), None, VecDeque::new()));
+ continue;
+ }
+
+ let first_gop = gops.first().unwrap();
+ let last_gop = gops.last().unwrap();
+ let earliest_pts = first_gop.earliest_pts;
+ let earliest_pts_position = first_gop.earliest_pts_position;
+ let start_dts = first_gop.start_dts;
+ let start_dts_position = first_gop.start_dts_position;
+ let end_pts = last_gop.end_pts;
+ let dts_offset = stream.dts_offset;
+
+ if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
+ min_earliest_pts = Some(earliest_pts);
+ }
+ if min_earliest_pts_position
+ .opt_gt(earliest_pts_position)
+ .unwrap_or(true)
+ {
+ min_earliest_pts_position = Some(earliest_pts_position);
+ }
+ if let Some(start_dts_position) = start_dts_position {
+ if min_start_dts_position
+ .opt_gt(start_dts_position)
.unwrap_or(true)
{
- min_earliest_pts_position = Some(earliest_pts_position);
- }
- if let Some(start_dts_position) = start_dts_position {
- if min_start_dts_position
- .opt_gt(start_dts_position)
- .unwrap_or(true)
- {
- min_start_dts_position = Some(start_dts_position);
- }
+ min_start_dts_position = Some(start_dts_position);
}
- if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
- max_end_pts = Some(end_pts);
- }
-
- gst::info!(
- CAT,
- obj: &stream.sinkpad,
- "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
- end_pts.saturating_sub(earliest_pts),
- earliest_pts,
- start_dts.display(),
- dts_offset.display(),
- );
+ }
+ if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
+ max_end_pts = Some(end_pts);
+ }
- if let Some((prev_gop, first_gop)) = Option::zip(
- stream.queued_gops.iter().find(|gop| gop.final_end_pts),
- stream.queued_gops.back(),
- ) {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Queued full GOPs duration updated to {}",
- prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
- );
- }
+ gst::info!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
+ end_pts.saturating_sub(earliest_pts),
+ earliest_pts,
+ start_dts.display(),
+ dts_offset.display(),
+ );
+ if let Some((prev_gop, first_gop)) = Option::zip(
+ stream.queued_gops.iter().find(|gop| gop.final_end_pts),
+ stream.queued_gops.back(),
+ ) {
gst::debug!(
CAT,
obj: &stream.sinkpad,
- "Queued duration updated to {}",
- Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
- .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
- .unwrap_or(gst::ClockTime::ZERO)
+ "Queued full GOPs duration updated to {}",
+ prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
);
+ }
- let start_time = if stream.intra_only {
- earliest_pts
- } else {
- start_dts.unwrap()
- };
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued duration updated to {}",
+ Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
+ .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
+ .unwrap_or(gst::ClockTime::ZERO)
+ );
- streams.push((
- stream.caps.clone(),
- Some(super::FragmentTimingInfo {
- start_time,
- intra_only: stream.intra_only,
- }),
- ));
-
- let mut buffers =
- VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
-
- for gop in gops {
- let mut gop_buffers = gop.buffers.into_iter().peekable();
- while let Some(buffer) = gop_buffers.next() {
- let timestamp = if stream.intra_only {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- };
+ let start_time = if stream.intra_only {
+ earliest_pts
+ } else {
+ start_dts.unwrap()
+ };
- let end_timestamp = match gop_buffers.peek() {
- Some(buffer) => {
- if stream.intra_only {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- }
+ let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+
+ for gop in gops {
+ let mut gop_buffers = gop.buffers.into_iter().peekable();
+ while let Some(buffer) = gop_buffers.next() {
+ let timestamp = if stream.intra_only {
+ buffer.pts
+ } else {
+ buffer.dts.unwrap()
+ };
+
+ let end_timestamp = match gop_buffers.peek() {
+ Some(buffer) => {
+ if stream.intra_only {
+ buffer.pts
+ } else {
+ buffer.dts.unwrap()
}
- None => {
- if stream.intra_only {
- gop.end_pts
- } else {
- gop.end_dts.unwrap()
- }
+ }
+ None => {
+ if stream.intra_only {
+ gop.end_pts
+ } else {
+ gop.end_dts.unwrap()
}
- };
+ }
+ };
- // Timestamps are enforced to monotonically increase when queueing buffers
- let duration = end_timestamp
- .checked_sub(timestamp)
- .expect("Timestamps going backwards");
+ // Timestamps are enforced to monotonically increase when queueing buffers
+ let duration = end_timestamp
+ .checked_sub(timestamp)
+ .expect("Timestamps going backwards");
- let composition_time_offset = if stream.intra_only {
- None
- } else {
- let pts = buffer.pts;
- let dts = buffer.dts.unwrap();
+ let composition_time_offset = if stream.intra_only {
+ None
+ } else {
+ let pts = buffer.pts;
+ let dts = buffer.dts.unwrap();
- if pts > dts {
- Some(
+ if pts > dts {
+ Some(
i64::try_from((pts - dts).nseconds())
.map_err(|_| {
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?,
)
- } else {
- let diff = i64::try_from((dts - pts).nseconds())
+ } else {
+ let diff = i64::try_from((dts - pts).nseconds())
.map_err(|_| {
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?;
- Some(-diff)
- }
- };
+ Some(-diff)
+ }
+ };
- buffers.push_back(Buffer {
- idx,
- buffer: buffer.buffer,
- timestamp,
- duration,
- composition_time_offset,
- });
- }
+ buffers.push_back(Buffer {
+ idx,
+ buffer: buffer.buffer,
+ timestamp,
+ duration,
+ composition_time_offset,
+ });
}
- drain_buffers.push(buffers);
}
+
+ drained_streams.push((
+ stream.caps.clone(),
+ Some(super::FragmentTimingInfo {
+ start_time,
+ intra_only: stream.intra_only,
+ }),
+ buffers,
+ ));
}
- // Remove all GAP buffers before processing them further
- for buffers in &mut drain_buffers {
- buffers.retain(|buf| {
- !buf.buffer.flags().contains(gst::BufferFlags::GAP)
- || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
- || buf.buffer.size() != 0
- });
+ Ok((
+ drained_streams,
+ min_earliest_pts_position,
+ min_earliest_pts,
+ min_start_dts_position,
+ max_end_pts,
+ ))
+ }
+
+ fn preprocess_drained_streams_onvif(
+ &self,
+ element: &super::FMP4Mux,
+ state: &mut State,
+ drained_streams: &mut [(
+ gst::Caps,
+ Option<super::FragmentTimingInfo>,
+ VecDeque<Buffer>,
+ )],
+ ) -> Result<Option<gst::ClockTime>, gst::FlowError> {
+ if element.class().as_ref().variant != super::Variant::ONVIF {
+ return Ok(None);
}
let mut max_end_utc_time = None;
- // For ONVIF, replace all timestamps with timestamps based on UTC times.
- if class.as_ref().variant == super::Variant::ONVIF {
- let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
- let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
- if composition_time_offset > 0 {
- buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
- } else {
- buffer
- .timestamp
- .checked_sub(gst::ClockTime::from_nseconds(
- (-composition_time_offset) as u64,
- ))
- .unwrap()
- }
- };
- // If this is the first fragment then allow the first buffers to not have a reference
- // timestamp meta and backdate them
- if state.stream_header.is_none() {
- for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
- let (buffer_idx, utc_time, buffer) = match drain_buffers
- .iter()
- .enumerate()
- .find_map(|(idx, buffer)| {
- get_utc_time_from_buffer(&buffer.buffer)
- .map(|timestamp| (idx, timestamp, buffer))
- }) {
+ let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
+ let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
+ if composition_time_offset > 0 {
+ buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
+ } else {
+ buffer
+ .timestamp
+ .checked_sub(gst::ClockTime::from_nseconds(
+ (-composition_time_offset) as u64,
+ ))
+ .unwrap()
+ }
+ };
+
+ // If this is the first fragment then allow the first buffers to not have a reference
+ // timestamp meta and backdate them
+ if state.stream_header.is_none() {
+ for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() {
+ let (buffer_idx, utc_time, buffer) =
+ match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| {
+ get_utc_time_from_buffer(&buffer.buffer)
+ .map(|timestamp| (idx, timestamp, buffer))
+ }) {
None => {
gst::error!(
CAT,
@@ -937,82 +946,47 @@ impl FMP4Mux {
Some(res) => res,
};
- // Now do the backdating
- if buffer_idx > 0 {
- let utc_time_pts = calculate_pts(buffer);
+ // Now do the backdating
+ if buffer_idx > 0 {
+ let utc_time_pts = calculate_pts(buffer);
- for buffer in drain_buffers.iter_mut().take(buffer_idx) {
- let buffer_pts = calculate_pts(buffer);
- let buffer_pts_diff = if utc_time_pts >= buffer_pts {
- (utc_time_pts - buffer_pts).nseconds() as i64
- } else {
- -((buffer_pts - utc_time_pts).nseconds() as i64)
- };
- let buffer_utc_time = if buffer_pts_diff >= 0 {
- utc_time
- .checked_sub(gst::ClockTime::from_nseconds(
- buffer_pts_diff as u64,
- ))
- .unwrap()
- } else {
- utc_time
- .checked_add(gst::ClockTime::from_nseconds(
- (-buffer_pts_diff) as u64,
- ))
- .unwrap()
- };
-
- let buffer = buffer.buffer.make_mut();
- gst::ReferenceTimestampMeta::add(
- buffer,
- &UNIX_CAPS,
- buffer_utc_time,
- gst::ClockTime::NONE,
- );
- }
- }
- }
- }
-
- // Calculate the minimum across all streams and remember that
- if state.start_utc_time.is_none() {
- let mut start_utc_time = None;
-
- for (idx, drain_buffers) in drain_buffers.iter().enumerate() {
- for buffer in drain_buffers {
- let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
- None => {
- gst::error!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "No reference timestamp set on all buffers"
- );
- return Err(gst::FlowError::Error);
- }
- Some(utc_time) => utc_time,
+ for buffer in drain_buffers.iter_mut().take(buffer_idx) {
+ let buffer_pts = calculate_pts(buffer);
+ let buffer_pts_diff = if utc_time_pts >= buffer_pts {
+ (utc_time_pts - buffer_pts).nseconds() as i64
+ } else {
+ -((buffer_pts - utc_time_pts).nseconds() as i64)
+ };
+ let buffer_utc_time = if buffer_pts_diff >= 0 {
+ utc_time
+ .checked_sub(gst::ClockTime::from_nseconds(buffer_pts_diff as u64))
+ .unwrap()
+ } else {
+ utc_time
+ .checked_add(gst::ClockTime::from_nseconds(
+ (-buffer_pts_diff) as u64,
+ ))
+ .unwrap()
};
- if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
- start_utc_time = Some(utc_time);
- }
+ let buffer = buffer.buffer.make_mut();
+ gst::ReferenceTimestampMeta::add(
+ buffer,
+ &UNIX_CAPS,
+ buffer_utc_time,
+ gst::ClockTime::NONE,
+ );
}
}
-
- gst::debug!(
- CAT,
- obj: element,
- "Configuring start UTC time {}",
- start_utc_time.unwrap()
- );
- state.start_utc_time = start_utc_time;
}
+ }
- // Update all buffer timestamps based on the UTC time and offset to the start UTC time
- let start_utc_time = state.start_utc_time.unwrap();
- for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
- let mut start_time = None;
+ // Calculate the minimum across all streams and remember that
+ if state.start_utc_time.is_none() {
+ let mut start_utc_time = None;
- for buffer in drain_buffers.iter_mut() {
+ for (idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() {
+ for buffer in drain_buffers {
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
None => {
gst::error!(
@@ -1025,150 +999,189 @@ impl FMP4Mux {
Some(utc_time) => utc_time,
};
- // Convert PTS UTC time to DTS
- let mut utc_time_dts =
- if let Some(composition_time_offset) = buffer.composition_time_offset {
- if composition_time_offset >= 0 {
- utc_time
- .checked_sub(gst::ClockTime::from_nseconds(
- composition_time_offset as u64,
- ))
- .unwrap()
- } else {
- utc_time
- .checked_add(gst::ClockTime::from_nseconds(
- (-composition_time_offset) as u64,
- ))
- .unwrap()
- }
- } else {
- utc_time
- };
+ if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
+ start_utc_time = Some(utc_time);
+ }
+ }
+ }
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Configuring start UTC time {}",
+ start_utc_time.unwrap()
+ );
+ state.start_utc_time = start_utc_time;
+ }
+
+ // Update all buffer timestamps based on the UTC time and offset to the start UTC time
+ let start_utc_time = state.start_utc_time.unwrap();
+ for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() {
+ let mut start_time = None;
- // Enforce monotonically increasing timestamps
- if utc_time_dts < state.streams[idx].current_utc_time {
- gst::warning!(
+ for buffer in drain_buffers.iter_mut() {
+ let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
+ None => {
+ gst::error!(
CAT,
obj: &state.streams[idx].sinkpad,
- "Decreasing UTC DTS timestamp for buffer {} < {}",
- utc_time_dts,
- state.streams[idx].current_utc_time,
+ "No reference timestamp set on all buffers"
);
- utc_time_dts = state.streams[idx].current_utc_time;
- } else {
- state.streams[idx].current_utc_time = utc_time_dts;
+ return Err(gst::FlowError::Error);
}
+ Some(utc_time) => utc_time,
+ };
- let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
+ // Convert PTS UTC time to DTS
+ let mut utc_time_dts =
+ if let Some(composition_time_offset) = buffer.composition_time_offset {
+ if composition_time_offset >= 0 {
+ utc_time
+ .checked_sub(gst::ClockTime::from_nseconds(
+ composition_time_offset as u64,
+ ))
+ .unwrap()
+ } else {
+ utc_time
+ .checked_add(gst::ClockTime::from_nseconds(
+ (-composition_time_offset) as u64,
+ ))
+ .unwrap()
+ }
+ } else {
+ utc_time
+ };
- gst::trace!(
+ // Enforce monotonically increasing timestamps
+ if utc_time_dts < state.streams[idx].current_utc_time {
+ gst::warning!(
CAT,
obj: &state.streams[idx].sinkpad,
- "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
- buffer.timestamp,
- timestamp,
+ "Decreasing UTC DTS timestamp for buffer {} < {}",
utc_time_dts,
- utc_time,
+ state.streams[idx].current_utc_time,
);
-
- buffer.timestamp = timestamp;
- if start_time.is_none() || start_time > Some(buffer.timestamp) {
- start_time = Some(buffer.timestamp);
- }
+ utc_time_dts = state.streams[idx].current_utc_time;
+ } else {
+ state.streams[idx].current_utc_time = utc_time_dts;
}
- // Update durations for all buffers except for the last in the fragment unless all
- // have the same duration anyway
- let mut common_duration = Ok(None);
- let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
- while let Some(buffer) = drain_buffers_iter.next() {
- let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
-
- if let Some(next_timestamp) = next_timestamp {
- let duration = next_timestamp.saturating_sub(buffer.timestamp);
- if common_duration == Ok(None) {
- common_duration = Ok(Some(duration));
- } else if common_duration != Ok(Some(duration)) {
- common_duration = Err(());
- }
+ let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
- buffer.timestamp,
- buffer.duration,
- duration,
- );
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
+ buffer.timestamp,
+ timestamp,
+ utc_time_dts,
+ utc_time,
+ );
- buffer.duration = duration;
- } else if let Ok(Some(common_duration)) = common_duration {
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
- buffer.timestamp,
- buffer.duration,
- common_duration,
- );
+ buffer.timestamp = timestamp;
+ if start_time.is_none() || start_time > Some(buffer.timestamp) {
+ start_time = Some(buffer.timestamp);
+ }
+ }
- buffer.duration = common_duration;
- } else {
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Keeping last buffer with timestamp {} duration at {}",
- buffer.timestamp,
- buffer.duration,
- );
+ // Update durations for all buffers except for the last in the fragment unless all
+ // have the same duration anyway
+ let mut common_duration = Ok(None);
+ let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
+ while let Some(buffer) = drain_buffers_iter.next() {
+ let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
+
+ if let Some(next_timestamp) = next_timestamp {
+ let duration = next_timestamp.saturating_sub(buffer.timestamp);
+ if common_duration == Ok(None) {
+ common_duration = Ok(Some(duration));
+ } else if common_duration != Ok(Some(duration)) {
+ common_duration = Err(());
}
- let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
- if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
- max_end_utc_time = Some(end_utc_time);
- }
- }
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
+ buffer.timestamp,
+ buffer.duration,
+ duration,
+ );
- if let Some(start_time) = start_time {
- gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
- streams[idx].1.as_mut().unwrap().start_time = start_time;
+ buffer.duration = duration;
+ } else if let Ok(Some(common_duration)) = common_duration {
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
+ buffer.timestamp,
+ buffer.duration,
+ common_duration,
+ );
+
+ buffer.duration = common_duration;
} else {
- assert!(streams[idx].1.is_none());
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Keeping last buffer with timestamp {} duration at {}",
+ buffer.timestamp,
+ buffer.duration,
+ );
+ }
+
+ let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
+ if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
+ max_end_utc_time = Some(end_utc_time);
}
}
- }
- // Create header now if it was not created before and return the caps
- let mut caps = None;
- if state.stream_header.is_none() {
- let (_, new_caps) = self
- .update_header(element, state, settings, false)?
- .unwrap();
- caps = Some(new_caps);
+ if let Some(start_time) = start_time {
+ gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
+ timing_info.as_mut().unwrap().start_time = start_time;
+ } else {
+ assert!(timing_info.is_none());
+ }
}
- // Interleave buffers according to the settings into a single vec
+ Ok(max_end_utc_time)
+ }
+
+ #[allow(clippy::type_complexity)]
+ fn interleave_buffers(
+ &self,
+ _element: &super::FMP4Mux,
+ settings: &Settings,
+ mut drained_streams: Vec<(
+ gst::Caps,
+ Option<super::FragmentTimingInfo>,
+ VecDeque<Buffer>,
+ )>,
+ ) -> Result<
+ (
+ Vec<Buffer>,
+ Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>,
+ ),
+ gst::FlowError,
+ > {
let mut interleaved_buffers =
- Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum());
- while let Some((_idx, bs)) =
- drain_buffers
- .iter_mut()
- .enumerate()
- .min_by(|(a_idx, a), (b_idx, b)| {
- let (a, b) = match (a.front(), b.front()) {
- (None, None) => return std::cmp::Ordering::Equal,
- (None, _) => return std::cmp::Ordering::Greater,
- (_, None) => return std::cmp::Ordering::Less,
- (Some(a), Some(b)) => (a, b),
- };
+ Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum());
+ while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by(
+ |(a_idx, (_, _, a)), (b_idx, (_, _, b))| {
+ let (a, b) = match (a.front(), b.front()) {
+ (None, None) => return std::cmp::Ordering::Equal,
+ (None, _) => return std::cmp::Ordering::Greater,
+ (_, None) => return std::cmp::Ordering::Less,
+ (Some(a), Some(b)) => (a, b),
+ };
- match a.timestamp.cmp(&b.timestamp) {
- std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
- cmp => cmp,
- }
- })
- {
- let start_time = match bs.front() {
+ match a.timestamp.cmp(&b.timestamp) {
+ std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
+ cmp => cmp,
+ }
+ },
+ ) {
+ let start_time = match bufs.front() {
None => {
// No more buffers now
break;
@@ -1187,7 +1200,7 @@ impl FMP4Mux {
.opt_ge(current_end_time.saturating_sub(start_time))
.unwrap_or(true)
{
- if let Some(buffer) = bs.pop_front() {
+ if let Some(buffer) = bufs.pop_front() {
current_end_time = buffer.timestamp + buffer.duration;
dequeued_bytes += buffer.buffer.size() as u64;
interleaved_buffers.push(buffer);
@@ -1198,13 +1211,78 @@ impl FMP4Mux {
}
}
- assert!(drain_buffers.iter().all(|bs| bs.is_empty()));
+ // All buffers should be consumed now
+ assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty()));
- let mut buffer_list = None;
+ let streams = drained_streams
+ .into_iter()
+ .map(|(caps, timing_info, _)| (caps, timing_info))
+ .collect::<Vec<_>>();
+
+ Ok((interleaved_buffers, streams))
+ }
+
+ fn drain(
+ &self,
+ element: &super::FMP4Mux,
+ state: &mut State,
+ settings: &Settings,
+ timeout: bool,
+ at_eos: bool,
+ ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
+ if at_eos {
+ gst::info!(CAT, obj: element, "Draining at EOS");
+ } else if timeout {
+ gst::info!(CAT, obj: element, "Draining at timeout");
+ } else {
+ for stream in &state.streams {
+ if !stream.fragment_filled && !stream.sinkpad.is_eos() {
+ return Ok((None, None));
+ }
+ }
+ }
+
+ // Collect all buffers and their timing information that are to be drained right now.
+ let (
+ mut drained_streams,
+ min_earliest_pts_position,
+ min_earliest_pts,
+ min_start_dts_position,
+ max_end_pts,
+ ) = self.drain_buffers(element, state, settings, timeout, at_eos)?;
+
+ // For ONVIF, replace all timestamps with timestamps based on UTC times.
+ let max_end_utc_time =
+ self.preprocess_drained_streams_onvif(element, state, &mut drained_streams)?;
+ // Remove all GAP buffers before processing them further
+ for (_, _, buffers) in &mut drained_streams {
+ buffers.retain(|buf| {
+ !buf.buffer.flags().contains(gst::BufferFlags::GAP)
+ || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
+ || buf.buffer.size() != 0
+ });
+ }
+
+ // Create header now if it was not created before and return the caps
+ let mut caps = None;
+ if state.stream_header.is_none() {
+ let (_, new_caps) = self
+ .update_header(element, state, settings, false)?
+ .unwrap();
+ caps = Some(new_caps);
+ }
+
+ // Interleave buffers according to the settings into a single vec
+ let (mut interleaved_buffers, streams) =
+ self.interleave_buffers(element, settings, drained_streams)?;
+
+ let mut buffer_list = None;
if interleaved_buffers.is_empty() {
assert!(timeout || at_eos);
} else {
+ // If there are actual buffers to output then create headers as needed and create a
+ // bufferlist for all buffers that have to be output.
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
let min_earliest_pts = min_earliest_pts.unwrap();
let max_end_pts = max_end_pts.unwrap();
@@ -1237,7 +1315,7 @@ impl FMP4Mux {
state.sequence_number += 1;
let (mut fmp4_fragment_header, moof_offset) =
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
- variant: class.as_ref().variant,
+ variant: element.class().as_ref().variant,
sequence_number,
streams: streams.as_slice(),
buffers: interleaved_buffers.as_slice(),