Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/plugins/gst_plugins_cache.json14
-rw-r--r--mux/fmp4/src/fmp4mux/boxes.rs27
-rw-r--r--mux/fmp4/src/fmp4mux/imp.rs1224
-rw-r--r--mux/fmp4/src/fmp4mux/mod.rs3
-rw-r--r--mux/fmp4/tests/tests.rs384
5 files changed, 1293 insertions, 359 deletions
diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json
index 52857746..c1434941 100644
--- a/docs/plugins/gst_plugins_cache.json
+++ b/docs/plugins/gst_plugins_cache.json
@@ -1728,6 +1728,20 @@
],
"kind": "object",
"properties": {
+ "chunk-duration": {
+ "blurb": "Duration for each FMP4 chunk (default = no chunks)",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "18446744073709551615",
+ "max": "18446744073709551615",
+ "min": "0",
+ "mutable": "ready",
+ "readable": true,
+ "type": "guint64",
+ "writable": true
+ },
"fragment-duration": {
"blurb": "Duration for each FMP4 fragment",
"conditionally-available": false,
diff --git a/mux/fmp4/src/fmp4mux/boxes.rs b/mux/fmp4/src/fmp4mux/boxes.rs
index 55ada48b..103b5098 100644
--- a/mux/fmp4/src/fmp4mux/boxes.rs
+++ b/mux/fmp4/src/fmp4mux/boxes.rs
@@ -1573,19 +1573,22 @@ pub(super) fn create_fmp4_fragment_header(
) -> Result<(gst::Buffer, u64), Error> {
let mut v = vec![];
- let (brand, compatible_brands) =
- brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.caps));
-
- write_box(&mut v, b"styp", |v| {
- // major brand
- v.extend(brand);
- // minor version
- v.extend(0u32.to_be_bytes());
- // compatible brands
- v.extend(compatible_brands.into_iter().flatten());
+ // Don't write a `styp` if this is only a chunk.
+ if !cfg.chunk {
+ let (brand, compatible_brands) =
+ brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.caps));
+
+ write_box(&mut v, b"styp", |v| {
+ // major brand
+ v.extend(brand);
+ // minor version
+ v.extend(0u32.to_be_bytes());
+ // compatible brands
+ v.extend(compatible_brands.into_iter().flatten());
- Ok(())
- })?;
+ Ok(())
+ })?;
+ }
let styp_len = v.len();
diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs
index db76d4bc..88c84b14 100644
--- a/mux/fmp4/src/fmp4mux/imp.rs
+++ b/mux/fmp4/src/fmp4mux/imp.rs
@@ -13,6 +13,7 @@ use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::collections::VecDeque;
+use std::mem;
use std::sync::Mutex;
use once_cell::sync::Lazy;
@@ -94,6 +95,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10);
+const DEFAULT_CHUNK_DURATION: Option<gst::ClockTime> = gst::ClockTime::NONE;
const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None;
const DEFAULT_WRITE_MFRA: bool = false;
const DEFAULT_WRITE_MEHD: bool = false;
@@ -103,6 +105,7 @@ const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::fro
#[derive(Debug, Clone)]
struct Settings {
fragment_duration: gst::ClockTime,
+ chunk_duration: Option<gst::ClockTime>,
header_update_mode: super::HeaderUpdateMode,
write_mfra: bool,
write_mehd: bool,
@@ -116,6 +119,7 @@ impl Default for Settings {
fn default() -> Self {
Settings {
fragment_duration: DEFAULT_FRAGMENT_DURATION,
+ chunk_duration: DEFAULT_CHUNK_DURATION,
header_update_mode: DEFAULT_HEADER_UPDATE_MODE,
write_mfra: DEFAULT_WRITE_MFRA,
write_mehd: DEFAULT_WRITE_MEHD,
@@ -159,6 +163,7 @@ struct PreQueuedBuffer {
struct GopBuffer {
buffer: gst::Buffer,
pts: gst::ClockTime,
+ pts_position: gst::ClockTime,
dts: Option<gst::ClockTime>,
}
@@ -205,6 +210,8 @@ struct Stream {
queued_gops: VecDeque<Gop>,
/// Whether the fully queued GOPs are filling a whole fragment.
fragment_filled: bool,
+ /// Whether a whole chunk is queued.
+ chunk_filled: bool,
/// Difference between the first DTS and 0 in case of negative DTS
dts_offset: Option<gst::ClockTime>,
@@ -244,6 +251,11 @@ struct State {
/// Start PTS of the current fragment
fragment_start_pts: Option<gst::ClockTime>,
+ /// Start PTS of the current chunk
+ ///
+ /// This is equal to `fragment_start_pts` if the current chunk is the first of a fragment,
+ /// and always equal to `fragment_start_pts` if no `chunk_duration` is set.
+ chunk_start_pts: Option<gst::ClockTime>,
/// Additional timeout delay in case GOPs are bigger than the fragment duration
timeout_delay: gst::ClockTime,
@@ -618,6 +630,11 @@ impl FMP4Mux {
continue;
}
+ if stream.chunk_filled {
+ gst::trace!(CAT, obj: stream.sinkpad, "Stream has current chunk filled");
+ continue;
+ }
+
gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time PTS {} / DTS {} queued", pre_queued_buffer.pts, pre_queued_buffer.dts.display());
let running_time = if stream.delta_frames.requires_dts() {
@@ -783,7 +800,12 @@ impl FMP4Mux {
end_pts,
end_dts,
final_end_pts: false,
- buffers: vec![GopBuffer { buffer, pts, dts }],
+ buffers: vec![GopBuffer {
+ buffer,
+ pts,
+ pts_position,
+ dts,
+ }],
};
stream.queued_gops.push_front(gop);
@@ -827,7 +849,12 @@ impl FMP4Mux {
gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
gop.end_dts = gop.end_dts.opt_max(end_dts);
- gop.buffers.push(GopBuffer { buffer, pts, dts });
+ gop.buffers.push(GopBuffer {
+ buffer,
+ pts,
+ pts_position,
+ dts,
+ });
if delta_frames.requires_dts() {
let dts = dts.unwrap();
@@ -910,6 +937,255 @@ impl FMP4Mux {
Ok(())
}
+ fn check_stream_filled(
+ &self,
+ settings: &Settings,
+ stream: &mut Stream,
+ fragment_start_pts: Option<gst::ClockTime>,
+ chunk_start_pts: Option<gst::ClockTime>,
+ all_eos: bool,
+ ) {
+ // Either both are none or neither
+ let (chunk_start_pts, fragment_start_pts) = match (chunk_start_pts, fragment_start_pts) {
+ (Some(chunk_start_pts), Some(fragment_start_pts)) => {
+ (chunk_start_pts, fragment_start_pts)
+ }
+ _ => return,
+ };
+
+ // Check if this stream is filled enough now.
+ if let Some(chunk_duration) = settings.chunk_duration {
+ // In chunk mode
+ let (gop_idx, gop) = match stream
+ .queued_gops
+ .iter()
+ .enumerate()
+ .find(|(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos())
+ {
+ Some(res) => res,
+ None => {
+ gst::trace!(CAT, obj: stream.sinkpad, "Chunked mode but no GOP with final earliest PTS known yet");
+ return;
+ }
+ };
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})",
+ gop.start_pts,
+ gop.end_pts,
+ gop.final_end_pts || all_eos || stream.sinkpad.is_eos(),
+ );
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Current chunk start {}, current fragment start {}",
+ chunk_start_pts,
+ fragment_start_pts,
+ );
+
+ let chunk_end_pts = chunk_start_pts + chunk_duration;
+ let fragment_end_pts = fragment_start_pts + settings.fragment_duration;
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Current chunk end {}, current fragment end {}",
+ chunk_end_pts,
+ fragment_end_pts,
+ );
+
+ // First check if the next split should be the end of a fragment or the end of a chunk.
+ // If both are the same then a fragment split has preference.
+ if fragment_end_pts <= chunk_end_pts && gop.start_pts >= fragment_end_pts {
+ gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment");
+ stream.fragment_filled = true;
+ } else if chunk_end_pts < fragment_end_pts {
+ let last_pts = gop.buffers.last().map(|b| b.pts);
+
+ if gop.end_pts >= chunk_end_pts
+ // only if there's another GOP or at least one further buffer
+ && (gop_idx > 0
+ || last_pts.map_or(false, |last_pts| last_pts.saturating_sub(chunk_start_pts) > chunk_duration))
+ {
+ gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this chunk");
+ stream.chunk_filled = true;
+ }
+ }
+ } else {
+ let gop = match stream
+ .queued_gops
+ .iter()
+ .find(|gop| gop.final_end_pts || all_eos || stream.sinkpad.is_eos())
+ {
+ Some(gop) => gop,
+ None => {
+ gst::trace!(CAT, obj: stream.sinkpad, "Fragment mode but no GOP with final end PTS known yet");
+ return;
+ }
+ };
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "GOP start PTS {}, GOP end PTS {}",
+ gop.start_pts,
+ gop.end_pts,
+ );
+
+ // Check if the end of the latest finalized GOP is after the fragment end
+ let fragment_end_pts = fragment_start_pts + settings.fragment_duration;
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Current fragment start{}, current fragment end {}",
+ fragment_start_pts,
+ fragment_start_pts + settings.fragment_duration,
+ );
+
+ if gop.end_pts >= fragment_end_pts {
+ gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment");
+ stream.fragment_filled = true;
+ }
+ }
+ }
+
+ fn calculate_earliest_pts(
+ &self,
+ settings: &Settings,
+ state: &mut State,
+ upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>,
+ all_eos: bool,
+ timeout: bool,
+ ) {
+ if state.earliest_pts.is_some() {
+ return;
+ }
+
+ let fragment_start_pts = state.fragment_start_pts;
+ let chunk_start_pts = state.chunk_start_pts;
+
+ // Calculate the earliest PTS after queueing input if we can now.
+ let mut earliest_pts = None;
+ let mut start_dts = None;
+ for stream in &state.streams {
+ let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() {
+ None => {
+ if !all_eos && !timeout {
+ earliest_pts = None;
+ start_dts = None;
+ break;
+ }
+ continue;
+ }
+ Some(oldest_gop) => {
+ if !all_eos && !timeout && !oldest_gop.final_earliest_pts {
+ earliest_pts = None;
+ start_dts = None;
+ break;
+ }
+
+ (oldest_gop.earliest_pts, oldest_gop.start_dts)
+ }
+ };
+
+ if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) {
+ earliest_pts = Some(stream_earliest_pts);
+ }
+
+ if let Some(stream_start_dts) = stream_start_dts {
+ if start_dts.opt_gt(stream_start_dts).unwrap_or(true) {
+ start_dts = Some(stream_start_dts);
+ }
+ }
+ }
+
+ let earliest_pts = match earliest_pts {
+ Some(earliest_pts) => earliest_pts,
+ None => return,
+ };
+
+ // The earliest PTS is known and as such the start of the first and second fragment.
+ gst::info!(
+ CAT,
+ imp: self,
+ "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})",
+ earliest_pts,
+ start_dts.display()
+ );
+ state.earliest_pts = Some(earliest_pts);
+ state.start_dts = start_dts;
+ state.fragment_start_pts = Some(earliest_pts);
+ state.chunk_start_pts = Some(earliest_pts);
+
+ // Now send force-keyunit events for the second fragment start.
+ let fku_time = earliest_pts + settings.fragment_duration;
+ for stream in &state.streams {
+ let current_position = stream.current_position;
+
+ // In case of ONVIF this needs to be converted back from UTC time to
+ // the stream's running time
+ let (fku_time, current_position) =
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF {
+ (
+ if let Some(fku_time) = utc_time_to_running_time(
+ fku_time,
+ stream.running_time_utc_time_mapping.unwrap(),
+ ) {
+ fku_time
+ } else {
+ continue;
+ },
+ utc_time_to_running_time(
+ current_position,
+ stream.running_time_utc_time_mapping.unwrap(),
+ ),
+ )
+ } else {
+ (fku_time, Some(current_position))
+ };
+
+ let fku_time =
+ if current_position.map_or(false, |current_position| current_position > fku_time) {
+ gst::warning!(
+ CAT,
+ obj: stream.sinkpad,
+ "Sending first force-keyunit event late for running time {} at {}",
+ fku_time,
+ current_position.display(),
+ );
+ None
+ } else {
+ gst::debug!(
+ CAT,
+ obj: stream.sinkpad,
+ "Sending first force-keyunit event for running time {}",
+ fku_time,
+ );
+ Some(fku_time)
+ };
+
+ let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(fku_time)
+ .all_headers(true)
+ .build();
+
+ upstream_events.push((stream.sinkpad.clone(), fku));
+ }
+
+ // Check if any of the streams are already filled enough for the first chunk/fragment.
+ for stream in &mut state.streams {
+ self.check_stream_filled(
+ settings,
+ stream,
+ fragment_start_pts,
+ chunk_start_pts,
+ all_eos,
+ );
+ }
+ }
+
#[allow(clippy::type_complexity)]
fn drain_buffers(
&self,
@@ -927,8 +1203,13 @@ impl FMP4Mux {
Option<gst::ClockTime>,
// Minimum start DTS position of all streams (if any stream has DTS)
Option<gst::ClockTime>,
- // End PTS of this drained fragment, i.e. start PTS of the next fragment
+ // End PTS of this drained fragment or chunk, i.e. start PTS of the next fragment or
+ // chunk
Option<gst::ClockTime>,
+ // With these drained buffers the current fragment is filled
+ bool,
+ // These buffers make the start of a new fragment
+ bool,
),
gst::FlowError,
> {
@@ -937,22 +1218,48 @@ impl FMP4Mux {
let mut min_earliest_pts_position = None;
let mut min_earliest_pts = None;
let mut min_start_dts_position = None;
- let mut fragment_end_pts = None;
+ let mut chunk_end_pts = None;
+ let mut fragment_start = false;
+
+ // In fragment mode, each chunk is a full fragment. Otherwise, in chunk mode,
+ // this fragment is filled if it is filled for the first non-EOS stream
+ let fragment_filled = settings.chunk_duration.is_none()
+ || state
+ .streams
+ .iter()
+ .find(|s| !s.sinkpad.is_eos())
+ .map(|s| s.fragment_filled)
+ == Some(true);
// The first stream decides how much can be dequeued, if anything at all.
//
- // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
- // but on timeout in live pipelines it might happen that the first stream does not have a
- // complete GOP queued. In that case nothing is dequeued for any of the streams and the
- // timeout is advanced by 1s until at least one complete GOP can be dequeued.
+ // In chunk mode:
+ // If more than the fragment duration has passed until the latest GOPs earliest PTS then
+ // the fragment is considered filled and all GOPs until that GOP are drained. The next
+ // chunk would start a new fragment, and would start with the keyframe at the beginning
+ // of that latest GOP.
+ //
+ // Otherwise if more than a chunk duration is currently queued in GOPs of which the
+ // earliest PTS is known then drain everything up to that position. If nothing can be
+ // drained at all then advance the timeout by 1s until something can be dequeued.
+ //
+ // Otherwise:
+ // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
+ // but on timeout in live pipelines it might happen that the first stream does not have a
+ // complete GOP queued. In that case nothing is dequeued for any of the streams and the
+ // timeout is advanced by 1s until at least one complete GOP can be dequeued.
//
// If the first stream is already EOS then the next stream that is not EOS yet will be
// taken in its place.
gst::info!(
CAT,
imp: self,
- "Starting to drain at {}",
- state.fragment_start_pts.display()
+ "Starting to drain at {} (fragment start {}, fragment end {}, chunk start {}, chunk end {})",
+ state.chunk_start_pts.display(),
+ state.fragment_start_pts.display(),
+ state.fragment_start_pts.map(|start| start + settings.fragment_duration).display(),
+ state.chunk_start_pts.display(),
+ Option::zip(state.chunk_start_pts, settings.chunk_duration).map(|(start, duration)| start + duration).display(),
);
for (idx, stream) in state.streams.iter_mut().enumerate() {
@@ -963,58 +1270,274 @@ impl FMP4Mux {
|| at_eos
|| stream.sinkpad.is_eos()
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
+ || settings.chunk_duration.is_some()
);
let mut gops = Vec::with_capacity(stream.queued_gops.len());
if !stream.queued_gops.is_empty() {
let fragment_start_pts = state.fragment_start_pts.unwrap();
+ let chunk_start_pts = state.chunk_start_pts.unwrap();
+
+ // For the first stream drain as much as necessary and decide the end of this
+ // fragment or chunk, for all other streams drain up to that position.
+ if let Some(chunk_duration) = settings.chunk_duration {
+ let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts {
+ // Not the first stream
+ chunk_end_pts
+ } else if fragment_filled {
+ // Fragment is filled, so only dequeue everything until the latest GOP
+ fragment_start_pts + settings.fragment_duration
+ } else {
+ // Fragment is not filled and we either have a full chunk or timeout
+ chunk_start_pts + chunk_duration
+ };
- // Drain all complete GOPs until at most one fragment duration was dequeued for the
- // first stream, or until the dequeued duration of the first stream.
- let dequeue_end_pts =
- fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
- gst::trace!(
- CAT,
- obj: stream.sinkpad,
- "Draining up to end PTS {} / duration {}",
- dequeue_end_pts,
- dequeue_end_pts - fragment_start_pts
- );
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Draining up to end PTS {} / duration {}",
+ dequeue_end_pts,
+ dequeue_end_pts - chunk_start_pts
+ );
- while let Some(gop) = stream.queued_gops.back() {
- // If this GOP is not complete then we can't pop it yet.
- //
- // If there was no complete GOP at all yet then it might be bigger than the
- // fragment duration. In this case we might not be able to handle the latency
- // requirements in a live pipeline.
- if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
- break;
+ while let Some(gop) = stream.queued_gops.back() {
+ // If this should be the last chunk of a fragment then only drain every
+ // finished GOP until the chunk end PTS. If there is no finished GOP for
+ // this stream (it would be not the first stream then), then drain
+ // everything up to the chunk end PTS.
+ //
+ // If this chunk is not the last chunk of a fragment then simply dequeue
+ // everything up to the chunk end PTS.
+ if fragment_filled {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Fragment filled, current GOP start {} end {} (final {})",
+ gop.start_pts, gop.end_pts,
+ gop.final_end_pts || at_eos || stream.sinkpad.is_eos()
+ );
+
+ if (gop.final_end_pts || at_eos || stream.sinkpad.is_eos())
+ && gop.end_pts <= dequeue_end_pts
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing whole GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ continue;
+ }
+ if !gops.is_empty() {
+ break;
+ }
+
+ gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment");
+ } else {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Chunk filled, current GOP start {} end {} (final {})",
+ gop.start_pts, gop.end_pts,
+ gop.final_end_pts || at_eos || stream.sinkpad.is_eos()
+ );
+ }
+
+ if gop.end_pts <= dequeue_end_pts
+ && (gop.final_end_pts || at_eos || stream.sinkpad.is_eos())
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing whole GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ } else if gop.start_pts >= dequeue_end_pts
+ || (!gop.final_earliest_pts && !at_eos && !stream.sinkpad.is_eos())
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "GOP starts after chunk end",
+ );
+ break;
+ } else {
+ let gop = stream.queued_gops.back_mut().unwrap();
+
+ let start_pts = gop.start_pts;
+ let start_dts = gop.start_dts;
+ let earliest_pts = gop.earliest_pts;
+ let earliest_pts_position = gop.earliest_pts_position;
+
+ let mut split_index = None;
+
+ for (idx, buffer) in gop.buffers.iter().enumerate() {
+ if buffer.pts >= dequeue_end_pts {
+ break;
+ }
+ split_index = Some(idx);
+ }
+ let split_index = match split_index {
+ Some(split_index) => split_index,
+ None => {
+ // We have B frames and the first buffer of this GOP is too far
+ // in the future.
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "First buffer of GOP too far in the future",
+ );
+ break;
+ }
+ };
+
+ // The last buffer of the GOP starts before the chunk end but ends
+ // after the end. We still take it here and remove the whole GOP.
+ if split_index == gop.buffers.len() - 1 {
+ if gop.final_end_pts || at_eos || stream.sinkpad.is_eos() {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing whole GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ } else {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Can't push whole GOP as it's not final yet",
+ );
+ }
+ break;
+ }
+
+ let mut buffers = mem::take(&mut gop.buffers);
+ // Contains all buffers from `split_index + 1` to the end
+ gop.buffers = buffers.split_off(split_index + 1);
+
+ gop.start_pts = gop.buffers[0].pts;
+ gop.start_dts = gop.buffers[0].dts;
+ gop.earliest_pts_position = gop.buffers[0].pts_position;
+ gop.earliest_pts = gop.buffers[0].pts;
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Splitting GOP and keeping PTS {}",
+ gop.buffers[0].pts,
+ );
+
+ let queue_gop = Gop {
+ start_pts,
+ start_dts,
+ earliest_pts,
+ final_earliest_pts: true,
+ end_pts: gop.start_pts,
+ final_end_pts: true,
+ end_dts: gop.start_dts,
+ earliest_pts_position,
+ buffers,
+ };
+
+ gops.push(queue_gop);
+ break;
+ }
}
- // If this GOP starts after the fragment end then don't dequeue it yet unless this is
- // the first stream and no GOPs were dequeued at all yet. This would mean that the
- // GOP is bigger than the fragment duration.
- if !at_eos
- && gop.end_pts > dequeue_end_pts
- && (fragment_end_pts.is_some() || !gops.is_empty())
- {
- break;
+ fragment_start = fragment_start_pts == chunk_start_pts;
+ if fragment_start {
+ if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first())
+ {
+ if first_buffer
+ .buffer
+ .flags()
+ .contains(gst::BufferFlags::DELTA_UNIT)
+ {
+ gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe");
+ }
+ }
+ }
+ } else {
+ let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts {
+ // Not the first stream
+ chunk_end_pts
+ } else {
+ fragment_start_pts + settings.fragment_duration
+ };
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Draining up to end PTS {} / duration {}",
+ dequeue_end_pts,
+ dequeue_end_pts - chunk_start_pts
+ );
+
+ while let Some(gop) = stream.queued_gops.back() {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Current GOP start {} end {} (final {})",
+ gop.start_pts, gop.end_pts,
+ gop.final_end_pts || at_eos || stream.sinkpad.is_eos()
+ );
+
+ // If this GOP is not complete then we can't pop it yet.
+ //
+ // If there was no complete GOP at all yet then it might be bigger than the
+ // fragment duration. In this case we might not be able to handle the latency
+ // requirements in a live pipeline.
+ if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Not including GOP without final end PTS",
+ );
+ break;
+ }
+
+ // If this GOP starts after the fragment end then don't dequeue it yet unless this is
+ // the first stream and no GOPs were dequeued at all yet. This would mean that the
+ // GOP is bigger than the fragment duration.
+ if !at_eos
+ && gop.end_pts > dequeue_end_pts
+ && (chunk_end_pts.is_some() || !gops.is_empty())
+ {
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Not including GOP yet",
+ );
+ break;
+ }
+
+ gst::trace!(
+ CAT,
+ obj: stream.sinkpad,
+ "Pushing complete GOP",
+ );
+ gops.push(stream.queued_gops.pop_back().unwrap());
}
- gops.push(stream.queued_gops.pop_back().unwrap());
+ // If we don't have a next chunk start PTS then this is the first stream as above.
+ if chunk_end_pts.is_none() {
+ // In fragment mode, each chunk is a full fragment
+ fragment_start = true;
+ }
}
}
stream.fragment_filled = false;
+ stream.chunk_filled = false;
- // If we don't have a next fragment start PTS then this is the first stream as above.
- if fragment_end_pts.is_none() {
+ // If we don't have a next chunk start PTS then this is the first stream as above.
+ if chunk_end_pts.is_none() {
if let Some(last_gop) = gops.last() {
// Dequeued something so let's take the end PTS of the last GOP
- fragment_end_pts = Some(last_gop.end_pts);
+ chunk_end_pts = Some(last_gop.end_pts);
gst::info!(
CAT,
obj: stream.sinkpad,
- "Draining up to PTS {} for this fragment",
+ "Draining up to PTS {} for this chunk",
last_gop.end_pts,
);
} else {
@@ -1026,11 +1549,19 @@ impl FMP4Mux {
// Otherwise this can only really happen on timeout in live pipelines.
assert!(timeout);
- gst::warning!(
- CAT,
- obj: stream.sinkpad,
- "Don't have a complete GOP for the first stream on timeout in a live pipeline",
- );
+ if settings.chunk_duration.is_some() {
+ gst::warning!(
+ CAT,
+ obj: stream.sinkpad,
+ "Don't have anything to drain for the first stream on timeout in a live pipeline",
+ );
+ } else {
+ gst::warning!(
+ CAT,
+ obj: stream.sinkpad,
+ "Don't have a complete GOP for the first stream on timeout in a live pipeline",
+ );
+ }
// In this case we advance the timeout by 1s and hope that things are
// better then.
@@ -1039,10 +1570,9 @@ impl FMP4Mux {
}
} else if at_eos {
if let Some(last_gop) = gops.last() {
- if fragment_end_pts
- .map_or(true, |fragment_end_pts| fragment_end_pts < last_gop.end_pts)
+ if chunk_end_pts.map_or(true, |chunk_end_pts| chunk_end_pts < last_gop.end_pts)
{
- fragment_end_pts = Some(last_gop.end_pts);
+ chunk_end_pts = Some(last_gop.end_pts);
}
}
}
@@ -1067,7 +1597,7 @@ impl FMP4Mux {
continue;
}
- assert!(fragment_end_pts.is_some());
+ assert!(chunk_end_pts.is_some());
if let Some((prev_gop, first_gop)) = Option::zip(
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
@@ -1112,7 +1642,7 @@ impl FMP4Mux {
while let Some(buffer) = gop_buffers.next() {
// If this is a GAP buffer then skip it. Its duration was already considered
// below for the non-GAP buffer preceding it, and if there was none then the
- // fragment start would be adjusted accordingly for this stream.
+ // chunk start would be adjusted accordingly for this stream.
if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
&& buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
&& buffer.buffer.size() == 0
@@ -1278,7 +1808,9 @@ impl FMP4Mux {
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
- fragment_end_pts,
+ chunk_end_pts,
+ fragment_filled,
+ fragment_start,
))
}
@@ -1349,6 +1881,9 @@ impl FMP4Mux {
Ok((interleaved_buffers, streams))
}
+ /// Fills upstream events as needed and returns the caps the first time draining can happen.
+ ///
+ /// If it returns `(_, None)` then there's currently nothing to drain anymore.
fn drain(
&self,
state: &mut State,
@@ -1363,10 +1898,15 @@ impl FMP4Mux {
gst::info!(CAT, imp: self, "Draining at timeout");
} else {
for stream in &state.streams {
- if !stream.fragment_filled && !stream.sinkpad.is_eos() {
+ if !stream.chunk_filled && !stream.fragment_filled && !stream.sinkpad.is_eos() {
return Ok((None, None));
}
}
+ gst::info!(
+ CAT,
+ imp: self,
+ "Draining because all streams have enough data queued"
+ );
}
// Collect all buffers and their timing information that are to be drained right now.
@@ -1375,7 +1915,9 @@ impl FMP4Mux {
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
- fragment_end_pts,
+ chunk_end_pts,
+ fragment_filled,
+ fragment_start,
) = self.drain_buffers(state, settings, timeout, at_eos)?;
// Create header now if it was not created before and return the caps
@@ -1414,7 +1956,7 @@ impl FMP4Mux {
// bufferlist for all buffers that have to be output.
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
let min_earliest_pts = min_earliest_pts.unwrap();
- let fragment_end_pts = fragment_end_pts.unwrap();
+ let chunk_end_pts = chunk_end_pts.unwrap();
let mut fmp4_header = None;
if !state.sent_headers {
@@ -1441,11 +1983,16 @@ impl FMP4Mux {
state.sequence_number = 1;
}
let sequence_number = state.sequence_number;
- state.sequence_number += 1;
+ // If this is the last chunk of a fragment then increment the sequence number for the
+ // start of the next fragment.
+ if fragment_filled {
+ state.sequence_number += 1;
+ }
let (mut fmp4_fragment_header, moof_offset) =
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
variant: self.obj().class().as_ref().variant,
sequence_number,
+ chunk: !fragment_start,
streams: streams.as_slice(),
buffers: interleaved_buffers.as_slice(),
})
@@ -1463,10 +2010,14 @@ impl FMP4Mux {
let buffer = fmp4_fragment_header.get_mut().unwrap();
buffer.set_pts(min_earliest_pts_position);
buffer.set_dts(min_start_dts_position);
- buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts));
+ buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts));
- // Fragment header is HEADER
+ // Fragment and chunk header is HEADER
buffer.set_flags(gst::BufferFlags::HEADER);
+ // Chunk header is DELTA_UNIT
+ if !fragment_start {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
// Copy metas from the first actual buffer to the fragment header. This allows
// getting things like the reference timestamp meta or the timecode meta to identify
@@ -1507,9 +2058,9 @@ impl FMP4Mux {
.collect::<gst::BufferList>(),
);
- if settings.write_mfra {
- // Write mfra only for the main stream, and if there are no buffers for the main stream
- // in this segment then don't write anything.
+ if settings.write_mfra && fragment_start {
+ // Write mfra only for the main stream on fragment starts, and if there are no
+ // buffers for the main stream in this segment then don't write anything.
if let Some(super::FragmentHeaderStream {
start_time: Some(start_time),
..
@@ -1522,98 +2073,82 @@ impl FMP4Mux {
}
}
- state.end_pts = Some(fragment_end_pts);
+ state.end_pts = Some(chunk_end_pts);
- // Update for the start PTS of the next fragment
- gst::info!(
- CAT,
- imp: self,
- "Starting new fragment at {}",
- fragment_end_pts,
- );
- state.fragment_start_pts = Some(fragment_end_pts);
+ // Update for the start PTS of the next fragment / chunk
- let fku_time = fragment_end_pts + settings.fragment_duration;
+ if fragment_filled {
+ state.fragment_start_pts = Some(chunk_end_pts);
+ gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,);
+ } else {
+ gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,);
+ }
+ state.chunk_start_pts = Some(chunk_end_pts);
- for stream in &state.streams {
- let current_position = stream.current_position;
-
- // In case of ONVIF this needs to be converted back from UTC time to
- // the stream's running time
- let (fku_time, current_position) =
- if self.obj().class().as_ref().variant == super::Variant::ONVIF {
- (
- if let Some(fku_time) = utc_time_to_running_time(
- fku_time,
- stream.running_time_utc_time_mapping.unwrap(),
- ) {
- fku_time
- } else {
- continue;
- },
- utc_time_to_running_time(
- current_position,
- stream.running_time_utc_time_mapping.unwrap(),
- ),
- )
+ // If the current fragment is filled we already have the next fragment's start
+ // keyframe and can request the following one.
+ if fragment_filled {
+ let fku_time = chunk_end_pts + settings.fragment_duration;
+
+ for stream in &state.streams {
+ let current_position = stream.current_position;
+
+ // In case of ONVIF this needs to be converted back from UTC time to
+ // the stream's running time
+ let (fku_time, current_position) =
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF {
+ (
+ if let Some(fku_time) = utc_time_to_running_time(
+ fku_time,
+ stream.running_time_utc_time_mapping.unwrap(),
+ ) {
+ fku_time
+ } else {
+ continue;
+ },
+ utc_time_to_running_time(
+ current_position,
+ stream.running_time_utc_time_mapping.unwrap(),
+ ),
+ )
+ } else {
+ (fku_time, Some(current_position))
+ };
+
+ let fku_time = if current_position
+ .map_or(false, |current_position| current_position > fku_time)
+ {
+ gst::warning!(
+ CAT,
+ obj: stream.sinkpad,
+ "Sending force-keyunit event late for running time {} at {}",
+ fku_time,
+ current_position.display(),
+ );
+ None
} else {
- (fku_time, Some(current_position))
+ gst::debug!(
+ CAT,
+ obj: stream.sinkpad,
+ "Sending force-keyunit event for running time {}",
+ fku_time,
+ );
+ Some(fku_time)
};
- let fku_time = if current_position
- .map_or(false, |current_position| current_position > fku_time)
- {
- gst::warning!(
- CAT,
- obj: stream.sinkpad,
- "Sending force-keyunit event late for running time {} at {}",
- fku_time,
- current_position.display(),
- );
- None
- } else {
- gst::debug!(
- CAT,
- obj: stream.sinkpad,
- "Sending force-keyunit event for running time {}",
- fku_time,
- );
- Some(fku_time)
- };
-
- let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_time)
- .all_headers(true)
- .build();
+ let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(fku_time)
+ .all_headers(true)
+ .build();
- upstream_events.push((stream.sinkpad.clone(), fku));
+ upstream_events.push((stream.sinkpad.clone(), fku));
+ }
}
- // Reset timeout delay now that we've output an actual fragment
+ // Reset timeout delay now that we've output an actual fragment or chunk
state.timeout_delay = gst::ClockTime::ZERO;
}
- if settings.write_mfra && at_eos {
- gst::debug!(CAT, imp: self, "Writing mfra box");
- match boxes::create_mfra(&streams[0].caps, &state.fragment_offsets) {
- Ok(mut mfra) => {
- {
- let mfra = mfra.get_mut().unwrap();
- // mfra is HEADER|DELTA_UNIT like other boxes
- mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT);
- }
-
- if buffer_list.is_none() {
- buffer_list = Some(gst::BufferList::new_sized(1));
- }
- buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
- }
- Err(err) => {
- gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
- }
- }
- }
-
// TODO: Write edit list at EOS
// TODO: Rewrite bitrates at EOS
@@ -1690,6 +2225,7 @@ impl FMP4Mux {
pre_queue: VecDeque::new(),
queued_gops: VecDeque::new(),
fragment_filled: false,
+ chunk_filled: false,
dts_offset: None,
current_position: gst::ClockTime::ZERO,
running_time_utc_time_mapping: None,
@@ -1825,13 +2361,18 @@ impl ObjectImpl for FMP4Mux {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
- // TODO: Add chunk-duration property separate from fragment-size
glib::ParamSpecUInt64::builder("fragment-duration")
.nick("Fragment Duration")
.blurb("Duration for each FMP4 fragment")
.default_value(DEFAULT_FRAGMENT_DURATION.nseconds())
.mutable_ready()
.build(),
+ glib::ParamSpecUInt64::builder("chunk-duration")
+ .nick("Chunk Duration")
+ .blurb("Duration for each FMP4 chunk (default = no chunks)")
+ .default_value(u64::MAX)
+ .mutable_ready()
+ .build(),
glib::ParamSpecEnum::builder_with_default("header-update-mode", DEFAULT_HEADER_UPDATE_MODE)
.nick("Header update mode")
.blurb("Mode for updating the header at the end of the stream")
@@ -1879,8 +2420,24 @@ impl ObjectImpl for FMP4Mux {
let fragment_duration = value.get().expect("type checked upstream");
if settings.fragment_duration != fragment_duration {
settings.fragment_duration = fragment_duration;
+ let latency = settings
+ .chunk_duration
+ .unwrap_or(settings.fragment_duration);
+ drop(settings);
+ self.obj().set_latency(latency, None);
+ }
+ }
+
+ "chunk-duration" => {
+ let mut settings = self.settings.lock().unwrap();
+ let chunk_duration = value.get().expect("type checked upstream");
+ if settings.chunk_duration != chunk_duration {
+ settings.chunk_duration = chunk_duration;
+ let latency = settings
+ .chunk_duration
+ .unwrap_or(settings.fragment_duration);
drop(settings);
- self.obj().set_latency(fragment_duration, None);
+ self.obj().set_latency(latency, None);
}
}
@@ -1931,6 +2488,11 @@ impl ObjectImpl for FMP4Mux {
settings.fragment_duration.to_value()
}
+ "chunk-duration" => {
+ let settings = self.settings.lock().unwrap();
+ settings.chunk_duration.to_value()
+ }
+
"header-update-mode" => {
let settings = self.settings.lock().unwrap();
settings.header_update_mode.to_value()
@@ -2012,7 +2574,7 @@ impl ElementImpl for FMP4Mux {
impl AggregatorImpl for FMP4Mux {
fn next_time(&self) -> Option<gst::ClockTime> {
let state = self.state.lock().unwrap();
- state.fragment_start_pts.opt_add(state.timeout_delay)
+ state.chunk_start_pts.opt_add(state.timeout_delay)
}
fn sink_query(
@@ -2194,7 +2756,9 @@ impl AggregatorImpl for FMP4Mux {
let mut upstream_events = vec![];
let all_eos;
- let (caps, buffers) = {
+ let mut caps = None;
+ let mut buffers = vec![];
+ {
let mut state = self.state.lock().unwrap();
// Create streams
@@ -2207,6 +2771,7 @@ impl AggregatorImpl for FMP4Mux {
// Always take a buffer from the stream with the earliest queued buffer to keep the
// fill-level at all sinkpads in sync.
let fragment_start_pts = state.fragment_start_pts;
+ let chunk_start_pts = state.chunk_start_pts;
while let Some((idx, stream)) =
self.find_earliest_stream(&mut state, timeout, settings.fragment_duration)?
@@ -2222,176 +2787,140 @@ impl AggregatorImpl for FMP4Mux {
self.queue_gops(idx, stream, pre_queued_buffer)?;
// Check if this stream is filled enough now.
- if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
- stream
- .queued_gops
- .iter()
- .find(|gop| gop.final_end_pts)
- .map(|gop| gop.end_pts),
+ self.check_stream_filled(
+ &settings,
+ stream,
fragment_start_pts,
- ) {
- if queued_end_pts.saturating_sub(fragment_start_pts)
- >= settings.fragment_duration
- {
- gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment");
- stream.fragment_filled = true;
- }
- }
+ chunk_start_pts,
+ false,
+ );
}
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
if all_eos {
gst::debug!(CAT, imp: self, "All streams are EOS now");
- }
- // Calculate the earliest PTS after queueing input if we can now.
- if state.earliest_pts.is_none() {
- let mut earliest_pts = None;
- let mut start_dts = None;
+ for stream in &mut state.streams {
+ // Check if this stream is filled enough now that everything is EOS.
+ self.check_stream_filled(
+ &settings,
+ stream,
+ fragment_start_pts,
+ chunk_start_pts,
+ true,
+ );
+ }
+ }
- for stream in &state.streams {
- let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() {
- None => {
- if !all_eos && !timeout {
- earliest_pts = None;
- start_dts = None;
- break;
- }
- continue;
- }
- Some(oldest_gop) => {
- if !all_eos && !timeout && !oldest_gop.final_earliest_pts {
- earliest_pts = None;
- start_dts = None;
- break;
- }
+ // Calculate the earliest PTS, i.e. the start of the first fragment, if not known yet.
+ self.calculate_earliest_pts(
+ &settings,
+ &mut state,
+ &mut upstream_events,
+ all_eos,
+ timeout,
+ );
- (oldest_gop.earliest_pts, oldest_gop.start_dts)
+ // Loop as long as new chunks can be drained.
+ // Only the first iteration is considered a timeout.
+ let mut timeout = timeout;
+ loop {
+ // If enough GOPs were queued, drain and create the output fragment or chunk
+ let res = self.drain(
+ &mut state,
+ &settings,
+ timeout,
+ all_eos,
+ &mut upstream_events,
+ );
+ let mut buffer_list = match res {
+ Ok((new_caps, buffer_list)) => {
+ if caps.is_none() {
+ caps = new_caps;
}
- };
- if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) {
- earliest_pts = Some(stream_earliest_pts);
+ buffer_list
}
+ Err(err) => {
+ if err == gst_base::AGGREGATOR_FLOW_NEED_DATA {
+ assert!(!all_eos);
+ assert!(timeout);
+ gst::element_imp_warning!(
+ self,
+ gst::StreamError::Format,
+ ["Longer GOPs than fragment duration"]
+ );
+ state.timeout_delay += 1.seconds();
+ }
- if let Some(stream_start_dts) = stream_start_dts {
- if start_dts.opt_gt(stream_start_dts).unwrap_or(true) {
- start_dts = Some(stream_start_dts);
+ // Although we had an error, push out everything that was produced so far
+ drop(state);
+ for (sinkpad, event) in upstream_events {
+ sinkpad.push_event(event);
}
- }
- }
- if let Some(earliest_pts) = earliest_pts {
- gst::info!(
- CAT,
- imp: self,
- "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})",
- earliest_pts,
- start_dts.display()
- );
- state.earliest_pts = Some(earliest_pts);
- state.start_dts = start_dts;
- state.fragment_start_pts = Some(earliest_pts);
-
- let fku_time = earliest_pts + settings.fragment_duration;
-
- for stream in &mut state.streams {
- let current_position = stream.current_position;
-
- // In case of ONVIF this needs to be converted back from UTC time to
- // the stream's running time
- let (fku_time, current_position) =
- if self.obj().class().as_ref().variant == super::Variant::ONVIF {
- (
- if let Some(fku_time) = utc_time_to_running_time(
- fku_time,
- stream.running_time_utc_time_mapping.unwrap(),
- ) {
- fku_time
- } else {
- continue;
- },
- utc_time_to_running_time(
- current_position,
- stream.running_time_utc_time_mapping.unwrap(),
- ),
- )
- } else {
- (fku_time, Some(current_position))
- };
+ if let Some(caps) = caps {
+ gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps);
+ self.obj().set_src_caps(&caps);
+ }
- let fku_time = if current_position
- .map_or(false, |current_position| current_position > fku_time)
- {
- gst::warning!(
- CAT,
- obj: stream.sinkpad,
- "Sending first force-keyunit event late for running time {} at {}",
- fku_time,
- current_position.display(),
- );
- None
- } else {
- gst::debug!(
- CAT,
- obj: stream.sinkpad,
- "Sending first force-keyunit event for running time {}",
- fku_time,
- );
- Some(fku_time)
- };
+ for buffer_list in buffers {
+ gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list);
+ self.obj().finish_buffer_list(buffer_list)?;
+ }
- let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_time)
- .all_headers(true)
- .build();
+ return Err(err);
+ }
+ };
- upstream_events.push((stream.sinkpad.clone(), fku));
+ // If nothing can't be drained anymore then break the loop, and if all streams are
+ // EOS do EOS handling.
+ if buffer_list.is_none() {
+ if settings.write_mfra && all_eos {
+ gst::debug!(CAT, imp: self, "Writing mfra box");
+ match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) {
+ Ok(mut mfra) => {
+ {
+ let mfra = mfra.get_mut().unwrap();
+ // mfra is DELTA_UNIT like other buffers
+ mfra.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
- // Check if this stream is filled enough now.
- if let Some(queued_end_pts) = stream
- .queued_gops
- .iter()
- .find(|gop| gop.final_end_pts)
- .map(|gop| gop.end_pts)
- {
- if queued_end_pts.saturating_sub(earliest_pts)
- >= settings.fragment_duration
- {
- gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment");
- stream.fragment_filled = true;
+ if buffer_list.is_none() {
+ buffer_list = Some(gst::BufferList::new_sized(1));
+ }
+ buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
+ buffers.extend(buffer_list);
+ }
+ Err(err) => {
+ gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
}
}
}
+
+ break;
}
- }
- // If enough GOPs were queued, drain and create the output fragment
- match self.drain(
- &mut state,
- &settings,
- timeout,
- all_eos,
- &mut upstream_events,
- ) {
- Ok(res) => res,
- Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
- gst::element_imp_warning!(
- self,
- gst::StreamError::Format,
- ["Longer GOPs than fragment duration"]
+ // Otherwise extend the list of bufferlists and check again if something can be
+ // drained.
+ buffers.extend(buffer_list);
+
+ timeout = false;
+
+ let fragment_start_pts = state.fragment_start_pts;
+ let chunk_start_pts = state.chunk_start_pts;
+ for stream in &mut state.streams {
+ // Check if this stream is still filled enough now.
+ self.check_stream_filled(
+ &settings,
+ stream,
+ fragment_start_pts,
+ chunk_start_pts,
+ all_eos,
);
- state.timeout_delay += 1.seconds();
-
- drop(state);
- for (sinkpad, event) in upstream_events {
- sinkpad.push_event(event);
- }
- return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
- Err(err) => return Err(err),
}
- };
+ }
for (sinkpad, event) in upstream_events {
sinkpad.push_event(event);
@@ -2402,54 +2931,38 @@ impl AggregatorImpl for FMP4Mux {
self.obj().set_src_caps(&caps);
}
- if let Some(buffers) = buffers {
- gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers);
- self.obj().finish_buffer_list(buffers)?;
+ for buffer_list in buffers {
+ gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list);
+ self.obj().finish_buffer_list(buffer_list)?;
}
- if all_eos {
- gst::debug!(CAT, imp: self, "Doing EOS handling");
-
- if settings.header_update_mode != super::HeaderUpdateMode::None {
- let updated_header =
- self.update_header(&mut self.state.lock().unwrap(), &settings, true);
- match updated_header {
- Ok(Some((buffer_list, caps))) => {
- match settings.header_update_mode {
- super::HeaderUpdateMode::None => unreachable!(),
- super::HeaderUpdateMode::Rewrite => {
- let mut q = gst::query::Seeking::new(gst::Format::Bytes);
- if self.obj().src_pad().peer_query(&mut q) && q.result().0 {
- let aggregator = self.obj();
-
- aggregator.set_src_caps(&caps);
-
- // Seek to the beginning with a default bytes segment
- aggregator
- .update_segment(
- &gst::FormattedSegment::<gst::format::Bytes>::new(),
- );
-
- if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
- gst::error!(
- CAT,
- imp: self,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- } else {
- gst::error!(
- CAT,
- imp: self,
- "Can't rewrite header because downstream is not seekable"
- );
- }
- }
- super::HeaderUpdateMode::Update => {
+ if !all_eos {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ // Do remaining EOS handling after the end of the stream was pushed.
+ gst::debug!(CAT, imp: self, "Doing EOS handling");
+
+ if settings.header_update_mode != super::HeaderUpdateMode::None {
+ let updated_header =
+ self.update_header(&mut self.state.lock().unwrap(), &settings, true);
+ match updated_header {
+ Ok(Some((buffer_list, caps))) => {
+ match settings.header_update_mode {
+ super::HeaderUpdateMode::None => unreachable!(),
+ super::HeaderUpdateMode::Rewrite => {
+ let mut q = gst::query::Seeking::new(gst::Format::Bytes);
+ if self.obj().src_pad().peer_query(&mut q) && q.result().0 {
let aggregator = self.obj();
aggregator.set_src_caps(&caps);
+
+ // Seek to the beginning with a default bytes segment
+ aggregator
+ .update_segment(
+ &gst::FormattedSegment::<gst::format::Bytes>::new(),
+ );
+
if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
gst::error!(
CAT,
@@ -2458,28 +2971,45 @@ impl AggregatorImpl for FMP4Mux {
err,
);
}
+ } else {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Can't rewrite header because downstream is not seekable"
+ );
+ }
+ }
+ super::HeaderUpdateMode::Update => {
+ let aggregator = self.obj();
+
+ aggregator.set_src_caps(&caps);
+ if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed pushing updated header buffer downstream: {:?}",
+ err,
+ );
}
}
}
- Ok(None) => {}
- Err(err) => {
- gst::error!(
- CAT,
- imp: self,
- "Failed to generate updated header: {:?}",
- err
- );
- }
+ }
+ Ok(None) => {}
+ Err(err) => {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed to generate updated header: {:?}",
+ err
+ );
}
}
+ }
- // Need to output new headers if started again after EOS
- self.state.lock().unwrap().sent_headers = false;
+ // Need to output new headers if started again after EOS
+ self.state.lock().unwrap().sent_headers = false;
- Err(gst::FlowError::Eos)
- } else {
- Ok(gst::FlowSuccess::Ok)
- }
+ Err(gst::FlowError::Eos)
}
}
diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs
index 396d8b81..4d513a59 100644
--- a/mux/fmp4/src/fmp4mux/mod.rs
+++ b/mux/fmp4/src/fmp4mux/mod.rs
@@ -110,6 +110,9 @@ pub(crate) struct FragmentHeaderConfiguration<'a> {
/// Sequence number for this fragment.
sequence_number: u32,
+ /// If this is a full fragment or only a chunk.
+ chunk: bool,
+
streams: &'a [FragmentHeaderStream],
buffers: &'a [Buffer],
}
diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs
index bb1b794d..9af83898 100644
--- a/mux/fmp4/tests/tests.rs
+++ b/mux/fmp4/tests/tests.rs
@@ -1,3 +1,4 @@
+// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
@@ -1285,3 +1286,386 @@ fn test_buffer_multi_stream_short_gops() {
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
+
+#[test]
+fn test_chunking_single_stream() {
+ init();
+
+ let caps = gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build();
+
+ let mut h = gst_check::Harness::new("cmafmux");
+
+ // 5s fragment duration, 1s chunk duration
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+ h.element()
+ .unwrap()
+ .set_property("chunk-duration", 1.seconds());
+
+ h.set_src_caps(caps);
+ h.play();
+
+ // Push 15 buffers of 0.5s each, 1st and 11th buffer without DELTA_UNIT flag
+ for i in 0..15 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i * 500.mseconds());
+ buffer.set_dts(i * 500.mseconds());
+ buffer.set_duration(500.mseconds());
+ if i != 0 && i != 10 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ if i == 2 {
+ let ev = loop {
+ let ev = h.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Crank the clock: this should bring us to the end of the first fragment
+ h.crank_single_clock_wait().unwrap();
+
+ let header = h.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO));
+
+ // There should be 7 chunks now, and the 1st and 6th are starting a fragment.
+ // Each chunk should have two buffers.
+ for chunk in 0..7 {
+ let chunk_header = h.pull().unwrap();
+ if chunk == 0 || chunk == 5 {
+ assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER);
+ } else {
+ assert_eq!(
+ chunk_header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
+ );
+ }
+ assert_eq!(chunk_header.pts(), Some(chunk * 1.seconds()));
+ assert_eq!(chunk_header.dts(), Some(chunk * 1.seconds()));
+ assert_eq!(chunk_header.duration(), Some(1.seconds()));
+
+ for buffer_idx in 0..2 {
+ let buffer = h.pull().unwrap();
+ if buffer_idx == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds())
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds())
+ );
+ assert_eq!(buffer.duration(), Some(500.mseconds()));
+ }
+ }
+
+ h.push_event(gst::event::Eos::new());
+
+ // There should be the remaining chunk now, containing one 500ms buffer.
+ for chunk in 7..8 {
+ let chunk_header = h.pull().unwrap();
+ assert_eq!(
+ chunk_header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
+ );
+ assert_eq!(chunk_header.pts(), Some(chunk * 1.seconds()));
+ assert_eq!(chunk_header.dts(), Some(chunk * 1.seconds()));
+ assert_eq!(chunk_header.duration(), Some(500.mseconds()));
+
+ for buffer_idx in 0..1 {
+ let buffer = h.pull().unwrap();
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ assert_eq!(
+ buffer.pts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds())
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds())
+ );
+ assert_eq!(buffer.duration(), Some(500.mseconds()));
+ }
+ }
+
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+#[test]
+fn test_chunking_multi_stream() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ // 5s fragment duration, 1s chunk duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+ h1.element()
+ .unwrap()
+ .set_property("chunk-duration", 1.seconds());
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 15 buffers of 0.5s each, 1st and 11th buffer without DELTA_UNIT flag
+ for i in 0..15 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i * 500.mseconds());
+ buffer.set_dts(i * 500.mseconds());
+ buffer.set_duration(500.mseconds());
+ if i != 0 && i != 10 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i * 500.mseconds());
+ buffer.set_dts(i * 500.mseconds());
+ buffer.set_duration(500.mseconds());
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ if i == 2 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Crank the clock: this should bring us to the end of the first fragment
+ h1.crank_single_clock_wait().unwrap();
+
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ // There should be 7 chunks now, and the 1st and 6th are starting a fragment.
+ // Each chunk should have two buffers.
+ for chunk in 0..7 {
+ let chunk_header = h1.pull().unwrap();
+ if chunk == 0 || chunk == 5 {
+ assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER);
+ } else {
+ assert_eq!(
+ chunk_header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
+ );
+ }
+ assert_eq!(
+ chunk_header.pts(),
+ Some(chunk * 1.seconds() + output_offset)
+ );
+ assert_eq!(
+ chunk_header.dts(),
+ Some(chunk * 1.seconds() + output_offset)
+ );
+ assert_eq!(chunk_header.duration(), Some(1.seconds()));
+
+ for buffer_idx in 0..2 {
+ for stream_idx in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if buffer_idx == 1 && stream_idx == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset)
+ );
+
+ if stream_idx == 0 {
+ assert_eq!(
+ buffer.dts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset)
+ );
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(500.mseconds()));
+ }
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ // There should be the remaining chunk now, containing one 500ms buffer.
+ for chunk in 7..8 {
+ let chunk_header = h1.pull().unwrap();
+ assert_eq!(
+ chunk_header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
+ );
+ assert_eq!(
+ chunk_header.pts(),
+ Some(chunk * 1.seconds() + output_offset)
+ );
+ assert_eq!(
+ chunk_header.dts(),
+ Some(chunk * 1.seconds() + output_offset)
+ );
+ assert_eq!(chunk_header.duration(), Some(500.mseconds()));
+
+ for buffer_idx in 0..1 {
+ for stream_idx in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if buffer_idx == 0 && stream_idx == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(
+ buffer.pts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset)
+ );
+ if stream_idx == 0 {
+ assert_eq!(
+ buffer.dts(),
+ Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset)
+ );
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(500.mseconds()));
+ }
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}