Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs295
-rw-r--r--generic/fmp4/tests/tests.rs540
2 files changed, 702 insertions, 133 deletions
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 0b2cf53c..ba14f819 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -91,12 +91,14 @@ impl Default for Settings {
}
}
+#[derive(Debug)]
struct GopBuffer {
buffer: gst::Buffer,
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
}
+#[derive(Debug)]
struct Gop {
// Running times
start_pts: gst::ClockTime,
@@ -139,8 +141,6 @@ struct Stream {
// going backwards when draining a fragment.
// UNIX epoch.
current_utc_time: gst::ClockTime,
-
- last_force_keyunit_time: Option<gst::ClockTime>,
}
#[derive(Default)]
@@ -163,6 +163,8 @@ struct State {
// Start PTS of the current fragment
fragment_start_pts: Option<gst::ClockTime>,
+ // Additional timeout delay in case GOPs are bigger than the fragment duration
+ timeout_delay: gst::ClockTime,
// In ONVIF mode the UTC time corresponding to the beginning of the stream
// UNIX epoch.
@@ -604,92 +606,29 @@ impl FMP4Mux {
Ok(())
}
- fn create_initial_force_keyunit_event(
- &self,
- _element: &super::FMP4Mux,
- stream: &mut Stream,
- settings: &Settings,
- earliest_pts: gst::ClockTime,
- ) -> Result<Option<gst::Event>, gst::FlowError> {
- assert!(stream.last_force_keyunit_time.is_none());
-
- // If we never sent a force-keyunit event then send one now.
- let fku_running_time = earliest_pts + settings.fragment_duration;
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Sending first force-keyunit event for running time {}",
- fku_running_time
- );
- stream.last_force_keyunit_time = Some(fku_running_time);
-
- return Ok(Some(
- gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_running_time)
- .all_headers(true)
- .build(),
- ));
- }
-
- fn create_force_keyunit_event(
- &self,
- _element: &super::FMP4Mux,
- stream: &mut Stream,
- settings: &Settings,
- segment: &gst::FormattedSegment<gst::ClockTime>,
- pts: gst::ClockTime,
- ) -> Result<Option<gst::Event>, gst::FlowError> {
- // If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP
- // is known and send it then.
- //
- // Otherwise if the current PTS is a fragment duration in the future, send the next one
- // now.
-
- let last_force_keyunit_time = match stream.last_force_keyunit_time {
- None => return Ok(None),
- Some(last_force_keyunit_time) => last_force_keyunit_time,
- };
-
- let pts = segment.to_running_time(pts);
- if pts.opt_lt(last_force_keyunit_time).unwrap_or(true) {
- return Ok(None);
- }
-
- let fku_running_time = last_force_keyunit_time + settings.fragment_duration;
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Sending force-keyunit event for running time {}",
- fku_running_time
- );
- stream.last_force_keyunit_time = Some(fku_running_time);
-
- Ok(Some(
- gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fku_running_time)
- .all_headers(true)
- .build(),
- ))
- }
-
#[allow(clippy::type_complexity)]
fn drain_buffers(
&self,
- _element: &super::FMP4Mux,
+ element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
timeout: bool,
at_eos: bool,
) -> Result<
(
+ // Drained streams
Vec<(
gst::Caps,
Option<super::FragmentTimingInfo>,
VecDeque<Buffer>,
)>,
+ // Minimum earliest PTS position of all streams
Option<gst::ClockTime>,
+ // Minimum earliest PTS of all streams
Option<gst::ClockTime>,
+ // Minimum start DTS position of all streams (if any stream has DTS)
Option<gst::ClockTime>,
+ // End PTS of this drained fragment, i.e. start PTS of the next fragment
Option<gst::ClockTime>,
),
gst::FlowError,
@@ -699,7 +638,24 @@ impl FMP4Mux {
let mut min_earliest_pts_position = None;
let mut min_earliest_pts = None;
let mut min_start_dts_position = None;
- let mut max_end_pts = None;
+ let mut fragment_end_pts = None;
+
+ // The first stream decides how much can be dequeued, if anything at all.
+ //
+ // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
+ // but on timeout in live pipelines it might happen that the first stream does not have a
+ // complete GOP queued. In that case nothing is dequeued for any of the streams and the
+ // timeout is advanced by 1s until at least one complete GOP can be dequeued.
+ //
+ // If the first stream is already EOS then the next stream that is not EOS yet will be
+ // taken in its place.
+ let fragment_start_pts = state.fragment_start_pts.unwrap();
+ gst::info!(
+ CAT,
+ obj: element,
+ "Starting to drain at {}",
+ fragment_start_pts
+ );
for (idx, stream) in state.streams.iter_mut().enumerate() {
assert!(
@@ -709,29 +665,74 @@ impl FMP4Mux {
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
);
- // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is
- // equal to the fragment duration then drain out all complete GOPs, otherwise all
- // except for the newest complete GOP.
- let gops = if at_eos || stream.sinkpad.is_eos() {
- stream.queued_gops.drain(..).rev().collect::<Vec<_>>()
- } else {
- let mut gops = vec![];
+ // Drain all complete GOPs until at most one fragment duration was dequeued for the
+ // first stream, or until the dequeued duration of the first stream.
+ let mut gops = Vec::with_capacity(stream.queued_gops.len());
+ let dequeue_end_pts =
+ fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
+ gst::trace!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Draining up to end PTS {} / duration {}",
+ dequeue_end_pts,
+ dequeue_end_pts - fragment_start_pts
+ );
- let fragment_start_pts = state.fragment_start_pts.unwrap();
- while let Some(gop) = stream.queued_gops.pop_back() {
- assert!(timeout || gop.final_end_pts);
+ while let Some(gop) = stream.queued_gops.back() {
+ // If this GOP is not complete then we can't pop it yet.
+ //
+ // If there was no complete GOP at all yet then it might be bigger than the
+ // fragment duration. In this case we might not be able to handle the latency
+ // requirements in a live pipeline.
+ if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
+ break;
+ }
- let end_pts = gop.end_pts;
- gops.push(gop);
- if end_pts.saturating_sub(fragment_start_pts) >= settings.fragment_duration {
- break;
- }
+ // If this GOP starts after the fragment end then don't dequeue it yet unless this is
+ // the first stream and no GOPs were dequeued at all yet. This would mean that the
+ // GOP is bigger than the fragment duration.
+ if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty())
+ {
+ break;
}
- gops
- };
+ gops.push(stream.queued_gops.pop_back().unwrap());
+ }
stream.fragment_filled = false;
+ // If we don't have a next fragment start PTS then this is the first stream as above.
+ if fragment_end_pts.is_none() {
+ if let Some(last_gop) = gops.last() {
+ // Dequeued something so let's take the end PTS of the last GOP
+ fragment_end_pts = Some(last_gop.end_pts);
+ gst::info!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Draining up to PTS {} for this fragment",
+ last_gop.end_pts,
+ );
+ } else {
+ // If nothing was dequeued for the first stream then this is OK if we're at
+ // EOS: we just consider the next stream as first stream then.
+ if at_eos || stream.sinkpad.is_eos() {
+ // This is handled below generally if nothing was dequeued
+ } else {
+ // Otherwise this can only really happen on timeout in live pipelines.
+ assert!(timeout);
+
+ gst::warning!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Don't have a complete GOP for the first stream on timeout in a live pipeline",
+ );
+
+ // In this case we advance the timeout by 1s and hope that things are
+ // better then.
+ return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
+ }
+ }
+ }
+
if gops.is_empty() {
gst::info!(
CAT,
@@ -743,6 +744,8 @@ impl FMP4Mux {
continue;
}
+ assert!(fragment_end_pts.is_some());
+
let first_gop = gops.first().unwrap();
let last_gop = gops.last().unwrap();
let earliest_pts = first_gop.earliest_pts;
@@ -769,9 +772,6 @@ impl FMP4Mux {
min_start_dts_position = Some(start_dts_position);
}
}
- if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
- max_end_pts = Some(end_pts);
- }
gst::info!(
CAT,
@@ -892,7 +892,7 @@ impl FMP4Mux {
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
- max_end_pts,
+ fragment_end_pts,
))
}
@@ -1229,6 +1229,7 @@ impl FMP4Mux {
settings: &Settings,
timeout: bool,
at_eos: bool,
+ upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>,
) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
if at_eos {
gst::info!(CAT, obj: element, "Draining at EOS");
@@ -1248,7 +1249,7 @@ impl FMP4Mux {
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
- max_end_pts,
+ fragment_end_pts,
) = self.drain_buffers(element, state, settings, timeout, at_eos)?;
// For ONVIF, replace all timestamps with timestamps based on UTC times.
@@ -1279,13 +1280,13 @@ impl FMP4Mux {
let mut buffer_list = None;
if interleaved_buffers.is_empty() {
- assert!(timeout || at_eos);
+ assert!(at_eos);
} else {
// If there are actual buffers to output then create headers as needed and create a
// bufferlist for all buffers that have to be output.
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
let min_earliest_pts = min_earliest_pts.unwrap();
- let max_end_pts = max_end_pts.unwrap();
+ let fragment_end_pts = fragment_end_pts.unwrap();
let mut fmp4_header = None;
if !state.sent_headers {
@@ -1334,7 +1335,7 @@ impl FMP4Mux {
let buffer = fmp4_fragment_header.get_mut().unwrap();
buffer.set_pts(min_earliest_pts_position);
buffer.set_dts(min_start_dts_position);
- buffer.set_duration(max_end_pts.checked_sub(min_earliest_pts));
+ buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts));
// Fragment header is HEADER
buffer.set_flags(gst::BufferFlags::HEADER);
@@ -1386,22 +1387,37 @@ impl FMP4Mux {
offset: moof_offset,
});
}
- state.end_pts = Some(max_end_pts);
+
+ state.end_pts = Some(fragment_end_pts);
state.end_utc_time = max_end_utc_time;
// Update for the start PTS of the next fragment
- state.fragment_start_pts = state.fragment_start_pts.map(|start| {
- let new_fragment_start = start + settings.fragment_duration;
+ gst::info!(
+ CAT,
+ obj: element,
+ "Starting new fragment at {}",
+ fragment_end_pts,
+ );
+ state.fragment_start_pts = Some(fragment_end_pts);
- gst::info!(
- CAT,
- obj: element,
- "Starting new fragment at {}",
- new_fragment_start
- );
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Sending force-keyunit events for running time {}",
+ fragment_end_pts + settings.fragment_duration,
+ );
- new_fragment_start
- });
+ let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(fragment_end_pts + settings.fragment_duration)
+ .all_headers(true)
+ .build();
+
+ for stream in &state.streams {
+ upstream_events.push((stream.sinkpad.clone(), fku.clone()));
+ }
+
+ // Reset timeout delay now that we've output an actual fragment
+ state.timeout_delay = gst::ClockTime::ZERO;
}
if settings.write_mfra && at_eos {
@@ -1491,7 +1507,6 @@ impl FMP4Mux {
dts_offset: None,
current_position: gst::ClockTime::ZERO,
current_utc_time: gst::ClockTime::ZERO,
- last_force_keyunit_time: None,
});
}
@@ -1801,7 +1816,7 @@ impl ElementImpl for FMP4Mux {
impl AggregatorImpl for FMP4Mux {
fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> {
let state = self.state.lock().unwrap();
- state.fragment_start_pts
+ state.fragment_start_pts.opt_add(state.timeout_delay)
}
fn sink_query(
@@ -1938,7 +1953,6 @@ impl AggregatorImpl for FMP4Mux {
stream.dts_offset = None;
stream.current_position = gst::ClockTime::ZERO;
stream.current_utc_time = gst::ClockTime::ZERO;
- stream.last_force_keyunit_time = None;
stream.fragment_filled = false;
}
@@ -2030,21 +2044,9 @@ impl AggregatorImpl for FMP4Mux {
}
};
- let pts = buffer.pts();
-
// Queue up the buffer and update GOP tracking state
self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
- // If we have a PTS with this buffer, check if a new force-keyunit event for the next
- // fragment start has to be created
- if let Some(pts) = pts {
- if let Some(event) = self
- .create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)?
- {
- upstream_events.push((stream.sinkpad.clone(), event));
- }
- }
-
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
stream
@@ -2093,15 +2095,20 @@ impl AggregatorImpl for FMP4Mux {
state.earliest_pts = Some(earliest_pts);
state.fragment_start_pts = Some(earliest_pts);
+ gst::debug!(
+ CAT,
+ obj: aggregator,
+ "Sending first force-keyunit event for running time {}",
+ earliest_pts + settings.fragment_duration,
+ );
+
+ let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
+ .running_time(earliest_pts + settings.fragment_duration)
+ .all_headers(true)
+ .build();
+
for stream in &mut state.streams {
- if let Some(event) = self.create_initial_force_keyunit_event(
- aggregator,
- stream,
- &settings,
- earliest_pts,
- )? {
- upstream_events.push((stream.sinkpad.clone(), event));
- }
+ upstream_events.push((stream.sinkpad.clone(), fku.clone()));
// Check if this stream is filled enough now.
if let Some(queued_end_pts) = stream
@@ -2127,7 +2134,31 @@ impl AggregatorImpl for FMP4Mux {
}
// If enough GOPs were queued, drain and create the output fragment
- self.drain(aggregator, &mut state, &settings, timeout, all_eos)?
+ match self.drain(
+ aggregator,
+ &mut state,
+ &settings,
+ timeout,
+ all_eos,
+ &mut upstream_events,
+ ) {
+ Ok(res) => res,
+ Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
+ gst::element_warning!(
+ aggregator,
+ gst::StreamError::Format,
+ ["Longer GOPs than fragment duration"]
+ );
+ state.timeout_delay += gst::ClockTime::from_seconds(1);
+
+ drop(state);
+ for (sinkpad, event) in upstream_events {
+ sinkpad.push_event(event);
+ }
+ return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
+ }
+ Err(err) => return Err(err),
+ }
};
for (sinkpad, event) in upstream_events {
diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs
index 556b29c6..bf44643f 100644
--- a/generic/fmp4/tests/tests.rs
+++ b/generic/fmp4/tests/tests.rs
@@ -48,7 +48,7 @@ fn test_buffer_flags_single_stream(cmaf: bool) {
gst::ClockTime::from_seconds(60 * 60 * 1000)
};
- // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
+ // Push 7 buffers of 1s each, 1st and 6 buffer without DELTA_UNIT flag
for i in 0..7 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
@@ -881,3 +881,541 @@ fn test_gap_events() {
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
+
+#[test]
+fn test_single_stream_short_gops() {
+ init();
+
+ let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+
+ // 5s fragment duration
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", gst::ClockTime::from_seconds(5));
+
+ h.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h.play();
+
+ let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
+
+ // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
+ for i in 0..8 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(gst::ClockTime::from_seconds(i));
+ buffer.set_dts(gst::ClockTime::from_seconds(i));
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 3 && i != 6 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ if i == 2 || i == 7 {
+ let ev = loop {
+ let ev = h.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ let fku_time = if i == 2 {
+ gst::ClockTime::from_seconds(5)
+ } else {
+ gst::ClockTime::from_seconds(8)
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ let header = h.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(3))
+ );
+
+ for i in 0..3 {
+ let buffer = h.pull().unwrap();
+ if i == 2 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ h.push_event(gst::event::Eos::new());
+
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::from_seconds(3) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::from_seconds(3) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(5))
+ );
+
+ for i in 3..8 {
+ let buffer = h.pull().unwrap();
+ if i == 7 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+#[test]
+fn test_single_stream_long_gops() {
+ init();
+
+ let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+
+ // 5s fragment duration
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", gst::ClockTime::from_seconds(5));
+
+ h.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h.play();
+
+ let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
+
+ // Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag
+ for i in 0..10 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(gst::ClockTime::from_seconds(i));
+ buffer.set_dts(gst::ClockTime::from_seconds(i));
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 6 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ if i == 2 || i == 7 {
+ let ev = loop {
+ let ev = h.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ let fku_time = if i == 2 {
+ gst::ClockTime::from_seconds(5)
+ } else {
+ gst::ClockTime::from_seconds(11)
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ let header = h.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(6))
+ );
+
+ for i in 0..6 {
+ let buffer = h.pull().unwrap();
+ if i == 5 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ h.push_event(gst::event::Eos::new());
+
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::from_seconds(6) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::from_seconds(6) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(4))
+ );
+
+ for i in 6..10 {
+ let buffer = h.pull().unwrap();
+ if i == 9 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+#[test]
+fn test_buffer_multi_stream_short_gops() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ // 5s fragment duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", gst::ClockTime::from_seconds(5));
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
+
+ // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
+ for i in 0..8 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(gst::ClockTime::from_seconds(i));
+ buffer.set_dts(gst::ClockTime::from_seconds(i));
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 3 && i != 6 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(gst::ClockTime::from_seconds(i));
+ buffer.set_dts(gst::ClockTime::from_seconds(i));
+ buffer.set_duration(gst::ClockTime::SECOND);
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ if i == 2 || i == 7 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ let fku_time = if i == 2 {
+ gst::ClockTime::from_seconds(5)
+ } else {
+ gst::ClockTime::from_seconds(8)
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(3))
+ );
+
+ for i in 0..3 {
+ for j in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if i == 2 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+
+ if j == 0 {
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::from_seconds(3) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::from_seconds(3) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(5))
+ );
+
+ for i in 3..8 {
+ for j in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if i == 7 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ if j == 0 {
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}