Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/mux/mp4
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-11-09 10:02:35 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-11-10 13:59:53 +0300
commit2b4fd40d6267058ce10a83dfb48d5408f913957a (patch)
tree494a3543403195a7f7d2b544ba474db2bb4f3bea /mux/mp4
parent97bb327b2a5fc1b459b958be28d2b281f1f9e390 (diff)
mp4: Add ONVIF non-fragmented MP4 muxer
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/974>
Diffstat (limited to 'mux/mp4')
-rw-r--r--mux/mp4/src/mp4mux/boxes.rs137
-rw-r--r--mux/mp4/src/mp4mux/imp.rs576
-rw-r--r--mux/mp4/src/mp4mux/mod.rs11
3 files changed, 617 insertions, 107 deletions
diff --git a/mux/mp4/src/mp4mux/boxes.rs b/mux/mp4/src/mp4mux/boxes.rs
index 5f1214661..a4bc5e166 100644
--- a/mux/mp4/src/mp4mux/boxes.rs
+++ b/mux/mp4/src/mp4mux/boxes.rs
@@ -60,7 +60,7 @@ pub(super) fn create_ftyp(variant: super::Variant) -> Result<gst::Buffer, Error>
let mut v = vec![];
let (brand, compatible_brands) = match variant {
- super::Variant::ISO => (b"isom", vec![b"mp41", b"mp42"]),
+ super::Variant::ISO | super::Variant::ONVIF => (b"iso4", vec![b"mp41", b"mp42", b"isom"]),
};
write_box(&mut v, b"ftyp", |v| {
@@ -102,15 +102,71 @@ pub(super) fn create_mdat_header(size: Option<u64>) -> Result<gst::Buffer, Error
Ok(gst::Buffer::from_mut_slice(v))
}
+/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds.
+/// 1601 = UNIX + UNIX_1601_OFFSET.
+const UNIX_1601_OFFSET: u64 = 11_644_473_600;
+
/// Creates `moov` box
pub(super) fn create_moov(header: super::Header) -> Result<gst::Buffer, Error> {
let mut v = vec![];
write_box(&mut v, b"moov", |v| write_moov(v, &header))?;
+ if header.variant == super::Variant::ONVIF {
+ write_full_box(
+ &mut v,
+ b"meta",
+ FULL_BOX_VERSION_0,
+ FULL_BOX_FLAGS_NONE,
+ |v| {
+ write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+ // Handler type
+ v.extend(b"null");
+
+ // Reserved
+ v.extend([0u8; 3 * 4]);
+
+ // Name
+ v.extend(b"MetadataHandler");
+
+ Ok(())
+ })?;
+
+ write_box(v, b"cstb", |v| {
+ // entry count
+ v.extend(1u32.to_be_bytes());
+
+ // track id
+ v.extend(0u32.to_be_bytes());
+
+ // start UTC time in 100ns units since Jan 1 1601
+ // This is the UTC time of the earliest stream, which has to be converted to
+ // the correct epoch and scale.
+ let start_utc_time = header
+ .streams
+ .iter()
+ .map(|s| s.earliest_pts)
+ .min()
+ .unwrap()
+ .nseconds()
+ / 100;
+ let start_utc_time = start_utc_time + UNIX_1601_OFFSET * 10_000_000;
+ v.extend(start_utc_time.to_be_bytes());
+
+ Ok(())
+ })
+ },
+ )?;
+ }
+
Ok(gst::Buffer::from_mut_slice(v))
}
+struct TrackReference {
+ reference_type: [u8; 4],
+ track_ids: Vec<u32>,
+}
+
fn write_moov(v: &mut Vec<u8>, header: &super::Header) -> Result<(), Error> {
use gst::glib;
@@ -124,7 +180,27 @@ fn write_moov(v: &mut Vec<u8>, header: &super::Header) -> Result<(), Error> {
})?;
for (idx, stream) in header.streams.iter().enumerate() {
write_box(v, b"trak", |v| {
- write_trak(v, header, idx, stream, creation_time)
+ let mut references = Vec::new();
+
+ // Reference the video track for ONVIF metadata tracks
+ if header.variant == super::Variant::ONVIF
+ && stream.caps.structure(0).unwrap().name() == "application/x-onvif-metadata"
+ {
+ // Find the first video track
+ for (idx, other_stream) in header.streams.iter().enumerate() {
+ let s = other_stream.caps.structure(0).unwrap();
+
+ if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") {
+ references.push(TrackReference {
+ reference_type: *b"cdsc",
+ track_ids: vec![idx as u32 + 1],
+ });
+ break;
+ }
+ }
+ }
+
+ write_trak(v, header, idx, stream, creation_time, &references)
})?;
}
@@ -240,6 +316,7 @@ fn write_trak(
idx: usize,
stream: &super::Stream,
creation_time: u64,
+ references: &[TrackReference],
) -> Result<(), Error> {
write_full_box(
v,
@@ -250,6 +327,9 @@ fn write_trak(
)?;
write_box(v, b"mdia", |v| write_mdia(v, header, stream, creation_time))?;
+ if !references.is_empty() {
+ write_box(v, b"tref", |v| write_tref(v, header, references))?;
+ }
write_box(v, b"edts", |v| write_edts(v, header, stream))?;
Ok(())
@@ -430,6 +510,7 @@ fn write_hdlr(
"audio/mpeg" | "audio/x-opus" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
(b"soun", b"SoundHandler\0".as_slice())
}
+ "application/x-onvif-metadata" => (b"meta", b"MetadataHandler\0".as_slice()),
_ => unreachable!(),
};
@@ -462,6 +543,11 @@ fn write_minf(
write_smhd(v, header)
})?
}
+ "application/x-onvif-metadata" => {
+ write_full_box(v, b"nmhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |_v| {
+ Ok(())
+ })?
+ }
_ => unreachable!(),
}
@@ -613,6 +699,7 @@ fn write_stsd(
"audio/mpeg" | "audio/x-opus" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
write_audio_sample_entry(v, header, stream)?
}
+ "application/x-onvif-metadata" => write_xml_meta_data_sample_entry(v, header, stream)?,
_ => unreachable!(),
}
@@ -1201,6 +1288,34 @@ fn write_dops(v: &mut Vec<u8>, caps: &gst::Caps) -> Result<(), Error> {
})
}
+fn write_xml_meta_data_sample_entry(
+ v: &mut Vec<u8>,
+ _header: &super::Header,
+ stream: &super::Stream,
+) -> Result<(), Error> {
+ let s = stream.caps.structure(0).unwrap();
+ let namespace = match s.name() {
+ "application/x-onvif-metadata" => b"http://www.onvif.org/ver10/schema",
+ _ => unreachable!(),
+ };
+
+ write_sample_entry_box(v, b"metx", move |v| {
+ // content_encoding, empty string
+ v.push(0);
+
+ // namespace
+ v.extend_from_slice(namespace);
+ v.push(0);
+
+ // schema_location, empty string list
+ v.push(0);
+
+ Ok(())
+ })?;
+
+ Ok(())
+}
+
fn write_stts(
v: &mut Vec<u8>,
_header: &super::Header,
@@ -1528,6 +1643,24 @@ fn write_stco(
Ok(())
}
+fn write_tref(
+ v: &mut Vec<u8>,
+ _header: &super::Header,
+ references: &[TrackReference],
+) -> Result<(), Error> {
+ for reference in references {
+ write_box(v, reference.reference_type, |v| {
+ for track_id in &reference.track_ids {
+ v.extend(track_id.to_be_bytes());
+ }
+
+ Ok(())
+ })?;
+ }
+
+ Ok(())
+}
+
fn write_edts(
v: &mut Vec<u8>,
header: &super::Header,
diff --git a/mux/mp4/src/mp4mux/imp.rs b/mux/mp4/src/mp4mux/imp.rs
index b8ec1d57e..a57de86cd 100644
--- a/mux/mp4/src/mp4mux/imp.rs
+++ b/mux/mp4/src/mp4mux/imp.rs
@@ -12,12 +12,38 @@ use gst::subclass::prelude::*;
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
+use std::collections::VecDeque;
use std::sync::Mutex;
use once_cell::sync::Lazy;
use super::boxes;
+/// Offset between NTP and UNIX epoch in seconds.
+/// NTP = UNIX + NTP_UNIX_OFFSET.
+const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
+
+/// Reference timestamp meta caps for NTP timestamps.
+static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
+
+/// Reference timestamp meta caps for UNIX timestamps.
+static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
+
+/// Returns the UTC time of the buffer in the UNIX epoch.
+fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
+ buffer
+ .iter_meta::<gst::ReferenceTimestampMeta>()
+ .find_map(|meta| {
+ if meta.reference().can_intersect(&UNIX_CAPS) {
+ Some(meta.timestamp())
+ } else if meta.reference().can_intersect(&NTP_CAPS) {
+ meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds())
+ } else {
+ None
+ }
+ })
+}
+
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"mp4mux",
@@ -46,6 +72,7 @@ impl Default for Settings {
}
}
+#[derive(Debug)]
struct PendingBuffer {
buffer: gst::Buffer,
timestamp: gst::Signed<gst::ClockTime>,
@@ -58,6 +85,9 @@ struct Stream {
/// Sink pad for this stream.
sinkpad: super::MP4MuxPad,
+ /// Pre-queue for ONVIF variant to timestamp all buffers with their UTC time.
+ pre_queue: VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,
+
/// Currently configured caps for this stream.
caps: gst::Caps,
/// Whether this stream is intra-only and has frame reordering.
@@ -84,6 +114,9 @@ struct Stream {
earliest_pts: Option<gst::ClockTime>,
/// Current end PTS.
end_pts: Option<gst::ClockTime>,
+
+ /// In ONVIF mode, the mapping between running time and UTC time (UNIX)
+ running_time_utc_time_mapping: Option<(gst::ClockTime, gst::ClockTime)>,
}
#[derive(Default)]
@@ -111,6 +144,287 @@ pub(crate) struct MP4Mux {
}
impl MP4Mux {
+ /// Checks if a buffer is valid according to the stream configuration.
+ fn check_buffer(
+ buffer: &gst::BufferRef,
+ sinkpad: &super::MP4MuxPad,
+ delta_frames: super::DeltaFrames,
+ ) -> Result<(), gst::FlowError> {
+ if delta_frames.requires_dts() && buffer.dts().is_none() {
+ gst::error!(CAT, obj: sinkpad, "Require DTS for video streams");
+ return Err(gst::FlowError::Error);
+ }
+
+ if buffer.pts().is_none() {
+ gst::error!(CAT, obj: sinkpad, "Require timestamped buffers");
+ return Err(gst::FlowError::Error);
+ }
+
+ if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
+ gst::error!(CAT, obj: sinkpad, "Intra-only stream with delta units");
+ return Err(gst::FlowError::Error);
+ }
+
+ Ok(())
+ }
+
+ fn peek_buffer(
+ &self,
+ sinkpad: &super::MP4MuxPad,
+ delta_frames: super::DeltaFrames,
+ pre_queue: &mut VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,
+ running_time_utc_time_mapping: &Option<(gst::ClockTime, gst::ClockTime)>,
+ ) -> Result<Option<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>, gst::FlowError> {
+ if let Some((segment, buffer)) = pre_queue.front() {
+ return Ok(Some((segment.clone(), buffer.clone())));
+ }
+
+ let mut buffer = match sinkpad.peek_buffer() {
+ None => return Ok(None),
+ Some(buffer) => buffer,
+ };
+
+ Self::check_buffer(&buffer, sinkpad, delta_frames)?;
+
+ let mut segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
+ Some(segment) => segment,
+ None => {
+ gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ // For ONVIF we need to re-timestamp the buffer with its UTC time.
+ // We can only possibly end up here after the running-time UTC mapping is known.
+ //
+ // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to
+ // happen once.
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF {
+ let running_time_utc_time_mapping = running_time_utc_time_mapping.unwrap();
+
+ let pts_position = buffer.pts().unwrap();
+ let dts_position = buffer.dts();
+
+ let pts = segment.to_running_time_full(pts_position).unwrap();
+
+ let dts = dts_position
+ .map(|dts_position| segment.to_running_time_full(dts_position).unwrap());
+
+ let utc_time = match get_utc_time_from_buffer(&buffer) {
+ None => {
+ // Calculate from the mapping
+ gst::Signed::Positive(running_time_utc_time_mapping.1)
+ .checked_sub_unsigned(running_time_utc_time_mapping.0)
+ .and_then(|res| res.checked_add(pts))
+ .and_then(|res| res.positive())
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
+ gst::FlowError::Error
+ })?
+ }
+ Some(utc_time) => utc_time,
+ };
+
+ gst::trace!(
+ CAT,
+ obj: sinkpad,
+ "Mapped PTS running time {pts} to UTC time {utc_time}"
+ );
+
+ {
+ let buffer = buffer.make_mut();
+ buffer.set_pts(utc_time);
+
+ if let Some(dts) = dts {
+ let dts_utc_time = gst::Signed::Positive(utc_time)
+ .checked_sub(pts)
+ .and_then(|res| res.checked_add(dts))
+ .and_then(|res| res.positive())
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
+ gst::FlowError::Error
+ })?;
+ gst::trace!(
+ CAT,
+ obj: sinkpad,
+ "Mapped DTS running time {dts} to UTC time {dts_utc_time}"
+ );
+ buffer.set_dts(dts_utc_time);
+ }
+ }
+
+ segment = gst::FormattedSegment::default();
+
+ // Drop current buffer as it is now queued
+ sinkpad.drop_buffer();
+ pre_queue.push_back((segment.clone(), buffer.clone()));
+ }
+
+ Ok(Some((segment, buffer)))
+ }
+
+ fn pop_buffer(
+ &self,
+ sinkpad: &super::MP4MuxPad,
+ delta_frames: super::DeltaFrames,
+ pre_queue: &mut VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,
+ running_time_utc_time_mapping: &mut Option<(gst::ClockTime, gst::ClockTime)>,
+ ) -> Result<Option<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>, gst::FlowError> {
+ // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that.
+ // Queue up to 6s of data to get the first UTC time and then backdate.
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF
+ && running_time_utc_time_mapping.is_none()
+ {
+ if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) {
+ // Existence of PTS/DTS checked below
+ let (last, first) = if delta_frames.requires_dts() {
+ (
+ last.0.to_running_time_full(last.1.dts()).unwrap(),
+ first.0.to_running_time_full(first.1.dts()).unwrap(),
+ )
+ } else {
+ (
+ last.0.to_running_time_full(last.1.pts()).unwrap(),
+ first.0.to_running_time_full(first.1.pts()).unwrap(),
+ )
+ };
+
+ if last.saturating_sub(first)
+ > gst::Signed::Positive(gst::ClockTime::from_seconds(6))
+ {
+ gst::error!(
+ CAT,
+ obj: sinkpad,
+ "Got no UTC time in the first 6s of the stream"
+ );
+ return Err(gst::FlowError::Error);
+ }
+ }
+
+ let buffer = match sinkpad.pop_buffer() {
+ None => {
+ if sinkpad.is_eos() {
+ gst::error!(CAT, obj: sinkpad, "Got no UTC time before EOS");
+ return Err(gst::FlowError::Error);
+ } else {
+ return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
+ }
+ }
+ Some(buffer) => buffer,
+ };
+
+ Self::check_buffer(&buffer, sinkpad, delta_frames)?;
+
+ let segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
+ Some(segment) => segment,
+ None => {
+ gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ let utc_time = match get_utc_time_from_buffer(&buffer) {
+ Some(utc_time) => utc_time,
+ None => {
+ pre_queue.push_back((segment, buffer));
+ return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
+ }
+ };
+
+ let running_time = segment.to_running_time_full(buffer.pts()).unwrap();
+ gst::info!(
+ CAT,
+ obj: sinkpad,
+ "Got initial UTC time {utc_time} at PTS running time {running_time}",
+ );
+
+ let running_time = running_time.positive().unwrap_or_else(|| {
+ gst::error!(CAT, obj: sinkpad, "Stream has negative PTS running time");
+ gst::ClockTime::ZERO
+ });
+
+ *running_time_utc_time_mapping = Some((running_time, utc_time));
+
+ // Push the buffer onto the pre-queue and re-timestamp it and all other buffers
+ // based on the mapping above.
+ pre_queue.push_back((segment, buffer));
+
+ for (segment, buffer) in pre_queue.iter_mut() {
+ let buffer = buffer.make_mut();
+
+ let pts = segment.to_running_time_full(buffer.pts().unwrap()).unwrap();
+ let pts_utc_time = gst::Signed::Positive(utc_time)
+ .checked_sub_unsigned(running_time)
+ .and_then(|res| res.checked_add(pts))
+ .and_then(|res| res.positive())
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
+ gst::FlowError::Error
+ })?;
+ gst::trace!(
+ CAT,
+ obj: sinkpad,
+ "Mapped PTS running time {pts} to UTC time {pts_utc_time}"
+ );
+ buffer.set_pts(pts_utc_time);
+
+ if let Some(dts) = buffer.dts() {
+ let dts = segment.to_running_time_full(dts).unwrap();
+ let dts_utc_time = gst::Signed::Positive(pts_utc_time)
+ .checked_sub(pts)
+ .and_then(|res| res.checked_add(dts))
+ .and_then(|res| res.positive())
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
+ gst::FlowError::Error
+ })?;
+ gst::trace!(
+ CAT,
+ obj: sinkpad,
+ "Mapped DTS running time {dts} to UTC time {dts_utc_time}"
+ );
+ buffer.set_dts(dts_utc_time);
+ }
+
+ *segment = gst::FormattedSegment::default();
+ }
+
+ // Fall through below and pop the first buffer finally
+ }
+
+ if let Some((segment, buffer)) = pre_queue.pop_front() {
+ return Ok(Some((segment, buffer)));
+ }
+
+ // If the mapping is set, then we would get the buffer always from the pre-queue:
+ // - either it was set before already, in which case the next buffer would've been peeked
+ // for calculating the duration to the previous buffer, and then put into the pre-queue
+ // - or this is the very first buffer and we just put it into the queue overselves above
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF {
+ if sinkpad.is_eos() {
+ return Ok(None);
+ }
+ unreachable!();
+ }
+
+ let buffer = match sinkpad.pop_buffer() {
+ None => return Ok(None),
+ Some(buffer) => buffer,
+ };
+
+ Self::check_buffer(&buffer, sinkpad, delta_frames)?;
+
+ let segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
+ Some(segment) => segment,
+ None => {
+ gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ Ok(Some((segment, buffer)))
+ }
+
/// Queue a buffer and calculate its duration.
///
/// Returns `Ok(())` if a buffer with duration is known or if the stream is EOS and a buffer is
@@ -137,8 +451,13 @@ impl MP4Mux {
..
}) => {
// Already have a pending buffer but no duration, so try to get that now
- let buffer = match stream.sinkpad.peek_buffer() {
- Some(buffer) => buffer,
+ let (segment, buffer) = match self.peek_buffer(
+ &stream.sinkpad,
+ stream.delta_frames,
+ &mut stream.pre_queue,
+ &stream.running_time_utc_time_mapping,
+ )? {
+ Some(res) => res,
None => {
if stream.sinkpad.is_eos() {
let dur = buffer.duration().unwrap_or(gst::ClockTime::ZERO);
@@ -164,23 +483,8 @@ impl MP4Mux {
}
};
- if stream.delta_frames.requires_dts() && buffer.dts().is_none() {
- gst::error!(CAT, obj: stream.sinkpad, "Require DTS for video streams");
- return Err(gst::FlowError::Error);
- }
-
- if stream.delta_frames.intra_only()
- && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
- {
- gst::error!(CAT, obj: stream.sinkpad, "Intra-only stream with delta units");
- return Err(gst::FlowError::Error);
- }
-
- let pts_position = buffer.pts().ok_or_else(|| {
- gst::error!(CAT, obj: stream.sinkpad, "Require timestamped buffers");
- gst::FlowError::Error
- })?;
-
+ // Was checked above
+ let pts_position = buffer.pts().unwrap();
let next_timestamp_position = if stream.delta_frames.requires_dts() {
// Was checked above
buffer.dts().unwrap()
@@ -188,23 +492,9 @@ impl MP4Mux {
pts_position
};
- let segment = match stream.sinkpad.segment().downcast::<gst::ClockTime>().ok() {
- Some(segment) => segment,
- None => {
- gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment");
- return Err(gst::FlowError::Error);
- }
- };
-
- // If the stream has no valid running time, assume it's before everything else.
- let next_timestamp = match segment.to_running_time_full(next_timestamp_position)
- {
- None => {
- gst::error!(CAT, obj: stream.sinkpad, "Stream has no valid running time");
- return Err(gst::FlowError::Error);
- }
- Some(running_time) => running_time,
- };
+ let next_timestamp = segment
+ .to_running_time_full(next_timestamp_position)
+ .unwrap();
gst::trace!(
CAT,
@@ -243,8 +533,13 @@ impl MP4Mux {
None => {
// Have no buffer queued at all yet
- let buffer = match stream.sinkpad.pop_buffer() {
- Some(buffer) => buffer,
+ let (segment, buffer) = match self.pop_buffer(
+ &stream.sinkpad,
+ stream.delta_frames,
+ &mut stream.pre_queue,
+ &mut stream.running_time_utc_time_mapping,
+ )? {
+ Some(res) => res,
None => {
if stream.sinkpad.is_eos() {
gst::trace!(
@@ -261,59 +556,18 @@ impl MP4Mux {
}
};
- if stream.delta_frames.requires_dts() && buffer.dts().is_none() {
- gst::error!(CAT, obj: stream.sinkpad, "Require DTS for video streams");
- return Err(gst::FlowError::Error);
- }
-
- if stream.delta_frames.intra_only()
- && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
- {
- gst::error!(CAT, obj: stream.sinkpad, "Intra-only stream with delta units");
- return Err(gst::FlowError::Error);
- }
-
- let pts_position = buffer.pts().ok_or_else(|| {
- gst::error!(CAT, obj: stream.sinkpad, "Require timestamped buffers");
- gst::FlowError::Error
- })?;
+ // Was checked above
+ let pts_position = buffer.pts().unwrap();
let dts_position = buffer.dts();
- let segment = match stream
- .sinkpad
- .segment()
- .clone()
- .downcast::<gst::ClockTime>()
- .ok()
- {
- Some(segment) => segment,
- None => {
- gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment");
- return Err(gst::FlowError::Error);
- }
- };
-
- let pts = match segment.to_running_time_full(pts_position) {
- None => {
- gst::error!(CAT, obj: stream.sinkpad, "Stream has no valid PTS running time");
- return Err(gst::FlowError::Error);
- }
- Some(running_time) => running_time,
- }.positive().unwrap_or_else(|| {
- gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS running time");
- gst::ClockTime::ZERO
- });
+ let pts = segment.to_running_time_full(pts_position).unwrap()
+ .positive().unwrap_or_else(|| {
+ gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS running time");
+ gst::ClockTime::ZERO
+ });
- let dts = match dts_position {
- None => None,
- Some(dts_position) => match segment.to_running_time_full(dts_position) {
- None => {
- gst::error!(CAT, obj: stream.sinkpad, "Stream has no valid DTS running time");
- return Err(gst::FlowError::Error);
- }
- Some(running_time) => Some(running_time),
- },
- };
+ let dts = dts_position
+ .map(|dts_position| segment.to_running_time_full(dts_position).unwrap());
let timestamp = if stream.delta_frames.requires_dts() {
// Was checked above
@@ -407,8 +661,7 @@ impl MP4Mux {
{
gst::trace!(CAT,
obj: stream.sinkpad,
- "Continuing current chunk: single stream {}, or {} >= {} and {} >= {}",
- single_stream,
+ "Continuing current chunk: single stream {single_stream}, or {} >= {} and {} >= {}",
gst::format::Bytes::from_u64(stream.queued_chunk_bytes),
settings.interleave_bytes.map(gst::format::Bytes::from_u64).display(),
stream.queued_chunk_time, settings.interleave_time.display(),
@@ -528,8 +781,10 @@ impl MP4Mux {
// Now we can start handling buffers
while let Some(idx) = self.find_earliest_stream(settings, state)? {
let stream = &mut state.streams[idx];
-
let buffer = stream.pending_buffer.take().unwrap();
+
+ gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {buffer:?} at offset {}", state.current_offset);
+
let duration = buffer.duration.unwrap();
let composition_time_offset = buffer.composition_time_offset;
let mut buffer = buffer.buffer;
@@ -582,7 +837,7 @@ impl MP4Mux {
}
};
- gst::info!(CAT, obj: pad, "Configuring caps {:?}", caps);
+ gst::info!(CAT, obj: pad, "Configuring caps {caps:?}");
let s = caps.structure(0).unwrap();
@@ -632,6 +887,7 @@ impl MP4Mux {
state.streams.push(Stream {
sinkpad: pad,
+ pre_queue: VecDeque::new(),
caps,
delta_frames,
chunks: Vec::new(),
@@ -641,6 +897,7 @@ impl MP4Mux {
start_dts: None,
earliest_pts: None,
end_pts: None,
+ running_time_utc_time_mapping: None,
});
}
@@ -781,7 +1038,7 @@ impl ElementImpl for MP4Mux {
gst::error!(
CAT,
imp: self,
- "Can't request new pads after start was generated"
+ "Can't request new pads after stream was started"
);
return None;
}
@@ -802,7 +1059,7 @@ impl AggregatorImpl for MP4Mux {
) -> bool {
use gst::QueryViewMut;
- gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query);
+ gst::trace!(CAT, obj: aggregator_pad, "Handling query {query:?}");
match query.view_mut() {
QueryViewMut::Caps(q) => {
@@ -831,7 +1088,7 @@ impl AggregatorImpl for MP4Mux {
) -> Result<gst::FlowSuccess, gst::FlowError> {
use gst::EventView;
- gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+ gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}");
match event.view() {
EventView::Segment(ev) => {
@@ -855,7 +1112,7 @@ impl AggregatorImpl for MP4Mux {
fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
use gst::EventView;
- gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+ gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}");
match event.view() {
EventView::Tag(_ev) => {
@@ -870,7 +1127,7 @@ impl AggregatorImpl for MP4Mux {
fn src_query(&self, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
- gst::trace!(CAT, imp: self, "Handling query {:?}", query);
+ gst::trace!(CAT, imp: self, "Handling query {query:?}");
match query.view_mut() {
QueryViewMut::Seeking(q) => {
@@ -885,7 +1142,7 @@ impl AggregatorImpl for MP4Mux {
fn src_event(&self, event: gst::Event) -> bool {
use gst::EventView;
- gst::trace!(CAT, imp: self, "Handling event {:?}", event);
+ gst::trace!(CAT, imp: self, "Handling event {event:?}");
match event.view() {
EventView::Seek(_ev) => false,
@@ -894,9 +1151,13 @@ impl AggregatorImpl for MP4Mux {
}
fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::info!(CAT, imp: self, "Flushing");
+
let mut state = self.state.lock().unwrap();
for stream in &mut state.streams {
stream.pending_buffer = None;
+ stream.pre_queue.clear();
+ stream.running_time_utc_time_mapping = None;
}
drop(state);
@@ -977,14 +1238,12 @@ impl AggregatorImpl for MP4Mux {
// ... and then create the ftyp box plus mdat box header so we can start outputting
// actual data
- let buffers = buffers.get_mut().unwrap();
-
let ftyp = boxes::create_ftyp(self.obj().class().as_ref().variant).map_err(|err| {
gst::error!(CAT, imp: self, "Failed to create ftyp box: {err}");
gst::FlowError::Error
})?;
state.current_offset += ftyp.size() as u64;
- buffers.add(ftyp);
+ buffers.get_mut().unwrap().add(ftyp);
gst::info!(
CAT,
@@ -999,7 +1258,7 @@ impl AggregatorImpl for MP4Mux {
})?;
state.current_offset += mdat.size() as u64;
state.mdat_size = 0;
- buffers.add(mdat);
+ buffers.get_mut().unwrap().add(mdat);
}
let res = match self.drain_buffers(&settings, &mut state, buffers.get_mut().unwrap()) {
@@ -1062,7 +1321,7 @@ impl AggregatorImpl for MP4Mux {
if !buffers.is_empty() {
if let Err(err) = self.obj().finish_buffer_list(buffers) {
- gst::error!(CAT, imp: self, "Failed pushing buffer: {:?}", err);
+ gst::error!(CAT, imp: self, "Failed pushing buffers: {err:?}");
return Err(err);
}
}
@@ -1091,8 +1350,7 @@ impl AggregatorImpl for MP4Mux {
gst::error!(
CAT,
imp: self,
- "Failed pushing updated mdat box header buffer downstream: {:?}",
- err,
+ "Failed pushing updated mdat box header buffer downstream: {err:?}",
);
}
}
@@ -1229,6 +1487,110 @@ impl MP4MuxImpl for ISOMP4Mux {
const VARIANT: super::Variant = super::Variant::ISO;
}
+#[derive(Default)]
+pub(crate) struct ONVIFMP4Mux;
+
+#[glib::object_subclass]
+impl ObjectSubclass for ONVIFMP4Mux {
+ const NAME: &'static str = "GstONVIFMP4Mux";
+ type Type = super::ONVIFMP4Mux;
+ type ParentType = super::MP4Mux;
+}
+
+impl ObjectImpl for ONVIFMP4Mux {}
+
+impl GstObjectImpl for ONVIFMP4Mux {}
+
+impl ElementImpl for ONVIFMP4Mux {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "ONVIFMP4Mux",
+ "Codec/Muxer",
+ "ONVIF MP4 muxer",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &gst::Caps::builder("video/quicktime")
+ .field("variant", "iso")
+ .build(),
+ )
+ .unwrap();
+
+ let sink_pad_template = gst::PadTemplate::with_gtype(
+ "sink_%u",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Request,
+ &[
+ gst::Structure::builder("video/x-h264")
+ .field("stream-format", gst::List::new(["avc", "avc3"]))
+ .field("alignment", "au")
+ .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .build(),
+ gst::Structure::builder("video/x-h265")
+ .field("stream-format", gst::List::new(["hvc1", "hev1"]))
+ .field("alignment", "au")
+ .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .build(),
+ gst::Structure::builder("image/jpeg")
+ .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .build(),
+ gst::Structure::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("stream-format", "raw")
+ .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32))
+ .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
+ .build(),
+ gst::Structure::builder("audio/x-alaw")
+ .field("channels", gst::IntRange::<i32>::new(1, 2))
+ .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
+ .build(),
+ gst::Structure::builder("audio/x-mulaw")
+ .field("channels", gst::IntRange::<i32>::new(1, 2))
+ .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
+ .build(),
+ gst::Structure::builder("audio/x-adpcm")
+ .field("layout", "g726")
+ .field("channels", 1i32)
+ .field("rate", 8000i32)
+ .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000]))
+ .build(),
+ gst::Structure::builder("application/x-onvif-metadata")
+ .field("parsed", true)
+ .build(),
+ ]
+ .into_iter()
+ .collect::<gst::Caps>(),
+ super::MP4MuxPad::static_type(),
+ )
+ .unwrap();
+
+ vec![src_pad_template, sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+}
+
+impl AggregatorImpl for ONVIFMP4Mux {}
+
+impl MP4MuxImpl for ONVIFMP4Mux {
+ const VARIANT: super::Variant = super::Variant::ONVIF;
+}
+
#[derive(Default, Clone)]
struct PadSettings {
trak_timescale: u32,
@@ -1291,9 +1653,13 @@ impl AggregatorPadImpl for MP4MuxPad {
let mux = aggregator.downcast_ref::<super::MP4Mux>().unwrap();
let mut mux_state = mux.imp().state.lock().unwrap();
+ gst::info!(CAT, imp: self, "Flushing");
+
for stream in &mut mux_state.streams {
if stream.sinkpad == *self.obj() {
stream.pending_buffer = None;
+ stream.pre_queue.clear();
+ stream.running_time_utc_time_mapping = None;
break;
}
}
diff --git a/mux/mp4/src/mp4mux/mod.rs b/mux/mp4/src/mp4mux/mod.rs
index 1e6ad28be..7ee2f4390 100644
--- a/mux/mp4/src/mp4mux/mod.rs
+++ b/mux/mp4/src/mp4mux/mod.rs
@@ -24,6 +24,10 @@ glib::wrapper! {
pub(crate) struct ISOMP4Mux(ObjectSubclass<imp::ISOMP4Mux>) @extends MP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
}
+glib::wrapper! {
+ pub(crate) struct ONVIFMP4Mux(ObjectSubclass<imp::ONVIFMP4Mux>) @extends MP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
+}
+
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
@@ -36,6 +40,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Rank::Marginal,
ISOMP4Mux::static_type(),
)?;
+ gst::Element::register(
+ Some(plugin),
+ "onvifmp4mux",
+ gst::Rank::Marginal,
+ ONVIFMP4Mux::static_type(),
+ )?;
Ok(())
}
@@ -131,4 +141,5 @@ pub(crate) struct Header {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Variant {
ISO,
+ ONVIF,
}