gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
author    Sebastian Dröge <sebastian@centricular.com>    2022-05-12 13:44:20 +0300
committer Sebastian Dröge <sebastian@centricular.com>    2022-05-12 18:31:02 +0300
commit    e06665b92d4ce97f1856c6f072b5277460df5ba5 (patch)
tree      d4e3e0dee186bcf814b1768a4e9cf49bf9631b24 /generic
parent    31a32a7e2ecc9c24d2ca5ba4365e35abafe20fa0 (diff)
fmp4mux: Add support for multiple input streams
Diffstat (limited to 'generic')
-rw-r--r--  generic/fmp4/src/fmp4mux/imp.rs  | 1072
-rw-r--r--  generic/fmp4/src/fmp4mux/mod.rs  |    9
-rw-r--r--  generic/fmp4/tests/tests.rs      |   80
3 files changed, 798 insertions, 363 deletions
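
The pad template change at the bottom of imp.rs turns isofmp4mux into a request-pad muxer ("sink_%u"), and the new interleave-bytes/interleave-time properties control how the per-stream buffers are woven together on output. A minimal sketch of driving the element the same way the updated tests below do (element, pad and property names are taken from this patch; the exact gst-rs bindings API may differ between versions):

use gst::prelude::*;

// Sketch only: mirrors the harness setup used in the updated tests further below.
// Assumes gst::init() and plugin registration already happened (see the tests' init()).
fn setup_muxer() -> gst_check::Harness {
    // isofmp4mux now exposes "sink_%u" request pads; request "sink_0" explicitly.
    let h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));

    let element = h.element().unwrap();
    // 5s fragments; switch between streams after at most 250ms of data (the new default).
    element.set_property("fragment-duration", gst::ClockTime::from_seconds(5));
    element.set_property("interleave-time", gst::ClockTime::from_mseconds(250));

    h
}
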
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 85b212306..634747b93 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -20,6 +20,9 @@ use once_cell::sync::Lazy;
use super::boxes;
use super::Buffer;
+/// Offset for the segment in non-single-stream variants.
+const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
+
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"fmp4mux",
@@ -32,6 +35,8 @@ const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(1
const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None;
const DEFAULT_WRITE_MFRA: bool = false;
const DEFAULT_WRITE_MEHD: bool = false;
+const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
+const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(250));
#[derive(Debug, Clone)]
struct Settings {
@@ -39,6 +44,8 @@ struct Settings {
header_update_mode: super::HeaderUpdateMode,
write_mfra: bool,
write_mehd: bool,
+ interleave_bytes: Option<u64>,
+ interleave_time: Option<gst::ClockTime>,
}
impl Default for Settings {
@@ -48,6 +55,8 @@ impl Default for Settings {
header_update_mode: DEFAULT_HEADER_UPDATE_MODE,
write_mfra: DEFAULT_WRITE_MFRA,
write_mehd: DEFAULT_WRITE_MEHD,
+ interleave_bytes: DEFAULT_INTERLEAVE_BYTES,
+ interleave_time: DEFAULT_INTERLEAVE_TIME,
}
}
}
@@ -61,6 +70,8 @@ struct Gop {
final_earliest_pts: bool,
// PTS plus duration of last buffer, or start of next GOP
end_pts: gst::ClockTime,
+ // Once this is known to be the final end PTS/DTS
+ final_end_pts: bool,
// DTS plus duration of last buffer, or start of next GOP
end_dts: Option<gst::ClockTime>,
@@ -72,24 +83,30 @@ struct Gop {
buffers: Vec<Buffer>,
}
-#[derive(Default)]
-struct State {
- caps: Option<gst::Caps>,
- intra_only: bool,
+struct Stream {
+ sinkpad: gst_base::AggregatorPad,
- // Created once we received caps and kept up to date with the caps,
- // sent as part of the buffer list for the first fragment.
- stream_header: Option<gst::Buffer>,
+ caps: gst::Caps,
+ intra_only: bool,
- sequence_number: u32,
queued_gops: VecDeque<Gop>,
- // Duration of all GOPs except for the newest one that is still being filled
- queued_duration: gst::ClockTime,
+ fragment_filled: bool,
// Difference between the first DTS and 0 in case of negative DTS
dts_offset: Option<gst::ClockTime>,
last_force_keyunit_time: Option<gst::ClockTime>,
+}
+
+#[derive(Default)]
+struct State {
+ streams: Vec<Stream>,
+
+ // Created once we received caps and kept up to date with the caps,
+ // sent as part of the buffer list for the first fragment.
+ stream_header: Option<gst::Buffer>,
+
+ sequence_number: u32,
// Fragment tracking for mfra
current_offset: u64,
@@ -99,11 +116,14 @@ struct State {
earliest_pts: Option<gst::ClockTime>,
end_pts: Option<gst::ClockTime>,
- generated_headers: bool,
+ // Start PTS of the current fragment
+ fragment_start_pts: Option<gst::ClockTime>,
+
+ sent_headers: bool,
}
+#[derive(Default)]
pub(crate) struct FMP4Mux {
- sinkpad: gst_base::AggregatorPad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
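
State now keeps one Stream per sink pad instead of a single implicit stream, and the old queued_duration bookkeeping is replaced by per-stream GOP queues plus a shared fragment_start_pts. Whether a stream has buffered enough for the current fragment is derived from those queues; a condensed sketch of that check (plain integers standing in for gst::ClockTime), mirroring the gating added to aggregate() further down:

use std::collections::VecDeque;

// Sketch only: simplified from the fragment gating added to aggregate() further down.
struct Gop {
    end_pts: u64,        // nanoseconds, stands in for gst::ClockTime
    final_end_pts: bool, // set once the end PTS can no longer change
}

fn fragment_filled(
    queued_gops: &VecDeque<Gop>, // newest GOP at the front, as in queue_input()
    fragment_start_pts: u64,
    fragment_duration: u64,
) -> bool {
    // A stream is "filled" once its newest finalized GOP reaches one fragment
    // duration past the fragment start; drain() only runs once every non-EOS stream is.
    queued_gops
        .iter()
        .find(|gop| gop.final_end_pts)
        .map(|gop| gop.end_pts.saturating_sub(fragment_start_pts) >= fragment_duration)
        .unwrap_or(false)
}
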
@@ -112,91 +132,91 @@ impl FMP4Mux {
fn queue_input(
&self,
element: &super::FMP4Mux,
- state: &mut State,
+ idx: usize,
+ stream: &mut Stream,
segment: &gst::FormattedSegment<gst::ClockTime>,
- buffer: gst::Buffer,
+ mut buffer: gst::Buffer,
) -> Result<(), gst::FlowError> {
- gst::trace!(CAT, obj: element, "Handling buffer {:?}", buffer);
+ assert!(!stream.fragment_filled);
- if state.caps.is_none() {
- gst::error!(CAT, obj: element, "Got buffer before caps");
- return Err(gst::FlowError::NotNegotiated);
- }
+ gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer);
- let intra_only = state.intra_only;
+ let intra_only = stream.intra_only;
if !intra_only && buffer.dts().is_none() {
- gst::error!(CAT, obj: element, "Require DTS for video streams");
+ gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams");
return Err(gst::FlowError::Error);
}
if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
- gst::error!(CAT, obj: element, "Intra-only stream with delta units");
+ gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units");
return Err(gst::FlowError::Error);
}
- let pts = buffer.pts().ok_or_else(|| {
- gst::error!(CAT, obj: element, "Require timestamped buffers");
+ let pts_position = buffer.pts().ok_or_else(|| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers");
gst::FlowError::Error
})?;
let duration = buffer.duration();
- let end_pts = duration.opt_add(pts).unwrap_or(pts);
+ let end_pts_position = duration.map_or(pts_position, |duration| pts_position + duration);
- let pts = match segment.to_running_time_full(pts) {
+ let pts = match segment.to_running_time_full(pts_position) {
(_, None) => {
- gst::error!(CAT, obj: element, "Couldn't convert PTS to running time");
+ gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time");
return Err(gst::FlowError::Error);
}
(pts_signum, _) if pts_signum < 0 => {
- gst::error!(CAT, obj: element, "Negative PTSs are not supported");
+ gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
return Err(gst::FlowError::Error);
}
(_, Some(pts)) => pts,
};
- let end_pts = match segment.to_running_time_full(end_pts) {
+ let end_pts = match segment.to_running_time_full(end_pts_position) {
(_, None) => {
gst::error!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Couldn't convert end PTS to running time"
);
return Err(gst::FlowError::Error);
}
(pts_signum, _) if pts_signum < 0 => {
- gst::error!(CAT, obj: element, "Negative PTSs are not supported");
+ gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
return Err(gst::FlowError::Error);
}
(_, Some(pts)) => pts,
};
- let (dts, end_dts) = if intra_only {
- (None, None)
+ let (dts_position, dts, end_dts) = if intra_only {
+ (None, None, None)
} else {
- // with the dts_offset by having negative composition time offsets in the `trun` box.
- let dts = buffer.dts().expect("not DTS");
- let end_dts = duration.opt_add(dts).unwrap_or(dts);
+ // Negative DTS are handled via the dts_offset and by having negative composition time
+ // offsets in the `trun` box. The smallest DTS here is shifted to zero.
+ let dts_position = buffer.dts().expect("not DTS");
+ let end_dts_position =
+ duration.map_or(dts_position, |duration| dts_position + duration);
- let dts = match segment.to_running_time_full(dts) {
+ let dts = match segment.to_running_time_full(dts_position) {
(_, None) => {
- gst::error!(CAT, obj: element, "Couldn't convert DTS to running time");
+ gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time");
return Err(gst::FlowError::Error);
}
(pts_signum, Some(dts)) if pts_signum < 0 => {
- if state.dts_offset.is_none() {
- state.dts_offset = Some(dts);
+ if stream.dts_offset.is_none() {
+ stream.dts_offset = Some(dts);
}
- let dts_offset = state.dts_offset.unwrap();
+ let dts_offset = stream.dts_offset.unwrap();
if dts > dts_offset {
- gst::warning!(CAT, obj: element, "DTS before first DTS");
+ gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS");
gst::ClockTime::ZERO
} else {
dts_offset - dts
}
}
(_, Some(dts)) => {
- if let Some(dts_offset) = state.dts_offset {
+ if let Some(dts_offset) = stream.dts_offset {
dts + dts_offset
} else {
dts
@@ -204,70 +224,91 @@ impl FMP4Mux {
}
};
- let end_dts = match segment.to_running_time_full(end_dts) {
+ let end_dts = match segment.to_running_time_full(end_dts_position) {
(_, None) => {
gst::error!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Couldn't convert end DTS to running time"
);
return Err(gst::FlowError::Error);
}
(pts_signum, Some(dts)) if pts_signum < 0 => {
- if state.dts_offset.is_none() {
- state.dts_offset = Some(dts);
+ if stream.dts_offset.is_none() {
+ stream.dts_offset = Some(dts);
}
- let dts_offset = state.dts_offset.unwrap();
+ let dts_offset = stream.dts_offset.unwrap();
if dts > dts_offset {
- gst::warning!(CAT, obj: element, "End DTS before first DTS");
+ gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS");
gst::ClockTime::ZERO
} else {
dts_offset - dts
}
}
(_, Some(dts)) => {
- if let Some(dts_offset) = state.dts_offset {
+ if let Some(dts_offset) = stream.dts_offset {
dts + dts_offset
} else {
dts
}
}
};
- (Some(dts), Some(end_dts))
+ (Some(dts_position), Some(dts), Some(end_dts))
+ };
+
+ // If this is a multi-stream element then we need to update the PTS/DTS positions according
+ // to the output segment, specifically to re-timestamp them with the running time and
+ // adjust for the segment shift to compensate for negative DTS.
+ let class = element.class();
+ let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() {
+ (pts_position, dts_position)
+ } else {
+ let pts_position = pts + SEGMENT_OFFSET;
+ let dts_position = dts.map(|dts| {
+ dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO)
+ });
+
+ let buffer = buffer.make_mut();
+ buffer.set_pts(pts_position);
+ buffer.set_dts(dts_position);
+
+ (pts_position, dts_position)
};
if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst::debug!(
CAT,
- obj: element,
- "Starting new GOP at PTS {} DTS {}",
+ obj: &stream.sinkpad,
+ "Starting new GOP at PTS {} DTS {} (DTS offset {})",
pts,
- dts.display()
+ dts.display(),
+ stream.dts_offset.display(),
);
let gop = Gop {
start_pts: pts,
start_dts: dts,
- start_dts_position: if intra_only { None } else { buffer.dts() },
+ start_dts_position: if intra_only { None } else { dts_position },
earliest_pts: pts,
- earliest_pts_position: buffer.pts().expect("no PTS"),
+ earliest_pts_position: pts_position,
final_earliest_pts: intra_only,
end_pts,
end_dts,
+ final_end_pts: false,
buffers: vec![Buffer {
- idx: 0,
+ idx,
buffer,
pts,
dts,
}],
};
- state.queued_gops.push_front(gop);
+ stream.queued_gops.push_front(gop);
- if let Some(prev_gop) = state.queued_gops.get_mut(1) {
+ if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
gst::debug!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Updating previous GOP starting at PTS {} to end PTS {} DTS {}",
prev_gop.earliest_pts,
pts,
@@ -276,51 +317,39 @@ impl FMP4Mux {
prev_gop.end_pts = pts;
prev_gop.end_dts = dts;
+ if intra_only {
+ prev_gop.final_end_pts = true;
+ }
+
if !prev_gop.final_earliest_pts {
// Don't bother logging this for intra-only streams as it would be for every
// single buffer.
if !intra_only {
gst::debug!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Previous GOP has final earliest PTS at {}",
prev_gop.earliest_pts
);
}
prev_gop.final_earliest_pts = true;
-
- state.queued_duration =
- prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts;
- gst::debug!(
- CAT,
- obj: element,
- "Queued duration updated to {}",
- state.queued_duration
- );
- } else if intra_only {
- state.queued_duration =
- prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts;
- gst::debug!(
- CAT,
- obj: element,
- "Queued duration updated to {}",
- state.queued_duration
- );
+ if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) {
+ prev_prev_gop.final_end_pts = true;
+ }
}
}
- } else if let Some(gop) = state.queued_gops.front_mut() {
+ } else if let Some(gop) = stream.queued_gops.front_mut() {
assert!(!intra_only);
// We require DTS for non-intra-only streams
let dts = dts.unwrap();
let end_dts = end_dts.unwrap();
- let pts_position = buffer.pts().expect("no PTS");
gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts));
gop.buffers.push(Buffer {
- idx: 0,
+ idx,
buffer,
pts,
dts: Some(dts),
@@ -329,7 +358,7 @@ impl FMP4Mux {
if gop.earliest_pts > pts && !gop.final_earliest_pts {
gst::debug!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Updating current GOP earliest PTS from {} to {}",
gop.earliest_pts,
pts
@@ -337,10 +366,10 @@ impl FMP4Mux {
gop.earliest_pts = pts;
gop.earliest_pts_position = pts_position;
- if let Some(prev_gop) = state.queued_gops.get_mut(1) {
+ if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
gst::debug!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Updating previous GOP starting PTS {} end time from {} to {}",
pts,
prev_gop.end_pts,
@@ -350,7 +379,7 @@ impl FMP4Mux {
}
}
- let gop = state.queued_gops.front_mut().unwrap();
+ let gop = stream.queued_gops.front_mut().unwrap();
// The earliest PTS is known when the current DTS is bigger or equal to the first
// PTS that was observed in this GOP. If there was another frame later that had a
@@ -359,89 +388,111 @@ impl FMP4Mux {
if gop.start_pts <= dts && !gop.final_earliest_pts {
gst::debug!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"GOP has final earliest PTS at {}",
gop.earliest_pts
);
gop.final_earliest_pts = true;
- if let Some(prev_gop) = state.queued_gops.get_mut(1) {
- state.queued_duration =
- prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts;
- gst::debug!(
- CAT,
- obj: element,
- "Queued duration updated to {}",
- state.queued_duration
- );
+ if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
+ prev_gop.final_end_pts = true;
}
}
} else {
gst::warning!(
CAT,
- obj: element,
+ obj: &stream.sinkpad,
"Waiting for keyframe at the beginning of the stream"
);
}
+ if let Some((prev_gop, first_gop)) = Option::zip(
+ stream.queued_gops.iter().find(|gop| gop.final_end_pts),
+ stream.queued_gops.back(),
+ ) {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued full GOPs duration updated to {}",
+ prev_gop.end_pts - first_gop.earliest_pts,
+ );
+ }
+
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued duration updated to {}",
+ Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
+ .map(|(end, start)| end.end_pts - start.start_pts)
+ .unwrap_or(gst::ClockTime::ZERO)
+ );
+
Ok(())
}
+ fn create_initial_force_keyunit_event(
+ &self,
+ _element: &super::FMP4Mux,
+ stream: &mut Stream,
+ settings: &Settings,
+ earliest_pts: gst::ClockTime,
+ ) -> Result<Option<gst::Event>, gst::FlowError> {
+ assert!(stream.last_force_keyunit_time.is_none());
+
+ // If we never sent a force-keyunit event then send one now.
+ let fku_running_time = earliest_pts + settings.fragment_duration;
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Sending first force-keyunit event for running time {}",
+ fku_running_time
+ );
+ stream.last_force_keyunit_time = Some(fku_running_time);
+
+ return Ok(Some(
+ gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(fku_running_time)
+ .all_headers(true)
+ .build(),
+ ));
+ }
+
fn create_force_keyunit_event(
&self,
- element: &super::FMP4Mux,
- state: &mut State,
+ _element: &super::FMP4Mux,
+ stream: &mut Stream,
settings: &Settings,
segment: &gst::FormattedSegment<gst::ClockTime>,
pts: gst::ClockTime,
) -> Result<Option<gst::Event>, gst::FlowError> {
// If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP
- // is known and send one now.
+ // is known and send it then.
//
// Otherwise if the current PTS is a fragment duration in the future, send the next one
// now.
- let oldest_gop = state.queued_gops.back().unwrap();
- let earliest_pts = oldest_gop.earliest_pts;
let pts = segment.to_running_time(pts).expect("no running time");
- if state.last_force_keyunit_time.is_none() && oldest_gop.final_earliest_pts {
- let fku_running_time = earliest_pts + settings.fragment_duration;
- gst::debug!(
- CAT,
- obj: element,
- "Sending first force-keyunit event for running time {}",
- fku_running_time
- );
- state.last_force_keyunit_time = Some(fku_running_time);
-
- return Ok(Some(
- gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_running_time)
- .all_headers(true)
- .build(),
- ));
- } else if state.last_force_keyunit_time.is_some()
- && state.last_force_keyunit_time <= Some(pts)
- {
- let fku_running_time =
- state.last_force_keyunit_time.unwrap() + settings.fragment_duration;
- gst::debug!(
- CAT,
- obj: element,
- "Sending force-keyunit event for running time {}",
- fku_running_time
- );
- state.last_force_keyunit_time = Some(fku_running_time);
-
- return Ok(Some(
- gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_running_time)
- .all_headers(true)
- .build(),
- ));
- }
+ let last_force_keyunit_time = match stream.last_force_keyunit_time {
+ None => return Ok(None),
+ Some(last_force_keyunit_time) if last_force_keyunit_time > pts => return Ok(None),
+ Some(last_force_keyunit_time) => last_force_keyunit_time,
+ };
- Ok(None)
+ let fku_running_time = last_force_keyunit_time + settings.fragment_duration;
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Sending force-keyunit event for running time {}",
+ fku_running_time
+ );
+ stream.last_force_keyunit_time = Some(fku_running_time);
+
+ Ok(Some(
+ gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(fku_running_time)
+ .all_headers(true)
+ .build(),
+ ))
}
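
The force-keyunit logic is now split in two: create_initial_force_keyunit_event() fires once the overall earliest PTS is known, and create_force_keyunit_event() re-arms whenever the incoming PTS catches up with the last requested time. The resulting schedule is simply one keyframe request per fragment duration; a tiny sketch with plain nanosecond values:

// Sketch only: the keyframe-request schedule implied by the two functions above.
fn next_fku_time(last_fku_time: Option<u64>, earliest_pts: u64, fragment_duration: u64) -> u64 {
    match last_fku_time {
        // First request: one fragment duration after the earliest PTS.
        None => earliest_pts + fragment_duration,
        // Subsequent requests: advance by one fragment duration each time.
        Some(last) => last + fragment_duration,
    }
}

fn main() {
    let fragment_duration = 5_000_000_000u64; // 5s
    let mut last = None;
    for _ in 0..3 {
        let fku = next_fku_time(last, 0, fragment_duration);
        println!("request keyframe at {} ns", fku); // 5s, 10s, 15s, ...
        last = Some(fku);
    }
}
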
fn drain(
@@ -453,75 +504,190 @@ impl FMP4Mux {
) -> Result<Option<gst::BufferList>, gst::FlowError> {
let class = element.class();
- if state.queued_duration < settings.fragment_duration && !at_eos {
- return Ok(None);
+ if at_eos {
+ gst::info!(CAT, obj: element, "Draining at EOS");
+ } else {
+ for stream in &state.streams {
+ if !stream.fragment_filled && !stream.sinkpad.is_eos() {
+ return Ok(None);
+ }
+ }
}
- assert!(at_eos || state.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true));
+ let mut drain_buffers = Vec::with_capacity(state.streams.len());
+ let mut timing_infos = Vec::with_capacity(state.streams.len());
+ let mut caps = Vec::with_capacity(state.streams.len());
- // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is
- // equal to the fragment duration then drain out all complete GOPs, otherwise all
- // except for the newest complete GOP.
- let drain_gops = if at_eos {
- gst::info!(CAT, obj: element, "Draining at EOS");
- state.queued_duration = gst::ClockTime::ZERO;
- state
- .queued_gops
- .drain(..)
- .map(|mut gop| {
- gop.final_earliest_pts = true;
- gop
- })
- .collect::<Vec<_>>()
- } else if state.queued_duration == settings.fragment_duration
- || state.queued_gops.len() == 2
- {
- state.queued_duration = gst::ClockTime::ZERO;
- state.queued_gops.drain(1..).collect::<Vec<_>>()
- } else {
- let gops = state.queued_gops.drain(2..).collect::<Vec<_>>();
+ let mut min_earliest_pts_position = None;
+ let mut min_earliest_pts = None;
+ let mut min_start_dts_position = None;
+ let mut max_end_pts = None;
+
+ for stream in &mut state.streams {
+ assert!(
+ at_eos
+ || stream.sinkpad.is_eos()
+ || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
+ );
+
+ // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is
+ // equal to the fragment duration then drain out all complete GOPs, otherwise all
+ // except for the newest complete GOP.
+ let gops = if at_eos || stream.sinkpad.is_eos() {
+ stream.queued_gops.drain(..).rev().collect::<Vec<_>>()
+ } else {
+ let mut gops = vec![];
+
+ let fragment_start_pts = state.fragment_start_pts.unwrap();
+ while let Some(gop) = stream.queued_gops.pop_back() {
+ assert!(gop.final_end_pts);
+
+ let end_pts = gop.end_pts;
+ gops.push(gop);
+ if end_pts.saturating_sub(fragment_start_pts) >= settings.fragment_duration {
+ break;
+ }
+ }
- let gop = state.queued_gops.front().unwrap();
- if gop.final_earliest_pts {
- let prev_gop = state.queued_gops.get(1).unwrap();
- state.queued_duration =
- prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts;
+ gops
+ };
+ stream.fragment_filled = false;
+
+ if gops.is_empty() {
+ timing_infos.push(None);
} else {
- state.queued_duration = gst::ClockTime::ZERO;
+ let first_gop = gops.first().unwrap();
+ let last_gop = gops.last().unwrap();
+ let earliest_pts = first_gop.earliest_pts;
+ let earliest_pts_position = first_gop.earliest_pts_position;
+ let start_dts = first_gop.start_dts;
+ let start_dts_position = first_gop.start_dts_position;
+ let end_pts = last_gop.end_pts;
+ let end_dts = last_gop.end_dts;
+ let dts_offset = stream.dts_offset;
+
+ if min_earliest_pts.map_or(true, |min| min > earliest_pts) {
+ min_earliest_pts = Some(earliest_pts);
+ }
+ if min_earliest_pts_position.map_or(true, |min| min > earliest_pts_position) {
+ min_earliest_pts_position = Some(earliest_pts_position);
+ }
+ if let Some(start_dts_position) = start_dts_position {
+ if min_start_dts_position.map_or(true, |min| min > start_dts_position) {
+ min_start_dts_position = Some(start_dts_position);
+ }
+ }
+ if max_end_pts.map_or(true, |max| max < end_pts) {
+ max_end_pts = Some(end_pts);
+ }
+
+ gst::info!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
+ end_pts - earliest_pts,
+ earliest_pts,
+ start_dts.display(),
+ dts_offset.display(),
+ );
+
+ if let Some((prev_gop, first_gop)) = Option::zip(
+ stream.queued_gops.iter().find(|gop| gop.final_end_pts),
+ stream.queued_gops.back(),
+ ) {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued full GOPs duration updated to {}",
+ prev_gop.end_pts - first_gop.earliest_pts,
+ );
+ }
+
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued duration updated to {}",
+ Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
+ .map(|(end, start)| end.end_pts - start.start_pts)
+ .unwrap_or(gst::ClockTime::ZERO)
+ );
+
+ timing_infos.push(Some(super::FragmentTimingInfo {
+ earliest_pts,
+ start_dts,
+ end_pts,
+ end_dts,
+ dts_offset,
+ }));
}
- gops
- };
+ caps.push(&stream.caps);
- let mut buffer_list = None;
+ let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+ for gop in gops {
+ for buffer in gop.buffers {
+ buffers.push_back(buffer);
+ }
+ }
+ drain_buffers.push(buffers);
+ }
- if !drain_gops.is_empty() {
- let earliest_pts = drain_gops.last().unwrap().earliest_pts;
- let earliest_pts_position = drain_gops.last().unwrap().earliest_pts_position;
- let start_dts = drain_gops.last().unwrap().start_dts;
- let start_dts_position = drain_gops.last().unwrap().start_dts_position;
- let end_pts = drain_gops[0].end_pts;
- let end_dts = drain_gops[0].end_dts;
- let dts_offset = state.dts_offset;
+ // Interleave buffers according to the settings into a single vec
+ let mut interleaved_buffers =
+ Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum());
+ while drain_buffers.iter().any(|bs| !bs.is_empty()) {
+ for (idx, bs) in drain_buffers.iter_mut().enumerate() {
+ let mut dequeued_time = gst::ClockTime::ZERO;
+ let mut dequeued_bytes = 0;
+
+ while settings
+ .interleave_bytes
+ .map_or(true, |max_bytes| dequeued_bytes <= max_bytes)
+ && settings
+ .interleave_time
+ .map_or(true, |max_time| dequeued_time <= max_time)
+ {
+ if let Some(buffer) = bs.pop_front() {
+ dequeued_time += match bs.front() {
+ Some(next_buffer) => match Option::zip(next_buffer.dts, buffer.dts) {
+ Some((b, a)) => b - a,
+ None => next_buffer.pts - buffer.pts,
+ },
+ None => {
+ let timing_info = timing_infos[idx].as_ref().unwrap();
+ match Option::zip(timing_info.end_dts, buffer.dts) {
+ Some((b, a)) => b - a,
+ None => timing_info.end_pts - buffer.pts,
+ }
+ }
+ };
+ dequeued_bytes += buffer.buffer.size() as u64;
+ interleaved_buffers.push(buffer);
+ } else {
+ // No buffers left in this stream, go to next stream
+ break;
+ }
+ }
+ }
+ }
- gst::info!(
- CAT,
- obj: element,
- "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
- end_pts - earliest_pts,
- earliest_pts,
- start_dts.display(),
- dts_offset.display(),
- );
+ let mut buffer_list = None;
+
+ if interleaved_buffers.is_empty() {
+ assert!(at_eos);
+ } else {
+ let min_earliest_pts_position = min_earliest_pts_position.unwrap();
+ let min_earliest_pts = min_earliest_pts.unwrap();
+ let max_end_pts = max_end_pts.unwrap();
let mut fmp4_header = None;
- if !state.generated_headers {
+ if !state.sent_headers {
let mut buffer = state.stream_header.as_ref().unwrap().copy();
{
let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(earliest_pts_position);
- buffer.set_dts(start_dts_position);
+ buffer.set_pts(min_earliest_pts_position);
+ buffer.set_dts(min_start_dts_position);
// Header is DISCONT|HEADER
buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
@@ -529,16 +695,9 @@ impl FMP4Mux {
fmp4_header = Some(buffer);
- state.earliest_pts = Some(earliest_pts);
- state.generated_headers = true;
+ state.sent_headers = true;
}
- let mut buffers = drain_gops
- .into_iter()
- .rev()
- .flat_map(|gop| gop.buffers)
- .collect::<Vec<Buffer>>();
-
// TODO: Write prft boxes before moof
// TODO: Write sidx boxes before moof and rewrite once offsets are known
@@ -551,15 +710,9 @@ impl FMP4Mux {
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
variant: class.as_ref().variant,
sequence_number,
- caps: &[state.caps.as_ref().unwrap()],
- timing_infos: &[Some(super::FragmentTimingInfo {
- earliest_pts,
- start_dts,
- end_pts,
- end_dts,
- dts_offset,
- })],
- buffers: &buffers,
+ caps: caps.as_slice(),
+ timing_infos: timing_infos.as_slice(),
+ buffers: interleaved_buffers.as_slice(),
})
.map_err(|err| {
gst::error!(
@@ -573,9 +726,9 @@ impl FMP4Mux {
{
let buffer = fmp4_fragment_header.get_mut().unwrap();
- buffer.set_pts(earliest_pts_position);
- buffer.set_dts(start_dts_position);
- buffer.set_duration(end_pts.checked_sub(earliest_pts));
+ buffer.set_pts(min_earliest_pts_position);
+ buffer.set_dts(min_start_dts_position);
+ buffer.set_duration(max_end_pts.checked_sub(min_earliest_pts));
// Fragment header is HEADER
buffer.set_flags(gst::BufferFlags::HEADER);
@@ -583,17 +736,20 @@ impl FMP4Mux {
// Copy metas from the first actual buffer to the fragment header. This allows
// getting things like the reference timestamp meta or the timecode meta to identify
// the fragment.
- let _ = buffers[0]
- .buffer
- .copy_into(buffer, gst::BufferCopyFlags::META, 0, None);
+ let _ = interleaved_buffers[0].buffer.copy_into(
+ buffer,
+ gst::BufferCopyFlags::META,
+ 0,
+ None,
+ );
}
let moof_offset = state.current_offset
+ fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64
+ moof_offset;
- let buffers_len = buffers.len();
- for (idx, buffer) in buffers.iter_mut().enumerate() {
+ let buffers_len = interleaved_buffers.len();
+ for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() {
// Fix up buffer flags, all other buffers are DELTA_UNIT
let buffer_ref = buffer.buffer.make_mut();
buffer_ref.unset_flags(gst::BufferFlags::all());
@@ -609,29 +765,31 @@ impl FMP4Mux {
fmp4_header
.into_iter()
.chain(Some(fmp4_fragment_header))
- .chain(buffers.into_iter().map(|buffer| buffer.buffer))
+ .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer))
.inspect(|b| {
state.current_offset += b.size() as u64;
})
.collect::<gst::BufferList>(),
);
- state.fragment_offsets.push(super::FragmentOffset {
- time: earliest_pts,
- offset: moof_offset,
- });
- state.end_pts = Some(end_pts);
+ // Write mfra only for the main stream, and if there are no buffers for the main stream
+ // in this segment then don't write anything.
+ if let Some(Some(ref timing_info)) = timing_infos.get(0) {
+ state.fragment_offsets.push(super::FragmentOffset {
+ time: timing_info.earliest_pts,
+ offset: moof_offset,
+ });
+ }
+ state.end_pts = Some(max_end_pts);
- gst::debug!(
- CAT,
- obj: element,
- "Queued duration updated to {} after draining",
- state.queued_duration
- );
+ // Update for the start PTS of the next fragment
+ state.fragment_start_pts = state
+ .fragment_start_pts
+ .map(|start| start + settings.fragment_duration);
}
if settings.write_mfra && at_eos {
- match boxes::create_mfra(state.caps.as_ref().unwrap(), &state.fragment_offsets) {
+ match boxes::create_mfra(caps[0], &state.fragment_offsets) {
Ok(mut mfra) => {
{
let mfra = mfra.get_mut().unwrap();
@@ -656,6 +814,93 @@ impl FMP4Mux {
Ok(buffer_list)
}
+ fn create_streams(
+ &self,
+ element: &super::FMP4Mux,
+ state: &mut State,
+ ) -> Result<(), gst::FlowError> {
+ for pad in element
+ .sink_pads()
+ .into_iter()
+ .map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap())
+ {
+ let caps = match pad.current_caps() {
+ Some(caps) => caps,
+ None => {
+ gst::warning!(CAT, obj: &pad, "Skipping pad without caps");
+ continue;
+ }
+ };
+
+ gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps);
+
+ let s = caps.structure(0).unwrap();
+
+ let mut intra_only = false;
+ match s.name() {
+ "video/x-h264" | "video/x-h265" => {
+ if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
+ gst::error!(CAT, obj: &pad, "Received caps without codec_data");
+ return Err(gst::FlowError::NotNegotiated);
+ }
+ }
+ "audio/mpeg" => {
+ if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
+ gst::error!(CAT, obj: &pad, "Received caps without codec_data");
+ return Err(gst::FlowError::NotNegotiated);
+ }
+ intra_only = true;
+ }
+ _ => unreachable!(),
+ }
+
+ state.streams.push(Stream {
+ sinkpad: pad,
+ caps,
+ intra_only,
+ queued_gops: VecDeque::new(),
+ fragment_filled: false,
+ dts_offset: None,
+ last_force_keyunit_time: None,
+ });
+ }
+
+ if state.streams.is_empty() {
+ gst::error!(CAT, obj: element, "No streams available");
+ return Err(gst::FlowError::Error);
+ }
+
+ // Sort video streams first and then audio streams, and each group by pad name.
+ state.streams.sort_by(|a, b| {
+ let stream_type_of_caps = |caps: &gst::CapsRef| {
+ let s = caps.structure(0).unwrap();
+
+ if s.name().starts_with("video/") {
+ gst::StreamType::VIDEO
+ } else if s.name().starts_with("audio/") {
+ gst::StreamType::AUDIO
+ } else {
+ unimplemented!();
+ }
+ };
+
+ let st_a = stream_type_of_caps(&a.caps);
+ let st_b = stream_type_of_caps(&b.caps);
+
+ if st_a == st_b {
+ return a.sinkpad.name().cmp(&b.sinkpad.name());
+ }
+
+ if st_a == gst::StreamType::VIDEO {
+ std::cmp::Ordering::Less
+ } else {
+ std::cmp::Ordering::Greater
+ }
+ });
+
+ Ok(())
+ }
+
fn update_header(
&self,
element: &super::FMP4Mux,
@@ -670,7 +915,7 @@ impl FMP4Mux {
return Ok(None);
}
- assert!(!at_eos || state.queued_gops.is_empty());
+ assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty()));
let duration = state
.end_pts
@@ -678,10 +923,12 @@ impl FMP4Mux {
.ok()
.flatten();
+ let caps = state.streams.iter().map(|s| &s.caps).collect::<Vec<_>>();
+
let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration {
variant,
update: at_eos,
- caps: &[state.caps.as_ref().unwrap()],
+ caps: caps.as_slice(),
write_mehd: settings.write_mehd,
duration: if at_eos { duration } else { None },
})
@@ -727,20 +974,6 @@ impl ObjectSubclass for FMP4Mux {
type Type = super::FMP4Mux;
type ParentType = gst_base::Aggregator;
type Class = Class;
-
- fn with_class(klass: &Self::Class) -> Self {
- let templ = klass.pad_template("sink").unwrap();
- let sinkpad =
- gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink"))
- .flags(gst::PadFlags::ACCEPT_INTERSECT)
- .build();
-
- Self {
- sinkpad,
- settings: Mutex::default(),
- state: Mutex::default(),
- }
- }
}
impl ObjectImpl for FMP4Mux {
@@ -779,6 +1012,25 @@ impl ObjectImpl for FMP4Mux {
DEFAULT_WRITE_MFRA,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpecUInt64::new(
+ "interleave-bytes",
+ "Interleave Bytes",
+ "Interleave between streams in bytes",
+ 0,
+ u64::MAX,
+ DEFAULT_INTERLEAVE_BYTES.unwrap_or(0),
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt64::new(
+ "interleave-time",
+ "Interleave Time",
+ "Interleave between streams in nanoseconds",
+ 0,
+ u64::MAX,
+ DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX),
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+
]
});
@@ -818,6 +1070,22 @@ impl ObjectImpl for FMP4Mux {
settings.write_mehd = value.get().expect("type checked upstream");
}
+ "interleave-bytes" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.interleave_bytes = match value.get().expect("type checked upstream") {
+ 0 => None,
+ v => Some(v),
+ };
+ }
+
+ "interleave-time" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.interleave_time = match value.get().expect("type checked upstream") {
+ Some(gst::ClockTime::ZERO) | None => None,
+ v => v,
+ };
+ }
+
_ => unimplemented!(),
}
}
@@ -844,6 +1112,16 @@ impl ObjectImpl for FMP4Mux {
settings.write_mehd.to_value()
}
+ "interleave-bytes" => {
+ let settings = self.settings.lock().unwrap();
+ settings.interleave_bytes.unwrap_or(0).to_value()
+ }
+
+ "interleave-time" => {
+ let settings = self.settings.lock().unwrap();
+ settings.interleave_time.to_value()
+ }
+
_ => unimplemented!(),
}
}
@@ -851,7 +1129,19 @@ impl ObjectImpl for FMP4Mux {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- obj.add_pad(&self.sinkpad).unwrap();
+ let class = obj.class();
+ for templ in class.pad_template_list().filter(|templ| {
+ templ.presence() == gst::PadPresence::Always
+ && templ.direction() == gst::PadDirection::Sink
+ }) {
+ let sinkpad =
+ gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink"))
+ .flags(gst::PadFlags::ACCEPT_INTERSECT)
+ .build();
+
+ obj.add_pad(&sinkpad).unwrap();
+ }
+
obj.set_latency(Settings::default().fragment_duration, None);
}
}
@@ -859,16 +1149,24 @@ impl ObjectImpl for FMP4Mux {
impl GstObjectImpl for FMP4Mux {}
impl ElementImpl for FMP4Mux {
- fn release_pad(&self, _element: &Self::Type, _pad: &gst::Pad) {}
-
fn request_new_pad(
&self,
- _element: &Self::Type,
- _templ: &gst::PadTemplate,
- _name: Option<String>,
- _caps: Option<&gst::Caps>,
+ element: &Self::Type,
+ templ: &gst::PadTemplate,
+ name: Option<String>,
+ caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
- None
+ let state = self.state.lock().unwrap();
+ if state.stream_header.is_some() {
+ gst::error!(
+ CAT,
+ obj: element,
+ "Can't request new pads after header was generated"
+ );
+ return None;
+ }
+
+ self.parent_request_new_pad(element, templ, name, caps)
}
}
@@ -885,14 +1183,9 @@ impl AggregatorImpl for FMP4Mux {
match query.view_mut() {
QueryViewMut::Caps(q) => {
- let state = self.state.lock().unwrap();
-
- let allowed_caps = if let Some(ref caps) = state.caps {
- // TODO: Maybe allow codec_data changes and similar?
- caps.clone()
- } else {
- aggregator_pad.pad_template_caps()
- };
+ let allowed_caps = aggregator_pad
+ .current_caps()
+ .unwrap_or_else(|| aggregator_pad.pad_template_caps());
if let Some(filter_caps) = q.filter() {
let res = filter_caps
@@ -956,59 +1249,14 @@ impl AggregatorImpl for FMP4Mux {
.downcast::<gst::ClockTime>()
.expect("non-TIME segment");
gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment);
- aggregator.update_segment(&segment);
- self.parent_sink_event(aggregator, aggregator_pad, event)
- }
- EventView::Caps(ev) => {
- let caps = ev.caps_owned();
-
- gst::info!(CAT, obj: aggregator_pad, "Received caps {:?}", caps);
- let caps = {
- let settings = self.settings.lock().unwrap().clone();
- let mut state = self.state.lock().unwrap();
-
- let s = caps.structure(0).unwrap();
-
- match s.name() {
- "video/x-h264" | "video/x-h265" => {
- if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
- gst::error!(
- CAT,
- obj: aggregator_pad,
- "Received caps without codec_data"
- );
- return false;
- }
- }
- "audio/mpeg" => {
- if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
- gst::error!(
- CAT,
- obj: aggregator_pad,
- "Received caps without codec_data"
- );
- return false;
- }
- state.intra_only = true;
- }
- _ => unreachable!(),
- }
-
- state.caps = Some(caps);
-
- let (_, caps) =
- match self.update_header(aggregator, &mut state, &settings, false) {
- Ok(Some(res)) => res,
- _ => {
- return false;
- }
- };
-
- caps
- };
-
- aggregator.set_src_caps(&caps);
+ // Only forward the segment event verbatim if this is a single stream variant.
+ // Otherwise we have to produce a default segment and re-timestamp all buffers
+ // with their running time.
+ let class = aggregator.class();
+ if class.as_ref().variant.is_single_stream() {
+ aggregator.update_segment(&segment);
+ }
self.parent_sink_event(aggregator, aggregator_pad, event)
}
@@ -1057,10 +1305,13 @@ impl AggregatorImpl for FMP4Mux {
let mut state = self.state.lock().unwrap();
- state.queued_gops.clear();
- state.queued_duration = gst::ClockTime::ZERO;
- state.dts_offset = None;
- state.last_force_keyunit_time = None;
+ for stream in &mut state.streams {
+ stream.queued_gops.clear();
+ stream.dts_offset = None;
+ stream.last_force_keyunit_time = None;
+ stream.fragment_filled = false;
+ }
+
state.current_offset = 0;
state.fragment_offsets.clear();
@@ -1071,7 +1322,9 @@ impl AggregatorImpl for FMP4Mux {
gst::trace!(CAT, obj: aggregator, "Stopping");
let _ = self.parent_stop(aggregator);
+
*self.state.lock().unwrap() = State::default();
+
Ok(())
}
@@ -1079,7 +1332,19 @@ impl AggregatorImpl for FMP4Mux {
gst::trace!(CAT, obj: aggregator, "Starting");
self.parent_start(aggregator)?;
+
+ // For non-single-stream variants configure a default segment that allows for negative
+ // DTS so that we can correctly re-timestamp buffers with their running times.
+ let class = aggregator.class();
+ if !class.as_ref().variant.is_single_stream() {
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ segment.set_start(SEGMENT_OFFSET);
+ segment.set_position(SEGMENT_OFFSET);
+ aggregator.update_segment(&segment);
+ }
+
*self.state.lock().unwrap() = State::default();
+
Ok(())
}
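
With that fixed output segment in place, queue_input() re-timestamps buffers of the multi-stream variants: the output PTS/DTS positions become the running time plus SEGMENT_OFFSET, and the DTS is additionally un-shifted by the stream's dts_offset. A minimal sketch of that arithmetic, using bare gst::ClockTime values in place of real buffers:

// Sketch only: the re-timestamping applied in queue_input() for non-single-stream variants.
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);

fn output_positions(
    pts_running_time: gst::ClockTime,
    dts_running_time: Option<gst::ClockTime>,
    dts_offset: Option<gst::ClockTime>,
) -> (gst::ClockTime, Option<gst::ClockTime>) {
    let pts_position = pts_running_time + SEGMENT_OFFSET;
    // The running-time DTS was already shifted up by dts_offset to keep it non-negative,
    // so the offset is subtracted again to recover the actual decode position.
    let dts_position = dts_running_time
        .map(|dts| dts + SEGMENT_OFFSET - dts_offset.unwrap_or(gst::ClockTime::ZERO));
    (pts_position, dts_position)
}
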
@@ -1094,52 +1359,159 @@ impl AggregatorImpl for FMP4Mux {
) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
- let is_eos;
+ let mut all_eos = true;
let mut upstream_events = vec![];
let buffers = {
let mut state = self.state.lock().unwrap();
- let segment = match self
- .sinkpad
- .segment()
- .clone()
- .downcast::<gst::ClockTime>()
- .ok()
- {
- Some(segment) => segment,
- None => {
- gst::error!(CAT, obj: aggregator, "Got buffer before segment");
- return Err(gst::FlowError::Error);
+ // Create streams, stream header in the beginning and set output caps.
+ if state.stream_header.is_none() {
+ self.create_streams(aggregator, &mut state)?;
+
+ let (_, caps) = self
+ .update_header(aggregator, &mut state, &settings, false)?
+ .unwrap();
+
+ drop(state);
+ aggregator.set_src_caps(&caps);
+ state = self.state.lock().unwrap();
+ }
+
+ // Queue buffers from all streams that are not filled for the current fragment yet
+ let fragment_start_pts = state.fragment_start_pts;
+ for (idx, stream) in state.streams.iter_mut().enumerate() {
+ if stream.fragment_filled {
+ let buffer = stream.sinkpad.peek_buffer();
+ all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
+
+ continue;
}
- };
- let buffer = self.sinkpad.pop_buffer();
- is_eos = buffer.is_none() && self.sinkpad.is_eos();
+ let buffer = stream.sinkpad.pop_buffer();
+ all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
+
+ let buffer = match buffer {
+ None => continue,
+ Some(buffer) => buffer,
+ };
+
+ let segment = match stream
+ .sinkpad
+ .segment()
+ .clone()
+ .downcast::<gst::ClockTime>()
+ .ok()
+ {
+ Some(segment) => segment,
+ None => {
+ gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
+ return Err(gst::FlowError::Error);
+ }
+ };
- if let Some(buffer) = buffer {
let pts = buffer.pts();
// Queue up the buffer and update GOP tracking state
- self.queue_input(aggregator, &mut state, &segment, buffer)?;
+ self.queue_input(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created
if let Some(pts) = pts {
- if let Some(event) = self.create_force_keyunit_event(
- aggregator, &mut state, &settings, &segment, pts,
- )? {
- upstream_events.push(event);
+ if let Some(event) = self
+ .create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)?
+ {
+ upstream_events.push((stream.sinkpad.clone(), event));
+ }
+ }
+
+ // Check if this stream is filled enough now.
+ if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
+ stream
+ .queued_gops
+ .iter()
+ .find(|gop| gop.final_end_pts)
+ .map(|gop| gop.end_pts),
+ fragment_start_pts,
+ ) {
+ if queued_end_pts.saturating_sub(fragment_start_pts)
+ >= settings.fragment_duration
+ {
+ gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
+ stream.fragment_filled = true;
+ }
+ }
+ }
+
+ if all_eos {
+ gst::debug!(CAT, obj: aggregator, "All streams are EOS now");
+ }
+
+ // Calculate the earliest PTS after queueing input if we can now.
+ if state.earliest_pts.is_none() {
+ let mut earliest_pts = None;
+
+ for stream in &state.streams {
+ let stream_earliest_pts = match stream.queued_gops.back() {
+ None => {
+ earliest_pts = None;
+ break;
+ }
+ Some(oldest_gop) => {
+ if !oldest_gop.final_earliest_pts {
+ earliest_pts = None;
+ break;
+ }
+
+ oldest_gop.earliest_pts
+ }
+ };
+
+ if earliest_pts.map_or(true, |earliest_pts| earliest_pts > stream_earliest_pts)
+ {
+ earliest_pts = Some(stream_earliest_pts);
+ }
+ }
+
+ if let Some(earliest_pts) = earliest_pts {
+ gst::info!(CAT, obj: aggregator, "Got earliest PTS {}", earliest_pts);
+ state.earliest_pts = Some(earliest_pts);
+ state.fragment_start_pts = Some(earliest_pts);
+
+ for stream in &mut state.streams {
+ if let Some(event) = self.create_initial_force_keyunit_event(
+ aggregator,
+ stream,
+ &settings,
+ earliest_pts,
+ )? {
+ upstream_events.push((stream.sinkpad.clone(), event));
+ }
+
+ // Check if this stream is filled enough now.
+ if let Some(queued_end_pts) = stream
+ .queued_gops
+ .iter()
+ .find(|gop| gop.final_end_pts)
+ .map(|gop| gop.end_pts)
+ {
+ if queued_end_pts.saturating_sub(earliest_pts)
+ >= settings.fragment_duration
+ {
+ gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
+ stream.fragment_filled = true;
+ }
+ }
}
}
}
// If enough GOPs were queued, drain and create the output fragment
- self.drain(aggregator, &mut state, &settings, is_eos)?
+ self.drain(aggregator, &mut state, &settings, all_eos)?
};
- for event in upstream_events {
- self.sinkpad.push_event(event);
+ for (sinkpad, event) in upstream_events {
+ sinkpad.push_event(event);
}
if let Some(buffers) = buffers {
@@ -1147,7 +1519,9 @@ impl AggregatorImpl for FMP4Mux {
aggregator.finish_buffer_list(buffers)?;
}
- if is_eos {
+ if all_eos {
+ gst::debug!(CAT, obj: aggregator, "Doing EOS handling");
+
if settings.header_update_mode != super::HeaderUpdateMode::None {
let updated_header = self.update_header(
aggregator,
@@ -1163,14 +1537,14 @@ impl AggregatorImpl for FMP4Mux {
let src_pad = aggregator.src_pad();
let mut q = gst::query::Seeking::new(gst::Format::Bytes);
if src_pad.peer_query(&mut q) && q.result().0 {
+ aggregator.set_src_caps(&caps);
+
// Seek to the beginning with a default bytes segment
aggregator
.update_segment(
&gst::FormattedSegment::<gst::format::Bytes>::new(),
);
- aggregator.set_src_caps(&caps);
-
if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
gst::error!(
CAT,
@@ -1212,8 +1586,8 @@ impl AggregatorImpl for FMP4Mux {
}
}
- // Need to generate new headers if started again after EOS
- self.state.lock().unwrap().generated_headers = false;
+ // Need to output new headers if started again after EOS
+ self.state.lock().unwrap().sent_headers = false;
Err(gst::FlowError::Eos)
} else {
@@ -1294,9 +1668,9 @@ impl ElementImpl for ISOFMP4Mux {
.unwrap();
let sink_pad_template = gst::PadTemplate::new(
- "sink",
+ "sink_%u",
gst::PadDirection::Sink,
- gst::PadPresence::Always,
+ gst::PadPresence::Request,
&[
gst::Structure::builder("video/x-h264")
.field("stream-format", gst::List::new(["avc", "avc3"]))
diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs
index b3d23e25a..98e328a3d 100644
--- a/generic/fmp4/src/fmp4mux/mod.rs
+++ b/generic/fmp4/src/fmp4mux/mod.rs
@@ -99,6 +99,15 @@ pub(crate) enum Variant {
DASH,
}
+impl Variant {
+ pub(crate) fn is_single_stream(self) -> bool {
+ match self {
+ Variant::ISO => false,
+ Variant::CMAF | Variant::DASH => true,
+ }
+ }
+}
+
#[derive(Debug)]
pub(crate) struct FragmentOffset {
time: gst::ClockTime,
diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs
index 6c871ccc0..c9221e70f 100644
--- a/generic/fmp4/tests/tests.rs
+++ b/generic/fmp4/tests/tests.rs
@@ -6,6 +6,8 @@
// SPDX-License-Identifier: MPL-2.0
//
+use gst::prelude::*;
+
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
@@ -16,12 +18,18 @@ fn init() {
});
}
-#[test]
-fn test_buffer_flags() {
- init();
+fn test_buffer_flags_single_stream(cmaf: bool) {
+ let mut h = if cmaf {
+ gst_check::Harness::new("cmafmux")
+ } else {
+ gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"))
+ };
// 5s fragment duration
- let mut h = gst_check::Harness::new_parse("isofmp4mux fragment-duration=5000000000");
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", gst::ClockTime::from_seconds(5));
+
h.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
@@ -34,6 +42,12 @@ fn test_buffer_flags() {
);
h.play();
+ let output_offset = if cmaf {
+ gst::ClockTime::ZERO
+ } else {
+ gst::ClockTime::from_seconds(60 * 60 * 1000)
+ };
+
// Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
for i in 0..7 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
@@ -75,13 +89,19 @@ fn test_buffer_flags() {
header.flags(),
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
);
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO));
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(gst::ClockTime::ZERO));
- assert_eq!(fragment_header.dts(), Some(gst::ClockTime::ZERO));
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(5))
@@ -97,8 +117,14 @@ fn test_buffer_flags() {
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
- assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i)));
- assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i)));
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
@@ -106,8 +132,14 @@ fn test_buffer_flags() {
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(gst::ClockTime::from_seconds(5)));
- assert_eq!(fragment_header.dts(), Some(gst::ClockTime::from_seconds(5)));
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::from_seconds(5) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::from_seconds(5) + output_offset)
+ );
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(2))
@@ -123,8 +155,14 @@ fn test_buffer_flags() {
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
- assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i)));
- assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i)));
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
@@ -137,3 +175,17 @@ fn test_buffer_flags() {
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
+
+#[test]
+fn test_buffer_flags_single_stream_cmaf() {
+ init();
+
+ test_buffer_flags_single_stream(true);
+}
+
+#[test]
+fn test_buffer_flags_single_stream_iso() {
+ init();
+
+ test_buffer_flags_single_stream(false);
+}
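
The output_offset used in the non-CMAF assertions above is the same value as the new SEGMENT_OFFSET constant in imp.rs, i.e. the 1000-hour shift applied when re-timestamping multi-stream output; a one-liner makes the relationship explicit:

// Sketch only: the test's output_offset equals imp.rs's SEGMENT_OFFSET.
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);

fn main() {
    // 60 * 60 * 1000 seconds is 1000 hours, far enough from zero that the shifted
    // output timestamps stay positive even with negative input DTS.
    assert_eq!(SEGMENT_OFFSET.seconds(), 3_600_000);
    assert_eq!(SEGMENT_OFFSET.seconds() / 3600, 1_000);
}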