diff options
-rw-r--r-- | generic/fmp4/src/fmp4mux/imp.rs | 295 | ||||
-rw-r--r-- | generic/fmp4/tests/tests.rs | 540 |
2 files changed, 702 insertions, 133 deletions
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index 0b2cf53c..ba14f819 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -91,12 +91,14 @@ impl Default for Settings { } } +#[derive(Debug)] struct GopBuffer { buffer: gst::Buffer, pts: gst::ClockTime, dts: Option<gst::ClockTime>, } +#[derive(Debug)] struct Gop { // Running times start_pts: gst::ClockTime, @@ -139,8 +141,6 @@ struct Stream { // going backwards when draining a fragment. // UNIX epoch. current_utc_time: gst::ClockTime, - - last_force_keyunit_time: Option<gst::ClockTime>, } #[derive(Default)] @@ -163,6 +163,8 @@ struct State { // Start PTS of the current fragment fragment_start_pts: Option<gst::ClockTime>, + // Additional timeout delay in case GOPs are bigger than the fragment duration + timeout_delay: gst::ClockTime, // In ONVIF mode the UTC time corresponding to the beginning of the stream // UNIX epoch. @@ -604,92 +606,29 @@ impl FMP4Mux { Ok(()) } - fn create_initial_force_keyunit_event( - &self, - _element: &super::FMP4Mux, - stream: &mut Stream, - settings: &Settings, - earliest_pts: gst::ClockTime, - ) -> Result<Option<gst::Event>, gst::FlowError> { - assert!(stream.last_force_keyunit_time.is_none()); - - // If we never sent a force-keyunit event then send one now. - let fku_running_time = earliest_pts + settings.fragment_duration; - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Sending first force-keyunit event for running time {}", - fku_running_time - ); - stream.last_force_keyunit_time = Some(fku_running_time); - - return Ok(Some( - gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_running_time) - .all_headers(true) - .build(), - )); - } - - fn create_force_keyunit_event( - &self, - _element: &super::FMP4Mux, - stream: &mut Stream, - settings: &Settings, - segment: &gst::FormattedSegment<gst::ClockTime>, - pts: gst::ClockTime, - ) -> Result<Option<gst::Event>, gst::FlowError> { - // If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP - // is known and send it then. - // - // Otherwise if the current PTS is a fragment duration in the future, send the next one - // now. - - let last_force_keyunit_time = match stream.last_force_keyunit_time { - None => return Ok(None), - Some(last_force_keyunit_time) => last_force_keyunit_time, - }; - - let pts = segment.to_running_time(pts); - if pts.opt_lt(last_force_keyunit_time).unwrap_or(true) { - return Ok(None); - } - - let fku_running_time = last_force_keyunit_time + settings.fragment_duration; - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Sending force-keyunit event for running time {}", - fku_running_time - ); - stream.last_force_keyunit_time = Some(fku_running_time); - - Ok(Some( - gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_running_time) - .all_headers(true) - .build(), - )) - } - #[allow(clippy::type_complexity)] fn drain_buffers( &self, - _element: &super::FMP4Mux, + element: &super::FMP4Mux, state: &mut State, settings: &Settings, timeout: bool, at_eos: bool, ) -> Result< ( + // Drained streams Vec<( gst::Caps, Option<super::FragmentTimingInfo>, VecDeque<Buffer>, )>, + // Minimum earliest PTS position of all streams Option<gst::ClockTime>, + // Minimum earliest PTS of all streams Option<gst::ClockTime>, + // Minimum start DTS position of all streams (if any stream has DTS) Option<gst::ClockTime>, + // End PTS of this drained fragment, i.e. start PTS of the next fragment Option<gst::ClockTime>, ), gst::FlowError, @@ -699,7 +638,24 @@ impl FMP4Mux { let mut min_earliest_pts_position = None; let mut min_earliest_pts = None; let mut min_start_dts_position = None; - let mut max_end_pts = None; + let mut fragment_end_pts = None; + + // The first stream decides how much can be dequeued, if anything at all. + // + // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued + // but on timeout in live pipelines it might happen that the first stream does not have a + // complete GOP queued. In that case nothing is dequeued for any of the streams and the + // timeout is advanced by 1s until at least one complete GOP can be dequeued. + // + // If the first stream is already EOS then the next stream that is not EOS yet will be + // taken in its place. + let fragment_start_pts = state.fragment_start_pts.unwrap(); + gst::info!( + CAT, + obj: element, + "Starting to drain at {}", + fragment_start_pts + ); for (idx, stream) in state.streams.iter_mut().enumerate() { assert!( @@ -709,29 +665,74 @@ impl FMP4Mux { || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) ); - // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is - // equal to the fragment duration then drain out all complete GOPs, otherwise all - // except for the newest complete GOP. - let gops = if at_eos || stream.sinkpad.is_eos() { - stream.queued_gops.drain(..).rev().collect::<Vec<_>>() - } else { - let mut gops = vec![]; + // Drain all complete GOPs until at most one fragment duration was dequeued for the + // first stream, or until the dequeued duration of the first stream. + let mut gops = Vec::with_capacity(stream.queued_gops.len()); + let dequeue_end_pts = + fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration); + gst::trace!( + CAT, + obj: &stream.sinkpad, + "Draining up to end PTS {} / duration {}", + dequeue_end_pts, + dequeue_end_pts - fragment_start_pts + ); - let fragment_start_pts = state.fragment_start_pts.unwrap(); - while let Some(gop) = stream.queued_gops.pop_back() { - assert!(timeout || gop.final_end_pts); + while let Some(gop) = stream.queued_gops.back() { + // If this GOP is not complete then we can't pop it yet. + // + // If there was no complete GOP at all yet then it might be bigger than the + // fragment duration. In this case we might not be able to handle the latency + // requirements in a live pipeline. + if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() { + break; + } - let end_pts = gop.end_pts; - gops.push(gop); - if end_pts.saturating_sub(fragment_start_pts) >= settings.fragment_duration { - break; - } + // If this GOP starts after the fragment end then don't dequeue it yet unless this is + // the first stream and no GOPs were dequeued at all yet. This would mean that the + // GOP is bigger than the fragment duration. + if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty()) + { + break; } - gops - }; + gops.push(stream.queued_gops.pop_back().unwrap()); + } stream.fragment_filled = false; + // If we don't have a next fragment start PTS then this is the first stream as above. + if fragment_end_pts.is_none() { + if let Some(last_gop) = gops.last() { + // Dequeued something so let's take the end PTS of the last GOP + fragment_end_pts = Some(last_gop.end_pts); + gst::info!( + CAT, + obj: &stream.sinkpad, + "Draining up to PTS {} for this fragment", + last_gop.end_pts, + ); + } else { + // If nothing was dequeued for the first stream then this is OK if we're at + // EOS: we just consider the next stream as first stream then. + if at_eos || stream.sinkpad.is_eos() { + // This is handled below generally if nothing was dequeued + } else { + // Otherwise this can only really happen on timeout in live pipelines. + assert!(timeout); + + gst::warning!( + CAT, + obj: &stream.sinkpad, + "Don't have a complete GOP for the first stream on timeout in a live pipeline", + ); + + // In this case we advance the timeout by 1s and hope that things are + // better then. + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + } + } + if gops.is_empty() { gst::info!( CAT, @@ -743,6 +744,8 @@ impl FMP4Mux { continue; } + assert!(fragment_end_pts.is_some()); + let first_gop = gops.first().unwrap(); let last_gop = gops.last().unwrap(); let earliest_pts = first_gop.earliest_pts; @@ -769,9 +772,6 @@ impl FMP4Mux { min_start_dts_position = Some(start_dts_position); } } - if max_end_pts.opt_lt(end_pts).unwrap_or(true) { - max_end_pts = Some(end_pts); - } gst::info!( CAT, @@ -892,7 +892,7 @@ impl FMP4Mux { min_earliest_pts_position, min_earliest_pts, min_start_dts_position, - max_end_pts, + fragment_end_pts, )) } @@ -1229,6 +1229,7 @@ impl FMP4Mux { settings: &Settings, timeout: bool, at_eos: bool, + upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>, ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> { if at_eos { gst::info!(CAT, obj: element, "Draining at EOS"); @@ -1248,7 +1249,7 @@ impl FMP4Mux { min_earliest_pts_position, min_earliest_pts, min_start_dts_position, - max_end_pts, + fragment_end_pts, ) = self.drain_buffers(element, state, settings, timeout, at_eos)?; // For ONVIF, replace all timestamps with timestamps based on UTC times. @@ -1279,13 +1280,13 @@ impl FMP4Mux { let mut buffer_list = None; if interleaved_buffers.is_empty() { - assert!(timeout || at_eos); + assert!(at_eos); } else { // If there are actual buffers to output then create headers as needed and create a // bufferlist for all buffers that have to be output. let min_earliest_pts_position = min_earliest_pts_position.unwrap(); let min_earliest_pts = min_earliest_pts.unwrap(); - let max_end_pts = max_end_pts.unwrap(); + let fragment_end_pts = fragment_end_pts.unwrap(); let mut fmp4_header = None; if !state.sent_headers { @@ -1334,7 +1335,7 @@ impl FMP4Mux { let buffer = fmp4_fragment_header.get_mut().unwrap(); buffer.set_pts(min_earliest_pts_position); buffer.set_dts(min_start_dts_position); - buffer.set_duration(max_end_pts.checked_sub(min_earliest_pts)); + buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts)); // Fragment header is HEADER buffer.set_flags(gst::BufferFlags::HEADER); @@ -1386,22 +1387,37 @@ impl FMP4Mux { offset: moof_offset, }); } - state.end_pts = Some(max_end_pts); + + state.end_pts = Some(fragment_end_pts); state.end_utc_time = max_end_utc_time; // Update for the start PTS of the next fragment - state.fragment_start_pts = state.fragment_start_pts.map(|start| { - let new_fragment_start = start + settings.fragment_duration; + gst::info!( + CAT, + obj: element, + "Starting new fragment at {}", + fragment_end_pts, + ); + state.fragment_start_pts = Some(fragment_end_pts); - gst::info!( - CAT, - obj: element, - "Starting new fragment at {}", - new_fragment_start - ); + gst::debug!( + CAT, + obj: element, + "Sending force-keyunit events for running time {}", + fragment_end_pts + settings.fragment_duration, + ); - new_fragment_start - }); + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fragment_end_pts + settings.fragment_duration) + .all_headers(true) + .build(); + + for stream in &state.streams { + upstream_events.push((stream.sinkpad.clone(), fku.clone())); + } + + // Reset timeout delay now that we've output an actual fragment + state.timeout_delay = gst::ClockTime::ZERO; } if settings.write_mfra && at_eos { @@ -1491,7 +1507,6 @@ impl FMP4Mux { dts_offset: None, current_position: gst::ClockTime::ZERO, current_utc_time: gst::ClockTime::ZERO, - last_force_keyunit_time: None, }); } @@ -1801,7 +1816,7 @@ impl ElementImpl for FMP4Mux { impl AggregatorImpl for FMP4Mux { fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> { let state = self.state.lock().unwrap(); - state.fragment_start_pts + state.fragment_start_pts.opt_add(state.timeout_delay) } fn sink_query( @@ -1938,7 +1953,6 @@ impl AggregatorImpl for FMP4Mux { stream.dts_offset = None; stream.current_position = gst::ClockTime::ZERO; stream.current_utc_time = gst::ClockTime::ZERO; - stream.last_force_keyunit_time = None; stream.fragment_filled = false; } @@ -2030,21 +2044,9 @@ impl AggregatorImpl for FMP4Mux { } }; - let pts = buffer.pts(); - // Queue up the buffer and update GOP tracking state self.queue_gops(aggregator, idx, stream, &segment, buffer)?; - // If we have a PTS with this buffer, check if a new force-keyunit event for the next - // fragment start has to be created - if let Some(pts) = pts { - if let Some(event) = self - .create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)? - { - upstream_events.push((stream.sinkpad.clone(), event)); - } - } - // Check if this stream is filled enough now. if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( stream @@ -2093,15 +2095,20 @@ impl AggregatorImpl for FMP4Mux { state.earliest_pts = Some(earliest_pts); state.fragment_start_pts = Some(earliest_pts); + gst::debug!( + CAT, + obj: aggregator, + "Sending first force-keyunit event for running time {}", + earliest_pts + settings.fragment_duration, + ); + + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(earliest_pts + settings.fragment_duration) + .all_headers(true) + .build(); + for stream in &mut state.streams { - if let Some(event) = self.create_initial_force_keyunit_event( - aggregator, - stream, - &settings, - earliest_pts, - )? { - upstream_events.push((stream.sinkpad.clone(), event)); - } + upstream_events.push((stream.sinkpad.clone(), fku.clone())); // Check if this stream is filled enough now. if let Some(queued_end_pts) = stream @@ -2127,7 +2134,31 @@ impl AggregatorImpl for FMP4Mux { } // If enough GOPs were queued, drain and create the output fragment - self.drain(aggregator, &mut state, &settings, timeout, all_eos)? + match self.drain( + aggregator, + &mut state, + &settings, + timeout, + all_eos, + &mut upstream_events, + ) { + Ok(res) => res, + Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { + gst::element_warning!( + aggregator, + gst::StreamError::Format, + ["Longer GOPs than fragment duration"] + ); + state.timeout_delay += gst::ClockTime::from_seconds(1); + + drop(state); + for (sinkpad, event) in upstream_events { + sinkpad.push_event(event); + } + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + Err(err) => return Err(err), + } }; for (sinkpad, event) in upstream_events { diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs index 556b29c6..bf44643f 100644 --- a/generic/fmp4/tests/tests.rs +++ b/generic/fmp4/tests/tests.rs @@ -48,7 +48,7 @@ fn test_buffer_flags_single_stream(cmaf: bool) { gst::ClockTime::from_seconds(60 * 60 * 1000) }; - // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag + // Push 7 buffers of 1s each, 1st and 6 buffer without DELTA_UNIT flag for i in 0..7 { let mut buffer = gst::Buffer::with_size(1).unwrap(); { @@ -881,3 +881,541 @@ fn test_gap_events() { let ev = h1.pull_event().unwrap(); assert_eq!(ev.type_(), gst::EventType::Eos); } + +#[test] +fn test_single_stream_short_gops() { + init(); + + let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + + // 5s fragment duration + h.element() + .unwrap() + .set_property("fragment-duration", gst::ClockTime::from_seconds(5)); + + h.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h.play(); + + let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000); + + // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag + for i in 0..8 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 3 && i != 6 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 || i == 7 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + let fku_time = if i == 2 { + gst::ClockTime::from_seconds(5) + } else { + gst::ClockTime::from_seconds(8) + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(fku_time), + all_headers: true, + count: 0 + } + ); + } + } + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(3)) + ); + + for i in 0..3 { + let buffer = h.pull().unwrap(); + if i == 2 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + h.push_event(gst::event::Eos::new()); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::from_seconds(3) + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::from_seconds(3) + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(5)) + ); + + for i in 3..8 { + let buffer = h.pull().unwrap(); + if i == 7 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_single_stream_long_gops() { + init(); + + let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + + // 5s fragment duration + h.element() + .unwrap() + .set_property("fragment-duration", gst::ClockTime::from_seconds(5)); + + h.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h.play(); + + let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000); + + // Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag + for i in 0..10 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 6 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 || i == 7 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + let fku_time = if i == 2 { + gst::ClockTime::from_seconds(5) + } else { + gst::ClockTime::from_seconds(11) + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(fku_time), + all_headers: true, + count: 0 + } + ); + } + } + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(6)) + ); + + for i in 0..6 { + let buffer = h.pull().unwrap(); + if i == 5 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + h.push_event(gst::event::Eos::new()); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::from_seconds(6) + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::from_seconds(6) + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(4)) + ); + + for i in 6..10 { + let buffer = h.pull().unwrap(); + if i == 9 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_buffer_multi_stream_short_gops() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", gst::ClockTime::from_seconds(5)); + + h1.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h2.play(); + + let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000); + + // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag + for i in 0..8 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 3 && i != 6 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 || i == 7 { + let ev = loop { + let ev = h1.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + let fku_time = if i == 2 { + gst::ClockTime::from_seconds(5) + } else { + gst::ClockTime::from_seconds(8) + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(fku_time), + all_headers: true, + count: 0 + } + ); + + let ev = loop { + let ev = h2.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(fku_time), + all_headers: true, + count: 0 + } + ); + } + } + + let header = h1.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(3)) + ); + + for i in 0..3 { + for j in 0..2 { + let buffer = h1.pull().unwrap(); + if i == 2 && j == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + + if j == 0 { + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::from_seconds(3) + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::from_seconds(3) + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(5)) + ); + + for i in 3..8 { + for j in 0..2 { + let buffer = h1.pull().unwrap(); + if i == 7 && j == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + if j == 0 { + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} |