Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-05-17 15:05:19 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-05-20 15:02:46 +0300
commit987e4efc020b8d3244e607437b3e1a0f8f93bb3f (patch)
treef106d52006735b5204de56bd98881b18054b4268
parentffea0e2d2dcbbb334e8b7d34d1f6a607f1c7e855 (diff)
fmp4mux: In live pipelines use the current fragment end time as timeout
This allows muxing even if some streams are sparse or have big gaps.
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs21
-rw-r--r--generic/fmp4/tests/tests.rs241
2 files changed, 256 insertions, 6 deletions
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 399047be..e08ed50e 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -500,12 +500,15 @@ impl FMP4Mux {
element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
+ timeout: bool,
at_eos: bool,
) -> Result<Option<gst::BufferList>, gst::FlowError> {
let class = element.class();
if at_eos {
gst::info!(CAT, obj: element, "Draining at EOS");
+ } else if timeout {
+ gst::info!(CAT, obj: element, "Draining at timeout");
} else {
for stream in &state.streams {
if !stream.fragment_filled && !stream.sinkpad.is_eos() {
@@ -525,7 +528,8 @@ impl FMP4Mux {
for stream in &mut state.streams {
assert!(
- at_eos
+ timeout
+ || at_eos
|| stream.sinkpad.is_eos()
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
);
@@ -540,7 +544,7 @@ impl FMP4Mux {
let fragment_start_pts = state.fragment_start_pts.unwrap();
while let Some(gop) = stream.queued_gops.pop_back() {
- assert!(gop.final_end_pts);
+ assert!(timeout || gop.final_end_pts);
let end_pts = gop.end_pts;
gops.push(gop);
@@ -674,7 +678,7 @@ impl FMP4Mux {
let mut buffer_list = None;
if interleaved_buffers.is_empty() {
- assert!(at_eos);
+ assert!(timeout || at_eos);
} else {
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
let min_earliest_pts = min_earliest_pts.unwrap();
@@ -1181,6 +1185,11 @@ impl ElementImpl for FMP4Mux {
}
impl AggregatorImpl for FMP4Mux {
+ fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> {
+ let state = self.state.lock().unwrap();
+ state.fragment_start_pts
+ }
+
fn sink_query(
&self,
aggregator: &Self::Type,
@@ -1365,7 +1374,7 @@ impl AggregatorImpl for FMP4Mux {
fn aggregate(
&self,
aggregator: &Self::Type,
- _timeout: bool,
+ timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
@@ -1468,7 +1477,7 @@ impl AggregatorImpl for FMP4Mux {
break;
}
Some(oldest_gop) => {
- if !oldest_gop.final_earliest_pts {
+ if !timeout && !oldest_gop.final_earliest_pts {
earliest_pts = None;
break;
}
@@ -1517,7 +1526,7 @@ impl AggregatorImpl for FMP4Mux {
}
// If enough GOPs were queued, drain and create the output fragment
- self.drain(aggregator, &mut state, &settings, all_eos)?
+ self.drain(aggregator, &mut state, &settings, timeout, all_eos)?
};
for (sinkpad, event) in upstream_events {
diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs
index 9e4aa6f5..d21cc7b4 100644
--- a/generic/fmp4/tests/tests.rs
+++ b/generic/fmp4/tests/tests.rs
@@ -403,3 +403,244 @@ fn test_buffer_flags_multi_stream() {
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
+
+#[test]
+fn test_live_timeout() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ h1.use_testclock();
+
+ // 5s fragment duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", gst::ClockTime::from_seconds(5));
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
+
+ // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
+ for i in 0..7 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(gst::ClockTime::from_seconds(i));
+ buffer.set_dts(gst::ClockTime::from_seconds(i));
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 5 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Skip buffer 4th and 6th buffer (end of fragment / stream)
+ if i == 4 || i == 6 {
+ continue;
+ } else {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(gst::ClockTime::from_seconds(i));
+ buffer.set_dts(gst::ClockTime::from_seconds(i));
+ buffer.set_duration(gst::ClockTime::SECOND);
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+ }
+
+ if i == 2 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(gst::ClockTime::from_seconds(5)),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(gst::ClockTime::from_seconds(5)),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Advance time and crank the clock: this should bring us to the end of the first fragment
+ h1.set_time(gst::ClockTime::from_seconds(5)).unwrap();
+ h1.crank_single_clock_wait().unwrap();
+
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(5))
+ );
+
+ for i in 0..5 {
+ for j in 0..2 {
+ // Skip gap events that don't result in buffers
+ if j == 1 && i == 4 {
+ // Advance time and crank the clock another time. This brings us at the end of the
+ // EOS.
+ h1.set_time(gst::ClockTime::from_seconds(7)).unwrap();
+ h1.crank_single_clock_wait().unwrap();
+ continue;
+ }
+
+ let buffer = h1.pull().unwrap();
+ if i == 4 && j == 0 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else if i == 5 && j == 0 {
+ assert_eq!(buffer.flags(), gst::BufferFlags::HEADER);
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+
+ if j == 0 {
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::from_seconds(5) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::from_seconds(5) + output_offset)
+ );
+ assert_eq!(
+ fragment_header.duration(),
+ Some(gst::ClockTime::from_seconds(2))
+ );
+
+ for i in 5..7 {
+ for j in 0..2 {
+ // Skip gap events that don't result in buffers
+ if j == 1 && i == 6 {
+ continue;
+ }
+
+ let buffer = h1.pull().unwrap();
+ if i == 6 && j == 0 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(
+ buffer.pts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ if j == 0 {
+ assert_eq!(
+ buffer.dts(),
+ Some(gst::ClockTime::from_seconds(i) + output_offset)
+ );
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}