diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-10-23 18:23:45 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-10-23 20:25:08 +0300 |
commit | 211cd095d69726a3a2208feddd921d05b60c6540 (patch) | |
tree | 648bbbe80a68a68b3f6315fcf580cae146d0249e /mux | |
parent | 5d44e0eb3c309ce7ad0cfb378d0169d8ce3305b3 (diff) |
Add new mux subdirectory for container formats
Contains the (incomplete) flavors FLV demuxer and the fragmented MP4
muxer for now.
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/173
Diffstat (limited to 'mux')
-rw-r--r-- | mux/flavors/Cargo.toml | 47 | ||||
l--------- | mux/flavors/LICENSE-APACHE | 1 | ||||
l--------- | mux/flavors/LICENSE-MIT | 1 | ||||
-rw-r--r-- | mux/flavors/build.rs | 3 | ||||
-rw-r--r-- | mux/flavors/src/bytes.rs | 142 | ||||
-rw-r--r-- | mux/flavors/src/flvdemux/imp.rs | 1538 | ||||
-rw-r--r-- | mux/flavors/src/flvdemux/mod.rs | 27 | ||||
-rw-r--r-- | mux/flavors/src/lib.rs | 36 | ||||
-rw-r--r-- | mux/fmp4/Cargo.toml | 52 | ||||
l--------- | mux/fmp4/LICENSE | 1 | ||||
-rw-r--r-- | mux/fmp4/build.rs | 3 | ||||
-rw-r--r-- | mux/fmp4/examples/dash_vod.rs | 266 | ||||
-rw-r--r-- | mux/fmp4/examples/hls_live.rs | 572 | ||||
-rw-r--r-- | mux/fmp4/examples/hls_vod.rs | 472 | ||||
-rw-r--r-- | mux/fmp4/src/fmp4mux/boxes.rs | 2073 | ||||
-rw-r--r-- | mux/fmp4/src/fmp4mux/imp.rs | 2596 | ||||
-rw-r--r-- | mux/fmp4/src/fmp4mux/mod.rs | 146 | ||||
-rw-r--r-- | mux/fmp4/src/lib.rs | 34 | ||||
-rw-r--r-- | mux/fmp4/tests/tests.rs | 1241 |
19 files changed, 9251 insertions, 0 deletions
diff --git a/mux/flavors/Cargo.toml b/mux/flavors/Cargo.toml new file mode 100644 index 000000000..7f8eceeb8 --- /dev/null +++ b/mux/flavors/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "gst-plugin-flavors" +version = "0.9.0-alpha.1" +authors = ["Sebastian Dröge <sebastian@centricular.com>"] +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +license = "MIT OR Apache-2.0" +edition = "2021" +rust-version = "1.63" +description = "GStreamer Rust FLV Plugin" + +[dependencies] +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +num-rational = { version = "0.4", default-features = false, features = [] } +nom = "7" +flavors = { git = "https://github.com/rust-av/flavors" } +muldiv = "1.0" +byteorder = "1.0" +once_cell = "1.0" +smallvec = "1.0" + +[lib] +name = "gstrsflv" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/mux/flavors/LICENSE-APACHE b/mux/flavors/LICENSE-APACHE new file mode 120000 index 000000000..1cd601d0a --- /dev/null +++ b/mux/flavors/LICENSE-APACHE @@ -0,0 +1 @@ +../../LICENSE-APACHE
\ No newline at end of file diff --git a/mux/flavors/LICENSE-MIT b/mux/flavors/LICENSE-MIT new file mode 120000 index 000000000..b2cfbdc7b --- /dev/null +++ b/mux/flavors/LICENSE-MIT @@ -0,0 +1 @@ +../../LICENSE-MIT
\ No newline at end of file diff --git a/mux/flavors/build.rs b/mux/flavors/build.rs new file mode 100644 index 000000000..cda12e57e --- /dev/null +++ b/mux/flavors/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/mux/flavors/src/bytes.rs b/mux/flavors/src/bytes.rs new file mode 100644 index 000000000..02263993c --- /dev/null +++ b/mux/flavors/src/bytes.rs @@ -0,0 +1,142 @@ +// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com> +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. +// +// SPDX-License-Identifier: MIT OR Apache-2.0 + +pub use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use std::io; + +pub trait ReadBytesExtShort: io::Read { + fn read_u16le(&mut self) -> io::Result<u16> { + self.read_u16::<LittleEndian>() + } + fn read_i16le(&mut self) -> io::Result<i16> { + self.read_i16::<LittleEndian>() + } + fn read_u32le(&mut self) -> io::Result<u32> { + self.read_u32::<LittleEndian>() + } + fn read_i32le(&mut self) -> io::Result<i32> { + self.read_i32::<LittleEndian>() + } + fn read_u64le(&mut self) -> io::Result<u64> { + self.read_u64::<LittleEndian>() + } + fn read_i64le(&mut self) -> io::Result<i64> { + self.read_i64::<LittleEndian>() + } + fn read_uintle(&mut self, nbytes: usize) -> io::Result<u64> { + self.read_uint::<LittleEndian>(nbytes) + } + fn read_intle(&mut self, nbytes: usize) -> io::Result<i64> { + self.read_int::<LittleEndian>(nbytes) + } + fn read_f32le(&mut self) -> io::Result<f32> { + self.read_f32::<LittleEndian>() + } + fn read_f64le(&mut self) -> io::Result<f64> { + self.read_f64::<LittleEndian>() + } + fn read_u16be(&mut self) -> io::Result<u16> { + self.read_u16::<BigEndian>() + } + fn read_i16be(&mut self) 
-> io::Result<i16> { + self.read_i16::<BigEndian>() + } + fn read_u32be(&mut self) -> io::Result<u32> { + self.read_u32::<BigEndian>() + } + fn read_i32be(&mut self) -> io::Result<i32> { + self.read_i32::<BigEndian>() + } + fn read_u64be(&mut self) -> io::Result<u64> { + self.read_u64::<BigEndian>() + } + fn read_i64be(&mut self) -> io::Result<i64> { + self.read_i64::<BigEndian>() + } + fn read_uintbe(&mut self, nbytes: usize) -> io::Result<u64> { + self.read_uint::<BigEndian>(nbytes) + } + fn read_intbe(&mut self, nbytes: usize) -> io::Result<i64> { + self.read_int::<BigEndian>(nbytes) + } + fn read_f32be(&mut self) -> io::Result<f32> { + self.read_f32::<BigEndian>() + } + fn read_f64be(&mut self) -> io::Result<f64> { + self.read_f64::<BigEndian>() + } +} + +impl<T> ReadBytesExtShort for T where T: ReadBytesExt {} + +pub trait WriteBytesExtShort: WriteBytesExt { + fn write_u16le(&mut self, n: u16) -> io::Result<()> { + self.write_u16::<LittleEndian>(n) + } + fn write_i16le(&mut self, n: i16) -> io::Result<()> { + self.write_i16::<LittleEndian>(n) + } + fn write_u32le(&mut self, n: u32) -> io::Result<()> { + self.write_u32::<LittleEndian>(n) + } + fn write_i32le(&mut self, n: i32) -> io::Result<()> { + self.write_i32::<LittleEndian>(n) + } + fn write_u64le(&mut self, n: u64) -> io::Result<()> { + self.write_u64::<LittleEndian>(n) + } + fn write_i64le(&mut self, n: i64) -> io::Result<()> { + self.write_i64::<LittleEndian>(n) + } + fn write_uintle(&mut self, n: u64, nbytes: usize) -> io::Result<()> { + self.write_uint::<LittleEndian>(n, nbytes) + } + fn write_intle(&mut self, n: i64, nbytes: usize) -> io::Result<()> { + self.write_int::<LittleEndian>(n, nbytes) + } + fn write_f32le(&mut self, n: f32) -> io::Result<()> { + self.write_f32::<LittleEndian>(n) + } + fn write_f64le(&mut self, n: f64) -> io::Result<()> { + self.write_f64::<LittleEndian>(n) + } + fn write_u16be(&mut self, n: u16) -> io::Result<()> { + self.write_u16::<BigEndian>(n) + } + fn write_i16be(&mut 
self, n: i16) -> io::Result<()> { + self.write_i16::<BigEndian>(n) + } + fn write_u32be(&mut self, n: u32) -> io::Result<()> { + self.write_u32::<BigEndian>(n) + } + fn write_i32be(&mut self, n: i32) -> io::Result<()> { + self.write_i32::<BigEndian>(n) + } + fn write_u64be(&mut self, n: u64) -> io::Result<()> { + self.write_u64::<BigEndian>(n) + } + fn write_i64be(&mut self, n: i64) -> io::Result<()> { + self.write_i64::<BigEndian>(n) + } + fn write_uintbe(&mut self, n: u64, nbytes: usize) -> io::Result<()> { + self.write_uint::<BigEndian>(n, nbytes) + } + fn write_intbe(&mut self, n: i64, nbytes: usize) -> io::Result<()> { + self.write_int::<BigEndian>(n, nbytes) + } + fn write_f32be(&mut self, n: f32) -> io::Result<()> { + self.write_f32::<BigEndian>(n) + } + fn write_f64be(&mut self, n: f64) -> io::Result<()> { + self.write_f64::<BigEndian>(n) + } +} + +impl<T> WriteBytesExtShort for T where T: WriteBytesExt {} diff --git a/mux/flavors/src/flvdemux/imp.rs b/mux/flavors/src/flvdemux/imp.rs new file mode 100644 index 000000000..67d2f7d9d --- /dev/null +++ b/mux/flavors/src/flvdemux/imp.rs @@ -0,0 +1,1538 @@ +// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com> +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. 
+// +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use std::cmp; +use std::sync::Mutex; + +// FIXME: rustfmt removes the :: but they're required here +#[rustfmt::skip] +use ::flavors::parser as flavors; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use num_rational::Rational32; + +use once_cell::sync::Lazy; + +use smallvec::SmallVec; + +static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { + gst::DebugCategory::new( + "rsflvdemux", + gst::DebugColorFlags::empty(), + Some("Rust FLV demuxer"), + ) +}); + +pub struct FlvDemux { + sinkpad: gst::Pad, + audio_srcpad: Mutex<Option<gst::Pad>>, + video_srcpad: Mutex<Option<gst::Pad>>, + adapter: Mutex<gst_base::UniqueAdapter>, + flow_combiner: Mutex<gst_base::UniqueFlowCombiner>, + state: Mutex<State>, +} + +#[allow(clippy::large_enum_variant)] +enum State { + Stopped, + NeedHeader, + Skipping { + audio: bool, + video: bool, + skip_left: u32, + }, + Streaming(StreamingState), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Stream { + Audio, + Video, +} + +#[derive(Clone, PartialEq, Eq)] +enum Event { + StreamChanged(Stream, gst::Caps), + Buffer(Stream, gst::Buffer), + HaveAllStreams, +} + +struct StreamingState { + audio: Option<AudioFormat>, + expect_audio: bool, + video: Option<VideoFormat>, + expect_video: bool, + got_all_streams: bool, + last_position: Option<gst::ClockTime>, + + metadata: Option<Metadata>, + + aac_sequence_header: Option<gst::Buffer>, + avc_sequence_header: Option<gst::Buffer>, +} + +#[derive(Debug, Eq, Clone)] +struct AudioFormat { + format: flavors::SoundFormat, + rate: u16, + width: u8, + channels: u8, + bitrate: Option<u32>, + aac_sequence_header: Option<gst::Buffer>, +} + +#[derive(Debug, Eq, Clone)] +struct VideoFormat { + format: flavors::CodecId, + width: Option<u32>, + height: Option<u32>, + pixel_aspect_ratio: Option<Rational32>, + framerate: Option<Rational32>, + bitrate: Option<u32>, + avc_sequence_header: Option<gst::Buffer>, +} + +#[derive(Debug, 
PartialEq, Eq, Clone, Default)] +struct Metadata { + duration: Option<gst::ClockTime>, + + creation_date: Option<String>, + creator: Option<String>, + title: Option<String>, + metadata_creator: Option<String>, /* TODO: seek_table: _, + * filepositions / times metadata arrays */ + + audio_bitrate: Option<u32>, + + video_width: Option<u32>, + video_height: Option<u32>, + video_pixel_aspect_ratio: Option<Rational32>, + video_framerate: Option<Rational32>, + video_bitrate: Option<u32>, +} + +#[glib::object_subclass] +impl ObjectSubclass for FlvDemux { + const NAME: &'static str = "RsFlvDemux"; + type Type = super::FlvDemux; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .activate_function(|pad, parent| { + FlvDemux::catch_panic_pad_function( + parent, + || Err(gst::loggable_error!(CAT, "Panic activating sink pad")), + |demux| demux.sink_activate(pad), + ) + }) + .activatemode_function(|pad, parent, mode, active| { + FlvDemux::catch_panic_pad_function( + parent, + || { + Err(gst::loggable_error!( + CAT, + "Panic activating sink pad with mode" + )) + }, + |demux| { + demux.sink_activatemode(pad, mode, active); + Ok(()) + }, + ) + }) + .chain_function(|pad, parent, buffer| { + FlvDemux::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |demux| demux.sink_chain(pad, buffer), + ) + }) + .event_function(|pad, parent, event| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux| demux.sink_event(pad, event), + ) + }) + .build(); + + FlvDemux { + sinkpad, + audio_srcpad: Mutex::new(None), + video_srcpad: Mutex::new(None), + state: Mutex::new(State::Stopped), + adapter: Mutex::new(gst_base::UniqueAdapter::new()), + flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), + } + } +} + +impl ObjectImpl for FlvDemux { + fn constructed(&self) { + self.parent_constructed(); + + 
self.instance().add_pad(&self.sinkpad).unwrap(); + } +} + +impl GstObjectImpl for FlvDemux {} + +impl ElementImpl for FlvDemux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "FLV Demuxer", + "Codec/Demuxer", + "Demuxes FLV Streams", + "Sebastian Dröge <sebastian@centricular.com>", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + + caps.append( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 1i32) + .build(), + ); + caps.append( + gst_audio::AudioCapsBuilder::new_interleaved() + .format_list([gst_audio::AudioFormat::U8, gst_audio::AudioFormat::S16le]) + .build(), + ); + caps.append( + gst::Caps::builder("audio/x-adpcm") + .field("layout", "swf") + .build(), + ); + caps.append(gst::Caps::builder("audio/x-nellymoser").build()); + caps.append(gst::Caps::builder("audio/x-alaw").build()); + caps.append(gst::Caps::builder("audio/x-mulaw").build()); + caps.append( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("framed", true) + .field("stream-format", "raw") + .build(), + ); + caps.append(gst::Caps::builder("audio/x-speex").build()); + } + let audiosrc_pad_template = gst::PadTemplate::new( + "audio", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ) + .unwrap(); + + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + + caps.append( + gst::Caps::builder("video/x-flash-video") + .field("flvversion", 1i32) + .build(), + ); + caps.append(gst::Caps::builder("video/x-flash-screen").build()); + caps.append(gst::Caps::builder("video/x-vp6-flash").build()); + caps.append(gst::Caps::builder("video/x-vp6-flash-alpha").build()); + 
caps.append(gst::Caps::builder("video/x-flash-screen2").build()); + caps.append( + gst::Caps::builder("video/x-h264") + .field("stream-format", "avc") + .build(), + ); + caps.append(gst::Caps::builder("video/x-h263").build()); + caps.append( + gst::Caps::builder("video/mpeg") + .field("mpegversion", 4i32) + .build(), + ); + } + let videosrc_pad_template = gst::PadTemplate::new( + "video", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ) + .unwrap(); + + let caps = gst::Caps::builder("video/x-flv").build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![ + audiosrc_pad_template, + videosrc_pad_template, + sink_pad_template, + ] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl FlvDemux { + fn sink_activate(&self, pad: &gst::Pad) -> Result<(), gst::LoggableError> { + let mode = { + let mut query = gst::query::Scheduling::new(); + if !pad.peer_query(&mut query) { + return Err(gst::loggable_error!(CAT, "Scheduling query failed on peer")); + } + + // TODO: pull mode + // if query.has_scheduling_mode_with_flags( + // gst::PadMode::Pull, + // gst::SchedulingFlags::SEEKABLE, + // ) + // { + // gst::debug!(CAT, obj: pad, "Activating in Pull mode"); + // gst::PadMode::Pull + // } else { + gst::debug!(CAT, obj: pad, "Activating in Push mode"); + gst::PadMode::Push + // } + }; + + pad.activate_mode(mode, true)?; + Ok(()) + } + + fn sink_activatemode(&self, _pad: &gst::Pad, mode: gst::PadMode, active: bool) { + if active { + self.start(mode); + + if mode == gst::PadMode::Pull { + // TODO implement pull mode + // self.sinkpad.start_task(...) 
+ unimplemented!(); + } + } else { + if mode == gst::PadMode::Pull { + let _ = self.sinkpad.stop_task(); + } + + self.stop(); + } + } + + fn start(&self, _mode: gst::PadMode) { + *self.state.lock().unwrap() = State::NeedHeader; + } + + fn stop(&self) { + *self.state.lock().unwrap() = State::Stopped; + self.adapter.lock().unwrap().clear(); + + let mut flow_combiner = self.flow_combiner.lock().unwrap(); + if let Some(pad) = self.audio_srcpad.lock().unwrap().take() { + self.instance().remove_pad(&pad).unwrap(); + flow_combiner.remove_pad(&pad); + } + + if let Some(pad) = self.video_srcpad.lock().unwrap().take() { + self.instance().remove_pad(&pad).unwrap(); + flow_combiner.remove_pad(&pad); + } + + flow_combiner.reset(); + } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + use gst::EventView; + + gst::log!(CAT, obj: pad, "Handling event {:?}", event); + match event.view() { + EventView::Eos(..) => { + // TODO implement + gst::Pad::event_default(pad, Some(&*self.instance()), event) + } + EventView::Segment(..) => { + // TODO implement + gst::Pad::event_default(pad, Some(&*self.instance()), event) + } + EventView::FlushStart(..) => { + // TODO implement + gst::Pad::event_default(pad, Some(&*self.instance()), event) + } + EventView::FlushStop(..) => { + // TODO implement + gst::Pad::event_default(pad, Some(&*self.instance()), event) + } + _ => gst::Pad::event_default(pad, Some(&*self.instance()), event), + } + } + + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + match query.view_mut() { + QueryViewMut::Position(q) => { + let fmt = q.format(); + if fmt == gst::Format::Time { + if self.sinkpad.peer_query(q.query_mut()) { + return true; + } + + if let State::Streaming(StreamingState { last_position, .. 
}) = + *self.state.lock().unwrap() + { + q.set(last_position); + return true; + } + + false + } else { + false + } + } + QueryViewMut::Duration(q) => { + let fmt = q.format(); + if fmt == gst::Format::Time { + if self.sinkpad.peer_query(q.query_mut()) { + return true; + } + + if let State::Streaming(StreamingState { + metadata: Some(Metadata { duration, .. }), + .. + }) = *self.state.lock().unwrap() + { + q.set(duration); + return true; + } + + false + } else { + false + } + } + _ => gst::Pad::query_default(pad, Some(&*self.instance()), query), + } + } + + fn src_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + use gst::EventView; + + match event.view() { + EventView::Seek(..) => { + // TODO: Implement + false + } + _ => gst::Pad::event_default(pad, Some(&*self.instance()), event), + } + } + + fn sink_chain( + &self, + pad: &gst::Pad, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + + let mut adapter = self.adapter.lock().unwrap(); + adapter.push(buffer); + + let mut state = self.state.lock().unwrap(); + loop { + match *state { + State::Stopped => unreachable!(), + State::NeedHeader => { + let header = match self.find_header(&mut *adapter) { + Ok(header) => header, + Err(_) => { + gst::trace!(CAT, imp: self, "Need more data"); + return Ok(gst::FlowSuccess::Ok); + } + }; + + let skip = if header.offset < 9 { + 0 + } else { + header.offset - 9 + }; + + *state = State::Skipping { + audio: header.audio, + video: header.video, + skip_left: skip, + }; + } + State::Skipping { + audio, + video, + skip_left: 0, + } => { + *state = State::Streaming(StreamingState::new(audio, video)); + } + State::Skipping { + ref mut skip_left, .. 
+ } => { + let avail = adapter.available(); + if avail == 0 { + gst::trace!(CAT, imp: self, "Need more data"); + return Ok(gst::FlowSuccess::Ok); + } + let skip = cmp::min(avail, *skip_left as usize); + adapter.flush(skip); + *skip_left -= skip as u32; + } + State::Streaming(ref mut sstate) => { + let res = sstate.handle_tag(self, &mut *adapter); + + match res { + Ok(None) => { + gst::trace!(CAT, imp: self, "Need more data"); + return Ok(gst::FlowSuccess::Ok); + } + Ok(Some(events)) => { + drop(state); + drop(adapter); + + self.handle_events(events)?; + + adapter = self.adapter.lock().unwrap(); + state = self.state.lock().unwrap(); + } + Err(err) => { + self.post_error_message(err); + return Err(gst::FlowError::Error); + } + } + } + } + } + } + + fn find_header(&self, adapter: &mut gst_base::UniqueAdapter) -> Result<flavors::Header, ()> { + while adapter.available() >= 9 { + let data = adapter.map(9).unwrap(); + + if let Ok((_, header)) = flavors::header(&*data) { + gst::debug!(CAT, imp: self, "Found FLV header: {:?}", header); + drop(data); + adapter.flush(9); + + return Ok(header); + } + + drop(data); + adapter.flush(1); + } + + Err(()) + } + + fn handle_events( + &self, + events: SmallVec<[Event; 4]>, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + for event in events { + match event { + Event::StreamChanged(stream, caps) => { + let pad = match stream { + Stream::Audio => { + let mut audio_srcpad = self.audio_srcpad.lock().unwrap(); + if let Some(ref srcpad) = *audio_srcpad { + srcpad.clone() + } else { + let srcpad = self.create_srcpad("audio", &caps); + *audio_srcpad = Some(srcpad.clone()); + + srcpad + } + } + Stream::Video => { + let mut video_srcpad = self.video_srcpad.lock().unwrap(); + if let Some(ref srcpad) = *video_srcpad { + srcpad.clone() + } else { + let srcpad = self.create_srcpad("video", &caps); + + *video_srcpad = Some(srcpad.clone()); + + srcpad + } + } + }; + + pad.push_event(gst::event::Caps::new(&caps)); + } + Event::Buffer(stream, 
buffer) => { + let pad = match stream { + Stream::Audio => { + self.audio_srcpad.lock().unwrap().as_ref().map(Clone::clone) + } + Stream::Video => { + self.video_srcpad.lock().unwrap().as_ref().map(Clone::clone) + } + }; + + if let Some(pad) = pad { + let res = pad.push(buffer); + gst::trace!( + CAT, + imp: self, + "Pushing buffer for stream {:?} returned {:?}", + stream, + res + ); + + self.flow_combiner + .lock() + .unwrap() + .update_pad_flow(&pad, res)?; + } + } + Event::HaveAllStreams => { + self.instance().no_more_pads(); + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn create_srcpad(&self, name: &str, caps: &gst::Caps) -> gst::Pad { + let templ = self.instance().element_class().pad_template(name).unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some(name)) + .event_function(|pad, parent, event| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux| demux.src_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + FlvDemux::catch_panic_pad_function( + parent, + || false, + |demux| demux.src_query(pad, query), + ) + }) + .build(); + + srcpad.set_active(true).unwrap(); + + let full_stream_id = srcpad.create_stream_id(&*self.instance(), Some(name)); + // FIXME group id + srcpad.push_event(gst::event::StreamStart::new(&full_stream_id)); + srcpad.push_event(gst::event::Caps::new(caps)); + + // FIXME proper segment handling + let segment = gst::FormattedSegment::<gst::ClockTime>::default(); + srcpad.push_event(gst::event::Segment::new(&segment)); + + self.flow_combiner.lock().unwrap().add_pad(&srcpad); + + self.instance().add_pad(&srcpad).unwrap(); + + srcpad + } +} + +impl StreamingState { + fn new(audio: bool, video: bool) -> StreamingState { + StreamingState { + audio: None, + expect_audio: audio, + video: None, + expect_video: video, + got_all_streams: false, + last_position: gst::ClockTime::NONE, + metadata: None, + aac_sequence_header: None, + avc_sequence_header: None, + } + } + + fn handle_tag( + &mut self, 
+ imp: &FlvDemux, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result<Option<SmallVec<[Event; 4]>>, gst::ErrorMessage> { + use nom::number::complete::be_u32; + + if adapter.available() < 15 { + return Ok(None); + } + + let data = adapter.map(15).unwrap(); + + match be_u32::<_, (_, nom::error::ErrorKind)>(&data[0..4]) { + Err(_) => unreachable!(), + Ok((_, previous_size)) => { + gst::trace!(CAT, imp: imp, "Previous tag size {}", previous_size); + // Nothing to do here, we just consume it for now + } + } + + let tag_header = match flavors::tag_header(&data[4..]) { + Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => { + return Err(gst::error_msg!( + gst::StreamError::Demux, + ["Invalid tag header: {:?}", err] + )); + } + Err(nom::Err::Incomplete(_)) => unreachable!(), + Ok((_, tag_header)) => tag_header, + }; + + gst::trace!(CAT, imp: imp, "Parsed tag header {:?}", tag_header); + + drop(data); + + if adapter.available() < (15 + tag_header.data_size) as usize { + return Ok(None); + } + + adapter.flush(15); + + match tag_header.tag_type { + flavors::TagType::Script => { + gst::trace!(CAT, imp: imp, "Found script tag"); + + Ok(self.handle_script_tag(imp, &tag_header, adapter)) + } + flavors::TagType::Audio => { + gst::trace!(CAT, imp: imp, "Found audio tag"); + + self.handle_audio_tag(imp, &tag_header, adapter) + } + flavors::TagType::Video => { + gst::trace!(CAT, imp: imp, "Found video tag"); + + self.handle_video_tag(imp, &tag_header, adapter) + } + } + .map(Option::Some) + } + + fn handle_script_tag( + &mut self, + imp: &FlvDemux, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> SmallVec<[Event; 4]> { + assert!(adapter.available() >= tag_header.data_size as usize); + + let mut events = SmallVec::new(); + + let data = adapter.map(tag_header.data_size as usize).unwrap(); + + match flavors::script_data(&*data) { + Ok((_, ref script_data)) if script_data.name == "onMetaData" => { + gst::trace!(CAT, imp: imp, "Got script tag: 
{:?}", script_data); + + let metadata = Metadata::new(script_data); + gst::debug!(CAT, imp: imp, "Got metadata: {:?}", metadata); + + let audio_changed = self + .audio + .as_mut() + .map(|a| a.update_with_metadata(&metadata)) + .unwrap_or(false); + let video_changed = self + .video + .as_mut() + .map(|v| v.update_with_metadata(&metadata)) + .unwrap_or(false); + self.metadata = Some(metadata); + + if audio_changed || video_changed { + if audio_changed { + if let Some(caps) = self.audio.as_ref().and_then(|a| a.to_caps()) { + events.push(Event::StreamChanged(Stream::Audio, caps)); + } + } + if video_changed { + if let Some(caps) = self.video.as_ref().and_then(|v| v.to_caps()) { + events.push(Event::StreamChanged(Stream::Video, caps)); + } + } + } + } + Ok((_, ref script_data)) => { + gst::trace!(CAT, imp: imp, "Got script tag: {:?}", script_data); + } + Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => { + gst::error!(CAT, imp: imp, "Error parsing script tag: {:?}", err); + } + Err(nom::Err::Incomplete(_)) => { + // ignore + } + } + + drop(data); + adapter.flush(tag_header.data_size as usize); + + events + } + + fn update_audio_stream( + &mut self, + imp: &FlvDemux, + data_header: &flavors::AudioDataHeader, + ) -> SmallVec<[Event; 4]> { + let mut events = SmallVec::new(); + + gst::trace!(CAT, imp: imp, "Got audio data header: {:?}", data_header); + + let new_audio_format = + AudioFormat::new(data_header, &self.metadata, &self.aac_sequence_header); + + if self.audio.as_ref() != Some(&new_audio_format) { + gst::debug!( + CAT, + imp: imp, + "Got new audio format: {:?}", + new_audio_format + ); + + let caps = new_audio_format.to_caps(); + if let Some(caps) = caps { + self.audio = Some(new_audio_format); + events.push(Event::StreamChanged(Stream::Audio, caps)); + } else { + self.audio = None; + } + } + + if (!self.expect_video || self.video != None) && self.audio != None && !self.got_all_streams + { + gst::debug!(CAT, imp: imp, "Have all expected streams now"); + 
self.got_all_streams = true; + events.push(Event::HaveAllStreams); + } + + events + } + + fn handle_aac_audio_packet_header( + &mut self, + imp: &FlvDemux, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result<bool, gst::ErrorMessage> { + // Not big enough for the AAC packet header, ship! + if tag_header.data_size < 1 + 1 { + adapter.flush((tag_header.data_size - 1) as usize); + gst::warning!( + CAT, + imp: imp, + "Too small packet for AAC packet header {}", + tag_header.data_size + ); + return Ok(true); + } + + let data = adapter.map(1).unwrap(); + + match flavors::aac_audio_packet_header(&*data) { + Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => { + gst::error!(CAT, imp: imp, "Invalid AAC audio packet header: {:?}", err); + drop(data); + adapter.flush((tag_header.data_size - 1) as usize); + Ok(true) + } + Err(nom::Err::Incomplete(_)) => unreachable!(), + Ok((_, header)) => { + gst::trace!(CAT, imp: imp, "Got AAC packet header {:?}", header); + match header.packet_type { + flavors::AACPacketType::SequenceHeader => { + drop(data); + adapter.flush(1); + let buffer = adapter + .take_buffer((tag_header.data_size - 1 - 1) as usize) + .unwrap(); + gst::debug!(CAT, imp: imp, "Got AAC sequence header {:?}", buffer,); + + self.aac_sequence_header = Some(buffer); + Ok(true) + } + flavors::AACPacketType::Raw => { + drop(data); + adapter.flush(1); + Ok(false) + } + } + } + } + } + + fn handle_audio_tag( + &mut self, + imp: &FlvDemux, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result<SmallVec<[Event; 4]>, gst::ErrorMessage> { + assert!(adapter.available() >= tag_header.data_size as usize); + + let data = adapter.map(1).unwrap(); + let data_header = match flavors::audio_data_header(&*data) { + Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => { + gst::error!(CAT, imp: imp, "Invalid audio data header: {:?}", err); + drop(data); + adapter.flush(tag_header.data_size as usize); + return 
Ok(SmallVec::new()); + } + Err(nom::Err::Incomplete(_)) => unreachable!(), + Ok((_, data_header)) => data_header, + }; + drop(data); + adapter.flush(1); + + let mut events = self.update_audio_stream(imp, &data_header); + + // AAC special case + if data_header.sound_format == flavors::SoundFormat::AAC + && self.handle_aac_audio_packet_header(imp, tag_header, adapter)? + { + return Ok(events); + } + + let offset = match data_header.sound_format { + flavors::SoundFormat::AAC => 2, + _ => 1, + }; + + if tag_header.data_size == offset { + return Ok(events); + } + + if self.audio == None { + adapter.flush((tag_header.data_size - offset) as usize); + return Ok(events); + } + + let mut buffer = adapter + .take_buffer((tag_header.data_size - offset) as usize) + .unwrap(); + + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts((tag_header.timestamp as u64).mseconds()); + } + + gst::trace!( + CAT, + imp: imp, + "Outputting audio buffer {:?} for tag {:?}", + buffer, + tag_header, + ); + + self.update_position(&buffer); + + events.push(Event::Buffer(Stream::Audio, buffer)); + + Ok(events) + } + + fn update_video_stream( + &mut self, + imp: &FlvDemux, + data_header: &flavors::VideoDataHeader, + ) -> SmallVec<[Event; 4]> { + let mut events = SmallVec::new(); + + gst::trace!(CAT, imp: imp, "Got video data header: {:?}", data_header); + + let new_video_format = + VideoFormat::new(data_header, &self.metadata, &self.avc_sequence_header); + + if self.video.as_ref() != Some(&new_video_format) { + gst::debug!( + CAT, + imp: imp, + "Got new video format: {:?}", + new_video_format + ); + + let caps = new_video_format.to_caps(); + if let Some(caps) = caps { + self.video = Some(new_video_format); + events.push(Event::StreamChanged(Stream::Video, caps)); + } else { + self.video = None; + } + } + + if (!self.expect_audio || self.audio != None) && self.video != None && !self.got_all_streams + { + gst::debug!(CAT, imp: imp, "Have all expected streams now"); + self.got_all_streams = 
true; + events.push(Event::HaveAllStreams); + } + + events + } + + fn handle_avc_video_packet_header( + &mut self, + imp: &FlvDemux, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result<Option<i32>, gst::ErrorMessage> { + // Not big enough for the AVC packet header, skip! + if tag_header.data_size < 1 + 4 { + adapter.flush((tag_header.data_size - 1) as usize); + gst::warning!( + CAT, + imp: imp, + "Too small packet for AVC packet header {}", + tag_header.data_size + ); + return Ok(None); + } + + let data = adapter.map(4).unwrap(); + match flavors::avc_video_packet_header(&*data) { + Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => { + gst::error!(CAT, imp: imp, "Invalid AVC video packet header: {:?}", err); + drop(data); + adapter.flush((tag_header.data_size - 1) as usize); + Ok(None) + } + Err(nom::Err::Incomplete(_)) => unreachable!(), + Ok((_, header)) => { + gst::trace!(CAT, imp: imp, "Got AVC packet header {:?}", header); + match header.packet_type { + flavors::AVCPacketType::SequenceHeader => { + drop(data); + adapter.flush(4); + let buffer = adapter + .take_buffer((tag_header.data_size - 1 - 4) as usize) + .unwrap(); + gst::debug!( + CAT, + imp: imp, + "Got AVC sequence header {:?} of size {}", + buffer, + tag_header.data_size - 1 - 4 + ); + + self.avc_sequence_header = Some(buffer); + Ok(None) + } + flavors::AVCPacketType::NALU => { + drop(data); + adapter.flush(4); + Ok(Some(header.composition_time)) + } + flavors::AVCPacketType::EndOfSequence => { + // Skip + drop(data); + adapter.flush((tag_header.data_size - 1) as usize); + Ok(None) + } + } + } + } + } + + fn handle_video_tag( + &mut self, + imp: &FlvDemux, + tag_header: &flavors::TagHeader, + adapter: &mut gst_base::UniqueAdapter, + ) -> Result<SmallVec<[Event; 4]>, gst::ErrorMessage> { + assert!(adapter.available() >= tag_header.data_size as usize); + + let data = adapter.map(1).unwrap(); + let data_header = match flavors::video_data_header(&*data) { + 
Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => { + gst::error!(CAT, imp: imp, "Invalid video data header: {:?}", err); + drop(data); + adapter.flush(tag_header.data_size as usize); + return Ok(SmallVec::new()); + } + Err(nom::Err::Incomplete(_)) => unreachable!(), + Ok((_, data_header)) => data_header, + }; + drop(data); + adapter.flush(1); + + let mut events = self.update_video_stream(imp, &data_header); + + // AVC/H264 special case + let cts = if data_header.codec_id == flavors::CodecId::H264 { + match self.handle_avc_video_packet_header(imp, tag_header, adapter)? { + Some(cts) => cts, + None => { + return Ok(events); + } + } + } else { + 0 + }; + + let offset = match data_header.codec_id { + flavors::CodecId::H264 => 5, + _ => 1, + }; + + if tag_header.data_size == offset { + return Ok(events); + } + + if self.video == None { + adapter.flush((tag_header.data_size - offset) as usize); + return Ok(events); + } + + let is_keyframe = data_header.frame_type == flavors::FrameType::Key; + + let skip = match data_header.codec_id { + flavors::CodecId::VP6 | flavors::CodecId::VP6A => 1, + _ => 0, + }; + + if skip > 0 { + adapter.flush(skip as usize); + } + + if tag_header.data_size == offset + skip { + return Ok(events); + } + + let mut buffer = adapter + .take_buffer((tag_header.data_size - offset - skip) as usize) + .unwrap(); + + { + let buffer = buffer.get_mut().unwrap(); + if !is_keyframe { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + buffer.set_dts((tag_header.timestamp as u64).mseconds()); + + // Prevent negative numbers + let pts = if cts < 0 && tag_header.timestamp < (-cts) as u32 { + 0 + } else { + ((tag_header.timestamp as i64) + (cts as i64)) as u64 + }; + buffer.set_pts(pts.mseconds()); + } + + gst::trace!( + CAT, + imp: imp, + "Outputting video buffer {:?} for tag {:?}, keyframe: {}", + buffer, + tag_header, + is_keyframe + ); + + self.update_position(&buffer); + + events.push(Event::Buffer(Stream::Video, buffer)); + + Ok(events) + } + 
+ fn update_position(&mut self, buffer: &gst::Buffer) { + if let Some(pts) = buffer.pts() { + self.last_position = self.last_position.opt_max(pts).or(Some(pts)); + } else if let Some(dts) = buffer.dts() { + self.last_position = self.last_position.opt_max(dts).or(Some(dts)); + } + } +} + +// Ignores bitrate +impl PartialEq for AudioFormat { + fn eq(&self, other: &Self) -> bool { + self.format.eq(&other.format) + && self.rate.eq(&other.rate) + && self.width.eq(&other.width) + && self.channels.eq(&other.channels) + && self.aac_sequence_header.eq(&other.aac_sequence_header) + } +} + +impl AudioFormat { + fn new( + data_header: &flavors::AudioDataHeader, + metadata: &Option<Metadata>, + aac_sequence_header: &Option<gst::Buffer>, + ) -> AudioFormat { + let numeric_rate = match (data_header.sound_format, data_header.sound_rate) { + (flavors::SoundFormat::NELLYMOSER_16KHZ_MONO, _) => 16_000, + (flavors::SoundFormat::NELLYMOSER_8KHZ_MONO, _) + | (flavors::SoundFormat::PCM_ALAW, _) + | (flavors::SoundFormat::PCM_ULAW, _) + | (flavors::SoundFormat::MP3_8KHZ, _) => 8_000, + (flavors::SoundFormat::SPEEX, _) => 16_000, + (_, flavors::SoundRate::_5_5KHZ) => 5_512, + (_, flavors::SoundRate::_11KHZ) => 11_025, + (_, flavors::SoundRate::_22KHZ) => 22_050, + (_, flavors::SoundRate::_44KHZ) => 44_100, + }; + + let numeric_width = match data_header.sound_size { + flavors::SoundSize::Snd8bit => 8, + flavors::SoundSize::Snd16bit => 16, + }; + + let numeric_channels = match data_header.sound_type { + flavors::SoundType::SndMono => 1, + flavors::SoundType::SndStereo => 2, + }; + + AudioFormat { + format: data_header.sound_format, + rate: numeric_rate, + width: numeric_width, + channels: numeric_channels, + bitrate: metadata.as_ref().and_then(|m| m.audio_bitrate), + aac_sequence_header: aac_sequence_header.clone(), + } + } + + fn update_with_metadata(&mut self, metadata: &Metadata) -> bool { + if self.bitrate != metadata.audio_bitrate { + self.bitrate = metadata.audio_bitrate; + true + } 
else { + false + } + } + + fn to_caps(&self) -> Option<gst::Caps> { + let mut caps = match self.format { + flavors::SoundFormat::MP3 | flavors::SoundFormat::MP3_8KHZ => Some( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 1i32) + .field("layer", 3i32) + .build(), + ), + flavors::SoundFormat::PCM_NE | flavors::SoundFormat::PCM_LE => { + if self.rate != 0 && self.channels != 0 { + // Assume little-endian for "PCM_NE", it's probably more common and we have no + // way to know what the endianness of the system creating the stream was + Some( + gst_audio::AudioCapsBuilder::new_interleaved() + .format(if self.width == 8 { + gst_audio::AudioFormat::U8 + } else { + gst_audio::AudioFormat::S16le + }) + .build(), + ) + } else { + None + } + } + flavors::SoundFormat::ADPCM => Some( + gst::Caps::builder("audio/x-adpcm") + .field("layout", "swf") + .build(), + ), + flavors::SoundFormat::NELLYMOSER_16KHZ_MONO + | flavors::SoundFormat::NELLYMOSER_8KHZ_MONO + | flavors::SoundFormat::NELLYMOSER => { + Some(gst::Caps::builder("audio/x-nellymoser").build()) + } + flavors::SoundFormat::PCM_ALAW => Some(gst::Caps::builder("audio/x-alaw").build()), + flavors::SoundFormat::PCM_ULAW => Some(gst::Caps::builder("audio/x-mulaw").build()), + flavors::SoundFormat::AAC => self.aac_sequence_header.as_ref().map(|header| { + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("framed", true) + .field("stream-format", "raw") + .field("codec_data", header) + .build() + }), + flavors::SoundFormat::SPEEX => { + use crate::bytes::*; + use std::io::{Cursor, Write}; + + let header = { + let header_size = 80; + let mut data = Cursor::new(Vec::with_capacity(header_size)); + data.write_all(b"Speex 1.1.12").unwrap(); + data.write_all(&[0; 14]).unwrap(); + data.write_u32le(1).unwrap(); // version + data.write_u32le(80).unwrap(); // header size + data.write_u32le(16_000).unwrap(); // sample rate + data.write_u32le(1).unwrap(); // mode = wideband + data.write_u32le(4).unwrap(); 
// mode bitstream version + data.write_u32le(1).unwrap(); // channels + data.write_i32le(-1).unwrap(); // bitrate + data.write_u32le(0x50).unwrap(); // frame size + data.write_u32le(0).unwrap(); // VBR + data.write_u32le(1).unwrap(); // frames per packet + data.write_u32le(0).unwrap(); // extra headers + data.write_u32le(0).unwrap(); // reserved 1 + data.write_u32le(0).unwrap(); // reserved 2 + + assert_eq!(data.position() as usize, header_size); + + data.into_inner() + }; + let header = gst::Buffer::from_mut_slice(header); + + let comment = { + let comment_size = 4 + 7 /* nothing */ + 4 + 1; + let mut data = Cursor::new(Vec::with_capacity(comment_size)); + data.write_u32le(7).unwrap(); // length of "nothing" + data.write_all(b"nothing").unwrap(); // "vendor" string + data.write_u32le(0).unwrap(); // number of elements + data.write_u8(1).unwrap(); + + assert_eq!(data.position() as usize, comment_size); + + data.into_inner() + }; + let comment = gst::Buffer::from_mut_slice(comment); + + Some( + gst::Caps::builder("audio/x-speex") + .field("streamheader", gst::Array::new([header, comment])) + .build(), + ) + } + flavors::SoundFormat::DEVICE_SPECIFIC => { + // Nobody knows + None + } + }; + + if self.rate != 0 { + if let Some(ref mut caps) = caps.as_mut() { + caps.get_mut() + .unwrap() + .set_simple(&[("rate", &(self.rate as i32))]) + } + } + if self.channels != 0 { + if let Some(ref mut caps) = caps.as_mut() { + caps.get_mut() + .unwrap() + .set_simple(&[("channels", &(self.channels as i32))]) + } + } + + caps + } +} + +// Ignores bitrate +impl PartialEq for VideoFormat { + fn eq(&self, other: &Self) -> bool { + self.format.eq(&other.format) + && self.width.eq(&other.width) + && self.height.eq(&other.height) + && self.pixel_aspect_ratio.eq(&other.pixel_aspect_ratio) + && self.framerate.eq(&other.framerate) + && self.avc_sequence_header.eq(&other.avc_sequence_header) + } +} + +impl VideoFormat { + fn new( + data_header: &flavors::VideoDataHeader, + metadata: 
&Option<Metadata>, + avc_sequence_header: &Option<gst::Buffer>, + ) -> VideoFormat { + VideoFormat { + format: data_header.codec_id, + width: metadata.as_ref().and_then(|m| m.video_width), + height: metadata.as_ref().and_then(|m| m.video_height), + pixel_aspect_ratio: metadata.as_ref().and_then(|m| m.video_pixel_aspect_ratio), + framerate: metadata.as_ref().and_then(|m| m.video_framerate), + bitrate: metadata.as_ref().and_then(|m| m.video_bitrate), + avc_sequence_header: avc_sequence_header.clone(), + } + } + + #[allow(clippy::useless_let_if_seq)] + fn update_with_metadata(&mut self, metadata: &Metadata) -> bool { + let mut changed = false; + + if self.width != metadata.video_width { + self.width = metadata.video_width; + changed = true; + } + + if self.height != metadata.video_height { + self.height = metadata.video_height; + changed = true; + } + + if self.pixel_aspect_ratio != metadata.video_pixel_aspect_ratio { + self.pixel_aspect_ratio = metadata.video_pixel_aspect_ratio; + changed = true; + } + + if self.framerate != metadata.video_framerate { + self.framerate = metadata.video_framerate; + changed = true; + } + + if self.bitrate != metadata.video_bitrate { + self.bitrate = metadata.video_bitrate; + changed = true; + } + + changed + } + + fn to_caps(&self) -> Option<gst::Caps> { + let mut caps = match self.format { + flavors::CodecId::SORENSON_H263 => Some( + gst::Caps::builder("video/x-flash-video") + .field("flvversion", 1i32) + .build(), + ), + flavors::CodecId::SCREEN => Some(gst::Caps::builder("video/x-flash-screen").build()), + flavors::CodecId::VP6 => Some(gst::Caps::builder("video/x-vp6-flash").build()), + flavors::CodecId::VP6A => Some(gst::Caps::builder("video/x-vp6-flash-alpha").build()), + flavors::CodecId::SCREEN2 => Some(gst::Caps::builder("video/x-flash-screen2").build()), + flavors::CodecId::H264 => self.avc_sequence_header.as_ref().map(|header| { + gst::Caps::builder("video/x-h264") + .field("stream-format", "avc") + .field("codec_data", 
&header) + .build() + }), + flavors::CodecId::H263 => Some(gst::Caps::builder("video/x-h263").build()), + flavors::CodecId::MPEG4Part2 => Some( + gst::Caps::builder("video/mpeg") + .field("mpegversion", 4i32) + .field("systemstream", false) + .build(), + ), + flavors::CodecId::JPEG => { + // Unused according to spec + None + } + }; + + if let (Some(width), Some(height)) = (self.width, self.height) { + if let Some(ref mut caps) = caps.as_mut() { + caps.get_mut() + .unwrap() + .set_simple(&[("width", &(width as i32)), ("height", &(height as i32))]) + } + } + + if let Some(par) = self.pixel_aspect_ratio { + if *par.numer() != 0 && par.numer() != par.denom() { + if let Some(ref mut caps) = caps.as_mut() { + caps.get_mut().unwrap().set_simple(&[( + "pixel-aspect-ratio", + &gst::Fraction::new(*par.numer(), *par.denom()), + )]) + } + } + } + + if let Some(fps) = self.framerate { + if *fps.numer() != 0 { + if let Some(ref mut caps) = caps.as_mut() { + caps.get_mut().unwrap().set_simple(&[( + "framerate", + &gst::Fraction::new(*fps.numer(), *fps.denom()), + )]) + } + } + } + + caps + } +} + +impl Metadata { + fn new(script_data: &flavors::ScriptData) -> Metadata { + assert_eq!(script_data.name, "onMetaData"); + + let mut metadata = Metadata::default(); + + let args = match script_data.arguments { + flavors::ScriptDataValue::Object(ref objects) + | flavors::ScriptDataValue::ECMAArray(ref objects) => objects, + _ => return metadata, + }; + + let mut par_n = None; + let mut par_d = None; + + for arg in args { + match (arg.name, &arg.data) { + ("duration", &flavors::ScriptDataValue::Number(duration)) => { + metadata.duration = + Some(((duration * 1000.0 * 1000.0 * 1000.0) as u64).nseconds()); + } + ("creationdate", &flavors::ScriptDataValue::String(date)) => { + metadata.creation_date = Some(String::from(date)); + } + ("creator", &flavors::ScriptDataValue::String(creator)) => { + metadata.creator = Some(String::from(creator)); + } + ("title", 
&flavors::ScriptDataValue::String(title)) => { + metadata.title = Some(String::from(title)); + } + ("metadatacreator", &flavors::ScriptDataValue::String(creator)) => { + metadata.metadata_creator = Some(String::from(creator)); + } + ("audiodatarate", &flavors::ScriptDataValue::Number(datarate)) => { + metadata.audio_bitrate = Some((datarate * 1024.0) as u32); + } + ("width", &flavors::ScriptDataValue::Number(width)) => { + metadata.video_width = Some(width as u32); + } + ("height", &flavors::ScriptDataValue::Number(height)) => { + metadata.video_height = Some(height as u32); + } + ("framerate", &flavors::ScriptDataValue::Number(framerate)) if framerate >= 0.0 => { + if let Some(framerate) = Rational32::approximate_float(framerate) { + metadata.video_framerate = Some(framerate); + } + } + ("AspectRatioX", &flavors::ScriptDataValue::Number(par_x)) if par_x > 0.0 => { + par_n = Some(par_x as i32); + } + ("AspectRatioY", &flavors::ScriptDataValue::Number(par_y)) if par_y > 0.0 => { + par_d = Some(par_y as i32); + } + ("videodatarate", &flavors::ScriptDataValue::Number(datarate)) => { + metadata.video_bitrate = Some((datarate * 1024.0) as u32); + } + _ => {} + } + } + + if let (Some(par_n), Some(par_d)) = (par_n, par_d) { + metadata.video_pixel_aspect_ratio = Some(Rational32::new(par_n, par_d)); + } + + metadata + } +} diff --git a/mux/flavors/src/flvdemux/mod.rs b/mux/flavors/src/flvdemux/mod.rs new file mode 100644 index 000000000..aa311bdf8 --- /dev/null +++ b/mux/flavors/src/flvdemux/mod.rs @@ -0,0 +1,27 @@ +// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com> +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. 
+// +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct FlvDemux(ObjectSubclass<imp::FlvDemux>) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rsflvdemux", + gst::Rank::None, + FlvDemux::static_type(), + ) +} diff --git a/mux/flavors/src/lib.rs b/mux/flavors/src/lib.rs new file mode 100644 index 000000000..7813d92e3 --- /dev/null +++ b/mux/flavors/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com> +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. +// +// SPDX-License-Identifier: MIT OR Apache-2.0 +#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)] + +/** + * plugin-rsflv: + * + * Since: plugins-rs-0.4.0 + */ +use gst::glib; + +mod bytes; +mod flvdemux; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + flvdemux::register(plugin) +} + +gst::plugin_define!( + rsflv, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "MIT/X11", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/mux/fmp4/Cargo.toml b/mux/fmp4/Cargo.toml new file mode 100644 index 000000000..f72c84a7b --- /dev/null +++ b/mux/fmp4/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "gst-plugin-fmp4" +version = "0.9.0-alpha.1" +authors = ["Sebastian Dröge <sebastian@centricular.com>"] +license = "MPL-2.0" +description = "GStreamer Fragmented MP4 Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +edition = "2021" 
+rust-version = "1.63" + +[dependencies] +anyhow = "1" +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +once_cell = "1.0" +uuid = { version = "1", features = ["v4"] } + +[lib] +name = "gstfmp4" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[dev-dependencies] +gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +m3u8-rs = "5.0" +chrono = "0.4" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +default = ["v1_18"] +static = [] +capi = [] +v1_18 = ["gst-video/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/mux/fmp4/LICENSE b/mux/fmp4/LICENSE new file mode 120000 index 000000000..eb5d24fe9 --- /dev/null +++ b/mux/fmp4/LICENSE @@ -0,0 +1 @@ +../../LICENSE-MPL-2.0
\ No newline at end of file diff --git a/mux/fmp4/build.rs b/mux/fmp4/build.rs new file mode 100644 index 000000000..cda12e57e --- /dev/null +++ b/mux/fmp4/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/mux/fmp4/examples/dash_vod.rs b/mux/fmp4/examples/dash_vod.rs new file mode 100644 index 000000000..eb291d6eb --- /dev/null +++ b/mux/fmp4/examples/dash_vod.rs @@ -0,0 +1,266 @@ +// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header +// ("initialization segment") is written into a separate file as the segments, and each segment is +// its own file too. +// +// All segments that are created are exactly 10s, expect for the last one which is only 3.333s. + +use gst::prelude::*; + +use std::fmt::Write; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use anyhow::Error; + +struct Segment { + start_time: gst::ClockTime, + duration: gst::ClockTime, +} + +struct State { + start_time: Option<gst::ClockTime>, + end_time: Option<gst::ClockTime>, + segments: Vec<Segment>, + path: PathBuf, +} + +fn main() -> Result<(), Error> { + gst::init()?; + + gstfmp4::plugin_register_static()?; + + let state = Arc::new(Mutex::new(State { + start_time: None, + end_time: None, + segments: Vec::new(), + path: PathBuf::from("dash_stream"), + })); + + let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! 
appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap(); + + let sink = pipeline + .by_name("sink") + .unwrap() + .dynamic_cast::<gst_app::AppSink>() + .unwrap(); + sink.set_buffer_list(true); + + let state_clone = state.clone(); + sink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let mut state = state.lock().unwrap(); + + // The muxer only outputs non-empty buffer lists + let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + assert!(!buffer_list.is_empty()); + + let mut first = buffer_list.get(0).unwrap(); + + // Each list contains a full segment, i.e. does not start with a DELTA_UNIT + assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); + + // If the buffer has the DISCONT and HEADER flag set then it contains the media + // header, i.e. the `ftyp`, `moov` and other media boxes. + // + // This might be the initial header or the updated header at the end of the stream. + if first.flags().contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) { + let mut path = state.path.clone(); + std::fs::create_dir_all(&path).expect("failed to create directory"); + path.push("init.cmfi"); + + println!("writing header to {}", path.display()); + let map = first.map_readable().unwrap(); + std::fs::write(path, &map).expect("failed to write header"); + drop(map); + + // Remove the header from the buffer list + buffer_list.make_mut().remove(0, 1); + + // If the list is now empty then it only contained the media header and nothing + // else. + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + // Otherwise get the next buffer and continue working with that. + first = buffer_list.get(0).unwrap(); + } + + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. 
+ assert!(first.flags().contains(gst::BufferFlags::HEADER)); + + let segment = sample.segment().expect("no segment") + .downcast_ref::<gst::ClockTime>().expect("no time segment"); + + // Initialize the start time with the first PTS we observed. This will be used + // later for calculating the duration of the whole media for the DASH manifest. + // + // The PTS of the segment header is equivalent to the earliest PTS of the whole + // segment. + let pts = segment.to_running_time(first.pts().unwrap()).expect("can't get running time"); + if state.start_time.is_none() { + state.start_time = Some(pts); + } + + // The metadata of the first media buffer is duplicated to the segment header. + // Based on this we can know the timecode of the first frame in this segment. + let meta = first.meta::<gst_video::VideoTimeCodeMeta>().expect("no timecode meta"); + + let mut path = state.path.clone(); + path.push(format!("segment_{}.cmfv", state.segments.len() + 1)); + println!("writing segment with timecode {} to {}", meta.tc(), path.display()); + + // Calculate the end time at this point. The duration of the segment header is set + // to the whole duration of this segment. 
+ let duration = first.duration().unwrap(); + let end_time = first.pts().unwrap() + first.duration().unwrap(); + state.end_time = Some(segment.to_running_time(end_time).expect("can't get running time")); + + let mut file = std::fs::File::create(path).expect("failed to open fragment"); + for buffer in &*buffer_list { + use std::io::prelude::*; + + let map = buffer.map_readable().unwrap(); + file.write_all(&map).expect("failed to write fragment"); + } + + state.segments.push(Segment { + start_time: pts, + duration, + }); + + Ok(gst::FlowSuccess::Ok) + }) + .eos(move |_sink| { + let state = state_clone.lock().unwrap(); + + // Now write the manifest + let mut path = state.path.clone(); + path.push("manifest.mpd"); + + println!("writing manifest to {}", path.display()); + + let duration = state.end_time.opt_checked_sub(state.start_time).ok().flatten().unwrap().mseconds() as f64 / 1000.0; + + // Write the whole segment timeline out here, compressing multiple segments with + // the same duration to a repeated segment. + let mut segment_timeline = String::new(); + let mut write_segment = |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| { + if repeat > 0 { + writeln!( + &mut segment_timeline, + " <S t=\"{time}\" d=\"{duration}\" r=\"{repeat}\" />", + time = start.mseconds(), + duration = duration.mseconds(), + repeat = repeat + ).unwrap(); + } else { + writeln!( + &mut segment_timeline, + " <S t=\"{time}\" d=\"{duration}\" />", + time = start.mseconds(), + duration = duration.mseconds() + ).unwrap(); + } + }; + + let mut start = None; + let mut num_segments = 0; + let mut last_duration = None; + for segment in &state.segments { + if start.is_none() { + start = Some(segment.start_time); + } + if last_duration.is_none() { + last_duration = Some(segment.duration); + } + + // If the duration of this segment is different from the previous one then we + // have to write out the segment now. 
+ if last_duration != Some(segment.duration) { + write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); + start = Some(segment.start_time); + last_duration = Some(segment.duration); + num_segments = 1; + } else { + num_segments += 1; + } + } + + // Write the last segment if any + if num_segments > 0 { + write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); + } + + let manifest = format!(r###"<?xml version="1.0" encoding="UTF-8"?> +<MPD + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="urn:mpeg:dash:schema:mpd:2011" + xsi:schemaLocation="urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd" + type="static" + mediaPresentationDuration="PT{duration:.3}S" + profiles="urn:mpeg:dash:profile:isoff-on-demand:2011"> + <Period> + <AdaptationSet mimeType="video/mp4" codecs="avc1.4d0228" frameRate="30/1" segmentAlignment="true" startWithSAP="1"> + <Representation id="A" bandwidth="2048000" with="1280" height="720"> + <SegmentTemplate timescale="1000" initialization="init.cmfi" media="segment_$Number$.cmfv"> + <SegmentTimeline> +{segment_timeline} </SegmentTimeline> + </SegmentTemplate> + </Representation> + </AdaptationSet> + </Period> +</MPD> +"###, + duration = duration, segment_timeline = segment_timeline); + + std::fs::write(path, &manifest).expect("failed to write manifest"); + }) + .build(), + ); + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline + .bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) 
=> { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/mux/fmp4/examples/hls_live.rs b/mux/fmp4/examples/hls_live.rs new file mode 100644 index 000000000..f1d9ed6d1 --- /dev/null +++ b/mux/fmp4/examples/hls_live.rs @@ -0,0 +1,572 @@ +// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +// This creates a live HLS stream with one video playlist and two video playlists. +// Basic trimming is implemented + +use gst::prelude::*; + +use std::collections::VecDeque; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use anyhow::Error; +use chrono::{DateTime, Duration, Utc}; +use m3u8_rs::{ + AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaSegment, + VariantStream, +}; + +struct State { + video_streams: Vec<VideoStream>, + audio_streams: Vec<AudioStream>, + all_mimes: Vec<String>, + path: PathBuf, + wrote_manifest: bool, +} + +impl State { + fn maybe_write_manifest(&mut self) { + if self.wrote_manifest { + return; + } + + if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() { + return; + } + + let mut all_mimes = self.all_mimes.clone(); + all_mimes.sort(); + all_mimes.dedup(); + + let playlist = MasterPlaylist { + version: Some(7), + variants: self + .video_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + + path.push(&stream.name); + path.push("manifest.m3u8"); + + 
VariantStream { + uri: path.as_path().display().to_string(), + bandwidth: stream.bitrate, + codecs: Some(all_mimes.join(",")), + resolution: Some(m3u8_rs::Resolution { + width: stream.width, + height: stream.height, + }), + audio: Some("audio".to_string()), + ..Default::default() + } + }) + .collect(), + alternatives: self + .audio_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + path.push(&stream.name); + path.push("manifest.m3u8"); + + AlternativeMedia { + media_type: AlternativeMediaType::Audio, + uri: Some(path.as_path().display().to_string()), + group_id: "audio".to_string(), + language: Some(stream.lang.clone()), + name: stream.name.clone(), + default: stream.default, + autoselect: stream.default, + channels: Some("2".to_string()), + ..Default::default() + } + }) + .collect(), + independent_segments: true, + ..Default::default() + }; + + println!("Writing master manifest to {}", self.path.display()); + + let mut file = std::fs::File::create(&self.path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write master playlist"); + + self.wrote_manifest = true; + } +} + +struct Segment { + date_time: DateTime<Utc>, + duration: gst::ClockTime, + path: String, +} + +struct UnreffedSegment { + removal_time: DateTime<Utc>, + path: String, +} + +struct StreamState { + path: PathBuf, + segments: VecDeque<Segment>, + trimmed_segments: VecDeque<UnreffedSegment>, + start_date_time: Option<DateTime<Utc>>, + start_time: Option<gst::ClockTime>, + media_sequence: u64, + segment_index: u32, +} + +struct VideoStream { + name: String, + bitrate: u64, + width: u64, + height: u64, +} + +struct AudioStream { + name: String, + lang: String, + default: bool, + wave: String, +} + +fn trim_segments(state: &mut StreamState) { + // Arbitrary 5 segments window + while state.segments.len() > 5 { + let segment = state.segments.pop_front().unwrap(); + + state.media_sequence += 1; + + state.trimmed_segments.push_back(UnreffedSegment { + // HLS spec 
mandates that segments are removed from the filesystem no sooner + // than the duration of the longest playlist + duration of the segment. + // This is 15 seconds (12.5 + 2.5) in our case, we use 20 seconds to be on the + // safe side + removal_time: segment + .date_time + .checked_add_signed(Duration::seconds(20)) + .unwrap(), + path: segment.path.clone(), + }); + } + + while let Some(segment) = state.trimmed_segments.front() { + if segment.removal_time < state.segments.front().unwrap().date_time { + let segment = state.trimmed_segments.pop_front().unwrap(); + + let mut path = state.path.clone(); + path.push(segment.path); + println!("Removing {}", path.display()); + std::fs::remove_file(path).expect("Failed to remove old segment"); + } else { + break; + } + } +} + +fn update_manifest(state: &mut StreamState) { + // Now write the manifest + let mut path = state.path.clone(); + path.push("manifest.m3u8"); + + println!("writing manifest to {}", path.display()); + + trim_segments(state); + + let playlist = MediaPlaylist { + version: Some(7), + target_duration: 2.5, + media_sequence: state.media_sequence, + segments: state + .segments + .iter() + .enumerate() + .map(|(idx, segment)| MediaSegment { + uri: segment.path.to_string(), + duration: (segment.duration.nseconds() as f64 + / gst::ClockTime::SECOND.nseconds() as f64) as f32, + map: Some(m3u8_rs::Map { + uri: "init.cmfi".into(), + ..Default::default() + }), + program_date_time: if idx == 0 { + Some(segment.date_time.into()) + } else { + None + }, + ..Default::default() + }) + .collect(), + end_list: false, + playlist_type: None, + i_frames_only: false, + start: None, + independent_segments: true, + ..Default::default() + }; + + let mut file = std::fs::File::create(path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write media playlist"); +} + +fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) { + let mut path: PathBuf = path.into(); + path.push(name); + + let 
state = Arc::new(Mutex::new(StreamState { + segments: VecDeque::new(), + trimmed_segments: VecDeque::new(), + path, + start_date_time: None, + start_time: gst::ClockTime::NONE, + media_sequence: 0, + segment_index: 0, + })); + + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let mut state = state.lock().unwrap(); + + // The muxer only outputs non-empty buffer lists + let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + assert!(!buffer_list.is_empty()); + + let mut first = buffer_list.get(0).unwrap(); + + // Each list contains a full segment, i.e. does not start with a DELTA_UNIT + assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); + + // If the buffer has the DISCONT and HEADER flag set then it contains the media + // header, i.e. the `ftyp`, `moov` and other media boxes. + // + // This might be the initial header or the updated header at the end of the stream. + if first + .flags() + .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) + { + let mut path = state.path.clone(); + std::fs::create_dir_all(&path).expect("failed to create directory"); + path.push("init.cmfi"); + + println!("writing header to {}", path.display()); + let map = first.map_readable().unwrap(); + std::fs::write(path, &map).expect("failed to write header"); + drop(map); + + // Remove the header from the buffer list + buffer_list.make_mut().remove(0, 1); + + // If the list is now empty then it only contained the media header and nothing + // else. + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + // Otherwise get the next buffer and continue working with that. + first = buffer_list.get(0).unwrap(); + } + + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. 
+ assert!(first.flags().contains(gst::BufferFlags::HEADER)); + + let mut path = state.path.clone(); + let basename = format!( + "segment_{}.{}", + state.segment_index, + if is_video { "cmfv" } else { "cmfa" } + ); + state.segment_index += 1; + path.push(&basename); + + let segment = sample + .segment() + .expect("no segment") + .downcast_ref::<gst::ClockTime>() + .expect("no time segment"); + let pts = segment + .to_running_time(first.pts().unwrap()) + .expect("can't get running time"); + + if state.start_time.is_none() { + state.start_time = Some(pts); + } + + if state.start_date_time.is_none() { + let now_utc = Utc::now(); + let now_gst = sink.clock().unwrap().time().unwrap(); + let pts_clock_time = pts + sink.base_time().unwrap(); + + let diff = now_gst.checked_sub(pts_clock_time).unwrap(); + let pts_utc = now_utc + .checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64)) + .unwrap(); + + state.start_date_time = Some(pts_utc); + } + + let duration = first.duration().unwrap(); + + let mut file = std::fs::File::create(&path).expect("failed to open fragment"); + for buffer in &*buffer_list { + use std::io::prelude::*; + + let map = buffer.map_readable().unwrap(); + file.write_all(&map).expect("failed to write fragment"); + } + + let date_time = state + .start_date_time + .unwrap() + .checked_add_signed(Duration::nanoseconds( + pts.opt_checked_sub(state.start_time) + .unwrap() + .unwrap() + .nseconds() as i64, + )) + .unwrap(); + + println!( + "wrote segment with date time {} to {}", + date_time, + path.display() + ); + + state.segments.push_back(Segment { + duration, + path: basename.to_string(), + date_time, + }); + + update_manifest(&mut state); + + Ok(gst::FlowSuccess::Ok) + }) + .eos(move |_sink| { + unreachable!(); + }) + .build(), + ); +} + +fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) { + enc.static_pad("src").unwrap().add_probe( + gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, info| match info.data { + 
Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { + gst::EventView::Caps(e) => { + let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned()); + + let mut state = state.lock().unwrap(); + state.all_mimes.push(mime.unwrap().into()); + state.maybe_write_manifest(); + gst::PadProbeReturn::Remove + } + _ => gst::PadProbeReturn::Ok, + }, + _ => gst::PadProbeReturn::Ok, + }, + ); +} + +impl VideoStream { + fn setup( + &self, + state: Arc<Mutex<State>>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = gst::ElementFactory::make("videotestsrc") + .property("is-live", true) + .build()?; + + let raw_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst_video::VideoCapsBuilder::new() + .format(gst_video::VideoFormat::I420) + .width(self.width as i32) + .height(self.height as i32) + .framerate(30.into()) + .build(), + ) + .build()?; + let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?; + let enc = gst::ElementFactory::make("x264enc") + .property("bframes", 0u32) + .property("bitrate", self.bitrate as u32 / 1000u32) + .property_from_str("tune", "zerolatency") + .build()?; + let h264_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst::Caps::builder("video/x-h264") + .field("profile", "main") + .build(), + ) + .build()?; + let mux = gst::ElementFactory::make("cmafmux") + .property("fragment-duration", 2500.mseconds()) + .property_from_str("header-update-mode", "update") + .property("write-mehd", true) + .build()?; + let appsink = gst_app::AppSink::builder().buffer_list(true).build(); + + pipeline.add_many(&[ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + appsink.upcast_ref(), + ])?; + + gst::Element::link_many(&[ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + appsink.upcast_ref(), + ])?; + + probe_encoder(state, enc); + + setup_appsink(&appsink, &self.name, path, true); + + 
Ok(()) + } +} + +impl AudioStream { + fn setup( + &self, + state: Arc<Mutex<State>>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = gst::ElementFactory::make("audiotestsrc") + .property("is-live", true) + .property_from_str("wave", &self.wave) + .property("fragment-duration", 2500.mseconds()) + .build()?; + let enc = gst::ElementFactory::make("avenc_aac").build()?; + let mux = gst::ElementFactory::make("cmafmux") + .property_from_str("header-update-mode", "update") + .property("write-mehd", true) + .build()?; + let appsink = gst_app::AppSink::builder().buffer_list(true).build(); + + pipeline.add_many(&[&src, &enc, &mux, appsink.upcast_ref()])?; + + gst::Element::link_many(&[&src, &enc, &mux, appsink.upcast_ref()])?; + + probe_encoder(state, enc); + + setup_appsink(&appsink, &self.name, path, false); + + Ok(()) + } +} + +fn main() -> Result<(), Error> { + gst::init()?; + + gstfmp4::plugin_register_static()?; + + let path = PathBuf::from("hls_live_stream"); + + let pipeline = gst::Pipeline::default(); + + std::fs::create_dir_all(&path).expect("failed to create directory"); + + let mut manifest_path = path.clone(); + manifest_path.push("manifest.m3u8"); + + let state = Arc::new(Mutex::new(State { + video_streams: vec![VideoStream { + name: "video_0".to_string(), + bitrate: 2_048_000, + width: 1280, + height: 720, + }], + audio_streams: vec![ + AudioStream { + name: "audio_0".to_string(), + lang: "eng".to_string(), + default: true, + wave: "sine".to_string(), + }, + AudioStream { + name: "audio_1".to_string(), + lang: "fre".to_string(), + default: false, + wave: "white-noise".to_string(), + }, + ], + all_mimes: vec![], + path: manifest_path.clone(), + wrote_manifest: false, + })); + + { + let state_lock = state.lock().unwrap(); + + for stream in &state_lock.video_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + + for stream in &state_lock.audio_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + } + + 
pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline + .bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/mux/fmp4/examples/hls_vod.rs b/mux/fmp4/examples/hls_vod.rs new file mode 100644 index 000000000..4a2533cd1 --- /dev/null +++ b/mux/fmp4/examples/hls_vod.rs @@ -0,0 +1,472 @@ +// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +// This creates a 10 second VOD HLS stream with one video playlist and two audio +// playlists. Each segment is 2.5 second long. 
+ +use gst::prelude::*; + +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use anyhow::Error; + +use m3u8_rs::{ + AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaPlaylistType, + MediaSegment, VariantStream, +}; + +struct Segment { + duration: gst::ClockTime, + path: String, +} + +struct StreamState { + path: PathBuf, + segments: Vec<Segment>, +} + +struct VideoStream { + name: String, + bitrate: u64, + width: u64, + height: u64, +} + +struct AudioStream { + name: String, + lang: String, + default: bool, + wave: String, +} + +struct State { + video_streams: Vec<VideoStream>, + audio_streams: Vec<AudioStream>, + all_mimes: Vec<String>, + path: PathBuf, + wrote_manifest: bool, +} + +impl State { + fn maybe_write_manifest(&mut self) { + if self.wrote_manifest { + return; + } + + if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() { + return; + } + + let mut all_mimes = self.all_mimes.clone(); + all_mimes.sort(); + all_mimes.dedup(); + + let playlist = MasterPlaylist { + version: Some(7), + variants: self + .video_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + + path.push(&stream.name); + path.push("manifest.m3u8"); + + VariantStream { + uri: path.as_path().display().to_string(), + bandwidth: stream.bitrate, + codecs: Some(all_mimes.join(",")), + resolution: Some(m3u8_rs::Resolution { + width: stream.width, + height: stream.height, + }), + audio: Some("audio".to_string()), + ..Default::default() + } + }) + .collect(), + alternatives: self + .audio_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + path.push(&stream.name); + path.push("manifest.m3u8"); + + AlternativeMedia { + media_type: AlternativeMediaType::Audio, + uri: Some(path.as_path().display().to_string()), + group_id: "audio".to_string(), + language: Some(stream.lang.clone()), + name: stream.name.clone(), + default: stream.default, + autoselect: stream.default, + channels: Some("2".to_string()), + 
..Default::default() + } + }) + .collect(), + independent_segments: true, + ..Default::default() + }; + + println!("Writing master manifest to {}", self.path.display()); + + let mut file = std::fs::File::create(&self.path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write master playlist"); + + self.wrote_manifest = true; + } +} + +fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) { + let mut path: PathBuf = path.into(); + path.push(name); + + let state = Arc::new(Mutex::new(StreamState { + segments: Vec::new(), + path, + })); + + let state_clone = state.clone(); + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let mut state = state.lock().unwrap(); + + // The muxer only outputs non-empty buffer lists + let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + assert!(!buffer_list.is_empty()); + + let mut first = buffer_list.get(0).unwrap(); + + // Each list contains a full segment, i.e. does not start with a DELTA_UNIT + assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); + + // If the buffer has the DISCONT and HEADER flag set then it contains the media + // header, i.e. the `ftyp`, `moov` and other media boxes. + // + // This might be the initial header or the updated header at the end of the stream. 
+ if first + .flags() + .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) + { + let mut path = state.path.clone(); + std::fs::create_dir_all(&path).expect("failed to create directory"); + path.push("init.cmfi"); + + println!("writing header to {}", path.display()); + let map = first.map_readable().unwrap(); + std::fs::write(path, &map).expect("failed to write header"); + drop(map); + + // Remove the header from the buffer list + buffer_list.make_mut().remove(0, 1); + + // If the list is now empty then it only contained the media header and nothing + // else. + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + // Otherwise get the next buffer and continue working with that. + first = buffer_list.get(0).unwrap(); + } + + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. + assert!(first.flags().contains(gst::BufferFlags::HEADER)); + + let mut path = state.path.clone(); + let basename = format!( + "segment_{}.{}", + state.segments.len() + 1, + if is_video { "cmfv" } else { "cmfa" } + ); + path.push(&basename); + println!("writing segment to {}", path.display()); + + let duration = first.duration().unwrap(); + + let mut file = std::fs::File::create(path).expect("failed to open fragment"); + for buffer in &*buffer_list { + use std::io::prelude::*; + + let map = buffer.map_readable().unwrap(); + file.write_all(&map).expect("failed to write fragment"); + } + + state.segments.push(Segment { + duration, + path: basename.to_string(), + }); + + Ok(gst::FlowSuccess::Ok) + }) + .eos(move |_sink| { + let state = state_clone.lock().unwrap(); + + // Now write the manifest + let mut path = state.path.clone(); + path.push("manifest.m3u8"); + + println!("writing manifest to {}", path.display()); + + let playlist = MediaPlaylist { + version: Some(7), + target_duration: 2.5, + media_sequence: 0, + segments: state + .segments + .iter() + .map(|segment| MediaSegment { + 
uri: segment.path.to_string(), + duration: (segment.duration.nseconds() as f64 + / gst::ClockTime::SECOND.nseconds() as f64) + as f32, + map: Some(m3u8_rs::Map { + uri: "init.cmfi".into(), + ..Default::default() + }), + ..Default::default() + }) + .collect(), + end_list: true, + playlist_type: Some(MediaPlaylistType::Vod), + i_frames_only: false, + start: None, + independent_segments: true, + ..Default::default() + }; + + let mut file = std::fs::File::create(path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write media playlist"); + }) + .build(), + ); +} + +fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) { + enc.static_pad("src").unwrap().add_probe( + gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, info| match info.data { + Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { + gst::EventView::Caps(e) => { + let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned()); + + let mut state = state.lock().unwrap(); + state.all_mimes.push(mime.unwrap().into()); + state.maybe_write_manifest(); + gst::PadProbeReturn::Remove + } + _ => gst::PadProbeReturn::Ok, + }, + _ => gst::PadProbeReturn::Ok, + }, + ); +} + +impl VideoStream { + fn setup( + &self, + state: Arc<Mutex<State>>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = gst::ElementFactory::make("videotestsrc") + .property("num-buffers", 300) + .build()?; + let raw_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst_video::VideoCapsBuilder::new() + .format(gst_video::VideoFormat::I420) + .width(self.width as i32) + .height(self.height as i32) + .framerate(30.into()) + .build(), + ) + .build()?; + let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?; + let enc = gst::ElementFactory::make("x264enc") + .property("bframes", 0u32) + .property("bitrate", self.bitrate as u32 / 1000u32) + .build()?; + let h264_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", 
+ gst::Caps::builder("video/x-h264") + .field("profile", "main") + .build(), + ) + .build()?; + let mux = gst::ElementFactory::make("cmafmux") + .property("fragment-duration", 2500.mseconds()) + .property_from_str("header-update-mode", "update") + .property("write-mehd", true) + .build()?; + let appsink = gst_app::AppSink::builder().buffer_list(true).build(); + + pipeline.add_many(&[ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + appsink.upcast_ref(), + ])?; + + gst::Element::link_many(&[ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + appsink.upcast_ref(), + ])?; + + probe_encoder(state, enc); + + setup_appsink(&appsink, &self.name, path, true); + + Ok(()) + } +} + +impl AudioStream { + fn setup( + &self, + state: Arc<Mutex<State>>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = gst::ElementFactory::make("audiotestsrc") + .property("num-buffers", 100) + .property("samplesperbuffer", 4410) + .property_from_str("wave", &self.wave) + .build()?; + let raw_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst_audio::AudioCapsBuilder::new().rate(44100).build(), + ) + .build()?; + let enc = gst::ElementFactory::make("avenc_aac").build()?; + let mux = gst::ElementFactory::make("cmafmux") + .property("fragment-duration", 2500.mseconds()) + .property_from_str("header-update-mode", "update") + .property("write-mehd", true) + .build()?; + let appsink = gst_app::AppSink::builder().buffer_list(true).build(); + + pipeline.add_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?; + + gst::Element::link_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?; + + probe_encoder(state, enc); + + setup_appsink(&appsink, &self.name, path, false); + + Ok(()) + } +} + +fn main() -> Result<(), Error> { + gst::init()?; + + gstfmp4::plugin_register_static()?; + + let path = PathBuf::from("hls_vod_stream"); + + let pipeline = 
gst::Pipeline::default(); + + std::fs::create_dir_all(&path).expect("failed to create directory"); + + let mut manifest_path = path.clone(); + manifest_path.push("manifest.m3u8"); + + let state = Arc::new(Mutex::new(State { + video_streams: vec![VideoStream { + name: "video_0".to_string(), + bitrate: 2_048_000, + width: 1280, + height: 720, + }], + audio_streams: vec![ + AudioStream { + name: "audio_0".to_string(), + lang: "eng".to_string(), + default: true, + wave: "sine".to_string(), + }, + AudioStream { + name: "audio_1".to_string(), + lang: "fre".to_string(), + default: false, + wave: "white-noise".to_string(), + }, + ], + all_mimes: vec![], + path: manifest_path.clone(), + wrote_manifest: false, + })); + + { + let state_lock = state.lock().unwrap(); + + for stream in &state_lock.video_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + + for stream in &state_lock.audio_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + } + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline + .bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/mux/fmp4/src/fmp4mux/boxes.rs b/mux/fmp4/src/fmp4mux/boxes.rs new file mode 100644 index 000000000..9b69fb36f --- /dev/null +++ b/mux/fmp4/src/fmp4mux/boxes.rs @@ -0,0 +1,2073 @@ +// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. 
+// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; + +use anyhow::{anyhow, bail, Context, Error}; + +use super::Buffer; + +fn write_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>( + vec: &mut Vec<u8>, + fourcc: impl std::borrow::Borrow<[u8; 4]>, + content_func: F, +) -> Result<T, Error> { + // Write zero size ... + let size_pos = vec.len(); + vec.extend([0u8; 4]); + vec.extend(fourcc.borrow()); + + let res = content_func(vec)?; + + // ... and update it here later. + let size: u32 = vec + .len() + .checked_sub(size_pos) + .expect("vector shrunk") + .try_into() + .context("too big box content")?; + vec[size_pos..][..4].copy_from_slice(&size.to_be_bytes()); + + Ok(res) +} + +const FULL_BOX_VERSION_0: u8 = 0; +const FULL_BOX_VERSION_1: u8 = 1; + +const FULL_BOX_FLAGS_NONE: u32 = 0; + +fn write_full_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>( + vec: &mut Vec<u8>, + fourcc: impl std::borrow::Borrow<[u8; 4]>, + version: u8, + flags: u32, + content_func: F, +) -> Result<T, Error> { + write_box(vec, fourcc, move |vec| { + assert_eq!(flags >> 24, 0); + vec.extend(((u32::from(version) << 24) | flags).to_be_bytes()); + content_func(vec) + }) +} + +fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'static [u8; 4]>) { + let s = caps.structure(0).unwrap(); + match s.name() { + "video/x-h264" => { + let width = s.get::<i32>("width").ok(); + let height = s.get::<i32>("height").ok(); + let fps = s.get::<gst::Fraction>("framerate").ok(); + let profile = s.get::<&str>("profile").ok(); + let level = s + .get::<&str>("level") + .ok() + .map(|l| l.split_once('.').unwrap_or((l, "0"))); + let colorimetry = s.get::<&str>("colorimetry").ok(); + + if let (Some(width), Some(height), Some(profile), Some(level), Some(fps)) = + (width, height, profile, level, fps) + { + if profile == "high" + || profile == "main" + || profile 
== "baseline" + || profile == "constrained-baseline" + { + if width <= 864 + && height <= 576 + && level <= ("3", "1") + && fps <= gst::Fraction::new(60, 1) + { + #[cfg(feature = "v1_18")] + { + if let Some(colorimetry) = colorimetry + .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + | gst_video::VideoColorPrimaries::Bt470bg + | gst_video::VideoColorPrimaries::Smpte170m + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + | gst_video::VideoTransferFunction::Bt601 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + | gst_video::VideoColorMatrix::Bt601 + ) { + compatible_brands.push(b"cfsd"); + } + } else { + // Assume it's OK + compatible_brands.push(b"cfsd"); + } + } + #[cfg(not(feature = "v1_18"))] + { + // Assume it's OK + compatible_brands.push(b"cfsd"); + } + } else if width <= 1920 + && height <= 1080 + && level <= ("4", "0") + && fps <= gst::Fraction::new(60, 1) + { + #[cfg(feature = "v1_18")] + { + if let Some(colorimetry) = colorimetry + .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"cfhd"); + } + } else { + // Assume it's OK + compatible_brands.push(b"cfhd"); + } + } + #[cfg(not(feature = "v1_18"))] + { + // Assume it's OK + compatible_brands.push(b"cfhd"); + } + } else if width <= 1920 + && height <= 1080 + && level <= ("4", "2") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + 
gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"chdf"); + } + } else { + // Assume it's OK + compatible_brands.push(b"chdf"); + } + } + } + } + } + "audio/mpeg" => { + compatible_brands.push(b"caac"); + } + "video/x-h265" => { + let width = s.get::<i32>("width").ok(); + let height = s.get::<i32>("height").ok(); + let fps = s.get::<gst::Fraction>("framerate").ok(); + let profile = s.get::<&str>("profile").ok(); + let tier = s.get::<&str>("tier").ok(); + let level = s + .get::<&str>("level") + .ok() + .map(|l| l.split_once('.').unwrap_or((l, "0"))); + let colorimetry = s.get::<&str>("colorimetry").ok(); + + if let (Some(width), Some(height), Some(profile), Some(tier), Some(level), Some(fps)) = + (width, height, profile, tier, level, fps) + { + if profile == "main" && tier == "main" { + if width <= 1920 + && height <= 1080 + && level <= ("4", "1") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"chhd"); + } + } else { + // Assume it's OK + compatible_brands.push(b"chhd"); + } + } else if width <= 3840 + && height <= 2160 + && level <= ("5", "0") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"cud8"); + } + } else { + // 
Assume it's OK + compatible_brands.push(b"cud8"); + } + } + } else if profile == "main-10" && tier == "main-10" { + if width <= 1920 + && height <= 1080 + && level <= ("4", "1") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"chh1"); + } + } else { + // Assume it's OK + compatible_brands.push(b"chh1"); + } + } else if width <= 3840 + && height <= 2160 + && level <= ("5", "1") + && fps <= gst::Fraction::new(60, 1) + { + #[cfg(feature = "v1_18")] + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + | gst_video::VideoColorPrimaries::Bt2020 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + | gst_video::VideoTransferFunction::Bt202010 + | gst_video::VideoTransferFunction::Bt202012 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + | gst_video::VideoColorMatrix::Bt2020 + ) { + compatible_brands.push(b"cud1"); + } else if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt2020 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Smpte2084 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt2020 + ) { + compatible_brands.push(b"chd1"); + } else if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt2020 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::AribStdB67 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt2020 + ) { + compatible_brands.push(b"clg1"); + 
} + } else { + // Assume it's OK + compatible_brands.push(b"cud1"); + } + } + #[cfg(not(feature = "v1_18"))] + { + // Assume it's OK + compatible_brands.push(b"cud1"); + } + } + } + } + _ => (), + } +} + +fn brands_from_variant_and_caps<'a>( + variant: super::Variant, + mut caps: impl Iterator<Item = &'a gst::Caps>, +) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) { + match variant { + super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]), + super::Variant::DASH => { + // FIXME: `dsms` / `dash` brands, `msix` + (b"msdh", vec![b"dums", b"msdh", b"iso6"]) + } + super::Variant::CMAF => { + let mut compatible_brands = vec![b"iso6", b"cmfc"]; + + cmaf_brands_from_caps(caps.next().unwrap(), &mut compatible_brands); + assert_eq!(caps.next(), None); + + (b"cmf2", compatible_brands) + } + } +} + +/// Creates `ftyp` and `moov` boxes +pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> { + let mut v = vec![]; + + let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.streams.iter()); + + write_box(&mut v, b"ftyp", |v| { + // major brand + v.extend(brand); + // minor version + v.extend(0u32.to_be_bytes()); + // compatible brands + v.extend(compatible_brands.into_iter().flatten()); + + Ok(()) + })?; + + write_box(&mut v, b"moov", |v| write_moov(v, &cfg))?; + + if cfg.variant == super::Variant::ONVIF { + write_full_box( + &mut v, + b"meta", + FULL_BOX_VERSION_0, + FULL_BOX_FLAGS_NONE, + |v| { + write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + // Handler type + v.extend(b"null"); + + // Reserved + v.extend([0u8; 3 * 4]); + + // Name + v.extend(b"MetadataHandler"); + + Ok(()) + })?; + + write_box(v, b"cstb", |v| { + // entry count + v.extend(1u32.to_be_bytes()); + + // track id + v.extend(0u32.to_be_bytes()); + + // start UTC time in 100ns units since Jan 1 1601 + v.extend(cfg.start_utc_time.unwrap().to_be_bytes()); + + Ok(()) + }) + }, + )?; + } + + 
Ok(gst::Buffer::from_mut_slice(v)) +} + +fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + use gst::glib; + + let base = glib::DateTime::from_utc(1904, 1, 1, 0, 0, 0.0)?; + let now = glib::DateTime::now_utc()?; + let creation_time = + u64::try_from(now.difference(&base).as_seconds()).expect("time before 1904"); + + write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + write_mvhd(v, cfg, creation_time) + })?; + for (idx, caps) in cfg.streams.iter().enumerate() { + write_box(v, b"trak", |v| { + let mut references = vec![]; + + // Reference the video track for ONVIF metadata tracks + if cfg.variant == super::Variant::ONVIF + && caps.structure(0).unwrap().name() == "application/x-onvif-metadata" + { + // Find the first video track + for (idx, caps) in cfg.streams.iter().enumerate() { + let s = caps.structure(0).unwrap(); + + if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") { + references.push(TrackReference { + reference_type: *b"cdsc", + track_ids: vec![idx as u32 + 1], + }); + break; + } + } + } + + write_trak(v, cfg, idx, caps, creation_time, &references) + })?; + } + write_box(v, b"mvex", |v| write_mvex(v, cfg))?; + + Ok(()) +} + +fn caps_to_timescale(caps: &gst::CapsRef) -> u32 { + let s = caps.structure(0).unwrap(); + + if let Ok(fps) = s.get::<gst::Fraction>("framerate") { + if fps.numer() == 0 { + return 10_000; + } + + if fps.denom() != 1 && fps.denom() != 1001 { + if let Some(fps) = (fps.denom() as u64) + .nseconds() + .mul_div_round(1_000_000_000, fps.numer() as u64) + .and_then(gst_video::guess_framerate) + { + return (fps.numer() as u32) + .mul_div_round(100, fps.denom() as u32) + .unwrap_or(10_000); + } + } + + (fps.numer() as u32) + .mul_div_round(100, fps.denom() as u32) + .unwrap_or(10_000) + } else if let Ok(rate) = s.get::<i32>("rate") { + rate as u32 + } else { + 10_000 + } +} + +fn write_mvhd( + v: &mut Vec<u8>, + cfg: &super::HeaderConfiguration, + creation_time: 
u64, +) -> Result<(), Error> { + // Creation time + v.extend(creation_time.to_be_bytes()); + // Modification time + v.extend(creation_time.to_be_bytes()); + // Timescale: uses the reference track timescale + v.extend(caps_to_timescale(&cfg.streams[0]).to_be_bytes()); + // Duration + v.extend(0u64.to_be_bytes()); + + // Rate 1.0 + v.extend((1u32 << 16).to_be_bytes()); + // Volume 1.0 + v.extend((1u16 << 8).to_be_bytes()); + // Reserved + v.extend([0u8; 2 + 2 * 4]); + + // Matrix + v.extend( + [ + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (16384u32 << 16).to_be_bytes(), + ] + .into_iter() + .flatten(), + ); + + // Pre defined + v.extend([0u8; 6 * 4]); + + // Next track id + v.extend((cfg.streams.len() as u32 + 1).to_be_bytes()); + + Ok(()) +} + +const TKHD_FLAGS_TRACK_ENABLED: u32 = 0x1; +const TKHD_FLAGS_TRACK_IN_MOVIE: u32 = 0x2; +const TKHD_FLAGS_TRACK_IN_PREVIEW: u32 = 0x4; + +struct TrackReference { + reference_type: [u8; 4], + track_ids: Vec<u32>, +} + +fn write_trak( + v: &mut Vec<u8>, + cfg: &super::HeaderConfiguration, + idx: usize, + caps: &gst::CapsRef, + creation_time: u64, + references: &[TrackReference], +) -> Result<(), Error> { + write_full_box( + v, + b"tkhd", + FULL_BOX_VERSION_1, + TKHD_FLAGS_TRACK_ENABLED | TKHD_FLAGS_TRACK_IN_MOVIE | TKHD_FLAGS_TRACK_IN_PREVIEW, + |v| write_tkhd(v, cfg, idx, caps, creation_time), + )?; + + // TODO: write edts if necessary: for audio tracks to remove initialization samples + // TODO: write edts optionally for negative DTS instead of offsetting the DTS + + write_box(v, b"mdia", |v| write_mdia(v, cfg, caps, creation_time))?; + + if !references.is_empty() { + write_box(v, b"tref", |v| write_tref(v, cfg, references))?; + } + + Ok(()) +} + +fn write_tkhd( + v: &mut Vec<u8>, + _cfg: &super::HeaderConfiguration, + idx: usize, + caps: &gst::CapsRef, + creation_time: u64, +) 
-> Result<(), Error> { + // Creation time + v.extend(creation_time.to_be_bytes()); + // Modification time + v.extend(creation_time.to_be_bytes()); + // Track ID + v.extend((idx as u32 + 1).to_be_bytes()); + // Reserved + v.extend(0u32.to_be_bytes()); + // Duration + v.extend(0u64.to_be_bytes()); + + // Reserved + v.extend([0u8; 2 * 4]); + + // Layer + v.extend(0u16.to_be_bytes()); + // Alternate group + v.extend(0u16.to_be_bytes()); + + // Volume + let s = caps.structure(0).unwrap(); + match s.name() { + "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { + v.extend((1u16 << 8).to_be_bytes()) + } + _ => v.extend(0u16.to_be_bytes()), + } + + // Reserved + v.extend([0u8; 2]); + + // Matrix + v.extend( + [ + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (16384u32 << 16).to_be_bytes(), + ] + .into_iter() + .flatten(), + ); + + // Width/height + match s.name() { + "video/x-h264" | "video/x-h265" | "image/jpeg" => { + let width = s.get::<i32>("width").context("video caps without width")? as u32; + let height = s + .get::<i32>("height") + .context("video caps without height")? 
/// Packs a three-letter ISO-639-2/T language code into the 15-bit `mdhd`
/// encoding: three 5-bit fields, each letter stored as (ASCII - 0x60).
///
/// Panics if any letter is not lowercase ASCII.
fn language_code(lang: impl std::borrow::Borrow<[u8; 3]>) -> u16 {
    let code = lang.borrow();

    // TODO: Need to relax this once we get the language code from tags
    assert!(code.iter().all(u8::is_ascii_lowercase));

    // Fields are disjoint 5-bit values, so OR-ing shifted fields is
    // equivalent to the additive packing.
    code.iter()
        .fold(0u16, |acc, &b| (acc << 5) | ((b as u16 - 0x60) & 0x1F))
}
v.extend(0u64.to_be_bytes()); + + // Language as ISO-639-2/T + // TODO: get actual language from the tags + v.extend(language_code(b"und").to_be_bytes()); + + // Pre-defined + v.extend([0u8; 2]); + + Ok(()) +} + +fn write_hdlr( + v: &mut Vec<u8>, + _cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + // Pre-defined + v.extend([0u8; 4]); + + let s = caps.structure(0).unwrap(); + let (handler_type, name) = match s.name() { + "video/x-h264" | "video/x-h265" | "image/jpeg" => (b"vide", b"VideoHandler\0".as_slice()), + "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { + (b"soun", b"SoundHandler\0".as_slice()) + } + "application/x-onvif-metadata" => (b"meta", b"MetadataHandler\0".as_slice()), + _ => unreachable!(), + }; + + // Handler type + v.extend(handler_type); + + // Reserved + v.extend([0u8; 3 * 4]); + + // Name + v.extend(name); + + Ok(()) +} + +fn write_minf( + v: &mut Vec<u8>, + cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + let s = caps.structure(0).unwrap(); + + match s.name() { + "video/x-h264" | "video/x-h265" | "image/jpeg" => { + // Flags are always 1 for unspecified reasons + write_full_box(v, b"vmhd", FULL_BOX_VERSION_0, 1, |v| write_vmhd(v, cfg))? + } + "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { + write_full_box(v, b"smhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_smhd(v, cfg) + })? + } + "application/x-onvif-metadata" => { + write_full_box(v, b"nmhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |_v| { + Ok(()) + })? 
+ } + _ => unreachable!(), + } + + write_box(v, b"dinf", |v| write_dinf(v, cfg))?; + + write_box(v, b"stbl", |v| write_stbl(v, cfg, caps))?; + + Ok(()) +} + +fn write_vmhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Graphics mode + v.extend([0u8; 2]); + + // opcolor + v.extend([0u8; 2 * 3]); + + Ok(()) +} + +fn write_smhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Balance + v.extend([0u8; 2]); + + // Reserved + v.extend([0u8; 2]); + + Ok(()) +} + +fn write_dinf(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + write_full_box(v, b"dref", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_dref(v, cfg) + })?; + + Ok(()) +} + +const DREF_FLAGS_MEDIA_IN_SAME_FILE: u32 = 0x1; + +fn write_dref(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(1u32.to_be_bytes()); + + write_full_box( + v, + b"url ", + FULL_BOX_VERSION_0, + DREF_FLAGS_MEDIA_IN_SAME_FILE, + |_v| Ok(()), + )?; + + Ok(()) +} + +fn write_stbl( + v: &mut Vec<u8>, + cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + write_full_box(v, b"stsd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stsd(v, cfg, caps) + })?; + write_full_box(v, b"stts", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stts(v, cfg) + })?; + write_full_box(v, b"stsc", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stsc(v, cfg) + })?; + write_full_box(v, b"stsz", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stsz(v, cfg) + })?; + + write_full_box(v, b"stco", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stco(v, cfg) + })?; + + // For video write a sync sample box as indication that not all samples are sync samples + let s = caps.structure(0).unwrap(); + match s.name() { + "video/x-h264" | "video/x-h265" => { + write_full_box(v, b"stss", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stss(v, cfg) + })? 
+ } + _ => (), + } + + Ok(()) +} + +fn write_stsd( + v: &mut Vec<u8>, + cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + // Entry count + v.extend(1u32.to_be_bytes()); + + let s = caps.structure(0).unwrap(); + match s.name() { + "video/x-h264" | "video/x-h265" | "image/jpeg" => write_visual_sample_entry(v, cfg, caps)?, + "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { + write_audio_sample_entry(v, cfg, caps)? + } + "application/x-onvif-metadata" => write_xml_meta_data_sample_entry(v, cfg, caps)?, + _ => unreachable!(), + } + + Ok(()) +} + +fn write_sample_entry_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>( + v: &mut Vec<u8>, + fourcc: impl std::borrow::Borrow<[u8; 4]>, + content_func: F, +) -> Result<T, Error> { + write_box(v, fourcc, move |v| { + // Reserved + v.extend([0u8; 6]); + + // Data reference index + v.extend(1u16.to_be_bytes()); + + content_func(v) + }) +} + +fn write_visual_sample_entry( + v: &mut Vec<u8>, + _cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + let s = caps.structure(0).unwrap(); + let fourcc = match s.name() { + "video/x-h264" => { + let stream_format = s.get::<&str>("stream-format").context("no stream-format")?; + match stream_format { + "avc" => b"avc1", + "avc3" => b"avc3", + _ => unreachable!(), + } + } + "video/x-h265" => { + let stream_format = s.get::<&str>("stream-format").context("no stream-format")?; + match stream_format { + "hvc1" => b"hvc1", + "hev1" => b"hev1", + _ => unreachable!(), + } + } + "image/jpeg" => b"jpeg", + _ => unreachable!(), + }; + + write_sample_entry_box(v, fourcc, move |v| { + // pre-defined + v.extend([0u8; 2]); + // Reserved + v.extend([0u8; 2]); + // pre-defined + v.extend([0u8; 3 * 4]); + + // Width + let width = + u16::try_from(s.get::<i32>("width").context("no width")?).context("too big width")?; + v.extend(width.to_be_bytes()); + + // Height + let height = 
u16::try_from(s.get::<i32>("height").context("no height")?) + .context("too big height")?; + v.extend(height.to_be_bytes()); + + // Horizontal resolution + v.extend(0x00480000u32.to_be_bytes()); + + // Vertical resolution + v.extend(0x00480000u32.to_be_bytes()); + + // Reserved + v.extend([0u8; 4]); + + // Frame count + v.extend(1u16.to_be_bytes()); + + // Compressor name + v.extend([0u8; 32]); + + // Depth + v.extend(0x0018u16.to_be_bytes()); + + // Pre-defined + v.extend((-1i16).to_be_bytes()); + + // Codec specific boxes + match s.name() { + "video/x-h264" => { + let codec_data = s + .get::<&gst::BufferRef>("codec_data") + .context("no codec_data")?; + let map = codec_data + .map_readable() + .context("codec_data not mappable")?; + write_box(v, b"avcC", move |v| { + v.extend_from_slice(&map); + Ok(()) + })?; + } + "video/x-h265" => { + let codec_data = s + .get::<&gst::BufferRef>("codec_data") + .context("no codec_data")?; + let map = codec_data + .map_readable() + .context("codec_data not mappable")?; + write_box(v, b"hvcC", move |v| { + v.extend_from_slice(&map); + Ok(()) + })?; + } + "image/jpeg" => { + // Nothing to do here + } + _ => unreachable!(), + } + + if let Ok(par) = s.get::<gst::Fraction>("pixel-aspect-ratio") { + write_box(v, b"pasp", move |v| { + v.extend((par.numer() as u32).to_be_bytes()); + v.extend((par.denom() as u32).to_be_bytes()); + Ok(()) + })?; + } + + if let Some(colorimetry) = s + .get::<&str>("colorimetry") + .ok() + .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) + { + write_box(v, b"colr", move |v| { + v.extend(b"nclx"); + let (primaries, transfer, matrix) = { + #[cfg(feature = "v1_18")] + { + ( + (colorimetry.primaries().to_iso() as u16), + (colorimetry.transfer().to_iso() as u16), + (colorimetry.matrix().to_iso() as u16), + ) + } + #[cfg(not(feature = "v1_18"))] + { + let primaries = match colorimetry.primaries() { + gst_video::VideoColorPrimaries::Bt709 => 1u16, + gst_video::VideoColorPrimaries::Bt470m => 4u16, + 
gst_video::VideoColorPrimaries::Bt470bg => 5u16, + gst_video::VideoColorPrimaries::Smpte170m => 6u16, + gst_video::VideoColorPrimaries::Smpte240m => 7u16, + gst_video::VideoColorPrimaries::Film => 8u16, + gst_video::VideoColorPrimaries::Bt2020 => 9u16, + _ => 2, + }; + let transfer = match colorimetry.transfer() { + gst_video::VideoTransferFunction::Bt709 => 1u16, + gst_video::VideoTransferFunction::Gamma22 => 4u16, + gst_video::VideoTransferFunction::Gamma28 => 5u16, + gst_video::VideoTransferFunction::Smpte240m => 7u16, + gst_video::VideoTransferFunction::Gamma10 => 8u16, + gst_video::VideoTransferFunction::Log100 => 9u16, + gst_video::VideoTransferFunction::Log316 => 10u16, + gst_video::VideoTransferFunction::Srgb => 13u16, + gst_video::VideoTransferFunction::Bt202012 => 15u16, + _ => 2, + }; + let matrix = match colorimetry.matrix() { + gst_video::VideoColorMatrix::Rgb => 0u16, + gst_video::VideoColorMatrix::Bt709 => 1u16, + gst_video::VideoColorMatrix::Fcc => 4u16, + gst_video::VideoColorMatrix::Bt601 => 6u16, + gst_video::VideoColorMatrix::Smpte240m => 7u16, + gst_video::VideoColorMatrix::Bt2020 => 9u16, + _ => 2, + }; + + (primaries, transfer, matrix) + } + }; + + let full_range = match colorimetry.range() { + gst_video::VideoColorRange::Range0_255 => 0x80u8, + gst_video::VideoColorRange::Range16_235 => 0x00u8, + _ => 0x00, + }; + + v.extend(primaries.to_be_bytes()); + v.extend(transfer.to_be_bytes()); + v.extend(matrix.to_be_bytes()); + v.push(full_range); + + Ok(()) + })?; + } + + #[cfg(feature = "v1_18")] + { + if let Ok(cll) = gst_video::VideoContentLightLevel::from_caps(caps) { + write_box(v, b"clli", move |v| { + v.extend((cll.max_content_light_level() as u16).to_be_bytes()); + v.extend((cll.max_frame_average_light_level() as u16).to_be_bytes()); + Ok(()) + })?; + } + + if let Ok(mastering) = gst_video::VideoMasteringDisplayInfo::from_caps(caps) { + write_box(v, b"mdcv", move |v| { + for primary in mastering.display_primaries() { + 
v.extend(primary.x.to_be_bytes()); + v.extend(primary.y.to_be_bytes()); + } + v.extend(mastering.white_point().x.to_be_bytes()); + v.extend(mastering.white_point().y.to_be_bytes()); + v.extend(mastering.max_display_mastering_luminance().to_be_bytes()); + v.extend(mastering.max_display_mastering_luminance().to_be_bytes()); + Ok(()) + })?; + } + } + + // Write fiel box for codecs that require it + if ["image/jpeg"].contains(&s.name()) { + let interlace_mode = s + .get::<&str>("interlace-mode") + .ok() + .map(gst_video::VideoInterlaceMode::from_string) + .unwrap_or(gst_video::VideoInterlaceMode::Progressive); + let field_order = s + .get::<&str>("field-order") + .ok() + .map(gst_video::VideoFieldOrder::from_string) + .unwrap_or(gst_video::VideoFieldOrder::Unknown); + + write_box(v, b"fiel", move |v| { + let (interlace, field_order) = match interlace_mode { + gst_video::VideoInterlaceMode::Progressive => (1, 0), + gst_video::VideoInterlaceMode::Interleaved + if field_order == gst_video::VideoFieldOrder::TopFieldFirst => + { + (2, 9) + } + gst_video::VideoInterlaceMode::Interleaved => (2, 14), + _ => (0, 0), + }; + + v.push(interlace); + v.push(field_order); + Ok(()) + })?; + } + + // TODO: write btrt bitrate box based on tags + + Ok(()) + })?; + + Ok(()) +} + +fn write_audio_sample_entry( + v: &mut Vec<u8>, + _cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + let s = caps.structure(0).unwrap(); + let fourcc = match s.name() { + "audio/mpeg" => b"mp4a", + "audio/x-alaw" => b"alaw", + "audio/x-mulaw" => b"ulaw", + "audio/x-adpcm" => { + let layout = s.get::<&str>("layout").context("no ADPCM layout field")?; + + match layout { + "g726" => b"ms\x00\x45", + _ => unreachable!(), + } + } + _ => unreachable!(), + }; + + let sample_size = match s.name() { + "audio/x-adpcm" => { + let bitrate = s.get::<i32>("bitrate").context("no ADPCM bitrate field")?; + (bitrate / 8000) as u16 + } + _ => 16u16, + }; + + write_sample_entry_box(v, fourcc, move 
|v| { + // Reserved + v.extend([0u8; 2 * 4]); + + // Channel count + let channels = u16::try_from(s.get::<i32>("channels").context("no channels")?) + .context("too many channels")?; + v.extend(channels.to_be_bytes()); + + // Sample size + v.extend(sample_size.to_be_bytes()); + + // Pre-defined + v.extend([0u8; 2]); + + // Reserved + v.extend([0u8; 2]); + + // Sample rate + let rate = u16::try_from(s.get::<i32>("rate").context("no rate")?).unwrap_or(0); + v.extend((u32::from(rate) << 16).to_be_bytes()); + + // Codec specific boxes + match s.name() { + "audio/mpeg" => { + let codec_data = s + .get::<&gst::BufferRef>("codec_data") + .context("no codec_data")?; + let map = codec_data + .map_readable() + .context("codec_data not mappable")?; + if map.len() < 2 { + bail!("too small codec_data"); + } + write_esds_aac(v, &map)?; + } + "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { + // Nothing to do here + } + _ => unreachable!(), + } + + // If rate did not fit into 16 bits write a full `srat` box + if rate == 0 { + let rate = s.get::<i32>("rate").context("no rate")?; + // FIXME: This is defined as full box? + write_full_box( + v, + b"srat", + FULL_BOX_VERSION_0, + FULL_BOX_FLAGS_NONE, + move |v| { + v.extend((rate as u32).to_be_bytes()); + Ok(()) + }, + )?; + } + + // TODO: write btrt bitrate box based on tags + + // TODO: chnl box for channel ordering? 
/// Writes an `esds` box for AAC from the raw AudioSpecificConfig passed in
/// `codec_data`.
///
/// The MPEG-4 descriptors inside use a variable-length size encoding:
/// 7 payload bits per byte, with the high bit set on continuation bytes.
fn write_esds_aac(v: &mut Vec<u8>, codec_data: &[u8]) -> Result<(), Error> {
    // Encodes `len` into the variable-length descriptor size form and
    // returns the encoded bytes plus how many of them are used.
    //
    // NOTE(review): the bytes are produced low-7-bit-group first; for the
    // descriptor sizes actually written here (all < 0x80) only one byte is
    // emitted so the ordering never matters — TODO confirm against
    // ISO/IEC 14496-1 if larger configs are ever supported.
    let calculate_len = |mut len| {
        if len > 260144641 {
            bail!("too big descriptor length");
        }

        // A zero length is still one encoded byte.
        if len == 0 {
            return Ok(([0; 4], 1));
        }

        let mut idx = 0;
        let mut lens = [0u8; 4];
        while len > 0 {
            lens[idx] = ((if len > 0x7f { 0x80 } else { 0x00 }) | (len & 0x7f)) as u8;
            idx += 1;
            len >>= 7;
        }

        Ok((lens, idx))
    };

    write_full_box(
        v,
        b"esds",
        FULL_BOX_VERSION_0,
        FULL_BOX_FLAGS_NONE,
        move |v| {
            // Calculate all lengths bottom up

            // Decoder specific info
            let decoder_specific_info_len = calculate_len(codec_data.len())?;

            // Decoder config
            let decoder_config_len =
                calculate_len(13 + 1 + decoder_specific_info_len.1 + codec_data.len())?;

            // SL config
            let sl_config_len = calculate_len(1)?;

            // ES descriptor
            let es_descriptor_len = calculate_len(
                3 + 1
                    + decoder_config_len.1
                    + 13
                    + 1
                    + decoder_specific_info_len.1
                    + codec_data.len()
                    + 1
                    + sl_config_len.1
                    + 1,
            )?;

            // ES descriptor tag
            v.push(0x03);

            // Length
            v.extend_from_slice(&es_descriptor_len.0[..(es_descriptor_len.1)]);

            // Track ID
            v.extend(1u16.to_be_bytes());
            // Flags
            v.push(0u8);

            // Decoder config descriptor
            v.push(0x04);

            // Length
            v.extend_from_slice(&decoder_config_len.0[..(decoder_config_len.1)]);

            // Object type ESDS_OBJECT_TYPE_MPEG4_P3
            v.push(0x40);
            // Stream type ESDS_STREAM_TYPE_AUDIO
            v.push((0x05 << 2) | 0x01);

            // Buffer size db?
            v.extend([0u8; 3]);

            // Max bitrate
            v.extend(0u32.to_be_bytes());

            // Avg bitrate
            v.extend(0u32.to_be_bytes());

            // Decoder specific info
            v.push(0x05);

            // Length
            v.extend_from_slice(&decoder_specific_info_len.0[..(decoder_specific_info_len.1)]);
            v.extend_from_slice(codec_data);

            // SL config descriptor
            v.push(0x06);

            // Length: 1 (tag) + 1 (length) + 1 (predefined)
            v.extend_from_slice(&sl_config_len.0[..(sl_config_len.1)]);

            // Predefined
            v.push(0x02);
            Ok(())
        },
    )
}
+ v.extend([0u8; 3]); + + // Max bitrate + v.extend(0u32.to_be_bytes()); + + // Avg bitrate + v.extend(0u32.to_be_bytes()); + + // Decoder specific info + v.push(0x05); + + // Length + v.extend_from_slice(&decoder_specific_info_len.0[..(decoder_specific_info_len.1)]); + v.extend_from_slice(codec_data); + + // SL config descriptor + v.push(0x06); + + // Length: 1 (tag) + 1 (length) + 1 (predefined) + v.extend_from_slice(&sl_config_len.0[..(sl_config_len.1)]); + + // Predefined + v.push(0x02); + Ok(()) + }, + ) +} + +fn write_xml_meta_data_sample_entry( + v: &mut Vec<u8>, + _cfg: &super::HeaderConfiguration, + caps: &gst::CapsRef, +) -> Result<(), Error> { + let s = caps.structure(0).unwrap(); + let namespace = match s.name() { + "application/x-onvif-metadata" => b"http://www.onvif.org/ver10/schema", + _ => unreachable!(), + }; + + write_sample_entry_box(v, b"metx", move |v| { + // content_encoding, empty string + v.push(0); + + // namespace + v.extend_from_slice(namespace); + v.push(0); + + // schema_location, empty string list + v.push(0); + + Ok(()) + })?; + + Ok(()) +} + +fn write_stts(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stsc(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stsz(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Sample size + v.extend(0u32.to_be_bytes()); + + // Sample count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stco(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stss(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), 
Error> { + if cfg.write_mehd { + if cfg.update && cfg.duration.is_some() { + write_full_box(v, b"mehd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + write_mehd(v, cfg) + })?; + } else { + write_box(v, b"free", |v| { + // version/flags of full box + v.extend(0u32.to_be_bytes()); + // mehd duration + v.extend(0u64.to_be_bytes()); + + Ok(()) + })?; + } + } + + for (idx, _caps) in cfg.streams.iter().enumerate() { + write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_trex(v, cfg, idx) + })?; + } + + Ok(()) +} + +fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Use the reference track timescale + let timescale = caps_to_timescale(&cfg.streams[0]); + + let duration = cfg + .duration + .expect("no duration") + .mul_div_ceil(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too long duration")?; + + // Media duration in mvhd.timescale units + v.extend(duration.to_be_bytes()); + + Ok(()) +} + +fn write_trex(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration, idx: usize) -> Result<(), Error> { + // Track ID + v.extend((idx as u32 + 1).to_be_bytes()); + + // Default sample description index + v.extend(1u32.to_be_bytes()); + + // Default sample duration + v.extend(0u32.to_be_bytes()); + + // Default sample size + v.extend(0u32.to_be_bytes()); + + // Default sample flags + v.extend(0u32.to_be_bytes()); + + // Default sample duration/size/etc will be provided in the traf/trun if one can be determined + // for a whole fragment + + Ok(()) +} + +/// Creates `styp` and `moof` boxes and `mdat` header +pub(super) fn create_fmp4_fragment_header( + cfg: super::FragmentHeaderConfiguration, +) -> Result<(gst::Buffer, u64), Error> { + let mut v = vec![]; + + let (brand, compatible_brands) = + brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.0)); + + write_box(&mut v, b"styp", |v| { + // major brand + v.extend(brand); + // minor version + v.extend(0u32.to_be_bytes()); + // 
/// Creates `styp` and `moof` boxes and `mdat` header
///
/// Returns the finished header buffer together with the size of the `styp`
/// box, so the caller can locate where the `moof` starts.
pub(super) fn create_fmp4_fragment_header(
    cfg: super::FragmentHeaderConfiguration,
) -> Result<(gst::Buffer, u64), Error> {
    let mut v = vec![];

    let (brand, compatible_brands) =
        brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.0));

    write_box(&mut v, b"styp", |v| {
        // major brand
        v.extend(brand);
        // minor version
        v.extend(0u32.to_be_bytes());
        // compatible brands
        v.extend(compatible_brands.into_iter().flatten());

        Ok(())
    })?;

    let styp_len = v.len();

    let data_offset_offsets = write_box(&mut v, b"moof", |v| write_moof(v, &cfg))?;

    // Total payload size of the upcoming mdat
    let size = cfg
        .buffers
        .iter()
        .map(|buffer| buffer.buffer.size() as u64)
        .sum::<u64>();
    if let Ok(size) = u32::try_from(size + 8) {
        // 32-bit mdat header: size + fourcc
        v.extend(size.to_be_bytes());
        v.extend(b"mdat");
    } else {
        // 64-bit mdat header: size field of 1 signals a largesize after the fourcc
        v.extend(1u32.to_be_bytes());
        v.extend(b"mdat");
        v.extend((size + 16).to_be_bytes());
    }

    // Patch up every track run's data offset now that the total moof size
    // (relative to which the offsets are measured) is known.
    let data_offset = v.len() - styp_len;
    for data_offset_offset in data_offset_offsets {
        let val = u32::from_be_bytes(v[data_offset_offset..][..4].try_into()?)
            .checked_add(u32::try_from(data_offset)?)
            .ok_or_else(|| anyhow!("can't calculate track run data offset"))?;
        v[data_offset_offset..][..4].copy_from_slice(&val.to_be_bytes());
    }

    Ok((gst::Buffer::from_mut_slice(v), styp_len as u64))
}
/// Writes the `mfhd` payload: the fragment sequence number.
fn write_mfhd(v: &mut Vec<u8>, cfg: &super::FragmentHeaderConfiguration) -> Result<(), Error> {
    v.extend(cfg.sequence_number.to_be_bytes());

    Ok(())
}

/// Computes the ISO BMFF sample flags word for a buffer.
///
/// For intra-only streams every sample is an independent sync sample;
/// otherwise the DELTA_UNIT and DROPPABLE buffer flags determine the
/// depends-on / is-depended-on / non-sync bits.
#[allow(clippy::identity_op)]
fn sample_flags_from_buffer(
    timing_info: &super::FragmentTimingInfo,
    buffer: &gst::BufferRef,
) -> u32 {
    if timing_info.intra_only {
        (0b00u32 << (16 + 10)) | // leading: unknown
        (0b10u32 << (16 + 8)) | // depends: no
        (0b10u32 << (16 + 6)) | // depended: no
        (0b00u32 << (16 + 4)) | // redundancy: unknown
        (0b000u32 << (16 + 1)) | // padding: no
        (0b0u32 << 16) | // non-sync-sample: no
        (0u32) // degradation priority
    } else {
        // Delta units depend on other samples
        let depends = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
            0b01u32
        } else {
            0b10u32
        };
        // Droppable buffers have no other samples depending on them
        let depended = if buffer.flags().contains(gst::BufferFlags::DROPPABLE) {
            0b10u32
        } else {
            0b00u32
        };
        // Delta units are not sync samples
        let non_sync_sample = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
            0b1u32
        } else {
            0b0u32
        };

        (0b00u32 << (16 + 10)) | // leading: unknown
        (depends << (16 + 8)) | // depends
        (depended << (16 + 6)) | // depended
        (0b00u32 << (16 + 4)) | // redundancy: unknown
        (0b000u32 << (16 + 1)) | // padding: no
        (non_sync_sample << 16) | // non-sync-sample
        (0u32) // degradation priority
    }
}

// tfhd flags
const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08;
const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10;
const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20;
const DEFAULT_BASE_IS_MOOF: u32 = 0x2_00_00;

// trun flags
const DATA_OFFSET_PRESENT: u32 = 0x0_01;
const FIRST_SAMPLE_FLAGS_PRESENT: u32 = 0x0_04;
const SAMPLE_DURATION_PRESENT: u32 = 0x1_00;
const SAMPLE_SIZE_PRESENT: u32 = 0x2_00;
const SAMPLE_FLAGS_PRESENT: u32 = 0x4_00;
const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00;
/// Scans all buffers of stream `idx` in this fragment to decide which
/// values can be stored once as `tfhd` defaults and which must be written
/// per sample in the `trun`.
#[allow(clippy::type_complexity)]
fn analyze_buffers(
    cfg: &super::FragmentHeaderConfiguration,
    idx: usize,
    timing_info: &super::FragmentTimingInfo,
    timescale: u32,
) -> Result<
    (
        // tf_flags
        u32,
        // tr_flags
        u32,
        // default size
        Option<u32>,
        // default duration
        Option<u32>,
        // default flags
        Option<u32>,
        // negative composition time offsets
        bool,
    ),
    Error,
> {
    let mut tf_flags = DEFAULT_BASE_IS_MOOF;
    let mut tr_flags = DATA_OFFSET_PRESENT;

    let mut duration = None;
    let mut size = None;
    let mut first_buffer_flags = None;
    let mut flags = None;

    let mut negative_composition_time_offsets = false;

    for Buffer {
        idx: _idx,
        buffer,
        timestamp: _timestamp,
        duration: sample_duration,
        composition_time_offset,
    } in cfg.buffers.iter().filter(|b| b.idx == idx)
    {
        // Any size differing from the first forces per-sample sizes.
        if size.is_none() {
            size = Some(buffer.size() as u32);
        }
        if Some(buffer.size() as u32) != size {
            tr_flags |= SAMPLE_SIZE_PRESENT;
        }

        // Sample duration converted to timescale units.
        let sample_duration = u32::try_from(
            sample_duration
                .nseconds()
                .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
                .context("too big sample duration")?,
        )
        .context("too big sample duration")?;

        // Any duration differing from the first forces per-sample durations.
        if duration.is_none() {
            duration = Some(sample_duration);
        }
        if Some(sample_duration) != duration {
            tr_flags |= SAMPLE_DURATION_PRESENT;
        }

        // The first buffer's flags can be expressed with
        // FIRST_SAMPLE_FLAGS_PRESENT while all later buffers agree on one
        // flags value; once later buffers disagree among themselves,
        // per-sample flags are needed instead.
        let f = sample_flags_from_buffer(timing_info, buffer);
        if first_buffer_flags.is_none() {
            first_buffer_flags = Some(f);
        } else {
            flags = Some(f);
            if Some(f) != first_buffer_flags {
                tr_flags |= FIRST_SAMPLE_FLAGS_PRESENT;
            }
        }

        if flags.is_some() && Some(f) != flags {
            tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT;
            tr_flags |= SAMPLE_FLAGS_PRESENT;
        }

        if let Some(composition_time_offset) = *composition_time_offset {
            // Composition time offsets are never set for intra-only streams.
            assert!(!timing_info.intra_only);
            if composition_time_offset != 0 {
                tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT;
            }
            if composition_time_offset < 0 {
                // Negative offsets require trun version 1 (signed field).
                negative_composition_time_offsets = true;
            }
        }
    }

    // Values that stayed constant across the run become tfhd defaults.
    if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 {
        tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT;
    } else {
        size = None;
    }

    if (tr_flags & SAMPLE_DURATION_PRESENT) == 0 {
        tf_flags |= DEFAULT_SAMPLE_DURATION_PRESENT;
    } else {
        duration = None;
    }

    // If there is only a single buffer use its flags as default sample flags
    // instead of first sample flags.
    if flags.is_none() && first_buffer_flags.is_some() {
        tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT;
        flags = first_buffer_flags.take();
    }

    if (tr_flags & SAMPLE_FLAGS_PRESENT) == 0 {
        tf_flags |= DEFAULT_SAMPLE_FLAGS_PRESENT;
    } else {
        flags = None;
    }

    Ok((
        tf_flags,
        tr_flags,
        size,
        duration,
        flags,
        negative_composition_time_offsets,
    ))
}
current_data_offset = 0; + + for run in GroupBy::new(cfg.buffers, |a: &Buffer, b: &Buffer| a.idx == b.idx) { + if run[0].idx != idx { + // FIXME: What to do with >4GB offsets? + current_data_offset = (current_data_offset as u64 + + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>()) + .try_into()?; + continue; + } + + let data_offset_offset = write_full_box( + v, + b"trun", + if negative_composition_time_offsets { + FULL_BOX_VERSION_1 + } else { + FULL_BOX_VERSION_0 + }, + tr_flags, + |v| { + write_trun( + v, + cfg, + current_data_offset, + tr_flags, + timescale, + timing_info, + run, + ) + }, + )?; + data_offset_offsets.push(data_offset_offset); + + // FIXME: What to do with >4GB offsets? + current_data_offset = (current_data_offset as u64 + + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>()) + .try_into()?; + } + + // TODO: saio, saiz, sbgp, sgpd, subs? + + Ok(()) +} + +fn write_tfhd( + v: &mut Vec<u8>, + _cfg: &super::FragmentHeaderConfiguration, + idx: usize, + default_size: Option<u32>, + default_duration: Option<u32>, + default_flags: Option<u32>, +) -> Result<(), Error> { + // Track ID + v.extend((idx as u32 + 1).to_be_bytes()); + + // No base data offset, no sample description index + + if let Some(default_duration) = default_duration { + v.extend(default_duration.to_be_bytes()); + } + + if let Some(default_size) = default_size { + v.extend(default_size.to_be_bytes()); + } + + if let Some(default_flags) = default_flags { + v.extend(default_flags.to_be_bytes()); + } + + Ok(()) +} + +fn write_tfdt( + v: &mut Vec<u8>, + _cfg: &super::FragmentHeaderConfiguration, + _idx: usize, + timing_info: &super::FragmentTimingInfo, + timescale: u32, +) -> Result<(), Error> { + let base_time = timing_info + .start_time + .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("base time overflow")?; + + v.extend(base_time.to_be_bytes()); + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn write_trun( + v: &mut Vec<u8>, + _cfg: 
&super::FragmentHeaderConfiguration, + current_data_offset: u32, + tr_flags: u32, + timescale: u32, + timing_info: &super::FragmentTimingInfo, + buffers: &[Buffer], +) -> Result<usize, Error> { + // Sample count + v.extend((buffers.len() as u32).to_be_bytes()); + + let data_offset_offset = v.len(); + // Data offset, will be rewritten later + v.extend(current_data_offset.to_be_bytes()); + + if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 { + v.extend(sample_flags_from_buffer(timing_info, &buffers[0].buffer).to_be_bytes()); + } + + for Buffer { + idx: _idx, + ref buffer, + timestamp: _timestamp, + duration, + composition_time_offset, + } in buffers.iter() + { + if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 { + // Sample duration + let sample_duration = u32::try_from( + duration + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too big sample duration")?, + ) + .context("too big sample duration")?; + v.extend(sample_duration.to_be_bytes()); + } + + if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 { + // Sample size + v.extend((buffer.size() as u32).to_be_bytes()); + } + + if (tr_flags & SAMPLE_FLAGS_PRESENT) != 0 { + assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0); + + // Sample flags + v.extend(sample_flags_from_buffer(timing_info, buffer).to_be_bytes()); + } + + if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 { + // Sample composition time offset + let composition_time_offset = i32::try_from( + composition_time_offset + .unwrap_or(0) + .mul_div_round(timescale as i64, gst::ClockTime::SECOND.nseconds() as i64) + .context("too big composition time offset")?, + ) + .context("too big composition time offset")?; + v.extend(composition_time_offset.to_be_bytes()); + } + } + + Ok(data_offset_offset) +} + +/// Creates `mfra` box +pub(crate) fn create_mfra( + caps: &gst::CapsRef, + fragment_offsets: &[super::FragmentOffset], +) -> Result<gst::Buffer, Error> { + let timescale = caps_to_timescale(caps); + + let mut v = 
vec![]; + + let offset = write_box(&mut v, b"mfra", |v| { + write_full_box(v, b"tfra", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + // Track ID + v.extend(1u32.to_be_bytes()); + + // Reserved / length of traf/trun/sample + v.extend(0u32.to_be_bytes()); + + // Number of entries + v.extend( + u32::try_from(fragment_offsets.len()) + .context("too many fragments")? + .to_be_bytes(), + ); + + for super::FragmentOffset { time, offset } in fragment_offsets { + // Time + let time = time + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("time overflow")?; + v.extend(time.to_be_bytes()); + + // moof offset + v.extend(offset.to_be_bytes()); + + // traf/trun/sample number + v.extend_from_slice(&[1u8; 3][..]); + } + + Ok(()) + })?; + + let offset = write_full_box(v, b"mfro", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + let offset = v.len(); + // Parent size + v.extend(0u32.to_be_bytes()); + Ok(offset) + })?; + + Ok(offset) + })?; + + let len = u32::try_from(v.len() as u64).context("too big mfra")?; + v[offset..][..4].copy_from_slice(&len.to_be_bytes()); + + Ok(gst::Buffer::from_mut_slice(v)) +} + +// Copy from std while this is still nightly-only +use std::fmt; + +/// An iterator over slice in (non-overlapping) chunks separated by a predicate. +/// +/// This struct is created by the [`group_by`] method on [slices]. 
+/// +/// [`group_by`]: slice::group_by +/// [slices]: slice +struct GroupBy<'a, T: 'a, P> { + slice: &'a [T], + predicate: P, +} + +impl<'a, T: 'a, P> GroupBy<'a, T, P> { + fn new(slice: &'a [T], predicate: P) -> Self { + GroupBy { slice, predicate } + } +} + +impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P> +where + P: FnMut(&T, &T) -> bool, +{ + type Item = &'a [T]; + + #[inline] + fn next(&mut self) -> Option<Self::Item> { + if self.slice.is_empty() { + None + } else { + let mut len = 1; + let mut iter = self.slice.windows(2); + while let Some([l, r]) = iter.next() { + if (self.predicate)(l, r) { + len += 1 + } else { + break; + } + } + let (head, tail) = self.slice.split_at(len); + self.slice = tail; + Some(head) + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option<usize>) { + if self.slice.is_empty() { + (0, Some(0)) + } else { + (1, Some(self.slice.len())) + } + } + + #[inline] + fn last(mut self) -> Option<Self::Item> { + self.next_back() + } +} + +impl<'a, T: 'a, P> DoubleEndedIterator for GroupBy<'a, T, P> +where + P: FnMut(&T, &T) -> bool, +{ + #[inline] + fn next_back(&mut self) -> Option<Self::Item> { + if self.slice.is_empty() { + None + } else { + let mut len = 1; + let mut iter = self.slice.windows(2); + while let Some([l, r]) = iter.next_back() { + if (self.predicate)(l, r) { + len += 1 + } else { + break; + } + } + let (head, tail) = self.slice.split_at(self.slice.len() - len); + self.slice = head; + Some(tail) + } + } +} + +impl<'a, T: 'a, P> std::iter::FusedIterator for GroupBy<'a, T, P> where P: FnMut(&T, &T) -> bool {} + +impl<'a, T: 'a + fmt::Debug, P> fmt::Debug for GroupBy<'a, T, P> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("GroupBy") + .field("slice", &self.slice) + .finish() + } +} diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs new file mode 100644 index 000000000..a7a8932dc --- /dev/null +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -0,0 +1,2596 @@ +// Copyright (C) 2021 
Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0

use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;

use std::collections::VecDeque;
use std::sync::Mutex;

use once_cell::sync::Lazy;

use super::boxes;
use super::Buffer;

/// Offset for the segment in non-single-stream variants.
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);

/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds.
/// 1601 = UNIX + UNIX_1601_OFFSET.
const UNIX_1601_OFFSET: u64 = 11_644_473_600;

/// Offset between NTP and UNIX epoch in seconds.
/// NTP = UNIX + NTP_UNIX_OFFSET.
const NTP_UNIX_OFFSET: u64 = 2_208_988_800;

/// Reference timestamp meta caps for NTP timestamps.
static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());

/// Reference timestamp meta caps for UNIX timestamps.
static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());

/// Returns the UTC time of the buffer in the UNIX epoch.
///
/// Taken from the first `ReferenceTimestampMeta` with UNIX caps, or with NTP
/// caps converted to UNIX; `None` if neither is present.
fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
    buffer
        .iter_meta::<gst::ReferenceTimestampMeta>()
        .find_map(|meta| {
            if meta.reference().can_intersect(&UNIX_CAPS) {
                Some(meta.timestamp())
            } else if meta.reference().can_intersect(&NTP_CAPS) {
                // NTP = UNIX + NTP_UNIX_OFFSET, so subtract to get UNIX;
                // None (and thus skipped) if the NTP time is before 1970.
                meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds())
            } else {
                None
            }
        })
}

/// Debug category used by all logging in this element.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "fmp4mux",
        gst::DebugColorFlags::empty(),
        Some("FMP4Mux Element"),
    )
});

const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10);
const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None;
const DEFAULT_WRITE_MFRA: bool = false;
const DEFAULT_WRITE_MEHD: bool = false;
const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(250));

/// Element properties, guarded by the `settings` mutex on [`FMP4Mux`].
#[derive(Debug, Clone)]
struct Settings {
    fragment_duration: gst::ClockTime,
    header_update_mode: super::HeaderUpdateMode,
    write_mfra: bool,
    write_mehd: bool,
    interleave_bytes: Option<u64>,
    interleave_time: Option<gst::ClockTime>,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            fragment_duration: DEFAULT_FRAGMENT_DURATION,
            header_update_mode: DEFAULT_HEADER_UPDATE_MODE,
            write_mfra: DEFAULT_WRITE_MFRA,
            write_mehd: DEFAULT_WRITE_MEHD,
            interleave_bytes: DEFAULT_INTERLEAVE_BYTES,
            interleave_time: DEFAULT_INTERLEAVE_TIME,
        }
    }
}

/// One queued buffer together with its PTS/DTS running times.
#[derive(Debug)]
struct GopBuffer {
    buffer: gst::Buffer,
    pts: gst::ClockTime,
    dts: Option<gst::ClockTime>,
}

/// One queued group of pictures, i.e. all buffers from one keyframe up to
/// (exclusive) the next.
#[derive(Debug)]
struct Gop {
    // Running times
    start_pts: gst::ClockTime,
    start_dts: Option<gst::ClockTime>,
    earliest_pts: gst::ClockTime,
    // Once this is known to be the final earliest PTS/DTS
    final_earliest_pts: bool,
    // PTS plus duration of last buffer, or start of next GOP
    end_pts: gst::ClockTime,
    // Once this is known to be the final end PTS/DTS
    final_end_pts: bool,
    // DTS plus duration of last buffer, or start of next GOP
    end_dts: Option<gst::ClockTime>,

    // Buffer positions
    earliest_pts_position: gst::ClockTime,
    start_dts_position: Option<gst::ClockTime>,

    // Buffer, PTS running time, DTS running time
    buffers: Vec<GopBuffer>,
}

/// Per-sinkpad muxer state.
struct Stream {
    sinkpad: gst_base::AggregatorPad,

    caps: gst::Caps,
    intra_only: bool,

    queued_gops: VecDeque<Gop>,
    fragment_filled: bool,

    // Difference between the first DTS and 0 in case of negative DTS
    dts_offset: Option<gst::ClockTime>,

    // Current position (DTS, or PTS for intra-only) to prevent
    // timestamps from going backwards when queueing new buffers
    current_position: gst::ClockTime,

    // Current UTC time in ONVIF mode to prevent timestamps from
    // going backwards when draining a fragment.
    // UNIX epoch.
    current_utc_time: gst::ClockTime,
}

/// Mutable element state, guarded by the `state` mutex on [`FMP4Mux`].
#[derive(Default)]
struct State {
    streams: Vec<Stream>,

    // Created once we received caps and kept up to date with the caps,
    // sent as part of the buffer list for the first fragment.
    stream_header: Option<gst::Buffer>,

    sequence_number: u32,

    // Fragment tracking for mfra
    current_offset: u64,
    fragment_offsets: Vec<super::FragmentOffset>,

    // Start / end PTS of the whole stream
    earliest_pts: Option<gst::ClockTime>,
    end_pts: Option<gst::ClockTime>,

    // Start PTS of the current fragment
    fragment_start_pts: Option<gst::ClockTime>,
    // Additional timeout delay in case GOPs are bigger than the fragment duration
    timeout_delay: gst::ClockTime,

    // In ONVIF mode the UTC time corresponding to the beginning of the stream
    // UNIX epoch.
    start_utc_time: Option<gst::ClockTime>,
    end_utc_time: Option<gst::ClockTime>,

    sent_headers: bool,
}

/// Fragmented MP4 muxer implementation shared by all element variants.
#[derive(Default)]
pub(crate) struct FMP4Mux {
    state: Mutex<State>,
    settings: Mutex<Settings>,
}

impl FMP4Mux {
    /// Finds the stream with the earliest queued running time that should be
    /// handled next.
    ///
    /// Returns `Ok(None)` if no stream can be handled yet (e.g. not all
    /// streams have data and no timeout is pending).
    fn find_earliest_stream<'a>(
        &self,
        state: &'a mut State,
        timeout: bool,
    ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
        let mut earliest_stream = None;
        let mut all_have_data_or_eos = true;

        for (idx, stream) in state.streams.iter_mut().enumerate() {
            let buffer = match stream.sinkpad.peek_buffer() {
                Some(buffer) => buffer,
                None => {
                    if stream.sinkpad.is_eos() {
                        gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS");
                    } else {
                        all_have_data_or_eos = false;
                        gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer");
                    }
                    continue;
                }
            };

            if stream.fragment_filled {
                gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled");
                continue;
            }

            let segment = match stream
                .sinkpad
                .segment()
                .clone()
                .downcast::<gst::ClockTime>()
                .ok()
            {
                Some(segment) => segment,
                None => {
                    gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
                    return Err(gst::FlowError::Error);
                }
            };

            // If the stream has no valid running time, assume it's before everything else.
            let running_time = match segment.to_running_time(buffer.dts_or_pts()) {
                None => {
                    gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time");
                    if earliest_stream
                        .as_ref()
                        .map_or(true, |(_, _, earliest_running_time)| {
                            *earliest_running_time > gst::ClockTime::ZERO
                        })
                    {
                        earliest_stream = Some((idx, stream, gst::ClockTime::ZERO));
                    }
                    continue;
                }
                Some(running_time) => running_time,
            };

            gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time);

            if earliest_stream
                .as_ref()
                .map_or(true, |(_idx, _stream, earliest_running_time)| {
                    *earliest_running_time > running_time
                })
            {
                earliest_stream = Some((idx, stream, running_time));
            }
        }

        if !timeout && !all_have_data_or_eos {
            gst::trace!(
                CAT,
                imp: self,
                "No timeout and not all streams have a buffer or are EOS"
            );
            Ok(None)
        } else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
            gst::trace!(
                CAT,
                imp: self,
                "Stream {} is earliest stream with running time {}",
                stream.sinkpad.name(),
                earliest_running_time
            );
            Ok(Some((idx, stream)))
        } else {
            gst::trace!(CAT, imp: self, "No streams have data queued currently");
            Ok(None)
        }
    }

    // Queue incoming buffers as individual GOPs.
    /// Queues one incoming buffer into the stream's GOP list.
    ///
    /// Converts PTS/DTS to running times (shifting negative DTS to zero via
    /// `dts_offset`), enforces monotonically increasing positions, starts a
    /// new GOP on every non-delta buffer, and tracks when each GOP's earliest
    /// PTS and end PTS become final.
    fn queue_gops(
        &self,
        _idx: usize,
        stream: &mut Stream,
        segment: &gst::FormattedSegment<gst::ClockTime>,
        mut buffer: gst::Buffer,
    ) -> Result<(), gst::FlowError> {
        use gst::Signed::*;

        assert!(!stream.fragment_filled);

        gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer);

        let intra_only = stream.intra_only;

        if !intra_only && buffer.dts().is_none() {
            gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams");
            return Err(gst::FlowError::Error);
        }

        if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
            gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units");
            return Err(gst::FlowError::Error);
        }

        let pts_position = buffer.pts().ok_or_else(|| {
            gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers");
            gst::FlowError::Error
        })?;
        let duration = buffer.duration();
        let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position);

        let mut pts = segment
            .to_running_time_full(pts_position)
            .ok_or_else(|| {
                gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time");
                gst::FlowError::Error
            })?
            .positive_or_else(|_| {
                gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
                gst::FlowError::Error
            })?;

        let mut end_pts = segment
            .to_running_time_full(end_pts_position)
            .ok_or_else(|| {
                gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert end PTS to running time");
                gst::FlowError::Error
            })?
            .positive_or_else(|_| {
                gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
                gst::FlowError::Error
            })?;

        // Enforce monotonically increasing PTS for intra-only streams
        if intra_only {
            if pts < stream.current_position {
                gst::warning!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Decreasing PTS {} < {} for intra-only stream",
                    pts,
                    stream.current_position,
                );
                pts = stream.current_position;
            } else {
                stream.current_position = pts;
            }
            end_pts = std::cmp::max(end_pts, pts);
        }

        let (dts_position, dts, end_dts) = if intra_only {
            (None, None, None)
        } else {
            // Negative DTS are handled via the dts_offset and by having negative composition time
            // offsets in the `trun` box. The smallest DTS here is shifted to zero.
            let dts_position = buffer.dts().expect("not DTS");
            let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position);

            let signed_dts = segment.to_running_time_full(dts_position).ok_or_else(|| {
                gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time");
                gst::FlowError::Error
            })?;
            let mut dts = match signed_dts {
                Positive(dts) => {
                    if let Some(dts_offset) = stream.dts_offset {
                        dts + dts_offset
                    } else {
                        dts
                    }
                }
                Negative(dts) => {
                    // First negative DTS seen establishes the shift so that it
                    // maps to zero; later DTS are shifted by the same amount.
                    if stream.dts_offset.is_none() {
                        stream.dts_offset = Some(dts);
                    }

                    let dts_offset = stream.dts_offset.unwrap();
                    if dts > dts_offset {
                        gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS");
                        gst::ClockTime::ZERO
                    } else {
                        dts_offset - dts
                    }
                }
            };

            let signed_end_dts =
                segment
                    .to_running_time_full(end_dts_position)
                    .ok_or_else(|| {
                        gst::error!(
                            CAT,
                            obj: &stream.sinkpad,
                            "Couldn't convert end DTS to running time"
                        );
                        gst::FlowError::Error
                    })?;
            let mut end_dts = match signed_end_dts {
                Positive(dts) => {
                    if let Some(dts_offset) = stream.dts_offset {
                        dts + dts_offset
                    } else {
                        dts
                    }
                }
                Negative(dts) => {
                    if stream.dts_offset.is_none() {
                        stream.dts_offset = Some(dts);
                    }

                    let dts_offset = stream.dts_offset.unwrap();
                    if dts > dts_offset {
                        gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS");
                        gst::ClockTime::ZERO
                    } else {
                        dts_offset - dts
                    }
                }
            };

            // Enforce monotonically increasing DTS for intra-only streams
            // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference
            // FIXME: Is this correct?
            if dts < stream.current_position {
                gst::warning!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Decreasing DTS {} < {}",
                    dts,
                    stream.current_position,
                );
                dts = stream.current_position;
            } else {
                stream.current_position = dts;
            }
            end_dts = std::cmp::max(end_dts, dts);

            (Some(dts_position), Some(dts), Some(end_dts))
        };

        // If this is a multi-stream element then we need to update the PTS/DTS positions according
        // to the output segment, specifically to re-timestamp them with the running time and
        // adjust for the segment shift to compensate for negative DTS.
        let aggregator = self.instance();
        let class = aggregator.class();
        let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() {
            (pts_position, dts_position)
        } else {
            let pts_position = pts + SEGMENT_OFFSET;
            let dts_position = dts.map(|dts| {
                dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO)
            });

            let buffer = buffer.make_mut();
            buffer.set_pts(pts_position);
            buffer.set_dts(dts_position);

            (pts_position, dts_position)
        };

        if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
            // Non-delta buffer: starts a new GOP.
            gst::debug!(
                CAT,
                obj: &stream.sinkpad,
                "Starting new GOP at PTS {} DTS {} (DTS offset {})",
                pts,
                dts.display(),
                stream.dts_offset.display(),
            );

            let gop = Gop {
                start_pts: pts,
                start_dts: dts,
                start_dts_position: if intra_only { None } else { dts_position },
                earliest_pts: pts,
                earliest_pts_position: pts_position,
                final_earliest_pts: intra_only,
                end_pts,
                end_dts,
                final_end_pts: false,
                buffers: vec![GopBuffer { buffer, pts, dts }],
            };
            stream.queued_gops.push_front(gop);

            if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
                gst::debug!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Updating previous GOP starting at PTS {} to end PTS {} DTS {}",
                    prev_gop.earliest_pts,
                    pts,
                    dts.display(),
                );

                prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts);
                prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts);

                if intra_only {
                    prev_gop.final_end_pts = true;
                }

                if !prev_gop.final_earliest_pts {
                    // Don't bother logging this for intra-only streams as it would be for every
                    // single buffer.
                    if !intra_only {
                        gst::debug!(
                            CAT,
                            obj: &stream.sinkpad,
                            "Previous GOP has final earliest PTS at {}",
                            prev_gop.earliest_pts
                        );
                    }

                    prev_gop.final_earliest_pts = true;
                    if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) {
                        prev_prev_gop.final_end_pts = true;
                    }
                }
            }
        } else if let Some(gop) = stream.queued_gops.front_mut() {
            assert!(!intra_only);

            // We require DTS for non-intra-only streams
            let dts = dts.unwrap();
            let end_dts = end_dts.unwrap();

            gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
            gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts));
            gop.buffers.push(GopBuffer {
                buffer,
                pts,
                dts: Some(dts),
            });

            if gop.earliest_pts > pts && !gop.final_earliest_pts {
                gst::debug!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Updating current GOP earliest PTS from {} to {}",
                    gop.earliest_pts,
                    pts
                );
                gop.earliest_pts = pts;
                gop.earliest_pts_position = pts_position;

                if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
                    if prev_gop.end_pts < pts {
                        gst::debug!(
                            CAT,
                            obj: &stream.sinkpad,
                            "Updating previous GOP starting PTS {} end time from {} to {}",
                            pts,
                            prev_gop.end_pts,
                            pts
                        );
                        prev_gop.end_pts = pts;
                    }
                }
            }

            let gop = stream.queued_gops.front_mut().unwrap();

            // The earliest PTS is known when the current DTS is bigger or equal to the first
            // PTS that was observed in this GOP. If there was another frame later that had a
            // lower PTS then it wouldn't be possible to display it in time anymore, i.e. the
            // stream would be invalid.
            if gop.start_pts <= dts && !gop.final_earliest_pts {
                gst::debug!(
                    CAT,
                    obj: &stream.sinkpad,
                    "GOP has final earliest PTS at {}",
                    gop.earliest_pts
                );
                gop.final_earliest_pts = true;

                if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
                    prev_gop.final_end_pts = true;
                }
            }
        } else {
            // Delta unit but no GOP open yet: drop until the first keyframe.
            gst::warning!(
                CAT,
                obj: &stream.sinkpad,
                "Waiting for keyframe at the beginning of the stream"
            );
        }

        if let Some((prev_gop, first_gop)) = Option::zip(
            stream.queued_gops.iter().find(|gop| gop.final_end_pts),
            stream.queued_gops.back(),
        ) {
            gst::debug!(
                CAT,
                obj: &stream.sinkpad,
                "Queued full GOPs duration updated to {}",
                prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
            );
        }

        gst::debug!(
            CAT,
            obj: &stream.sinkpad,
            "Queued duration updated to {}",
            Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
                .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
                .unwrap_or(gst::ClockTime::ZERO)
        );

        Ok(())
    }

    /// Drains complete GOPs of all streams into per-stream buffer lists for
    /// one output fragment.
    ///
    /// The first stream (first non-EOS stream) decides the fragment's end
    /// PTS; all other streams are drained up to that point. Returns the
    /// drained streams plus the minimum earliest PTS (running time and
    /// position), minimum start DTS position, and the fragment end PTS.
    #[allow(clippy::type_complexity)]
    fn drain_buffers(
        &self,
        state: &mut State,
        settings: &Settings,
        timeout: bool,
        at_eos: bool,
    ) -> Result<
        (
            // Drained streams
            Vec<(
                gst::Caps,
                Option<super::FragmentTimingInfo>,
                VecDeque<Buffer>,
            )>,
            // Minimum earliest PTS position of all streams
            Option<gst::ClockTime>,
            // Minimum earliest PTS of all streams
            Option<gst::ClockTime>,
            // Minimum start DTS position of all streams (if any stream has DTS)
            Option<gst::ClockTime>,
            // End PTS of this drained fragment, i.e. start PTS of the next fragment
            Option<gst::ClockTime>,
        ),
        gst::FlowError,
    > {
        let mut drained_streams = Vec::with_capacity(state.streams.len());

        let mut min_earliest_pts_position = None;
        let mut min_earliest_pts = None;
        let mut min_start_dts_position = None;
        let mut fragment_end_pts = None;

        // The first stream decides how much can be dequeued, if anything at all.
        //
        // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
        // but on timeout in live pipelines it might happen that the first stream does not have a
        // complete GOP queued. In that case nothing is dequeued for any of the streams and the
        // timeout is advanced by 1s until at least one complete GOP can be dequeued.
        //
        // If the first stream is already EOS then the next stream that is not EOS yet will be
        // taken in its place.
        let fragment_start_pts = state.fragment_start_pts.unwrap();
        gst::info!(
            CAT,
            imp: self,
            "Starting to drain at {}",
            fragment_start_pts
        );

        for (idx, stream) in state.streams.iter_mut().enumerate() {
            assert!(
                timeout
                    || at_eos
                    || stream.sinkpad.is_eos()
                    || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
            );

            // Drain all complete GOPs until at most one fragment duration was dequeued for the
            // first stream, or until the dequeued duration of the first stream.
            let mut gops = Vec::with_capacity(stream.queued_gops.len());
            let dequeue_end_pts =
                fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
            gst::trace!(
                CAT,
                obj: &stream.sinkpad,
                "Draining up to end PTS {} / duration {}",
                dequeue_end_pts,
                dequeue_end_pts - fragment_start_pts
            );

            while let Some(gop) = stream.queued_gops.back() {
                // If this GOP is not complete then we can't pop it yet.
                //
                // If there was no complete GOP at all yet then it might be bigger than the
                // fragment duration. In this case we might not be able to handle the latency
                // requirements in a live pipeline.
                if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
                    break;
                }

                // If this GOP starts after the fragment end then don't dequeue it yet unless this is
                // the first stream and no GOPs were dequeued at all yet. This would mean that the
                // GOP is bigger than the fragment duration.
                if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty())
                {
                    break;
                }

                gops.push(stream.queued_gops.pop_back().unwrap());
            }
            stream.fragment_filled = false;

            // If we don't have a next fragment start PTS then this is the first stream as above.
            if fragment_end_pts.is_none() {
                if let Some(last_gop) = gops.last() {
                    // Dequeued something so let's take the end PTS of the last GOP
                    fragment_end_pts = Some(last_gop.end_pts);
                    gst::info!(
                        CAT,
                        obj: &stream.sinkpad,
                        "Draining up to PTS {} for this fragment",
                        last_gop.end_pts,
                    );
                } else {
                    // If nothing was dequeued for the first stream then this is OK if we're at
                    // EOS: we just consider the next stream as first stream then.
                    if at_eos || stream.sinkpad.is_eos() {
                        // This is handled below generally if nothing was dequeued
                    } else {
                        // Otherwise this can only really happen on timeout in live pipelines.
                        assert!(timeout);

                        gst::warning!(
                            CAT,
                            obj: &stream.sinkpad,
                            "Don't have a complete GOP for the first stream on timeout in a live pipeline",
                        );

                        // In this case we advance the timeout by 1s and hope that things are
                        // better then.
                        return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
                    }
                }
            }

            if gops.is_empty() {
                gst::info!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Draining no buffers",
                );

                drained_streams.push((stream.caps.clone(), None, VecDeque::new()));
                continue;
            }

            assert!(fragment_end_pts.is_some());

            let first_gop = gops.first().unwrap();
            let last_gop = gops.last().unwrap();
            let earliest_pts = first_gop.earliest_pts;
            let earliest_pts_position = first_gop.earliest_pts_position;
            let start_dts = first_gop.start_dts;
            let start_dts_position = first_gop.start_dts_position;
            let end_pts = last_gop.end_pts;
            let dts_offset = stream.dts_offset;

            if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
                min_earliest_pts = Some(earliest_pts);
            }
            if min_earliest_pts_position
                .opt_gt(earliest_pts_position)
                .unwrap_or(true)
            {
                min_earliest_pts_position = Some(earliest_pts_position);
            }
            if let Some(start_dts_position) = start_dts_position {
                if min_start_dts_position
                    .opt_gt(start_dts_position)
                    .unwrap_or(true)
                {
                    min_start_dts_position = Some(start_dts_position);
                }
            }

            gst::info!(
                CAT,
                obj: &stream.sinkpad,
                "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
                end_pts.saturating_sub(earliest_pts),
                earliest_pts,
                start_dts.display(),
                dts_offset.display(),
            );

            if let Some((prev_gop, first_gop)) = Option::zip(
                stream.queued_gops.iter().find(|gop| gop.final_end_pts),
                stream.queued_gops.back(),
            ) {
                gst::debug!(
                    CAT,
                    obj: &stream.sinkpad,
                    "Queued full GOPs duration updated to {}",
                    prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
                );
            }

            gst::debug!(
                CAT,
                obj: &stream.sinkpad,
                "Queued duration updated to {}",
                Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
                    .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
                    .unwrap_or(gst::ClockTime::ZERO)
            );

            let start_time = if stream.intra_only {
                earliest_pts
            } else {
                start_dts.unwrap()
            };

            let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());

            for gop in gops {
                let mut gop_buffers = gop.buffers.into_iter().peekable();
                while let Some(buffer) = gop_buffers.next() {
                    // Decode timestamp: DTS, or PTS for intra-only streams.
                    let timestamp = if stream.intra_only {
                        buffer.pts
                    } else {
                        buffer.dts.unwrap()
                    };

                    // Duration runs up to the next buffer's timestamp, or the
                    // GOP end for the last buffer of the GOP.
                    let end_timestamp = match gop_buffers.peek() {
                        Some(buffer) => {
                            if stream.intra_only {
                                buffer.pts
                            } else {
                                buffer.dts.unwrap()
                            }
                        }
                        None => {
                            if stream.intra_only {
                                gop.end_pts
                            } else {
                                gop.end_dts.unwrap()
                            }
                        }
                    };

                    // Timestamps are enforced to monotonically increase when queueing buffers
                    let duration = end_timestamp
                        .checked_sub(timestamp)
                        .expect("Timestamps going backwards");

                    let composition_time_offset = if stream.intra_only {
                        None
                    } else {
                        let pts = buffer.pts;
                        let dts = buffer.dts.unwrap();

                        if pts > dts {
                            Some(
                                i64::try_from((pts - dts).nseconds())
                                    .map_err(|_| {
                                        gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
                                        gst::FlowError::Error
                                    })?,
                            )
                        } else {
                            let diff = i64::try_from((dts - pts).nseconds())
                                .map_err(|_| {
                                    gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
                                    gst::FlowError::Error
                                })?;
                            Some(-diff)
                        }
                    };

                    buffers.push_back(Buffer {
                        idx,
                        buffer: buffer.buffer,
                        timestamp,
                        duration,
                        composition_time_offset,
                    });
                }
            }

            drained_streams.push((
                stream.caps.clone(),
                Some(super::FragmentTimingInfo {
                    start_time,
                    intra_only: stream.intra_only,
                }),
                buffers,
            ));
        }

        Ok((
            drained_streams,
            min_earliest_pts_position,
            min_earliest_pts,
            min_start_dts_position,
            fragment_end_pts,
        ))
    }

    /// In ONVIF mode, rewrites the drained buffers' timestamps/durations to
    /// be relative to the stream's start UTC time (continues past this span).
    fn preprocess_drained_streams_onvif(
        &self,
        state: &mut State,
        drained_streams: &mut [(
            gst::Caps,
            Option<super::FragmentTimingInfo>,
            VecDeque<Buffer>,
        )],
    ) -> Result<Option<gst::ClockTime>, gst::FlowError> {
        let aggregator = self.instance();
        if aggregator.class().as_ref().variant !=
super::Variant::ONVIF { + return Ok(None); + } + + let mut max_end_utc_time = None; + + let calculate_pts = |buffer: &Buffer| -> gst::ClockTime { + let composition_time_offset = buffer.composition_time_offset.unwrap_or(0); + if composition_time_offset > 0 { + buffer.timestamp + (composition_time_offset as u64).nseconds() + } else { + buffer + .timestamp + .checked_sub(((-composition_time_offset) as u64).nseconds()) + .unwrap() + } + }; + + // If this is the first fragment then allow the first buffers to not have a reference + // timestamp meta and backdate them + if state.stream_header.is_none() { + for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() { + let (buffer_idx, utc_time, buffer) = + match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| { + get_utc_time_from_buffer(&buffer.buffer) + .map(|timestamp| (idx, timestamp, buffer)) + }) { + None => { + gst::error!( + CAT, + obj: &state.streams[idx].sinkpad, + "No reference timestamp set on any buffers in the first fragment", + ); + return Err(gst::FlowError::Error); + } + Some(res) => res, + }; + + // Now do the backdating + if buffer_idx > 0 { + let utc_time_pts = calculate_pts(buffer); + + for buffer in drain_buffers.iter_mut().take(buffer_idx) { + let buffer_pts = calculate_pts(buffer); + let buffer_pts_diff = if utc_time_pts >= buffer_pts { + (utc_time_pts - buffer_pts).nseconds() as i64 + } else { + -((buffer_pts - utc_time_pts).nseconds() as i64) + }; + let buffer_utc_time = if buffer_pts_diff >= 0 { + utc_time + .checked_sub((buffer_pts_diff as u64).nseconds()) + .unwrap() + } else { + utc_time + .checked_add(((-buffer_pts_diff) as u64).nseconds()) + .unwrap() + }; + + let buffer = buffer.buffer.make_mut(); + gst::ReferenceTimestampMeta::add( + buffer, + &UNIX_CAPS, + buffer_utc_time, + gst::ClockTime::NONE, + ); + } + } + } + } + + // Calculate the minimum across all streams and remember that + if state.start_utc_time.is_none() { + let mut start_utc_time = None; + + for 
(idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() { + for buffer in drain_buffers { + let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { + None => { + gst::error!( + CAT, + obj: &state.streams[idx].sinkpad, + "No reference timestamp set on all buffers" + ); + return Err(gst::FlowError::Error); + } + Some(utc_time) => utc_time, + }; + + if start_utc_time.is_none() || start_utc_time > Some(utc_time) { + start_utc_time = Some(utc_time); + } + } + } + + gst::debug!( + CAT, + imp: self, + "Configuring start UTC time {}", + start_utc_time.unwrap() + ); + state.start_utc_time = start_utc_time; + } + + // Update all buffer timestamps based on the UTC time and offset to the start UTC time + let start_utc_time = state.start_utc_time.unwrap(); + for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() { + let mut start_time = None; + + for buffer in drain_buffers.iter_mut() { + let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { + None => { + gst::error!( + CAT, + obj: &state.streams[idx].sinkpad, + "No reference timestamp set on all buffers" + ); + return Err(gst::FlowError::Error); + } + Some(utc_time) => utc_time, + }; + + // Convert PTS UTC time to DTS + let mut utc_time_dts = + if let Some(composition_time_offset) = buffer.composition_time_offset { + if composition_time_offset >= 0 { + utc_time + .checked_sub((composition_time_offset as u64).nseconds()) + .unwrap() + } else { + utc_time + .checked_add(((-composition_time_offset) as u64).nseconds()) + .unwrap() + } + } else { + utc_time + }; + + // Enforce monotonically increasing timestamps + if utc_time_dts < state.streams[idx].current_utc_time { + gst::warning!( + CAT, + obj: &state.streams[idx].sinkpad, + "Decreasing UTC DTS timestamp for buffer {} < {}", + utc_time_dts, + state.streams[idx].current_utc_time, + ); + utc_time_dts = state.streams[idx].current_utc_time; + } else { + state.streams[idx].current_utc_time = utc_time_dts; + } + + let 
timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap(); + + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}", + buffer.timestamp, + timestamp, + utc_time_dts, + utc_time, + ); + + buffer.timestamp = timestamp; + if start_time.is_none() || start_time > Some(buffer.timestamp) { + start_time = Some(buffer.timestamp); + } + } + + // Update durations for all buffers except for the last in the fragment unless all + // have the same duration anyway + let mut common_duration = Ok(None); + let mut drain_buffers_iter = drain_buffers.iter_mut().peekable(); + while let Some(buffer) = drain_buffers_iter.next() { + let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp); + + if let Some(next_timestamp) = next_timestamp { + let duration = next_timestamp.saturating_sub(buffer.timestamp); + if common_duration == Ok(None) { + common_duration = Ok(Some(duration)); + } else if common_duration != Ok(Some(duration)) { + common_duration = Err(()); + } + + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Updating buffer with timestamp {} duration from {} to relative UTC duration {}", + buffer.timestamp, + buffer.duration, + duration, + ); + + buffer.duration = duration; + } else if let Ok(Some(common_duration)) = common_duration { + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}", + buffer.timestamp, + buffer.duration, + common_duration, + ); + + buffer.duration = common_duration; + } else { + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Keeping last buffer with timestamp {} duration at {}", + buffer.timestamp, + buffer.duration, + ); + } + + let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration; + if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) { + max_end_utc_time = Some(end_utc_time); + } + } 
+ + if let Some(start_time) = start_time { + gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time); + timing_info.as_mut().unwrap().start_time = start_time; + } else { + assert!(timing_info.is_none()); + } + } + + Ok(max_end_utc_time) + } + + #[allow(clippy::type_complexity)] + fn interleave_buffers( + &self, + settings: &Settings, + mut drained_streams: Vec<( + gst::Caps, + Option<super::FragmentTimingInfo>, + VecDeque<Buffer>, + )>, + ) -> Result< + ( + Vec<Buffer>, + Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>, + ), + gst::FlowError, + > { + let mut interleaved_buffers = + Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum()); + while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by( + |(a_idx, (_, _, a)), (b_idx, (_, _, b))| { + let (a, b) = match (a.front(), b.front()) { + (None, None) => return std::cmp::Ordering::Equal, + (None, _) => return std::cmp::Ordering::Greater, + (_, None) => return std::cmp::Ordering::Less, + (Some(a), Some(b)) => (a, b), + }; + + match a.timestamp.cmp(&b.timestamp) { + std::cmp::Ordering::Equal => a_idx.cmp(b_idx), + cmp => cmp, + } + }, + ) { + let start_time = match bufs.front() { + None => { + // No more buffers now + break; + } + Some(buf) => buf.timestamp, + }; + let mut current_end_time = start_time; + let mut dequeued_bytes = 0; + + while settings + .interleave_bytes + .opt_ge(dequeued_bytes) + .unwrap_or(true) + && settings + .interleave_time + .opt_ge(current_end_time.saturating_sub(start_time)) + .unwrap_or(true) + { + if let Some(buffer) = bufs.pop_front() { + current_end_time = buffer.timestamp + buffer.duration; + dequeued_bytes += buffer.buffer.size() as u64; + interleaved_buffers.push(buffer); + } else { + // No buffers left in this stream, go to next stream + break; + } + } + } + + // All buffers should be consumed now + assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty())); + + let streams = 
drained_streams + .into_iter() + .map(|(caps, timing_info, _)| (caps, timing_info)) + .collect::<Vec<_>>(); + + Ok((interleaved_buffers, streams)) + } + + fn drain( + &self, + state: &mut State, + settings: &Settings, + timeout: bool, + at_eos: bool, + upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>, + ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> { + if at_eos { + gst::info!(CAT, imp: self, "Draining at EOS"); + } else if timeout { + gst::info!(CAT, imp: self, "Draining at timeout"); + } else { + for stream in &state.streams { + if !stream.fragment_filled && !stream.sinkpad.is_eos() { + return Ok((None, None)); + } + } + } + + // Collect all buffers and their timing information that are to be drained right now. + let ( + mut drained_streams, + min_earliest_pts_position, + min_earliest_pts, + min_start_dts_position, + fragment_end_pts, + ) = self.drain_buffers(state, settings, timeout, at_eos)?; + + // Remove all GAP buffers before processing them further + for (_, _, buffers) in &mut drained_streams { + buffers.retain(|buf| { + !buf.buffer.flags().contains(gst::BufferFlags::GAP) + || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + || buf.buffer.size() != 0 + }); + } + + // For ONVIF, replace all timestamps with timestamps based on UTC times. 
+ let max_end_utc_time = + self.preprocess_drained_streams_onvif(state, &mut drained_streams)?; + + // Create header now if it was not created before and return the caps + let mut caps = None; + if state.stream_header.is_none() { + let (_, new_caps) = self.update_header(state, settings, false)?.unwrap(); + caps = Some(new_caps); + } + + // Interleave buffers according to the settings into a single vec + let (mut interleaved_buffers, streams) = + self.interleave_buffers(settings, drained_streams)?; + + let mut buffer_list = None; + if interleaved_buffers.is_empty() { + assert!(at_eos); + } else { + // If there are actual buffers to output then create headers as needed and create a + // bufferlist for all buffers that have to be output. + let min_earliest_pts_position = min_earliest_pts_position.unwrap(); + let min_earliest_pts = min_earliest_pts.unwrap(); + let fragment_end_pts = fragment_end_pts.unwrap(); + + let mut fmp4_header = None; + if !state.sent_headers { + let mut buffer = state.stream_header.as_ref().unwrap().copy(); + { + let buffer = buffer.get_mut().unwrap(); + + buffer.set_pts(min_earliest_pts_position); + buffer.set_dts(min_start_dts_position); + + // Header is DISCONT|HEADER + buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); + } + + fmp4_header = Some(buffer); + + state.sent_headers = true; + } + + // TODO: Write prft boxes before moof + // TODO: Write sidx boxes before moof and rewrite once offsets are known + + if state.sequence_number == 0 { + state.sequence_number = 1; + } + let sequence_number = state.sequence_number; + state.sequence_number += 1; + let (mut fmp4_fragment_header, moof_offset) = + boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { + variant: self.instance().class().as_ref().variant, + sequence_number, + streams: streams.as_slice(), + buffers: interleaved_buffers.as_slice(), + }) + .map_err(|err| { + gst::error!( + CAT, + imp: self, + "Failed to create FMP4 fragment header: {}", + err + 
); + gst::FlowError::Error + })?; + + { + let buffer = fmp4_fragment_header.get_mut().unwrap(); + buffer.set_pts(min_earliest_pts_position); + buffer.set_dts(min_start_dts_position); + buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts)); + + // Fragment header is HEADER + buffer.set_flags(gst::BufferFlags::HEADER); + + // Copy metas from the first actual buffer to the fragment header. This allows + // getting things like the reference timestamp meta or the timecode meta to identify + // the fragment. + let _ = interleaved_buffers[0].buffer.copy_into( + buffer, + gst::BufferCopyFlags::META, + 0, + None, + ); + } + + let moof_offset = state.current_offset + + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 + + moof_offset; + + let buffers_len = interleaved_buffers.len(); + for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() { + // Fix up buffer flags, all other buffers are DELTA_UNIT + let buffer_ref = buffer.buffer.make_mut(); + buffer_ref.unset_flags(gst::BufferFlags::all()); + buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT); + + // Set the marker flag for the last buffer of the segment + if idx == buffers_len - 1 { + buffer_ref.set_flags(gst::BufferFlags::MARKER); + } + } + + buffer_list = Some( + fmp4_header + .into_iter() + .chain(Some(fmp4_fragment_header)) + .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer)) + .inspect(|b| { + state.current_offset += b.size() as u64; + }) + .collect::<gst::BufferList>(), + ); + + // Write mfra only for the main stream, and if there are no buffers for the main stream + // in this segment then don't write anything. 
+ if let Some((_caps, Some(ref timing_info))) = streams.get(0) { + state.fragment_offsets.push(super::FragmentOffset { + time: timing_info.start_time, + offset: moof_offset, + }); + } + + state.end_pts = Some(fragment_end_pts); + state.end_utc_time = max_end_utc_time; + + // Update for the start PTS of the next fragment + gst::info!( + CAT, + imp: self, + "Starting new fragment at {}", + fragment_end_pts, + ); + state.fragment_start_pts = Some(fragment_end_pts); + + gst::debug!( + CAT, + imp: self, + "Sending force-keyunit events for running time {}", + fragment_end_pts + settings.fragment_duration, + ); + + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fragment_end_pts + settings.fragment_duration) + .all_headers(true) + .build(); + + for stream in &state.streams { + upstream_events.push((stream.sinkpad.clone(), fku.clone())); + } + + // Reset timeout delay now that we've output an actual fragment + state.timeout_delay = gst::ClockTime::ZERO; + } + + if settings.write_mfra && at_eos { + match boxes::create_mfra(&streams[0].0, &state.fragment_offsets) { + Ok(mut mfra) => { + { + let mfra = mfra.get_mut().unwrap(); + // mfra is HEADER|DELTA_UNIT like other boxes + mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT); + } + + if buffer_list.is_none() { + buffer_list = Some(gst::BufferList::new_sized(1)); + } + buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); + } + Err(err) => { + gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); + } + } + } + + // TODO: Write edit list at EOS + // TODO: Rewrite bitrates at EOS + + Ok((caps, buffer_list)) + } + + fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> { + for pad in self + .instance() + .sink_pads() + .into_iter() + .map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap()) + { + let caps = match pad.current_caps() { + Some(caps) => caps, + None => { + gst::warning!(CAT, obj: &pad, "Skipping pad without caps"); + continue; 
+ }
+ };
+
+ gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps);
+
+ let s = caps.structure(0).unwrap();
+
+ // Classify the negotiated caps. Streams marked intra-only are drained by
+ // PTS alone later on (no DTS / frame reordering is expected for them).
+ let mut intra_only = false;
+ match s.name() {
+ "video/x-h264" | "video/x-h265" => {
+ // These formats must carry codec_data in their caps.
+ if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
+ gst::error!(CAT, obj: &pad, "Received caps without codec_data");
+ return Err(gst::FlowError::NotNegotiated);
+ }
+ }
+ "image/jpeg" => {
+ intra_only = true;
+ }
+ "audio/mpeg" => {
+ // AAC likewise requires codec_data; audio is treated as intra-only.
+ if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
+ gst::error!(CAT, obj: &pad, "Received caps without codec_data");
+ return Err(gst::FlowError::NotNegotiated);
+ }
+ intra_only = true;
+ }
+ "audio/x-alaw" | "audio/x-mulaw" => {
+ intra_only = true;
+ }
+ "audio/x-adpcm" => {
+ intra_only = true;
+ }
+ "application/x-onvif-metadata" => {
+ intra_only = true;
+ }
+ // Any other caps name should be impossible here — presumably the sink
+ // pad templates only admit the formats above; confirm for all variants.
+ _ => unreachable!(),
+ }
+
+ // Fresh per-stream muxing state: no GOPs queued yet, positions start at
+ // zero.
+ state.streams.push(Stream {
+ sinkpad: pad,
+ caps,
+ intra_only,
+ queued_gops: VecDeque::new(),
+ fragment_filled: false,
+ dts_offset: None,
+ current_position: gst::ClockTime::ZERO,
+ current_utc_time: gst::ClockTime::ZERO,
+ });
+ }
+
+ // Without at least one caps-carrying sinkpad there is nothing to mux.
+ if state.streams.is_empty() {
+ gst::error!(CAT, imp: self, "No streams available");
+ return Err(gst::FlowError::Error);
+ }
+
+ // Sort video streams first and then audio streams and then metadata streams, and each group by pad name. 
+ state.streams.sort_by(|a, b| { + let order_of_caps = |caps: &gst::CapsRef| { + let s = caps.structure(0).unwrap(); + + if s.name().starts_with("video/") { + 0 + } else if s.name().starts_with("audio/") { + 1 + } else if s.name().starts_with("application/x-onvif-metadata") { + 2 + } else { + unimplemented!(); + } + }; + + let st_a = order_of_caps(&a.caps); + let st_b = order_of_caps(&b.caps); + + if st_a == st_b { + return a.sinkpad.name().cmp(&b.sinkpad.name()); + } + + st_a.cmp(&st_b) + }); + + Ok(()) + } + + fn update_header( + &self, + state: &mut State, + settings: &Settings, + at_eos: bool, + ) -> Result<Option<(gst::BufferList, gst::Caps)>, gst::FlowError> { + let aggregator = self.instance(); + let class = aggregator.class(); + let variant = class.as_ref().variant; + + if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos { + return Ok(None); + } + + assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); + + let duration = if variant == super::Variant::ONVIF { + state + .end_utc_time + .opt_checked_sub(state.start_utc_time) + .ok() + .flatten() + } else { + state + .end_pts + .opt_checked_sub(state.earliest_pts) + .ok() + .flatten() + }; + + let streams = state + .streams + .iter() + .map(|s| s.caps.clone()) + .collect::<Vec<_>>(); + + let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration { + variant, + update: at_eos, + streams: streams.as_slice(), + write_mehd: settings.write_mehd, + duration: if at_eos { duration } else { None }, + start_utc_time: state + .start_utc_time + .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000), + }) + .map_err(|err| { + gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err); + gst::FlowError::Error + })?; + + { + let buffer = buffer.get_mut().unwrap(); + + // No timestamps + + // Header is DISCONT|HEADER + buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); + } + + // Remember stream header for later + 
state.stream_header = Some(buffer.clone());
+
+ // Map the muxer variant to the caps "variant" field; DASH and ONVIF use
+ // the plain ISO "iso-fragmented" signalling.
+ let variant = match variant {
+ super::Variant::ISO | super::Variant::DASH | super::Variant::ONVIF => "iso-fragmented",
+ super::Variant::CMAF => "cmaf",
+ };
+ // Advertise the header buffer via the streamheader caps field as well.
+ let caps = gst::Caps::builder("video/quicktime")
+ .field("variant", variant)
+ .field("streamheader", gst::Array::new(&[&buffer]))
+ .build();
+
+ let mut list = gst::BufferList::new_sized(1);
+ {
+ let list = list.get_mut().unwrap();
+ list.add(buffer);
+ }
+
+ Ok(Some((list, caps)))
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for FMP4Mux {
+ const NAME: &'static str = "GstFMP4Mux";
+ type Type = super::FMP4Mux;
+ type ParentType = gst_base::Aggregator;
+ // Custom class struct so each subclass can carry its Variant (see Class
+ // at the bottom of the file).
+ type Class = Class;
+}
+
+impl ObjectImpl for FMP4Mux {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ // TODO: Add chunk-duration property separate from fragment-size
+ glib::ParamSpecUInt64::builder("fragment-duration")
+ .nick("Fragment Duration")
+ .blurb("Duration for each FMP4 fragment")
+ .default_value(DEFAULT_FRAGMENT_DURATION.nseconds())
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecEnum::builder::<super::HeaderUpdateMode>("header-update-mode", DEFAULT_HEADER_UPDATE_MODE)
+ .nick("Header update mode")
+ .blurb("Mode for updating the header at the end of the stream")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecBoolean::builder("write-mfra")
+ .nick("Write mfra box")
+ .blurb("Write fragment random access box at the end of the stream")
+ .default_value(DEFAULT_WRITE_MFRA)
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecBoolean::builder("write-mehd")
+ .nick("Write mehd box")
+ .blurb("Write movie extends header box with the duration at the end of the stream (needs a header-update-mode enabled)")
+ // NOTE(review): default for the write-mehd property is taken
+ // from DEFAULT_WRITE_MFRA — looks like a copy-paste slip;
+ // presumably DEFAULT_WRITE_MEHD was intended. Confirm.
+ .default_value(DEFAULT_WRITE_MFRA)
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt64::builder("interleave-bytes")
+ .nick("Interleave Bytes")
+ .blurb("Interleave between streams in bytes")
+ 
.default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0))
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt64::builder("interleave-time")
+ .nick("Interleave Time")
+ .blurb("Interleave between streams in nanoseconds")
+ .default_value(DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX))
+ .mutable_ready()
+ .build(),
+ ]
+ });
+
+ &*PROPERTIES
+ }
+
+ // Each setter takes the settings lock only for the duration of the store.
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ match pspec.name() {
+ "fragment-duration" => {
+ let mut settings = self.settings.lock().unwrap();
+ let fragment_duration = value.get().expect("type checked upstream");
+ if settings.fragment_duration != fragment_duration {
+ settings.fragment_duration = fragment_duration;
+ // A changed fragment duration also changes the element's
+ // latency; the lock is dropped first — presumably to avoid
+ // re-entrancy via set_latency(). Confirm.
+ drop(settings);
+ self.instance().set_latency(fragment_duration, None);
+ }
+ }
+
+ "header-update-mode" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.header_update_mode = value.get().expect("type checked upstream");
+ }
+
+ "write-mfra" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.write_mfra = value.get().expect("type checked upstream");
+ }
+
+ "write-mehd" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.write_mehd = value.get().expect("type checked upstream");
+ }
+
+ "interleave-bytes" => {
+ let mut settings = self.settings.lock().unwrap();
+ // 0 disables the byte-based interleave limit (stored as None).
+ settings.interleave_bytes = match value.get().expect("type checked upstream") {
+ 0 => None,
+ v => Some(v),
+ };
+ }
+
+ "interleave-time" => {
+ let mut settings = self.settings.lock().unwrap();
+ // 0 or unset disables the time-based interleave limit.
+ settings.interleave_time = match value.get().expect("type checked upstream") {
+ Some(gst::ClockTime::ZERO) | None => None,
+ v => v,
+ };
+ }
+
+ _ => unimplemented!(),
+ }
+ }
+
+ // Getters read back the current settings under the lock.
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "fragment-duration" => {
+ let settings = self.settings.lock().unwrap();
+ settings.fragment_duration.to_value()
+ }
+
+ "header-update-mode" => {
+ let settings 
= self.settings.lock().unwrap(); + settings.header_update_mode.to_value() + } + + "write-mfra" => { + let settings = self.settings.lock().unwrap(); + settings.write_mfra.to_value() + } + + "write-mehd" => { + let settings = self.settings.lock().unwrap(); + settings.write_mehd.to_value() + } + + "interleave-bytes" => { + let settings = self.settings.lock().unwrap(); + settings.interleave_bytes.unwrap_or(0).to_value() + } + + "interleave-time" => { + let settings = self.settings.lock().unwrap(); + settings.interleave_time.to_value() + } + + _ => unimplemented!(), + } + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.instance(); + let class = obj.class(); + for templ in class.pad_template_list().filter(|templ| { + templ.presence() == gst::PadPresence::Always + && templ.direction() == gst::PadDirection::Sink + }) { + let sinkpad = + gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink")) + .flags(gst::PadFlags::ACCEPT_INTERSECT) + .build(); + + obj.add_pad(&sinkpad).unwrap(); + } + + obj.set_latency(Settings::default().fragment_duration, None); + } +} + +impl GstObjectImpl for FMP4Mux {} + +impl ElementImpl for FMP4Mux { + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + caps: Option<&gst::Caps>, + ) -> Option<gst::Pad> { + let state = self.state.lock().unwrap(); + if state.stream_header.is_some() { + gst::error!( + CAT, + imp: self, + "Can't request new pads after header was generated" + ); + return None; + } + + self.parent_request_new_pad(templ, name, caps) + } +} + +impl AggregatorImpl for FMP4Mux { + fn next_time(&self) -> Option<gst::ClockTime> { + let state = self.state.lock().unwrap(); + state.fragment_start_pts.opt_add(state.timeout_delay) + } + + fn sink_query( + &self, + aggregator_pad: &gst_base::AggregatorPad, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryViewMut; + + gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query); + + match 
query.view_mut() {
+ QueryViewMut::Caps(q) => {
+ // Answer caps queries with the already-negotiated caps if any,
+ // otherwise with the pad template caps, intersected with the
+ // query's filter when one is given.
+ let allowed_caps = aggregator_pad
+ .current_caps()
+ .unwrap_or_else(|| aggregator_pad.pad_template_caps());
+
+ if let Some(filter_caps) = q.filter() {
+ let res = filter_caps
+ .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First);
+ q.set_result(&res);
+ } else {
+ q.set_result(&allowed_caps);
+ }
+
+ true
+ }
+ _ => self.parent_sink_query(aggregator_pad, query),
+ }
+ }
+
+ // Runs before events are queued by the aggregator: replaces any non-TIME
+ // segment with a default TIME segment (keeping the original seqnum) so the
+ // rest of the element only ever sees TIME segments.
+ fn sink_event_pre_queue(
+ &self,
+ aggregator_pad: &gst_base::AggregatorPad,
+ mut event: gst::Event,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ use gst::EventView;
+
+ gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+
+ match event.view() {
+ EventView::Segment(ev) => {
+ if ev.segment().format() != gst::Format::Time {
+ gst::warning!(
+ CAT,
+ obj: aggregator_pad,
+ "Received non-TIME segment, replacing with default TIME segment"
+ );
+ let segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ event = gst::event::Segment::builder(&segment)
+ .seqnum(event.seqnum())
+ .build();
+ }
+ self.parent_sink_event_pre_queue(aggregator_pad, event)
+ }
+ _ => self.parent_sink_event_pre_queue(aggregator_pad, event),
+ }
+ }
+
+ fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
+ use gst::EventView;
+
+ gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+
+ match event.view() {
+ EventView::Segment(ev) => {
+ // Already fixed-up above to always be a TIME segment
+ let segment = ev
+ .segment()
+ .clone()
+ .downcast::<gst::ClockTime>()
+ .expect("non-TIME segment");
+ gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment);
+
+ // Only forward the segment event verbatim if this is a single stream variant.
+ // Otherwise we have to produce a default segment and re-timestamp all buffers
+ // with their running time. 
+ let aggregator = self.instance(); + let class = aggregator.class(); + if class.as_ref().variant.is_single_stream() { + aggregator.update_segment(&segment); + } + + self.parent_sink_event(aggregator_pad, event) + } + EventView::Tag(_ev) => { + // TODO: Maybe store for putting into the headers of the next fragment? + + self.parent_sink_event(aggregator_pad, event) + } + _ => self.parent_sink_event(aggregator_pad, event), + } + } + + fn src_query(&self, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + gst::trace!(CAT, imp: self, "Handling query {:?}", query); + + match query.view_mut() { + QueryViewMut::Seeking(q) => { + // We can't really handle seeking, it would break everything + q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); + true + } + _ => self.parent_src_query(query), + } + } + + fn src_event(&self, event: gst::Event) -> bool { + use gst::EventView; + + gst::trace!(CAT, imp: self, "Handling event {:?}", event); + + match event.view() { + EventView::Seek(_ev) => false, + _ => self.parent_src_event(event), + } + } + + fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> { + self.parent_flush()?; + + let mut state = self.state.lock().unwrap(); + + for stream in &mut state.streams { + stream.queued_gops.clear(); + stream.dts_offset = None; + stream.current_position = gst::ClockTime::ZERO; + stream.current_utc_time = gst::ClockTime::ZERO; + stream.fragment_filled = false; + } + + state.current_offset = 0; + state.fragment_offsets.clear(); + + Ok(gst::FlowSuccess::Ok) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, imp: self, "Stopping"); + + let _ = self.parent_stop(); + + *self.state.lock().unwrap() = State::default(); + + Ok(()) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, imp: self, "Starting"); + + self.parent_start()?; + + // For non-single-stream variants configure a default segment that allows for negative + // DTS so that we can correctly re-timestamp buffers 
with their running times. + let aggregator = self.instance(); + let class = aggregator.class(); + if !class.as_ref().variant.is_single_stream() { + let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); + segment.set_start(SEGMENT_OFFSET); + segment.set_position(SEGMENT_OFFSET); + aggregator.update_segment(&segment); + } + + *self.state.lock().unwrap() = State::default(); + + Ok(()) + } + + fn negotiate(&self) -> bool { + true + } + + fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> { + let settings = self.settings.lock().unwrap().clone(); + + let mut upstream_events = vec![]; + + let all_eos; + let (caps, buffers) = { + let mut state = self.state.lock().unwrap(); + + // Create streams + if state.streams.is_empty() { + self.create_streams(&mut state)?; + } + + // Queue buffers from all streams that are not filled for the current fragment yet + // + // Always take a buffer from the stream with the earliest queued buffer to keep the + // fill-level at all sinkpads in sync. + let fragment_start_pts = state.fragment_start_pts; + + while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? { + // Can only happen if the stream was flushed in the meantime + let buffer = match stream.sinkpad.pop_buffer() { + Some(buffer) => buffer, + None => continue, + }; + + // Can only happen if the stream was flushed in the meantime + let segment = match stream + .sinkpad + .segment() + .clone() + .downcast::<gst::ClockTime>() + .ok() + { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + // Queue up the buffer and update GOP tracking state + self.queue_gops(idx, stream, &segment, buffer)?; + + // Check if this stream is filled enough now. 
+ if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( + stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts) + .map(|gop| gop.end_pts), + fragment_start_pts, + ) { + if queued_end_pts.saturating_sub(fragment_start_pts) + >= settings.fragment_duration + { + gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; + } + } + } + + // Calculate the earliest PTS after queueing input if we can now. + if state.earliest_pts.is_none() { + let mut earliest_pts = None; + + for stream in &state.streams { + let stream_earliest_pts = match stream.queued_gops.back() { + None => { + earliest_pts = None; + break; + } + Some(oldest_gop) => { + if !timeout && !oldest_gop.final_earliest_pts { + earliest_pts = None; + break; + } + + oldest_gop.earliest_pts + } + }; + + if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) { + earliest_pts = Some(stream_earliest_pts); + } + } + + if let Some(earliest_pts) = earliest_pts { + gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts); + state.earliest_pts = Some(earliest_pts); + state.fragment_start_pts = Some(earliest_pts); + + gst::debug!( + CAT, + imp: self, + "Sending first force-keyunit event for running time {}", + earliest_pts + settings.fragment_duration, + ); + + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(earliest_pts + settings.fragment_duration) + .all_headers(true) + .build(); + + for stream in &mut state.streams { + upstream_events.push((stream.sinkpad.clone(), fku.clone())); + + // Check if this stream is filled enough now. 
+ if let Some(queued_end_pts) = stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts) + .map(|gop| gop.end_pts) + { + if queued_end_pts.saturating_sub(earliest_pts) + >= settings.fragment_duration + { + gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; + } + } + } + } + } + + all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); + if all_eos { + gst::debug!(CAT, imp: self, "All streams are EOS now"); + } + + // If enough GOPs were queued, drain and create the output fragment + match self.drain( + &mut state, + &settings, + timeout, + all_eos, + &mut upstream_events, + ) { + Ok(res) => res, + Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { + gst::element_imp_warning!( + self, + gst::StreamError::Format, + ["Longer GOPs than fragment duration"] + ); + state.timeout_delay += 1.seconds(); + + drop(state); + for (sinkpad, event) in upstream_events { + sinkpad.push_event(event); + } + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + Err(err) => return Err(err), + } + }; + + for (sinkpad, event) in upstream_events { + sinkpad.push_event(event); + } + + if let Some(caps) = caps { + gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); + self.instance().set_src_caps(&caps); + } + + if let Some(buffers) = buffers { + gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers); + self.instance().finish_buffer_list(buffers)?; + } + + if all_eos { + gst::debug!(CAT, imp: self, "Doing EOS handling"); + + if settings.header_update_mode != super::HeaderUpdateMode::None { + let updated_header = + self.update_header(&mut self.state.lock().unwrap(), &settings, true); + match updated_header { + Ok(Some((buffer_list, caps))) => { + match settings.header_update_mode { + super::HeaderUpdateMode::None => unreachable!(), + super::HeaderUpdateMode::Rewrite => { + let mut q = gst::query::Seeking::new(gst::Format::Bytes); + if self.instance().src_pad().peer_query(&mut q) 
&& q.result().0 { + let aggregator = self.instance(); + + aggregator.set_src_caps(&caps); + + // Seek to the beginning with a default bytes segment + aggregator + .update_segment( + &gst::FormattedSegment::<gst::format::Bytes>::new(), + ); + + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + imp: self, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } else { + gst::error!( + CAT, + imp: self, + "Can't rewrite header because downstream is not seekable" + ); + } + } + super::HeaderUpdateMode::Update => { + let aggregator = self.instance(); + + aggregator.set_src_caps(&caps); + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + imp: self, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } + } + } + Ok(None) => {} + Err(err) => { + gst::error!( + CAT, + imp: self, + "Failed to generate updated header: {:?}", + err + ); + } + } + } + + // Need to output new headers if started again after EOS + self.state.lock().unwrap().sent_headers = false; + + Err(gst::FlowError::Eos) + } else { + Ok(gst::FlowSuccess::Ok) + } + } +} + +#[repr(C)] +pub(crate) struct Class { + parent: gst_base::ffi::GstAggregatorClass, + variant: super::Variant, +} + +unsafe impl ClassStruct for Class { + type Type = FMP4Mux; +} + +impl std::ops::Deref for Class { + type Target = glib::Class<gst_base::Aggregator>; + + fn deref(&self) -> &Self::Target { + unsafe { &*(&self.parent as *const _ as *const _) } + } +} + +unsafe impl<T: FMP4MuxImpl> IsSubclassable<T> for super::FMP4Mux { + fn class_init(class: &mut glib::Class<Self>) { + Self::parent_class_init::<T>(class); + + let class = class.as_mut(); + class.variant = T::VARIANT; + } +} + +pub(crate) trait FMP4MuxImpl: AggregatorImpl { + const VARIANT: super::Variant; +} + +#[derive(Default)] +pub(crate) struct ISOFMP4Mux; + +#[glib::object_subclass] +impl ObjectSubclass for ISOFMP4Mux { + const NAME: &'static str = 
"GstISOFMP4Mux"; + type Type = super::ISOFMP4Mux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for ISOFMP4Mux {} + +impl GstObjectImpl for ISOFMP4Mux {} + +impl ElementImpl for ISOFMP4Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ISOFMP4Mux", + "Codec/Muxer", + "ISO fragmented MP4 muxer", + "Sebastian Dröge <sebastian@centricular.com>", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "iso-fragmented") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(["avc", "avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(["hvc1", "hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::new(1, i32::MAX)) + .build(), + ] + .into_iter() + .collect::<gst::Caps>(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl AggregatorImpl for ISOFMP4Mux {} + +impl FMP4MuxImpl for ISOFMP4Mux { + const 
VARIANT: super::Variant = super::Variant::ISO; +} + +#[derive(Default)] +pub(crate) struct CMAFMux; + +#[glib::object_subclass] +impl ObjectSubclass for CMAFMux { + const NAME: &'static str = "GstCMAFMux"; + type Type = super::CMAFMux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for CMAFMux {} + +impl GstObjectImpl for CMAFMux {} + +impl ElementImpl for CMAFMux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "CMAFMux", + "Codec/Muxer", + "CMAF fragmented MP4 muxer", + "Sebastian Dröge <sebastian@centricular.com>", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "cmaf") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(["avc", "avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(["hvc1", "hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::new(1, i32::MAX)) + .build(), + ] + .into_iter() + .collect::<gst::Caps>(), + ) + .unwrap(); + + 
vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl AggregatorImpl for CMAFMux {} + +impl FMP4MuxImpl for CMAFMux { + const VARIANT: super::Variant = super::Variant::CMAF; +} + +#[derive(Default)] +pub(crate) struct DASHMP4Mux; + +#[glib::object_subclass] +impl ObjectSubclass for DASHMP4Mux { + const NAME: &'static str = "GstDASHMP4Mux"; + type Type = super::DASHMP4Mux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for DASHMP4Mux {} + +impl GstObjectImpl for DASHMP4Mux {} + +impl ElementImpl for DASHMP4Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "DASHMP4Mux", + "Codec/Muxer", + "DASH fragmented MP4 muxer", + "Sebastian Dröge <sebastian@centricular.com>", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "iso-fragmented") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(&[&"avc", &"avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(&[&"hvc1", &"hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + 
.field("stream-format", "raw") + .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) + .build(), + ] + .into_iter() + .collect::<gst::Caps>(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl AggregatorImpl for DASHMP4Mux {} + +impl FMP4MuxImpl for DASHMP4Mux { + const VARIANT: super::Variant = super::Variant::DASH; +} + +#[derive(Default)] +pub(crate) struct ONVIFFMP4Mux; + +#[glib::object_subclass] +impl ObjectSubclass for ONVIFFMP4Mux { + const NAME: &'static str = "GstONVIFFMP4Mux"; + type Type = super::ONVIFFMP4Mux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for ONVIFFMP4Mux {} + +impl GstObjectImpl for ONVIFFMP4Mux {} + +impl ElementImpl for ONVIFFMP4Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIFFMP4Mux", + "Codec/Muxer", + "ONVIF fragmented MP4 muxer", + "Sebastian Dröge <sebastian@centricular.com>", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "iso-fragmented") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(&[&"avc", &"avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", 
gst::List::new(&[&"hvc1", &"hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("image/jpeg") + .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) + .build(), + gst::Structure::builder("audio/x-alaw") + .field("channels", gst::IntRange::<i32>::new(1, 2)) + .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) + .build(), + gst::Structure::builder("audio/x-mulaw") + .field("channels", gst::IntRange::<i32>::new(1, 2)) + .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) + .build(), + gst::Structure::builder("audio/x-adpcm") + .field("layout", "g726") + .field("channels", 1i32) + .field("rate", 8000i32) + .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000])) + .build(), + gst::Structure::builder("application/x-onvif-metadata") + .field("parsed", true) + .build(), + ] + .into_iter() + .collect::<gst::Caps>(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl AggregatorImpl for ONVIFFMP4Mux {} + +impl FMP4MuxImpl for ONVIFFMP4Mux { + const VARIANT: super::Variant = super::Variant::ONVIF; +} diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs new file mode 100644 index 000000000..76c2da9c2 --- /dev/null +++ b/mux/fmp4/src/fmp4mux/mod.rs @@ -0,0 +1,146 @@ +// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. 
+// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod boxes; +mod imp; + +glib::wrapper! { + pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object; +} + +glib::wrapper! { + pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; +} + +glib::wrapper! { + pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; +} + +glib::wrapper! { + pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; +} + +glib::wrapper! { + pub(crate) struct ONVIFFMP4Mux(ObjectSubclass<imp::ONVIFFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + FMP4Mux::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + HeaderUpdateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + gst::Element::register( + Some(plugin), + "isofmp4mux", + gst::Rank::Primary, + ISOFMP4Mux::static_type(), + )?; + gst::Element::register( + Some(plugin), + "cmafmux", + gst::Rank::Primary, + CMAFMux::static_type(), + )?; + gst::Element::register( + Some(plugin), + "dashmp4mux", + gst::Rank::Primary, + DASHMP4Mux::static_type(), + )?; + gst::Element::register( + Some(plugin), + "onviffmp4mux", + gst::Rank::Primary, + ONVIFFMP4Mux::static_type(), + )?; + + Ok(()) +} + +#[derive(Debug)] +pub(crate) struct HeaderConfiguration<'a> { + variant: Variant, + update: bool, + /// First caps must be the video/reference stream. Must be in the order the tracks are going to + /// be used later for the fragments too. 
+ streams: &'a [gst::Caps], + write_mehd: bool, + duration: Option<gst::ClockTime>, + /// Start UTC time in ONVIF mode. + /// Since Jan 1 1601 in 100ns units. + start_utc_time: Option<u64>, +} + +#[derive(Debug)] +pub(crate) struct FragmentHeaderConfiguration<'a> { + variant: Variant, + sequence_number: u32, + streams: &'a [(gst::Caps, Option<FragmentTimingInfo>)], + buffers: &'a [Buffer], +} + +#[derive(Debug)] +pub(crate) struct FragmentTimingInfo { + /// Start time of this fragment + start_time: gst::ClockTime, + /// Set if this is an intra-only stream + intra_only: bool, +} + +#[derive(Debug)] +pub(crate) struct Buffer { + /// Track index + idx: usize, + + /// Actual buffer + buffer: gst::Buffer, + + /// Timestamp + timestamp: gst::ClockTime, + + /// Sample duration + duration: gst::ClockTime, + + /// Composition time offset + composition_time_offset: Option<i64>, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Variant { + ISO, + CMAF, + DASH, + ONVIF, +} + +impl Variant { + pub(crate) fn is_single_stream(self) -> bool { + match self { + Variant::ISO | Variant::ONVIF => false, + Variant::CMAF | Variant::DASH => true, + } + } +} + +#[derive(Debug)] +pub(crate) struct FragmentOffset { + time: gst::ClockTime, + offset: u64, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, glib::Enum)] +#[repr(i32)] +#[enum_type(name = "GstFMP4MuxHeaderUpdateMode")] +pub(crate) enum HeaderUpdateMode { + None, + Rewrite, + Update, +} diff --git a/mux/fmp4/src/lib.rs b/mux/fmp4/src/lib.rs new file mode 100644 index 000000000..697c7cbf1 --- /dev/null +++ b/mux/fmp4/src/lib.rs @@ -0,0 +1,34 @@ +// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. 
+//
+// SPDX-License-Identifier: MPL-2.0
+#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
+
+/**
+ * plugin-fmp4:
+ *
+ * Since: plugins-rs-0.8.0
+ */
+use gst::glib;
+
+mod fmp4mux;
+
+// Plugin entry point invoked by GStreamer when the plugin is loaded; it only
+// delegates to `fmp4mux::register()`, which registers the fragmented-MP4
+// muxer elements (isofmp4mux, cmafmux, dashmp4mux, onviffmp4mux).
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    fmp4mux::register(plugin)
+}
+
+// Static plugin descriptor; name, description, version and repository are
+// filled in from the Cargo package metadata and build environment at
+// compile time.
+gst::plugin_define!(
+    fmp4,
+    env!("CARGO_PKG_DESCRIPTION"),
+    plugin_init,
+    concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+    // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known)
+    "MPL",
+    env!("CARGO_PKG_NAME"),
+    env!("CARGO_PKG_NAME"),
+    env!("CARGO_PKG_REPOSITORY"),
+    env!("BUILD_REL_DATE")
+);
diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs
new file mode 100644
index 000000000..073b6ae05
--- /dev/null
+++ b/mux/fmp4/tests/tests.rs
@@ -0,0 +1,1241 @@
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+// +// SPDX-License-Identifier: MPL-2.0 +// + +use gst::prelude::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstfmp4::plugin_register_static().unwrap(); + }); +} + +fn test_buffer_flags_single_stream(cmaf: bool) { + let mut h = if cmaf { + gst_check::Harness::new("cmafmux") + } else { + gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")) + }; + + // 5s fragment duration + h.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + + h.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h.play(); + + let output_offset = if cmaf { + gst::ClockTime::ZERO + } else { + (60 * 60 * 1000).seconds() + }; + + // Push 7 buffers of 1s each, 1st and 6 buffer without DELTA_UNIT flag + for i in 0..7 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 5 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + 
assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!(fragment_header.duration(), Some(5.seconds())); + + for i in 0..5 { + let buffer = h.pull().unwrap(); + if i == 4 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); + assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + h.push_event(gst::event::Eos::new()); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); + assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); + assert_eq!(fragment_header.duration(), Some(2.seconds())); + + for i in 5..7 { + let buffer = h.pull().unwrap(); + if i == 6 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); + assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = 
h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_buffer_flags_single_stream_cmaf() { + init(); + + test_buffer_flags_single_stream(true); +} + +#[test] +fn test_buffer_flags_single_stream_iso() { + init(); + + test_buffer_flags_single_stream(false); +} + +#[test] +fn test_buffer_flags_multi_stream() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + + h1.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h2.play(); + + let output_offset = (60 * 60 * 1000).seconds(); + + // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag + for i in 0..7 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 5 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + 
buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h1.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + + let ev = loop { + let ev = h2.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + let header = h1.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!(fragment_header.duration(), Some(5.seconds())); + + for i in 0..5 { + for j in 0..2 { + let buffer = h1.pull().unwrap(); + if i == 4 && j == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + + assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); + + if j == 0 { + 
assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); + assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); + assert_eq!(fragment_header.duration(), Some(2.seconds())); + + for i in 5..7 { + for j in 0..2 { + let buffer = h1.pull().unwrap(); + if i == 6 && j == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); + if j == 0 { + assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_live_timeout() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + h1.use_testclock(); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + + h1.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + 
.field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h2.play(); + + let output_offset = (60 * 60 * 1000).seconds(); + + // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag + for i in 0..7 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 5 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + // Skip buffer 4th and 6th buffer (end of fragment / stream) + if i == 4 || i == 6 { + continue; + } else { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + if i == 2 { + let ev = loop { + let ev = h1.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + + let ev = loop { + let ev = h2.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != 
gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + // Advance time and crank the clock: this should bring us to the end of the first fragment + h1.set_time(5.seconds()).unwrap(); + h1.crank_single_clock_wait().unwrap(); + + let header = h1.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!(fragment_header.duration(), Some(5.seconds())); + + for i in 0..5 { + for j in 0..2 { + // Skip gap events that don't result in buffers + if j == 1 && i == 4 { + // Advance time and crank the clock another time. This brings us at the end of the + // EOS. 
+ h1.crank_single_clock_wait().unwrap(); + continue; + } + + let buffer = h1.pull().unwrap(); + if i == 4 && j == 0 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else if i == 5 && j == 0 { + assert_eq!(buffer.flags(), gst::BufferFlags::HEADER); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + + assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); + + if j == 0 { + assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); + assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); + assert_eq!(fragment_header.duration(), Some(2.seconds())); + + for i in 5..7 { + for j in 0..2 { + // Skip gap events that don't result in buffers + if j == 1 && i == 6 { + continue; + } + + let buffer = h1.pull().unwrap(); + if i == 6 && j == 0 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); + if j == 0 { + assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), 
gst::EventType::Eos);
}

// Two streams (H.264 video on sink_0, AAC audio on sink_1) driven by the test
// clock; audio buffers 3 and 6 are replaced by gap events. Verifies that the
// muxer still closes both fragments (5s + 2s), requests a keyframe via a
// force-key-unit event, and emits no audio sample (no pulled buffer) for the
// gapped positions.
#[test]
fn test_gap_events() {
    init();

    let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
    let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);

    // Drive the element from the harness test clock so we can advance time
    // manually below.
    h1.use_testclock();

    // 5s fragment duration
    h1.element()
        .unwrap()
        .set_property("fragment-duration", 5.seconds());

    h1.set_src_caps(
        gst::Caps::builder("video/x-h264")
            .field("width", 1920i32)
            .field("height", 1080i32)
            .field("framerate", gst::Fraction::new(30, 1))
            .field("stream-format", "avc")
            .field("alignment", "au")
            .field("codec_data", gst::Buffer::with_size(1).unwrap())
            .build(),
    );
    h1.play();

    h2.set_src_caps(
        gst::Caps::builder("audio/mpeg")
            .field("mpegversion", 4i32)
            .field("channels", 1i32)
            .field("rate", 44100i32)
            .field("stream-format", "raw")
            .field("base-profile", "lc")
            .field("profile", "lc")
            .field("level", "2")
            .field(
                "codec_data",
                gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
            )
            .build(),
    );
    h2.play();

    // The muxer offsets all output timestamps by this amount.
    let output_offset = (60 * 60 * 1000).seconds();

    // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
    // (i.e. buffers 0 and 5 are keyframes on the video stream).
    for i in 0..7 {
        let mut buffer = gst::Buffer::with_size(1).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();
            buffer.set_pts(i.seconds());
            buffer.set_dts(i.seconds());
            buffer.set_duration(gst::ClockTime::SECOND);
            if i != 0 && i != 5 {
                buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
            }
        }
        assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));

        // Replace buffer 3 and 6 with a gap event
        if i == 3 || i == 6 {
            let ev = gst::event::Gap::builder(i.seconds())
                .duration(gst::ClockTime::SECOND)
                .build();
            assert!(h2.push_event(ev));
        } else {
            let mut buffer = gst::Buffer::with_size(1).unwrap();
            {
                let buffer = buffer.get_mut().unwrap();
                buffer.set_pts(i.seconds());
                buffer.set_dts(i.seconds());
                buffer.set_duration(gst::ClockTime::SECOND);
            }
            assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
        }

        // After 2s of data the muxer should ask upstream (on both sink pads)
        // for a keyframe at the next fragment boundary (running time 5s).
        if i == 2 {
            // Skip over unrelated Reconfigure/Latency events until the
            // force-key-unit event arrives.
            let ev = loop {
                let ev = h1.pull_upstream_event().unwrap();
                if ev.type_() != gst::EventType::Reconfigure
                    && ev.type_() != gst::EventType::Latency
                {
                    break ev;
                }
            };

            assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
            assert_eq!(
                gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
                gst_video::UpstreamForceKeyUnitEvent {
                    running_time: Some(5.seconds()),
                    all_headers: true,
                    count: 0
                }
            );

            let ev = loop {
                let ev = h2.pull_upstream_event().unwrap();
                if ev.type_() != gst::EventType::Reconfigure
                    && ev.type_() != gst::EventType::Latency
                {
                    break ev;
                }
            };

            assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
            assert_eq!(
                gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
                gst_video::UpstreamForceKeyUnitEvent {
                    running_time: Some(5.seconds()),
                    all_headers: true,
                    count: 0
                }
            );
        }
    }

    // Advance time and crank the clock: this should bring us to the end of the first fragment
    h1.set_time(5.seconds()).unwrap();
    h1.crank_single_clock_wait().unwrap();

    // First output buffer: the stream header, flagged HEADER | DISCONT.
    let header = h1.pull().unwrap();
    assert_eq!(
        header.flags(),
        gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
    );
    assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
    assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));

    // Header of the first fragment, covering [0s, 5s).
    let fragment_header = h1.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(
        fragment_header.pts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(
        fragment_header.dts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(fragment_header.duration(), Some(5.seconds()));

    // Samples of the first fragment: j == 0 is video, j == 1 is audio.
    for i in 0..5 {
        for j in 0..2 {
            // Skip gap events that don't result in buffers
            if j == 1 && i == 3 {
                continue;
            }

            let buffer = h1.pull().unwrap();
            // The last buffer of the fragment additionally carries MARKER.
            if i == 4 && j == 1 {
                assert_eq!(
                    buffer.flags(),
                    gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
                );
            } else {
                assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
            }

            assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));

            // Only the video stream carries DTS here.
            if j == 0 {
                assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
            } else {
                assert!(buffer.dts().is_none());
            }
            assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
        }
    }

    // EOS on both streams drains the remaining (shorter) second fragment.
    h1.push_event(gst::event::Eos::new());
    h2.push_event(gst::event::Eos::new());

    // Header of the second fragment, covering [5s, 7s).
    let fragment_header = h1.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
    assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
    assert_eq!(fragment_header.duration(), Some(2.seconds()));

    for i in 5..7 {
        for j in 0..2 {
            // Skip gap events that don't result in buffers
            if j == 1 && i == 6 {
                continue;
            }

            let buffer = h1.pull().unwrap();
            // Audio 6 is a gap, so video 6 (j == 0) is the fragment's last
            // buffer and carries MARKER.
            if i == 6 && j == 0 {
                assert_eq!(
                    buffer.flags(),
                    gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
                );
            } else {
                assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
            }
            assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
            if j == 0 {
                assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
            } else {
                assert!(buffer.dts().is_none());
            }
            assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
        }
    }

    // Expected downstream event sequence on the source pad.
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::StreamStart);
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Caps);
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Segment);
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Eos);
}

// Single H.264 stream with GOPs (3s) shorter than the fragment duration (5s):
// the first fragment closes early at the 4th keyframe (3s), the rest is
// drained as one 5s fragment at EOS.
#[test]
fn test_single_stream_short_gops() {
    init();

    let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));

    // 5s fragment duration
    h.element()
        .unwrap()
        .set_property("fragment-duration", 5.seconds());

    h.set_src_caps(
        gst::Caps::builder("video/x-h264")
            .field("width", 1920i32)
            .field("height", 1080i32)
            .field("framerate", gst::Fraction::new(30, 1))
            .field("stream-format", "avc")
            .field("alignment", "au")
            .field("codec_data", gst::Buffer::with_size(1).unwrap())
            .build(),
    );
    h.play();

    // The muxer offsets all output timestamps by this amount.
    let output_offset = (60 * 60 * 1000).seconds();

    // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
    for i in 0..8 {
        let mut buffer = gst::Buffer::with_size(1).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();
            buffer.set_pts(i.seconds());
            buffer.set_dts(i.seconds());
            buffer.set_duration(gst::ClockTime::SECOND);
            if i != 0 && i != 3 && i != 6 {
                buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
            }
        }
        assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));

        // The muxer requests a keyframe for each upcoming fragment boundary:
        // at 5s after the initial data, and at 8s once the boundary moved to
        // the keyframe at 3s (3s + 5s fragment duration).
        if i == 2 || i == 7 {
            // Drain unrelated Reconfigure/Latency events first.
            let ev = loop {
                let ev = h.pull_upstream_event().unwrap();
                if ev.type_() != gst::EventType::Reconfigure
                    && ev.type_() != gst::EventType::Latency
                {
                    break ev;
                }
            };

            let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() };

            assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
            assert_eq!(
                gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
                gst_video::UpstreamForceKeyUnitEvent {
                    running_time: Some(fku_time),
                    all_headers: true,
                    count: 0
                }
            );
        }
    }

    // Stream header, flagged HEADER | DISCONT.
    let header = h.pull().unwrap();
    assert_eq!(
        header.flags(),
        gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
    );
    assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
    assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));

    // First fragment only reaches the keyframe at 3s, not the full 5s.
    let fragment_header = h.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(
        fragment_header.pts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(
        fragment_header.dts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(fragment_header.duration(), Some(3.seconds()));

    for i in 0..3 {
        let buffer = h.pull().unwrap();
        // Last buffer of the fragment carries MARKER in addition.
        if i == 2 {
            assert_eq!(
                buffer.flags(),
                gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
            );
        } else {
            assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
        }
        assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
    }

    // EOS drains everything remaining as a single 5s fragment.
    h.push_event(gst::event::Eos::new());

    let fragment_header = h.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset));
    assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset));
    assert_eq!(fragment_header.duration(), Some(5.seconds()));

    for i in 3..8 {
        let buffer = h.pull().unwrap();
        if i == 7 {
            assert_eq!(
                buffer.flags(),
                gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
            );
        } else {
            assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
        }
        assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
    }

    // Expected downstream event sequence.
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::StreamStart);
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Caps);
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Segment);
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Eos);
}

// Single H.264 stream with a GOP (6s) longer than the fragment duration (5s):
// the first fragment can only close at the next keyframe, so it grows to 6s,
// and the remaining 4s are drained at EOS.
#[test]
fn test_single_stream_long_gops() {
    init();

    let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));

    // 5s fragment duration
    h.element()
        .unwrap()
        .set_property("fragment-duration", 5.seconds());

    h.set_src_caps(
        gst::Caps::builder("video/x-h264")
            .field("width", 1920i32)
            .field("height", 1080i32)
            .field("framerate", gst::Fraction::new(30, 1))
            .field("stream-format", "avc")
            .field("alignment", "au")
            .field("codec_data", gst::Buffer::with_size(1).unwrap())
            .build(),
    );
    h.play();

    // The muxer offsets all output timestamps by this amount.
    let output_offset = (60 * 60 * 1000).seconds();

    // Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag
    for i in 0..10 {
        let mut buffer = gst::Buffer::with_size(1).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();
            buffer.set_pts(i.seconds());
            buffer.set_dts(i.seconds());
            buffer.set_duration(gst::ClockTime::SECOND);
            if i != 0 && i != 6 {
                buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
            }
        }
        assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));

        // Keyframe requests: at 5s initially, then at 11s after the first
        // fragment actually closed at the 6s keyframe (6s + 5s).
        if i == 2 || i == 7 {
            // Drain unrelated Reconfigure/Latency events first.
            let ev = loop {
                let ev = h.pull_upstream_event().unwrap();
                if ev.type_() != gst::EventType::Reconfigure
                    && ev.type_() != gst::EventType::Latency
                {
                    break ev;
                }
            };

            let fku_time = if i == 2 { 5.seconds() } else { 11.seconds() };

            assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
            assert_eq!(
                gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
                gst_video::UpstreamForceKeyUnitEvent {
                    running_time: Some(fku_time),
                    all_headers: true,
                    count: 0
                }
            );
        }
    }

    // Stream header, flagged HEADER | DISCONT.
    let header = h.pull().unwrap();
    assert_eq!(
        header.flags(),
        gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
    );
    assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
    assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));

    // First fragment stretches past the 5s target to the keyframe at 6s.
    let fragment_header = h.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(
        fragment_header.pts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(
        fragment_header.dts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(fragment_header.duration(), Some(6.seconds()));

    for i in 0..6 {
        let buffer = h.pull().unwrap();
        // Last buffer of the fragment carries MARKER in addition.
        if i == 5 {
            assert_eq!(
                buffer.flags(),
                gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
            );
        } else {
            assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
        }
        assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
    }

    // EOS drains the remaining 4s as the second fragment.
    h.push_event(gst::event::Eos::new());

    let fragment_header = h.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(fragment_header.pts(), Some(6.seconds() + output_offset));
    assert_eq!(fragment_header.dts(), Some(6.seconds() + output_offset));
    assert_eq!(fragment_header.duration(), Some(4.seconds()));

    for i in 6..10 {
        let buffer = h.pull().unwrap();
        if i == 9 {
            assert_eq!(
                buffer.flags(),
                gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
            );
        } else {
            assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
        }
        assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
        assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
    }

    // Expected downstream event sequence.
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::StreamStart);
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Caps);
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Segment);
    let ev = h.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Eos);
}

// Same short-GOP scenario as test_single_stream_short_gops, but with a second
// (audio) stream on sink_1: fragments of 3s and 5s, interleaved video (j == 0)
// and audio (j == 1) samples, keyframe requests forwarded to both sink pads.
#[test]
fn test_buffer_multi_stream_short_gops() {
    init();

    let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
    let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);

    // 5s fragment duration
    h1.element()
        .unwrap()
        .set_property("fragment-duration", 5.seconds());

    h1.set_src_caps(
        gst::Caps::builder("video/x-h264")
            .field("width", 1920i32)
            .field("height", 1080i32)
            .field("framerate", gst::Fraction::new(30, 1))
            .field("stream-format", "avc")
            .field("alignment", "au")
            .field("codec_data", gst::Buffer::with_size(1).unwrap())
            .build(),
    );
    h1.play();

    h2.set_src_caps(
        gst::Caps::builder("audio/mpeg")
            .field("mpegversion", 4i32)
            .field("channels", 1i32)
            .field("rate", 44100i32)
            .field("stream-format", "raw")
            .field("base-profile", "lc")
            .field("profile", "lc")
            .field("level", "2")
            .field(
                "codec_data",
                gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
            )
            .build(),
    );
    h2.play();

    // The muxer offsets all output timestamps by this amount.
    let output_offset = (60 * 60 * 1000).seconds();

    // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
    for i in 0..8 {
        let mut buffer = gst::Buffer::with_size(1).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();
            buffer.set_pts(i.seconds());
            buffer.set_dts(i.seconds());
            buffer.set_duration(gst::ClockTime::SECOND);
            if i != 0 && i != 3 && i != 6 {
                buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
            }
        }
        assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));

        // Matching audio buffer for every video buffer.
        let mut buffer = gst::Buffer::with_size(1).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();
            buffer.set_pts(i.seconds());
            buffer.set_dts(i.seconds());
            buffer.set_duration(gst::ClockTime::SECOND);
        }
        assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));

        // Keyframe requests arrive on both sink pads: at 5s initially, then
        // at 8s once the first fragment closed at the keyframe at 3s.
        if i == 2 || i == 7 {
            // Drain unrelated Reconfigure/Latency events first.
            let ev = loop {
                let ev = h1.pull_upstream_event().unwrap();
                if ev.type_() != gst::EventType::Reconfigure
                    && ev.type_() != gst::EventType::Latency
                {
                    break ev;
                }
            };

            let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() };

            assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
            assert_eq!(
                gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
                gst_video::UpstreamForceKeyUnitEvent {
                    running_time: Some(fku_time),
                    all_headers: true,
                    count: 0
                }
            );

            let ev = loop {
                let ev = h2.pull_upstream_event().unwrap();
                if ev.type_() != gst::EventType::Reconfigure
                    && ev.type_() != gst::EventType::Latency
                {
                    break ev;
                }
            };

            assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
            assert_eq!(
                gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
                gst_video::UpstreamForceKeyUnitEvent {
                    running_time: Some(fku_time),
                    all_headers: true,
                    count: 0
                }
            );
        }
    }

    // Stream header, flagged HEADER | DISCONT.
    let header = h1.pull().unwrap();
    assert_eq!(
        header.flags(),
        gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
    );
    assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
    assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));

    // First fragment closes at the keyframe at 3s.
    let fragment_header = h1.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(
        fragment_header.pts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(
        fragment_header.dts(),
        Some(gst::ClockTime::ZERO + output_offset)
    );
    assert_eq!(fragment_header.duration(), Some(3.seconds()));

    for i in 0..3 {
        for j in 0..2 {
            let buffer = h1.pull().unwrap();
            // Last buffer of the fragment (audio sample 2) carries MARKER.
            if i == 2 && j == 1 {
                assert_eq!(
                    buffer.flags(),
                    gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
                );
            } else {
                assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
            }

            assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));

            // Only the video stream carries DTS here.
            if j == 0 {
                assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
            } else {
                assert!(buffer.dts().is_none());
            }
            assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
        }
    }

    // EOS on both streams drains the remaining 5s fragment.
    h1.push_event(gst::event::Eos::new());
    h2.push_event(gst::event::Eos::new());

    let fragment_header = h1.pull().unwrap();
    assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
    assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset));
    assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset));
    assert_eq!(fragment_header.duration(), Some(5.seconds()));

    for i in 3..8 {
        for j in 0..2 {
            let buffer = h1.pull().unwrap();
            if i == 7 && j == 1 {
                assert_eq!(
                    buffer.flags(),
                    gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
                );
            } else {
                assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
            }
            assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
            if j == 0 {
                assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
            } else {
                assert!(buffer.dts().is_none());
            }
            assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
        }
    }

    // Expected downstream event sequence.
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::StreamStart);
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Caps);
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Segment);
    let ev = h1.pull_event().unwrap();
    assert_eq!(ev.type_(), gst::EventType::Eos);
}