Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-05-27 13:27:10 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-08-12 18:51:26 +0300
commite4081872c5be8bc1fa551198bbac2d6642d39211 (patch)
treedf35ea713113821f8a9b17a3dab893d85ecd13cf /generic
parent537659655776225320838b426aefdae04994f5c7 (diff)
fmp4mux: Use UTC times from reference timestamp meta in ONVIF mode
Diffstat (limited to 'generic')
-rw-r--r--generic/fmp4/src/fmp4mux/boxes.rs4
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs304
-rw-r--r--generic/fmp4/src/fmp4mux/mod.rs3
3 files changed, 304 insertions, 7 deletions
diff --git a/generic/fmp4/src/fmp4mux/boxes.rs b/generic/fmp4/src/fmp4mux/boxes.rs
index 5c93ed6be..b82507d7e 100644
--- a/generic/fmp4/src/fmp4mux/boxes.rs
+++ b/generic/fmp4/src/fmp4mux/boxes.rs
@@ -397,8 +397,8 @@ pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst:
// track id
v.extend(0u32.to_be_bytes());
- // XXX: start UTC time in 100ns units since Jan 1 1601
- v.extend(0u64.to_be_bytes());
+ // start UTC time in 100ns units since Jan 1 1601
+ v.extend(cfg.start_utc_time.unwrap().to_be_bytes());
Ok(())
})
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 60d8b919c..bcc357e27 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -23,6 +23,36 @@ use super::Buffer;
/// Offset for the segment in non-single-stream variants.
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
+/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds.
+/// 1601 = UNIX + UNIX_1601_OFFSET.
+const UNIX_1601_OFFSET: u64 = 11_644_473_600;
+
+/// Offset between NTP and UNIX epoch in seconds.
+/// NTP = UNIX + NTP_UNIX_OFFSET.
+const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
+
+/// Reference timestamp meta caps for NTP timestamps.
+static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
+
+/// Reference timestamp meta caps for UNIX timestamps.
+static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
+
+/// Returns the UTC time of the buffer in the UNIX epoch.
+fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
+ buffer
+ .iter_meta::<gst::ReferenceTimestampMeta>()
+ .find_map(|meta| {
+ if meta.reference().can_intersect(&UNIX_CAPS) {
+ Some(meta.timestamp())
+ } else if meta.reference().can_intersect(&NTP_CAPS) {
+ meta.timestamp()
+ .checked_sub(gst::ClockTime::from_seconds(NTP_UNIX_OFFSET))
+ } else {
+ None
+ }
+ })
+}
+
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"fmp4mux",
@@ -105,6 +135,11 @@ struct Stream {
// timestamps from going backwards when queueing new buffers
current_position: gst::ClockTime,
+ // Current UTC time in ONVIF mode to prevent timestamps from
+ // going backwards when draining a fragment.
+ // UNIX epoch.
+ current_utc_time: gst::ClockTime,
+
last_force_keyunit_time: Option<gst::ClockTime>,
}
@@ -129,6 +164,11 @@ struct State {
// Start PTS of the current fragment
fragment_start_pts: Option<gst::ClockTime>,
+ // In ONVIF mode the UTC time corresponding to the beginning of the stream
+ // UNIX epoch.
+ start_utc_time: Option<gst::ClockTime>,
+ end_utc_time: Option<gst::ClockTime>,
+
sent_headers: bool,
}
@@ -756,6 +796,246 @@ impl FMP4Mux {
}
}
+ let mut max_end_utc_time = None;
+ // For ONVIF, replace all timestamps with timestamps based on UTC times.
+ if class.as_ref().variant == super::Variant::ONVIF {
+ let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
+ let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
+ if composition_time_offset > 0 {
+ buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
+ } else {
+ buffer
+ .timestamp
+ .checked_sub(gst::ClockTime::from_nseconds(
+ (-composition_time_offset) as u64,
+ ))
+ .unwrap()
+ }
+ };
+
+ // If this is the first fragment then allow the first buffers to not have a reference
+ // timestamp meta and backdate them
+ if state.stream_header.is_none() {
+ for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
+ let (buffer_idx, utc_time, buffer) = match drain_buffers
+ .iter()
+ .enumerate()
+ .find_map(|(idx, buffer)| {
+ get_utc_time_from_buffer(&buffer.buffer)
+ .map(|timestamp| (idx, timestamp, buffer))
+ }) {
+ None => {
+ gst::error!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "No reference timestamp set on any buffers in the first fragment",
+ );
+ return Err(gst::FlowError::Error);
+ }
+ Some(res) => res,
+ };
+
+ // Now do the backdating
+ if buffer_idx > 0 {
+ let utc_time_pts = calculate_pts(buffer);
+
+ for buffer in drain_buffers.iter_mut().take(buffer_idx) {
+ let buffer_pts = calculate_pts(buffer);
+ let buffer_pts_diff = if utc_time_pts >= buffer_pts {
+ (utc_time_pts - buffer_pts).nseconds() as i64
+ } else {
+ -((buffer_pts - utc_time_pts).nseconds() as i64)
+ };
+ let buffer_utc_time = if buffer_pts_diff >= 0 {
+ utc_time
+ .checked_sub(gst::ClockTime::from_nseconds(
+ buffer_pts_diff as u64,
+ ))
+ .unwrap()
+ } else {
+ utc_time
+ .checked_add(gst::ClockTime::from_nseconds(
+ (-buffer_pts_diff) as u64,
+ ))
+ .unwrap()
+ };
+
+ let buffer = buffer.buffer.make_mut();
+ gst::ReferenceTimestampMeta::add(
+ buffer,
+ &UNIX_CAPS,
+ buffer_utc_time,
+ gst::ClockTime::NONE,
+ );
+ }
+ }
+ }
+ }
+
+ // Calculate the minimum across all streams and remember that
+ if state.start_utc_time.is_none() {
+ let mut start_utc_time = None;
+
+ for (idx, drain_buffers) in drain_buffers.iter().enumerate() {
+ for buffer in drain_buffers {
+ let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
+ None => {
+ gst::error!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "No reference timestamp set on all buffers"
+ );
+ return Err(gst::FlowError::Error);
+ }
+ Some(utc_time) => utc_time,
+ };
+
+ if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
+ start_utc_time = Some(utc_time);
+ }
+ }
+ }
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Configuring start UTC time {}",
+ start_utc_time.unwrap()
+ );
+ state.start_utc_time = start_utc_time;
+ }
+
+ // Update all buffer timestamps based on the UTC time and offset to the start UTC time
+ let start_utc_time = state.start_utc_time.unwrap();
+ for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
+ let mut start_time = None;
+
+ for buffer in drain_buffers.iter_mut() {
+ let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
+ None => {
+ gst::error!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "No reference timestamp set on all buffers"
+ );
+ return Err(gst::FlowError::Error);
+ }
+ Some(utc_time) => utc_time,
+ };
+
+ // Convert PTS UTC time to DTS
+ let mut utc_time_dts =
+ if let Some(composition_time_offset) = buffer.composition_time_offset {
+ if composition_time_offset >= 0 {
+ utc_time
+ .checked_sub(gst::ClockTime::from_nseconds(
+ composition_time_offset as u64,
+ ))
+ .unwrap()
+ } else {
+ utc_time
+ .checked_add(gst::ClockTime::from_nseconds(
+ (-composition_time_offset) as u64,
+ ))
+ .unwrap()
+ }
+ } else {
+ utc_time
+ };
+
+ // Enforce monotonically increasing timestamps
+ if utc_time_dts < state.streams[idx].current_utc_time {
+ gst::warning!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Decreasing UTC DTS timestamp for buffer {} < {}",
+ utc_time_dts,
+ state.streams[idx].current_utc_time,
+ );
+ utc_time_dts = state.streams[idx].current_utc_time;
+ } else {
+ state.streams[idx].current_utc_time = utc_time_dts;
+ }
+
+ let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
+
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
+ buffer.timestamp,
+ timestamp,
+ utc_time_dts,
+ utc_time,
+ );
+
+ buffer.timestamp = timestamp;
+ if start_time.is_none() || start_time > Some(buffer.timestamp) {
+ start_time = Some(buffer.timestamp);
+ }
+ }
+
+ // Update durations for all buffers except for the last in the fragment unless all
+ // have the same duration anyway
+ let mut common_duration = Ok(None);
+ let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
+ while let Some(buffer) = drain_buffers_iter.next() {
+ let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
+
+ if let Some(next_timestamp) = next_timestamp {
+ let duration = next_timestamp.saturating_sub(buffer.timestamp);
+ if common_duration == Ok(None) {
+ common_duration = Ok(Some(duration));
+ } else if common_duration != Ok(Some(duration)) {
+ common_duration = Err(());
+ }
+
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
+ buffer.timestamp,
+ buffer.duration,
+ duration,
+ );
+
+ buffer.duration = duration;
+ } else if let Ok(Some(common_duration)) = common_duration {
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
+ buffer.timestamp,
+ buffer.duration,
+ common_duration,
+ );
+
+ buffer.duration = common_duration;
+ } else {
+ gst::trace!(
+ CAT,
+ obj: &state.streams[idx].sinkpad,
+ "Keeping last buffer with timestamp {} duration at {}",
+ buffer.timestamp,
+ buffer.duration,
+ );
+ }
+
+ let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
+ if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
+ max_end_utc_time = Some(end_utc_time);
+ }
+ }
+
+ if let Some(start_time) = start_time {
+ gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
+ streams[idx].1.as_mut().unwrap().start_time = start_time;
+ } else {
+ assert!(streams[idx].1.is_none());
+ }
+ }
+ }
+
// Create header now if it was not created before and return the caps
let mut caps = None;
if state.stream_header.is_none() {
@@ -934,6 +1214,7 @@ impl FMP4Mux {
});
}
state.end_pts = Some(max_end_pts);
+ state.end_utc_time = max_end_utc_time;
// Update for the start PTS of the next fragment
state.fragment_start_pts = state.fragment_start_pts.map(|start| {
@@ -1036,6 +1317,7 @@ impl FMP4Mux {
fragment_filled: false,
dts_offset: None,
current_position: gst::ClockTime::ZERO,
+ current_utc_time: gst::ClockTime::ZERO,
last_force_keyunit_time: None,
});
}
@@ -1090,11 +1372,19 @@ impl FMP4Mux {
assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty()));
- let duration = state
- .end_pts
- .opt_checked_sub(state.earliest_pts)
- .ok()
- .flatten();
+ let duration = if variant == super::Variant::ONVIF {
+ state
+ .end_utc_time
+ .opt_checked_sub(state.start_utc_time)
+ .ok()
+ .flatten()
+ } else {
+ state
+ .end_pts
+ .opt_checked_sub(state.earliest_pts)
+ .ok()
+ .flatten()
+ };
let streams = state
.streams
@@ -1108,6 +1398,9 @@ impl FMP4Mux {
streams: streams.as_slice(),
write_mehd: settings.write_mehd,
duration: if at_eos { duration } else { None },
+ start_utc_time: state
+ .start_utc_time
+ .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000),
})
.map_err(|err| {
gst::error!(CAT, obj: element, "Failed to create FMP4 header: {}", err);
@@ -1486,6 +1779,7 @@ impl AggregatorImpl for FMP4Mux {
stream.queued_gops.clear();
stream.dts_offset = None;
stream.current_position = gst::ClockTime::ZERO;
+ stream.current_utc_time = gst::ClockTime::ZERO;
stream.last_force_keyunit_time = None;
stream.fragment_filled = false;
}
diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs
index 2d0ba10ea..1424d9a3c 100644
--- a/generic/fmp4/src/fmp4mux/mod.rs
+++ b/generic/fmp4/src/fmp4mux/mod.rs
@@ -70,6 +70,9 @@ pub(crate) struct HeaderConfiguration<'a> {
streams: &'a [gst::Caps],
write_mehd: bool,
duration: Option<gst::ClockTime>,
+ /// Start UTC time in ONVIF mode.
+ /// Since Jan 1 1601 in 100ns units.
+ start_utc_time: Option<u64>,
}
#[derive(Debug)]