gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
path: root/mux
author     Sebastian Dröge <sebastian@centricular.com>    2023-01-30 17:27:46 +0300
committer  GStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>    2023-01-30 17:41:33 +0300
commit     1998ecab4529ac4afb587da89eea1bda338709ec (patch)
tree       08fe2a8c75f2b98c21477a2dc462b9bb9906b58f /mux
parent     a1cce9b7965e5b0275ec65fe65685995ce611cc1 (diff)
fmp4mux: Refactor and clean up code
Split many longer functions into multiple functions and simplify various parts. Overall this is functionally still the same as before.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1068>
Diffstat (limited to 'mux')
-rw-r--r--    mux/fmp4/src/fmp4mux/imp.rs    1823
1 file changed, 974 insertions(+), 849 deletions(-)
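
The shape of the refactor is easiest to see in the signatures in the diff below: per-stream state that was previously threaded through `peek_buffer` as separate arguments (sinkpad, delta frames, pre-queue, UTC-time mapping) now travels as a single `&mut Stream`. A minimal sketch of the pattern, with plain integer timestamps standing in for the GStreamer types and `Stream` abbreviated to two fields:

    use std::collections::VecDeque;

    struct Stream {
        name: String,
        pre_queue: VecDeque<u64>, // queued buffer PTS values, in nanoseconds
    }

    // Before: fn peek_buffer(&self, sinkpad, delta_frames, pre_queue, mapping, ...).
    // After: every piece of per-stream state is reachable through one argument.
    fn peek_buffer(stream: &Stream) -> Option<u64> {
        stream.pre_queue.front().copied()
    }

    fn main() {
        let mut s = Stream { name: "sink_0".into(), pre_queue: VecDeque::new() };
        s.pre_queue.push_back(40_000_000);
        println!("{}: {:?}", s.name, peek_buffer(&s));
    }
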
diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs
index 88c84b144..27ca82008 100644
--- a/mux/fmp4/src/fmp4mux/imp.rs
+++ b/mux/fmp4/src/fmp4mux/imp.rs
@@ -294,36 +294,37 @@ impl FMP4Mux {
Ok(())
}
+ /// Peek the currently queued buffer on this stream.
+ ///
+ /// This also determines the PTS/DTS that is finally going to be used, including
+ /// timestamp conversion to the UTC times in ONVIF mode.
fn peek_buffer(
&self,
- sinkpad: &super::FMP4MuxPad,
- delta_frames: super::DeltaFrames,
- pre_queue: &mut VecDeque<PreQueuedBuffer>,
- running_time_utc_time_mapping: &mut Option<(gst::Signed<gst::ClockTime>, gst::ClockTime)>,
+ stream: &mut Stream,
fragment_duration: gst::ClockTime,
) -> Result<Option<PreQueuedBuffer>, gst::FlowError> {
// If not in ONVIF mode or the mapping is already known and there is a pre-queued buffer
// then we can directly return it from here.
if self.obj().class().as_ref().variant != super::Variant::ONVIF
- || running_time_utc_time_mapping.is_some()
+ || stream.running_time_utc_time_mapping.is_some()
{
- if let Some(pre_queued_buffer) = pre_queue.front() {
+ if let Some(pre_queued_buffer) = stream.pre_queue.front() {
return Ok(Some(pre_queued_buffer.clone()));
}
}
// Pop buffer here, it will be stored in the pre-queue after calculating its timestamps
- let mut buffer = match sinkpad.pop_buffer() {
+ let mut buffer = match stream.sinkpad.pop_buffer() {
None => return Ok(None),
Some(buffer) => buffer,
};
- Self::check_buffer(&buffer, sinkpad, delta_frames)?;
+ Self::check_buffer(&buffer, &stream.sinkpad, stream.delta_frames)?;
- let segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
+ let segment = match stream.sinkpad.segment().downcast::<gst::ClockTime>().ok() {
Some(segment) => segment,
None => {
- gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
+ gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment");
return Err(gst::FlowError::Error);
}
};
@@ -335,12 +336,12 @@ impl FMP4Mux {
let pts = segment
.to_running_time_full(pts_position)
.ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Couldn't convert PTS to running time");
+ gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert PTS to running time");
gst::FlowError::Error
})?
.positive()
.unwrap_or_else(|| {
- gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported");
+ gst::warning!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported");
gst::ClockTime::ZERO
});
@@ -349,18 +350,18 @@ impl FMP4Mux {
.ok_or_else(|| {
gst::error!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Couldn't convert end PTS to running time"
);
gst::FlowError::Error
})?
.positive()
.unwrap_or_else(|| {
- gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported");
+ gst::warning!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported");
gst::ClockTime::ZERO
});
- let (dts, end_dts) = if !delta_frames.requires_dts() {
+ let (dts, end_dts) = if !stream.delta_frames.requires_dts() {
(None, None)
} else {
// Negative DTS are handled via the dts_offset and by having negative composition time
@@ -369,7 +370,7 @@ impl FMP4Mux {
let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position);
let dts = segment.to_running_time_full(dts_position).ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Couldn't convert DTS to running time");
+ gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert DTS to running time");
gst::FlowError::Error
})?;
@@ -378,7 +379,7 @@ impl FMP4Mux {
.ok_or_else(|| {
gst::error!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Couldn't convert end DTS to running time"
);
gst::FlowError::Error
@@ -407,14 +408,14 @@ impl FMP4Mux {
if self.obj().class().as_ref().variant != super::Variant::ONVIF {
// Store in the queue so we don't have to recalculate this all the time
- pre_queue.push_back(PreQueuedBuffer {
+ stream.pre_queue.push_back(PreQueuedBuffer {
buffer,
pts,
end_pts,
dts,
end_dts,
});
- } else if let Some(running_time_utc_time_mapping) = running_time_utc_time_mapping {
+ } else if let Some(running_time_utc_time_mapping) = stream.running_time_utc_time_mapping {
// For ONVIF we need to re-timestamp the buffer with its UTC time.
//
// After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to
@@ -422,9 +423,9 @@ impl FMP4Mux {
let utc_time = match get_utc_time_from_buffer(&buffer) {
None => {
// Calculate from the mapping
- running_time_to_utc_time(pts, *running_time_utc_time_mapping).ok_or_else(
+ running_time_to_utc_time(pts, running_time_utc_time_mapping).ok_or_else(
|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS UTC time");
gst::FlowError::Error
},
)?
@@ -433,31 +434,31 @@ impl FMP4Mux {
};
gst::trace!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Mapped PTS running time {pts} to UTC time {utc_time}"
);
let end_pts_utc_time =
running_time_to_utc_time(end_pts, (pts, utc_time)).ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative end PTS UTC time");
gst::FlowError::Error
})?;
let (dts_utc_time, end_dts_utc_time) = if let Some(dts) = dts {
let dts_utc_time =
running_time_to_utc_time(dts, (pts, utc_time)).ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative DTS UTC time");
gst::FlowError::Error
})?;
gst::trace!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Mapped DTS running time {dts} to UTC time {dts_utc_time}"
);
let end_dts_utc_time = running_time_to_utc_time(end_dts.unwrap(), (pts, utc_time))
.ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative end DTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative end DTS UTC time");
gst::FlowError::Error
})?;
@@ -469,7 +470,7 @@ impl FMP4Mux {
(None, None)
};
- pre_queue.push_back(PreQueuedBuffer {
+ stream.pre_queue.push_back(PreQueuedBuffer {
buffer,
pts: utc_time,
end_pts: end_pts_utc_time,
@@ -479,9 +480,11 @@ impl FMP4Mux {
} else {
// In ONVIF mode we need to get UTC times for each buffer and synchronize based on that.
// Queue up to min(6s, fragment_duration) of data in the very beginning to get the first UTC time and then backdate.
- if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) {
+ if let Some((last, first)) =
+ Option::zip(stream.pre_queue.back(), stream.pre_queue.front())
+ {
// Existence of PTS/DTS checked below
- let (last, first) = if delta_frames.requires_dts() {
+ let (last, first) = if stream.delta_frames.requires_dts() {
(last.end_dts.unwrap(), first.end_dts.unwrap())
} else {
(
@@ -494,7 +497,7 @@ impl FMP4Mux {
if last.saturating_sub(first) > gst::Signed::Positive(limit) {
gst::error!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Got no UTC time in the first {limit} of the stream"
);
return Err(gst::FlowError::Error);
@@ -504,7 +507,7 @@ impl FMP4Mux {
let utc_time = match get_utc_time_from_buffer(&buffer) {
Some(utc_time) => utc_time,
None => {
- pre_queue.push_back(PreQueuedBuffer {
+ stream.pre_queue.push_back(PreQueuedBuffer {
buffer,
pts,
end_pts,
@@ -516,11 +519,11 @@ impl FMP4Mux {
};
let mapping = (gst::Signed::Positive(pts), utc_time);
- *running_time_utc_time_mapping = Some(mapping);
+ stream.running_time_utc_time_mapping = Some(mapping);
// Push the buffer onto the pre-queue and re-timestamp it and all other buffers
// based on the mapping above once we have an UTC time.
- pre_queue.push_back(PreQueuedBuffer {
+ stream.pre_queue.push_back(PreQueuedBuffer {
buffer,
pts,
end_pts,
@@ -528,15 +531,15 @@ impl FMP4Mux {
end_dts,
});
- for pre_queued_buffer in pre_queue.iter_mut() {
+ for pre_queued_buffer in stream.pre_queue.iter_mut() {
let pts_utc_time = running_time_to_utc_time(pre_queued_buffer.pts, mapping)
.ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS UTC time");
gst::FlowError::Error
})?;
gst::trace!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Mapped PTS running time {} to UTC time {pts_utc_time}",
pre_queued_buffer.pts,
);
@@ -544,29 +547,31 @@ impl FMP4Mux {
let end_pts_utc_time = running_time_to_utc_time(pre_queued_buffer.end_pts, mapping)
.ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative end PTS UTC time");
gst::FlowError::Error
})?;
pre_queued_buffer.end_pts = end_pts_utc_time;
if let Some(dts) = pre_queued_buffer.dts {
let dts_utc_time = running_time_to_utc_time(dts, mapping).ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative DTS UTC time");
gst::FlowError::Error
})?;
gst::trace!(
CAT,
- obj: sinkpad,
+ obj: stream.sinkpad,
"Mapped DTS running time {dts} to UTC time {dts_utc_time}"
);
pre_queued_buffer.dts = Some(gst::Signed::Positive(dts_utc_time));
- let end_dts_utc_time =
- running_time_to_utc_time(pre_queued_buffer.end_dts.unwrap(), mapping)
- .ok_or_else(|| {
- gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
- gst::FlowError::Error
- })?;
+ let end_dts_utc_time = running_time_to_utc_time(
+ pre_queued_buffer.end_dts.unwrap(),
+ mapping,
+ )
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative DTS UTC time");
+ gst::FlowError::Error
+ })?;
pre_queued_buffer.end_dts = Some(gst::Signed::Positive(end_dts_utc_time));
}
}
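
The `running_time_utc_time_mapping` used above is an anchor pair `(running_time, utc_time)` taken from the first buffer that carried a UTC time; every other timestamp is then shifted by the same offset. A sketch of the arithmetic this implies, assuming `running_time_to_utc_time` computes `utc_anchor + (t - rt_anchor)` and fails on negative results (which would match the "negative ... UTC time" errors logged above); the real helper works on `gst::Signed<gst::ClockTime>`:

    // Assumed semantics: UTC(t) = utc_anchor + (t - rt_anchor),
    // None when the result would be negative.
    fn running_time_to_utc_time(t: i64, mapping: (i64, u64)) -> Option<u64> {
        let (rt_anchor, utc_anchor) = mapping;
        let shifted = utc_anchor as i64 + (t - rt_anchor);
        u64::try_from(shifted).ok()
    }

    fn main() {
        // Anchor: running time 2s corresponds to some UTC nanosecond value.
        let mapping = (2_000_000_000, 1_675_000_000_000_000_000);
        // A buffer 1s after the anchor lands 1s later in UTC time.
        assert_eq!(
            running_time_to_utc_time(3_000_000_000, mapping),
            Some(1_675_000_001_000_000_000)
        );
    }
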
@@ -574,44 +579,34 @@ impl FMP4Mux {
// Fall through and return the front of the queue
}
- Ok(Some(pre_queue.front().unwrap().clone()))
+ Ok(Some(stream.pre_queue.front().unwrap().clone()))
}
- fn pop_buffer(
- &self,
- _sinkpad: &super::FMP4MuxPad,
- pre_queue: &mut VecDeque<PreQueuedBuffer>,
- running_time_utc_time_mapping: &Option<(gst::Signed<gst::ClockTime>, gst::ClockTime)>,
- ) -> PreQueuedBuffer {
+ /// Pop the currently queued buffer from this stream.
+ fn pop_buffer(&self, stream: &mut Stream) -> PreQueuedBuffer {
// Only allowed to be called after peek was successful so there must be a buffer now
// or in ONVIF mode we must also know the mapping now.
- assert!(!pre_queue.is_empty());
+ assert!(!stream.pre_queue.is_empty());
if self.obj().class().as_ref().variant == super::Variant::ONVIF {
- assert!(running_time_utc_time_mapping.is_some());
+ assert!(stream.running_time_utc_time_mapping.is_some());
}
- pre_queue.pop_front().unwrap()
+ stream.pre_queue.pop_front().unwrap()
}
+ /// Finds the stream that has the earliest buffer queued.
fn find_earliest_stream<'a>(
&self,
state: &'a mut State,
timeout: bool,
fragment_duration: gst::ClockTime,
- ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
+ ) -> Result<Option<&'a mut Stream>, gst::FlowError> {
let mut earliest_stream = None;
let mut all_have_data_or_eos = true;
- for (idx, stream) in state.streams.iter_mut().enumerate() {
- let pre_queued_buffer = match Self::peek_buffer(
- self,
- &stream.sinkpad,
- stream.delta_frames,
- &mut stream.pre_queue,
- &mut stream.running_time_utc_time_mapping,
- fragment_duration,
- ) {
+ for stream in state.streams.iter_mut() {
+ let pre_queued_buffer = match Self::peek_buffer(self, stream, fragment_duration) {
Ok(Some(buffer)) => buffer,
Ok(None) | Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
if stream.sinkpad.is_eos() {
@@ -635,7 +630,13 @@ impl FMP4Mux {
continue;
}
- gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time PTS {} / DTS {} queued", pre_queued_buffer.pts, pre_queued_buffer.dts.display());
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Stream has running time PTS {} / DTS {} queued",
+ pre_queued_buffer.pts,
+ pre_queued_buffer.dts.display(),
+ );
let running_time = if stream.delta_frames.requires_dts() {
pre_queued_buffer.dts.unwrap()
@@ -645,11 +646,11 @@ impl FMP4Mux {
if earliest_stream
.as_ref()
- .map_or(true, |(_idx, _stream, earliest_running_time)| {
+ .map_or(true, |(_stream, earliest_running_time)| {
*earliest_running_time > running_time
})
{
- earliest_stream = Some((idx, stream, running_time));
+ earliest_stream = Some((stream, running_time));
}
}
@@ -660,7 +661,7 @@ impl FMP4Mux {
"No timeout and not all streams have a buffer or are EOS"
);
Ok(None)
- } else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
+ } else if let Some((stream, earliest_running_time)) = earliest_stream {
gst::trace!(
CAT,
imp: self,
@@ -668,17 +669,16 @@ impl FMP4Mux {
stream.sinkpad.name(),
earliest_running_time
);
- Ok(Some((idx, stream)))
+ Ok(Some(stream))
} else {
gst::trace!(CAT, imp: self, "No streams have data queued currently");
Ok(None)
}
}
- // Queue incoming buffers as individual GOPs.
+ /// Queue the incoming buffer into individual GOPs.
fn queue_gops(
&self,
- _idx: usize,
stream: &mut Stream,
mut pre_queued_buffer: PreQueuedBuffer,
) -> Result<(), gst::FlowError> {
@@ -937,6 +937,34 @@ impl FMP4Mux {
Ok(())
}
+ /// Queue buffers from all streams that are not filled for the current fragment yet
+ fn queue_available_buffers(
+ &self,
+ state: &mut State,
+ settings: &Settings,
+ timeout: bool,
+ ) -> Result<(), gst::FlowError> {
+ let fragment_start_pts = state.fragment_start_pts;
+ let chunk_start_pts = state.chunk_start_pts;
+
+ // Always take a buffer from the stream with the earliest queued buffer to keep the
+ // fill-level at all sinkpads in sync.
+ while let Some(stream) =
+ self.find_earliest_stream(state, timeout, settings.fragment_duration)?
+ {
+ let pre_queued_buffer = Self::pop_buffer(self, stream);
+
+ // Queue up the buffer and update GOP tracking state
+ self.queue_gops(stream, pre_queued_buffer)?;
+
+ // Check if this stream is filled enough now.
+ self.check_stream_filled(settings, stream, fragment_start_pts, chunk_start_pts, false);
+ }
+
+ Ok(())
+ }
+
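
`queue_available_buffers` always pulls the next buffer from whichever stream currently has the earliest queued one, so the fill level of all pre-queues advances in lockstep. A toy version of that selection policy, with plain nanosecond running times instead of `gst::ClockTime` (the real `find_earliest_stream` additionally handles EOS and DTS-based ordering):

    // Index of the stream whose head buffer has the smallest running time;
    // `None` entries are streams with nothing queued.
    fn earliest_stream(queued: &[Option<u64>]) -> Option<usize> {
        queued
            .iter()
            .copied()
            .enumerate()
            .filter_map(|(idx, rt)| rt.map(|rt| (idx, rt)))
            .min_by_key(|&(_, rt)| rt)
            .map(|(idx, _)| idx)
    }

    fn main() {
        // Stream 1 has the earliest buffer (1.5s), so it is drained first.
        let queued = [Some(2_000_000_000), Some(1_500_000_000), None];
        assert_eq!(earliest_stream(&queued), Some(1));
    }
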
+ /// Check if the stream is filled enough for the current chunk / fragment.
fn check_stream_filled(
&self,
settings: &Settings,
@@ -1051,6 +1079,9 @@ impl FMP4Mux {
}
}
+ /// Calculate earliest PTS, i.e. PTS of the very first fragment.
+ ///
+ /// This also sends a force-keyunit event for the start of the second fragment.
fn calculate_earliest_pts(
&self,
settings: &Settings,
@@ -1186,13 +1217,450 @@ impl FMP4Mux {
}
}
+ /// Drain buffers from a single stream.
+ #[allow(clippy::too_many_arguments)]
+ fn drain_buffers_one_stream(
+ &self,
+ settings: &Settings,
+ stream: &mut Stream,
+ timeout: bool,
+ all_eos: bool,
+ fragment_start_pts: gst::ClockTime,
+ chunk_start_pts: gst::ClockTime,
+ chunk_end_pts: Option<gst::ClockTime>,
+ fragment_start: bool,
+ fragment_filled: bool,
+ ) -> Result<Vec<Gop>, gst::FlowError> {
+ assert!(
+ timeout
+ || all_eos
+ || stream.sinkpad.is_eos()
+ || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
+ || settings.chunk_duration.is_some()
+ );
+
+ let mut gops = Vec::with_capacity(stream.queued_gops.len());
+ if stream.queued_gops.is_empty() {
+ return Ok(gops);
+ }
+
+ // For the first stream drain as much as necessary and decide the end of this
+ // fragment or chunk, for all other streams drain up to that position.
+
+ if let Some(chunk_duration) = settings.chunk_duration {
+ // Chunk mode
+
+ let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts {
+ // Not the first stream
+ chunk_end_pts
+ } else if fragment_filled {
+ // Fragment is filled, so only dequeue everything until the latest GOP
+ fragment_start_pts + settings.fragment_duration
+ } else {
+ // Fragment is not filled and we either have a full chunk or timeout
+ chunk_start_pts + chunk_duration
+ };
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Draining up to end PTS {} / duration {}",
+ dequeue_end_pts,
+ dequeue_end_pts - chunk_start_pts
+ );
+
+ while let Some(gop) = stream.queued_gops.back() {
+ // If this should be the last chunk of a fragment then only drain every
+ // finished GOP until the chunk end PTS. If there is no finished GOP for
+ // this stream (it would be not the first stream then), then drain
+ // everything up to the chunk end PTS.
+ //
+ // If this chunk is not the last chunk of a fragment then simply dequeue
+ // everything up to the chunk end PTS.
+ if fragment_filled {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Fragment filled, current GOP start {} end {} (final {})",
+ gop.start_pts, gop.end_pts,
+ gop.final_end_pts || all_eos || stream.sinkpad.is_eos()
+ );
+
+ if (gop.final_end_pts || all_eos || stream.sinkpad.is_eos())
+ && gop.end_pts <= dequeue_end_pts
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing whole GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ continue;
+ }
+ if !gops.is_empty() {
+ break;
+ }
+
+ gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment");
+ } else {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Chunk filled, current GOP start {} end {} (final {})",
+ gop.start_pts, gop.end_pts,
+ gop.final_end_pts || all_eos || stream.sinkpad.is_eos()
+ );
+ }
+
+ if gop.end_pts <= dequeue_end_pts
+ && (gop.final_end_pts || all_eos || stream.sinkpad.is_eos())
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing whole GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ } else if gop.start_pts >= dequeue_end_pts
+ || (!gop.final_earliest_pts && !all_eos && !stream.sinkpad.is_eos())
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "GOP starts after chunk end",
+ );
+ break;
+ } else {
+ let gop = stream.queued_gops.back_mut().unwrap();
+
+ let start_pts = gop.start_pts;
+ let start_dts = gop.start_dts;
+ let earliest_pts = gop.earliest_pts;
+ let earliest_pts_position = gop.earliest_pts_position;
+
+ let mut split_index = None;
+
+ for (idx, buffer) in gop.buffers.iter().enumerate() {
+ if buffer.pts >= dequeue_end_pts {
+ break;
+ }
+ split_index = Some(idx);
+ }
+ let split_index = match split_index {
+ Some(split_index) => split_index,
+ None => {
+ // We have B frames and the first buffer of this GOP is too far
+ // in the future.
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "First buffer of GOP too far in the future",
+ );
+ break;
+ }
+ };
+
+ // The last buffer of the GOP starts before the chunk end but ends
+ // after the end. We still take it here and remove the whole GOP.
+ if split_index == gop.buffers.len() - 1 {
+ if gop.final_end_pts || all_eos || stream.sinkpad.is_eos() {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing whole GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ } else {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Can't push whole GOP as it's not final yet",
+ );
+ }
+ break;
+ }
+
+ let mut buffers = mem::take(&mut gop.buffers);
+ // Contains all buffers from `split_index + 1` to the end
+ gop.buffers = buffers.split_off(split_index + 1);
+
+ gop.start_pts = gop.buffers[0].pts;
+ gop.start_dts = gop.buffers[0].dts;
+ gop.earliest_pts_position = gop.buffers[0].pts_position;
+ gop.earliest_pts = gop.buffers[0].pts;
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Splitting GOP and keeping PTS {}",
+ gop.buffers[0].pts,
+ );
+
+ let queue_gop = Gop {
+ start_pts,
+ start_dts,
+ earliest_pts,
+ final_earliest_pts: true,
+ end_pts: gop.start_pts,
+ final_end_pts: true,
+ end_dts: gop.start_dts,
+ earliest_pts_position,
+ buffers,
+ };
+
+ gops.push(queue_gop);
+ break;
+ }
+ }
+
+ if fragment_start {
+ if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first()) {
+ if first_buffer
+ .buffer
+ .flags()
+ .contains(gst::BufferFlags::DELTA_UNIT)
+ {
+ gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe");
+ }
+ }
+ }
+ } else {
+ // Non-chunk mode
+
+ let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts {
+ // Not the first stream
+ chunk_end_pts
+ } else {
+ fragment_start_pts + settings.fragment_duration
+ };
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Draining up to end PTS {} / duration {}",
+ dequeue_end_pts,
+ dequeue_end_pts - chunk_start_pts
+ );
+
+ while let Some(gop) = stream.queued_gops.back() {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Current GOP start {} end {} (final {})",
+ gop.start_pts, gop.end_pts,
+ gop.final_end_pts || all_eos || stream.sinkpad.is_eos()
+ );
+
+ // If this GOP is not complete then we can't pop it yet.
+ //
+ // If there was no complete GOP at all yet then it might be bigger than the
+ // fragment duration. In this case we might not be able to handle the latency
+ // requirements in a live pipeline.
+ if !gop.final_end_pts && !all_eos && !stream.sinkpad.is_eos() {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Not including GOP without final end PTS",
+ );
+ break;
+ }
+
+ // If this GOP starts after the fragment end then don't dequeue it yet unless this is
+ // the first stream and no GOPs were dequeued at all yet. This would mean that the
+ // GOP is bigger than the fragment duration.
+ if !all_eos
+ && gop.end_pts > dequeue_end_pts
+ && (chunk_end_pts.is_some() || !gops.is_empty())
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Not including GOP yet",
+ );
+ break;
+ }
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing complete GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ }
+ }
+
+ Ok(gops)
+ }
+
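
When a GOP straddles the chunk boundary, the code above splits it: every buffer whose PTS lies before the chunk end PTS is drained, and the rest stays queued as the remainder of the GOP. A reduced sketch of the `split_index` / `split_off(split_index + 1)` step, with buffers represented by their PTS only:

    // Split one GOP at `dequeue_end_pts`: the returned buffers are drained,
    // whatever remains in `buffers` stays queued.
    fn split_gop(buffers: &mut Vec<u64>, dequeue_end_pts: u64) -> Vec<u64> {
        let mut split_index = None;
        for (idx, pts) in buffers.iter().enumerate() {
            if *pts >= dequeue_end_pts {
                break;
            }
            split_index = Some(idx);
        }
        match split_index {
            // First buffer is already past the boundary: drain nothing.
            None => Vec::new(),
            Some(idx) => {
                let remainder = buffers.split_off(idx + 1);
                std::mem::replace(buffers, remainder)
            }
        }
    }

    fn main() {
        let mut gop = vec![0, 500, 1000, 1500];
        let drained = split_gop(&mut gop, 1200);
        assert_eq!(drained, vec![0, 500, 1000]);
        assert_eq!(gop, vec![1500]);
    }
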
+ /// Flatten all GOPs, remove any gaps and calculate durations.
+ #[allow(clippy::type_complexity)]
+ fn flatten_gops(
+ &self,
+ idx: usize,
+ stream: &Stream,
+ gops: Vec<Gop>,
+ ) -> Result<
+ Option<(
+ // All buffers of the GOPs without gaps
+ VecDeque<super::Buffer>,
+ // Earliest PTS
+ gst::ClockTime,
+ // Earliest PTS position
+ gst::ClockTime,
+ // End PTS
+ gst::ClockTime,
+ // Start DTS
+ Option<gst::ClockTime>,
+ // Start DTS position
+ Option<gst::ClockTime>,
+ // End DTS
+ Option<gst::ClockTime>,
+ )>,
+ gst::FlowError,
+ > {
+ let last_gop = gops.last().unwrap();
+ let end_pts = last_gop.end_pts;
+ let end_dts = last_gop.end_dts;
+
+ let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+ gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter()));
+
+ // Then calculate durations for all of the buffers and get rid of any GAP buffers in
+ // the process.
+ // Also calculate the earliest PTS / start DTS here, which needs to consider GAP
+ // buffers too.
+ let mut buffers = VecDeque::with_capacity(gop_buffers.len());
+ let mut earliest_pts = None;
+ let mut earliest_pts_position = None;
+ let mut start_dts = None;
+ let mut start_dts_position = None;
+
+ let mut gop_buffers = gop_buffers.into_iter();
+ while let Some(buffer) = gop_buffers.next() {
+ // If this is a GAP buffer then skip it. Its duration was already considered
+ // below for the non-GAP buffer preceding it, and if there was none then the
+ // chunk start would be adjusted accordingly for this stream.
+ if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
+ && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
+ && buffer.buffer.size() == 0
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Skipping gap buffer {buffer:?}",
+ );
+ continue;
+ }
+
+ if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) {
+ earliest_pts = Some(buffer.pts);
+ }
+ if earliest_pts_position.map_or(true, |earliest_pts_position| {
+ buffer.buffer.pts().unwrap() < earliest_pts_position
+ }) {
+ earliest_pts_position = Some(buffer.buffer.pts().unwrap());
+ }
+ if stream.delta_frames.requires_dts() && start_dts.is_none() {
+ start_dts = Some(buffer.dts.unwrap());
+ }
+ if stream.delta_frames.requires_dts() && start_dts_position.is_none() {
+ start_dts_position = Some(buffer.buffer.dts().unwrap());
+ }
+
+ let timestamp = if !stream.delta_frames.requires_dts() {
+ buffer.pts
+ } else {
+ buffer.dts.unwrap()
+ };
+
+ // Take as end timestamp the timestamp of the next non-GAP buffer
+ let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| {
+ !buf.buffer.flags().contains(gst::BufferFlags::GAP)
+ || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
+ || buf.buffer.size() != 0
+ }) {
+ Some(buffer) => {
+ if !stream.delta_frames.requires_dts() {
+ buffer.pts
+ } else {
+ buffer.dts.unwrap()
+ }
+ }
+ None => {
+ if !stream.delta_frames.requires_dts() {
+ end_pts
+ } else {
+ end_dts.unwrap()
+ }
+ }
+ };
+
+ // Timestamps are enforced to monotonically increase when queueing buffers
+ let duration = end_timestamp
+ .checked_sub(timestamp)
+ .expect("Timestamps going backwards");
+
+ let composition_time_offset = if !stream.delta_frames.requires_dts() {
+ None
+ } else {
+ let pts = buffer.pts;
+ let dts = buffer.dts.unwrap();
+
+ Some(
+ i64::try_from(
+ (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(),
+ )
+ .map_err(|_| {
+ gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
+ gst::FlowError::Error
+ })?,
+ )
+ };
+
+ buffers.push_back(Buffer {
+ idx,
+ buffer: buffer.buffer,
+ timestamp,
+ duration,
+ composition_time_offset,
+ });
+ }
+
+ if buffers.is_empty() {
+ return Ok(None);
+ }
+
+ let earliest_pts = earliest_pts.unwrap();
+ let earliest_pts_position = earliest_pts_position.unwrap();
+ if stream.delta_frames.requires_dts() {
+ assert!(start_dts.is_some());
+ assert!(start_dts_position.is_some());
+ }
+ let start_dts = start_dts;
+ let start_dts_position = start_dts_position;
+
+ Ok(Some((
+ buffers,
+ earliest_pts,
+ earliest_pts_position,
+ end_pts,
+ start_dts,
+ start_dts_position,
+ end_dts,
+ )))
+ }
+
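
In `flatten_gops` a buffer's duration is the distance to the next non-GAP buffer, or to the overall end timestamp for the last one, which is how zero-sized gap buffers disappear from the output while the time they covered is still accounted for. A reduced sketch, assuming monotonically increasing timestamps as the queueing code enforces:

    // Each entry is (timestamp, is_gap). Gap buffers are dropped and their
    // span is absorbed into the duration of the preceding real buffer.
    fn durations(buffers: &[(u64, bool)], end_ts: u64) -> Vec<(u64, u64)> {
        let mut out = Vec::new();
        for (i, &(ts, is_gap)) in buffers.iter().enumerate() {
            if is_gap {
                continue;
            }
            // End timestamp: the next non-GAP buffer, else the stream end.
            let end = buffers[i + 1..]
                .iter()
                .find(|&&(_, gap)| !gap)
                .map(|&(ts, _)| ts)
                .unwrap_or(end_ts);
            out.push((ts, end - ts));
        }
        out
    }

    fn main() {
        // 40ms frames with a zero-sized gap buffer in between (times in ms).
        let bufs = [(0, false), (40, false), (80, true), (120, false)];
        // The 40ms frame absorbs the gap: its duration becomes 80, not 40.
        assert_eq!(durations(&bufs, 160), vec![(0, 40), (40, 80), (120, 40)]);
    }
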
+ /// Drain buffers from all streams for the current chunk.
+ ///
+ /// Also removes gap buffers, calculates buffer durations and various timestamps relevant for
+ /// the current chunk.
#[allow(clippy::type_complexity)]
fn drain_buffers(
&self,
state: &mut State,
settings: &Settings,
timeout: bool,
- at_eos: bool,
+ all_eos: bool,
) -> Result<
(
// Drained streams
@@ -1219,7 +1687,6 @@ impl FMP4Mux {
let mut min_earliest_pts = None;
let mut min_start_dts_position = None;
let mut chunk_end_pts = None;
- let mut fragment_start = false;
// In fragment mode, each chunk is a full fragment. Otherwise, in chunk mode,
// this fragment is filled if it is filled for the first non-EOS stream
@@ -1231,6 +1698,10 @@ impl FMP4Mux {
.map(|s| s.fragment_filled)
== Some(true);
+ let fragment_start_pts = state.fragment_start_pts.unwrap();
+ let chunk_start_pts = state.chunk_start_pts.unwrap();
+ let fragment_start = fragment_start_pts == chunk_start_pts;
+
// The first stream decides how much can be dequeued, if anything at all.
//
// In chunk mode:
@@ -1255,277 +1726,27 @@ impl FMP4Mux {
CAT,
imp: self,
"Starting to drain at {} (fragment start {}, fragment end {}, chunk start {}, chunk end {})",
- state.chunk_start_pts.display(),
- state.fragment_start_pts.display(),
- state.fragment_start_pts.map(|start| start + settings.fragment_duration).display(),
- state.chunk_start_pts.display(),
- Option::zip(state.chunk_start_pts, settings.chunk_duration).map(|(start, duration)| start + duration).display(),
+ chunk_start_pts,
+ fragment_start_pts,
+ fragment_start_pts + settings.fragment_duration,
+ chunk_start_pts.display(),
+ settings.chunk_duration.map(|duration| chunk_start_pts + duration).display(),
);
for (idx, stream) in state.streams.iter_mut().enumerate() {
let stream_settings = stream.sinkpad.imp().settings.lock().unwrap().clone();
- assert!(
- timeout
- || at_eos
- || stream.sinkpad.is_eos()
- || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
- || settings.chunk_duration.is_some()
- );
-
- let mut gops = Vec::with_capacity(stream.queued_gops.len());
- if !stream.queued_gops.is_empty() {
- let fragment_start_pts = state.fragment_start_pts.unwrap();
- let chunk_start_pts = state.chunk_start_pts.unwrap();
-
- // For the first stream drain as much as necessary and decide the end of this
- // fragment or chunk, for all other streams drain up to that position.
- if let Some(chunk_duration) = settings.chunk_duration {
- let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts {
- // Not the first stream
- chunk_end_pts
- } else if fragment_filled {
- // Fragment is filled, so only dequeue everything until the latest GOP
- fragment_start_pts + settings.fragment_duration
- } else {
- // Fragment is not filled and we either have a full chunk or timeout
- chunk_start_pts + chunk_duration
- };
-
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Draining up to end PTS {} / duration {}",
- dequeue_end_pts,
- dequeue_end_pts - chunk_start_pts
- );
-
- while let Some(gop) = stream.queued_gops.back() {
- // If this should be the last chunk of a fragment then only drain every
- // finished GOP until the chunk end PTS. If there is no finished GOP for
- // this stream (it would be not the first stream then), then drain
- // everything up to the chunk end PTS.
- //
- // If this chunk is not the last chunk of a fragment then simply dequeue
- // everything up to the chunk end PTS.
- if fragment_filled {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Fragment filled, current GOP start {} end {} (final {})",
- gop.start_pts, gop.end_pts,
- gop.final_end_pts || at_eos || stream.sinkpad.is_eos()
- );
-
- if (gop.final_end_pts || at_eos || stream.sinkpad.is_eos())
- && gop.end_pts <= dequeue_end_pts
- {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Pushing whole GOP",
- );
- gops.push(stream.queued_gops.pop_back().unwrap());
- continue;
- }
- if !gops.is_empty() {
- break;
- }
-
- gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment");
- } else {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Chunk filled, current GOP start {} end {} (final {})",
- gop.start_pts, gop.end_pts,
- gop.final_end_pts || at_eos || stream.sinkpad.is_eos()
- );
- }
-
- if gop.end_pts <= dequeue_end_pts
- && (gop.final_end_pts || at_eos || stream.sinkpad.is_eos())
- {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Pushing whole GOP",
- );
- gops.push(stream.queued_gops.pop_back().unwrap());
- } else if gop.start_pts >= dequeue_end_pts
- || (!gop.final_earliest_pts && !at_eos && !stream.sinkpad.is_eos())
- {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "GOP starts after chunk end",
- );
- break;
- } else {
- let gop = stream.queued_gops.back_mut().unwrap();
-
- let start_pts = gop.start_pts;
- let start_dts = gop.start_dts;
- let earliest_pts = gop.earliest_pts;
- let earliest_pts_position = gop.earliest_pts_position;
-
- let mut split_index = None;
-
- for (idx, buffer) in gop.buffers.iter().enumerate() {
- if buffer.pts >= dequeue_end_pts {
- break;
- }
- split_index = Some(idx);
- }
- let split_index = match split_index {
- Some(split_index) => split_index,
- None => {
- // We have B frames and the first buffer of this GOP is too far
- // in the future.
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "First buffer of GOP too far in the future",
- );
- break;
- }
- };
-
- // The last buffer of the GOP starts before the chunk end but ends
- // after the end. We still take it here and remove the whole GOP.
- if split_index == gop.buffers.len() - 1 {
- if gop.final_end_pts || at_eos || stream.sinkpad.is_eos() {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Pushing whole GOP",
- );
- gops.push(stream.queued_gops.pop_back().unwrap());
- } else {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Can't push whole GOP as it's not final yet",
- );
- }
- break;
- }
-
- let mut buffers = mem::take(&mut gop.buffers);
- // Contains all buffers from `split_index + 1` to the end
- gop.buffers = buffers.split_off(split_index + 1);
-
- gop.start_pts = gop.buffers[0].pts;
- gop.start_dts = gop.buffers[0].dts;
- gop.earliest_pts_position = gop.buffers[0].pts_position;
- gop.earliest_pts = gop.buffers[0].pts;
-
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Splitting GOP and keeping PTS {}",
- gop.buffers[0].pts,
- );
-
- let queue_gop = Gop {
- start_pts,
- start_dts,
- earliest_pts,
- final_earliest_pts: true,
- end_pts: gop.start_pts,
- final_end_pts: true,
- end_dts: gop.start_dts,
- earliest_pts_position,
- buffers,
- };
-
- gops.push(queue_gop);
- break;
- }
- }
-
- fragment_start = fragment_start_pts == chunk_start_pts;
- if fragment_start {
- if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first())
- {
- if first_buffer
- .buffer
- .flags()
- .contains(gst::BufferFlags::DELTA_UNIT)
- {
- gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe");
- }
- }
- }
- } else {
- let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts {
- // Not the first stream
- chunk_end_pts
- } else {
- fragment_start_pts + settings.fragment_duration
- };
-
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Draining up to end PTS {} / duration {}",
- dequeue_end_pts,
- dequeue_end_pts - chunk_start_pts
- );
-
- while let Some(gop) = stream.queued_gops.back() {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Current GOP start {} end {} (final {})",
- gop.start_pts, gop.end_pts,
- gop.final_end_pts || at_eos || stream.sinkpad.is_eos()
- );
-
- // If this GOP is not complete then we can't pop it yet.
- //
- // If there was no complete GOP at all yet then it might be bigger than the
- // fragment duration. In this case we might not be able to handle the latency
- // requirements in a live pipeline.
- if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Not including GOP without final end PTS",
- );
- break;
- }
-
- // If this GOP starts after the fragment end then don't dequeue it yet unless this is
- // the first stream and no GOPs were dequeued at all yet. This would mean that the
- // GOP is bigger than the fragment duration.
- if !at_eos
- && gop.end_pts > dequeue_end_pts
- && (chunk_end_pts.is_some() || !gops.is_empty())
- {
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Not including GOP yet",
- );
- break;
- }
-
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Pushing complete GOP",
- );
- gops.push(stream.queued_gops.pop_back().unwrap());
- }
-
- // If we don't have a next chunk start PTS then this is the first stream as above.
- if chunk_end_pts.is_none() {
- // In fragment mode, each chunk is a full fragment
- fragment_start = true;
- }
- }
- }
+ let gops = self.drain_buffers_one_stream(
+ settings,
+ stream,
+ timeout,
+ all_eos,
+ fragment_start_pts,
+ chunk_start_pts,
+ chunk_end_pts,
+ fragment_start,
+ fragment_filled,
+ )?;
stream.fragment_filled = false;
stream.chunk_filled = false;
@@ -1543,7 +1764,7 @@ impl FMP4Mux {
} else {
// If nothing was dequeued for the first stream then this is OK if we're at
// EOS: we just consider the next stream as first stream then.
- if at_eos || stream.sinkpad.is_eos() {
+ if all_eos || stream.sinkpad.is_eos() {
// This is handled below generally if nothing was dequeued
} else {
// Otherwise this can only really happen on timeout in live pipelines.
@@ -1568,7 +1789,7 @@ impl FMP4Mux {
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
}
- } else if at_eos {
+ } else if all_eos {
if let Some(last_gop) = gops.last() {
if chunk_end_pts.map_or(true, |chunk_end_pts| chunk_end_pts < last_gop.end_pts)
{
@@ -1620,143 +1841,38 @@ impl FMP4Mux {
.unwrap_or(gst::ClockTime::ZERO)
);
- let last_gop = gops.last().unwrap();
- let end_pts = last_gop.end_pts;
- let end_dts = last_gop.end_dts;
-
// First flatten all GOPs into a single `Vec`
- let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
- gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter()));
-
- // Then calculate durations for all of the buffers and get rid of any GAP buffers in
- // the process.
- // Also calculate the earliest PTS / start DTS here, which needs to consider GAP
- // buffers too.
- let mut buffers = VecDeque::with_capacity(gop_buffers.len());
- let mut earliest_pts = None;
- let mut earliest_pts_position = None;
- let mut start_dts = None;
- let mut start_dts_position = None;
-
- let mut gop_buffers = gop_buffers.into_iter();
- while let Some(buffer) = gop_buffers.next() {
- // If this is a GAP buffer then skip it. Its duration was already considered
- // below for the non-GAP buffer preceding it, and if there was none then the
- // chunk start would be adjusted accordingly for this stream.
- if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
- && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
- && buffer.buffer.size() == 0
- {
- gst::trace!(
+ let buffers = self.flatten_gops(idx, stream, gops)?;
+ let (
+ buffers,
+ earliest_pts,
+ earliest_pts_position,
+ end_pts,
+ start_dts,
+ start_dts_position,
+ _end_dts,
+ ) = match buffers {
+ Some(res) => res,
+ None => {
+ gst::info!(
CAT,
obj: stream.sinkpad,
- "Skipping gap buffer {buffer:?}",
+ "Drained only gap buffers",
);
- continue;
- }
- if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) {
- earliest_pts = Some(buffer.pts);
- }
- if earliest_pts_position.map_or(true, |earliest_pts_position| {
- buffer.buffer.pts().unwrap() < earliest_pts_position
- }) {
- earliest_pts_position = Some(buffer.buffer.pts().unwrap());
- }
- if stream.delta_frames.requires_dts() && start_dts.is_none() {
- start_dts = Some(buffer.dts.unwrap());
- }
- if stream.delta_frames.requires_dts() && start_dts_position.is_none() {
- start_dts_position = Some(buffer.buffer.dts().unwrap());
- }
-
- let timestamp = if !stream.delta_frames.requires_dts() {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- };
-
- // Take as end timestamp the timestamp of the next non-GAP buffer
- let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| {
- !buf.buffer.flags().contains(gst::BufferFlags::GAP)
- || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
- || buf.buffer.size() != 0
- }) {
- Some(buffer) => {
- if !stream.delta_frames.requires_dts() {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- }
- }
- None => {
- if !stream.delta_frames.requires_dts() {
- end_pts
- } else {
- end_dts.unwrap()
- }
- }
- };
-
- // Timestamps are enforced to monotonically increase when queueing buffers
- let duration = end_timestamp
- .checked_sub(timestamp)
- .expect("Timestamps going backwards");
-
- let composition_time_offset = if !stream.delta_frames.requires_dts() {
- None
- } else {
- let pts = buffer.pts;
- let dts = buffer.dts.unwrap();
-
- Some(
- i64::try_from(
- (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(),
- )
- .map_err(|_| {
- gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
- gst::FlowError::Error
- })?,
- )
- };
-
- buffers.push_back(Buffer {
- idx,
- buffer: buffer.buffer,
- timestamp,
- duration,
- composition_time_offset,
- });
- }
-
- if buffers.is_empty() {
- gst::info!(
- CAT,
- obj: stream.sinkpad,
- "Drained only gap buffers",
- );
-
- drained_streams.push((
- super::FragmentHeaderStream {
- caps: stream.caps.clone(),
- start_time: None,
- delta_frames: stream.delta_frames,
- trak_timescale: stream_settings.trak_timescale,
- },
- VecDeque::new(),
- ));
-
- continue;
- }
+ drained_streams.push((
+ super::FragmentHeaderStream {
+ caps: stream.caps.clone(),
+ start_time: None,
+ delta_frames: stream.delta_frames,
+ trak_timescale: stream_settings.trak_timescale,
+ },
+ VecDeque::new(),
+ ));
- let earliest_pts = earliest_pts.unwrap();
- let earliest_pts_position = earliest_pts_position.unwrap();
- if stream.delta_frames.requires_dts() {
- assert!(start_dts.is_some());
- assert!(start_dts_position.is_some());
- }
- let start_dts = start_dts;
- let start_dts_position = start_dts_position;
+ continue;
+ }
+ };
gst::info!(
CAT,
@@ -1814,6 +1930,7 @@ impl FMP4Mux {
))
}
+ /// Interleave drained buffers of each stream for this chunk according to the settings.
#[allow(clippy::type_complexity)]
fn interleave_buffers(
&self,
@@ -1881,10 +1998,79 @@ impl FMP4Mux {
Ok((interleaved_buffers, streams))
}
+ /// Request a force-keyunit event for the start of the next fragment.
+ ///
+ /// This is called whenever the last chunk of a fragment is pushed out.
+ ///
+ /// `chunk_end_pts` gives the time of the previously drained chunk, which
+ /// ideally should be lower than the next fragment's start PTS.
+ fn request_force_keyunit_event(
+ &self,
+ state: &State,
+ settings: &Settings,
+ upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>,
+ chunk_end_pts: gst::ClockTime,
+ ) {
+ let fku_time = chunk_end_pts + settings.fragment_duration;
+
+ for stream in &state.streams {
+ let current_position = stream.current_position;
+
+ // In case of ONVIF this needs to be converted back from UTC time to
+ // the stream's running time
+ let (fku_time, current_position) =
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF {
+ (
+ if let Some(fku_time) = utc_time_to_running_time(
+ fku_time,
+ stream.running_time_utc_time_mapping.unwrap(),
+ ) {
+ fku_time
+ } else {
+ continue;
+ },
+ utc_time_to_running_time(
+ current_position,
+ stream.running_time_utc_time_mapping.unwrap(),
+ ),
+ )
+ } else {
+ (fku_time, Some(current_position))
+ };
+
+ let fku_time =
+ if current_position.map_or(false, |current_position| current_position > fku_time) {
+ gst::warning!(
+ CAT,
+ obj: stream.sinkpad,
+ "Sending force-keyunit event late for running time {} at {}",
+ fku_time,
+ current_position.display(),
+ );
+ None
+ } else {
+ gst::debug!(
+ CAT,
+ obj: stream.sinkpad,
+ "Sending force-keyunit event for running time {}",
+ fku_time,
+ );
+ Some(fku_time)
+ };
+
+ let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(fku_time)
+ .all_headers(true)
+ .build();
+
+ upstream_events.push((stream.sinkpad.clone(), fku));
+ }
+ }
+
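
The force-keyunit request targets exactly one fragment duration past the chunk that was just drained, so the upstream encoder can place the next IDR frame where the following fragment has to start. A worked sketch of the target-time computation in plain nanoseconds; the late case corresponds to the warning logged above, where the event is sent without a running time:

    // fku_time = chunk_end_pts + fragment_duration; if the stream's current
    // position is already past that, it is too late to pick a running time.
    fn fku_running_time(
        chunk_end_pts: u64,
        fragment_duration: u64,
        current_position: u64,
    ) -> Option<u64> {
        let fku_time = chunk_end_pts + fragment_duration;
        (current_position <= fku_time).then_some(fku_time)
    }

    fn main() {
        // Chunk ended at 8s, fragments are 2s long: request a keyframe at 10s.
        let t = fku_running_time(8_000_000_000, 2_000_000_000, 8_000_000_000);
        assert_eq!(t, Some(10_000_000_000));
        // The stream is already at 11s: the request would arrive late.
        let late = fku_running_time(8_000_000_000, 2_000_000_000, 11_000_000_000);
        assert_eq!(late, None);
    }
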
/// Fills upstream events as needed and returns the caps the first time draining can happen.
///
/// If it returns `(_, None)` then there's currently nothing to drain anymore.
- fn drain(
+ fn drain_one_chunk(
&self,
state: &mut State,
settings: &Settings,
@@ -1948,213 +2134,254 @@ impl FMP4Mux {
}
}
- let mut buffer_list = None;
if interleaved_buffers.is_empty() {
assert!(at_eos);
- } else {
- // If there are actual buffers to output then create headers as needed and create a
- // bufferlist for all buffers that have to be output.
- let min_earliest_pts_position = min_earliest_pts_position.unwrap();
- let min_earliest_pts = min_earliest_pts.unwrap();
- let chunk_end_pts = chunk_end_pts.unwrap();
-
- let mut fmp4_header = None;
- if !state.sent_headers {
- let mut buffer = state.stream_header.as_ref().unwrap().copy();
- {
- let buffer = buffer.get_mut().unwrap();
+ return Ok((caps, None));
+ }
- buffer.set_pts(min_earliest_pts_position);
- buffer.set_dts(min_start_dts_position);
+ // If there are actual buffers to output then create headers as needed and create a
+ // bufferlist for all buffers that have to be output.
+ let min_earliest_pts_position = min_earliest_pts_position.unwrap();
+ let min_earliest_pts = min_earliest_pts.unwrap();
+ let chunk_end_pts = chunk_end_pts.unwrap();
- // Header is DISCONT|HEADER
- buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
- }
+ let mut fmp4_header = None;
+ if !state.sent_headers {
+ let mut buffer = state.stream_header.as_ref().unwrap().copy();
+ {
+ let buffer = buffer.get_mut().unwrap();
- fmp4_header = Some(buffer);
+ buffer.set_pts(min_earliest_pts_position);
+ buffer.set_dts(min_start_dts_position);
- state.sent_headers = true;
+ // Header is DISCONT|HEADER
+ buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
}
- // TODO: Write prft boxes before moof
- // TODO: Write sidx boxes before moof and rewrite once offsets are known
+ fmp4_header = Some(buffer);
- if state.sequence_number == 0 {
- state.sequence_number = 1;
- }
- let sequence_number = state.sequence_number;
- // If this is the last chunk of a fragment then increment the sequence number for the
- // start of the next fragment.
- if fragment_filled {
- state.sequence_number += 1;
- }
- let (mut fmp4_fragment_header, moof_offset) =
- boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
- variant: self.obj().class().as_ref().variant,
- sequence_number,
- chunk: !fragment_start,
- streams: streams.as_slice(),
- buffers: interleaved_buffers.as_slice(),
- })
- .map_err(|err| {
- gst::error!(
- CAT,
- imp: self,
- "Failed to create FMP4 fragment header: {}",
- err
- );
- gst::FlowError::Error
- })?;
-
- {
- let buffer = fmp4_fragment_header.get_mut().unwrap();
- buffer.set_pts(min_earliest_pts_position);
- buffer.set_dts(min_start_dts_position);
- buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts));
+ state.sent_headers = true;
+ }
- // Fragment and chunk header is HEADER
- buffer.set_flags(gst::BufferFlags::HEADER);
- // Chunk header is DELTA_UNIT
- if !fragment_start {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
+ // TODO: Write prft boxes before moof
+ // TODO: Write sidx boxes before moof and rewrite once offsets are known
- // Copy metas from the first actual buffer to the fragment header. This allows
- // getting things like the reference timestamp meta or the timecode meta to identify
- // the fragment.
- let _ = interleaved_buffers[0].buffer.copy_into(
- buffer,
- gst::BufferCopyFlags::META,
- 0,
- None,
+ // First sequence number must be 1
+ if state.sequence_number == 0 {
+ state.sequence_number = 1;
+ }
+ let sequence_number = state.sequence_number;
+ // If this is the last chunk of a fragment then increment the sequence number for the
+ // start of the next fragment.
+ if fragment_filled {
+ state.sequence_number += 1;
+ }
+ let (mut fmp4_fragment_header, moof_offset) =
+ boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
+ variant: self.obj().class().as_ref().variant,
+ sequence_number,
+ chunk: !fragment_start,
+ streams: streams.as_slice(),
+ buffers: interleaved_buffers.as_slice(),
+ })
+ .map_err(|err| {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed to create FMP4 fragment header: {}",
+ err
);
- }
+ gst::FlowError::Error
+ })?;
+
+ {
+ let buffer = fmp4_fragment_header.get_mut().unwrap();
+ buffer.set_pts(min_earliest_pts_position);
+ buffer.set_dts(min_start_dts_position);
+ buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts));
+
+ // Fragment and chunk header is HEADER
+ buffer.set_flags(gst::BufferFlags::HEADER);
+ // Chunk header is DELTA_UNIT
+ if !fragment_start {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+
+ // Copy metas from the first actual buffer to the fragment header. This allows
+ // getting things like the reference timestamp meta or the timecode meta to identify
+ // the fragment.
+ let _ = interleaved_buffers[0].buffer.copy_into(
+ buffer,
+ gst::BufferCopyFlags::META,
+ 0,
+ None,
+ );
+ }
- let moof_offset = state.current_offset
- + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64
- + moof_offset;
+ let moof_offset = state.current_offset
+ + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64
+ + moof_offset;
- let buffers_len = interleaved_buffers.len();
- for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() {
- // Fix up buffer flags, all other buffers are DELTA_UNIT
- let buffer_ref = buffer.buffer.make_mut();
- buffer_ref.unset_flags(gst::BufferFlags::all());
- buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT);
+ let buffers_len = interleaved_buffers.len();
+ for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() {
+ // Fix up buffer flags, all other buffers are DELTA_UNIT
+ let buffer_ref = buffer.buffer.make_mut();
+ buffer_ref.unset_flags(gst::BufferFlags::all());
+ buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT);
- // Set the marker flag for the last buffer of the segment
- if idx == buffers_len - 1 {
- buffer_ref.set_flags(gst::BufferFlags::MARKER);
- }
+ // Set the marker flag for the last buffer of the segment
+ if idx == buffers_len - 1 {
+ buffer_ref.set_flags(gst::BufferFlags::MARKER);
}
+ }
- buffer_list = Some(
- fmp4_header
- .into_iter()
- .chain(Some(fmp4_fragment_header))
- .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer))
- .inspect(|b| {
- state.current_offset += b.size() as u64;
- })
- .collect::<gst::BufferList>(),
- );
-
- if settings.write_mfra && fragment_start {
- // Write mfra only for the main stream on fragment starts, and if there are no
- // buffers for the main stream in this segment then don't write anything.
- if let Some(super::FragmentHeaderStream {
- start_time: Some(start_time),
- ..
- }) = streams.get(0)
- {
- state.fragment_offsets.push(super::FragmentOffset {
- time: *start_time,
- offset: moof_offset,
- });
- }
+ let buffer_list = fmp4_header
+ .into_iter()
+ .chain(Some(fmp4_fragment_header))
+ .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer))
+ .inspect(|b| {
+ state.current_offset += b.size() as u64;
+ })
+ .collect::<gst::BufferList>();
+
+ if settings.write_mfra && fragment_start {
+ // Write mfra only for the main stream on fragment starts, and if there are no
+ // buffers for the main stream in this segment then don't write anything.
+ if let Some(super::FragmentHeaderStream {
+ start_time: Some(start_time),
+ ..
+ }) = streams.get(0)
+ {
+ state.fragment_offsets.push(super::FragmentOffset {
+ time: *start_time,
+ offset: moof_offset,
+ });
}
+ }
- state.end_pts = Some(chunk_end_pts);
+ state.end_pts = Some(chunk_end_pts);
- // Update for the start PTS of the next fragment / chunk
+ // Update for the start PTS of the next fragment / chunk
- if fragment_filled {
- state.fragment_start_pts = Some(chunk_end_pts);
- gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,);
- } else {
- gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,);
- }
- state.chunk_start_pts = Some(chunk_end_pts);
-
- // If the current fragment is filled we already have the next fragment's start
- // keyframe and can request the following one.
- if fragment_filled {
- let fku_time = chunk_end_pts + settings.fragment_duration;
-
- for stream in &state.streams {
- let current_position = stream.current_position;
-
- // In case of ONVIF this needs to be converted back from UTC time to
- // the stream's running time
- let (fku_time, current_position) =
- if self.obj().class().as_ref().variant == super::Variant::ONVIF {
- (
- if let Some(fku_time) = utc_time_to_running_time(
- fku_time,
- stream.running_time_utc_time_mapping.unwrap(),
- ) {
- fku_time
- } else {
- continue;
- },
- utc_time_to_running_time(
- current_position,
- stream.running_time_utc_time_mapping.unwrap(),
- ),
- )
- } else {
- (fku_time, Some(current_position))
- };
+ if fragment_filled {
+ state.fragment_start_pts = Some(chunk_end_pts);
+ gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,);
+ } else {
+ gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,);
+ }
+ state.chunk_start_pts = Some(chunk_end_pts);
- let fku_time = if current_position
- .map_or(false, |current_position| current_position > fku_time)
- {
- gst::warning!(
- CAT,
- obj: stream.sinkpad,
- "Sending force-keyunit event late for running time {} at {}",
- fku_time,
- current_position.display(),
- );
- None
- } else {
- gst::debug!(
- CAT,
- obj: stream.sinkpad,
- "Sending force-keyunit event for running time {}",
- fku_time,
+ // If the current fragment is filled we already have the next fragment's start
+ // keyframe and can request the following one.
+ if fragment_filled {
+ self.request_force_keyunit_event(state, settings, upstream_events, chunk_end_pts);
+ }
+
+ // Reset timeout delay now that we've output an actual fragment or chunk
+ state.timeout_delay = gst::ClockTime::ZERO;
+
+ // TODO: Write edit list at EOS
+ // TODO: Rewrite bitrates at EOS
+
+ Ok((caps, Some(buffer_list)))
+ }
+
+ /// Drain all chunks that can currently be drained.
+ ///
+ /// On error the `caps`, `buffers` or `upstream_events` can contain data of already finished
+ /// chunks that were complete before the error.
+ #[allow(clippy::too_many_arguments)]
+ fn drain(
+ &self,
+ state: &mut State,
+ settings: &Settings,
+ all_eos: bool,
+ mut timeout: bool,
+ caps: &mut Option<gst::Caps>,
+ buffers: &mut Vec<gst::BufferList>,
+ upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>,
+ ) -> Result<(), gst::FlowError> {
+ // Loop as long as new chunks can be drained.
+ loop {
+ // If enough GOPs were queued, drain and create the output fragment or chunk
+ let res = self.drain_one_chunk(state, settings, timeout, all_eos, upstream_events);
+ let mut buffer_list = match res {
+ Ok((new_caps, buffer_list)) => {
+ if caps.is_none() {
+ *caps = new_caps;
+ }
+
+ buffer_list
+ }
+ Err(err) => {
+ if err == gst_base::AGGREGATOR_FLOW_NEED_DATA {
+ assert!(!all_eos);
+ assert!(timeout);
+ gst::element_imp_warning!(
+ self,
+ gst::StreamError::Format,
+ ["Longer GOPs than fragment duration"]
);
- Some(fku_time)
- };
+ state.timeout_delay += 1.seconds();
+ }
- let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_time)
- .all_headers(true)
- .build();
+ return Err(err);
+ }
+ };
- upstream_events.push((stream.sinkpad.clone(), fku));
+ // If nothing can be drained anymore then break the loop, and if all streams are
+ // EOS add the footers.
+ if buffer_list.is_none() {
+ if settings.write_mfra && all_eos {
+ gst::debug!(CAT, imp: self, "Writing mfra box");
+ match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) {
+ Ok(mut mfra) => {
+ {
+ let mfra = mfra.get_mut().unwrap();
+ // mfra is DELTA_UNIT like other buffers
+ mfra.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+
+ if buffer_list.is_none() {
+ buffer_list = Some(gst::BufferList::new_sized(1));
+ }
+ buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
+ buffers.extend(buffer_list);
+ }
+ Err(err) => {
+ gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
+ }
+ }
}
+
+ break Ok(());
}
- // Reset timeout delay now that we've output an actual fragment or chunk
- state.timeout_delay = gst::ClockTime::ZERO;
- }
+ // Otherwise extend the list of bufferlists and check again if something can be
+ // drained.
+ buffers.extend(buffer_list);
- // TODO: Write edit list at EOS
- // TODO: Rewrite bitrates at EOS
+ // Only the first iteration is considered a timeout.
+ timeout = false;
+
+ let fragment_start_pts = state.fragment_start_pts;
+ let chunk_start_pts = state.chunk_start_pts;
+ for stream in &mut state.streams {
+ // Check if this stream is still filled enough now.
+ self.check_stream_filled(
+ settings,
+ stream,
+ fragment_start_pts,
+ chunk_start_pts,
+ all_eos,
+ );
+ }
- Ok((caps, buffer_list))
+ // And try draining a fragment again
+ }
}
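
The new `drain` wrapper turns the single-chunk drain into a loop: keep calling `drain_one_chunk` while complete chunks come out, count only the first iteration as a timeout, and re-check every stream's fill level between iterations. The control flow, reduced to a skeleton with byte vectors standing in for buffer lists:

    // `drain_one(timeout)` yields Some(chunk) while a complete chunk is
    // available and None once the muxer has to wait for more data.
    fn drain_all(
        mut timeout: bool,
        mut drain_one: impl FnMut(bool) -> Option<Vec<u8>>,
    ) -> Vec<Vec<u8>> {
        let mut out = Vec::new();
        while let Some(chunk) = drain_one(timeout) {
            out.push(chunk);
            // Only the first iteration is considered a timeout.
            timeout = false;
        }
        out
    }

    fn main() {
        let mut chunks = vec![Some(vec![1u8]), Some(vec![2]), None].into_iter();
        let drained = drain_all(true, |_timeout| chunks.next().flatten());
        assert_eq!(drained.len(), 2);
    }
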
+ /// Create all streams.
fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> {
for pad in self
.obj()
@@ -2266,6 +2493,7 @@ impl FMP4Mux {
Ok(())
}
+ /// Generate an updated header at the end and the corresponding caps with the new streamheader.
fn update_header(
&self,
state: &mut State,
@@ -2347,6 +2575,80 @@ impl FMP4Mux {
Ok(Some((list, caps)))
}
+
+ /// Finish the stream be rewriting / updating headers.
+ fn finish(&self, settings: &Settings) {
+ // Do remaining EOS handling after the end of the stream was pushed.
+ gst::debug!(CAT, imp: self, "Doing EOS handling");
+
+ if settings.header_update_mode == super::HeaderUpdateMode::None {
+ // Need to output new headers if started again after EOS
+ self.state.lock().unwrap().sent_headers = false;
+ return;
+ }
+
+ let updated_header = self.update_header(&mut self.state.lock().unwrap(), settings, true);
+ match updated_header {
+ Ok(Some((buffer_list, caps))) => {
+ match settings.header_update_mode {
+ super::HeaderUpdateMode::None => unreachable!(),
+ super::HeaderUpdateMode::Rewrite => {
+ let mut q = gst::query::Seeking::new(gst::Format::Bytes);
+ if self.obj().src_pad().peer_query(&mut q) && q.result().0 {
+ let aggregator = self.obj();
+
+ aggregator.set_src_caps(&caps);
+
+ // Seek to the beginning with a default bytes segment
+ aggregator.update_segment(
+ &gst::FormattedSegment::<gst::format::Bytes>::new(),
+ );
+
+ if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed pushing updated header buffer downstream: {:?}",
+ err,
+ );
+ }
+ } else {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Can't rewrite header because downstream is not seekable"
+ );
+ }
+ }
+ super::HeaderUpdateMode::Update => {
+ let aggregator = self.obj();
+
+ aggregator.set_src_caps(&caps);
+ if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed pushing updated header buffer downstream: {:?}",
+ err,
+ );
+ }
+ }
+ }
+ }
+ Ok(None) => {}
+ Err(err) => {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed to generate updated header: {:?}",
+ err
+ );
+ }
+ }
+
+ // Need to output new headers if started again after EOS
+ self.state.lock().unwrap().sent_headers = false;
+ }
}
#[glib::object_subclass]
@@ -2753,12 +3055,11 @@ impl AggregatorImpl for FMP4Mux {
fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
- let mut upstream_events = vec![];
-
let all_eos;
let mut caps = None;
let mut buffers = vec![];
- {
+ let mut upstream_events = vec![];
+ let res = {
let mut state = self.state.lock().unwrap();
// Create streams
@@ -2766,40 +3067,15 @@ impl AggregatorImpl for FMP4Mux {
self.create_streams(&mut state)?;
}
- // Queue buffers from all streams that are not filled for the current fragment yet
- //
- // Always take a buffer from the stream with the earliest queued buffer to keep the
- // fill-level at all sinkpads in sync.
- let fragment_start_pts = state.fragment_start_pts;
- let chunk_start_pts = state.chunk_start_pts;
-
- while let Some((idx, stream)) =
- self.find_earliest_stream(&mut state, timeout, settings.fragment_duration)?
- {
- let pre_queued_buffer = Self::pop_buffer(
- self,
- &stream.sinkpad,
- &mut stream.pre_queue,
- &stream.running_time_utc_time_mapping,
- );
-
- // Queue up the buffer and update GOP tracking state
- self.queue_gops(idx, stream, pre_queued_buffer)?;
-
- // Check if this stream is filled enough now.
- self.check_stream_filled(
- &settings,
- stream,
- fragment_start_pts,
- chunk_start_pts,
- false,
- );
- }
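+            // Queue buffers from all streams that are not filled for the current
+            // fragment yet and update each stream's fill level.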
+ self.queue_available_buffers(&mut state, &settings, timeout)?;
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
if all_eos {
gst::debug!(CAT, imp: self, "All streams are EOS now");
+ let fragment_start_pts = state.fragment_start_pts;
+ let chunk_start_pts = state.chunk_start_pts;
+
for stream in &mut state.streams {
// Check if this stream is filled enough now that everything is EOS.
self.check_stream_filled(
@@ -2821,106 +3097,17 @@ impl AggregatorImpl for FMP4Mux {
timeout,
);
- // Loop as long as new chunks can be drained.
- // Only the first iteration is considered a timeout.
- let mut timeout = timeout;
- loop {
- // If enough GOPs were queued, drain and create the output fragment or chunk
- let res = self.drain(
- &mut state,
- &settings,
- timeout,
- all_eos,
- &mut upstream_events,
- );
- let mut buffer_list = match res {
- Ok((new_caps, buffer_list)) => {
- if caps.is_none() {
- caps = new_caps;
- }
-
- buffer_list
- }
- Err(err) => {
- if err == gst_base::AGGREGATOR_FLOW_NEED_DATA {
- assert!(!all_eos);
- assert!(timeout);
- gst::element_imp_warning!(
- self,
- gst::StreamError::Format,
- ["Longer GOPs than fragment duration"]
- );
- state.timeout_delay += 1.seconds();
- }
-
- // Although we had an error, push out everything that was produced so far
- drop(state);
- for (sinkpad, event) in upstream_events {
- sinkpad.push_event(event);
- }
-
- if let Some(caps) = caps {
- gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps);
- self.obj().set_src_caps(&caps);
- }
-
- for buffer_list in buffers {
- gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list);
- self.obj().finish_buffer_list(buffer_list)?;
- }
-
- return Err(err);
- }
- };
-
-            // If nothing can be drained anymore then break the loop, and if all streams are
-            // EOS do EOS handling.
- if buffer_list.is_none() {
- if settings.write_mfra && all_eos {
- gst::debug!(CAT, imp: self, "Writing mfra box");
- match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) {
- Ok(mut mfra) => {
- {
- let mfra = mfra.get_mut().unwrap();
- // mfra is DELTA_UNIT like other buffers
- mfra.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
-
- if buffer_list.is_none() {
- buffer_list = Some(gst::BufferList::new_sized(1));
- }
- buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
- buffers.extend(buffer_list);
- }
- Err(err) => {
- gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
- }
- }
- }
-
- break;
- }
-
- // Otherwise extend the list of bufferlists and check again if something can be
- // drained.
- buffers.extend(buffer_list);
-
- timeout = false;
-
- let fragment_start_pts = state.fragment_start_pts;
- let chunk_start_pts = state.chunk_start_pts;
- for stream in &mut state.streams {
- // Check if this stream is still filled enough now.
- self.check_stream_filled(
- &settings,
- stream,
- fragment_start_pts,
- chunk_start_pts,
- all_eos,
- );
- }
- }
- }
+ // Drain everything that can be drained at this point
+ self.drain(
+ &mut state,
+ &settings,
+ all_eos,
+ timeout,
+ &mut caps,
+ &mut buffers,
+ &mut upstream_events,
+ )
+ };
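+        // The state lock was released when the block above ended, so the collected
+        // events (e.g. force-keyunit events) can now be pushed upstream safely.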
for (sinkpad, event) in upstream_events {
sinkpad.push_event(event);
@@ -2936,78 +3123,16 @@ impl AggregatorImpl for FMP4Mux {
self.obj().finish_buffer_list(buffer_list)?;
}
+        // If an error happened above while draining, return it now, after pushing
+        // any output that was produced before the error.
+ res?;
+
if !all_eos {
return Ok(gst::FlowSuccess::Ok);
}
- // Do remaining EOS handling after the end of the stream was pushed.
- gst::debug!(CAT, imp: self, "Doing EOS handling");
-
- if settings.header_update_mode != super::HeaderUpdateMode::None {
- let updated_header =
- self.update_header(&mut self.state.lock().unwrap(), &settings, true);
- match updated_header {
- Ok(Some((buffer_list, caps))) => {
- match settings.header_update_mode {
- super::HeaderUpdateMode::None => unreachable!(),
- super::HeaderUpdateMode::Rewrite => {
- let mut q = gst::query::Seeking::new(gst::Format::Bytes);
- if self.obj().src_pad().peer_query(&mut q) && q.result().0 {
- let aggregator = self.obj();
-
- aggregator.set_src_caps(&caps);
-
- // Seek to the beginning with a default bytes segment
- aggregator
- .update_segment(
- &gst::FormattedSegment::<gst::format::Bytes>::new(),
- );
-
- if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
- gst::error!(
- CAT,
- imp: self,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- } else {
- gst::error!(
- CAT,
- imp: self,
- "Can't rewrite header because downstream is not seekable"
- );
- }
- }
- super::HeaderUpdateMode::Update => {
- let aggregator = self.obj();
-
- aggregator.set_src_caps(&caps);
- if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
- gst::error!(
- CAT,
- imp: self,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- }
- }
- }
- Ok(None) => {}
- Err(err) => {
- gst::error!(
- CAT,
- imp: self,
- "Failed to generate updated header: {:?}",
- err
- );
- }
- }
- }
-
- // Need to output new headers if started again after EOS
- self.state.lock().unwrap().sent_headers = false;
+ // Finish the stream.
+ self.finish(&settings);
Err(gst::FlowError::Eos)
}