Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-08-17 13:19:08 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-08-17 19:23:18 +0300
commit2c99f66ea583b15aeff1ba9f9ae56a192d80ea13 (patch)
treee1f55c9d3514d3eb885d100758f29633d04c784c /generic
parent9827406113e176a3717c82613c70e4898248e9ec (diff)
fmp4mux: Dequeue the earliest buffer from any pad first instead of dequeueing up to a whole fragment from the same pad
This keeps the fill levels of each sinkpad in sync.
Diffstat (limited to 'generic')
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs121
-rw-r--r--generic/fmp4/tests/tests.rs7
2 files changed, 110 insertions, 18 deletions
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index e338a057..aa74ab03 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -179,6 +179,94 @@ pub(crate) struct FMP4Mux {
}
impl FMP4Mux {
+ fn find_earliest_stream<'a>(
+ &self,
+ element: &super::FMP4Mux,
+ state: &'a mut State,
+ timeout: bool,
+ ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
+ let mut earliest_stream = None;
+ let mut all_have_data_or_eos = true;
+
+ for (idx, stream) in state.streams.iter_mut().enumerate() {
+ let buffer = match stream.sinkpad.peek_buffer() {
+ Some(buffer) => buffer,
+ None => {
+ if stream.sinkpad.is_eos() {
+ gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS");
+ } else {
+ all_have_data_or_eos = false;
+ gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer");
+ }
+ continue;
+ }
+ };
+
+ if stream.fragment_filled {
+ gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled");
+ continue;
+ }
+
+ let segment = match stream
+ .sinkpad
+ .segment()
+ .clone()
+ .downcast::<gst::ClockTime>()
+ .ok()
+ {
+ Some(segment) => segment,
+ None => {
+ gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ // If the stream has no valid running time, assume it's before everything else.
+ let running_time = match segment.to_running_time(buffer.dts_or_pts()) {
+ None => {
+ gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time");
+ if earliest_stream.is_none() {
+ earliest_stream = Some((idx, stream, gst::ClockTime::ZERO));
+ }
+ continue;
+ }
+ Some(running_time) => running_time,
+ };
+
+ gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time);
+
+ if earliest_stream
+ .as_ref()
+ .map_or(true, |(_idx, _stream, earliest_running_time)| {
+ *earliest_running_time > running_time
+ })
+ {
+ earliest_stream = Some((idx, stream, running_time));
+ }
+ }
+
+ if !timeout && !all_have_data_or_eos {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "No timeout and not all streams have a buffer or are EOS"
+ );
+ Ok(None)
+ } else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Stream {} is earliest stream with running time {}",
+ stream.sinkpad.name(),
+ earliest_running_time
+ );
+ Ok(Some((idx, stream)))
+ } else {
+ gst::trace!(CAT, obj: element, "No streams have data queued currently");
+ Ok(None)
+ }
+ }
+
// Queue incoming buffers as individual GOPs.
fn queue_gops(
&self,
@@ -1831,9 +1919,9 @@ impl AggregatorImpl for FMP4Mux {
) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
- let mut all_eos = true;
let mut upstream_events = vec![];
+ let all_eos;
let (caps, buffers) = {
let mut state = self.state.lock().unwrap();
@@ -1843,23 +1931,21 @@ impl AggregatorImpl for FMP4Mux {
}
// Queue buffers from all streams that are not filled for the current fragment yet
+ //
+ // Always take a buffer from the stream with the earliest queued buffer to keep the
+ // fill-level at all sinkpads in sync.
let fragment_start_pts = state.fragment_start_pts;
- for (idx, stream) in state.streams.iter_mut().enumerate() {
- if stream.fragment_filled {
- let buffer = stream.sinkpad.peek_buffer();
- all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
-
- continue;
- }
- let buffer = stream.sinkpad.pop_buffer();
- all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
-
- let buffer = match buffer {
- None => continue,
+ while let Some((idx, stream)) =
+ self.find_earliest_stream(aggregator, &mut state, timeout)?
+ {
+ // Can only happen if the stream was flushed in the meantime
+ let buffer = match stream.sinkpad.pop_buffer() {
Some(buffer) => buffer,
+ None => continue,
};
+ // Can only happen if the stream was flushed in the meantime
let segment = match stream
.sinkpad
.segment()
@@ -1907,10 +1993,6 @@ impl AggregatorImpl for FMP4Mux {
}
}
- if all_eos {
- gst::debug!(CAT, obj: aggregator, "All streams are EOS now");
- }
-
// Calculate the earliest PTS after queueing input if we can now.
if state.earliest_pts.is_none() {
let mut earliest_pts = None;
@@ -1969,6 +2051,11 @@ impl AggregatorImpl for FMP4Mux {
}
}
+ all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
+ if all_eos {
+ gst::debug!(CAT, obj: aggregator, "All streams are EOS now");
+ }
+
// If enough GOPs were queued, drain and create the output fragment
self.drain(aggregator, &mut state, &settings, timeout, all_eos)?
};
diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs
index 266e2462..556b29c6 100644
--- a/generic/fmp4/tests/tests.rs
+++ b/generic/fmp4/tests/tests.rs
@@ -551,7 +551,6 @@ fn test_live_timeout() {
if j == 1 && i == 4 {
// Advance time and crank the clock another time. This brings us at the end of the
// EOS.
- h1.set_time(gst::ClockTime::from_seconds(7)).unwrap();
h1.crank_single_clock_wait().unwrap();
continue;
}
@@ -652,6 +651,8 @@ fn test_gap_events() {
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+ h1.use_testclock();
+
// 5s fragment duration
h1.element()
.unwrap()
@@ -760,6 +761,10 @@ fn test_gap_events() {
}
}
+ // Advance time and crank the clock: this should bring us to the end of the first fragment
+ h1.set_time(gst::ClockTime::from_seconds(5)).unwrap();
+ h1.crank_single_clock_wait().unwrap();
+
let header = h1.pull().unwrap();
assert_eq!(
header.flags(),