Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
path: root/mux
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-10-23 18:23:45 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-10-23 20:25:08 +0300
commit211cd095d69726a3a2208feddd921d05b60c6540 (patch)
tree648bbbe80a68a68b3f6315fcf580cae146d0249e /mux
parent5d44e0eb3c309ce7ad0cfb378d0169d8ce3305b3 (diff)
Add new mux subdirectory for container formats
Contains the (incomplete) flavors FLV demuxer and the fragmented MP4 muxer for now. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/173
Diffstat (limited to 'mux')
-rw-r--r--mux/flavors/Cargo.toml47
l---------mux/flavors/LICENSE-APACHE1
l---------mux/flavors/LICENSE-MIT1
-rw-r--r--mux/flavors/build.rs3
-rw-r--r--mux/flavors/src/bytes.rs142
-rw-r--r--mux/flavors/src/flvdemux/imp.rs1538
-rw-r--r--mux/flavors/src/flvdemux/mod.rs27
-rw-r--r--mux/flavors/src/lib.rs36
-rw-r--r--mux/fmp4/Cargo.toml52
l---------mux/fmp4/LICENSE1
-rw-r--r--mux/fmp4/build.rs3
-rw-r--r--mux/fmp4/examples/dash_vod.rs266
-rw-r--r--mux/fmp4/examples/hls_live.rs572
-rw-r--r--mux/fmp4/examples/hls_vod.rs472
-rw-r--r--mux/fmp4/src/fmp4mux/boxes.rs2073
-rw-r--r--mux/fmp4/src/fmp4mux/imp.rs2596
-rw-r--r--mux/fmp4/src/fmp4mux/mod.rs146
-rw-r--r--mux/fmp4/src/lib.rs34
-rw-r--r--mux/fmp4/tests/tests.rs1241
19 files changed, 9251 insertions, 0 deletions
diff --git a/mux/flavors/Cargo.toml b/mux/flavors/Cargo.toml
new file mode 100644
index 000000000..7f8eceeb8
--- /dev/null
+++ b/mux/flavors/Cargo.toml
@@ -0,0 +1,47 @@
+[package]
+name = "gst-plugin-flavors"
+version = "0.9.0-alpha.1"
+authors = ["Sebastian Dröge <sebastian@centricular.com>"]
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+license = "MIT OR Apache-2.0"
+edition = "2021"
+rust-version = "1.63"
+description = "GStreamer Rust FLV Plugin"
+
+[dependencies]
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+num-rational = { version = "0.4", default-features = false, features = [] }
+nom = "7"
+flavors = { git = "https://github.com/rust-av/flavors" }
+muldiv = "1.0"
+byteorder = "1.0"
+once_cell = "1.0"
+smallvec = "1.0"
+
+[lib]
+name = "gstrsflv"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../version-helper" }
+
+[features]
+static = []
+capi = []
+doc = ["gst/v1_18"]
+
+[package.metadata.capi]
+min_version = "0.8.0"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/mux/flavors/LICENSE-APACHE b/mux/flavors/LICENSE-APACHE
new file mode 120000
index 000000000..1cd601d0a
--- /dev/null
+++ b/mux/flavors/LICENSE-APACHE
@@ -0,0 +1 @@
+../../LICENSE-APACHE \ No newline at end of file
diff --git a/mux/flavors/LICENSE-MIT b/mux/flavors/LICENSE-MIT
new file mode 120000
index 000000000..b2cfbdc7b
--- /dev/null
+++ b/mux/flavors/LICENSE-MIT
@@ -0,0 +1 @@
+../../LICENSE-MIT \ No newline at end of file
diff --git a/mux/flavors/build.rs b/mux/flavors/build.rs
new file mode 100644
index 000000000..cda12e57e
--- /dev/null
+++ b/mux/flavors/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+ gst_plugin_version_helper::info()
+}
diff --git a/mux/flavors/src/bytes.rs b/mux/flavors/src/bytes.rs
new file mode 100644
index 000000000..02263993c
--- /dev/null
+++ b/mux/flavors/src/bytes.rs
@@ -0,0 +1,142 @@
+// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+//
+// SPDX-License-Identifier: MIT OR Apache-2.0
+
+pub use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
+use std::io;
+
+pub trait ReadBytesExtShort: io::Read {
+ fn read_u16le(&mut self) -> io::Result<u16> {
+ self.read_u16::<LittleEndian>()
+ }
+ fn read_i16le(&mut self) -> io::Result<i16> {
+ self.read_i16::<LittleEndian>()
+ }
+ fn read_u32le(&mut self) -> io::Result<u32> {
+ self.read_u32::<LittleEndian>()
+ }
+ fn read_i32le(&mut self) -> io::Result<i32> {
+ self.read_i32::<LittleEndian>()
+ }
+ fn read_u64le(&mut self) -> io::Result<u64> {
+ self.read_u64::<LittleEndian>()
+ }
+ fn read_i64le(&mut self) -> io::Result<i64> {
+ self.read_i64::<LittleEndian>()
+ }
+ fn read_uintle(&mut self, nbytes: usize) -> io::Result<u64> {
+ self.read_uint::<LittleEndian>(nbytes)
+ }
+ fn read_intle(&mut self, nbytes: usize) -> io::Result<i64> {
+ self.read_int::<LittleEndian>(nbytes)
+ }
+ fn read_f32le(&mut self) -> io::Result<f32> {
+ self.read_f32::<LittleEndian>()
+ }
+ fn read_f64le(&mut self) -> io::Result<f64> {
+ self.read_f64::<LittleEndian>()
+ }
+ fn read_u16be(&mut self) -> io::Result<u16> {
+ self.read_u16::<BigEndian>()
+ }
+ fn read_i16be(&mut self) -> io::Result<i16> {
+ self.read_i16::<BigEndian>()
+ }
+ fn read_u32be(&mut self) -> io::Result<u32> {
+ self.read_u32::<BigEndian>()
+ }
+ fn read_i32be(&mut self) -> io::Result<i32> {
+ self.read_i32::<BigEndian>()
+ }
+ fn read_u64be(&mut self) -> io::Result<u64> {
+ self.read_u64::<BigEndian>()
+ }
+ fn read_i64be(&mut self) -> io::Result<i64> {
+ self.read_i64::<BigEndian>()
+ }
+ fn read_uintbe(&mut self, nbytes: usize) -> io::Result<u64> {
+ self.read_uint::<BigEndian>(nbytes)
+ }
+ fn read_intbe(&mut self, nbytes: usize) -> io::Result<i64> {
+ self.read_int::<BigEndian>(nbytes)
+ }
+ fn read_f32be(&mut self) -> io::Result<f32> {
+ self.read_f32::<BigEndian>()
+ }
+ fn read_f64be(&mut self) -> io::Result<f64> {
+ self.read_f64::<BigEndian>()
+ }
+}
+
+impl<T> ReadBytesExtShort for T where T: ReadBytesExt {}
+
+pub trait WriteBytesExtShort: WriteBytesExt {
+ fn write_u16le(&mut self, n: u16) -> io::Result<()> {
+ self.write_u16::<LittleEndian>(n)
+ }
+ fn write_i16le(&mut self, n: i16) -> io::Result<()> {
+ self.write_i16::<LittleEndian>(n)
+ }
+ fn write_u32le(&mut self, n: u32) -> io::Result<()> {
+ self.write_u32::<LittleEndian>(n)
+ }
+ fn write_i32le(&mut self, n: i32) -> io::Result<()> {
+ self.write_i32::<LittleEndian>(n)
+ }
+ fn write_u64le(&mut self, n: u64) -> io::Result<()> {
+ self.write_u64::<LittleEndian>(n)
+ }
+ fn write_i64le(&mut self, n: i64) -> io::Result<()> {
+ self.write_i64::<LittleEndian>(n)
+ }
+ fn write_uintle(&mut self, n: u64, nbytes: usize) -> io::Result<()> {
+ self.write_uint::<LittleEndian>(n, nbytes)
+ }
+ fn write_intle(&mut self, n: i64, nbytes: usize) -> io::Result<()> {
+ self.write_int::<LittleEndian>(n, nbytes)
+ }
+ fn write_f32le(&mut self, n: f32) -> io::Result<()> {
+ self.write_f32::<LittleEndian>(n)
+ }
+ fn write_f64le(&mut self, n: f64) -> io::Result<()> {
+ self.write_f64::<LittleEndian>(n)
+ }
+ fn write_u16be(&mut self, n: u16) -> io::Result<()> {
+ self.write_u16::<BigEndian>(n)
+ }
+ fn write_i16be(&mut self, n: i16) -> io::Result<()> {
+ self.write_i16::<BigEndian>(n)
+ }
+ fn write_u32be(&mut self, n: u32) -> io::Result<()> {
+ self.write_u32::<BigEndian>(n)
+ }
+ fn write_i32be(&mut self, n: i32) -> io::Result<()> {
+ self.write_i32::<BigEndian>(n)
+ }
+ fn write_u64be(&mut self, n: u64) -> io::Result<()> {
+ self.write_u64::<BigEndian>(n)
+ }
+ fn write_i64be(&mut self, n: i64) -> io::Result<()> {
+ self.write_i64::<BigEndian>(n)
+ }
+ fn write_uintbe(&mut self, n: u64, nbytes: usize) -> io::Result<()> {
+ self.write_uint::<BigEndian>(n, nbytes)
+ }
+ fn write_intbe(&mut self, n: i64, nbytes: usize) -> io::Result<()> {
+ self.write_int::<BigEndian>(n, nbytes)
+ }
+ fn write_f32be(&mut self, n: f32) -> io::Result<()> {
+ self.write_f32::<BigEndian>(n)
+ }
+ fn write_f64be(&mut self, n: f64) -> io::Result<()> {
+ self.write_f64::<BigEndian>(n)
+ }
+}
+
+impl<T> WriteBytesExtShort for T where T: WriteBytesExt {}
diff --git a/mux/flavors/src/flvdemux/imp.rs b/mux/flavors/src/flvdemux/imp.rs
new file mode 100644
index 000000000..67d2f7d9d
--- /dev/null
+++ b/mux/flavors/src/flvdemux/imp.rs
@@ -0,0 +1,1538 @@
+// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+//
+// SPDX-License-Identifier: MIT OR Apache-2.0
+
+use std::cmp;
+use std::sync::Mutex;
+
+// FIXME: rustfmt removes the :: but they're required here
+#[rustfmt::skip]
+use ::flavors::parser as flavors;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use num_rational::Rational32;
+
+use once_cell::sync::Lazy;
+
+use smallvec::SmallVec;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "rsflvdemux",
+ gst::DebugColorFlags::empty(),
+ Some("Rust FLV demuxer"),
+ )
+});
+
+pub struct FlvDemux {
+ sinkpad: gst::Pad,
+ audio_srcpad: Mutex<Option<gst::Pad>>,
+ video_srcpad: Mutex<Option<gst::Pad>>,
+ adapter: Mutex<gst_base::UniqueAdapter>,
+ flow_combiner: Mutex<gst_base::UniqueFlowCombiner>,
+ state: Mutex<State>,
+}
+
+#[allow(clippy::large_enum_variant)]
+enum State {
+ Stopped,
+ NeedHeader,
+ Skipping {
+ audio: bool,
+ video: bool,
+ skip_left: u32,
+ },
+ Streaming(StreamingState),
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum Stream {
+ Audio,
+ Video,
+}
+
+#[derive(Clone, PartialEq, Eq)]
+enum Event {
+ StreamChanged(Stream, gst::Caps),
+ Buffer(Stream, gst::Buffer),
+ HaveAllStreams,
+}
+
+struct StreamingState {
+ audio: Option<AudioFormat>,
+ expect_audio: bool,
+ video: Option<VideoFormat>,
+ expect_video: bool,
+ got_all_streams: bool,
+ last_position: Option<gst::ClockTime>,
+
+ metadata: Option<Metadata>,
+
+ aac_sequence_header: Option<gst::Buffer>,
+ avc_sequence_header: Option<gst::Buffer>,
+}
+
+#[derive(Debug, Eq, Clone)]
+struct AudioFormat {
+ format: flavors::SoundFormat,
+ rate: u16,
+ width: u8,
+ channels: u8,
+ bitrate: Option<u32>,
+ aac_sequence_header: Option<gst::Buffer>,
+}
+
+#[derive(Debug, Eq, Clone)]
+struct VideoFormat {
+ format: flavors::CodecId,
+ width: Option<u32>,
+ height: Option<u32>,
+ pixel_aspect_ratio: Option<Rational32>,
+ framerate: Option<Rational32>,
+ bitrate: Option<u32>,
+ avc_sequence_header: Option<gst::Buffer>,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Default)]
+struct Metadata {
+ duration: Option<gst::ClockTime>,
+
+ creation_date: Option<String>,
+ creator: Option<String>,
+ title: Option<String>,
+ metadata_creator: Option<String>, /* TODO: seek_table: _,
+ * filepositions / times metadata arrays */
+
+ audio_bitrate: Option<u32>,
+
+ video_width: Option<u32>,
+ video_height: Option<u32>,
+ video_pixel_aspect_ratio: Option<Rational32>,
+ video_framerate: Option<Rational32>,
+ video_bitrate: Option<u32>,
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for FlvDemux {
+ const NAME: &'static str = "RsFlvDemux";
+ type Type = super::FlvDemux;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("sink").unwrap();
+ let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
+ .activate_function(|pad, parent| {
+ FlvDemux::catch_panic_pad_function(
+ parent,
+ || Err(gst::loggable_error!(CAT, "Panic activating sink pad")),
+ |demux| demux.sink_activate(pad),
+ )
+ })
+ .activatemode_function(|pad, parent, mode, active| {
+ FlvDemux::catch_panic_pad_function(
+ parent,
+ || {
+ Err(gst::loggable_error!(
+ CAT,
+ "Panic activating sink pad with mode"
+ ))
+ },
+ |demux| {
+ demux.sink_activatemode(pad, mode, active);
+ Ok(())
+ },
+ )
+ })
+ .chain_function(|pad, parent, buffer| {
+ FlvDemux::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |demux| demux.sink_chain(pad, buffer),
+ )
+ })
+ .event_function(|pad, parent, event| {
+ FlvDemux::catch_panic_pad_function(
+ parent,
+ || false,
+ |demux| demux.sink_event(pad, event),
+ )
+ })
+ .build();
+
+ FlvDemux {
+ sinkpad,
+ audio_srcpad: Mutex::new(None),
+ video_srcpad: Mutex::new(None),
+ state: Mutex::new(State::Stopped),
+ adapter: Mutex::new(gst_base::UniqueAdapter::new()),
+ flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()),
+ }
+ }
+}
+
+impl ObjectImpl for FlvDemux {
+ fn constructed(&self) {
+ self.parent_constructed();
+
+ self.instance().add_pad(&self.sinkpad).unwrap();
+ }
+}
+
+impl GstObjectImpl for FlvDemux {}
+
+impl ElementImpl for FlvDemux {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "FLV Demuxer",
+ "Codec/Demuxer",
+ "Demuxes FLV Streams",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let mut caps = gst::Caps::new_empty();
+ {
+ let caps = caps.get_mut().unwrap();
+
+ caps.append(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 1i32)
+ .build(),
+ );
+ caps.append(
+ gst_audio::AudioCapsBuilder::new_interleaved()
+ .format_list([gst_audio::AudioFormat::U8, gst_audio::AudioFormat::S16le])
+ .build(),
+ );
+ caps.append(
+ gst::Caps::builder("audio/x-adpcm")
+ .field("layout", "swf")
+ .build(),
+ );
+ caps.append(gst::Caps::builder("audio/x-nellymoser").build());
+ caps.append(gst::Caps::builder("audio/x-alaw").build());
+ caps.append(gst::Caps::builder("audio/x-mulaw").build());
+ caps.append(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("framed", true)
+ .field("stream-format", "raw")
+ .build(),
+ );
+ caps.append(gst::Caps::builder("audio/x-speex").build());
+ }
+ let audiosrc_pad_template = gst::PadTemplate::new(
+ "audio",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &caps,
+ )
+ .unwrap();
+
+ let mut caps = gst::Caps::new_empty();
+ {
+ let caps = caps.get_mut().unwrap();
+
+ caps.append(
+ gst::Caps::builder("video/x-flash-video")
+ .field("flvversion", 1i32)
+ .build(),
+ );
+ caps.append(gst::Caps::builder("video/x-flash-screen").build());
+ caps.append(gst::Caps::builder("video/x-vp6-flash").build());
+ caps.append(gst::Caps::builder("video/x-vp6-flash-alpha").build());
+ caps.append(gst::Caps::builder("video/x-flash-screen2").build());
+ caps.append(
+ gst::Caps::builder("video/x-h264")
+ .field("stream-format", "avc")
+ .build(),
+ );
+ caps.append(gst::Caps::builder("video/x-h263").build());
+ caps.append(
+ gst::Caps::builder("video/mpeg")
+ .field("mpegversion", 4i32)
+ .build(),
+ );
+ }
+ let videosrc_pad_template = gst::PadTemplate::new(
+ "video",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &caps,
+ )
+ .unwrap();
+
+ let caps = gst::Caps::builder("video/x-flv").build();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![
+ audiosrc_pad_template,
+ videosrc_pad_template,
+ sink_pad_template,
+ ]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+}
+
+impl FlvDemux {
+ fn sink_activate(&self, pad: &gst::Pad) -> Result<(), gst::LoggableError> {
+ let mode = {
+ let mut query = gst::query::Scheduling::new();
+ if !pad.peer_query(&mut query) {
+ return Err(gst::loggable_error!(CAT, "Scheduling query failed on peer"));
+ }
+
+ // TODO: pull mode
+ // if query.has_scheduling_mode_with_flags(
+ // gst::PadMode::Pull,
+ // gst::SchedulingFlags::SEEKABLE,
+ // )
+ // {
+ // gst::debug!(CAT, obj: pad, "Activating in Pull mode");
+ // gst::PadMode::Pull
+ // } else {
+ gst::debug!(CAT, obj: pad, "Activating in Push mode");
+ gst::PadMode::Push
+ // }
+ };
+
+ pad.activate_mode(mode, true)?;
+ Ok(())
+ }
+
+ fn sink_activatemode(&self, _pad: &gst::Pad, mode: gst::PadMode, active: bool) {
+ if active {
+ self.start(mode);
+
+ if mode == gst::PadMode::Pull {
+ // TODO implement pull mode
+ // self.sinkpad.start_task(...)
+ unimplemented!();
+ }
+ } else {
+ if mode == gst::PadMode::Pull {
+ let _ = self.sinkpad.stop_task();
+ }
+
+ self.stop();
+ }
+ }
+
+ fn start(&self, _mode: gst::PadMode) {
+ *self.state.lock().unwrap() = State::NeedHeader;
+ }
+
+ fn stop(&self) {
+ *self.state.lock().unwrap() = State::Stopped;
+ self.adapter.lock().unwrap().clear();
+
+ let mut flow_combiner = self.flow_combiner.lock().unwrap();
+ if let Some(pad) = self.audio_srcpad.lock().unwrap().take() {
+ self.instance().remove_pad(&pad).unwrap();
+ flow_combiner.remove_pad(&pad);
+ }
+
+ if let Some(pad) = self.video_srcpad.lock().unwrap().take() {
+ self.instance().remove_pad(&pad).unwrap();
+ flow_combiner.remove_pad(&pad);
+ }
+
+ flow_combiner.reset();
+ }
+
+ fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
+ use gst::EventView;
+
+ gst::log!(CAT, obj: pad, "Handling event {:?}", event);
+ match event.view() {
+ EventView::Eos(..) => {
+ // TODO implement
+ gst::Pad::event_default(pad, Some(&*self.instance()), event)
+ }
+ EventView::Segment(..) => {
+ // TODO implement
+ gst::Pad::event_default(pad, Some(&*self.instance()), event)
+ }
+ EventView::FlushStart(..) => {
+ // TODO implement
+ gst::Pad::event_default(pad, Some(&*self.instance()), event)
+ }
+ EventView::FlushStop(..) => {
+ // TODO implement
+ gst::Pad::event_default(pad, Some(&*self.instance()), event)
+ }
+ _ => gst::Pad::event_default(pad, Some(&*self.instance()), event),
+ }
+ }
+
+ fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
+ use gst::QueryViewMut;
+
+ match query.view_mut() {
+ QueryViewMut::Position(q) => {
+ let fmt = q.format();
+ if fmt == gst::Format::Time {
+ if self.sinkpad.peer_query(q.query_mut()) {
+ return true;
+ }
+
+ if let State::Streaming(StreamingState { last_position, .. }) =
+ *self.state.lock().unwrap()
+ {
+ q.set(last_position);
+ return true;
+ }
+
+ false
+ } else {
+ false
+ }
+ }
+ QueryViewMut::Duration(q) => {
+ let fmt = q.format();
+ if fmt == gst::Format::Time {
+ if self.sinkpad.peer_query(q.query_mut()) {
+ return true;
+ }
+
+ if let State::Streaming(StreamingState {
+ metadata: Some(Metadata { duration, .. }),
+ ..
+ }) = *self.state.lock().unwrap()
+ {
+ q.set(duration);
+ return true;
+ }
+
+ false
+ } else {
+ false
+ }
+ }
+ _ => gst::Pad::query_default(pad, Some(&*self.instance()), query),
+ }
+ }
+
+ fn src_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
+ use gst::EventView;
+
+ match event.view() {
+ EventView::Seek(..) => {
+ // TODO: Implement
+ false
+ }
+ _ => gst::Pad::event_default(pad, Some(&*self.instance()), event),
+ }
+ }
+
+ fn sink_chain(
+ &self,
+ pad: &gst::Pad,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
+
+ let mut adapter = self.adapter.lock().unwrap();
+ adapter.push(buffer);
+
+ let mut state = self.state.lock().unwrap();
+ loop {
+ match *state {
+ State::Stopped => unreachable!(),
+ State::NeedHeader => {
+ let header = match self.find_header(&mut *adapter) {
+ Ok(header) => header,
+ Err(_) => {
+ gst::trace!(CAT, imp: self, "Need more data");
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ };
+
+ let skip = if header.offset < 9 {
+ 0
+ } else {
+ header.offset - 9
+ };
+
+ *state = State::Skipping {
+ audio: header.audio,
+ video: header.video,
+ skip_left: skip,
+ };
+ }
+ State::Skipping {
+ audio,
+ video,
+ skip_left: 0,
+ } => {
+ *state = State::Streaming(StreamingState::new(audio, video));
+ }
+ State::Skipping {
+ ref mut skip_left, ..
+ } => {
+ let avail = adapter.available();
+ if avail == 0 {
+ gst::trace!(CAT, imp: self, "Need more data");
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ let skip = cmp::min(avail, *skip_left as usize);
+ adapter.flush(skip);
+ *skip_left -= skip as u32;
+ }
+ State::Streaming(ref mut sstate) => {
+ let res = sstate.handle_tag(self, &mut *adapter);
+
+ match res {
+ Ok(None) => {
+ gst::trace!(CAT, imp: self, "Need more data");
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ Ok(Some(events)) => {
+ drop(state);
+ drop(adapter);
+
+ self.handle_events(events)?;
+
+ adapter = self.adapter.lock().unwrap();
+ state = self.state.lock().unwrap();
+ }
+ Err(err) => {
+ self.post_error_message(err);
+ return Err(gst::FlowError::Error);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ fn find_header(&self, adapter: &mut gst_base::UniqueAdapter) -> Result<flavors::Header, ()> {
+ while adapter.available() >= 9 {
+ let data = adapter.map(9).unwrap();
+
+ if let Ok((_, header)) = flavors::header(&*data) {
+ gst::debug!(CAT, imp: self, "Found FLV header: {:?}", header);
+ drop(data);
+ adapter.flush(9);
+
+ return Ok(header);
+ }
+
+ drop(data);
+ adapter.flush(1);
+ }
+
+ Err(())
+ }
+
+ fn handle_events(
+ &self,
+ events: SmallVec<[Event; 4]>,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ for event in events {
+ match event {
+ Event::StreamChanged(stream, caps) => {
+ let pad = match stream {
+ Stream::Audio => {
+ let mut audio_srcpad = self.audio_srcpad.lock().unwrap();
+ if let Some(ref srcpad) = *audio_srcpad {
+ srcpad.clone()
+ } else {
+ let srcpad = self.create_srcpad("audio", &caps);
+ *audio_srcpad = Some(srcpad.clone());
+
+ srcpad
+ }
+ }
+ Stream::Video => {
+ let mut video_srcpad = self.video_srcpad.lock().unwrap();
+ if let Some(ref srcpad) = *video_srcpad {
+ srcpad.clone()
+ } else {
+ let srcpad = self.create_srcpad("video", &caps);
+
+ *video_srcpad = Some(srcpad.clone());
+
+ srcpad
+ }
+ }
+ };
+
+ pad.push_event(gst::event::Caps::new(&caps));
+ }
+ Event::Buffer(stream, buffer) => {
+ let pad = match stream {
+ Stream::Audio => {
+ self.audio_srcpad.lock().unwrap().as_ref().map(Clone::clone)
+ }
+ Stream::Video => {
+ self.video_srcpad.lock().unwrap().as_ref().map(Clone::clone)
+ }
+ };
+
+ if let Some(pad) = pad {
+ let res = pad.push(buffer);
+ gst::trace!(
+ CAT,
+ imp: self,
+ "Pushing buffer for stream {:?} returned {:?}",
+ stream,
+ res
+ );
+
+ self.flow_combiner
+ .lock()
+ .unwrap()
+ .update_pad_flow(&pad, res)?;
+ }
+ }
+ Event::HaveAllStreams => {
+ self.instance().no_more_pads();
+ }
+ }
+ }
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn create_srcpad(&self, name: &str, caps: &gst::Caps) -> gst::Pad {
+ let templ = self.instance().element_class().pad_template(name).unwrap();
+ let srcpad = gst::Pad::builder_with_template(&templ, Some(name))
+ .event_function(|pad, parent, event| {
+ FlvDemux::catch_panic_pad_function(
+ parent,
+ || false,
+ |demux| demux.src_event(pad, event),
+ )
+ })
+ .query_function(|pad, parent, query| {
+ FlvDemux::catch_panic_pad_function(
+ parent,
+ || false,
+ |demux| demux.src_query(pad, query),
+ )
+ })
+ .build();
+
+ srcpad.set_active(true).unwrap();
+
+ let full_stream_id = srcpad.create_stream_id(&*self.instance(), Some(name));
+ // FIXME group id
+ srcpad.push_event(gst::event::StreamStart::new(&full_stream_id));
+ srcpad.push_event(gst::event::Caps::new(caps));
+
+ // FIXME proper segment handling
+ let segment = gst::FormattedSegment::<gst::ClockTime>::default();
+ srcpad.push_event(gst::event::Segment::new(&segment));
+
+ self.flow_combiner.lock().unwrap().add_pad(&srcpad);
+
+ self.instance().add_pad(&srcpad).unwrap();
+
+ srcpad
+ }
+}
+
+impl StreamingState {
+ fn new(audio: bool, video: bool) -> StreamingState {
+ StreamingState {
+ audio: None,
+ expect_audio: audio,
+ video: None,
+ expect_video: video,
+ got_all_streams: false,
+ last_position: gst::ClockTime::NONE,
+ metadata: None,
+ aac_sequence_header: None,
+ avc_sequence_header: None,
+ }
+ }
+
+ fn handle_tag(
+ &mut self,
+ imp: &FlvDemux,
+ adapter: &mut gst_base::UniqueAdapter,
+ ) -> Result<Option<SmallVec<[Event; 4]>>, gst::ErrorMessage> {
+ use nom::number::complete::be_u32;
+
+ if adapter.available() < 15 {
+ return Ok(None);
+ }
+
+ let data = adapter.map(15).unwrap();
+
+ match be_u32::<_, (_, nom::error::ErrorKind)>(&data[0..4]) {
+ Err(_) => unreachable!(),
+ Ok((_, previous_size)) => {
+ gst::trace!(CAT, imp: imp, "Previous tag size {}", previous_size);
+ // Nothing to do here, we just consume it for now
+ }
+ }
+
+ let tag_header = match flavors::tag_header(&data[4..]) {
+ Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
+ return Err(gst::error_msg!(
+ gst::StreamError::Demux,
+ ["Invalid tag header: {:?}", err]
+ ));
+ }
+ Err(nom::Err::Incomplete(_)) => unreachable!(),
+ Ok((_, tag_header)) => tag_header,
+ };
+
+ gst::trace!(CAT, imp: imp, "Parsed tag header {:?}", tag_header);
+
+ drop(data);
+
+ if adapter.available() < (15 + tag_header.data_size) as usize {
+ return Ok(None);
+ }
+
+ adapter.flush(15);
+
+ match tag_header.tag_type {
+ flavors::TagType::Script => {
+ gst::trace!(CAT, imp: imp, "Found script tag");
+
+ Ok(self.handle_script_tag(imp, &tag_header, adapter))
+ }
+ flavors::TagType::Audio => {
+ gst::trace!(CAT, imp: imp, "Found audio tag");
+
+ self.handle_audio_tag(imp, &tag_header, adapter)
+ }
+ flavors::TagType::Video => {
+ gst::trace!(CAT, imp: imp, "Found video tag");
+
+ self.handle_video_tag(imp, &tag_header, adapter)
+ }
+ }
+ .map(Option::Some)
+ }
+
+ fn handle_script_tag(
+ &mut self,
+ imp: &FlvDemux,
+ tag_header: &flavors::TagHeader,
+ adapter: &mut gst_base::UniqueAdapter,
+ ) -> SmallVec<[Event; 4]> {
+ assert!(adapter.available() >= tag_header.data_size as usize);
+
+ let mut events = SmallVec::new();
+
+ let data = adapter.map(tag_header.data_size as usize).unwrap();
+
+ match flavors::script_data(&*data) {
+ Ok((_, ref script_data)) if script_data.name == "onMetaData" => {
+ gst::trace!(CAT, imp: imp, "Got script tag: {:?}", script_data);
+
+ let metadata = Metadata::new(script_data);
+ gst::debug!(CAT, imp: imp, "Got metadata: {:?}", metadata);
+
+ let audio_changed = self
+ .audio
+ .as_mut()
+ .map(|a| a.update_with_metadata(&metadata))
+ .unwrap_or(false);
+ let video_changed = self
+ .video
+ .as_mut()
+ .map(|v| v.update_with_metadata(&metadata))
+ .unwrap_or(false);
+ self.metadata = Some(metadata);
+
+ if audio_changed || video_changed {
+ if audio_changed {
+ if let Some(caps) = self.audio.as_ref().and_then(|a| a.to_caps()) {
+ events.push(Event::StreamChanged(Stream::Audio, caps));
+ }
+ }
+ if video_changed {
+ if let Some(caps) = self.video.as_ref().and_then(|v| v.to_caps()) {
+ events.push(Event::StreamChanged(Stream::Video, caps));
+ }
+ }
+ }
+ }
+ Ok((_, ref script_data)) => {
+ gst::trace!(CAT, imp: imp, "Got script tag: {:?}", script_data);
+ }
+ Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
+ gst::error!(CAT, imp: imp, "Error parsing script tag: {:?}", err);
+ }
+ Err(nom::Err::Incomplete(_)) => {
+ // ignore
+ }
+ }
+
+ drop(data);
+ adapter.flush(tag_header.data_size as usize);
+
+ events
+ }
+
+ fn update_audio_stream(
+ &mut self,
+ imp: &FlvDemux,
+ data_header: &flavors::AudioDataHeader,
+ ) -> SmallVec<[Event; 4]> {
+ let mut events = SmallVec::new();
+
+ gst::trace!(CAT, imp: imp, "Got audio data header: {:?}", data_header);
+
+ let new_audio_format =
+ AudioFormat::new(data_header, &self.metadata, &self.aac_sequence_header);
+
+ if self.audio.as_ref() != Some(&new_audio_format) {
+ gst::debug!(
+ CAT,
+ imp: imp,
+ "Got new audio format: {:?}",
+ new_audio_format
+ );
+
+ let caps = new_audio_format.to_caps();
+ if let Some(caps) = caps {
+ self.audio = Some(new_audio_format);
+ events.push(Event::StreamChanged(Stream::Audio, caps));
+ } else {
+ self.audio = None;
+ }
+ }
+
+ if (!self.expect_video || self.video != None) && self.audio != None && !self.got_all_streams
+ {
+ gst::debug!(CAT, imp: imp, "Have all expected streams now");
+ self.got_all_streams = true;
+ events.push(Event::HaveAllStreams);
+ }
+
+ events
+ }
+
+ fn handle_aac_audio_packet_header(
+ &mut self,
+ imp: &FlvDemux,
+ tag_header: &flavors::TagHeader,
+ adapter: &mut gst_base::UniqueAdapter,
+ ) -> Result<bool, gst::ErrorMessage> {
+ // Not big enough for the AAC packet header, skip!
+ if tag_header.data_size < 1 + 1 {
+ adapter.flush((tag_header.data_size - 1) as usize);
+ gst::warning!(
+ CAT,
+ imp: imp,
+ "Too small packet for AAC packet header {}",
+ tag_header.data_size
+ );
+ return Ok(true);
+ }
+
+ let data = adapter.map(1).unwrap();
+
+ match flavors::aac_audio_packet_header(&*data) {
+ Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
+ gst::error!(CAT, imp: imp, "Invalid AAC audio packet header: {:?}", err);
+ drop(data);
+ adapter.flush((tag_header.data_size - 1) as usize);
+ Ok(true)
+ }
+ Err(nom::Err::Incomplete(_)) => unreachable!(),
+ Ok((_, header)) => {
+ gst::trace!(CAT, imp: imp, "Got AAC packet header {:?}", header);
+ match header.packet_type {
+ flavors::AACPacketType::SequenceHeader => {
+ drop(data);
+ adapter.flush(1);
+ let buffer = adapter
+ .take_buffer((tag_header.data_size - 1 - 1) as usize)
+ .unwrap();
+ gst::debug!(CAT, imp: imp, "Got AAC sequence header {:?}", buffer,);
+
+ self.aac_sequence_header = Some(buffer);
+ Ok(true)
+ }
+ flavors::AACPacketType::Raw => {
+ drop(data);
+ adapter.flush(1);
+ Ok(false)
+ }
+ }
+ }
+ }
+ }
+
+ fn handle_audio_tag(
+ &mut self,
+ imp: &FlvDemux,
+ tag_header: &flavors::TagHeader,
+ adapter: &mut gst_base::UniqueAdapter,
+ ) -> Result<SmallVec<[Event; 4]>, gst::ErrorMessage> {
+ assert!(adapter.available() >= tag_header.data_size as usize);
+
+ let data = adapter.map(1).unwrap();
+ let data_header = match flavors::audio_data_header(&*data) {
+ Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
+ gst::error!(CAT, imp: imp, "Invalid audio data header: {:?}", err);
+ drop(data);
+ adapter.flush(tag_header.data_size as usize);
+ return Ok(SmallVec::new());
+ }
+ Err(nom::Err::Incomplete(_)) => unreachable!(),
+ Ok((_, data_header)) => data_header,
+ };
+ drop(data);
+ adapter.flush(1);
+
+ let mut events = self.update_audio_stream(imp, &data_header);
+
+ // AAC special case
+ if data_header.sound_format == flavors::SoundFormat::AAC
+ && self.handle_aac_audio_packet_header(imp, tag_header, adapter)?
+ {
+ return Ok(events);
+ }
+
+ let offset = match data_header.sound_format {
+ flavors::SoundFormat::AAC => 2,
+ _ => 1,
+ };
+
+ if tag_header.data_size == offset {
+ return Ok(events);
+ }
+
+ if self.audio == None {
+ adapter.flush((tag_header.data_size - offset) as usize);
+ return Ok(events);
+ }
+
+ let mut buffer = adapter
+ .take_buffer((tag_header.data_size - offset) as usize)
+ .unwrap();
+
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts((tag_header.timestamp as u64).mseconds());
+ }
+
+ gst::trace!(
+ CAT,
+ imp: imp,
+ "Outputting audio buffer {:?} for tag {:?}",
+ buffer,
+ tag_header,
+ );
+
+ self.update_position(&buffer);
+
+ events.push(Event::Buffer(Stream::Audio, buffer));
+
+ Ok(events)
+ }
+
+ fn update_video_stream(
+ &mut self,
+ imp: &FlvDemux,
+ data_header: &flavors::VideoDataHeader,
+ ) -> SmallVec<[Event; 4]> {
+ let mut events = SmallVec::new();
+
+ gst::trace!(CAT, imp: imp, "Got video data header: {:?}", data_header);
+
+ let new_video_format =
+ VideoFormat::new(data_header, &self.metadata, &self.avc_sequence_header);
+
+ if self.video.as_ref() != Some(&new_video_format) {
+ gst::debug!(
+ CAT,
+ imp: imp,
+ "Got new video format: {:?}",
+ new_video_format
+ );
+
+ let caps = new_video_format.to_caps();
+ if let Some(caps) = caps {
+ self.video = Some(new_video_format);
+ events.push(Event::StreamChanged(Stream::Video, caps));
+ } else {
+ self.video = None;
+ }
+ }
+
+ if (!self.expect_audio || self.audio != None) && self.video != None && !self.got_all_streams
+ {
+ gst::debug!(CAT, imp: imp, "Have all expected streams now");
+ self.got_all_streams = true;
+ events.push(Event::HaveAllStreams);
+ }
+
+ events
+ }
+
+ fn handle_avc_video_packet_header(
+ &mut self,
+ imp: &FlvDemux,
+ tag_header: &flavors::TagHeader,
+ adapter: &mut gst_base::UniqueAdapter,
+ ) -> Result<Option<i32>, gst::ErrorMessage> {
+ // Not big enough for the AVC packet header, skip!
+ if tag_header.data_size < 1 + 4 {
+ adapter.flush((tag_header.data_size - 1) as usize);
+ gst::warning!(
+ CAT,
+ imp: imp,
+ "Too small packet for AVC packet header {}",
+ tag_header.data_size
+ );
+ return Ok(None);
+ }
+
+ let data = adapter.map(4).unwrap();
+ match flavors::avc_video_packet_header(&*data) {
+ Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
+ gst::error!(CAT, imp: imp, "Invalid AVC video packet header: {:?}", err);
+ drop(data);
+ adapter.flush((tag_header.data_size - 1) as usize);
+ Ok(None)
+ }
+ Err(nom::Err::Incomplete(_)) => unreachable!(),
+ Ok((_, header)) => {
+ gst::trace!(CAT, imp: imp, "Got AVC packet header {:?}", header);
+ match header.packet_type {
+ flavors::AVCPacketType::SequenceHeader => {
+ drop(data);
+ adapter.flush(4);
+ let buffer = adapter
+ .take_buffer((tag_header.data_size - 1 - 4) as usize)
+ .unwrap();
+ gst::debug!(
+ CAT,
+ imp: imp,
+ "Got AVC sequence header {:?} of size {}",
+ buffer,
+ tag_header.data_size - 1 - 4
+ );
+
+ self.avc_sequence_header = Some(buffer);
+ Ok(None)
+ }
+ flavors::AVCPacketType::NALU => {
+ drop(data);
+ adapter.flush(4);
+ Ok(Some(header.composition_time))
+ }
+ flavors::AVCPacketType::EndOfSequence => {
+ // Skip
+ drop(data);
+ adapter.flush((tag_header.data_size - 1) as usize);
+ Ok(None)
+ }
+ }
+ }
+ }
+ }
+
+ fn handle_video_tag(
+ &mut self,
+ imp: &FlvDemux,
+ tag_header: &flavors::TagHeader,
+ adapter: &mut gst_base::UniqueAdapter,
+ ) -> Result<SmallVec<[Event; 4]>, gst::ErrorMessage> {
+ assert!(adapter.available() >= tag_header.data_size as usize);
+
+ let data = adapter.map(1).unwrap();
+ let data_header = match flavors::video_data_header(&*data) {
+ Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
+ gst::error!(CAT, imp: imp, "Invalid video data header: {:?}", err);
+ drop(data);
+ adapter.flush(tag_header.data_size as usize);
+ return Ok(SmallVec::new());
+ }
+ Err(nom::Err::Incomplete(_)) => unreachable!(),
+ Ok((_, data_header)) => data_header,
+ };
+ drop(data);
+ adapter.flush(1);
+
+ let mut events = self.update_video_stream(imp, &data_header);
+
+ // AVC/H264 special case
+ let cts = if data_header.codec_id == flavors::CodecId::H264 {
+ match self.handle_avc_video_packet_header(imp, tag_header, adapter)? {
+ Some(cts) => cts,
+ None => {
+ return Ok(events);
+ }
+ }
+ } else {
+ 0
+ };
+
+ let offset = match data_header.codec_id {
+ flavors::CodecId::H264 => 5,
+ _ => 1,
+ };
+
+ if tag_header.data_size == offset {
+ return Ok(events);
+ }
+
+ if self.video == None {
+ adapter.flush((tag_header.data_size - offset) as usize);
+ return Ok(events);
+ }
+
+ let is_keyframe = data_header.frame_type == flavors::FrameType::Key;
+
+ let skip = match data_header.codec_id {
+ flavors::CodecId::VP6 | flavors::CodecId::VP6A => 1,
+ _ => 0,
+ };
+
+ if skip > 0 {
+ adapter.flush(skip as usize);
+ }
+
+ if tag_header.data_size == offset + skip {
+ return Ok(events);
+ }
+
+ let mut buffer = adapter
+ .take_buffer((tag_header.data_size - offset - skip) as usize)
+ .unwrap();
+
+ {
+ let buffer = buffer.get_mut().unwrap();
+ if !is_keyframe {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ buffer.set_dts((tag_header.timestamp as u64).mseconds());
+
+ // Prevent negative numbers
+ let pts = if cts < 0 && tag_header.timestamp < (-cts) as u32 {
+ 0
+ } else {
+ ((tag_header.timestamp as i64) + (cts as i64)) as u64
+ };
+ buffer.set_pts(pts.mseconds());
+ }
+
+ gst::trace!(
+ CAT,
+ imp: imp,
+ "Outputting video buffer {:?} for tag {:?}, keyframe: {}",
+ buffer,
+ tag_header,
+ is_keyframe
+ );
+
+ self.update_position(&buffer);
+
+ events.push(Event::Buffer(Stream::Video, buffer));
+
+ Ok(events)
+ }
+
+ fn update_position(&mut self, buffer: &gst::Buffer) {
+ if let Some(pts) = buffer.pts() {
+ self.last_position = self.last_position.opt_max(pts).or(Some(pts));
+ } else if let Some(dts) = buffer.dts() {
+ self.last_position = self.last_position.opt_max(dts).or(Some(dts));
+ }
+ }
+}
+
+// Ignores bitrate
+impl PartialEq for AudioFormat {
+ fn eq(&self, other: &Self) -> bool {
+ self.format.eq(&other.format)
+ && self.rate.eq(&other.rate)
+ && self.width.eq(&other.width)
+ && self.channels.eq(&other.channels)
+ && self.aac_sequence_header.eq(&other.aac_sequence_header)
+ }
+}
+
+impl AudioFormat {
+ fn new(
+ data_header: &flavors::AudioDataHeader,
+ metadata: &Option<Metadata>,
+ aac_sequence_header: &Option<gst::Buffer>,
+ ) -> AudioFormat {
+ let numeric_rate = match (data_header.sound_format, data_header.sound_rate) {
+ (flavors::SoundFormat::NELLYMOSER_16KHZ_MONO, _) => 16_000,
+ (flavors::SoundFormat::NELLYMOSER_8KHZ_MONO, _)
+ | (flavors::SoundFormat::PCM_ALAW, _)
+ | (flavors::SoundFormat::PCM_ULAW, _)
+ | (flavors::SoundFormat::MP3_8KHZ, _) => 8_000,
+ (flavors::SoundFormat::SPEEX, _) => 16_000,
+ (_, flavors::SoundRate::_5_5KHZ) => 5_512,
+ (_, flavors::SoundRate::_11KHZ) => 11_025,
+ (_, flavors::SoundRate::_22KHZ) => 22_050,
+ (_, flavors::SoundRate::_44KHZ) => 44_100,
+ };
+
+ let numeric_width = match data_header.sound_size {
+ flavors::SoundSize::Snd8bit => 8,
+ flavors::SoundSize::Snd16bit => 16,
+ };
+
+ let numeric_channels = match data_header.sound_type {
+ flavors::SoundType::SndMono => 1,
+ flavors::SoundType::SndStereo => 2,
+ };
+
+ AudioFormat {
+ format: data_header.sound_format,
+ rate: numeric_rate,
+ width: numeric_width,
+ channels: numeric_channels,
+ bitrate: metadata.as_ref().and_then(|m| m.audio_bitrate),
+ aac_sequence_header: aac_sequence_header.clone(),
+ }
+ }
+
+ fn update_with_metadata(&mut self, metadata: &Metadata) -> bool {
+ if self.bitrate != metadata.audio_bitrate {
+ self.bitrate = metadata.audio_bitrate;
+ true
+ } else {
+ false
+ }
+ }
+
+ fn to_caps(&self) -> Option<gst::Caps> {
+ let mut caps = match self.format {
+ flavors::SoundFormat::MP3 | flavors::SoundFormat::MP3_8KHZ => Some(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 1i32)
+ .field("layer", 3i32)
+ .build(),
+ ),
+ flavors::SoundFormat::PCM_NE | flavors::SoundFormat::PCM_LE => {
+ if self.rate != 0 && self.channels != 0 {
+ // Assume little-endian for "PCM_NE", it's probably more common and we have no
+ // way to know what the endianness of the system creating the stream was
+ Some(
+ gst_audio::AudioCapsBuilder::new_interleaved()
+ .format(if self.width == 8 {
+ gst_audio::AudioFormat::U8
+ } else {
+ gst_audio::AudioFormat::S16le
+ })
+ .build(),
+ )
+ } else {
+ None
+ }
+ }
+ flavors::SoundFormat::ADPCM => Some(
+ gst::Caps::builder("audio/x-adpcm")
+ .field("layout", "swf")
+ .build(),
+ ),
+ flavors::SoundFormat::NELLYMOSER_16KHZ_MONO
+ | flavors::SoundFormat::NELLYMOSER_8KHZ_MONO
+ | flavors::SoundFormat::NELLYMOSER => {
+ Some(gst::Caps::builder("audio/x-nellymoser").build())
+ }
+ flavors::SoundFormat::PCM_ALAW => Some(gst::Caps::builder("audio/x-alaw").build()),
+ flavors::SoundFormat::PCM_ULAW => Some(gst::Caps::builder("audio/x-mulaw").build()),
+ flavors::SoundFormat::AAC => self.aac_sequence_header.as_ref().map(|header| {
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("framed", true)
+ .field("stream-format", "raw")
+ .field("codec_data", header)
+ .build()
+ }),
+ flavors::SoundFormat::SPEEX => {
+ use crate::bytes::*;
+ use std::io::{Cursor, Write};
+
+ let header = {
+ let header_size = 80;
+ let mut data = Cursor::new(Vec::with_capacity(header_size));
+ data.write_all(b"Speex 1.1.12").unwrap();
+ data.write_all(&[0; 14]).unwrap();
+ data.write_u32le(1).unwrap(); // version
+ data.write_u32le(80).unwrap(); // header size
+ data.write_u32le(16_000).unwrap(); // sample rate
+ data.write_u32le(1).unwrap(); // mode = wideband
+ data.write_u32le(4).unwrap(); // mode bitstream version
+ data.write_u32le(1).unwrap(); // channels
+ data.write_i32le(-1).unwrap(); // bitrate
+ data.write_u32le(0x50).unwrap(); // frame size
+ data.write_u32le(0).unwrap(); // VBR
+ data.write_u32le(1).unwrap(); // frames per packet
+ data.write_u32le(0).unwrap(); // extra headers
+ data.write_u32le(0).unwrap(); // reserved 1
+ data.write_u32le(0).unwrap(); // reserved 2
+
+ assert_eq!(data.position() as usize, header_size);
+
+ data.into_inner()
+ };
+ let header = gst::Buffer::from_mut_slice(header);
+
+ let comment = {
+ let comment_size = 4 + 7 /* nothing */ + 4 + 1;
+ let mut data = Cursor::new(Vec::with_capacity(comment_size));
+ data.write_u32le(7).unwrap(); // length of "nothing"
+ data.write_all(b"nothing").unwrap(); // "vendor" string
+ data.write_u32le(0).unwrap(); // number of elements
+ data.write_u8(1).unwrap();
+
+ assert_eq!(data.position() as usize, comment_size);
+
+ data.into_inner()
+ };
+ let comment = gst::Buffer::from_mut_slice(comment);
+
+ Some(
+ gst::Caps::builder("audio/x-speex")
+ .field("streamheader", gst::Array::new([header, comment]))
+ .build(),
+ )
+ }
+ flavors::SoundFormat::DEVICE_SPECIFIC => {
+ // Nobody knows
+ None
+ }
+ };
+
+ if self.rate != 0 {
+ if let Some(ref mut caps) = caps.as_mut() {
+ caps.get_mut()
+ .unwrap()
+ .set_simple(&[("rate", &(self.rate as i32))])
+ }
+ }
+ if self.channels != 0 {
+ if let Some(ref mut caps) = caps.as_mut() {
+ caps.get_mut()
+ .unwrap()
+ .set_simple(&[("channels", &(self.channels as i32))])
+ }
+ }
+
+ caps
+ }
+}
+
+// Ignores bitrate
+impl PartialEq for VideoFormat {
+ fn eq(&self, other: &Self) -> bool {
+ self.format.eq(&other.format)
+ && self.width.eq(&other.width)
+ && self.height.eq(&other.height)
+ && self.pixel_aspect_ratio.eq(&other.pixel_aspect_ratio)
+ && self.framerate.eq(&other.framerate)
+ && self.avc_sequence_header.eq(&other.avc_sequence_header)
+ }
+}
+
+impl VideoFormat {
+ fn new(
+ data_header: &flavors::VideoDataHeader,
+ metadata: &Option<Metadata>,
+ avc_sequence_header: &Option<gst::Buffer>,
+ ) -> VideoFormat {
+ VideoFormat {
+ format: data_header.codec_id,
+ width: metadata.as_ref().and_then(|m| m.video_width),
+ height: metadata.as_ref().and_then(|m| m.video_height),
+ pixel_aspect_ratio: metadata.as_ref().and_then(|m| m.video_pixel_aspect_ratio),
+ framerate: metadata.as_ref().and_then(|m| m.video_framerate),
+ bitrate: metadata.as_ref().and_then(|m| m.video_bitrate),
+ avc_sequence_header: avc_sequence_header.clone(),
+ }
+ }
+
+ #[allow(clippy::useless_let_if_seq)]
+ fn update_with_metadata(&mut self, metadata: &Metadata) -> bool {
+ let mut changed = false;
+
+ if self.width != metadata.video_width {
+ self.width = metadata.video_width;
+ changed = true;
+ }
+
+ if self.height != metadata.video_height {
+ self.height = metadata.video_height;
+ changed = true;
+ }
+
+ if self.pixel_aspect_ratio != metadata.video_pixel_aspect_ratio {
+ self.pixel_aspect_ratio = metadata.video_pixel_aspect_ratio;
+ changed = true;
+ }
+
+ if self.framerate != metadata.video_framerate {
+ self.framerate = metadata.video_framerate;
+ changed = true;
+ }
+
+ if self.bitrate != metadata.video_bitrate {
+ self.bitrate = metadata.video_bitrate;
+ changed = true;
+ }
+
+ changed
+ }
+
+ fn to_caps(&self) -> Option<gst::Caps> {
+ let mut caps = match self.format {
+ flavors::CodecId::SORENSON_H263 => Some(
+ gst::Caps::builder("video/x-flash-video")
+ .field("flvversion", 1i32)
+ .build(),
+ ),
+ flavors::CodecId::SCREEN => Some(gst::Caps::builder("video/x-flash-screen").build()),
+ flavors::CodecId::VP6 => Some(gst::Caps::builder("video/x-vp6-flash").build()),
+ flavors::CodecId::VP6A => Some(gst::Caps::builder("video/x-vp6-flash-alpha").build()),
+ flavors::CodecId::SCREEN2 => Some(gst::Caps::builder("video/x-flash-screen2").build()),
+ flavors::CodecId::H264 => self.avc_sequence_header.as_ref().map(|header| {
+ gst::Caps::builder("video/x-h264")
+ .field("stream-format", "avc")
+ .field("codec_data", &header)
+ .build()
+ }),
+ flavors::CodecId::H263 => Some(gst::Caps::builder("video/x-h263").build()),
+ flavors::CodecId::MPEG4Part2 => Some(
+ gst::Caps::builder("video/mpeg")
+ .field("mpegversion", 4i32)
+ .field("systemstream", false)
+ .build(),
+ ),
+ flavors::CodecId::JPEG => {
+ // Unused according to spec
+ None
+ }
+ };
+
+ if let (Some(width), Some(height)) = (self.width, self.height) {
+ if let Some(ref mut caps) = caps.as_mut() {
+ caps.get_mut()
+ .unwrap()
+ .set_simple(&[("width", &(width as i32)), ("height", &(height as i32))])
+ }
+ }
+
+ if let Some(par) = self.pixel_aspect_ratio {
+ if *par.numer() != 0 && par.numer() != par.denom() {
+ if let Some(ref mut caps) = caps.as_mut() {
+ caps.get_mut().unwrap().set_simple(&[(
+ "pixel-aspect-ratio",
+ &gst::Fraction::new(*par.numer(), *par.denom()),
+ )])
+ }
+ }
+ }
+
+ if let Some(fps) = self.framerate {
+ if *fps.numer() != 0 {
+ if let Some(ref mut caps) = caps.as_mut() {
+ caps.get_mut().unwrap().set_simple(&[(
+ "framerate",
+ &gst::Fraction::new(*fps.numer(), *fps.denom()),
+ )])
+ }
+ }
+ }
+
+ caps
+ }
+}
+
+impl Metadata {
+ fn new(script_data: &flavors::ScriptData) -> Metadata {
+ assert_eq!(script_data.name, "onMetaData");
+
+ let mut metadata = Metadata::default();
+
+ let args = match script_data.arguments {
+ flavors::ScriptDataValue::Object(ref objects)
+ | flavors::ScriptDataValue::ECMAArray(ref objects) => objects,
+ _ => return metadata,
+ };
+
+ let mut par_n = None;
+ let mut par_d = None;
+
+ for arg in args {
+ match (arg.name, &arg.data) {
+ ("duration", &flavors::ScriptDataValue::Number(duration)) => {
+ metadata.duration =
+ Some(((duration * 1000.0 * 1000.0 * 1000.0) as u64).nseconds());
+ }
+ ("creationdate", &flavors::ScriptDataValue::String(date)) => {
+ metadata.creation_date = Some(String::from(date));
+ }
+ ("creator", &flavors::ScriptDataValue::String(creator)) => {
+ metadata.creator = Some(String::from(creator));
+ }
+ ("title", &flavors::ScriptDataValue::String(title)) => {
+ metadata.title = Some(String::from(title));
+ }
+ ("metadatacreator", &flavors::ScriptDataValue::String(creator)) => {
+ metadata.metadata_creator = Some(String::from(creator));
+ }
+ ("audiodatarate", &flavors::ScriptDataValue::Number(datarate)) => {
+ metadata.audio_bitrate = Some((datarate * 1024.0) as u32);
+ }
+ ("width", &flavors::ScriptDataValue::Number(width)) => {
+ metadata.video_width = Some(width as u32);
+ }
+ ("height", &flavors::ScriptDataValue::Number(height)) => {
+ metadata.video_height = Some(height as u32);
+ }
+ ("framerate", &flavors::ScriptDataValue::Number(framerate)) if framerate >= 0.0 => {
+ if let Some(framerate) = Rational32::approximate_float(framerate) {
+ metadata.video_framerate = Some(framerate);
+ }
+ }
+ ("AspectRatioX", &flavors::ScriptDataValue::Number(par_x)) if par_x > 0.0 => {
+ par_n = Some(par_x as i32);
+ }
+ ("AspectRatioY", &flavors::ScriptDataValue::Number(par_y)) if par_y > 0.0 => {
+ par_d = Some(par_y as i32);
+ }
+ ("videodatarate", &flavors::ScriptDataValue::Number(datarate)) => {
+ metadata.video_bitrate = Some((datarate * 1024.0) as u32);
+ }
+ _ => {}
+ }
+ }
+
+ if let (Some(par_n), Some(par_d)) = (par_n, par_d) {
+ metadata.video_pixel_aspect_ratio = Some(Rational32::new(par_n, par_d));
+ }
+
+ metadata
+ }
+}
diff --git a/mux/flavors/src/flvdemux/mod.rs b/mux/flavors/src/flvdemux/mod.rs
new file mode 100644
index 000000000..aa311bdf8
--- /dev/null
+++ b/mux/flavors/src/flvdemux/mod.rs
@@ -0,0 +1,27 @@
+// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+//
+// SPDX-License-Identifier: MIT OR Apache-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct FlvDemux(ObjectSubclass<imp::FlvDemux>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rsflvdemux",
+ gst::Rank::None,
+ FlvDemux::static_type(),
+ )
+}
diff --git a/mux/flavors/src/lib.rs b/mux/flavors/src/lib.rs
new file mode 100644
index 000000000..7813d92e3
--- /dev/null
+++ b/mux/flavors/src/lib.rs
@@ -0,0 +1,36 @@
+// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+//
+// SPDX-License-Identifier: MIT OR Apache-2.0
+#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
+
+/**
+ * plugin-rsflv:
+ *
+ * Since: plugins-rs-0.4.0
+ */
+use gst::glib;
+
+mod bytes;
+mod flvdemux;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ flvdemux::register(plugin)
+}
+
+gst::plugin_define!(
+ rsflv,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "MIT/X11",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/mux/fmp4/Cargo.toml b/mux/fmp4/Cargo.toml
new file mode 100644
index 000000000..f72c84a7b
--- /dev/null
+++ b/mux/fmp4/Cargo.toml
@@ -0,0 +1,52 @@
+[package]
+name = "gst-plugin-fmp4"
+version = "0.9.0-alpha.1"
+authors = ["Sebastian Dröge <sebastian@centricular.com>"]
+license = "MPL-2.0"
+description = "GStreamer Fragmented MP4 Plugin"
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+edition = "2021"
+rust-version = "1.63"
+
+[dependencies]
+anyhow = "1"
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+once_cell = "1.0"
+uuid = { version = "1", features = ["v4"] }
+
+[lib]
+name = "gstfmp4"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[dev-dependencies]
+gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
+gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
+gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
+m3u8-rs = "5.0"
+chrono = "0.4"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../version-helper" }
+
+[features]
+default = ["v1_18"]
+static = []
+capi = []
+v1_18 = ["gst-video/v1_18"]
+
+[package.metadata.capi]
+min_version = "0.8.0"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/mux/fmp4/LICENSE b/mux/fmp4/LICENSE
new file mode 120000
index 000000000..eb5d24fe9
--- /dev/null
+++ b/mux/fmp4/LICENSE
@@ -0,0 +1 @@
+../../LICENSE-MPL-2.0 \ No newline at end of file
diff --git a/mux/fmp4/build.rs b/mux/fmp4/build.rs
new file mode 100644
index 000000000..cda12e57e
--- /dev/null
+++ b/mux/fmp4/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+ gst_plugin_version_helper::info()
+}
diff --git a/mux/fmp4/examples/dash_vod.rs b/mux/fmp4/examples/dash_vod.rs
new file mode 100644
index 000000000..eb291d6eb
--- /dev/null
+++ b/mux/fmp4/examples/dash_vod.rs
@@ -0,0 +1,266 @@
+// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header
+// ("initialization segment") is written into a separate file as the segments, and each segment is
+// its own file too.
+//
+// All segments that are created are exactly 10s, except for the last one which is only 3.333s.
+
+use gst::prelude::*;
+
+use std::fmt::Write;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+
+use anyhow::Error;
+
+struct Segment {
+ start_time: gst::ClockTime,
+ duration: gst::ClockTime,
+}
+
+struct State {
+ start_time: Option<gst::ClockTime>,
+ end_time: Option<gst::ClockTime>,
+ segments: Vec<Segment>,
+ path: PathBuf,
+}
+
+fn main() -> Result<(), Error> {
+ gst::init()?;
+
+ gstfmp4::plugin_register_static()?;
+
+ let state = Arc::new(Mutex::new(State {
+ start_time: None,
+ end_time: None,
+ segments: Vec::new(),
+ path: PathBuf::from("dash_stream"),
+ }));
+
+ let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
+
+ let sink = pipeline
+ .by_name("sink")
+ .unwrap()
+ .dynamic_cast::<gst_app::AppSink>()
+ .unwrap();
+ sink.set_buffer_list(true);
+
+ let state_clone = state.clone();
+ sink.set_callbacks(
+ gst_app::AppSinkCallbacks::builder()
+ .new_sample(move |sink| {
+ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+ let mut state = state.lock().unwrap();
+
+ // The muxer only outputs non-empty buffer lists
+ let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
+ assert!(!buffer_list.is_empty());
+
+ let mut first = buffer_list.get(0).unwrap();
+
+ // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
+ assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
+
+ // If the buffer has the DISCONT and HEADER flag set then it contains the media
+ // header, i.e. the `ftyp`, `moov` and other media boxes.
+ //
+ // This might be the initial header or the updated header at the end of the stream.
+ if first.flags().contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) {
+ let mut path = state.path.clone();
+ std::fs::create_dir_all(&path).expect("failed to create directory");
+ path.push("init.cmfi");
+
+ println!("writing header to {}", path.display());
+ let map = first.map_readable().unwrap();
+ std::fs::write(path, &map).expect("failed to write header");
+ drop(map);
+
+ // Remove the header from the buffer list
+ buffer_list.make_mut().remove(0, 1);
+
+ // If the list is now empty then it only contained the media header and nothing
+ // else.
+ if buffer_list.is_empty() {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ // Otherwise get the next buffer and continue working with that.
+ first = buffer_list.get(0).unwrap();
+ }
+
+ // If the buffer only has the HEADER flag set then this is a segment header that is
+ // followed by one or more actual media buffers.
+ assert!(first.flags().contains(gst::BufferFlags::HEADER));
+
+ let segment = sample.segment().expect("no segment")
+ .downcast_ref::<gst::ClockTime>().expect("no time segment");
+
+ // Initialize the start time with the first PTS we observed. This will be used
+ // later for calculating the duration of the whole media for the DASH manifest.
+ //
+ // The PTS of the segment header is equivalent to the earliest PTS of the whole
+ // segment.
+ let pts = segment.to_running_time(first.pts().unwrap()).expect("can't get running time");
+ if state.start_time.is_none() {
+ state.start_time = Some(pts);
+ }
+
+ // The metadata of the first media buffer is duplicated to the segment header.
+ // Based on this we can know the timecode of the first frame in this segment.
+ let meta = first.meta::<gst_video::VideoTimeCodeMeta>().expect("no timecode meta");
+
+ let mut path = state.path.clone();
+ path.push(format!("segment_{}.cmfv", state.segments.len() + 1));
+ println!("writing segment with timecode {} to {}", meta.tc(), path.display());
+
+ // Calculate the end time at this point. The duration of the segment header is set
+ // to the whole duration of this segment.
+ let duration = first.duration().unwrap();
+ let end_time = first.pts().unwrap() + first.duration().unwrap();
+ state.end_time = Some(segment.to_running_time(end_time).expect("can't get running time"));
+
+ let mut file = std::fs::File::create(path).expect("failed to open fragment");
+ for buffer in &*buffer_list {
+ use std::io::prelude::*;
+
+ let map = buffer.map_readable().unwrap();
+ file.write_all(&map).expect("failed to write fragment");
+ }
+
+ state.segments.push(Segment {
+ start_time: pts,
+ duration,
+ });
+
+ Ok(gst::FlowSuccess::Ok)
+ })
+ .eos(move |_sink| {
+ let state = state_clone.lock().unwrap();
+
+ // Now write the manifest
+ let mut path = state.path.clone();
+ path.push("manifest.mpd");
+
+ println!("writing manifest to {}", path.display());
+
+ let duration = state.end_time.opt_checked_sub(state.start_time).ok().flatten().unwrap().mseconds() as f64 / 1000.0;
+
+ // Write the whole segment timeline out here, compressing multiple segments with
+ // the same duration to a repeated segment.
+ let mut segment_timeline = String::new();
+ let mut write_segment = |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| {
+ if repeat > 0 {
+ writeln!(
+ &mut segment_timeline,
+ " <S t=\"{time}\" d=\"{duration}\" r=\"{repeat}\" />",
+ time = start.mseconds(),
+ duration = duration.mseconds(),
+ repeat = repeat
+ ).unwrap();
+ } else {
+ writeln!(
+ &mut segment_timeline,
+ " <S t=\"{time}\" d=\"{duration}\" />",
+ time = start.mseconds(),
+ duration = duration.mseconds()
+ ).unwrap();
+ }
+ };
+
+ let mut start = None;
+ let mut num_segments = 0;
+ let mut last_duration = None;
+ for segment in &state.segments {
+ if start.is_none() {
+ start = Some(segment.start_time);
+ }
+ if last_duration.is_none() {
+ last_duration = Some(segment.duration);
+ }
+
+ // If the duration of this segment is different from the previous one then we
+ // have to write out the segment now.
+ if last_duration != Some(segment.duration) {
+ write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
+ start = Some(segment.start_time);
+ last_duration = Some(segment.duration);
+ num_segments = 1;
+ } else {
+ num_segments += 1;
+ }
+ }
+
+ // Write the last segment if any
+ if num_segments > 0 {
+ write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
+ }
+
+ let manifest = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
+<MPD
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="urn:mpeg:dash:schema:mpd:2011"
+ xsi:schemaLocation="urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd"
+ type="static"
+ mediaPresentationDuration="PT{duration:.3}S"
+ profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">
+ <Period>
+ <AdaptationSet mimeType="video/mp4" codecs="avc1.4d0228" frameRate="30/1" segmentAlignment="true" startWithSAP="1">
+ <Representation id="A" bandwidth="2048000" width="1280" height="720">
+ <SegmentTemplate timescale="1000" initialization="init.cmfi" media="segment_$Number$.cmfv">
+ <SegmentTimeline>
+{segment_timeline} </SegmentTimeline>
+ </SegmentTemplate>
+ </Representation>
+ </AdaptationSet>
+ </Period>
+</MPD>
+"###,
+ duration = duration, segment_timeline = segment_timeline);
+
+ std::fs::write(path, &manifest).expect("failed to write manifest");
+ })
+ .build(),
+ );
+
+ pipeline.set_state(gst::State::Playing)?;
+
+ let bus = pipeline
+ .bus()
+ .expect("Pipeline without bus. Shouldn't happen!");
+
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Eos(..) => {
+ println!("EOS");
+ break;
+ }
+ MessageView::Error(err) => {
+ pipeline.set_state(gst::State::Null)?;
+ eprintln!(
+ "Got error from {}: {} ({})",
+ msg.src()
+ .map(|s| String::from(s.path_string()))
+ .unwrap_or_else(|| "None".into()),
+ err.error(),
+ err.debug().unwrap_or_else(|| "".into()),
+ );
+ break;
+ }
+ _ => (),
+ }
+ }
+
+ pipeline.set_state(gst::State::Null)?;
+
+ Ok(())
+}
diff --git a/mux/fmp4/examples/hls_live.rs b/mux/fmp4/examples/hls_live.rs
new file mode 100644
index 000000000..f1d9ed6d1
--- /dev/null
+++ b/mux/fmp4/examples/hls_live.rs
@@ -0,0 +1,572 @@
+// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+// This creates a live HLS stream with one video playlist and two audio playlists.
+// Basic trimming is implemented
+
+use gst::prelude::*;
+
+use std::collections::VecDeque;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+
+use anyhow::Error;
+use chrono::{DateTime, Duration, Utc};
+use m3u8_rs::{
+ AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaSegment,
+ VariantStream,
+};
+
+struct State {
+ video_streams: Vec<VideoStream>,
+ audio_streams: Vec<AudioStream>,
+ all_mimes: Vec<String>,
+ path: PathBuf,
+ wrote_manifest: bool,
+}
+
+impl State {
+ fn maybe_write_manifest(&mut self) {
+ if self.wrote_manifest {
+ return;
+ }
+
+ if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() {
+ return;
+ }
+
+ let mut all_mimes = self.all_mimes.clone();
+ all_mimes.sort();
+ all_mimes.dedup();
+
+ let playlist = MasterPlaylist {
+ version: Some(7),
+ variants: self
+ .video_streams
+ .iter()
+ .map(|stream| {
+ let mut path = PathBuf::new();
+
+ path.push(&stream.name);
+ path.push("manifest.m3u8");
+
+ VariantStream {
+ uri: path.as_path().display().to_string(),
+ bandwidth: stream.bitrate,
+ codecs: Some(all_mimes.join(",")),
+ resolution: Some(m3u8_rs::Resolution {
+ width: stream.width,
+ height: stream.height,
+ }),
+ audio: Some("audio".to_string()),
+ ..Default::default()
+ }
+ })
+ .collect(),
+ alternatives: self
+ .audio_streams
+ .iter()
+ .map(|stream| {
+ let mut path = PathBuf::new();
+ path.push(&stream.name);
+ path.push("manifest.m3u8");
+
+ AlternativeMedia {
+ media_type: AlternativeMediaType::Audio,
+ uri: Some(path.as_path().display().to_string()),
+ group_id: "audio".to_string(),
+ language: Some(stream.lang.clone()),
+ name: stream.name.clone(),
+ default: stream.default,
+ autoselect: stream.default,
+ channels: Some("2".to_string()),
+ ..Default::default()
+ }
+ })
+ .collect(),
+ independent_segments: true,
+ ..Default::default()
+ };
+
+ println!("Writing master manifest to {}", self.path.display());
+
+ let mut file = std::fs::File::create(&self.path).unwrap();
+ playlist
+ .write_to(&mut file)
+ .expect("Failed to write master playlist");
+
+ self.wrote_manifest = true;
+ }
+}
+
+struct Segment {
+ date_time: DateTime<Utc>,
+ duration: gst::ClockTime,
+ path: String,
+}
+
+struct UnreffedSegment {
+ removal_time: DateTime<Utc>,
+ path: String,
+}
+
+struct StreamState {
+ path: PathBuf,
+ segments: VecDeque<Segment>,
+ trimmed_segments: VecDeque<UnreffedSegment>,
+ start_date_time: Option<DateTime<Utc>>,
+ start_time: Option<gst::ClockTime>,
+ media_sequence: u64,
+ segment_index: u32,
+}
+
+struct VideoStream {
+ name: String,
+ bitrate: u64,
+ width: u64,
+ height: u64,
+}
+
+struct AudioStream {
+ name: String,
+ lang: String,
+ default: bool,
+ wave: String,
+}
+
+/// Drops segments that have fallen out of the 5-segment live window and
+/// deletes their files from disk once the HLS client grace period passed.
+fn trim_segments(state: &mut StreamState) {
+    // Arbitrary 5 segments window
+    while state.segments.len() > 5 {
+        let segment = state.segments.pop_front().unwrap();
+
+        // Every segment dropped from the playlist bumps the media sequence
+        // number so clients keep their position in the stream.
+        state.media_sequence += 1;
+
+        state.trimmed_segments.push_back(UnreffedSegment {
+            // HLS spec mandates that segments are removed from the filesystem no sooner
+            // than the duration of the longest playlist + duration of the segment.
+            // This is 15 seconds (12.5 + 2.5) in our case, we use 20 seconds to be on the
+            // safe side
+            removal_time: segment
+                .date_time
+                .checked_add_signed(Duration::seconds(20))
+                .unwrap(),
+            path: segment.path.clone(),
+        });
+    }
+
+    // Delete a trimmed segment's file only once the oldest segment still in
+    // the playlist is dated after its removal deadline.
+    // NOTE(review): relies on `state.segments` being non-empty here — the
+    // caller pushes a segment before updating the manifest; confirm if reused.
+    while let Some(segment) = state.trimmed_segments.front() {
+        if segment.removal_time < state.segments.front().unwrap().date_time {
+            let segment = state.trimmed_segments.pop_front().unwrap();
+
+            let mut path = state.path.clone();
+            path.push(segment.path);
+            println!("Removing {}", path.display());
+            std::fs::remove_file(path).expect("Failed to remove old segment");
+        } else {
+            break;
+        }
+    }
+}
+
+fn update_manifest(state: &mut StreamState) {
+ // Now write the manifest
+ let mut path = state.path.clone();
+ path.push("manifest.m3u8");
+
+ println!("writing manifest to {}", path.display());
+
+ trim_segments(state);
+
+ let playlist = MediaPlaylist {
+ version: Some(7),
+ target_duration: 2.5,
+ media_sequence: state.media_sequence,
+ segments: state
+ .segments
+ .iter()
+ .enumerate()
+ .map(|(idx, segment)| MediaSegment {
+ uri: segment.path.to_string(),
+ duration: (segment.duration.nseconds() as f64
+ / gst::ClockTime::SECOND.nseconds() as f64) as f32,
+ map: Some(m3u8_rs::Map {
+ uri: "init.cmfi".into(),
+ ..Default::default()
+ }),
+ program_date_time: if idx == 0 {
+ Some(segment.date_time.into())
+ } else {
+ None
+ },
+ ..Default::default()
+ })
+ .collect(),
+ end_list: false,
+ playlist_type: None,
+ i_frames_only: false,
+ start: None,
+ independent_segments: true,
+ ..Default::default()
+ };
+
+ let mut file = std::fs::File::create(path).unwrap();
+ playlist
+ .write_to(&mut file)
+ .expect("Failed to write media playlist");
+}
+
+fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) {
+ let mut path: PathBuf = path.into();
+ path.push(name);
+
+ let state = Arc::new(Mutex::new(StreamState {
+ segments: VecDeque::new(),
+ trimmed_segments: VecDeque::new(),
+ path,
+ start_date_time: None,
+ start_time: gst::ClockTime::NONE,
+ media_sequence: 0,
+ segment_index: 0,
+ }));
+
+ appsink.set_callbacks(
+ gst_app::AppSinkCallbacks::builder()
+ .new_sample(move |sink| {
+ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+ let mut state = state.lock().unwrap();
+
+ // The muxer only outputs non-empty buffer lists
+ let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
+ assert!(!buffer_list.is_empty());
+
+ let mut first = buffer_list.get(0).unwrap();
+
+ // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
+ assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
+
+ // If the buffer has the DISCONT and HEADER flag set then it contains the media
+ // header, i.e. the `ftyp`, `moov` and other media boxes.
+ //
+ // This might be the initial header or the updated header at the end of the stream.
+ if first
+ .flags()
+ .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
+ {
+ let mut path = state.path.clone();
+ std::fs::create_dir_all(&path).expect("failed to create directory");
+ path.push("init.cmfi");
+
+ println!("writing header to {}", path.display());
+ let map = first.map_readable().unwrap();
+ std::fs::write(path, &map).expect("failed to write header");
+ drop(map);
+
+ // Remove the header from the buffer list
+ buffer_list.make_mut().remove(0, 1);
+
+ // If the list is now empty then it only contained the media header and nothing
+ // else.
+ if buffer_list.is_empty() {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ // Otherwise get the next buffer and continue working with that.
+ first = buffer_list.get(0).unwrap();
+ }
+
+ // If the buffer only has the HEADER flag set then this is a segment header that is
+ // followed by one or more actual media buffers.
+ assert!(first.flags().contains(gst::BufferFlags::HEADER));
+
+ let mut path = state.path.clone();
+ let basename = format!(
+ "segment_{}.{}",
+ state.segment_index,
+ if is_video { "cmfv" } else { "cmfa" }
+ );
+ state.segment_index += 1;
+ path.push(&basename);
+
+ let segment = sample
+ .segment()
+ .expect("no segment")
+ .downcast_ref::<gst::ClockTime>()
+ .expect("no time segment");
+ let pts = segment
+ .to_running_time(first.pts().unwrap())
+ .expect("can't get running time");
+
+ if state.start_time.is_none() {
+ state.start_time = Some(pts);
+ }
+
+ if state.start_date_time.is_none() {
+ let now_utc = Utc::now();
+ let now_gst = sink.clock().unwrap().time().unwrap();
+ let pts_clock_time = pts + sink.base_time().unwrap();
+
+ let diff = now_gst.checked_sub(pts_clock_time).unwrap();
+ let pts_utc = now_utc
+ .checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64))
+ .unwrap();
+
+ state.start_date_time = Some(pts_utc);
+ }
+
+ let duration = first.duration().unwrap();
+
+ let mut file = std::fs::File::create(&path).expect("failed to open fragment");
+ for buffer in &*buffer_list {
+ use std::io::prelude::*;
+
+ let map = buffer.map_readable().unwrap();
+ file.write_all(&map).expect("failed to write fragment");
+ }
+
+ let date_time = state
+ .start_date_time
+ .unwrap()
+ .checked_add_signed(Duration::nanoseconds(
+ pts.opt_checked_sub(state.start_time)
+ .unwrap()
+ .unwrap()
+ .nseconds() as i64,
+ ))
+ .unwrap();
+
+ println!(
+ "wrote segment with date time {} to {}",
+ date_time,
+ path.display()
+ );
+
+ state.segments.push_back(Segment {
+ duration,
+ path: basename.to_string(),
+ date_time,
+ });
+
+ update_manifest(&mut state);
+
+ Ok(gst::FlowSuccess::Ok)
+ })
+ .eos(move |_sink| {
+ unreachable!();
+ })
+ .build(),
+ );
+}
+
+/// Watches the encoder's src pad for the CAPS event, records the stream's
+/// MIME codec string in the shared `State` and attempts to write the master
+/// manifest once all streams have reported their codec.
+fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) {
+    enc.static_pad("src").unwrap().add_probe(
+        gst::PadProbeType::EVENT_DOWNSTREAM,
+        move |_pad, info| match info.data {
+            Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
+                gst::EventView::Caps(e) => {
+                    let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned());
+
+                    let mut state = state.lock().unwrap();
+                    state.all_mimes.push(mime.unwrap().into());
+                    state.maybe_write_manifest();
+                    // Caps seen once is enough — remove the probe.
+                    gst::PadProbeReturn::Remove
+                }
+                _ => gst::PadProbeReturn::Ok,
+            },
+            _ => gst::PadProbeReturn::Ok,
+        },
+    );
+}
+
+impl VideoStream {
+ fn setup(
+ &self,
+ state: Arc<Mutex<State>>,
+ pipeline: &gst::Pipeline,
+ path: &Path,
+ ) -> Result<(), Error> {
+ let src = gst::ElementFactory::make("videotestsrc")
+ .property("is-live", true)
+ .build()?;
+
+ let raw_capsfilter = gst::ElementFactory::make("capsfilter")
+ .property(
+ "caps",
+ gst_video::VideoCapsBuilder::new()
+ .format(gst_video::VideoFormat::I420)
+ .width(self.width as i32)
+ .height(self.height as i32)
+ .framerate(30.into())
+ .build(),
+ )
+ .build()?;
+ let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
+ let enc = gst::ElementFactory::make("x264enc")
+ .property("bframes", 0u32)
+ .property("bitrate", self.bitrate as u32 / 1000u32)
+ .property_from_str("tune", "zerolatency")
+ .build()?;
+ let h264_capsfilter = gst::ElementFactory::make("capsfilter")
+ .property(
+ "caps",
+ gst::Caps::builder("video/x-h264")
+ .field("profile", "main")
+ .build(),
+ )
+ .build()?;
+ let mux = gst::ElementFactory::make("cmafmux")
+ .property("fragment-duration", 2500.mseconds())
+ .property_from_str("header-update-mode", "update")
+ .property("write-mehd", true)
+ .build()?;
+ let appsink = gst_app::AppSink::builder().buffer_list(true).build();
+
+ pipeline.add_many(&[
+ &src,
+ &raw_capsfilter,
+ &timeoverlay,
+ &enc,
+ &h264_capsfilter,
+ &mux,
+ appsink.upcast_ref(),
+ ])?;
+
+ gst::Element::link_many(&[
+ &src,
+ &raw_capsfilter,
+ &timeoverlay,
+ &enc,
+ &h264_capsfilter,
+ &mux,
+ appsink.upcast_ref(),
+ ])?;
+
+ probe_encoder(state, enc);
+
+ setup_appsink(&appsink, &self.name, path, true);
+
+ Ok(())
+ }
+}
+
+impl AudioStream {
+    /// Builds one live audio branch of the pipeline:
+    /// audiotestsrc ! avenc_aac ! cmafmux ! appsink.
+    ///
+    /// The muxer output is turned into HLS segments and a media playlist by
+    /// the appsink callbacks installed via `setup_appsink`.
+    fn setup(
+        &self,
+        state: Arc<Mutex<State>>,
+        pipeline: &gst::Pipeline,
+        path: &Path,
+    ) -> Result<(), Error> {
+        let src = gst::ElementFactory::make("audiotestsrc")
+            .property("is-live", true)
+            .property_from_str("wave", &self.wave)
+            .build()?;
+        let enc = gst::ElementFactory::make("avenc_aac").build()?;
+        // `fragment-duration` is a cmafmux property (cf. the video branch and
+        // the VOD example); it was previously set on audiotestsrc, which does
+        // not have such a property. 2.5s matches the video branch so the
+        // audio and video segments stay aligned.
+        let mux = gst::ElementFactory::make("cmafmux")
+            .property("fragment-duration", 2500.mseconds())
+            .property_from_str("header-update-mode", "update")
+            .property("write-mehd", true)
+            .build()?;
+        let appsink = gst_app::AppSink::builder().buffer_list(true).build();
+
+        pipeline.add_many(&[&src, &enc, &mux, appsink.upcast_ref()])?;
+
+        gst::Element::link_many(&[&src, &enc, &mux, appsink.upcast_ref()])?;
+
+        probe_encoder(state, enc);
+
+        setup_appsink(&appsink, &self.name, path, false);
+
+        Ok(())
+    }
+}
+
+fn main() -> Result<(), Error> {
+ gst::init()?;
+
+ gstfmp4::plugin_register_static()?;
+
+ let path = PathBuf::from("hls_live_stream");
+
+ let pipeline = gst::Pipeline::default();
+
+ std::fs::create_dir_all(&path).expect("failed to create directory");
+
+ let mut manifest_path = path.clone();
+ manifest_path.push("manifest.m3u8");
+
+ let state = Arc::new(Mutex::new(State {
+ video_streams: vec![VideoStream {
+ name: "video_0".to_string(),
+ bitrate: 2_048_000,
+ width: 1280,
+ height: 720,
+ }],
+ audio_streams: vec![
+ AudioStream {
+ name: "audio_0".to_string(),
+ lang: "eng".to_string(),
+ default: true,
+ wave: "sine".to_string(),
+ },
+ AudioStream {
+ name: "audio_1".to_string(),
+ lang: "fre".to_string(),
+ default: false,
+ wave: "white-noise".to_string(),
+ },
+ ],
+ all_mimes: vec![],
+ path: manifest_path.clone(),
+ wrote_manifest: false,
+ }));
+
+ {
+ let state_lock = state.lock().unwrap();
+
+ for stream in &state_lock.video_streams {
+ stream.setup(state.clone(), &pipeline, &path)?;
+ }
+
+ for stream in &state_lock.audio_streams {
+ stream.setup(state.clone(), &pipeline, &path)?;
+ }
+ }
+
+ pipeline.set_state(gst::State::Playing)?;
+
+ let bus = pipeline
+ .bus()
+ .expect("Pipeline without bus. Shouldn't happen!");
+
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Eos(..) => {
+ println!("EOS");
+ break;
+ }
+ MessageView::Error(err) => {
+ pipeline.set_state(gst::State::Null)?;
+ eprintln!(
+ "Got error from {}: {} ({})",
+ msg.src()
+ .map(|s| String::from(s.path_string()))
+ .unwrap_or_else(|| "None".into()),
+ err.error(),
+ err.debug().unwrap_or_else(|| "".into()),
+ );
+ break;
+ }
+ _ => (),
+ }
+ }
+
+ pipeline.set_state(gst::State::Null)?;
+
+ Ok(())
+}
diff --git a/mux/fmp4/examples/hls_vod.rs b/mux/fmp4/examples/hls_vod.rs
new file mode 100644
index 000000000..4a2533cd1
--- /dev/null
+++ b/mux/fmp4/examples/hls_vod.rs
@@ -0,0 +1,472 @@
+// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+// This creates a 10 second VOD HLS stream with one video playlist and two audio
+// playlists. Each segment is 2.5 second long.
+
+use gst::prelude::*;
+
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+
+use anyhow::Error;
+
+use m3u8_rs::{
+ AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaPlaylistType,
+ MediaSegment, VariantStream,
+};
+
+struct Segment {
+ duration: gst::ClockTime,
+ path: String,
+}
+
+struct StreamState {
+ path: PathBuf,
+ segments: Vec<Segment>,
+}
+
+struct VideoStream {
+ name: String,
+ bitrate: u64,
+ width: u64,
+ height: u64,
+}
+
+struct AudioStream {
+ name: String,
+ lang: String,
+ default: bool,
+ wave: String,
+}
+
+struct State {
+ video_streams: Vec<VideoStream>,
+ audio_streams: Vec<AudioStream>,
+ all_mimes: Vec<String>,
+ path: PathBuf,
+ wrote_manifest: bool,
+}
+
+impl State {
+ fn maybe_write_manifest(&mut self) {
+ if self.wrote_manifest {
+ return;
+ }
+
+ if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() {
+ return;
+ }
+
+ let mut all_mimes = self.all_mimes.clone();
+ all_mimes.sort();
+ all_mimes.dedup();
+
+ let playlist = MasterPlaylist {
+ version: Some(7),
+ variants: self
+ .video_streams
+ .iter()
+ .map(|stream| {
+ let mut path = PathBuf::new();
+
+ path.push(&stream.name);
+ path.push("manifest.m3u8");
+
+ VariantStream {
+ uri: path.as_path().display().to_string(),
+ bandwidth: stream.bitrate,
+ codecs: Some(all_mimes.join(",")),
+ resolution: Some(m3u8_rs::Resolution {
+ width: stream.width,
+ height: stream.height,
+ }),
+ audio: Some("audio".to_string()),
+ ..Default::default()
+ }
+ })
+ .collect(),
+ alternatives: self
+ .audio_streams
+ .iter()
+ .map(|stream| {
+ let mut path = PathBuf::new();
+ path.push(&stream.name);
+ path.push("manifest.m3u8");
+
+ AlternativeMedia {
+ media_type: AlternativeMediaType::Audio,
+ uri: Some(path.as_path().display().to_string()),
+ group_id: "audio".to_string(),
+ language: Some(stream.lang.clone()),
+ name: stream.name.clone(),
+ default: stream.default,
+ autoselect: stream.default,
+ channels: Some("2".to_string()),
+ ..Default::default()
+ }
+ })
+ .collect(),
+ independent_segments: true,
+ ..Default::default()
+ };
+
+ println!("Writing master manifest to {}", self.path.display());
+
+ let mut file = std::fs::File::create(&self.path).unwrap();
+ playlist
+ .write_to(&mut file)
+ .expect("Failed to write master playlist");
+
+ self.wrote_manifest = true;
+ }
+}
+
+fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) {
+ let mut path: PathBuf = path.into();
+ path.push(name);
+
+ let state = Arc::new(Mutex::new(StreamState {
+ segments: Vec::new(),
+ path,
+ }));
+
+ let state_clone = state.clone();
+ appsink.set_callbacks(
+ gst_app::AppSinkCallbacks::builder()
+ .new_sample(move |sink| {
+ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+ let mut state = state.lock().unwrap();
+
+ // The muxer only outputs non-empty buffer lists
+ let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
+ assert!(!buffer_list.is_empty());
+
+ let mut first = buffer_list.get(0).unwrap();
+
+ // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
+ assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
+
+ // If the buffer has the DISCONT and HEADER flag set then it contains the media
+ // header, i.e. the `ftyp`, `moov` and other media boxes.
+ //
+ // This might be the initial header or the updated header at the end of the stream.
+ if first
+ .flags()
+ .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
+ {
+ let mut path = state.path.clone();
+ std::fs::create_dir_all(&path).expect("failed to create directory");
+ path.push("init.cmfi");
+
+ println!("writing header to {}", path.display());
+ let map = first.map_readable().unwrap();
+ std::fs::write(path, &map).expect("failed to write header");
+ drop(map);
+
+ // Remove the header from the buffer list
+ buffer_list.make_mut().remove(0, 1);
+
+ // If the list is now empty then it only contained the media header and nothing
+ // else.
+ if buffer_list.is_empty() {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ // Otherwise get the next buffer and continue working with that.
+ first = buffer_list.get(0).unwrap();
+ }
+
+ // If the buffer only has the HEADER flag set then this is a segment header that is
+ // followed by one or more actual media buffers.
+ assert!(first.flags().contains(gst::BufferFlags::HEADER));
+
+ let mut path = state.path.clone();
+ let basename = format!(
+ "segment_{}.{}",
+ state.segments.len() + 1,
+ if is_video { "cmfv" } else { "cmfa" }
+ );
+ path.push(&basename);
+ println!("writing segment to {}", path.display());
+
+ let duration = first.duration().unwrap();
+
+ let mut file = std::fs::File::create(path).expect("failed to open fragment");
+ for buffer in &*buffer_list {
+ use std::io::prelude::*;
+
+ let map = buffer.map_readable().unwrap();
+ file.write_all(&map).expect("failed to write fragment");
+ }
+
+ state.segments.push(Segment {
+ duration,
+ path: basename.to_string(),
+ });
+
+ Ok(gst::FlowSuccess::Ok)
+ })
+ .eos(move |_sink| {
+ let state = state_clone.lock().unwrap();
+
+ // Now write the manifest
+ let mut path = state.path.clone();
+ path.push("manifest.m3u8");
+
+ println!("writing manifest to {}", path.display());
+
+ let playlist = MediaPlaylist {
+ version: Some(7),
+ target_duration: 2.5,
+ media_sequence: 0,
+ segments: state
+ .segments
+ .iter()
+ .map(|segment| MediaSegment {
+ uri: segment.path.to_string(),
+ duration: (segment.duration.nseconds() as f64
+ / gst::ClockTime::SECOND.nseconds() as f64)
+ as f32,
+ map: Some(m3u8_rs::Map {
+ uri: "init.cmfi".into(),
+ ..Default::default()
+ }),
+ ..Default::default()
+ })
+ .collect(),
+ end_list: true,
+ playlist_type: Some(MediaPlaylistType::Vod),
+ i_frames_only: false,
+ start: None,
+ independent_segments: true,
+ ..Default::default()
+ };
+
+ let mut file = std::fs::File::create(path).unwrap();
+ playlist
+ .write_to(&mut file)
+ .expect("Failed to write media playlist");
+ })
+ .build(),
+ );
+}
+
+fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) {
+ enc.static_pad("src").unwrap().add_probe(
+ gst::PadProbeType::EVENT_DOWNSTREAM,
+ move |_pad, info| match info.data {
+ Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
+ gst::EventView::Caps(e) => {
+ let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned());
+
+ let mut state = state.lock().unwrap();
+ state.all_mimes.push(mime.unwrap().into());
+ state.maybe_write_manifest();
+ gst::PadProbeReturn::Remove
+ }
+ _ => gst::PadProbeReturn::Ok,
+ },
+ _ => gst::PadProbeReturn::Ok,
+ },
+ );
+}
+
+impl VideoStream {
+ fn setup(
+ &self,
+ state: Arc<Mutex<State>>,
+ pipeline: &gst::Pipeline,
+ path: &Path,
+ ) -> Result<(), Error> {
+ let src = gst::ElementFactory::make("videotestsrc")
+ .property("num-buffers", 300)
+ .build()?;
+ let raw_capsfilter = gst::ElementFactory::make("capsfilter")
+ .property(
+ "caps",
+ gst_video::VideoCapsBuilder::new()
+ .format(gst_video::VideoFormat::I420)
+ .width(self.width as i32)
+ .height(self.height as i32)
+ .framerate(30.into())
+ .build(),
+ )
+ .build()?;
+ let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
+ let enc = gst::ElementFactory::make("x264enc")
+ .property("bframes", 0u32)
+ .property("bitrate", self.bitrate as u32 / 1000u32)
+ .build()?;
+ let h264_capsfilter = gst::ElementFactory::make("capsfilter")
+ .property(
+ "caps",
+ gst::Caps::builder("video/x-h264")
+ .field("profile", "main")
+ .build(),
+ )
+ .build()?;
+ let mux = gst::ElementFactory::make("cmafmux")
+ .property("fragment-duration", 2500.mseconds())
+ .property_from_str("header-update-mode", "update")
+ .property("write-mehd", true)
+ .build()?;
+ let appsink = gst_app::AppSink::builder().buffer_list(true).build();
+
+ pipeline.add_many(&[
+ &src,
+ &raw_capsfilter,
+ &timeoverlay,
+ &enc,
+ &h264_capsfilter,
+ &mux,
+ appsink.upcast_ref(),
+ ])?;
+
+ gst::Element::link_many(&[
+ &src,
+ &raw_capsfilter,
+ &timeoverlay,
+ &enc,
+ &h264_capsfilter,
+ &mux,
+ appsink.upcast_ref(),
+ ])?;
+
+ probe_encoder(state, enc);
+
+ setup_appsink(&appsink, &self.name, path, true);
+
+ Ok(())
+ }
+}
+
+impl AudioStream {
+ fn setup(
+ &self,
+ state: Arc<Mutex<State>>,
+ pipeline: &gst::Pipeline,
+ path: &Path,
+ ) -> Result<(), Error> {
+ let src = gst::ElementFactory::make("audiotestsrc")
+ .property("num-buffers", 100)
+ .property("samplesperbuffer", 4410)
+ .property_from_str("wave", &self.wave)
+ .build()?;
+ let raw_capsfilter = gst::ElementFactory::make("capsfilter")
+ .property(
+ "caps",
+ gst_audio::AudioCapsBuilder::new().rate(44100).build(),
+ )
+ .build()?;
+ let enc = gst::ElementFactory::make("avenc_aac").build()?;
+ let mux = gst::ElementFactory::make("cmafmux")
+ .property("fragment-duration", 2500.mseconds())
+ .property_from_str("header-update-mode", "update")
+ .property("write-mehd", true)
+ .build()?;
+ let appsink = gst_app::AppSink::builder().buffer_list(true).build();
+
+ pipeline.add_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?;
+
+ gst::Element::link_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?;
+
+ probe_encoder(state, enc);
+
+ setup_appsink(&appsink, &self.name, path, false);
+
+ Ok(())
+ }
+}
+
+fn main() -> Result<(), Error> {
+ gst::init()?;
+
+ gstfmp4::plugin_register_static()?;
+
+ let path = PathBuf::from("hls_vod_stream");
+
+ let pipeline = gst::Pipeline::default();
+
+ std::fs::create_dir_all(&path).expect("failed to create directory");
+
+ let mut manifest_path = path.clone();
+ manifest_path.push("manifest.m3u8");
+
+ let state = Arc::new(Mutex::new(State {
+ video_streams: vec![VideoStream {
+ name: "video_0".to_string(),
+ bitrate: 2_048_000,
+ width: 1280,
+ height: 720,
+ }],
+ audio_streams: vec![
+ AudioStream {
+ name: "audio_0".to_string(),
+ lang: "eng".to_string(),
+ default: true,
+ wave: "sine".to_string(),
+ },
+ AudioStream {
+ name: "audio_1".to_string(),
+ lang: "fre".to_string(),
+ default: false,
+ wave: "white-noise".to_string(),
+ },
+ ],
+ all_mimes: vec![],
+ path: manifest_path.clone(),
+ wrote_manifest: false,
+ }));
+
+ {
+ let state_lock = state.lock().unwrap();
+
+ for stream in &state_lock.video_streams {
+ stream.setup(state.clone(), &pipeline, &path)?;
+ }
+
+ for stream in &state_lock.audio_streams {
+ stream.setup(state.clone(), &pipeline, &path)?;
+ }
+ }
+
+ pipeline.set_state(gst::State::Playing)?;
+
+ let bus = pipeline
+ .bus()
+ .expect("Pipeline without bus. Shouldn't happen!");
+
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Eos(..) => {
+ println!("EOS");
+ break;
+ }
+ MessageView::Error(err) => {
+ pipeline.set_state(gst::State::Null)?;
+ eprintln!(
+ "Got error from {}: {} ({})",
+ msg.src()
+ .map(|s| String::from(s.path_string()))
+ .unwrap_or_else(|| "None".into()),
+ err.error(),
+ err.debug().unwrap_or_else(|| "".into()),
+ );
+ break;
+ }
+ _ => (),
+ }
+ }
+
+ pipeline.set_state(gst::State::Null)?;
+
+ Ok(())
+}
diff --git a/mux/fmp4/src/fmp4mux/boxes.rs b/mux/fmp4/src/fmp4mux/boxes.rs
new file mode 100644
index 000000000..9b69fb36f
--- /dev/null
+++ b/mux/fmp4/src/fmp4mux/boxes.rs
@@ -0,0 +1,2073 @@
+// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::prelude::*;
+
+use anyhow::{anyhow, bail, Context, Error};
+
+use super::Buffer;
+
+/// Appends an ISO BMFF box with the given fourcc to `vec`.
+///
+/// `content_func` fills in the box payload; the 32-bit big-endian size field
+/// is written as zero first and patched once the payload length is known.
+/// Returns whatever `content_func` returns. Errors if the finished box does
+/// not fit into a 32-bit size field.
+fn write_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>(
+    vec: &mut Vec<u8>,
+    fourcc: impl std::borrow::Borrow<[u8; 4]>,
+    content_func: F,
+) -> Result<T, Error> {
+    // Write zero size ...
+    let size_pos = vec.len();
+    vec.extend([0u8; 4]);
+    vec.extend(fourcc.borrow());
+
+    let res = content_func(vec)?;
+
+    // ... and update it here later.
+    let size: u32 = vec
+        .len()
+        .checked_sub(size_pos)
+        .expect("vector shrunk")
+        .try_into()
+        .context("too big box content")?;
+    vec[size_pos..][..4].copy_from_slice(&size.to_be_bytes());
+
+    Ok(res)
+}
+
+// Version / flags values for ISO BMFF "full boxes" (boxes that carry a
+// leading version byte plus a 24-bit flags field).
+const FULL_BOX_VERSION_0: u8 = 0;
+const FULL_BOX_VERSION_1: u8 = 1;
+
+const FULL_BOX_FLAGS_NONE: u32 = 0;
+
+/// Appends a full box: like [`write_box`] but with the combined
+/// version/flags word written before the payload produced by `content_func`.
+fn write_full_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>(
+    vec: &mut Vec<u8>,
+    fourcc: impl std::borrow::Borrow<[u8; 4]>,
+    version: u8,
+    flags: u32,
+    content_func: F,
+) -> Result<T, Error> {
+    write_box(vec, fourcc, move |vec| {
+        // Flags must fit into the 24 bits below the version byte.
+        assert_eq!(flags >> 24, 0);
+        vec.extend(((u32::from(version) << 24) | flags).to_be_bytes());
+        content_func(vec)
+    })
+}
+
+/// Appends CMAF media-profile compatible brands derived from the stream caps.
+///
+/// For H.264/H.265 the brand is picked from resolution, level, framerate and
+/// (when available) colorimetry; AAC always maps to `caac`. Caps that match
+/// no known profile add no brand. Only the first caps structure is inspected.
+fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'static [u8; 4]>) {
+    let s = caps.structure(0).unwrap();
+    match s.name() {
+        "video/x-h264" => {
+            let width = s.get::<i32>("width").ok();
+            let height = s.get::<i32>("height").ok();
+            let fps = s.get::<gst::Fraction>("framerate").ok();
+            let profile = s.get::<&str>("profile").ok();
+            // Levels like "3.1" are split into ("3", "1") so they can be
+            // compared lexicographically as (major, minor) tuples.
+            let level = s
+                .get::<&str>("level")
+                .ok()
+                .map(|l| l.split_once('.').unwrap_or((l, "0")));
+            let colorimetry = s.get::<&str>("colorimetry").ok();
+
+            if let (Some(width), Some(height), Some(profile), Some(level), Some(fps)) =
+                (width, height, profile, level, fps)
+            {
+                if profile == "high"
+                    || profile == "main"
+                    || profile == "baseline"
+                    || profile == "constrained-baseline"
+                {
+                    // CMAF AVC SD profile (`cfsd`)
+                    if width <= 864
+                        && height <= 576
+                        && level <= ("3", "1")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        #[cfg(feature = "v1_18")]
+                        {
+                            if let Some(colorimetry) = colorimetry
+                                .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                            {
+                                if matches!(
+                                    colorimetry.primaries(),
+                                    gst_video::VideoColorPrimaries::Bt709
+                                        | gst_video::VideoColorPrimaries::Bt470bg
+                                        | gst_video::VideoColorPrimaries::Smpte170m
+                                ) && matches!(
+                                    colorimetry.transfer(),
+                                    gst_video::VideoTransferFunction::Bt709
+                                        | gst_video::VideoTransferFunction::Bt601
+                                ) && matches!(
+                                    colorimetry.matrix(),
+                                    gst_video::VideoColorMatrix::Bt709
+                                        | gst_video::VideoColorMatrix::Bt601
+                                ) {
+                                    compatible_brands.push(b"cfsd");
+                                }
+                            } else {
+                                // Assume it's OK
+                                compatible_brands.push(b"cfsd");
+                            }
+                        }
+                        #[cfg(not(feature = "v1_18"))]
+                        {
+                            // Assume it's OK
+                            compatible_brands.push(b"cfsd");
+                        }
+                    // CMAF AVC HD profile (`cfhd`)
+                    } else if width <= 1920
+                        && height <= 1080
+                        && level <= ("4", "0")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        #[cfg(feature = "v1_18")]
+                        {
+                            if let Some(colorimetry) = colorimetry
+                                .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                            {
+                                if matches!(
+                                    colorimetry.primaries(),
+                                    gst_video::VideoColorPrimaries::Bt709
+                                ) && matches!(
+                                    colorimetry.transfer(),
+                                    gst_video::VideoTransferFunction::Bt709
+                                ) && matches!(
+                                    colorimetry.matrix(),
+                                    gst_video::VideoColorMatrix::Bt709
+                                ) {
+                                    compatible_brands.push(b"cfhd");
+                                }
+                            } else {
+                                // Assume it's OK
+                                compatible_brands.push(b"cfhd");
+                            }
+                        }
+                        #[cfg(not(feature = "v1_18"))]
+                        {
+                            // Assume it's OK
+                            compatible_brands.push(b"cfhd");
+                        }
+                    // CMAF AVC HDHF profile (`chdf`)
+                    // NOTE(review): unlike the two branches above this one is
+                    // not guarded by `#[cfg(feature = "v1_18")]` — confirm the
+                    // colorimetry API used here exists on older versions too.
+                    } else if width <= 1920
+                        && height <= 1080
+                        && level <= ("4", "2")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        if let Some(colorimetry) =
+                            colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                        {
+                            if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt709
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::Bt709
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt709
+                            ) {
+                                compatible_brands.push(b"chdf");
+                            }
+                        } else {
+                            // Assume it's OK
+                            compatible_brands.push(b"chdf");
+                        }
+                    }
+                }
+            }
+        }
+        "audio/mpeg" => {
+            // CMAF AAC core profile.
+            compatible_brands.push(b"caac");
+        }
+        "video/x-h265" => {
+            let width = s.get::<i32>("width").ok();
+            let height = s.get::<i32>("height").ok();
+            let fps = s.get::<gst::Fraction>("framerate").ok();
+            let profile = s.get::<&str>("profile").ok();
+            let tier = s.get::<&str>("tier").ok();
+            let level = s
+                .get::<&str>("level")
+                .ok()
+                .map(|l| l.split_once('.').unwrap_or((l, "0")));
+            let colorimetry = s.get::<&str>("colorimetry").ok();
+
+            if let (Some(width), Some(height), Some(profile), Some(tier), Some(level), Some(fps)) =
+                (width, height, profile, tier, level, fps)
+            {
+                if profile == "main" && tier == "main" {
+                    // CMAF HEVC HHD (`chhd`)
+                    if width <= 1920
+                        && height <= 1080
+                        && level <= ("4", "1")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        if let Some(colorimetry) =
+                            colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                        {
+                            if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt709
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::Bt709
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt709
+                            ) {
+                                compatible_brands.push(b"chhd");
+                            }
+                        } else {
+                            // Assume it's OK
+                            compatible_brands.push(b"chhd");
+                        }
+                    // CMAF HEVC UHD 8-bit (`cud8`)
+                    } else if width <= 3840
+                        && height <= 2160
+                        && level <= ("5", "0")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        if let Some(colorimetry) =
+                            colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                        {
+                            if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt709
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::Bt709
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt709
+                            ) {
+                                compatible_brands.push(b"cud8");
+                            }
+                        } else {
+                            // Assume it's OK
+                            compatible_brands.push(b"cud8");
+                        }
+                    }
+                // NOTE(review): h265parse usually advertises tier as
+                // "main"/"high"; confirm `tier == "main-10"` can ever match.
+                } else if profile == "main-10" && tier == "main-10" {
+                    // CMAF HEVC HHD10 (`chh1`)
+                    if width <= 1920
+                        && height <= 1080
+                        && level <= ("4", "1")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        if let Some(colorimetry) =
+                            colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                        {
+                            if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt709
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::Bt709
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt709
+                            ) {
+                                compatible_brands.push(b"chh1");
+                            }
+                        } else {
+                            // Assume it's OK
+                            compatible_brands.push(b"chh1");
+                        }
+                    // UHD10: SDR (`cud1`), PQ HDR (`chd1`) or HLG (`clg1`)
+                    // depending on colorimetry.
+                    } else if width <= 3840
+                        && height <= 2160
+                        && level <= ("5", "1")
+                        && fps <= gst::Fraction::new(60, 1)
+                    {
+                        #[cfg(feature = "v1_18")]
+                        if let Some(colorimetry) =
+                            colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+                        {
+                            if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt709
+                                    | gst_video::VideoColorPrimaries::Bt2020
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::Bt709
+                                    | gst_video::VideoTransferFunction::Bt202010
+                                    | gst_video::VideoTransferFunction::Bt202012
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt709
+                                    | gst_video::VideoColorMatrix::Bt2020
+                            ) {
+                                compatible_brands.push(b"cud1");
+                            } else if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt2020
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::Smpte2084
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt2020
+                            ) {
+                                compatible_brands.push(b"chd1");
+                            } else if matches!(
+                                colorimetry.primaries(),
+                                gst_video::VideoColorPrimaries::Bt2020
+                            ) && matches!(
+                                colorimetry.transfer(),
+                                gst_video::VideoTransferFunction::AribStdB67
+                            ) && matches!(
+                                colorimetry.matrix(),
+                                gst_video::VideoColorMatrix::Bt2020
+                            ) {
+                                compatible_brands.push(b"clg1");
+                            }
+                        } else {
+                            // Assume it's OK
+                            compatible_brands.push(b"cud1");
+                        }
+                        #[cfg(not(feature = "v1_18"))]
+                        {
+                            // Assume it's OK
+                            compatible_brands.push(b"cud1");
+                        }
+                    }
+                }
+            }
+        }
+        _ => (),
+    }
+}
+
+/// Maps the muxer variant (and, for CMAF, the single stream's caps) to the
+/// `ftyp` major brand and the list of compatible brands.
+///
+/// CMAF is single-track only, hence the assert that exactly one caps is
+/// provided.
+fn brands_from_variant_and_caps<'a>(
+    variant: super::Variant,
+    mut caps: impl Iterator<Item = &'a gst::Caps>,
+) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) {
+    match variant {
+        super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]),
+        super::Variant::DASH => {
+            // FIXME: `dsms` / `dash` brands, `msix`
+            (b"msdh", vec![b"dums", b"msdh", b"iso6"])
+        }
+        super::Variant::CMAF => {
+            let mut compatible_brands = vec![b"iso6", b"cmfc"];
+
+            cmaf_brands_from_caps(caps.next().unwrap(), &mut compatible_brands);
+            assert_eq!(caps.next(), None);
+
+            (b"cmf2", compatible_brands)
+        }
+    }
+}
+
+/// Creates `ftyp` and `moov` boxes
+/// Creates `ftyp` and `moov` boxes
+///
+/// Serializes the initialization segment for the configured streams into a
+/// single buffer. For the ONVIF variant an additional `meta` box with a
+/// `cstb` (correlated start time) entry is appended.
+///
+/// NOTE(review): for ONVIF, `cfg.start_utc_time` is unwrapped — callers must
+/// guarantee it is set for that variant or this panics; confirm at call site.
+pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> {
+    let mut v = vec![];
+
+    let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.streams.iter());
+
+    write_box(&mut v, b"ftyp", |v| {
+        // major brand
+        v.extend(brand);
+        // minor version
+        v.extend(0u32.to_be_bytes());
+        // compatible brands
+        v.extend(compatible_brands.into_iter().flatten());
+
+        Ok(())
+    })?;
+
+    write_box(&mut v, b"moov", |v| write_moov(v, &cfg))?;
+
+    if cfg.variant == super::Variant::ONVIF {
+        write_full_box(
+            &mut v,
+            b"meta",
+            FULL_BOX_VERSION_0,
+            FULL_BOX_FLAGS_NONE,
+            |v| {
+                write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+                    // Handler type
+                    v.extend(b"null");
+
+                    // Reserved
+                    v.extend([0u8; 3 * 4]);
+
+                    // Name
+                    v.extend(b"MetadataHandler");
+
+                    Ok(())
+                })?;
+
+                write_box(v, b"cstb", |v| {
+                    // entry count
+                    v.extend(1u32.to_be_bytes());
+
+                    // track id
+                    v.extend(0u32.to_be_bytes());
+
+                    // start UTC time in 100ns units since Jan 1 1601
+                    v.extend(cfg.start_utc_time.unwrap().to_be_bytes());
+
+                    Ok(())
+                })
+            },
+        )?;
+    }
+
+    Ok(gst::Buffer::from_mut_slice(v))
+}
+
+/// Writes the `moov` payload: `mvhd`, one `trak` per stream, and `mvex`.
+///
+/// Creation/modification times are the current UTC time expressed as seconds
+/// since 1904-01-01 (the ISO BMFF epoch). For the ONVIF variant, metadata
+/// tracks get a `cdsc` track reference to the first video track.
+fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    use gst::glib;
+
+    let base = glib::DateTime::from_utc(1904, 1, 1, 0, 0, 0.0)?;
+    let now = glib::DateTime::now_utc()?;
+    let creation_time =
+        u64::try_from(now.difference(&base).as_seconds()).expect("time before 1904");
+
+    write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
+        write_mvhd(v, cfg, creation_time)
+    })?;
+    for (idx, caps) in cfg.streams.iter().enumerate() {
+        write_box(v, b"trak", |v| {
+            let mut references = vec![];
+
+            // Reference the video track for ONVIF metadata tracks
+            if cfg.variant == super::Variant::ONVIF
+                && caps.structure(0).unwrap().name() == "application/x-onvif-metadata"
+            {
+                // Find the first video track
+                for (idx, caps) in cfg.streams.iter().enumerate() {
+                    let s = caps.structure(0).unwrap();
+
+                    if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") {
+                        // Track IDs are 1-based (stream index + 1).
+                        references.push(TrackReference {
+                            reference_type: *b"cdsc",
+                            track_ids: vec![idx as u32 + 1],
+                        });
+                        break;
+                    }
+                }
+            }
+
+            write_trak(v, cfg, idx, caps, creation_time, &references)
+        })?;
+    }
+    write_box(v, b"mvex", |v| write_mvex(v, cfg))?;
+
+    Ok(())
+}
+
+/// Derives a media timescale from the stream caps.
+///
+/// Video: 100 units per frame based on the framerate (odd denominators are
+/// first mapped to a "nice" guessed framerate). Audio: the sample rate.
+/// Anything else, a zero framerate, or arithmetic overflow falls back to
+/// 10_000.
+fn caps_to_timescale(caps: &gst::CapsRef) -> u32 {
+    let s = caps.structure(0).unwrap();
+
+    if let Ok(fps) = s.get::<gst::Fraction>("framerate") {
+        if fps.numer() == 0 {
+            return 10_000;
+        }
+
+        if fps.denom() != 1 && fps.denom() != 1001 {
+            // Unusual denominator: convert the frame duration to nanoseconds
+            // and let gst_video guess a standard framerate from it.
+            if let Some(fps) = (fps.denom() as u64)
+                .nseconds()
+                .mul_div_round(1_000_000_000, fps.numer() as u64)
+                .and_then(gst_video::guess_framerate)
+            {
+                return (fps.numer() as u32)
+                    .mul_div_round(100, fps.denom() as u32)
+                    .unwrap_or(10_000);
+            }
+        }
+
+        (fps.numer() as u32)
+            .mul_div_round(100, fps.denom() as u32)
+            .unwrap_or(10_000)
+    } else if let Ok(rate) = s.get::<i32>("rate") {
+        rate as u32
+    } else {
+        10_000
+    }
+}
+
+/// Writes the `mvhd` payload (version 1).
+///
+/// Duration is left 0 since this is a fragmented file; the movie timescale is
+/// taken from the first stream's caps.
+fn write_mvhd(
+    v: &mut Vec<u8>,
+    cfg: &super::HeaderConfiguration,
+    creation_time: u64,
+) -> Result<(), Error> {
+    // Creation time
+    v.extend(creation_time.to_be_bytes());
+    // Modification time
+    v.extend(creation_time.to_be_bytes());
+    // Timescale: uses the reference track timescale
+    v.extend(caps_to_timescale(&cfg.streams[0]).to_be_bytes());
+    // Duration
+    v.extend(0u64.to_be_bytes());
+
+    // Rate 1.0
+    v.extend((1u32 << 16).to_be_bytes());
+    // Volume 1.0
+    v.extend((1u16 << 8).to_be_bytes());
+    // Reserved
+    v.extend([0u8; 2 + 2 * 4]);
+
+    // Matrix (identity; last entry is the fixed-point 2.30 value 1.0)
+    v.extend(
+        [
+            (1u32 << 16).to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            (1u32 << 16).to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            (16384u32 << 16).to_be_bytes(),
+        ]
+        .into_iter()
+        .flatten(),
+    );
+
+    // Pre defined
+    v.extend([0u8; 6 * 4]);
+
+    // Next track id
+    v.extend((cfg.streams.len() as u32 + 1).to_be_bytes());
+
+    Ok(())
+}
+
+// `tkhd` flag bits as defined by ISO/IEC 14496-12.
+const TKHD_FLAGS_TRACK_ENABLED: u32 = 0x1;
+const TKHD_FLAGS_TRACK_IN_MOVIE: u32 = 0x2;
+const TKHD_FLAGS_TRACK_IN_PREVIEW: u32 = 0x4;
+
+// One `tref` sub-box: a reference type fourcc (e.g. `cdsc`) plus the list of
+// referenced 1-based track IDs.
+struct TrackReference {
+    reference_type: [u8; 4],
+    track_ids: Vec<u32>,
+}
+
+/// Writes one `trak` payload: `tkhd`, `mdia`, and (if any references are
+/// given) a `tref` box.
+fn write_trak(
+    v: &mut Vec<u8>,
+    cfg: &super::HeaderConfiguration,
+    idx: usize,
+    caps: &gst::CapsRef,
+    creation_time: u64,
+    references: &[TrackReference],
+) -> Result<(), Error> {
+    write_full_box(
+        v,
+        b"tkhd",
+        FULL_BOX_VERSION_1,
+        TKHD_FLAGS_TRACK_ENABLED | TKHD_FLAGS_TRACK_IN_MOVIE | TKHD_FLAGS_TRACK_IN_PREVIEW,
+        |v| write_tkhd(v, cfg, idx, caps, creation_time),
+    )?;
+
+    // TODO: write edts if necessary: for audio tracks to remove initialization samples
+    // TODO: write edts optionally for negative DTS instead of offsetting the DTS
+
+    write_box(v, b"mdia", |v| write_mdia(v, cfg, caps, creation_time))?;
+
+    // NOTE(review): the spec declares `tref` before `mdia` inside `trak`;
+    // most readers accept either order — confirm this ordering is intended.
+    if !references.is_empty() {
+        write_box(v, b"tref", |v| write_tref(v, cfg, references))?;
+    }
+
+    Ok(())
+}
+
+/// Writes the `tkhd` payload (version 1).
+///
+/// Track ID is `idx + 1`. Audio tracks get volume 1.0; video tracks get the
+/// display width/height in 16.16 fixed point, with the pixel-aspect-ratio
+/// applied to the width and both clamped to `u16::MAX`.
+fn write_tkhd(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    idx: usize,
+    caps: &gst::CapsRef,
+    creation_time: u64,
+) -> Result<(), Error> {
+    // Creation time
+    v.extend(creation_time.to_be_bytes());
+    // Modification time
+    v.extend(creation_time.to_be_bytes());
+    // Track ID
+    v.extend((idx as u32 + 1).to_be_bytes());
+    // Reserved
+    v.extend(0u32.to_be_bytes());
+    // Duration
+    v.extend(0u64.to_be_bytes());
+
+    // Reserved
+    v.extend([0u8; 2 * 4]);
+
+    // Layer
+    v.extend(0u16.to_be_bytes());
+    // Alternate group
+    v.extend(0u16.to_be_bytes());
+
+    // Volume: 1.0 (8.8 fixed point) for audio, 0 otherwise
+    let s = caps.structure(0).unwrap();
+    match s.name() {
+        "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
+            v.extend((1u16 << 8).to_be_bytes())
+        }
+        _ => v.extend(0u16.to_be_bytes()),
+    }
+
+    // Reserved
+    v.extend([0u8; 2]);
+
+    // Matrix (identity)
+    v.extend(
+        [
+            (1u32 << 16).to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            (1u32 << 16).to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            0u32.to_be_bytes(),
+            (16384u32 << 16).to_be_bytes(),
+        ]
+        .into_iter()
+        .flatten(),
+    );
+
+    // Width/height
+    match s.name() {
+        "video/x-h264" | "video/x-h265" | "image/jpeg" => {
+            let width = s.get::<i32>("width").context("video caps without width")? as u32;
+            let height = s
+                .get::<i32>("height")
+                .context("video caps without height")? as u32;
+            let par = s
+                .get::<gst::Fraction>("pixel-aspect-ratio")
+                .unwrap_or_else(|_| gst::Fraction::new(1, 1));
+
+            // Apply the PAR to get the display width; clamp to u16 range
+            // since the value is stored as 16.16 fixed point.
+            let width = std::cmp::min(
+                width
+                    .mul_div_round(par.numer() as u32, par.denom() as u32)
+                    .unwrap_or(u16::MAX as u32),
+                u16::MAX as u32,
+            );
+            let height = std::cmp::min(height, u16::MAX as u32);
+
+            v.extend((width << 16).to_be_bytes());
+            v.extend((height << 16).to_be_bytes());
+        }
+        _ => v.extend([0u8; 2 * 4]),
+    }
+
+    Ok(())
+}
+
+/// Writes the `mdia` payload: `mdhd`, `hdlr` and `minf`.
+fn write_mdia(
+    v: &mut Vec<u8>,
+    cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+    creation_time: u64,
+) -> Result<(), Error> {
+    write_full_box(v, b"mdhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
+        write_mdhd(v, cfg, caps, creation_time)
+    })?;
+    write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_hdlr(v, cfg, caps)
+    })?;
+
+    // TODO: write elng if needed
+
+    write_box(v, b"minf", |v| write_minf(v, cfg, caps))?;
+
+    Ok(())
+}
+
+/// Writes the `tref` payload: one typed sub-box per reference, each listing
+/// the referenced track IDs.
+fn write_tref(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    references: &[TrackReference],
+) -> Result<(), Error> {
+    for reference in references {
+        write_box(v, &reference.reference_type, |v| {
+            for track_id in &reference.track_ids {
+                v.extend(track_id.to_be_bytes());
+            }
+
+            Ok(())
+        })?;
+    }
+
+    Ok(())
+}
+
+/// Packs a three-letter ISO-639-2/T language code into the 15-bit field used
+/// by `mdhd`: each letter is stored as (ASCII - 0x60) in 5 bits.
+///
+/// Panics (via the assert) if any letter is not lowercase ASCII.
+fn language_code(lang: impl std::borrow::Borrow<[u8; 3]>) -> u16 {
+    let lang = lang.borrow();
+
+    // TODO: Need to relax this once we get the language code from tags
+    assert!(lang.iter().all(u8::is_ascii_lowercase));
+
+    (((lang[0] as u16 - 0x60) & 0x1F) << 10)
+        + (((lang[1] as u16 - 0x60) & 0x1F) << 5)
+        + ((lang[2] as u16 - 0x60) & 0x1F)
+}
+
+/// Writes the `mdhd` payload (version 1); duration stays 0 for fragmented
+/// output and the language is hard-coded to "und" for now.
+fn write_mdhd(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+    creation_time: u64,
+) -> Result<(), Error> {
+    // Creation time
+    v.extend(creation_time.to_be_bytes());
+    // Modification time
+    v.extend(creation_time.to_be_bytes());
+    // Timescale
+    v.extend(caps_to_timescale(caps).to_be_bytes());
+    // Duration
+    v.extend(0u64.to_be_bytes());
+
+    // Language as ISO-639-2/T
+    // TODO: get actual language from the tags
+    v.extend(language_code(b"und").to_be_bytes());
+
+    // Pre-defined
+    v.extend([0u8; 2]);
+
+    Ok(())
+}
+
+/// Writes the `hdlr` payload, mapping the caps media type to the handler
+/// fourcc (`vide`/`soun`/`meta`) plus a human-readable handler name.
+///
+/// Panics on media types not produced by the muxer's sink pad templates.
+fn write_hdlr(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    // Pre-defined
+    v.extend([0u8; 4]);
+
+    let s = caps.structure(0).unwrap();
+    let (handler_type, name) = match s.name() {
+        "video/x-h264" | "video/x-h265" | "image/jpeg" => (b"vide", b"VideoHandler\0".as_slice()),
+        "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
+            (b"soun", b"SoundHandler\0".as_slice())
+        }
+        "application/x-onvif-metadata" => (b"meta", b"MetadataHandler\0".as_slice()),
+        _ => unreachable!(),
+    };
+
+    // Handler type
+    v.extend(handler_type);
+
+    // Reserved
+    v.extend([0u8; 3 * 4]);
+
+    // Name
+    v.extend(name);
+
+    Ok(())
+}
+
+/// Writes the `minf` payload: the media-specific header (`vmhd`/`smhd`/
+/// `nmhd`) followed by `dinf` and `stbl`.
+fn write_minf(
+    v: &mut Vec<u8>,
+    cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    let s = caps.structure(0).unwrap();
+
+    match s.name() {
+        "video/x-h264" | "video/x-h265" | "image/jpeg" => {
+            // Flags are always 1 for unspecified reasons
+            write_full_box(v, b"vmhd", FULL_BOX_VERSION_0, 1, |v| write_vmhd(v, cfg))?
+        }
+        "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
+            write_full_box(v, b"smhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+                write_smhd(v, cfg)
+            })?
+        }
+        "application/x-onvif-metadata" => {
+            // `nmhd` is an empty "null media header" for metadata tracks.
+            write_full_box(v, b"nmhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |_v| {
+                Ok(())
+            })?
+        }
+        _ => unreachable!(),
+    }
+
+    write_box(v, b"dinf", |v| write_dinf(v, cfg))?;
+
+    write_box(v, b"stbl", |v| write_stbl(v, cfg, caps))?;
+
+    Ok(())
+}
+
+/// Writes the `vmhd` payload: graphics mode "copy" (0) and black opcolor.
+fn write_vmhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Graphics mode
+    v.extend([0u8; 2]);
+
+    // opcolor
+    v.extend([0u8; 2 * 3]);
+
+    Ok(())
+}
+
+/// Writes the `smhd` payload: centered balance (0) plus reserved bytes.
+fn write_smhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Balance
+    v.extend([0u8; 2]);
+
+    // Reserved
+    v.extend([0u8; 2]);
+
+    Ok(())
+}
+
+/// Writes the `dinf` payload, which only contains a `dref` box.
+fn write_dinf(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    write_full_box(v, b"dref", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_dref(v, cfg)
+    })?;
+
+    Ok(())
+}
+
+// `url ` entry flag: media data lives in the same file as the metadata.
+const DREF_FLAGS_MEDIA_IN_SAME_FILE: u32 = 0x1;
+
+/// Writes the `dref` payload: a single empty `url ` entry marking the media
+/// data as self-contained.
+fn write_dref(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Entry count
+    v.extend(1u32.to_be_bytes());
+
+    write_full_box(
+        v,
+        b"url ",
+        FULL_BOX_VERSION_0,
+        DREF_FLAGS_MEDIA_IN_SAME_FILE,
+        |_v| Ok(()),
+    )?;
+
+    Ok(())
+}
+
+/// Writes the `stbl` payload.
+///
+/// All sample tables except `stsd` are empty because samples are described in
+/// movie fragments; video tracks additionally get an empty `stss` to signal
+/// that not every sample is a sync sample.
+fn write_stbl(
+    v: &mut Vec<u8>,
+    cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    write_full_box(v, b"stsd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_stsd(v, cfg, caps)
+    })?;
+    write_full_box(v, b"stts", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_stts(v, cfg)
+    })?;
+    write_full_box(v, b"stsc", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_stsc(v, cfg)
+    })?;
+    write_full_box(v, b"stsz", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_stsz(v, cfg)
+    })?;
+
+    write_full_box(v, b"stco", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_stco(v, cfg)
+    })?;
+
+    // For video write a sync sample box as indication that not all samples are sync samples
+    let s = caps.structure(0).unwrap();
+    match s.name() {
+        "video/x-h264" | "video/x-h265" => {
+            write_full_box(v, b"stss", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+                write_stss(v, cfg)
+            })?
+        }
+        _ => (),
+    }
+
+    Ok(())
+}
+
+/// Writes the `stsd` payload: exactly one sample entry, dispatched on the
+/// caps media type (visual, audio or XML metadata).
+fn write_stsd(
+    v: &mut Vec<u8>,
+    cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    // Entry count
+    v.extend(1u32.to_be_bytes());
+
+    let s = caps.structure(0).unwrap();
+    match s.name() {
+        "video/x-h264" | "video/x-h265" | "image/jpeg" => write_visual_sample_entry(v, cfg, caps)?,
+        "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
+            write_audio_sample_entry(v, cfg, caps)?
+        }
+        "application/x-onvif-metadata" => write_xml_meta_data_sample_entry(v, cfg, caps)?,
+        _ => unreachable!(),
+    }
+
+    Ok(())
+}
+
+/// Writes a SampleEntry box: like [`write_box`] but prepends the common
+/// 6 reserved bytes and the data-reference index (always 1, pointing at the
+/// single `dref` entry).
+fn write_sample_entry_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>(
+    v: &mut Vec<u8>,
+    fourcc: impl std::borrow::Borrow<[u8; 4]>,
+    content_func: F,
+) -> Result<T, Error> {
+    write_box(v, fourcc, move |v| {
+        // Reserved
+        v.extend([0u8; 6]);
+
+        // Data reference index
+        v.extend(1u16.to_be_bytes());
+
+        content_func(v)
+    })
+}
+
+/// Writes the visual sample entry (`avc1`/`avc3`/`hvc1`/`hev1`/`jpeg`) for a
+/// video track.
+///
+/// After the fixed VisualSampleEntry fields this appends, as applicable:
+/// the codec configuration box (`avcC`/`hvcC` from `codec_data`), `pasp`,
+/// `colr` (nclx), `clli`/`mdcv` (with the `v1_18` feature) and `fiel`
+/// (JPEG only).
+///
+/// Errors if required caps fields (width/height, stream-format, codec_data)
+/// are missing or out of range; panics on media types the muxer does not
+/// accept.
+fn write_visual_sample_entry(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    let s = caps.structure(0).unwrap();
+    // Sample entry fourcc follows from codec and stream-format: `avc1`/`hvc1`
+    // carry parameter sets only in codec_data, `avc3`/`hev1` in-band.
+    let fourcc = match s.name() {
+        "video/x-h264" => {
+            let stream_format = s.get::<&str>("stream-format").context("no stream-format")?;
+            match stream_format {
+                "avc" => b"avc1",
+                "avc3" => b"avc3",
+                _ => unreachable!(),
+            }
+        }
+        "video/x-h265" => {
+            let stream_format = s.get::<&str>("stream-format").context("no stream-format")?;
+            match stream_format {
+                "hvc1" => b"hvc1",
+                "hev1" => b"hev1",
+                _ => unreachable!(),
+            }
+        }
+        "image/jpeg" => b"jpeg",
+        _ => unreachable!(),
+    };
+
+    write_sample_entry_box(v, fourcc, move |v| {
+        // pre-defined
+        v.extend([0u8; 2]);
+        // Reserved
+        v.extend([0u8; 2]);
+        // pre-defined
+        v.extend([0u8; 3 * 4]);
+
+        // Width
+        let width =
+            u16::try_from(s.get::<i32>("width").context("no width")?).context("too big width")?;
+        v.extend(width.to_be_bytes());
+
+        // Height
+        let height = u16::try_from(s.get::<i32>("height").context("no height")?)
+            .context("too big height")?;
+        v.extend(height.to_be_bytes());
+
+        // Horizontal resolution (72 dpi in 16.16 fixed point)
+        v.extend(0x00480000u32.to_be_bytes());
+
+        // Vertical resolution
+        v.extend(0x00480000u32.to_be_bytes());
+
+        // Reserved
+        v.extend([0u8; 4]);
+
+        // Frame count
+        v.extend(1u16.to_be_bytes());
+
+        // Compressor name
+        v.extend([0u8; 32]);
+
+        // Depth
+        v.extend(0x0018u16.to_be_bytes());
+
+        // Pre-defined
+        v.extend((-1i16).to_be_bytes());
+
+        // Codec specific boxes
+        match s.name() {
+            "video/x-h264" => {
+                let codec_data = s
+                    .get::<&gst::BufferRef>("codec_data")
+                    .context("no codec_data")?;
+                let map = codec_data
+                    .map_readable()
+                    .context("codec_data not mappable")?;
+                write_box(v, b"avcC", move |v| {
+                    v.extend_from_slice(&map);
+                    Ok(())
+                })?;
+            }
+            "video/x-h265" => {
+                let codec_data = s
+                    .get::<&gst::BufferRef>("codec_data")
+                    .context("no codec_data")?;
+                let map = codec_data
+                    .map_readable()
+                    .context("codec_data not mappable")?;
+                write_box(v, b"hvcC", move |v| {
+                    v.extend_from_slice(&map);
+                    Ok(())
+                })?;
+            }
+            "image/jpeg" => {
+                // Nothing to do here
+            }
+            _ => unreachable!(),
+        }
+
+        if let Ok(par) = s.get::<gst::Fraction>("pixel-aspect-ratio") {
+            write_box(v, b"pasp", move |v| {
+                v.extend((par.numer() as u32).to_be_bytes());
+                v.extend((par.denom() as u32).to_be_bytes());
+                Ok(())
+            })?;
+        }
+
+        if let Some(colorimetry) = s
+            .get::<&str>("colorimetry")
+            .ok()
+            .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
+        {
+            write_box(v, b"colr", move |v| {
+                v.extend(b"nclx");
+                let (primaries, transfer, matrix) = {
+                    #[cfg(feature = "v1_18")]
+                    {
+                        (
+                            (colorimetry.primaries().to_iso() as u16),
+                            (colorimetry.transfer().to_iso() as u16),
+                            (colorimetry.matrix().to_iso() as u16),
+                        )
+                    }
+                    #[cfg(not(feature = "v1_18"))]
+                    {
+                        // Manual mapping to ISO/IEC 23001-8 code points for
+                        // older gstreamer-video without `to_iso()`; unknown
+                        // values map to 2 ("unspecified").
+                        let primaries = match colorimetry.primaries() {
+                            gst_video::VideoColorPrimaries::Bt709 => 1u16,
+                            gst_video::VideoColorPrimaries::Bt470m => 4u16,
+                            gst_video::VideoColorPrimaries::Bt470bg => 5u16,
+                            gst_video::VideoColorPrimaries::Smpte170m => 6u16,
+                            gst_video::VideoColorPrimaries::Smpte240m => 7u16,
+                            gst_video::VideoColorPrimaries::Film => 8u16,
+                            gst_video::VideoColorPrimaries::Bt2020 => 9u16,
+                            _ => 2,
+                        };
+                        let transfer = match colorimetry.transfer() {
+                            gst_video::VideoTransferFunction::Bt709 => 1u16,
+                            gst_video::VideoTransferFunction::Gamma22 => 4u16,
+                            gst_video::VideoTransferFunction::Gamma28 => 5u16,
+                            gst_video::VideoTransferFunction::Smpte240m => 7u16,
+                            gst_video::VideoTransferFunction::Gamma10 => 8u16,
+                            gst_video::VideoTransferFunction::Log100 => 9u16,
+                            gst_video::VideoTransferFunction::Log316 => 10u16,
+                            gst_video::VideoTransferFunction::Srgb => 13u16,
+                            gst_video::VideoTransferFunction::Bt202012 => 15u16,
+                            _ => 2,
+                        };
+                        let matrix = match colorimetry.matrix() {
+                            gst_video::VideoColorMatrix::Rgb => 0u16,
+                            gst_video::VideoColorMatrix::Bt709 => 1u16,
+                            gst_video::VideoColorMatrix::Fcc => 4u16,
+                            gst_video::VideoColorMatrix::Bt601 => 6u16,
+                            gst_video::VideoColorMatrix::Smpte240m => 7u16,
+                            gst_video::VideoColorMatrix::Bt2020 => 9u16,
+                            _ => 2,
+                        };
+
+                        (primaries, transfer, matrix)
+                    }
+                };
+
+                // full_range_flag lives in the top bit of the final byte.
+                let full_range = match colorimetry.range() {
+                    gst_video::VideoColorRange::Range0_255 => 0x80u8,
+                    gst_video::VideoColorRange::Range16_235 => 0x00u8,
+                    _ => 0x00,
+                };
+
+                v.extend(primaries.to_be_bytes());
+                v.extend(transfer.to_be_bytes());
+                v.extend(matrix.to_be_bytes());
+                v.push(full_range);
+
+                Ok(())
+            })?;
+        }
+
+        #[cfg(feature = "v1_18")]
+        {
+            if let Ok(cll) = gst_video::VideoContentLightLevel::from_caps(caps) {
+                write_box(v, b"clli", move |v| {
+                    v.extend((cll.max_content_light_level() as u16).to_be_bytes());
+                    v.extend((cll.max_frame_average_light_level() as u16).to_be_bytes());
+                    Ok(())
+                })?;
+            }
+
+            if let Ok(mastering) = gst_video::VideoMasteringDisplayInfo::from_caps(caps) {
+                write_box(v, b"mdcv", move |v| {
+                    for primary in mastering.display_primaries() {
+                        v.extend(primary.x.to_be_bytes());
+                        v.extend(primary.y.to_be_bytes());
+                    }
+                    v.extend(mastering.white_point().x.to_be_bytes());
+                    v.extend(mastering.white_point().y.to_be_bytes());
+                    v.extend(mastering.max_display_mastering_luminance().to_be_bytes());
+                    // Fixed: the maximum luminance was previously written
+                    // twice; the second field must be the minimum display
+                    // mastering luminance per the mdcv box definition.
+                    v.extend(mastering.min_display_mastering_luminance().to_be_bytes());
+                    Ok(())
+                })?;
+            }
+        }
+
+        // Write fiel box for codecs that require it
+        if ["image/jpeg"].contains(&s.name()) {
+            let interlace_mode = s
+                .get::<&str>("interlace-mode")
+                .ok()
+                .map(gst_video::VideoInterlaceMode::from_string)
+                .unwrap_or(gst_video::VideoInterlaceMode::Progressive);
+            let field_order = s
+                .get::<&str>("field-order")
+                .ok()
+                .map(gst_video::VideoFieldOrder::from_string)
+                .unwrap_or(gst_video::VideoFieldOrder::Unknown);
+
+            write_box(v, b"fiel", move |v| {
+                // QuickTime 'fiel' semantics: (1, 0) progressive,
+                // (2, 9) interlaced TFF, (2, 14) interlaced BFF.
+                let (interlace, field_order) = match interlace_mode {
+                    gst_video::VideoInterlaceMode::Progressive => (1, 0),
+                    gst_video::VideoInterlaceMode::Interleaved
+                        if field_order == gst_video::VideoFieldOrder::TopFieldFirst =>
+                    {
+                        (2, 9)
+                    }
+                    gst_video::VideoInterlaceMode::Interleaved => (2, 14),
+                    _ => (0, 0),
+                };
+
+                v.push(interlace);
+                v.push(field_order);
+                Ok(())
+            })?;
+        }
+
+        // TODO: write btrt bitrate box based on tags
+
+        Ok(())
+    })?;
+
+    Ok(())
+}
+
+/// Writes the audio sample entry (`mp4a`/`alaw`/`ulaw`/G.726) for an audio
+/// track, including `esds` for AAC and an `srat` box when the sample rate
+/// does not fit the legacy 16-bit field.
+fn write_audio_sample_entry(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    let s = caps.structure(0).unwrap();
+    let fourcc = match s.name() {
+        "audio/mpeg" => b"mp4a",
+        "audio/x-alaw" => b"alaw",
+        "audio/x-mulaw" => b"ulaw",
+        "audio/x-adpcm" => {
+            let layout = s.get::<&str>("layout").context("no ADPCM layout field")?;
+
+            match layout {
+                // Microsoft WAVE format tag 0x45 = G.726 ADPCM.
+                "g726" => b"ms\x00\x45",
+                _ => unreachable!(),
+            }
+        }
+        _ => unreachable!(),
+    };
+
+    // G.726 encodes bits-per-sample as bitrate / 8000 (samples at 8 kHz).
+    let sample_size = match s.name() {
+        "audio/x-adpcm" => {
+            let bitrate = s.get::<i32>("bitrate").context("no ADPCM bitrate field")?;
+            (bitrate / 8000) as u16
+        }
+        _ => 16u16,
+    };
+
+    write_sample_entry_box(v, fourcc, move |v| {
+        // Reserved
+        v.extend([0u8; 2 * 4]);
+
+        // Channel count
+        let channels = u16::try_from(s.get::<i32>("channels").context("no channels")?)
+            .context("too many channels")?;
+        v.extend(channels.to_be_bytes());
+
+        // Sample size
+        v.extend(sample_size.to_be_bytes());
+
+        // Pre-defined
+        v.extend([0u8; 2]);
+
+        // Reserved
+        v.extend([0u8; 2]);
+
+        // Sample rate (16.16 fixed point; 0 if it doesn't fit 16 bits,
+        // in which case an `srat` box is written below)
+        let rate = u16::try_from(s.get::<i32>("rate").context("no rate")?).unwrap_or(0);
+        v.extend((u32::from(rate) << 16).to_be_bytes());
+
+        // Codec specific boxes
+        match s.name() {
+            "audio/mpeg" => {
+                let codec_data = s
+                    .get::<&gst::BufferRef>("codec_data")
+                    .context("no codec_data")?;
+                let map = codec_data
+                    .map_readable()
+                    .context("codec_data not mappable")?;
+                if map.len() < 2 {
+                    bail!("too small codec_data");
+                }
+                write_esds_aac(v, &map)?;
+            }
+            "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
+                // Nothing to do here
+            }
+            _ => unreachable!(),
+        }
+
+        // If rate did not fit into 16 bits write a full `srat` box
+        if rate == 0 {
+            let rate = s.get::<i32>("rate").context("no rate")?;
+            // FIXME: This is defined as full box?
+            write_full_box(
+                v,
+                b"srat",
+                FULL_BOX_VERSION_0,
+                FULL_BOX_FLAGS_NONE,
+                move |v| {
+                    v.extend((rate as u32).to_be_bytes());
+                    Ok(())
+                },
+            )?;
+        }
+
+        // TODO: write btrt bitrate box based on tags
+
+        // TODO: chnl box for channel ordering? probably not needed for AAC
+
+        Ok(())
+    })?;
+
+    Ok(())
+}
+
+/// Writes an `esds` box for AAC with the given AudioSpecificConfig
+/// (`codec_data`) embedded as the decoder specific info.
+///
+/// Descriptor lengths use the MPEG-4 variable-length encoding (7 bits per
+/// byte, MSB as continuation flag); `calculate_len` returns the encoded bytes
+/// plus how many of them are used.
+fn write_esds_aac(v: &mut Vec<u8>, codec_data: &[u8]) -> Result<(), Error> {
+    let calculate_len = |mut len| {
+        if len > 260144641 {
+            bail!("too big descriptor length");
+        }
+
+        if len == 0 {
+            return Ok(([0; 4], 1));
+        }
+
+        // Little-endian-first 7-bit groups; the continuation bit is set on
+        // every byte that has a following group.
+        let mut idx = 0;
+        let mut lens = [0u8; 4];
+        while len > 0 {
+            lens[idx] = ((if len > 0x7f { 0x80 } else { 0x00 }) | (len & 0x7f)) as u8;
+            idx += 1;
+            len >>= 7;
+        }
+
+        Ok((lens, idx))
+    };
+
+    write_full_box(
+        v,
+        b"esds",
+        FULL_BOX_VERSION_0,
+        FULL_BOX_FLAGS_NONE,
+        move |v| {
+            // Calculate all lengths bottom up
+
+            // Decoder specific info
+            let decoder_specific_info_len = calculate_len(codec_data.len())?;
+
+            // Decoder config
+            let decoder_config_len =
+                calculate_len(13 + 1 + decoder_specific_info_len.1 + codec_data.len())?;
+
+            // SL config
+            let sl_config_len = calculate_len(1)?;
+
+            // ES descriptor
+            let es_descriptor_len = calculate_len(
+                3 + 1
+                    + decoder_config_len.1
+                    + 13
+                    + 1
+                    + decoder_specific_info_len.1
+                    + codec_data.len()
+                    + 1
+                    + sl_config_len.1
+                    + 1,
+            )?;
+
+            // ES descriptor tag
+            v.push(0x03);
+
+            // Length
+            v.extend_from_slice(&es_descriptor_len.0[..(es_descriptor_len.1)]);
+
+            // Track ID
+            v.extend(1u16.to_be_bytes());
+            // Flags
+            v.push(0u8);
+
+            // Decoder config descriptor
+            v.push(0x04);
+
+            // Length
+            v.extend_from_slice(&decoder_config_len.0[..(decoder_config_len.1)]);
+
+            // Object type ESDS_OBJECT_TYPE_MPEG4_P3
+            v.push(0x40);
+            // Stream type ESDS_STREAM_TYPE_AUDIO
+            v.push((0x05 << 2) | 0x01);
+
+            // Buffer size db?
+            v.extend([0u8; 3]);
+
+            // Max bitrate
+            v.extend(0u32.to_be_bytes());
+
+            // Avg bitrate
+            v.extend(0u32.to_be_bytes());
+
+            // Decoder specific info
+            v.push(0x05);
+
+            // Length
+            v.extend_from_slice(&decoder_specific_info_len.0[..(decoder_specific_info_len.1)]);
+            v.extend_from_slice(codec_data);
+
+            // SL config descriptor
+            v.push(0x06);
+
+            // Length: 1 (tag) + 1 (length) + 1 (predefined)
+            v.extend_from_slice(&sl_config_len.0[..(sl_config_len.1)]);
+
+            // Predefined
+            v.push(0x02);
+            Ok(())
+        },
+    )
+}
+
+/// Writes a `metx` (XML timed metadata) sample entry.
+///
+/// Only ONVIF metadata caps are supported; caps selection upstream guarantees
+/// the structure name, hence the `unreachable!()` for anything else.
+fn write_xml_meta_data_sample_entry(
+    v: &mut Vec<u8>,
+    _cfg: &super::HeaderConfiguration,
+    caps: &gst::CapsRef,
+) -> Result<(), Error> {
+    let s = caps.structure(0).unwrap();
+    let namespace = match s.name() {
+        "application/x-onvif-metadata" => b"http://www.onvif.org/ver10/schema",
+        _ => unreachable!(),
+    };
+
+    write_sample_entry_box(v, b"metx", move |v| {
+        // content_encoding, empty string
+        v.push(0);
+
+        // namespace (NUL-terminated string)
+        v.extend_from_slice(namespace);
+        v.push(0);
+
+        // schema_location, empty string list
+        v.push(0);
+
+        Ok(())
+    })?;
+
+    Ok(())
+}
+
+/// Writes an empty `stts` (decoding time-to-sample) table; in fragmented MP4
+/// all sample timing lives in the `moof`/`trun` boxes instead.
+fn write_stts(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Entry count
+    v.extend(0u32.to_be_bytes());
+
+    Ok(())
+}
+
+/// Writes an empty `stsc` (sample-to-chunk) table, as required for an
+/// fMP4 initialization segment with no samples in the `moov`.
+fn write_stsc(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Entry count
+    v.extend(0u32.to_be_bytes());
+
+    Ok(())
+}
+
+/// Writes an empty `stsz` (sample size) box: default size 0 and zero samples.
+fn write_stsz(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Sample size (0 = sizes would be listed per sample; none follow)
+    v.extend(0u32.to_be_bytes());
+
+    // Sample count
+    v.extend(0u32.to_be_bytes());
+
+    Ok(())
+}
+
+/// Writes an empty `stco` (chunk offset) table; media data is addressed via
+/// `trun` data offsets in each fragment instead.
+fn write_stco(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Entry count
+    v.extend(0u32.to_be_bytes());
+
+    Ok(())
+}
+
+/// Writes an empty `stss` (sync sample) table; sync points are signalled via
+/// per-sample flags in the fragments.
+fn write_stss(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Entry count
+    v.extend(0u32.to_be_bytes());
+
+    Ok(())
+}
+
+/// Writes the contents of the `mvex` box: an optional `mehd` (overall media
+/// duration) followed by one `trex` per stream.
+///
+/// When `write_mehd` is requested but the duration isn't known yet, a `free`
+/// box of exactly the same size (4 bytes version/flags + 8 bytes duration)
+/// is written instead, so a later header update can overwrite it in place.
+fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    if cfg.write_mehd {
+        if cfg.update && cfg.duration.is_some() {
+            write_full_box(v, b"mehd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
+                write_mehd(v, cfg)
+            })?;
+        } else {
+            // Same-sized placeholder for a later in-place rewrite.
+            write_box(v, b"free", |v| {
+                // version/flags of full box
+                v.extend(0u32.to_be_bytes());
+                // mehd duration
+                v.extend(0u64.to_be_bytes());
+
+                Ok(())
+            })?;
+        }
+    }
+
+    for (idx, _caps) in cfg.streams.iter().enumerate() {
+        write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+            write_trex(v, cfg, idx)
+        })?;
+    }
+
+    Ok(())
+}
+
+/// Writes the `mehd` payload: the total media duration converted to the
+/// timescale of the first (reference) stream.
+///
+/// Panics if `cfg.duration` is `None`; `write_mvex` only calls this when a
+/// duration is available.
+fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
+    // Use the reference track timescale
+    let timescale = caps_to_timescale(&cfg.streams[0]);
+
+    let duration = cfg
+        .duration
+        .expect("no duration")
+        .mul_div_ceil(timescale as u64, gst::ClockTime::SECOND.nseconds())
+        .context("too long duration")?;
+
+    // Media duration in mvhd.timescale units
+    v.extend(duration.to_be_bytes());
+
+    Ok(())
+}
+
+/// Writes a `trex` (track extends) payload for stream `idx`.
+///
+/// Track IDs are 1-based (stream index + 1) and all sample defaults are left
+/// at zero; real defaults are established per fragment in `tfhd`/`trun`.
+fn write_trex(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration, idx: usize) -> Result<(), Error> {
+    // Track ID
+    v.extend((idx as u32 + 1).to_be_bytes());
+
+    // Default sample description index
+    v.extend(1u32.to_be_bytes());
+
+    // Default sample duration
+    v.extend(0u32.to_be_bytes());
+
+    // Default sample size
+    v.extend(0u32.to_be_bytes());
+
+    // Default sample flags
+    v.extend(0u32.to_be_bytes());
+
+    // Default sample duration/size/etc will be provided in the traf/trun if one can be determined
+    // for a whole fragment
+
+    Ok(())
+}
+
+/// Creates `styp` and `moof` boxes and `mdat` header
+///
+/// Returns the serialized header buffer together with the size of the leading
+/// `styp` box, so the caller can account for it separately in byte offsets.
+pub(super) fn create_fmp4_fragment_header(
+    cfg: super::FragmentHeaderConfiguration,
+) -> Result<(gst::Buffer, u64), Error> {
+    let mut v = vec![];
+
+    let (brand, compatible_brands) =
+        brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.0));
+
+    write_box(&mut v, b"styp", |v| {
+        // major brand
+        v.extend(brand);
+        // minor version
+        v.extend(0u32.to_be_bytes());
+        // compatible brands
+        v.extend(compatible_brands.into_iter().flatten());
+
+        Ok(())
+    })?;
+
+    let styp_len = v.len();
+
+    // Collects the byte positions of every `trun` data_offset field so they
+    // can be fixed up once the full moof size is known.
+    let data_offset_offsets = write_box(&mut v, b"moof", |v| write_moof(v, &cfg))?;
+
+    let size = cfg
+        .buffers
+        .iter()
+        .map(|buffer| buffer.buffer.size() as u64)
+        .sum::<u64>();
+    if let Ok(size) = u32::try_from(size + 8) {
+        v.extend(size.to_be_bytes());
+        v.extend(b"mdat");
+    } else {
+        // size == 1 signals a 64-bit `largesize` following the fourcc;
+        // the 16-byte header (4 size + 4 fourcc + 8 largesize) is included.
+        v.extend(1u32.to_be_bytes());
+        v.extend(b"mdat");
+        v.extend((size + 16).to_be_bytes());
+    }
+
+    // The trun data offsets were written relative to the start of the mdat
+    // payload; shift them so they're relative to the moof start, as required
+    // by the default-base-is-moof flag.
+    let data_offset = v.len() - styp_len;
+    for data_offset_offset in data_offset_offsets {
+        let val = u32::from_be_bytes(v[data_offset_offset..][..4].try_into()?)
+            .checked_add(u32::try_from(data_offset)?)
+            .ok_or_else(|| anyhow!("can't calculate track run data offset"))?;
+        v[data_offset_offset..][..4].copy_from_slice(&val.to_be_bytes());
+    }
+
+    Ok((gst::Buffer::from_mut_slice(v), styp_len as u64))
+}
+
+/// Writes the `moof` contents: an `mfhd` followed by one `traf` per stream
+/// that has buffers in this fragment.
+///
+/// Returns the byte positions (within `v`) of all `trun` data_offset fields,
+/// to be patched by the caller once final offsets are known.
+fn write_moof(
+    v: &mut Vec<u8>,
+    cfg: &super::FragmentHeaderConfiguration,
+) -> Result<Vec<usize>, Error> {
+    write_full_box(v, b"mfhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+        write_mfhd(v, cfg)
+    })?;
+
+    let mut data_offset_offsets = vec![];
+    for (idx, (caps, timing_info)) in cfg.streams.iter().enumerate() {
+        // Skip tracks without any buffers for this fragment.
+        let timing_info = match timing_info {
+            None => continue,
+            Some(ref timing_info) => timing_info,
+        };
+
+        write_box(v, b"traf", |v| {
+            write_traf(v, cfg, &mut data_offset_offsets, idx, caps, timing_info)
+        })?;
+    }
+
+    Ok(data_offset_offsets)
+}
+
+/// Writes the `mfhd` payload: the monotonically increasing fragment
+/// sequence number.
+fn write_mfhd(v: &mut Vec<u8>, cfg: &super::FragmentHeaderConfiguration) -> Result<(), Error> {
+    v.extend(cfg.sequence_number.to_be_bytes());
+
+    Ok(())
+}
+
+// Builds the 32-bit ISO BMFF sample flags field for one buffer.
+//
+// Intra-only streams always produce sync samples that depend on nothing;
+// otherwise the dependency/sync bits are derived from the buffer's
+// DELTA_UNIT and DROPPABLE flags.
+#[allow(clippy::identity_op)]
+fn sample_flags_from_buffer(
+    timing_info: &super::FragmentTimingInfo,
+    buffer: &gst::BufferRef,
+) -> u32 {
+    if timing_info.intra_only {
+        (0b00u32 << (16 + 10)) | // leading: unknown
+        (0b10u32 << (16 + 8)) | // depends: no
+        (0b10u32 << (16 + 6)) | // depended: no
+        (0b00u32 << (16 + 4)) | // redundancy: unknown
+        (0b000u32 << (16 + 1)) | // padding: no
+        (0b0u32 << 16) | // non-sync-sample: no
+        (0u32) // degradation priority
+    } else {
+        // Delta units depend on other samples; everything else is a keyframe.
+        let depends = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
+            0b01u32
+        } else {
+            0b10u32
+        };
+        // Droppable buffers are known to have no dependents.
+        let depended = if buffer.flags().contains(gst::BufferFlags::DROPPABLE) {
+            0b10u32
+        } else {
+            0b00u32
+        };
+        let non_sync_sample = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
+            0b1u32
+        } else {
+            0b0u32
+        };
+
+        (0b00u32 << (16 + 10)) | // leading: unknown
+        (depends << (16 + 8)) | // depends
+        (depended << (16 + 6)) | // depended
+        (0b00u32 << (16 + 4)) | // redundancy: unknown
+        (0b000u32 << (16 + 1)) | // padding: no
+        (non_sync_sample << 16) | // non-sync-sample
+        (0u32) // degradation priority
+    }
+}
+
+// `tfhd` flag bits (tf_flags)
+const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08;
+const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10;
+const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20;
+const DEFAULT_BASE_IS_MOOF: u32 = 0x2_00_00;
+
+// `trun` flag bits (tr_flags)
+const DATA_OFFSET_PRESENT: u32 = 0x0_01;
+const FIRST_SAMPLE_FLAGS_PRESENT: u32 = 0x0_04;
+const SAMPLE_DURATION_PRESENT: u32 = 0x1_00;
+const SAMPLE_SIZE_PRESENT: u32 = 0x2_00;
+const SAMPLE_FLAGS_PRESENT: u32 = 0x4_00;
+const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00;
+
+// Scans this stream's buffers for the fragment and decides which per-sample
+// fields can be hoisted into `tfhd` defaults versus emitted per sample in
+// the `trun`(s), and whether any composition time offset is negative (which
+// forces `trun` version 1 downstream).
+#[allow(clippy::type_complexity)]
+fn analyze_buffers(
+    cfg: &super::FragmentHeaderConfiguration,
+    idx: usize,
+    timing_info: &super::FragmentTimingInfo,
+    timescale: u32,
+) -> Result<
+    (
+        // tf_flags
+        u32,
+        // tr_flags
+        u32,
+        // default size
+        Option<u32>,
+        // default duration
+        Option<u32>,
+        // default flags
+        Option<u32>,
+        // negative composition time offsets
+        bool,
+    ),
+    Error,
+> {
+    let mut tf_flags = DEFAULT_BASE_IS_MOOF;
+    let mut tr_flags = DATA_OFFSET_PRESENT;
+
+    let mut duration = None;
+    let mut size = None;
+    let mut first_buffer_flags = None;
+    let mut flags = None;
+
+    let mut negative_composition_time_offsets = false;
+
+    // Only this track's buffers are inspected; `cfg.buffers` interleaves all
+    // streams of the fragment.
+    for Buffer {
+        idx: _idx,
+        buffer,
+        timestamp: _timestamp,
+        duration: sample_duration,
+        composition_time_offset,
+    } in cfg.buffers.iter().filter(|b| b.idx == idx)
+    {
+        // Any deviation from the first sample's size forces per-sample sizes.
+        if size.is_none() {
+            size = Some(buffer.size() as u32);
+        }
+        if Some(buffer.size() as u32) != size {
+            tr_flags |= SAMPLE_SIZE_PRESENT;
+        }
+
+        let sample_duration = u32::try_from(
+            sample_duration
+                .nseconds()
+                .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
+                .context("too big sample duration")?,
+        )
+        .context("too big sample duration")?;
+
+        if duration.is_none() {
+            duration = Some(sample_duration);
+        }
+        if Some(sample_duration) != duration {
+            tr_flags |= SAMPLE_DURATION_PRESENT;
+        }
+
+        // Flag handling: if only the first sample's flags differ (typical
+        // keyframe-then-deltas GOP) use first-sample-flags; if later samples
+        // also disagree among themselves, fall back to per-sample flags.
+        let f = sample_flags_from_buffer(timing_info, buffer);
+        if first_buffer_flags.is_none() {
+            first_buffer_flags = Some(f);
+        } else {
+            flags = Some(f);
+            if Some(f) != first_buffer_flags {
+                tr_flags |= FIRST_SAMPLE_FLAGS_PRESENT;
+            }
+        }
+
+        if flags.is_some() && Some(f) != flags {
+            tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT;
+            tr_flags |= SAMPLE_FLAGS_PRESENT;
+        }
+
+        if let Some(composition_time_offset) = *composition_time_offset {
+            assert!(!timing_info.intra_only);
+            if composition_time_offset != 0 {
+                tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT;
+            }
+            if composition_time_offset < 0 {
+                negative_composition_time_offsets = true;
+            }
+        }
+    }
+
+    // Promote uniform values to tfhd defaults; drop them otherwise.
+    if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 {
+        tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT;
+    } else {
+        size = None;
+    }
+
+    if (tr_flags & SAMPLE_DURATION_PRESENT) == 0 {
+        tf_flags |= DEFAULT_SAMPLE_DURATION_PRESENT;
+    } else {
+        duration = None;
+    }
+
+    // If there is only a single buffer use its flags as default sample flags
+    // instead of first sample flags.
+    if flags.is_none() && first_buffer_flags.is_some() {
+        tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT;
+        flags = first_buffer_flags.take();
+    }
+
+    if (tr_flags & SAMPLE_FLAGS_PRESENT) == 0 {
+        tf_flags |= DEFAULT_SAMPLE_FLAGS_PRESENT;
+    } else {
+        flags = None;
+    }
+
+    Ok((
+        tf_flags,
+        tr_flags,
+        size,
+        duration,
+        flags,
+        negative_composition_time_offsets,
+    ))
+}
+
+// Writes a `traf` for stream `idx`: a `tfhd` with whatever defaults
+// `analyze_buffers` determined, a `tfdt`, and one `trun` per contiguous run
+// of this stream's buffers in the interleaved fragment buffer list.
+#[allow(clippy::ptr_arg)]
+fn write_traf(
+    v: &mut Vec<u8>,
+    cfg: &super::FragmentHeaderConfiguration,
+    data_offset_offsets: &mut Vec<usize>,
+    idx: usize,
+    caps: &gst::CapsRef,
+    timing_info: &super::FragmentTimingInfo,
+) -> Result<(), Error> {
+    let timescale = caps_to_timescale(caps);
+
+    // Analyze all buffers to know what values can be put into the tfhd for all samples and what
+    // has to be stored for every single sample
+    let (
+        tf_flags,
+        tr_flags,
+        default_size,
+        default_duration,
+        default_flags,
+        negative_composition_time_offsets,
+    ) = analyze_buffers(cfg, idx, timing_info, timescale)?;
+
+    // Each default value must be present exactly when its tf_flags bit is set.
+    assert!((tf_flags & DEFAULT_SAMPLE_SIZE_PRESENT == 0) ^ default_size.is_some());
+    assert!((tf_flags & DEFAULT_SAMPLE_DURATION_PRESENT == 0) ^ default_duration.is_some());
+    assert!((tf_flags & DEFAULT_SAMPLE_FLAGS_PRESENT == 0) ^ default_flags.is_some());
+
+    write_full_box(v, b"tfhd", FULL_BOX_VERSION_0, tf_flags, |v| {
+        write_tfhd(v, cfg, idx, default_size, default_duration, default_flags)
+    })?;
+    write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
+        write_tfdt(v, cfg, idx, timing_info, timescale)
+    })?;
+
+    // Byte offset into the mdat payload; runs of other streams' buffers
+    // advance it so each trun points at its own interleaved data.
+    let mut current_data_offset = 0;
+
+    for run in GroupBy::new(cfg.buffers, |a: &Buffer, b: &Buffer| a.idx == b.idx) {
+        if run[0].idx != idx {
+            // FIXME: What to do with >4GB offsets?
+            current_data_offset = (current_data_offset as u64
+                + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>())
+            .try_into()?;
+            continue;
+        }
+
+        // Version 1 truns use signed composition time offsets.
+        let data_offset_offset = write_full_box(
+            v,
+            b"trun",
+            if negative_composition_time_offsets {
+                FULL_BOX_VERSION_1
+            } else {
+                FULL_BOX_VERSION_0
+            },
+            tr_flags,
+            |v| {
+                write_trun(
+                    v,
+                    cfg,
+                    current_data_offset,
+                    tr_flags,
+                    timescale,
+                    timing_info,
+                    run,
+                )
+            },
+        )?;
+        data_offset_offsets.push(data_offset_offset);
+
+        // FIXME: What to do with >4GB offsets?
+        current_data_offset = (current_data_offset as u64
+            + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>())
+        .try_into()?;
+    }
+
+    // TODO: saio, saiz, sbgp, sgpd, subs?
+
+    Ok(())
+}
+
+/// Writes the `tfhd` payload: the 1-based track ID followed by whichever
+/// optional default fields were selected via tf_flags, in spec order
+/// (duration, size, flags).
+fn write_tfhd(
+    v: &mut Vec<u8>,
+    _cfg: &super::FragmentHeaderConfiguration,
+    idx: usize,
+    default_size: Option<u32>,
+    default_duration: Option<u32>,
+    default_flags: Option<u32>,
+) -> Result<(), Error> {
+    // Track ID
+    v.extend((idx as u32 + 1).to_be_bytes());
+
+    // No base data offset, no sample description index
+
+    if let Some(default_duration) = default_duration {
+        v.extend(default_duration.to_be_bytes());
+    }
+
+    if let Some(default_size) = default_size {
+        v.extend(default_size.to_be_bytes());
+    }
+
+    if let Some(default_flags) = default_flags {
+        v.extend(default_flags.to_be_bytes());
+    }
+
+    Ok(())
+}
+
+/// Writes the `tfdt` payload: the fragment's baseMediaDecodeTime, i.e. the
+/// stream start time converted to track timescale units (64-bit, as the
+/// caller writes this box with version 1).
+fn write_tfdt(
+    v: &mut Vec<u8>,
+    _cfg: &super::FragmentHeaderConfiguration,
+    _idx: usize,
+    timing_info: &super::FragmentTimingInfo,
+    timescale: u32,
+) -> Result<(), Error> {
+    let base_time = timing_info
+        .start_time
+        .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds())
+        .context("base time overflow")?;
+
+    v.extend(base_time.to_be_bytes());
+
+    Ok(())
+}
+
+// Writes one `trun` for a contiguous run of this track's buffers.
+//
+// Emits only the fields selected in `tr_flags`; the data offset is written
+// as a payload-relative placeholder and its byte position is returned so
+// `create_fmp4_fragment_header` can rebase it onto the moof start.
+#[allow(clippy::too_many_arguments)]
+fn write_trun(
+    v: &mut Vec<u8>,
+    _cfg: &super::FragmentHeaderConfiguration,
+    current_data_offset: u32,
+    tr_flags: u32,
+    timescale: u32,
+    timing_info: &super::FragmentTimingInfo,
+    buffers: &[Buffer],
+) -> Result<usize, Error> {
+    // Sample count
+    v.extend((buffers.len() as u32).to_be_bytes());
+
+    let data_offset_offset = v.len();
+    // Data offset, will be rewritten later
+    v.extend(current_data_offset.to_be_bytes());
+
+    if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 {
+        v.extend(sample_flags_from_buffer(timing_info, &buffers[0].buffer).to_be_bytes());
+    }
+
+    for Buffer {
+        idx: _idx,
+        ref buffer,
+        timestamp: _timestamp,
+        duration,
+        composition_time_offset,
+    } in buffers.iter()
+    {
+        if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 {
+            // Sample duration, converted to track timescale units
+            let sample_duration = u32::try_from(
+                duration
+                    .nseconds()
+                    .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
+                    .context("too big sample duration")?,
+            )
+            .context("too big sample duration")?;
+            v.extend(sample_duration.to_be_bytes());
+        }
+
+        if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 {
+            // Sample size
+            v.extend((buffer.size() as u32).to_be_bytes());
+        }
+
+        if (tr_flags & SAMPLE_FLAGS_PRESENT) != 0 {
+            // Per-sample flags and first-sample flags are mutually exclusive.
+            assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0);
+
+            // Sample flags
+            v.extend(sample_flags_from_buffer(timing_info, buffer).to_be_bytes());
+        }
+
+        if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 {
+            // Sample composition time offset (signed; requires trun v1 when
+            // negative, which the caller selects)
+            let composition_time_offset = i32::try_from(
+                composition_time_offset
+                    .unwrap_or(0)
+                    .mul_div_round(timescale as i64, gst::ClockTime::SECOND.nseconds() as i64)
+                    .context("too big composition time offset")?,
+            )
+            .context("too big composition time offset")?;
+            v.extend(composition_time_offset.to_be_bytes());
+        }
+    }
+
+    Ok(data_offset_offset)
+}
+
+/// Creates `mfra` box
+///
+/// Writes a `tfra` with one entry per fragment (hard-coded to track ID 1 —
+/// only a single track is indexed) and a trailing `mfro` whose parent-size
+/// field is patched with the final `mfra` size before returning.
+pub(crate) fn create_mfra(
+    caps: &gst::CapsRef,
+    fragment_offsets: &[super::FragmentOffset],
+) -> Result<gst::Buffer, Error> {
+    let timescale = caps_to_timescale(caps);
+
+    let mut v = vec![];
+
+    let offset = write_box(&mut v, b"mfra", |v| {
+        write_full_box(v, b"tfra", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
+            // Track ID
+            v.extend(1u32.to_be_bytes());
+
+            // Reserved / length of traf/trun/sample
+            // (all length_size fields 0 => 1 byte per number below)
+            v.extend(0u32.to_be_bytes());
+
+            // Number of entries
+            v.extend(
+                u32::try_from(fragment_offsets.len())
+                    .context("too many fragments")?
+                    .to_be_bytes(),
+            );
+
+            for super::FragmentOffset { time, offset } in fragment_offsets {
+                // Time, converted to track timescale units
+                let time = time
+                    .nseconds()
+                    .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
+                    .context("time overflow")?;
+                v.extend(time.to_be_bytes());
+
+                // moof offset
+                v.extend(offset.to_be_bytes());
+
+                // traf/trun/sample number (1 each, one byte each)
+                v.extend_from_slice(&[1u8; 3][..]);
+            }
+
+            Ok(())
+        })?;
+
+        // Remember where the mfro size field lives so it can be patched below.
+        let offset = write_full_box(v, b"mfro", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
+            let offset = v.len();
+            // Parent size
+            v.extend(0u32.to_be_bytes());
+            Ok(offset)
+        })?;
+
+        Ok(offset)
+    })?;
+
+    let len = u32::try_from(v.len() as u64).context("too big mfra")?;
+    v[offset..][..4].copy_from_slice(&len.to_be_bytes());
+
+    Ok(gst::Buffer::from_mut_slice(v))
+}
+
+// Copy from std while this is still nightly-only
+use std::fmt;
+
+/// An iterator over slice in (non-overlapping) chunks separated by a predicate.
+///
+/// This struct is created by the [`group_by`] method on [slices].
+///
+/// [`group_by`]: slice::group_by
+/// [slices]: slice
+struct GroupBy<'a, T: 'a, P> {
+    // Remaining, not-yet-yielded portion of the slice.
+    slice: &'a [T],
+    // Adjacent-element predicate; `true` keeps both elements in one group.
+    predicate: P,
+}
+
+impl<'a, T: 'a, P> GroupBy<'a, T, P> {
+    // Creates a grouping iterator over `slice` using `predicate`.
+    fn new(slice: &'a [T], predicate: P) -> Self {
+        GroupBy { slice, predicate }
+    }
+}
+
+impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P>
+where
+    P: FnMut(&T, &T) -> bool,
+{
+    type Item = &'a [T];
+
+    #[inline]
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.slice.is_empty() {
+            None
+        } else {
+            // Grow the group while each adjacent pair satisfies the
+            // predicate, then split it off the front of the slice.
+            let mut len = 1;
+            let mut iter = self.slice.windows(2);
+            while let Some([l, r]) = iter.next() {
+                if (self.predicate)(l, r) {
+                    len += 1
+                } else {
+                    break;
+                }
+            }
+            let (head, tail) = self.slice.split_at(len);
+            self.slice = tail;
+            Some(head)
+        }
+    }
+
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        // At least one group (if non-empty), at most one group per element.
+        if self.slice.is_empty() {
+            (0, Some(0))
+        } else {
+            (1, Some(self.slice.len()))
+        }
+    }
+
+    #[inline]
+    fn last(mut self) -> Option<Self::Item> {
+        self.next_back()
+    }
+}
+
+impl<'a, T: 'a, P> DoubleEndedIterator for GroupBy<'a, T, P>
+where
+    P: FnMut(&T, &T) -> bool,
+{
+    #[inline]
+    fn next_back(&mut self) -> Option<Self::Item> {
+        if self.slice.is_empty() {
+            None
+        } else {
+            // Mirror of `next()`: grow the last group from the back of the
+            // slice and split it off the end.
+            let mut len = 1;
+            let mut iter = self.slice.windows(2);
+            while let Some([l, r]) = iter.next_back() {
+                if (self.predicate)(l, r) {
+                    len += 1
+                } else {
+                    break;
+                }
+            }
+            let (head, tail) = self.slice.split_at(self.slice.len() - len);
+            self.slice = head;
+            Some(tail)
+        }
+    }
+}
+
+impl<'a, T: 'a, P> std::iter::FusedIterator for GroupBy<'a, T, P> where P: FnMut(&T, &T) -> bool {}
+
+impl<'a, T: 'a + fmt::Debug, P> fmt::Debug for GroupBy<'a, T, P> {
+    // The predicate closure isn't Debug, so only the slice is shown.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("GroupBy")
+            .field("slice", &self.slice)
+            .finish()
+    }
+}
diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs
new file mode 100644
index 000000000..a7a8932dc
--- /dev/null
+++ b/mux/fmp4/src/fmp4mux/imp.rs
@@ -0,0 +1,2596 @@
+// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_base::prelude::*;
+use gst_base::subclass::prelude::*;
+
+use std::collections::VecDeque;
+use std::sync::Mutex;
+
+use once_cell::sync::Lazy;
+
+use super::boxes;
+use super::Buffer;
+
+/// Offset for the segment in non-single-stream variants.
+/// (60 * 60 * 1000 seconds, i.e. 1000 hours.)
+const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
+
+/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds.
+/// 1601 = UNIX + UNIX_1601_OFFSET.
+const UNIX_1601_OFFSET: u64 = 11_644_473_600;
+
+/// Offset between NTP and UNIX epoch in seconds.
+/// NTP = UNIX + NTP_UNIX_OFFSET.
+const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
+
+/// Reference timestamp meta caps for NTP timestamps.
+static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
+
+/// Reference timestamp meta caps for UNIX timestamps.
+static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
+
+/// Returns the UTC time of the buffer in the UNIX epoch.
+///
+/// Uses the first matching `ReferenceTimestampMeta`: UNIX timestamps are
+/// taken as-is, NTP timestamps are converted by subtracting the NTP/UNIX
+/// epoch offset (yielding `None` via `checked_sub` for pre-1970 times).
+fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
+    buffer
+        .iter_meta::<gst::ReferenceTimestampMeta>()
+        .find_map(|meta| {
+            if meta.reference().can_intersect(&UNIX_CAPS) {
+                Some(meta.timestamp())
+            } else if meta.reference().can_intersect(&NTP_CAPS) {
+                meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds())
+            } else {
+                None
+            }
+        })
+}
+
+/// Debug category used by all logging in this element.
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+    gst::DebugCategory::new(
+        "fmp4mux",
+        gst::DebugColorFlags::empty(),
+        Some("FMP4Mux Element"),
+    )
+});
+
+// Default values for the corresponding element properties (see `Settings`).
+const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10);
+const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None;
+const DEFAULT_WRITE_MFRA: bool = false;
+const DEFAULT_WRITE_MEHD: bool = false;
+const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
+const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(250));
+
+// Property values of the element, guarded by the `settings` mutex.
+#[derive(Debug, Clone)]
+struct Settings {
+    fragment_duration: gst::ClockTime,
+    header_update_mode: super::HeaderUpdateMode,
+    write_mfra: bool,
+    write_mehd: bool,
+    interleave_bytes: Option<u64>,
+    interleave_time: Option<gst::ClockTime>,
+}
+
+impl Default for Settings {
+    // All fields start at their DEFAULT_* property values.
+    fn default() -> Self {
+        Settings {
+            fragment_duration: DEFAULT_FRAGMENT_DURATION,
+            header_update_mode: DEFAULT_HEADER_UPDATE_MODE,
+            write_mfra: DEFAULT_WRITE_MFRA,
+            write_mehd: DEFAULT_WRITE_MEHD,
+            interleave_bytes: DEFAULT_INTERLEAVE_BYTES,
+            interleave_time: DEFAULT_INTERLEAVE_TIME,
+        }
+    }
+}
+
+// A single buffer queued inside a GOP, with its PTS/DTS as running times.
+#[derive(Debug)]
+struct GopBuffer {
+    buffer: gst::Buffer,
+    pts: gst::ClockTime,
+    dts: Option<gst::ClockTime>,
+}
+
+// A queued group of pictures: the buffers from one keyframe up to (but not
+// including) the next, together with its timing bookkeeping.
+#[derive(Debug)]
+struct Gop {
+    // Running times
+    start_pts: gst::ClockTime,
+    start_dts: Option<gst::ClockTime>,
+    earliest_pts: gst::ClockTime,
+    // Once this is known to be the final earliest PTS/DTS
+    final_earliest_pts: bool,
+    // PTS plus duration of last buffer, or start of next GOP
+    end_pts: gst::ClockTime,
+    // Once this is known to be the final end PTS/DTS
+    final_end_pts: bool,
+    // DTS plus duration of last buffer, or start of next GOP
+    end_dts: Option<gst::ClockTime>,
+
+    // Buffer positions
+    earliest_pts_position: gst::ClockTime,
+    start_dts_position: Option<gst::ClockTime>,
+
+    // Buffer, PTS running time, DTS running time
+    buffers: Vec<GopBuffer>,
+}
+
+// Per-sinkpad muxing state: the pad, its negotiated caps and the GOPs queued
+// for the current fragment.
+struct Stream {
+    sinkpad: gst_base::AggregatorPad,
+
+    caps: gst::Caps,
+    // Whether every buffer of this stream is a sync sample (e.g. audio).
+    intra_only: bool,
+
+    queued_gops: VecDeque<Gop>,
+    fragment_filled: bool,
+
+    // Difference between the first DTS and 0 in case of negative DTS
+    dts_offset: Option<gst::ClockTime>,
+
+    // Current position (DTS, or PTS for intra-only) to prevent
+    // timestamps from going backwards when queueing new buffers
+    current_position: gst::ClockTime,
+
+    // Current UTC time in ONVIF mode to prevent timestamps from
+    // going backwards when draining a fragment.
+    // UNIX epoch.
+    current_utc_time: gst::ClockTime,
+}
+
+// Mutable element state, guarded by the `state` mutex.
+#[derive(Default)]
+struct State {
+    streams: Vec<Stream>,
+
+    // Created once we received caps and kept up to date with the caps,
+    // sent as part of the buffer list for the first fragment.
+    stream_header: Option<gst::Buffer>,
+
+    // `mfhd` sequence number of the next fragment.
+    sequence_number: u32,
+
+    // Fragment tracking for mfra
+    current_offset: u64,
+    fragment_offsets: Vec<super::FragmentOffset>,
+
+    // Start / end PTS of the whole stream
+    earliest_pts: Option<gst::ClockTime>,
+    end_pts: Option<gst::ClockTime>,
+
+    // Start PTS of the current fragment
+    fragment_start_pts: Option<gst::ClockTime>,
+    // Additional timeout delay in case GOPs are bigger than the fragment duration
+    timeout_delay: gst::ClockTime,
+
+    // In ONVIF mode the UTC time corresponding to the beginning of the stream
+    // UNIX epoch.
+    start_utc_time: Option<gst::ClockTime>,
+    end_utc_time: Option<gst::ClockTime>,
+
+    sent_headers: bool,
+}
+
+// Instance struct of the fmp4mux element: runtime state plus property values,
+// each behind its own mutex.
+#[derive(Default)]
+pub(crate) struct FMP4Mux {
+    state: Mutex<State>,
+    settings: Mutex<Settings>,
+}
+
+impl FMP4Mux {
+    // Finds the stream whose next queued buffer has the earliest running
+    // time, returning its index and a mutable reference to it.
+    //
+    // Returns `Ok(None)` when, without a timeout, some stream still has
+    // neither a buffer nor EOS (we must keep waiting), or when no stream has
+    // any data queued. Streams whose current fragment is already filled are
+    // skipped; buffers without a valid running time sort before everything.
+    fn find_earliest_stream<'a>(
+        &self,
+        state: &'a mut State,
+        timeout: bool,
+    ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
+        let mut earliest_stream = None;
+        let mut all_have_data_or_eos = true;
+
+        for (idx, stream) in state.streams.iter_mut().enumerate() {
+            let buffer = match stream.sinkpad.peek_buffer() {
+                Some(buffer) => buffer,
+                None => {
+                    if stream.sinkpad.is_eos() {
+                        gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS");
+                    } else {
+                        all_have_data_or_eos = false;
+                        gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer");
+                    }
+                    continue;
+                }
+            };
+
+            if stream.fragment_filled {
+                gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled");
+                continue;
+            }
+
+            // A time segment must have arrived before any buffer.
+            let segment = match stream
+                .sinkpad
+                .segment()
+                .clone()
+                .downcast::<gst::ClockTime>()
+                .ok()
+            {
+                Some(segment) => segment,
+                None => {
+                    gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
+                    return Err(gst::FlowError::Error);
+                }
+            };
+
+            // If the stream has no valid running time, assume it's before everything else.
+            let running_time = match segment.to_running_time(buffer.dts_or_pts()) {
+                None => {
+                    gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time");
+                    if earliest_stream
+                        .as_ref()
+                        .map_or(true, |(_, _, earliest_running_time)| {
+                            *earliest_running_time > gst::ClockTime::ZERO
+                        })
+                    {
+                        earliest_stream = Some((idx, stream, gst::ClockTime::ZERO));
+                    }
+                    continue;
+                }
+                Some(running_time) => running_time,
+            };
+
+            gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time);
+
+            if earliest_stream
+                .as_ref()
+                .map_or(true, |(_idx, _stream, earliest_running_time)| {
+                    *earliest_running_time > running_time
+                })
+            {
+                earliest_stream = Some((idx, stream, running_time));
+            }
+        }
+
+        if !timeout && !all_have_data_or_eos {
+            gst::trace!(
+                CAT,
+                imp: self,
+                "No timeout and not all streams have a buffer or are EOS"
+            );
+            Ok(None)
+        } else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
+            gst::trace!(
+                CAT,
+                imp: self,
+                "Stream {} is earliest stream with running time {}",
+                stream.sinkpad.name(),
+                earliest_running_time
+            );
+            Ok(Some((idx, stream)))
+        } else {
+            gst::trace!(CAT, imp: self, "No streams have data queued currently");
+            Ok(None)
+        }
+    }
+
+ // Queue incoming buffers as individual GOPs.
+ fn queue_gops(
+ &self,
+ _idx: usize,
+ stream: &mut Stream,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ mut buffer: gst::Buffer,
+ ) -> Result<(), gst::FlowError> {
+ use gst::Signed::*;
+
+ assert!(!stream.fragment_filled);
+
+ gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer);
+
+ let intra_only = stream.intra_only;
+
+ if !intra_only && buffer.dts().is_none() {
+ gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams");
+ return Err(gst::FlowError::Error);
+ }
+
+ if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
+ gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units");
+ return Err(gst::FlowError::Error);
+ }
+
+ let pts_position = buffer.pts().ok_or_else(|| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers");
+ gst::FlowError::Error
+ })?;
+ let duration = buffer.duration();
+ let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position);
+
+ let mut pts = segment
+ .to_running_time_full(pts_position)
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time");
+ gst::FlowError::Error
+ })?
+ .positive_or_else(|_| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
+ gst::FlowError::Error
+ })?;
+
+ let mut end_pts = segment
+ .to_running_time_full(end_pts_position)
+ .ok_or_else(|| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert end PTS to running time");
+ gst::FlowError::Error
+ })?
+ .positive_or_else(|_| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
+ gst::FlowError::Error
+ })?;
+
+ // Enforce monotonically increasing PTS for intra-only streams
+ if intra_only {
+ if pts < stream.current_position {
+ gst::warning!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Decreasing PTS {} < {} for intra-only stream",
+ pts,
+ stream.current_position,
+ );
+ pts = stream.current_position;
+ } else {
+ stream.current_position = pts;
+ }
+ end_pts = std::cmp::max(end_pts, pts);
+ }
+
+ let (dts_position, dts, end_dts) = if intra_only {
+ (None, None, None)
+ } else {
+ // Negative DTS are handled via the dts_offset and by having negative composition time
+ // offsets in the `trun` box. The smallest DTS here is shifted to zero.
+ let dts_position = buffer.dts().expect("not DTS");
+ let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position);
+
+ let signed_dts = segment.to_running_time_full(dts_position).ok_or_else(|| {
+ gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time");
+ gst::FlowError::Error
+ })?;
+ let mut dts = match signed_dts {
+ Positive(dts) => {
+ if let Some(dts_offset) = stream.dts_offset {
+ dts + dts_offset
+ } else {
+ dts
+ }
+ }
+ Negative(dts) => {
+ if stream.dts_offset.is_none() {
+ stream.dts_offset = Some(dts);
+ }
+
+ let dts_offset = stream.dts_offset.unwrap();
+ if dts > dts_offset {
+ gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS");
+ gst::ClockTime::ZERO
+ } else {
+ dts_offset - dts
+ }
+ }
+ };
+
+ let signed_end_dts =
+ segment
+ .to_running_time_full(end_dts_position)
+ .ok_or_else(|| {
+ gst::error!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Couldn't convert end DTS to running time"
+ );
+ gst::FlowError::Error
+ })?;
+ let mut end_dts = match signed_end_dts {
+ Positive(dts) => {
+ if let Some(dts_offset) = stream.dts_offset {
+ dts + dts_offset
+ } else {
+ dts
+ }
+ }
+ Negative(dts) => {
+ if stream.dts_offset.is_none() {
+ stream.dts_offset = Some(dts);
+ }
+
+ let dts_offset = stream.dts_offset.unwrap();
+ if dts > dts_offset {
+ gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS");
+ gst::ClockTime::ZERO
+ } else {
+ dts_offset - dts
+ }
+ }
+ };
+
+ // Enforce monotonically increasing DTS for intra-only streams
+ // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference
+ // FIXME: Is this correct?
+ if dts < stream.current_position {
+ gst::warning!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Decreasing DTS {} < {}",
+ dts,
+ stream.current_position,
+ );
+ dts = stream.current_position;
+ } else {
+ stream.current_position = dts;
+ }
+ end_dts = std::cmp::max(end_dts, dts);
+
+ (Some(dts_position), Some(dts), Some(end_dts))
+ };
+
+ // If this is a multi-stream element then we need to update the PTS/DTS positions according
+ // to the output segment, specifically to re-timestamp them with the running time and
+ // adjust for the segment shift to compensate for negative DTS.
+ let aggregator = self.instance();
+ let class = aggregator.class();
+ let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() {
+ (pts_position, dts_position)
+ } else {
+ let pts_position = pts + SEGMENT_OFFSET;
+ let dts_position = dts.map(|dts| {
+ dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO)
+ });
+
+ let buffer = buffer.make_mut();
+ buffer.set_pts(pts_position);
+ buffer.set_dts(dts_position);
+
+ (pts_position, dts_position)
+ };
+
+ if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Starting new GOP at PTS {} DTS {} (DTS offset {})",
+ pts,
+ dts.display(),
+ stream.dts_offset.display(),
+ );
+
+ let gop = Gop {
+ start_pts: pts,
+ start_dts: dts,
+ start_dts_position: if intra_only { None } else { dts_position },
+ earliest_pts: pts,
+ earliest_pts_position: pts_position,
+ final_earliest_pts: intra_only,
+ end_pts,
+ end_dts,
+ final_end_pts: false,
+ buffers: vec![GopBuffer { buffer, pts, dts }],
+ };
+ stream.queued_gops.push_front(gop);
+
+ if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Updating previous GOP starting at PTS {} to end PTS {} DTS {}",
+ prev_gop.earliest_pts,
+ pts,
+ dts.display(),
+ );
+
+ prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts);
+ prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts);
+
+ if intra_only {
+ prev_gop.final_end_pts = true;
+ }
+
+ if !prev_gop.final_earliest_pts {
+ // Don't bother logging this for intra-only streams as it would be for every
+ // single buffer.
+ if !intra_only {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Previous GOP has final earliest PTS at {}",
+ prev_gop.earliest_pts
+ );
+ }
+
+ prev_gop.final_earliest_pts = true;
+ if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) {
+ prev_prev_gop.final_end_pts = true;
+ }
+ }
+ }
+ } else if let Some(gop) = stream.queued_gops.front_mut() {
+ assert!(!intra_only);
+
+ // We require DTS for non-intra-only streams
+ let dts = dts.unwrap();
+ let end_dts = end_dts.unwrap();
+
+ gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
+ gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts));
+ gop.buffers.push(GopBuffer {
+ buffer,
+ pts,
+ dts: Some(dts),
+ });
+
+ if gop.earliest_pts > pts && !gop.final_earliest_pts {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Updating current GOP earliest PTS from {} to {}",
+ gop.earliest_pts,
+ pts
+ );
+ gop.earliest_pts = pts;
+ gop.earliest_pts_position = pts_position;
+
+ if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
+ if prev_gop.end_pts < pts {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Updating previous GOP starting PTS {} end time from {} to {}",
+ pts,
+ prev_gop.end_pts,
+ pts
+ );
+ prev_gop.end_pts = pts;
+ }
+ }
+ }
+
+ let gop = stream.queued_gops.front_mut().unwrap();
+
+ // The earliest PTS is known when the current DTS is bigger or equal to the first
+ // PTS that was observed in this GOP. If there was another frame later that had a
+ // lower PTS then it wouldn't be possible to display it in time anymore, i.e. the
+ // stream would be invalid.
+ if gop.start_pts <= dts && !gop.final_earliest_pts {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "GOP has final earliest PTS at {}",
+ gop.earliest_pts
+ );
+ gop.final_earliest_pts = true;
+
+ if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
+ prev_gop.final_end_pts = true;
+ }
+ }
+ } else {
+ gst::warning!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Waiting for keyframe at the beginning of the stream"
+ );
+ }
+
+ if let Some((prev_gop, first_gop)) = Option::zip(
+ stream.queued_gops.iter().find(|gop| gop.final_end_pts),
+ stream.queued_gops.back(),
+ ) {
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued full GOPs duration updated to {}",
+ prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
+ );
+ }
+
+ gst::debug!(
+ CAT,
+ obj: &stream.sinkpad,
+ "Queued duration updated to {}",
+ Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
+ .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
+ .unwrap_or(gst::ClockTime::ZERO)
+ );
+
+ Ok(())
+ }
+
/// Drain complete GOPs from all streams for the fragment that is currently being finished.
///
/// The first non-EOS stream determines the fragment boundary; every other stream is drained
/// up to that same end PTS. Returns the per-stream `(caps, timing info, buffers)` tuples
/// together with the minimum earliest PTS position / earliest PTS / start DTS position over
/// all drained streams and the end PTS of this fragment (which becomes the start PTS of the
/// next fragment).
///
/// Returns `AGGREGATOR_FLOW_NEED_DATA` when, on timeout in a live pipeline, the first stream
/// has no complete GOP yet so that draining has to be retried later.
#[allow(clippy::type_complexity)]
fn drain_buffers(
    &self,
    state: &mut State,
    settings: &Settings,
    timeout: bool,
    at_eos: bool,
) -> Result<
    (
        // Drained streams
        Vec<(
            gst::Caps,
            Option<super::FragmentTimingInfo>,
            VecDeque<Buffer>,
        )>,
        // Minimum earliest PTS position of all streams
        Option<gst::ClockTime>,
        // Minimum earliest PTS of all streams
        Option<gst::ClockTime>,
        // Minimum start DTS position of all streams (if any stream has DTS)
        Option<gst::ClockTime>,
        // End PTS of this drained fragment, i.e. start PTS of the next fragment
        Option<gst::ClockTime>,
    ),
    gst::FlowError,
> {
    let mut drained_streams = Vec::with_capacity(state.streams.len());

    let mut min_earliest_pts_position = None;
    let mut min_earliest_pts = None;
    let mut min_start_dts_position = None;
    let mut fragment_end_pts = None;

    // The first stream decides how much can be dequeued, if anything at all.
    //
    // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
    // but on timeout in live pipelines it might happen that the first stream does not have a
    // complete GOP queued. In that case nothing is dequeued for any of the streams and the
    // timeout is advanced by 1s until at least one complete GOP can be dequeued.
    //
    // If the first stream is already EOS then the next stream that is not EOS yet will be
    // taken in its place.
    let fragment_start_pts = state.fragment_start_pts.unwrap();
    gst::info!(
        CAT,
        imp: self,
        "Starting to drain at {}",
        fragment_start_pts
    );

    for (idx, stream) in state.streams.iter_mut().enumerate() {
        // Draining is only valid on timeout, at EOS, or once the stream has a
        // second GOP whose earliest PTS is final (i.e. the first GOP is closed).
        assert!(
            timeout
                || at_eos
                || stream.sinkpad.is_eos()
                || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
        );

        // Drain all complete GOPs until at most one fragment duration was dequeued for the
        // first stream, or until the dequeued duration of the first stream.
        let mut gops = Vec::with_capacity(stream.queued_gops.len());
        let dequeue_end_pts =
            fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
        gst::trace!(
            CAT,
            obj: &stream.sinkpad,
            "Draining up to end PTS {} / duration {}",
            dequeue_end_pts,
            dequeue_end_pts - fragment_start_pts
        );

        // GOPs are queued front-to-back, so the oldest GOP is at the back.
        while let Some(gop) = stream.queued_gops.back() {
            // If this GOP is not complete then we can't pop it yet.
            //
            // If there was no complete GOP at all yet then it might be bigger than the
            // fragment duration. In this case we might not be able to handle the latency
            // requirements in a live pipeline.
            if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
                break;
            }

            // If this GOP starts after the fragment end then don't dequeue it yet unless this is
            // the first stream and no GOPs were dequeued at all yet. This would mean that the
            // GOP is bigger than the fragment duration.
            if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty())
            {
                break;
            }

            gops.push(stream.queued_gops.pop_back().unwrap());
        }
        stream.fragment_filled = false;

        // If we don't have a next fragment start PTS then this is the first stream as above.
        if fragment_end_pts.is_none() {
            if let Some(last_gop) = gops.last() {
                // Dequeued something so let's take the end PTS of the last GOP
                fragment_end_pts = Some(last_gop.end_pts);
                gst::info!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Draining up to PTS {} for this fragment",
                    last_gop.end_pts,
                );
            } else {
                // If nothing was dequeued for the first stream then this is OK if we're at
                // EOS: we just consider the next stream as first stream then.
                if at_eos || stream.sinkpad.is_eos() {
                    // This is handled below generally if nothing was dequeued
                } else {
                    // Otherwise this can only really happen on timeout in live pipelines.
                    assert!(timeout);

                    gst::warning!(
                        CAT,
                        obj: &stream.sinkpad,
                        "Don't have a complete GOP for the first stream on timeout in a live pipeline",
                    );

                    // In this case we advance the timeout by 1s and hope that things are
                    // better then.
                    return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
                }
            }
        }

        if gops.is_empty() {
            gst::info!(
                CAT,
                obj: &stream.sinkpad,
                "Draining no buffers",
            );

            // Keep the stream in the result so indices stay aligned, but with
            // no timing info and no buffers.
            drained_streams.push((stream.caps.clone(), None, VecDeque::new()));
            continue;
        }

        assert!(fragment_end_pts.is_some());

        let first_gop = gops.first().unwrap();
        let last_gop = gops.last().unwrap();
        let earliest_pts = first_gop.earliest_pts;
        let earliest_pts_position = first_gop.earliest_pts_position;
        let start_dts = first_gop.start_dts;
        let start_dts_position = first_gop.start_dts_position;
        let end_pts = last_gop.end_pts;
        let dts_offset = stream.dts_offset;

        // Track the minima over all drained streams for the caller.
        if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
            min_earliest_pts = Some(earliest_pts);
        }
        if min_earliest_pts_position
            .opt_gt(earliest_pts_position)
            .unwrap_or(true)
        {
            min_earliest_pts_position = Some(earliest_pts_position);
        }
        if let Some(start_dts_position) = start_dts_position {
            if min_start_dts_position
                .opt_gt(start_dts_position)
                .unwrap_or(true)
            {
                min_start_dts_position = Some(start_dts_position);
            }
        }

        gst::info!(
            CAT,
            obj: &stream.sinkpad,
            "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
            end_pts.saturating_sub(earliest_pts),
            earliest_pts,
            start_dts.display(),
            dts_offset.display(),
        );

        if let Some((prev_gop, first_gop)) = Option::zip(
            stream.queued_gops.iter().find(|gop| gop.final_end_pts),
            stream.queued_gops.back(),
        ) {
            gst::debug!(
                CAT,
                obj: &stream.sinkpad,
                "Queued full GOPs duration updated to {}",
                prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
            );
        }

        gst::debug!(
            CAT,
            obj: &stream.sinkpad,
            "Queued duration updated to {}",
            Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
                .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
                .unwrap_or(gst::ClockTime::ZERO)
        );

        // Intra-only streams are timed by PTS, others by DTS.
        let start_time = if stream.intra_only {
            earliest_pts
        } else {
            start_dts.unwrap()
        };

        let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());

        for gop in gops {
            let mut gop_buffers = gop.buffers.into_iter().peekable();
            while let Some(buffer) = gop_buffers.next() {
                let timestamp = if stream.intra_only {
                    buffer.pts
                } else {
                    buffer.dts.unwrap()
                };

                // The duration of a buffer runs until the next buffer's timestamp,
                // or until the GOP end for the last buffer.
                let end_timestamp = match gop_buffers.peek() {
                    Some(buffer) => {
                        if stream.intra_only {
                            buffer.pts
                        } else {
                            buffer.dts.unwrap()
                        }
                    }
                    None => {
                        if stream.intra_only {
                            gop.end_pts
                        } else {
                            gop.end_dts.unwrap()
                        }
                    }
                };

                // Timestamps are enforced to monotonically increase when queueing buffers
                let duration = end_timestamp
                    .checked_sub(timestamp)
                    .expect("Timestamps going backwards");

                // Signed PTS-DTS difference for the trun composition time offset;
                // intra-only streams have no DTS and therefore no offset.
                let composition_time_offset = if stream.intra_only {
                    None
                } else {
                    let pts = buffer.pts;
                    let dts = buffer.dts.unwrap();

                    if pts > dts {
                        Some(
                            i64::try_from((pts - dts).nseconds())
                                .map_err(|_| {
                                    gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
                                    gst::FlowError::Error
                                })?,
                        )
                    } else {
                        let diff = i64::try_from((dts - pts).nseconds())
                            .map_err(|_| {
                                gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
                                gst::FlowError::Error
                            })?;
                        Some(-diff)
                    }
                };

                buffers.push_back(Buffer {
                    idx,
                    buffer: buffer.buffer,
                    timestamp,
                    duration,
                    composition_time_offset,
                });
            }
        }

        drained_streams.push((
            stream.caps.clone(),
            Some(super::FragmentTimingInfo {
                start_time,
                intra_only: stream.intra_only,
            }),
            buffers,
        ));
    }

    Ok((
        drained_streams,
        min_earliest_pts_position,
        min_earliest_pts,
        min_start_dts_position,
        fragment_end_pts,
    ))
}
+
/// For the ONVIF variant, rewrite all drained buffer timestamps to be based on the UTC
/// reference timestamps carried on the buffers instead of the pipeline running time.
///
/// No-op (returns `Ok(None)`) for every other variant. On success returns the maximum end
/// UTC time over all buffers of this fragment. Errors if buffers lack the required
/// reference timestamp meta.
fn preprocess_drained_streams_onvif(
    &self,
    state: &mut State,
    drained_streams: &mut [(
        gst::Caps,
        Option<super::FragmentTimingInfo>,
        VecDeque<Buffer>,
    )],
) -> Result<Option<gst::ClockTime>, gst::FlowError> {
    let aggregator = self.instance();
    if aggregator.class().as_ref().variant != super::Variant::ONVIF {
        return Ok(None);
    }

    let mut max_end_utc_time = None;

    // Buffer timestamps are DTS-or-PTS; apply the composition time offset to
    // get back the presentation time.
    let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
        let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
        if composition_time_offset > 0 {
            buffer.timestamp + (composition_time_offset as u64).nseconds()
        } else {
            buffer
                .timestamp
                .checked_sub(((-composition_time_offset) as u64).nseconds())
                .unwrap()
        }
    };

    // If this is the first fragment then allow the first buffers to not have a reference
    // timestamp meta and backdate them
    if state.stream_header.is_none() {
        for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() {
            // Find the first buffer that carries a UTC reference timestamp.
            let (buffer_idx, utc_time, buffer) =
                match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| {
                    get_utc_time_from_buffer(&buffer.buffer)
                        .map(|timestamp| (idx, timestamp, buffer))
                }) {
                    None => {
                        gst::error!(
                            CAT,
                            obj: &state.streams[idx].sinkpad,
                            "No reference timestamp set on any buffers in the first fragment",
                        );
                        return Err(gst::FlowError::Error);
                    }
                    Some(res) => res,
                };

            // Now do the backdating
            if buffer_idx > 0 {
                let utc_time_pts = calculate_pts(buffer);

                // Derive a UTC time for each earlier buffer from its PTS distance
                // to the first buffer that has one, and attach it as meta.
                for buffer in drain_buffers.iter_mut().take(buffer_idx) {
                    let buffer_pts = calculate_pts(buffer);
                    let buffer_pts_diff = if utc_time_pts >= buffer_pts {
                        (utc_time_pts - buffer_pts).nseconds() as i64
                    } else {
                        -((buffer_pts - utc_time_pts).nseconds() as i64)
                    };
                    let buffer_utc_time = if buffer_pts_diff >= 0 {
                        utc_time
                            .checked_sub((buffer_pts_diff as u64).nseconds())
                            .unwrap()
                    } else {
                        utc_time
                            .checked_add(((-buffer_pts_diff) as u64).nseconds())
                            .unwrap()
                    };

                    let buffer = buffer.buffer.make_mut();
                    gst::ReferenceTimestampMeta::add(
                        buffer,
                        &UNIX_CAPS,
                        buffer_utc_time,
                        gst::ClockTime::NONE,
                    );
                }
            }
        }
    }

    // Calculate the minimum across all streams and remember that
    if state.start_utc_time.is_none() {
        let mut start_utc_time = None;

        for (idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() {
            for buffer in drain_buffers {
                let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
                    None => {
                        gst::error!(
                            CAT,
                            obj: &state.streams[idx].sinkpad,
                            "No reference timestamp set on all buffers"
                        );
                        return Err(gst::FlowError::Error);
                    }
                    Some(utc_time) => utc_time,
                };

                if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
                    start_utc_time = Some(utc_time);
                }
            }
        }

        gst::debug!(
            CAT,
            imp: self,
            "Configuring start UTC time {}",
            start_utc_time.unwrap()
        );
        state.start_utc_time = start_utc_time;
    }

    // Update all buffer timestamps based on the UTC time and offset to the start UTC time
    let start_utc_time = state.start_utc_time.unwrap();
    for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() {
        let mut start_time = None;

        for buffer in drain_buffers.iter_mut() {
            let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
                None => {
                    gst::error!(
                        CAT,
                        obj: &state.streams[idx].sinkpad,
                        "No reference timestamp set on all buffers"
                    );
                    return Err(gst::FlowError::Error);
                }
                Some(utc_time) => utc_time,
            };

            // Convert PTS UTC time to DTS
            let mut utc_time_dts =
                if let Some(composition_time_offset) = buffer.composition_time_offset {
                    if composition_time_offset >= 0 {
                        utc_time
                            .checked_sub((composition_time_offset as u64).nseconds())
                            .unwrap()
                    } else {
                        utc_time
                            .checked_add(((-composition_time_offset) as u64).nseconds())
                            .unwrap()
                    }
                } else {
                    utc_time
                };

            // Enforce monotonically increasing timestamps
            if utc_time_dts < state.streams[idx].current_utc_time {
                gst::warning!(
                    CAT,
                    obj: &state.streams[idx].sinkpad,
                    "Decreasing UTC DTS timestamp for buffer {} < {}",
                    utc_time_dts,
                    state.streams[idx].current_utc_time,
                );
                utc_time_dts = state.streams[idx].current_utc_time;
            } else {
                state.streams[idx].current_utc_time = utc_time_dts;
            }

            // Buffer timestamps become relative to the stream's start UTC time.
            let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();

            gst::trace!(
                CAT,
                obj: &state.streams[idx].sinkpad,
                "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
                buffer.timestamp,
                timestamp,
                utc_time_dts,
                utc_time,
            );

            buffer.timestamp = timestamp;
            if start_time.is_none() || start_time > Some(buffer.timestamp) {
                start_time = Some(buffer.timestamp);
            }
        }

        // Update durations for all buffers except for the last in the fragment unless all
        // have the same duration anyway
        //
        // common_duration: Ok(None) = not seen yet, Ok(Some(d)) = all equal so far,
        // Err(()) = durations differ.
        let mut common_duration = Ok(None);
        let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
        while let Some(buffer) = drain_buffers_iter.next() {
            let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);

            if let Some(next_timestamp) = next_timestamp {
                let duration = next_timestamp.saturating_sub(buffer.timestamp);
                if common_duration == Ok(None) {
                    common_duration = Ok(Some(duration));
                } else if common_duration != Ok(Some(duration)) {
                    common_duration = Err(());
                }

                gst::trace!(
                    CAT,
                    obj: &state.streams[idx].sinkpad,
                    "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
                    buffer.timestamp,
                    buffer.duration,
                    duration,
                );

                buffer.duration = duration;
            } else if let Ok(Some(common_duration)) = common_duration {
                gst::trace!(
                    CAT,
                    obj: &state.streams[idx].sinkpad,
                    "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
                    buffer.timestamp,
                    buffer.duration,
                    common_duration,
                );

                buffer.duration = common_duration;
            } else {
                gst::trace!(
                    CAT,
                    obj: &state.streams[idx].sinkpad,
                    "Keeping last buffer with timestamp {} duration at {}",
                    buffer.timestamp,
                    buffer.duration,
                );
            }

            let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
            if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
                max_end_utc_time = Some(end_utc_time);
            }
        }

        if let Some(start_time) = start_time {
            gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
            timing_info.as_mut().unwrap().start_time = start_time;
        } else {
            // A stream without buffers must also have no timing info (see drain_buffers()).
            assert!(timing_info.is_none());
        }
    }

    Ok(max_end_utc_time)
}
+
/// Merge the per-stream buffer queues into a single output ordering, interleaving streams
/// according to the configured byte and time limits.
///
/// Repeatedly picks the stream whose front buffer has the smallest timestamp (ties broken
/// by stream index) and takes buffers from it until one of the interleave limits is hit,
/// then re-selects. Returns the flat buffer list plus the `(caps, timing info)` pairs of
/// the drained streams.
#[allow(clippy::type_complexity)]
fn interleave_buffers(
    &self,
    settings: &Settings,
    mut drained_streams: Vec<(
        gst::Caps,
        Option<super::FragmentTimingInfo>,
        VecDeque<Buffer>,
    )>,
) -> Result<
    (
        Vec<Buffer>,
        Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>,
    ),
    gst::FlowError,
> {
    let mut interleaved_buffers =
        Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum());
    // Select the stream with the earliest front buffer; streams whose queue is
    // already empty sort last.
    while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by(
        |(a_idx, (_, _, a)), (b_idx, (_, _, b))| {
            let (a, b) = match (a.front(), b.front()) {
                (None, None) => return std::cmp::Ordering::Equal,
                (None, _) => return std::cmp::Ordering::Greater,
                (_, None) => return std::cmp::Ordering::Less,
                (Some(a), Some(b)) => (a, b),
            };

            match a.timestamp.cmp(&b.timestamp) {
                std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
                cmp => cmp,
            }
        },
    ) {
        let start_time = match bufs.front() {
            None => {
                // No more buffers now
                break;
            }
            Some(buf) => buf.timestamp,
        };
        let mut current_end_time = start_time;
        let mut dequeued_bytes = 0;

        // Take buffers from this stream while both interleave limits allow it;
        // an unset limit (None) never stops the loop.
        while settings
            .interleave_bytes
            .opt_ge(dequeued_bytes)
            .unwrap_or(true)
            && settings
                .interleave_time
                .opt_ge(current_end_time.saturating_sub(start_time))
                .unwrap_or(true)
        {
            if let Some(buffer) = bufs.pop_front() {
                current_end_time = buffer.timestamp + buffer.duration;
                dequeued_bytes += buffer.buffer.size() as u64;
                interleaved_buffers.push(buffer);
            } else {
                // No buffers left in this stream, go to next stream
                break;
            }
        }
    }

    // All buffers should be consumed now
    assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty()));

    let streams = drained_streams
        .into_iter()
        .map(|(caps, timing_info, _)| (caps, timing_info))
        .collect::<Vec<_>>();

    Ok((interleaved_buffers, streams))
}
+
/// Drain one fragment worth of data and serialize it into an fMP4 fragment.
///
/// Only drains early (before all streams report a filled fragment) on timeout or at EOS.
/// Returns the new output caps (if the stream header was just created) and the buffer list
/// to push downstream (headers + moof + payload buffers, and the mfra box at EOS if
/// enabled). Also queues force-keyunit events for the next fragment boundary into
/// `upstream_events`.
fn drain(
    &self,
    state: &mut State,
    settings: &Settings,
    timeout: bool,
    at_eos: bool,
    upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>,
) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
    if at_eos {
        gst::info!(CAT, imp: self, "Draining at EOS");
    } else if timeout {
        gst::info!(CAT, imp: self, "Draining at timeout");
    } else {
        // Normal operation: only drain once every non-EOS stream has a full fragment.
        for stream in &state.streams {
            if !stream.fragment_filled && !stream.sinkpad.is_eos() {
                return Ok((None, None));
            }
        }
    }

    // Collect all buffers and their timing information that are to be drained right now.
    let (
        mut drained_streams,
        min_earliest_pts_position,
        min_earliest_pts,
        min_start_dts_position,
        fragment_end_pts,
    ) = self.drain_buffers(state, settings, timeout, at_eos)?;

    // Remove all GAP buffers before processing them further
    // (only buffers that are GAP *and* DROPPABLE *and* zero-sized are removed).
    for (_, _, buffers) in &mut drained_streams {
        buffers.retain(|buf| {
            !buf.buffer.flags().contains(gst::BufferFlags::GAP)
                || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
                || buf.buffer.size() != 0
        });
    }

    // For ONVIF, replace all timestamps with timestamps based on UTC times.
    let max_end_utc_time =
        self.preprocess_drained_streams_onvif(state, &mut drained_streams)?;

    // Create header now if it was not created before and return the caps
    let mut caps = None;
    if state.stream_header.is_none() {
        let (_, new_caps) = self.update_header(state, settings, false)?.unwrap();
        caps = Some(new_caps);
    }

    // Interleave buffers according to the settings into a single vec
    let (mut interleaved_buffers, streams) =
        self.interleave_buffers(settings, drained_streams)?;

    let mut buffer_list = None;
    if interleaved_buffers.is_empty() {
        // Nothing drained anywhere can only happen at EOS.
        assert!(at_eos);
    } else {
        // If there are actual buffers to output then create headers as needed and create a
        // bufferlist for all buffers that have to be output.
        let min_earliest_pts_position = min_earliest_pts_position.unwrap();
        let min_earliest_pts = min_earliest_pts.unwrap();
        let fragment_end_pts = fragment_end_pts.unwrap();

        let mut fmp4_header = None;
        if !state.sent_headers {
            let mut buffer = state.stream_header.as_ref().unwrap().copy();
            {
                let buffer = buffer.get_mut().unwrap();

                buffer.set_pts(min_earliest_pts_position);
                buffer.set_dts(min_start_dts_position);

                // Header is DISCONT|HEADER
                buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
            }

            fmp4_header = Some(buffer);

            state.sent_headers = true;
        }

        // TODO: Write prft boxes before moof
        // TODO: Write sidx boxes before moof and rewrite once offsets are known

        // moof sequence numbers start at 1.
        if state.sequence_number == 0 {
            state.sequence_number = 1;
        }
        let sequence_number = state.sequence_number;
        state.sequence_number += 1;
        let (mut fmp4_fragment_header, moof_offset) =
            boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
                variant: self.instance().class().as_ref().variant,
                sequence_number,
                streams: streams.as_slice(),
                buffers: interleaved_buffers.as_slice(),
            })
            .map_err(|err| {
                gst::error!(
                    CAT,
                    imp: self,
                    "Failed to create FMP4 fragment header: {}",
                    err
                );
                gst::FlowError::Error
            })?;

        {
            let buffer = fmp4_fragment_header.get_mut().unwrap();
            buffer.set_pts(min_earliest_pts_position);
            buffer.set_dts(min_start_dts_position);
            buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts));

            // Fragment header is HEADER
            buffer.set_flags(gst::BufferFlags::HEADER);

            // Copy metas from the first actual buffer to the fragment header. This allows
            // getting things like the reference timestamp meta or the timecode meta to identify
            // the fragment.
            let _ = interleaved_buffers[0].buffer.copy_into(
                buffer,
                gst::BufferCopyFlags::META,
                0,
                None,
            );
        }

        // Absolute byte offset of the moof box in the output stream, for the mfra box.
        let moof_offset = state.current_offset
            + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64
            + moof_offset;

        let buffers_len = interleaved_buffers.len();
        for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() {
            // Fix up buffer flags, all other buffers are DELTA_UNIT
            let buffer_ref = buffer.buffer.make_mut();
            buffer_ref.unset_flags(gst::BufferFlags::all());
            buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT);

            // Set the marker flag for the last buffer of the segment
            if idx == buffers_len - 1 {
                buffer_ref.set_flags(gst::BufferFlags::MARKER);
            }
        }

        buffer_list = Some(
            fmp4_header
                .into_iter()
                .chain(Some(fmp4_fragment_header))
                .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer))
                .inspect(|b| {
                    // Keep track of the byte position in the output stream.
                    state.current_offset += b.size() as u64;
                })
                .collect::<gst::BufferList>(),
        );

        // Write mfra only for the main stream, and if there are no buffers for the main stream
        // in this segment then don't write anything.
        if let Some((_caps, Some(ref timing_info))) = streams.get(0) {
            state.fragment_offsets.push(super::FragmentOffset {
                time: timing_info.start_time,
                offset: moof_offset,
            });
        }

        state.end_pts = Some(fragment_end_pts);
        state.end_utc_time = max_end_utc_time;

        // Update for the start PTS of the next fragment
        gst::info!(
            CAT,
            imp: self,
            "Starting new fragment at {}",
            fragment_end_pts,
        );
        state.fragment_start_pts = Some(fragment_end_pts);

        gst::debug!(
            CAT,
            imp: self,
            "Sending force-keyunit events for running time {}",
            fragment_end_pts + settings.fragment_duration,
        );

        // Ask upstream for keyframes at the next fragment boundary.
        let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
            .running_time(fragment_end_pts + settings.fragment_duration)
            .all_headers(true)
            .build();

        for stream in &state.streams {
            upstream_events.push((stream.sinkpad.clone(), fku.clone()));
        }

        // Reset timeout delay now that we've output an actual fragment
        state.timeout_delay = gst::ClockTime::ZERO;
    }

    if settings.write_mfra && at_eos {
        match boxes::create_mfra(&streams[0].0, &state.fragment_offsets) {
            Ok(mut mfra) => {
                {
                    let mfra = mfra.get_mut().unwrap();
                    // mfra is HEADER|DELTA_UNIT like other boxes
                    mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT);
                }

                if buffer_list.is_none() {
                    buffer_list = Some(gst::BufferList::new_sized(1));
                }
                buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
            }
            Err(err) => {
                // Best-effort: a failed mfra is logged but does not fail the drain.
                gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
            }
        }
    }

    // TODO: Write edit list at EOS
    // TODO: Rewrite bitrates at EOS

    Ok((caps, buffer_list))
}
+
/// Build the internal `Stream` list from all sink pads that have caps configured.
///
/// Validates the caps (codec_data required for H.264/H.265/AAC), classifies each stream as
/// intra-only or not, and sorts streams video-first, then audio, then ONVIF metadata, with
/// ties broken by pad name. Errors if no pad has usable caps.
fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> {
    for pad in self
        .instance()
        .sink_pads()
        .into_iter()
        .map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap())
    {
        let caps = match pad.current_caps() {
            Some(caps) => caps,
            None => {
                gst::warning!(CAT, obj: &pad, "Skipping pad without caps");
                continue;
            }
        };

        gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps);

        let s = caps.structure(0).unwrap();

        let mut intra_only = false;
        match s.name() {
            "video/x-h264" | "video/x-h265" => {
                if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
                    gst::error!(CAT, obj: &pad, "Received caps without codec_data");
                    return Err(gst::FlowError::NotNegotiated);
                }
            }
            "image/jpeg" => {
                intra_only = true;
            }
            "audio/mpeg" => {
                if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
                    gst::error!(CAT, obj: &pad, "Received caps without codec_data");
                    return Err(gst::FlowError::NotNegotiated);
                }
                intra_only = true;
            }
            "audio/x-alaw" | "audio/x-mulaw" => {
                intra_only = true;
            }
            "audio/x-adpcm" => {
                intra_only = true;
            }
            "application/x-onvif-metadata" => {
                intra_only = true;
            }
            // Presumably unreachable because the pad template caps restrict what
            // can be negotiated here — TODO confirm against the template definition.
            _ => unreachable!(),
        }

        state.streams.push(Stream {
            sinkpad: pad,
            caps,
            intra_only,
            queued_gops: VecDeque::new(),
            fragment_filled: false,
            dts_offset: None,
            current_position: gst::ClockTime::ZERO,
            current_utc_time: gst::ClockTime::ZERO,
        });
    }

    if state.streams.is_empty() {
        gst::error!(CAT, imp: self, "No streams available");
        return Err(gst::FlowError::Error);
    }

    // Sort video streams first and then audio streams and then metadata streams, and each group by pad name.
    state.streams.sort_by(|a, b| {
        let order_of_caps = |caps: &gst::CapsRef| {
            let s = caps.structure(0).unwrap();

            if s.name().starts_with("video/") {
                0
            } else if s.name().starts_with("audio/") {
                1
            } else if s.name().starts_with("application/x-onvif-metadata") {
                2
            } else {
                unimplemented!();
            }
        };

        let st_a = order_of_caps(&a.caps);
        let st_b = order_of_caps(&b.caps);

        if st_a == st_b {
            return a.sinkpad.name().cmp(&b.sinkpad.name());
        }

        st_a.cmp(&st_b)
    });

    Ok(())
}
+
/// Create (or at EOS re-create) the fMP4 initialization header (moov etc.).
///
/// Returns the header as a buffer list plus the corresponding output caps
/// (`video/quicktime` with the appropriate variant and the header as streamheader),
/// or `None` when `header-update-mode` is `None` and this is the EOS update.
/// The header buffer is also remembered in `state.stream_header`.
fn update_header(
    &self,
    state: &mut State,
    settings: &Settings,
    at_eos: bool,
) -> Result<Option<(gst::BufferList, gst::Caps)>, gst::FlowError> {
    let aggregator = self.instance();
    let class = aggregator.class();
    let variant = class.as_ref().variant;

    if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos {
        return Ok(None);
    }

    // At EOS all queues must have been drained already.
    assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty()));

    // ONVIF durations are based on UTC times, all other variants on PTS.
    let duration = if variant == super::Variant::ONVIF {
        state
            .end_utc_time
            .opt_checked_sub(state.start_utc_time)
            .ok()
            .flatten()
    } else {
        state
            .end_pts
            .opt_checked_sub(state.earliest_pts)
            .ok()
            .flatten()
    };

    let streams = state
        .streams
        .iter()
        .map(|s| s.caps.clone())
        .collect::<Vec<_>>();

    let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration {
        variant,
        update: at_eos,
        streams: streams.as_slice(),
        write_mehd: settings.write_mehd,
        duration: if at_eos { duration } else { None },
        // Convert UNIX-epoch nanoseconds to 100ns units since 1601
        // (presumably the MP4/ONVIF epoch convention — see UNIX_1601_OFFSET).
        start_utc_time: state
            .start_utc_time
            .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000),
    })
    .map_err(|err| {
        gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err);
        gst::FlowError::Error
    })?;

    {
        let buffer = buffer.get_mut().unwrap();

        // No timestamps

        // Header is DISCONT|HEADER
        buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
    }

    // Remember stream header for later
    state.stream_header = Some(buffer.clone());

    let variant = match variant {
        super::Variant::ISO | super::Variant::DASH | super::Variant::ONVIF => "iso-fragmented",
        super::Variant::CMAF => "cmaf",
    };
    let caps = gst::Caps::builder("video/quicktime")
        .field("variant", variant)
        .field("streamheader", gst::Array::new(&[&buffer]))
        .build();

    let mut list = gst::BufferList::new_sized(1);
    {
        let list = list.get_mut().unwrap();
        list.add(buffer);
    }

    Ok(Some((list, caps)))
}
+}
+
// GObject subclass registration for the fMP4 muxer.
#[glib::object_subclass]
impl ObjectSubclass for FMP4Mux {
    // Type name registered with the GObject type system.
    const NAME: &'static str = "GstFMP4Mux";
    type Type = super::FMP4Mux;
    // Derives from GstAggregator: N sink pads combined into one source pad.
    type ParentType = gst_base::Aggregator;
    // Custom class struct (carries the per-subclass `variant` field used elsewhere).
    type Class = Class;
}
+
+impl ObjectImpl for FMP4Mux {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ // TODO: Add chunk-duration property separate from fragment-size
+ glib::ParamSpecUInt64::builder("fragment-duration")
+ .nick("Fragment Duration")
+ .blurb("Duration for each FMP4 fragment")
+ .default_value(DEFAULT_FRAGMENT_DURATION.nseconds())
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecEnum::builder::<super::HeaderUpdateMode>("header-update-mode", DEFAULT_HEADER_UPDATE_MODE)
+ .nick("Header update mode")
+ .blurb("Mode for updating the header at the end of the stream")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecBoolean::builder("write-mfra")
+ .nick("Write mfra box")
+ .blurb("Write fragment random access box at the end of the stream")
+ .default_value(DEFAULT_WRITE_MFRA)
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecBoolean::builder("write-mehd")
+ .nick("Write mehd box")
+ .blurb("Write movie extends header box with the duration at the end of the stream (needs a header-update-mode enabled)")
+ .default_value(DEFAULT_WRITE_MFRA)
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt64::builder("interleave-bytes")
+ .nick("Interleave Bytes")
+ .blurb("Interleave between streams in bytes")
+ .default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0))
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt64::builder("interleave-time")
+ .nick("Interleave Time")
+ .blurb("Interleave between streams in nanoseconds")
+ .default_value(DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX))
+ .mutable_ready()
+ .build(),
+ ]
+ });
+
+ &*PROPERTIES
+ }
+
+    // GObject property setter. Every property value is stored in the
+    // mutex-protected `Settings`; `mutable_ready()` on the pspecs restricts
+    // changes to READY state and below.
+    fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+        match pspec.name() {
+            "fragment-duration" => {
+                let mut settings = self.settings.lock().unwrap();
+                let fragment_duration = value.get().expect("type checked upstream");
+                if settings.fragment_duration != fragment_duration {
+                    settings.fragment_duration = fragment_duration;
+                    // Release the settings lock before calling out into
+                    // set_latency(), which can trigger further callbacks.
+                    drop(settings);
+                    self.instance().set_latency(fragment_duration, None);
+                }
+            }
+
+            "header-update-mode" => {
+                let mut settings = self.settings.lock().unwrap();
+                settings.header_update_mode = value.get().expect("type checked upstream");
+            }
+
+            "write-mfra" => {
+                let mut settings = self.settings.lock().unwrap();
+                settings.write_mfra = value.get().expect("type checked upstream");
+            }
+
+            "write-mehd" => {
+                let mut settings = self.settings.lock().unwrap();
+                settings.write_mehd = value.get().expect("type checked upstream");
+            }
+
+            "interleave-bytes" => {
+                let mut settings = self.settings.lock().unwrap();
+                // 0 is the sentinel for "no byte-based interleaving".
+                settings.interleave_bytes = match value.get().expect("type checked upstream") {
+                    0 => None,
+                    v => Some(v),
+                };
+            }
+
+            "interleave-time" => {
+                let mut settings = self.settings.lock().unwrap();
+                // A zero ClockTime is the sentinel for "no time-based interleaving".
+                settings.interleave_time = match value.get().expect("type checked upstream") {
+                    Some(gst::ClockTime::ZERO) | None => None,
+                    v => v,
+                };
+            }
+
+            _ => unimplemented!(),
+        }
+    }
+
+    // GObject property getter; mirrors set_property() by reading the current
+    // value out of the mutex-protected `Settings`.
+    fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+        match pspec.name() {
+            "fragment-duration" => {
+                let settings = self.settings.lock().unwrap();
+                settings.fragment_duration.to_value()
+            }
+
+            "header-update-mode" => {
+                let settings = self.settings.lock().unwrap();
+                settings.header_update_mode.to_value()
+            }
+
+            "write-mfra" => {
+                let settings = self.settings.lock().unwrap();
+                settings.write_mfra.to_value()
+            }
+
+            "write-mehd" => {
+                let settings = self.settings.lock().unwrap();
+                settings.write_mehd.to_value()
+            }
+
+            "interleave-bytes" => {
+                let settings = self.settings.lock().unwrap();
+                // `None` maps back to the 0 sentinel exposed by the property.
+                settings.interleave_bytes.unwrap_or(0).to_value()
+            }
+
+            "interleave-time" => {
+                let settings = self.settings.lock().unwrap();
+                settings.interleave_time.to_value()
+            }
+
+            _ => unimplemented!(),
+        }
+    }
+
+    // Object construction: create pads for all always sink pad templates
+    // (the single-stream variants declare one such template); request-pad
+    // variants get their pads later via ElementImpl::request_new_pad().
+    fn constructed(&self) {
+        self.parent_constructed();
+
+        let obj = self.instance();
+        let class = obj.class();
+        for templ in class.pad_template_list().filter(|templ| {
+            templ.presence() == gst::PadPresence::Always
+                && templ.direction() == gst::PadDirection::Sink
+        }) {
+            // ACCEPT_INTERSECT: accept caps that intersect the template caps
+            // rather than requiring a strict subset.
+            let sinkpad =
+                gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink"))
+                    .flags(gst::PadFlags::ACCEPT_INTERSECT)
+                    .build();
+
+            obj.add_pad(&sinkpad).unwrap();
+        }
+
+        // Advertise the default fragment duration as initial latency.
+        obj.set_latency(Settings::default().fragment_duration, None);
+    }
+}
+
+impl GstObjectImpl for FMP4Mux {}
+
+impl ElementImpl for FMP4Mux {
+    // Refuse new request pads once the stream header was generated: the set
+    // of tracks in the moov box is fixed from that point on.
+    fn request_new_pad(
+        &self,
+        templ: &gst::PadTemplate,
+        name: Option<&str>,
+        caps: Option<&gst::Caps>,
+    ) -> Option<gst::Pad> {
+        let state = self.state.lock().unwrap();
+        if state.stream_header.is_some() {
+            gst::error!(
+                CAT,
+                imp: self,
+                "Can't request new pads after header was generated"
+            );
+            return None;
+        }
+
+        self.parent_request_new_pad(templ, name, caps)
+    }
+}
+
+impl AggregatorImpl for FMP4Mux {
+    // Next timeout for aggregate(): the expected end of the current fragment,
+    // shifted by any delay accumulated for overly long GOPs.
+    fn next_time(&self) -> Option<gst::ClockTime> {
+        let state = self.state.lock().unwrap();
+        state.fragment_start_pts.opt_add(state.timeout_delay)
+    }
+
+    // Answer CAPS queries from the currently configured caps (or the template
+    // caps if none are set yet); renegotiation is not supported.
+    fn sink_query(
+        &self,
+        aggregator_pad: &gst_base::AggregatorPad,
+        query: &mut gst::QueryRef,
+    ) -> bool {
+        use gst::QueryViewMut;
+
+        gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query);
+
+        match query.view_mut() {
+            QueryViewMut::Caps(q) => {
+                let allowed_caps = aggregator_pad
+                    .current_caps()
+                    .unwrap_or_else(|| aggregator_pad.pad_template_caps());
+
+                if let Some(filter_caps) = q.filter() {
+                    let res = filter_caps
+                        .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First);
+                    q.set_result(&res);
+                } else {
+                    q.set_result(&allowed_caps);
+                }
+
+                true
+            }
+            _ => self.parent_sink_query(aggregator_pad, query),
+        }
+    }
+
+    // Normalize incoming segments to TIME before queueing so the rest of the
+    // element only ever deals with TIME segments.
+    fn sink_event_pre_queue(
+        &self,
+        aggregator_pad: &gst_base::AggregatorPad,
+        mut event: gst::Event,
+    ) -> Result<gst::FlowSuccess, gst::FlowError> {
+        use gst::EventView;
+
+        gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+
+        match event.view() {
+            EventView::Segment(ev) => {
+                if ev.segment().format() != gst::Format::Time {
+                    gst::warning!(
+                        CAT,
+                        obj: aggregator_pad,
+                        "Received non-TIME segment, replacing with default TIME segment"
+                    );
+                    let segment = gst::FormattedSegment::<gst::ClockTime>::new();
+                    // Keep the original seqnum so downstream can correlate events.
+                    event = gst::event::Segment::builder(&segment)
+                        .seqnum(event.seqnum())
+                        .build();
+                }
+                self.parent_sink_event_pre_queue(aggregator_pad, event)
+            }
+            _ => self.parent_sink_event_pre_queue(aggregator_pad, event),
+        }
+    }
+
+    fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
+        use gst::EventView;
+
+        gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+
+        match event.view() {
+            EventView::Segment(ev) => {
+                // Already fixed-up above to always be a TIME segment
+                let segment = ev
+                    .segment()
+                    .clone()
+                    .downcast::<gst::ClockTime>()
+                    .expect("non-TIME segment");
+                gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment);
+
+                // Only forward the segment event verbatim if this is a single stream variant.
+                // Otherwise we have to produce a default segment and re-timestamp all buffers
+                // with their running time.
+                let aggregator = self.instance();
+                let class = aggregator.class();
+                if class.as_ref().variant.is_single_stream() {
+                    aggregator.update_segment(&segment);
+                }
+
+                self.parent_sink_event(aggregator_pad, event)
+            }
+            EventView::Tag(_ev) => {
+                // TODO: Maybe store for putting into the headers of the next fragment?
+
+                self.parent_sink_event(aggregator_pad, event)
+            }
+            _ => self.parent_sink_event(aggregator_pad, event),
+        }
+    }
+
+    fn src_query(&self, query: &mut gst::QueryRef) -> bool {
+        use gst::QueryViewMut;
+
+        gst::trace!(CAT, imp: self, "Handling query {:?}", query);
+
+        match query.view_mut() {
+            QueryViewMut::Seeking(q) => {
+                // We can't really handle seeking, it would break everything
+                q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
+                true
+            }
+            _ => self.parent_src_query(query),
+        }
+    }
+
+    // Seek events from downstream are rejected outright; everything else is
+    // handled by the base class.
+    fn src_event(&self, event: gst::Event) -> bool {
+        use gst::EventView;
+
+        gst::trace!(CAT, imp: self, "Handling event {:?}", event);
+
+        match event.view() {
+            EventView::Seek(_ev) => false,
+            _ => self.parent_src_event(event),
+        }
+    }
+
+    // Flush: drop all queued GOPs and per-stream tracking state, but keep the
+    // negotiated streams themselves so processing can resume.
+    fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
+        self.parent_flush()?;
+
+        let mut state = self.state.lock().unwrap();
+
+        for stream in &mut state.streams {
+            stream.queued_gops.clear();
+            stream.dts_offset = None;
+            stream.current_position = gst::ClockTime::ZERO;
+            stream.current_utc_time = gst::ClockTime::ZERO;
+            stream.fragment_filled = false;
+        }
+
+        state.current_offset = 0;
+        state.fragment_offsets.clear();
+
+        Ok(gst::FlowSuccess::Ok)
+    }
+
+    fn stop(&self) -> Result<(), gst::ErrorMessage> {
+        gst::trace!(CAT, imp: self, "Stopping");
+
+        let _ = self.parent_stop();
+
+        // Reset everything; a new header will be generated on restart.
+        *self.state.lock().unwrap() = State::default();
+
+        Ok(())
+    }
+
+    fn start(&self) -> Result<(), gst::ErrorMessage> {
+        gst::trace!(CAT, imp: self, "Starting");
+
+        self.parent_start()?;
+
+        // For non-single-stream variants configure a default segment that allows for negative
+        // DTS so that we can correctly re-timestamp buffers with their running times.
+        let aggregator = self.instance();
+        let class = aggregator.class();
+        if !class.as_ref().variant.is_single_stream() {
+            let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+            segment.set_start(SEGMENT_OFFSET);
+            segment.set_position(SEGMENT_OFFSET);
+            aggregator.update_segment(&segment);
+        }
+
+        *self.state.lock().unwrap() = State::default();
+
+        Ok(())
+    }
+
+    // Caps negotiation is handled manually when the header is generated, so
+    // the base class negotiation is disabled here.
+    fn negotiate(&self) -> bool {
+        true
+    }
+
+    // Main processing: queue input buffers into per-stream GOPs, determine the
+    // earliest PTS / fragment start, and drain a finished fragment downstream.
+    // On EOS this also performs the optional header rewrite/update.
+    fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
+        let settings = self.settings.lock().unwrap().clone();
+
+        // Upstream events (force-keyunit) are collected while the state lock
+        // is held and pushed only after it is released.
+        let mut upstream_events = vec![];
+
+        let all_eos;
+        let (caps, buffers) = {
+            let mut state = self.state.lock().unwrap();
+
+            // Create streams
+            if state.streams.is_empty() {
+                self.create_streams(&mut state)?;
+            }
+
+            // Queue buffers from all streams that are not filled for the current fragment yet
+            //
+            // Always take a buffer from the stream with the earliest queued buffer to keep the
+            // fill-level at all sinkpads in sync.
+            let fragment_start_pts = state.fragment_start_pts;
+
+            while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? {
+                // Can only happen if the stream was flushed in the meantime
+                let buffer = match stream.sinkpad.pop_buffer() {
+                    Some(buffer) => buffer,
+                    None => continue,
+                };
+
+                // Can only happen if the stream was flushed in the meantime
+                let segment = match stream
+                    .sinkpad
+                    .segment()
+                    .clone()
+                    .downcast::<gst::ClockTime>()
+                    .ok()
+                {
+                    Some(segment) => segment,
+                    None => {
+                        gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
+                        return Err(gst::FlowError::Error);
+                    }
+                };
+
+                // Queue up the buffer and update GOP tracking state
+                self.queue_gops(idx, stream, &segment, buffer)?;
+
+                // Check if this stream is filled enough now.
+                if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
+                    stream
+                        .queued_gops
+                        .iter()
+                        .find(|gop| gop.final_end_pts)
+                        .map(|gop| gop.end_pts),
+                    fragment_start_pts,
+                ) {
+                    if queued_end_pts.saturating_sub(fragment_start_pts)
+                        >= settings.fragment_duration
+                    {
+                        gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
+                        stream.fragment_filled = true;
+                    }
+                }
+            }
+
+            // Calculate the earliest PTS after queueing input if we can now.
+            if state.earliest_pts.is_none() {
+                let mut earliest_pts = None;
+
+                for stream in &state.streams {
+                    let stream_earliest_pts = match stream.queued_gops.back() {
+                        None => {
+                            earliest_pts = None;
+                            break;
+                        }
+                        Some(oldest_gop) => {
+                            if !timeout && !oldest_gop.final_earliest_pts {
+                                earliest_pts = None;
+                                break;
+                            }
+
+                            oldest_gop.earliest_pts
+                        }
+                    };
+
+                    if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) {
+                        earliest_pts = Some(stream_earliest_pts);
+                    }
+                }
+
+                if let Some(earliest_pts) = earliest_pts {
+                    gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts);
+                    state.earliest_pts = Some(earliest_pts);
+                    state.fragment_start_pts = Some(earliest_pts);
+
+                    gst::debug!(
+                        CAT,
+                        imp: self,
+                        "Sending first force-keyunit event for running time {}",
+                        earliest_pts + settings.fragment_duration,
+                    );
+
+                    let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
+                        .running_time(earliest_pts + settings.fragment_duration)
+                        .all_headers(true)
+                        .build();
+
+                    for stream in &mut state.streams {
+                        upstream_events.push((stream.sinkpad.clone(), fku.clone()));
+
+                        // Check if this stream is filled enough now.
+                        if let Some(queued_end_pts) = stream
+                            .queued_gops
+                            .iter()
+                            .find(|gop| gop.final_end_pts)
+                            .map(|gop| gop.end_pts)
+                        {
+                            if queued_end_pts.saturating_sub(earliest_pts)
+                                >= settings.fragment_duration
+                            {
+                                gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
+                                stream.fragment_filled = true;
+                            }
+                        }
+                    }
+                }
+            }
+
+            all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
+            if all_eos {
+                gst::debug!(CAT, imp: self, "All streams are EOS now");
+            }
+
+            // If enough GOPs were queued, drain and create the output fragment
+            match self.drain(
+                &mut state,
+                &settings,
+                timeout,
+                all_eos,
+                &mut upstream_events,
+            ) {
+                Ok(res) => res,
+                Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
+                    // GOP exceeds the fragment duration: extend the timeout
+                    // and retry instead of failing hard.
+                    gst::element_imp_warning!(
+                        self,
+                        gst::StreamError::Format,
+                        ["Longer GOPs than fragment duration"]
+                    );
+                    state.timeout_delay += 1.seconds();
+
+                    drop(state);
+                    for (sinkpad, event) in upstream_events {
+                        sinkpad.push_event(event);
+                    }
+                    return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
+                }
+                Err(err) => return Err(err),
+            }
+        };
+
+        for (sinkpad, event) in upstream_events {
+            sinkpad.push_event(event);
+        }
+
+        if let Some(caps) = caps {
+            gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps);
+            self.instance().set_src_caps(&caps);
+        }
+
+        if let Some(buffers) = buffers {
+            gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers);
+            self.instance().finish_buffer_list(buffers)?;
+        }
+
+        if all_eos {
+            gst::debug!(CAT, imp: self, "Doing EOS handling");
+
+            // Optionally rewrite (seek back and overwrite) or update (append)
+            // the header now that the total duration is known.
+            if settings.header_update_mode != super::HeaderUpdateMode::None {
+                let updated_header =
+                    self.update_header(&mut self.state.lock().unwrap(), &settings, true);
+                match updated_header {
+                    Ok(Some((buffer_list, caps))) => {
+                        match settings.header_update_mode {
+                            super::HeaderUpdateMode::None => unreachable!(),
+                            super::HeaderUpdateMode::Rewrite => {
+                                let mut q = gst::query::Seeking::new(gst::Format::Bytes);
+                                if self.instance().src_pad().peer_query(&mut q) && q.result().0 {
+                                    let aggregator = self.instance();
+
+                                    aggregator.set_src_caps(&caps);
+
+                                    // Seek to the beginning with a default bytes segment
+                                    aggregator
+                                        .update_segment(
+                                            &gst::FormattedSegment::<gst::format::Bytes>::new(),
+                                        );
+
+                                    if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+                                        gst::error!(
+                                            CAT,
+                                            imp: self,
+                                            "Failed pushing updated header buffer downstream: {:?}",
+                                            err,
+                                        );
+                                    }
+                                } else {
+                                    gst::error!(
+                                        CAT,
+                                        imp: self,
+                                        "Can't rewrite header because downstream is not seekable"
+                                    );
+                                }
+                            }
+                            super::HeaderUpdateMode::Update => {
+                                let aggregator = self.instance();
+
+                                aggregator.set_src_caps(&caps);
+                                if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+                                    gst::error!(
+                                        CAT,
+                                        imp: self,
+                                        "Failed pushing updated header buffer downstream: {:?}",
+                                        err,
+                                    );
+                                }
+                            }
+                        }
+                    }
+                    Ok(None) => {}
+                    Err(err) => {
+                        gst::error!(
+                            CAT,
+                            imp: self,
+                            "Failed to generate updated header: {:?}",
+                            err
+                        );
+                    }
+                }
+            }
+
+            // Need to output new headers if started again after EOS
+            self.state.lock().unwrap().sent_headers = false;
+
+            Err(gst::FlowError::Eos)
+        } else {
+            Ok(gst::FlowSuccess::Ok)
+        }
+    }
+}
+
+// Custom class struct: stores the muxer Variant right after the parent class
+// so the shared FMP4Mux implementation can query per-subclass behavior.
+#[repr(C)]
+pub(crate) struct Class {
+    parent: gst_base::ffi::GstAggregatorClass,
+    variant: super::Variant,
+}
+
+unsafe impl ClassStruct for Class {
+    type Type = FMP4Mux;
+}
+
+impl std::ops::Deref for Class {
+    type Target = glib::Class<gst_base::Aggregator>;
+
+    fn deref(&self) -> &Self::Target {
+        // SAFETY: Class is #[repr(C)] with the parent class struct as its
+        // first field, so a pointer to `parent` is a valid pointer to the
+        // aggregator class struct.
+        unsafe { &*(&self.parent as *const _ as *const _) }
+    }
+}
+
+unsafe impl<T: FMP4MuxImpl> IsSubclassable<T> for super::FMP4Mux {
+    fn class_init(class: &mut glib::Class<Self>) {
+        Self::parent_class_init::<T>(class);
+
+        // Copy the subclass' compile-time variant into the class struct so
+        // the base implementation can branch on it at runtime.
+        let class = class.as_mut();
+        class.variant = T::VARIANT;
+    }
+}
+
+// Implemented by each concrete muxer subclass to declare its variant.
+pub(crate) trait FMP4MuxImpl: AggregatorImpl {
+    const VARIANT: super::Variant;
+}
+
+// General-purpose ISO fragmented MP4 variant: multiple streams via
+// request sink pads ("sink_%u").
+#[derive(Default)]
+pub(crate) struct ISOFMP4Mux;
+
+#[glib::object_subclass]
+impl ObjectSubclass for ISOFMP4Mux {
+    const NAME: &'static str = "GstISOFMP4Mux";
+    type Type = super::ISOFMP4Mux;
+    type ParentType = super::FMP4Mux;
+}
+
+impl ObjectImpl for ISOFMP4Mux {}
+
+impl GstObjectImpl for ISOFMP4Mux {}
+
+impl ElementImpl for ISOFMP4Mux {
+    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+            gst::subclass::ElementMetadata::new(
+                "ISOFMP4Mux",
+                "Codec/Muxer",
+                "ISO fragmented MP4 muxer",
+                "Sebastian Dröge <sebastian@centricular.com>",
+            )
+        });
+
+        Some(&*ELEMENT_METADATA)
+    }
+
+    fn pad_templates() -> &'static [gst::PadTemplate] {
+        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+            let src_pad_template = gst::PadTemplate::new(
+                "src",
+                gst::PadDirection::Src,
+                gst::PadPresence::Always,
+                &gst::Caps::builder("video/quicktime")
+                    .field("variant", "iso-fragmented")
+                    .build(),
+            )
+            .unwrap();
+
+            // Accepted inputs: H.264 / H.265 (AU-aligned) and raw AAC.
+            let sink_pad_template = gst::PadTemplate::new(
+                "sink_%u",
+                gst::PadDirection::Sink,
+                gst::PadPresence::Request,
+                &[
+                    gst::Structure::builder("video/x-h264")
+                        .field("stream-format", gst::List::new(["avc", "avc3"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("video/x-h265")
+                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("audio/mpeg")
+                        .field("mpegversion", 4i32)
+                        .field("stream-format", "raw")
+                        .field("channels", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("rate", gst::IntRange::new(1, i32::MAX))
+                        .build(),
+                ]
+                .into_iter()
+                .collect::<gst::Caps>(),
+            )
+            .unwrap();
+
+            vec![src_pad_template, sink_pad_template]
+        });
+
+        PAD_TEMPLATES.as_ref()
+    }
+}
+
+impl AggregatorImpl for ISOFMP4Mux {}
+
+impl FMP4MuxImpl for ISOFMP4Mux {
+    const VARIANT: super::Variant = super::Variant::ISO;
+}
+
+// CMAF variant: single-stream, so it declares one always "sink" pad.
+#[derive(Default)]
+pub(crate) struct CMAFMux;
+
+#[glib::object_subclass]
+impl ObjectSubclass for CMAFMux {
+    const NAME: &'static str = "GstCMAFMux";
+    type Type = super::CMAFMux;
+    type ParentType = super::FMP4Mux;
+}
+
+impl ObjectImpl for CMAFMux {}
+
+impl GstObjectImpl for CMAFMux {}
+
+impl ElementImpl for CMAFMux {
+    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+            gst::subclass::ElementMetadata::new(
+                "CMAFMux",
+                "Codec/Muxer",
+                "CMAF fragmented MP4 muxer",
+                "Sebastian Dröge <sebastian@centricular.com>",
+            )
+        });
+
+        Some(&*ELEMENT_METADATA)
+    }
+
+    fn pad_templates() -> &'static [gst::PadTemplate] {
+        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+            let src_pad_template = gst::PadTemplate::new(
+                "src",
+                gst::PadDirection::Src,
+                gst::PadPresence::Always,
+                &gst::Caps::builder("video/quicktime")
+                    .field("variant", "cmaf")
+                    .build(),
+            )
+            .unwrap();
+
+            // Single always sink pad; the pad itself is created in
+            // ObjectImpl::constructed() of the base class.
+            let sink_pad_template = gst::PadTemplate::new(
+                "sink",
+                gst::PadDirection::Sink,
+                gst::PadPresence::Always,
+                &[
+                    gst::Structure::builder("video/x-h264")
+                        .field("stream-format", gst::List::new(["avc", "avc3"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("video/x-h265")
+                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("audio/mpeg")
+                        .field("mpegversion", 4i32)
+                        .field("stream-format", "raw")
+                        .field("channels", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("rate", gst::IntRange::new(1, i32::MAX))
+                        .build(),
+                ]
+                .into_iter()
+                .collect::<gst::Caps>(),
+            )
+            .unwrap();
+
+            vec![src_pad_template, sink_pad_template]
+        });
+
+        PAD_TEMPLATES.as_ref()
+    }
+}
+
+impl AggregatorImpl for CMAFMux {}
+
+impl FMP4MuxImpl for CMAFMux {
+    const VARIANT: super::Variant = super::Variant::CMAF;
+}
+
+// DASH variant: single-stream (one always "sink" pad), ISO-fragmented output.
+#[derive(Default)]
+pub(crate) struct DASHMP4Mux;
+
+#[glib::object_subclass]
+impl ObjectSubclass for DASHMP4Mux {
+    const NAME: &'static str = "GstDASHMP4Mux";
+    type Type = super::DASHMP4Mux;
+    type ParentType = super::FMP4Mux;
+}
+
+impl ObjectImpl for DASHMP4Mux {}
+
+impl GstObjectImpl for DASHMP4Mux {}
+
+impl ElementImpl for DASHMP4Mux {
+    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+            gst::subclass::ElementMetadata::new(
+                "DASHMP4Mux",
+                "Codec/Muxer",
+                "DASH fragmented MP4 muxer",
+                "Sebastian Dröge <sebastian@centricular.com>",
+            )
+        });
+
+        Some(&*ELEMENT_METADATA)
+    }
+
+    fn pad_templates() -> &'static [gst::PadTemplate] {
+        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+            let src_pad_template = gst::PadTemplate::new(
+                "src",
+                gst::PadDirection::Src,
+                gst::PadPresence::Always,
+                &gst::Caps::builder("video/quicktime")
+                    .field("variant", "iso-fragmented")
+                    .build(),
+            )
+            .unwrap();
+
+            // Consistency: use the same builder style as the ISOFMP4Mux and
+            // CMAFMux templates in this file (array-based gst::List, inferred
+            // gst::IntRange) instead of the older reference-based forms; the
+            // resulting caps are identical.
+            let sink_pad_template = gst::PadTemplate::new(
+                "sink",
+                gst::PadDirection::Sink,
+                gst::PadPresence::Always,
+                &[
+                    gst::Structure::builder("video/x-h264")
+                        .field("stream-format", gst::List::new(["avc", "avc3"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("video/x-h265")
+                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("audio/mpeg")
+                        .field("mpegversion", 4i32)
+                        .field("stream-format", "raw")
+                        .field("channels", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("rate", gst::IntRange::new(1, i32::MAX))
+                        .build(),
+                ]
+                .into_iter()
+                .collect::<gst::Caps>(),
+            )
+            .unwrap();
+
+            vec![src_pad_template, sink_pad_template]
+        });
+
+        PAD_TEMPLATES.as_ref()
+    }
+}
+
+impl AggregatorImpl for DASHMP4Mux {}
+
+impl FMP4MuxImpl for DASHMP4Mux {
+    const VARIANT: super::Variant = super::Variant::DASH;
+}
+
+// ONVIF variant: multi-stream with request pads, adds JPEG, G.711, G.726 and
+// ONVIF metadata to the accepted input formats.
+#[derive(Default)]
+pub(crate) struct ONVIFFMP4Mux;
+
+#[glib::object_subclass]
+impl ObjectSubclass for ONVIFFMP4Mux {
+    const NAME: &'static str = "GstONVIFFMP4Mux";
+    type Type = super::ONVIFFMP4Mux;
+    type ParentType = super::FMP4Mux;
+}
+
+impl ObjectImpl for ONVIFFMP4Mux {}
+
+impl GstObjectImpl for ONVIFFMP4Mux {}
+
+impl ElementImpl for ONVIFFMP4Mux {
+    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+            gst::subclass::ElementMetadata::new(
+                "ONVIFFMP4Mux",
+                "Codec/Muxer",
+                "ONVIF fragmented MP4 muxer",
+                "Sebastian Dröge <sebastian@centricular.com>",
+            )
+        });
+
+        Some(&*ELEMENT_METADATA)
+    }
+
+    fn pad_templates() -> &'static [gst::PadTemplate] {
+        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+            let src_pad_template = gst::PadTemplate::new(
+                "src",
+                gst::PadDirection::Src,
+                gst::PadPresence::Always,
+                &gst::Caps::builder("video/quicktime")
+                    .field("variant", "iso-fragmented")
+                    .build(),
+            )
+            .unwrap();
+
+            // Consistency: the original mixed old-style reference-based
+            // gst::List / turbofished gst::IntRange with the newer array-based
+            // forms used by the other templates in this file; normalized to
+            // the newer style throughout. The resulting caps are identical.
+            let sink_pad_template = gst::PadTemplate::new(
+                "sink_%u",
+                gst::PadDirection::Sink,
+                gst::PadPresence::Request,
+                &[
+                    gst::Structure::builder("video/x-h264")
+                        .field("stream-format", gst::List::new(["avc", "avc3"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("video/x-h265")
+                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
+                        .field("alignment", "au")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("image/jpeg")
+                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
+                        .build(),
+                    gst::Structure::builder("audio/mpeg")
+                        .field("mpegversion", 4i32)
+                        .field("stream-format", "raw")
+                        .field("channels", gst::IntRange::new(1, u16::MAX as i32))
+                        .field("rate", gst::IntRange::new(1, i32::MAX))
+                        .build(),
+                    gst::Structure::builder("audio/x-alaw")
+                        .field("channels", gst::IntRange::new(1, 2))
+                        .field("rate", gst::IntRange::new(1, i32::MAX))
+                        .build(),
+                    gst::Structure::builder("audio/x-mulaw")
+                        .field("channels", gst::IntRange::new(1, 2))
+                        .field("rate", gst::IntRange::new(1, i32::MAX))
+                        .build(),
+                    gst::Structure::builder("audio/x-adpcm")
+                        .field("layout", "g726")
+                        .field("channels", 1i32)
+                        .field("rate", 8000i32)
+                        .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000]))
+                        .build(),
+                    gst::Structure::builder("application/x-onvif-metadata")
+                        .field("parsed", true)
+                        .build(),
+                ]
+                .into_iter()
+                .collect::<gst::Caps>(),
+            )
+            .unwrap();
+
+            vec![src_pad_template, sink_pad_template]
+        });
+
+        PAD_TEMPLATES.as_ref()
+    }
+}
+
+impl AggregatorImpl for ONVIFFMP4Mux {}
+
+impl FMP4MuxImpl for ONVIFFMP4Mux {
+    const VARIANT: super::Variant = super::Variant::ONVIF;
+}
diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs
new file mode 100644
index 000000000..76c2da9c2
--- /dev/null
+++ b/mux/fmp4/src/fmp4mux/mod.rs
@@ -0,0 +1,146 @@
+// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+mod boxes;
+mod imp;
+
+// Public GObject wrapper types. FMP4Mux is the abstract base; the four
+// concrete variants below all extend it.
+glib::wrapper! {
+    pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object;
+}
+
+glib::wrapper! {
+    pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
+}
+
+glib::wrapper! {
+    pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
+}
+
+glib::wrapper! {
+    pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
+}
+
+glib::wrapper! {
+    pub(crate) struct ONVIFFMP4Mux(ObjectSubclass<imp::ONVIFFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
+}
+
+// Registers all four muxer elements with the plugin. The abstract base type
+// and the header-update-mode enum are marked as plugin API so documentation
+// and introspection tools pick them up.
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    FMP4Mux::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+    HeaderUpdateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+    gst::Element::register(
+        Some(plugin),
+        "isofmp4mux",
+        gst::Rank::Primary,
+        ISOFMP4Mux::static_type(),
+    )?;
+    gst::Element::register(
+        Some(plugin),
+        "cmafmux",
+        gst::Rank::Primary,
+        CMAFMux::static_type(),
+    )?;
+    gst::Element::register(
+        Some(plugin),
+        "dashmp4mux",
+        gst::Rank::Primary,
+        DASHMP4Mux::static_type(),
+    )?;
+    gst::Element::register(
+        Some(plugin),
+        "onviffmp4mux",
+        gst::Rank::Primary,
+        ONVIFFMP4Mux::static_type(),
+    )?;
+
+    Ok(())
+}
+
+// Configuration passed to the box writer when generating the initial (or
+// updated) stream header.
+#[derive(Debug)]
+pub(crate) struct HeaderConfiguration<'a> {
+    variant: Variant,
+    // true when generating the end-of-stream header update rather than the
+    // initial header.
+    update: bool,
+    /// First caps must be the video/reference stream. Must be in the order the tracks are going to
+    /// be used later for the fragments too.
+    streams: &'a [gst::Caps],
+    write_mehd: bool,
+    duration: Option<gst::ClockTime>,
+    /// Start UTC time in ONVIF mode.
+    /// Since Jan 1 1601 in 100ns units.
+    start_utc_time: Option<u64>,
+}
+
+// Configuration passed to the box writer for each fragment (moof + mdat).
+#[derive(Debug)]
+pub(crate) struct FragmentHeaderConfiguration<'a> {
+    variant: Variant,
+    sequence_number: u32,
+    streams: &'a [(gst::Caps, Option<FragmentTimingInfo>)],
+    buffers: &'a [Buffer],
+}
+
+#[derive(Debug)]
+pub(crate) struct FragmentTimingInfo {
+    /// Start time of this fragment
+    start_time: gst::ClockTime,
+    /// Set if this is an intra-only stream
+    intra_only: bool,
+}
+
+// A single sample scheduled for the current fragment.
+#[derive(Debug)]
+pub(crate) struct Buffer {
+    /// Track index
+    idx: usize,
+
+    /// Actual buffer
+    buffer: gst::Buffer,
+
+    /// Timestamp
+    timestamp: gst::ClockTime,
+
+    /// Sample duration
+    duration: gst::ClockTime,
+
+    /// Composition time offset
+    composition_time_offset: Option<i64>,
+}
+
+// The fragmented-MP4 flavor produced by a given element subclass.
+#[allow(clippy::upper_case_acronyms)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum Variant {
+    ISO,
+    CMAF,
+    DASH,
+    ONVIF,
+}
+
+impl Variant {
+    // Single-stream variants have one always sink pad and forward the input
+    // segment verbatim; multi-stream variants use request pads and
+    // re-timestamp to running time.
+    pub(crate) fn is_single_stream(self) -> bool {
+        match self {
+            Variant::ISO | Variant::ONVIF => false,
+            Variant::CMAF | Variant::DASH => true,
+        }
+    }
+}
+
+// Byte offset of a fragment in the output stream, used for the mfra box.
+#[derive(Debug)]
+pub(crate) struct FragmentOffset {
+    time: gst::ClockTime,
+    offset: u64,
+}
+
+// How the header is finalized at EOS: not at all, rewritten in place
+// (requires a seekable downstream), or appended as an update.
+#[allow(clippy::upper_case_acronyms)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, glib::Enum)]
+#[repr(i32)]
+#[enum_type(name = "GstFMP4MuxHeaderUpdateMode")]
+pub(crate) enum HeaderUpdateMode {
+    None,
+    Rewrite,
+    Update,
+}
diff --git a/mux/fmp4/src/lib.rs b/mux/fmp4/src/lib.rs
new file mode 100644
index 000000000..697c7cbf1
--- /dev/null
+++ b/mux/fmp4/src/lib.rs
@@ -0,0 +1,34 @@
+// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
+
+/**
+ * plugin-fmp4:
+ *
+ * Since: plugins-rs-0.8.0
+ */
+use gst::glib;
+
+mod fmp4mux;
+
+// Plugin entry point: registers the fmp4mux element variants.
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    fmp4mux::register(plugin)
+}
+
+gst::plugin_define!(
+    fmp4,
+    env!("CARGO_PKG_DESCRIPTION"),
+    plugin_init,
+    concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+    // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known)
+    "MPL",
+    env!("CARGO_PKG_NAME"),
+    env!("CARGO_PKG_NAME"),
+    env!("CARGO_PKG_REPOSITORY"),
+    env!("BUILD_REL_DATE")
+);
diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs
new file mode 100644
index 000000000..073b6ae05
--- /dev/null
+++ b/mux/fmp4/tests/tests.rs
@@ -0,0 +1,1241 @@
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+//
+
+use gst::prelude::*;
+
+// One-time test setup: initializes GStreamer and statically registers this
+// plugin. Safe to call from every test; `Once` guarantees a single run.
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gstfmp4::plugin_register_static().unwrap();
+ });
+}
+
+// Shared single-stream test body: pushes a 7s H.264 stream with keyframes at
+// 0s and 5s and checks that the muxer emits a stream header, then per-fragment
+// headers and buffers with the expected flags/timestamps. `cmaf` selects the
+// cmafmux element (no output offset) vs. isofmp4mux (1000h offset).
+fn test_buffer_flags_single_stream(cmaf: bool) {
+ let mut h = if cmaf {
+ gst_check::Harness::new("cmafmux")
+ } else {
+ gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"))
+ };
+
+ // 5s fragment duration
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h.play();
+
+ let output_offset = if cmaf {
+ gst::ClockTime::ZERO
+ } else {
+ (60 * 60 * 1000).seconds()
+ };
+
+ // Push 7 buffers of 1s each; only the 1st and 6th (i == 0, i == 5) are
+ // keyframes, i.e. pushed without the DELTA_UNIT flag
+ for i in 0..7 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 5 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // After the 3rd buffer the muxer should request a keyframe for the
+ // next fragment boundary (5s) via a force-keyunit event; skip the
+ // unrelated reconfigure/latency events the harness also produces.
+ if i == 2 {
+ let ev = loop {
+ let ev = h.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Stream header: first output buffer, flagged HEADER | DISCONT
+ let header = h.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ // First fragment header covering the initial 5s GOP
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(5.seconds()));
+
+ // Fragment payload buffers; the last one in a fragment carries MARKER
+ for i in 0..5 {
+ let buffer = h.pull().unwrap();
+ if i == 4 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ // EOS drains the remaining 2s as a final fragment
+ h.push_event(gst::event::Eos::new());
+
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(2.seconds()));
+
+ for i in 5..7 {
+ let buffer = h.pull().unwrap();
+ if i == 6 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ // Downstream event sequence must be stream-start, caps, segment, EOS
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+// Runs the shared single-stream flag test against the cmafmux element.
+#[test]
+fn test_buffer_flags_single_stream_cmaf() {
+ init();
+
+ test_buffer_flags_single_stream(true);
+}
+
+// Runs the shared single-stream flag test against the isofmp4mux element.
+#[test]
+fn test_buffer_flags_single_stream_iso() {
+ init();
+
+ test_buffer_flags_single_stream(false);
+}
+
+// Muxes one H.264 video and one AAC audio stream through isofmp4mux and
+// checks interleaving, flags and timestamps of the fragmented output
+// (video buffer first per timestamp, then audio with no DTS).
+#[test]
+fn test_buffer_flags_multi_stream() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ // 5s fragment duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 7 buffers of 1s each per stream; only the 1st and 6th video
+ // buffers (i == 0, i == 5) are keyframes (no DELTA_UNIT flag)
+ for i in 0..7 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 5 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Both sink pads should receive a force-keyunit request for the next
+ // fragment boundary (5s) once enough of the first fragment is queued
+ if i == 2 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Stream header first, flagged HEADER | DISCONT
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(5.seconds()));
+
+ // j == 0 is the video buffer (has DTS), j == 1 the audio buffer (no DTS)
+ for i in 0..5 {
+ for j in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if i == 4 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ // Final 2s fragment drained at EOS
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(2.seconds()));
+
+ for i in 5..7 {
+ for j in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if i == 6 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+// Live mode: the audio stream is missing buffers at fragment/stream ends, so
+// the muxer must time out on the test clock and output the fragment anyway.
+#[test]
+fn test_live_timeout() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ h1.use_testclock();
+
+ // 5s fragment duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 7 video buffers of 1s each; only the 1st and 6th (i == 0, i == 5)
+ // are keyframes (no DELTA_UNIT flag)
+ for i in 0..7 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 5 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Skip the audio buffers at i == 4 and i == 6 (the 5th and 7th, i.e.
+ // the ones at the end of the fragment / stream) so the muxer has to
+ // time out waiting for them
+ if i == 4 || i == 6 {
+ continue;
+ } else {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+ }
+
+ // Both pads get the force-keyunit request for the 5s boundary
+ if i == 2 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Advance time and crank the clock: this should bring us to the end of the first fragment
+ h1.set_time(5.seconds()).unwrap();
+ h1.crank_single_clock_wait().unwrap();
+
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(5.seconds()));
+
+ // j == 0 is video, j == 1 audio; the audio buffer at i == 4 was never
+ // pushed, so there is nothing to pull for it
+ for i in 0..5 {
+ for j in 0..2 {
+ // Skip gap events that don't result in buffers
+ if j == 1 && i == 4 {
+ // Advance time and crank the clock another time. This brings us at the end of the
+ // EOS.
+ h1.crank_single_clock_wait().unwrap();
+ continue;
+ }
+
+ let buffer = h1.pull().unwrap();
+ if i == 4 && j == 0 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else if i == 5 && j == 0 {
+ assert_eq!(buffer.flags(), gst::BufferFlags::HEADER);
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(2.seconds()));
+
+ for i in 5..7 {
+ for j in 0..2 {
+ // Skip gap events that don't result in buffers
+ if j == 1 && i == 6 {
+ continue;
+ }
+
+ let buffer = h1.pull().unwrap();
+ if i == 6 && j == 0 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+// Like the multi-stream test, but two audio buffers are replaced by GAP
+// events; the muxer must account for them without producing output buffers.
+#[test]
+fn test_gap_events() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ h1.use_testclock();
+
+ // 5s fragment duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 7 video buffers of 1s each; only the 1st and 6th (i == 0, i == 5)
+ // are keyframes (no DELTA_UNIT flag)
+ for i in 0..7 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 5 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Replace the audio buffers at i == 3 and i == 6 with a 1s gap event
+ if i == 3 || i == 6 {
+ let ev = gst::event::Gap::builder(i.seconds())
+ .duration(gst::ClockTime::SECOND)
+ .build();
+ assert!(h2.push_event(ev));
+ } else {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+ }
+
+ // Both pads get the force-keyunit request for the 5s boundary
+ if i == 2 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(5.seconds()),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ // Advance time and crank the clock: this should bring us to the end of the first fragment
+ h1.set_time(5.seconds()).unwrap();
+ h1.crank_single_clock_wait().unwrap();
+
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(5.seconds()));
+
+ // j == 0 is video, j == 1 audio; the audio slot at i == 3 was a gap event
+ for i in 0..5 {
+ for j in 0..2 {
+ // Skip gap events that don't result in buffers
+ if j == 1 && i == 3 {
+ continue;
+ }
+
+ let buffer = h1.pull().unwrap();
+ if i == 4 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(2.seconds()));
+
+ for i in 5..7 {
+ for j in 0..2 {
+ // Skip gap events that don't result in buffers
+ if j == 1 && i == 6 {
+ continue;
+ }
+
+ let buffer = h1.pull().unwrap();
+ if i == 6 && j == 0 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+// GOPs shorter than the fragment duration: keyframes at 0s, 3s and 6s with a
+// 5s fragment target; the first fragment should close early at the 3s
+// keyframe, the second should span 3s..8s.
+#[test]
+fn test_single_stream_short_gops() {
+ init();
+
+ let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+
+ // 5s fragment duration
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
+ for i in 0..8 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 3 && i != 6 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Force-keyunit requests: for the first fragment boundary (5s), and —
+ // after the fragment actually closed at the 3s keyframe — for 8s
+ if i == 2 || i == 7 {
+ let ev = loop {
+ let ev = h.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ let header = h.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ // First fragment only covers the 3s GOP (shorter than the 5s target)
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(3.seconds()));
+
+ for i in 0..3 {
+ let buffer = h.pull().unwrap();
+ if i == 2 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ h.push_event(gst::event::Eos::new());
+
+ // Remaining 5s (3s..8s, spanning the 6s keyframe) drained at EOS
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(5.seconds()));
+
+ for i in 3..8 {
+ let buffer = h.pull().unwrap();
+ if i == 7 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+// GOPs longer than the fragment duration: keyframes at 0s and 6s with a 5s
+// fragment target; the first fragment must stretch to 6s (next keyframe) and
+// the follow-up keyframe request moves to 11s.
+#[test]
+fn test_single_stream_long_gops() {
+ init();
+
+ let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+
+ // 5s fragment duration
+ h.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag
+ for i in 0..10 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 6 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Keyframe requested for 5s first; since the GOP only ended at 6s,
+ // the next request is for 11s (6s + 5s fragment duration)
+ if i == 2 || i == 7 {
+ let ev = loop {
+ let ev = h.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ let fku_time = if i == 2 { 5.seconds() } else { 11.seconds() };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ let header = h.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ // First fragment stretches past the 5s target to the 6s keyframe
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(6.seconds()));
+
+ for i in 0..6 {
+ let buffer = h.pull().unwrap();
+ if i == 5 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ h.push_event(gst::event::Eos::new());
+
+ // Remaining 4s (6s..10s) drained at EOS
+ let fragment_header = h.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(6.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(6.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(4.seconds()));
+
+ for i in 6..10 {
+ let buffer = h.pull().unwrap();
+ if i == 9 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+// Multi-stream variant of the short-GOP test: video keyframes at 0s, 3s and
+// 6s with a 5s fragment target plus an AAC audio stream; the first fragment
+// closes early at 3s, the rest (3s..8s) is drained at EOS.
+#[test]
+fn test_buffer_multi_stream_short_gops() {
+ init();
+
+ let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
+ let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
+
+ // 5s fragment duration
+ h1.element()
+ .unwrap()
+ .set_property("fragment-duration", 5.seconds());
+
+ h1.set_src_caps(
+ gst::Caps::builder("video/x-h264")
+ .field("width", 1920i32)
+ .field("height", 1080i32)
+ .field("framerate", gst::Fraction::new(30, 1))
+ .field("stream-format", "avc")
+ .field("alignment", "au")
+ .field("codec_data", gst::Buffer::with_size(1).unwrap())
+ .build(),
+ );
+ h1.play();
+
+ h2.set_src_caps(
+ gst::Caps::builder("audio/mpeg")
+ .field("mpegversion", 4i32)
+ .field("channels", 1i32)
+ .field("rate", 44100i32)
+ .field("stream-format", "raw")
+ .field("base-profile", "lc")
+ .field("profile", "lc")
+ .field("level", "2")
+ .field(
+ "codec_data",
+ gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
+ )
+ .build(),
+ );
+ h2.play();
+
+ let output_offset = (60 * 60 * 1000).seconds();
+
+ // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
+ for i in 0..8 {
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ if i != 0 && i != 3 && i != 6 {
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+ }
+ assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ let mut buffer = gst::Buffer::with_size(1).unwrap();
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(i.seconds());
+ buffer.set_dts(i.seconds());
+ buffer.set_duration(gst::ClockTime::SECOND);
+ }
+ assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
+
+ // Keyframe requests on both pads: 5s first, then 8s once the first
+ // fragment actually closed at the 3s keyframe
+ if i == 2 || i == 7 {
+ let ev = loop {
+ let ev = h1.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+
+ let ev = loop {
+ let ev = h2.pull_upstream_event().unwrap();
+ if ev.type_() != gst::EventType::Reconfigure
+ && ev.type_() != gst::EventType::Latency
+ {
+ break ev;
+ }
+ };
+
+ assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
+ assert_eq!(
+ gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
+ gst_video::UpstreamForceKeyUnitEvent {
+ running_time: Some(fku_time),
+ all_headers: true,
+ count: 0
+ }
+ );
+ }
+ }
+
+ let header = h1.pull().unwrap();
+ assert_eq!(
+ header.flags(),
+ gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
+ );
+ assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
+ assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
+
+ // First fragment only covers the 3s GOP
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(
+ fragment_header.pts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(
+ fragment_header.dts(),
+ Some(gst::ClockTime::ZERO + output_offset)
+ );
+ assert_eq!(fragment_header.duration(), Some(3.seconds()));
+
+ // j == 0 is the video buffer (has DTS), j == 1 the audio buffer (no DTS)
+ for i in 0..3 {
+ for j in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if i == 2 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ h1.push_event(gst::event::Eos::new());
+ h2.push_event(gst::event::Eos::new());
+
+ // Remaining 5s (3s..8s) drained at EOS
+ let fragment_header = h1.pull().unwrap();
+ assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
+ assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset));
+ assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset));
+ assert_eq!(fragment_header.duration(), Some(5.seconds()));
+
+ for i in 3..8 {
+ for j in 0..2 {
+ let buffer = h1.pull().unwrap();
+ if i == 7 && j == 1 {
+ assert_eq!(
+ buffer.flags(),
+ gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
+ );
+ } else {
+ assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
+ }
+ assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
+ if j == 0 {
+ assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
+ } else {
+ assert!(buffer.dts().is_none());
+ }
+ assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
+ }
+ }
+
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::StreamStart);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Caps);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Segment);
+ let ev = h1.pull_event().unwrap();
+ assert_eq!(ev.type_(), gst::EventType::Eos);
+}