diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-10-23 18:23:45 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-10-23 20:25:08 +0300 |
commit | 211cd095d69726a3a2208feddd921d05b60c6540 (patch) | |
tree | 648bbbe80a68a68b3f6315fcf580cae146d0249e /generic | |
parent | 5d44e0eb3c309ce7ad0cfb378d0169d8ce3305b3 (diff) |
Add new mux subdirectory for container formats
Contains the (incomplete) flavors FLV demuxer and the fragmented MP4
muxer for now.
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/173
Diffstat (limited to 'generic')
-rw-r--r-- | generic/fmp4/Cargo.toml | 52 | ||||
l--------- | generic/fmp4/LICENSE | 1 | ||||
-rw-r--r-- | generic/fmp4/build.rs | 3 | ||||
-rw-r--r-- | generic/fmp4/examples/dash_vod.rs | 266 | ||||
-rw-r--r-- | generic/fmp4/examples/hls_live.rs | 572 | ||||
-rw-r--r-- | generic/fmp4/examples/hls_vod.rs | 472 | ||||
-rw-r--r-- | generic/fmp4/src/fmp4mux/boxes.rs | 2073 | ||||
-rw-r--r-- | generic/fmp4/src/fmp4mux/imp.rs | 2596 | ||||
-rw-r--r-- | generic/fmp4/src/fmp4mux/mod.rs | 146 | ||||
-rw-r--r-- | generic/fmp4/src/lib.rs | 34 | ||||
-rw-r--r-- | generic/fmp4/tests/tests.rs | 1241 |
11 files changed, 0 insertions, 7456 deletions
diff --git a/generic/fmp4/Cargo.toml b/generic/fmp4/Cargo.toml deleted file mode 100644 index f72c84a7..00000000 --- a/generic/fmp4/Cargo.toml +++ /dev/null @@ -1,52 +0,0 @@ -[package] -name = "gst-plugin-fmp4" -version = "0.9.0-alpha.1" -authors = ["Sebastian Dröge <sebastian@centricular.com>"] -license = "MPL-2.0" -description = "GStreamer Fragmented MP4 Plugin" -repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" -edition = "2021" -rust-version = "1.63" - -[dependencies] -anyhow = "1" -gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -once_cell = "1.0" -uuid = { version = "1", features = ["v4"] } - -[lib] -name = "gstfmp4" -crate-type = ["cdylib", "rlib"] -path = "src/lib.rs" - -[dev-dependencies] -gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } -gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } -gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } -m3u8-rs = "5.0" -chrono = "0.4" - -[build-dependencies] -gst-plugin-version-helper = { path="../../version-helper" } - -[features] -default = ["v1_18"] -static = [] -capi = [] -v1_18 = ["gst-video/v1_18"] - -[package.metadata.capi] -min_version = "0.8.0" - -[package.metadata.capi.header] -enabled = false - -[package.metadata.capi.library] -install_subdir = "gstreamer-1.0" -versioning = false - -[package.metadata.capi.pkg_config] -requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/generic/fmp4/LICENSE b/generic/fmp4/LICENSE deleted file mode 120000 index eb5d24fe..00000000 --- a/generic/fmp4/LICENSE +++ /dev/null @@ -1 +0,0 @@ -../../LICENSE-MPL-2.0
\ No newline at end of file diff --git a/generic/fmp4/build.rs b/generic/fmp4/build.rs deleted file mode 100644 index cda12e57..00000000 --- a/generic/fmp4/build.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - gst_plugin_version_helper::info() -} diff --git a/generic/fmp4/examples/dash_vod.rs b/generic/fmp4/examples/dash_vod.rs deleted file mode 100644 index eb291d6e..00000000 --- a/generic/fmp4/examples/dash_vod.rs +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 - -// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header -// ("initialization segment") is written into a separate file as the segments, and each segment is -// its own file too. -// -// All segments that are created are exactly 10s, expect for the last one which is only 3.333s. - -use gst::prelude::*; - -use std::fmt::Write; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; - -use anyhow::Error; - -struct Segment { - start_time: gst::ClockTime, - duration: gst::ClockTime, -} - -struct State { - start_time: Option<gst::ClockTime>, - end_time: Option<gst::ClockTime>, - segments: Vec<Segment>, - path: PathBuf, -} - -fn main() -> Result<(), Error> { - gst::init()?; - - gstfmp4::plugin_register_static()?; - - let state = Arc::new(Mutex::new(State { - start_time: None, - end_time: None, - segments: Vec::new(), - path: PathBuf::from("dash_stream"), - })); - - let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap(); - - let sink = pipeline - .by_name("sink") - .unwrap() - .dynamic_cast::<gst_app::AppSink>() - .unwrap(); - sink.set_buffer_list(true); - - let state_clone = state.clone(); - sink.set_callbacks( - gst_app::AppSinkCallbacks::builder() - .new_sample(move |sink| { - let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let mut state = state.lock().unwrap(); - - // The muxer only outputs non-empty buffer lists - let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); - assert!(!buffer_list.is_empty()); - - let mut first = buffer_list.get(0).unwrap(); - - // Each list contains a full segment, i.e. does not start with a DELTA_UNIT - assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); - - // If the buffer has the DISCONT and HEADER flag set then it contains the media - // header, i.e. the `ftyp`, `moov` and other media boxes. - // - // This might be the initial header or the updated header at the end of the stream. - if first.flags().contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) { - let mut path = state.path.clone(); - std::fs::create_dir_all(&path).expect("failed to create directory"); - path.push("init.cmfi"); - - println!("writing header to {}", path.display()); - let map = first.map_readable().unwrap(); - std::fs::write(path, &map).expect("failed to write header"); - drop(map); - - // Remove the header from the buffer list - buffer_list.make_mut().remove(0, 1); - - // If the list is now empty then it only contained the media header and nothing - // else. - if buffer_list.is_empty() { - return Ok(gst::FlowSuccess::Ok); - } - - // Otherwise get the next buffer and continue working with that. - first = buffer_list.get(0).unwrap(); - } - - // If the buffer only has the HEADER flag set then this is a segment header that is - // followed by one or more actual media buffers. - assert!(first.flags().contains(gst::BufferFlags::HEADER)); - - let segment = sample.segment().expect("no segment") - .downcast_ref::<gst::ClockTime>().expect("no time segment"); - - // Initialize the start time with the first PTS we observed. This will be used - // later for calculating the duration of the whole media for the DASH manifest. - // - // The PTS of the segment header is equivalent to the earliest PTS of the whole - // segment. - let pts = segment.to_running_time(first.pts().unwrap()).expect("can't get running time"); - if state.start_time.is_none() { - state.start_time = Some(pts); - } - - // The metadata of the first media buffer is duplicated to the segment header. - // Based on this we can know the timecode of the first frame in this segment. - let meta = first.meta::<gst_video::VideoTimeCodeMeta>().expect("no timecode meta"); - - let mut path = state.path.clone(); - path.push(format!("segment_{}.cmfv", state.segments.len() + 1)); - println!("writing segment with timecode {} to {}", meta.tc(), path.display()); - - // Calculate the end time at this point. The duration of the segment header is set - // to the whole duration of this segment. - let duration = first.duration().unwrap(); - let end_time = first.pts().unwrap() + first.duration().unwrap(); - state.end_time = Some(segment.to_running_time(end_time).expect("can't get running time")); - - let mut file = std::fs::File::create(path).expect("failed to open fragment"); - for buffer in &*buffer_list { - use std::io::prelude::*; - - let map = buffer.map_readable().unwrap(); - file.write_all(&map).expect("failed to write fragment"); - } - - state.segments.push(Segment { - start_time: pts, - duration, - }); - - Ok(gst::FlowSuccess::Ok) - }) - .eos(move |_sink| { - let state = state_clone.lock().unwrap(); - - // Now write the manifest - let mut path = state.path.clone(); - path.push("manifest.mpd"); - - println!("writing manifest to {}", path.display()); - - let duration = state.end_time.opt_checked_sub(state.start_time).ok().flatten().unwrap().mseconds() as f64 / 1000.0; - - // Write the whole segment timeline out here, compressing multiple segments with - // the same duration to a repeated segment. - let mut segment_timeline = String::new(); - let mut write_segment = |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| { - if repeat > 0 { - writeln!( - &mut segment_timeline, - " <S t=\"{time}\" d=\"{duration}\" r=\"{repeat}\" />", - time = start.mseconds(), - duration = duration.mseconds(), - repeat = repeat - ).unwrap(); - } else { - writeln!( - &mut segment_timeline, - " <S t=\"{time}\" d=\"{duration}\" />", - time = start.mseconds(), - duration = duration.mseconds() - ).unwrap(); - } - }; - - let mut start = None; - let mut num_segments = 0; - let mut last_duration = None; - for segment in &state.segments { - if start.is_none() { - start = Some(segment.start_time); - } - if last_duration.is_none() { - last_duration = Some(segment.duration); - } - - // If the duration of this segment is different from the previous one then we - // have to write out the segment now. - if last_duration != Some(segment.duration) { - write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); - start = Some(segment.start_time); - last_duration = Some(segment.duration); - num_segments = 1; - } else { - num_segments += 1; - } - } - - // Write the last segment if any - if num_segments > 0 { - write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); - } - - let manifest = format!(r###"<?xml version="1.0" encoding="UTF-8"?> -<MPD - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns="urn:mpeg:dash:schema:mpd:2011" - xsi:schemaLocation="urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd" - type="static" - mediaPresentationDuration="PT{duration:.3}S" - profiles="urn:mpeg:dash:profile:isoff-on-demand:2011"> - <Period> - <AdaptationSet mimeType="video/mp4" codecs="avc1.4d0228" frameRate="30/1" segmentAlignment="true" startWithSAP="1"> - <Representation id="A" bandwidth="2048000" with="1280" height="720"> - <SegmentTemplate timescale="1000" initialization="init.cmfi" media="segment_$Number$.cmfv"> - <SegmentTimeline> -{segment_timeline} </SegmentTimeline> - </SegmentTemplate> - </Representation> - </AdaptationSet> - </Period> -</MPD> -"###, - duration = duration, segment_timeline = segment_timeline); - - std::fs::write(path, &manifest).expect("failed to write manifest"); - }) - .build(), - ); - - pipeline.set_state(gst::State::Playing)?; - - let bus = pipeline - .bus() - .expect("Pipeline without bus. Shouldn't happen!"); - - for msg in bus.iter_timed(gst::ClockTime::NONE) { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(..) => { - println!("EOS"); - break; - } - MessageView::Error(err) => { - pipeline.set_state(gst::State::Null)?; - eprintln!( - "Got error from {}: {} ({})", - msg.src() - .map(|s| String::from(s.path_string())) - .unwrap_or_else(|| "None".into()), - err.error(), - err.debug().unwrap_or_else(|| "".into()), - ); - break; - } - _ => (), - } - } - - pipeline.set_state(gst::State::Null)?; - - Ok(()) -} diff --git a/generic/fmp4/examples/hls_live.rs b/generic/fmp4/examples/hls_live.rs deleted file mode 100644 index f1d9ed6d..00000000 --- a/generic/fmp4/examples/hls_live.rs +++ /dev/null @@ -1,572 +0,0 @@ -// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 - -// This creates a live HLS stream with one video playlist and two video playlists. -// Basic trimming is implemented - -use gst::prelude::*; - -use std::collections::VecDeque; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; - -use anyhow::Error; -use chrono::{DateTime, Duration, Utc}; -use m3u8_rs::{ - AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaSegment, - VariantStream, -}; - -struct State { - video_streams: Vec<VideoStream>, - audio_streams: Vec<AudioStream>, - all_mimes: Vec<String>, - path: PathBuf, - wrote_manifest: bool, -} - -impl State { - fn maybe_write_manifest(&mut self) { - if self.wrote_manifest { - return; - } - - if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() { - return; - } - - let mut all_mimes = self.all_mimes.clone(); - all_mimes.sort(); - all_mimes.dedup(); - - let playlist = MasterPlaylist { - version: Some(7), - variants: self - .video_streams - .iter() - .map(|stream| { - let mut path = PathBuf::new(); - - path.push(&stream.name); - path.push("manifest.m3u8"); - - VariantStream { - uri: path.as_path().display().to_string(), - bandwidth: stream.bitrate, - codecs: Some(all_mimes.join(",")), - resolution: Some(m3u8_rs::Resolution { - width: stream.width, - height: stream.height, - }), - audio: Some("audio".to_string()), - ..Default::default() - } - }) - .collect(), - alternatives: self - .audio_streams - .iter() - .map(|stream| { - let mut path = PathBuf::new(); - path.push(&stream.name); - path.push("manifest.m3u8"); - - AlternativeMedia { - media_type: AlternativeMediaType::Audio, - uri: Some(path.as_path().display().to_string()), - group_id: "audio".to_string(), - language: Some(stream.lang.clone()), - name: stream.name.clone(), - default: stream.default, - autoselect: stream.default, - channels: Some("2".to_string()), - ..Default::default() - } - }) - .collect(), - independent_segments: true, - ..Default::default() - }; - - println!("Writing master manifest to {}", self.path.display()); - - let mut file = std::fs::File::create(&self.path).unwrap(); - playlist - .write_to(&mut file) - .expect("Failed to write master playlist"); - - self.wrote_manifest = true; - } -} - -struct Segment { - date_time: DateTime<Utc>, - duration: gst::ClockTime, - path: String, -} - -struct UnreffedSegment { - removal_time: DateTime<Utc>, - path: String, -} - -struct StreamState { - path: PathBuf, - segments: VecDeque<Segment>, - trimmed_segments: VecDeque<UnreffedSegment>, - start_date_time: Option<DateTime<Utc>>, - start_time: Option<gst::ClockTime>, - media_sequence: u64, - segment_index: u32, -} - -struct VideoStream { - name: String, - bitrate: u64, - width: u64, - height: u64, -} - -struct AudioStream { - name: String, - lang: String, - default: bool, - wave: String, -} - -fn trim_segments(state: &mut StreamState) { - // Arbitrary 5 segments window - while state.segments.len() > 5 { - let segment = state.segments.pop_front().unwrap(); - - state.media_sequence += 1; - - state.trimmed_segments.push_back(UnreffedSegment { - // HLS spec mandates that segments are removed from the filesystem no sooner - // than the duration of the longest playlist + duration of the segment. - // This is 15 seconds (12.5 + 2.5) in our case, we use 20 seconds to be on the - // safe side - removal_time: segment - .date_time - .checked_add_signed(Duration::seconds(20)) - .unwrap(), - path: segment.path.clone(), - }); - } - - while let Some(segment) = state.trimmed_segments.front() { - if segment.removal_time < state.segments.front().unwrap().date_time { - let segment = state.trimmed_segments.pop_front().unwrap(); - - let mut path = state.path.clone(); - path.push(segment.path); - println!("Removing {}", path.display()); - std::fs::remove_file(path).expect("Failed to remove old segment"); - } else { - break; - } - } -} - -fn update_manifest(state: &mut StreamState) { - // Now write the manifest - let mut path = state.path.clone(); - path.push("manifest.m3u8"); - - println!("writing manifest to {}", path.display()); - - trim_segments(state); - - let playlist = MediaPlaylist { - version: Some(7), - target_duration: 2.5, - media_sequence: state.media_sequence, - segments: state - .segments - .iter() - .enumerate() - .map(|(idx, segment)| MediaSegment { - uri: segment.path.to_string(), - duration: (segment.duration.nseconds() as f64 - / gst::ClockTime::SECOND.nseconds() as f64) as f32, - map: Some(m3u8_rs::Map { - uri: "init.cmfi".into(), - ..Default::default() - }), - program_date_time: if idx == 0 { - Some(segment.date_time.into()) - } else { - None - }, - ..Default::default() - }) - .collect(), - end_list: false, - playlist_type: None, - i_frames_only: false, - start: None, - independent_segments: true, - ..Default::default() - }; - - let mut file = std::fs::File::create(path).unwrap(); - playlist - .write_to(&mut file) - .expect("Failed to write media playlist"); -} - -fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) { - let mut path: PathBuf = path.into(); - path.push(name); - - let state = Arc::new(Mutex::new(StreamState { - segments: VecDeque::new(), - trimmed_segments: VecDeque::new(), - path, - start_date_time: None, - start_time: gst::ClockTime::NONE, - media_sequence: 0, - segment_index: 0, - })); - - appsink.set_callbacks( - gst_app::AppSinkCallbacks::builder() - .new_sample(move |sink| { - let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let mut state = state.lock().unwrap(); - - // The muxer only outputs non-empty buffer lists - let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); - assert!(!buffer_list.is_empty()); - - let mut first = buffer_list.get(0).unwrap(); - - // Each list contains a full segment, i.e. does not start with a DELTA_UNIT - assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); - - // If the buffer has the DISCONT and HEADER flag set then it contains the media - // header, i.e. the `ftyp`, `moov` and other media boxes. - // - // This might be the initial header or the updated header at the end of the stream. - if first - .flags() - .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) - { - let mut path = state.path.clone(); - std::fs::create_dir_all(&path).expect("failed to create directory"); - path.push("init.cmfi"); - - println!("writing header to {}", path.display()); - let map = first.map_readable().unwrap(); - std::fs::write(path, &map).expect("failed to write header"); - drop(map); - - // Remove the header from the buffer list - buffer_list.make_mut().remove(0, 1); - - // If the list is now empty then it only contained the media header and nothing - // else. - if buffer_list.is_empty() { - return Ok(gst::FlowSuccess::Ok); - } - - // Otherwise get the next buffer and continue working with that. - first = buffer_list.get(0).unwrap(); - } - - // If the buffer only has the HEADER flag set then this is a segment header that is - // followed by one or more actual media buffers. - assert!(first.flags().contains(gst::BufferFlags::HEADER)); - - let mut path = state.path.clone(); - let basename = format!( - "segment_{}.{}", - state.segment_index, - if is_video { "cmfv" } else { "cmfa" } - ); - state.segment_index += 1; - path.push(&basename); - - let segment = sample - .segment() - .expect("no segment") - .downcast_ref::<gst::ClockTime>() - .expect("no time segment"); - let pts = segment - .to_running_time(first.pts().unwrap()) - .expect("can't get running time"); - - if state.start_time.is_none() { - state.start_time = Some(pts); - } - - if state.start_date_time.is_none() { - let now_utc = Utc::now(); - let now_gst = sink.clock().unwrap().time().unwrap(); - let pts_clock_time = pts + sink.base_time().unwrap(); - - let diff = now_gst.checked_sub(pts_clock_time).unwrap(); - let pts_utc = now_utc - .checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64)) - .unwrap(); - - state.start_date_time = Some(pts_utc); - } - - let duration = first.duration().unwrap(); - - let mut file = std::fs::File::create(&path).expect("failed to open fragment"); - for buffer in &*buffer_list { - use std::io::prelude::*; - - let map = buffer.map_readable().unwrap(); - file.write_all(&map).expect("failed to write fragment"); - } - - let date_time = state - .start_date_time - .unwrap() - .checked_add_signed(Duration::nanoseconds( - pts.opt_checked_sub(state.start_time) - .unwrap() - .unwrap() - .nseconds() as i64, - )) - .unwrap(); - - println!( - "wrote segment with date time {} to {}", - date_time, - path.display() - ); - - state.segments.push_back(Segment { - duration, - path: basename.to_string(), - date_time, - }); - - update_manifest(&mut state); - - Ok(gst::FlowSuccess::Ok) - }) - .eos(move |_sink| { - unreachable!(); - }) - .build(), - ); -} - -fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) { - enc.static_pad("src").unwrap().add_probe( - gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { - gst::EventView::Caps(e) => { - let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned()); - - let mut state = state.lock().unwrap(); - state.all_mimes.push(mime.unwrap().into()); - state.maybe_write_manifest(); - gst::PadProbeReturn::Remove - } - _ => gst::PadProbeReturn::Ok, - }, - _ => gst::PadProbeReturn::Ok, - }, - ); -} - -impl VideoStream { - fn setup( - &self, - state: Arc<Mutex<State>>, - pipeline: &gst::Pipeline, - path: &Path, - ) -> Result<(), Error> { - let src = gst::ElementFactory::make("videotestsrc") - .property("is-live", true) - .build()?; - - let raw_capsfilter = gst::ElementFactory::make("capsfilter") - .property( - "caps", - gst_video::VideoCapsBuilder::new() - .format(gst_video::VideoFormat::I420) - .width(self.width as i32) - .height(self.height as i32) - .framerate(30.into()) - .build(), - ) - .build()?; - let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?; - let enc = gst::ElementFactory::make("x264enc") - .property("bframes", 0u32) - .property("bitrate", self.bitrate as u32 / 1000u32) - .property_from_str("tune", "zerolatency") - .build()?; - let h264_capsfilter = gst::ElementFactory::make("capsfilter") - .property( - "caps", - gst::Caps::builder("video/x-h264") - .field("profile", "main") - .build(), - ) - .build()?; - let mux = gst::ElementFactory::make("cmafmux") - .property("fragment-duration", 2500.mseconds()) - .property_from_str("header-update-mode", "update") - .property("write-mehd", true) - .build()?; - let appsink = gst_app::AppSink::builder().buffer_list(true).build(); - - pipeline.add_many(&[ - &src, - &raw_capsfilter, - &timeoverlay, - &enc, - &h264_capsfilter, - &mux, - appsink.upcast_ref(), - ])?; - - gst::Element::link_many(&[ - &src, - &raw_capsfilter, - &timeoverlay, - &enc, - &h264_capsfilter, - &mux, - appsink.upcast_ref(), - ])?; - - probe_encoder(state, enc); - - setup_appsink(&appsink, &self.name, path, true); - - Ok(()) - } -} - -impl AudioStream { - fn setup( - &self, - state: Arc<Mutex<State>>, - pipeline: &gst::Pipeline, - path: &Path, - ) -> Result<(), Error> { - let src = gst::ElementFactory::make("audiotestsrc") - .property("is-live", true) - .property_from_str("wave", &self.wave) - .property("fragment-duration", 2500.mseconds()) - .build()?; - let enc = gst::ElementFactory::make("avenc_aac").build()?; - let mux = gst::ElementFactory::make("cmafmux") - .property_from_str("header-update-mode", "update") - .property("write-mehd", true) - .build()?; - let appsink = gst_app::AppSink::builder().buffer_list(true).build(); - - pipeline.add_many(&[&src, &enc, &mux, appsink.upcast_ref()])?; - - gst::Element::link_many(&[&src, &enc, &mux, appsink.upcast_ref()])?; - - probe_encoder(state, enc); - - setup_appsink(&appsink, &self.name, path, false); - - Ok(()) - } -} - -fn main() -> Result<(), Error> { - gst::init()?; - - gstfmp4::plugin_register_static()?; - - let path = PathBuf::from("hls_live_stream"); - - let pipeline = gst::Pipeline::default(); - - std::fs::create_dir_all(&path).expect("failed to create directory"); - - let mut manifest_path = path.clone(); - manifest_path.push("manifest.m3u8"); - - let state = Arc::new(Mutex::new(State { - video_streams: vec![VideoStream { - name: "video_0".to_string(), - bitrate: 2_048_000, - width: 1280, - height: 720, - }], - audio_streams: vec![ - AudioStream { - name: "audio_0".to_string(), - lang: "eng".to_string(), - default: true, - wave: "sine".to_string(), - }, - AudioStream { - name: "audio_1".to_string(), - lang: "fre".to_string(), - default: false, - wave: "white-noise".to_string(), - }, - ], - all_mimes: vec![], - path: manifest_path.clone(), - wrote_manifest: false, - })); - - { - let state_lock = state.lock().unwrap(); - - for stream in &state_lock.video_streams { - stream.setup(state.clone(), &pipeline, &path)?; - } - - for stream in &state_lock.audio_streams { - stream.setup(state.clone(), &pipeline, &path)?; - } - } - - pipeline.set_state(gst::State::Playing)?; - - let bus = pipeline - .bus() - .expect("Pipeline without bus. Shouldn't happen!"); - - for msg in bus.iter_timed(gst::ClockTime::NONE) { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(..) => { - println!("EOS"); - break; - } - MessageView::Error(err) => { - pipeline.set_state(gst::State::Null)?; - eprintln!( - "Got error from {}: {} ({})", - msg.src() - .map(|s| String::from(s.path_string())) - .unwrap_or_else(|| "None".into()), - err.error(), - err.debug().unwrap_or_else(|| "".into()), - ); - break; - } - _ => (), - } - } - - pipeline.set_state(gst::State::Null)?; - - Ok(()) -} diff --git a/generic/fmp4/examples/hls_vod.rs b/generic/fmp4/examples/hls_vod.rs deleted file mode 100644 index 4a2533cd..00000000 --- a/generic/fmp4/examples/hls_vod.rs +++ /dev/null @@ -1,472 +0,0 @@ -// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 - -// This creates a 10 second VOD HLS stream with one video playlist and two audio -// playlists. Each segment is 2.5 second long. - -use gst::prelude::*; - -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; - -use anyhow::Error; - -use m3u8_rs::{ - AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaPlaylistType, - MediaSegment, VariantStream, -}; - -struct Segment { - duration: gst::ClockTime, - path: String, -} - -struct StreamState { - path: PathBuf, - segments: Vec<Segment>, -} - -struct VideoStream { - name: String, - bitrate: u64, - width: u64, - height: u64, -} - -struct AudioStream { - name: String, - lang: String, - default: bool, - wave: String, -} - -struct State { - video_streams: Vec<VideoStream>, - audio_streams: Vec<AudioStream>, - all_mimes: Vec<String>, - path: PathBuf, - wrote_manifest: bool, -} - -impl State { - fn maybe_write_manifest(&mut self) { - if self.wrote_manifest { - return; - } - - if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() { - return; - } - - let mut all_mimes = self.all_mimes.clone(); - all_mimes.sort(); - all_mimes.dedup(); - - let playlist = MasterPlaylist { - version: Some(7), - variants: self - .video_streams - .iter() - .map(|stream| { - let mut path = PathBuf::new(); - - path.push(&stream.name); - path.push("manifest.m3u8"); - - VariantStream { - uri: path.as_path().display().to_string(), - bandwidth: stream.bitrate, - codecs: Some(all_mimes.join(",")), - resolution: Some(m3u8_rs::Resolution { - width: stream.width, - height: stream.height, - }), - audio: Some("audio".to_string()), - ..Default::default() - } - }) - .collect(), - alternatives: self - .audio_streams - .iter() - .map(|stream| { - let mut path = PathBuf::new(); - path.push(&stream.name); - path.push("manifest.m3u8"); - - AlternativeMedia { - media_type: AlternativeMediaType::Audio, - uri: Some(path.as_path().display().to_string()), - group_id: "audio".to_string(), - language: Some(stream.lang.clone()), - name: stream.name.clone(), - default: stream.default, - autoselect: stream.default, - channels: Some("2".to_string()), - ..Default::default() - } - }) - .collect(), - independent_segments: true, - ..Default::default() - }; - - println!("Writing master manifest to {}", self.path.display()); - - let mut file = std::fs::File::create(&self.path).unwrap(); - playlist - .write_to(&mut file) - .expect("Failed to write master playlist"); - - self.wrote_manifest = true; - } -} - -fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) { - let mut path: PathBuf = path.into(); - path.push(name); - - let state = Arc::new(Mutex::new(StreamState { - segments: Vec::new(), - path, - })); - - let state_clone = state.clone(); - appsink.set_callbacks( - gst_app::AppSinkCallbacks::builder() - .new_sample(move |sink| { - let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let mut state = state.lock().unwrap(); - - // The muxer only outputs non-empty buffer lists - let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); - assert!(!buffer_list.is_empty()); - - let mut first = buffer_list.get(0).unwrap(); - - // Each list contains a full segment, i.e. does not start with a DELTA_UNIT - assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); - - // If the buffer has the DISCONT and HEADER flag set then it contains the media - // header, i.e. the `ftyp`, `moov` and other media boxes. - // - // This might be the initial header or the updated header at the end of the stream. - if first - .flags() - .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) - { - let mut path = state.path.clone(); - std::fs::create_dir_all(&path).expect("failed to create directory"); - path.push("init.cmfi"); - - println!("writing header to {}", path.display()); - let map = first.map_readable().unwrap(); - std::fs::write(path, &map).expect("failed to write header"); - drop(map); - - // Remove the header from the buffer list - buffer_list.make_mut().remove(0, 1); - - // If the list is now empty then it only contained the media header and nothing - // else. - if buffer_list.is_empty() { - return Ok(gst::FlowSuccess::Ok); - } - - // Otherwise get the next buffer and continue working with that. - first = buffer_list.get(0).unwrap(); - } - - // If the buffer only has the HEADER flag set then this is a segment header that is - // followed by one or more actual media buffers. - assert!(first.flags().contains(gst::BufferFlags::HEADER)); - - let mut path = state.path.clone(); - let basename = format!( - "segment_{}.{}", - state.segments.len() + 1, - if is_video { "cmfv" } else { "cmfa" } - ); - path.push(&basename); - println!("writing segment to {}", path.display()); - - let duration = first.duration().unwrap(); - - let mut file = std::fs::File::create(path).expect("failed to open fragment"); - for buffer in &*buffer_list { - use std::io::prelude::*; - - let map = buffer.map_readable().unwrap(); - file.write_all(&map).expect("failed to write fragment"); - } - - state.segments.push(Segment { - duration, - path: basename.to_string(), - }); - - Ok(gst::FlowSuccess::Ok) - }) - .eos(move |_sink| { - let state = state_clone.lock().unwrap(); - - // Now write the manifest - let mut path = state.path.clone(); - path.push("manifest.m3u8"); - - println!("writing manifest to {}", path.display()); - - let playlist = MediaPlaylist { - version: Some(7), - target_duration: 2.5, - media_sequence: 0, - segments: state - .segments - .iter() - .map(|segment| MediaSegment { - uri: segment.path.to_string(), - duration: (segment.duration.nseconds() as f64 - / gst::ClockTime::SECOND.nseconds() as f64) - as f32, - map: Some(m3u8_rs::Map { - uri: "init.cmfi".into(), - ..Default::default() - }), - ..Default::default() - }) - .collect(), - end_list: true, - playlist_type: Some(MediaPlaylistType::Vod), - i_frames_only: false, - start: None, - independent_segments: true, - ..Default::default() - }; - - let mut file = std::fs::File::create(path).unwrap(); - playlist - .write_to(&mut file) - .expect("Failed to write media playlist"); - }) - .build(), - ); -} - -fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) { - enc.static_pad("src").unwrap().add_probe( - gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { - gst::EventView::Caps(e) => { - let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned()); - - let mut state = state.lock().unwrap(); - state.all_mimes.push(mime.unwrap().into()); - state.maybe_write_manifest(); - gst::PadProbeReturn::Remove - } - _ => gst::PadProbeReturn::Ok, - }, - _ => gst::PadProbeReturn::Ok, - }, - ); -} - -impl VideoStream { - fn setup( - &self, - state: Arc<Mutex<State>>, - pipeline: &gst::Pipeline, - path: &Path, - ) -> Result<(), Error> { - let src = gst::ElementFactory::make("videotestsrc") - .property("num-buffers", 300) - .build()?; - let raw_capsfilter = gst::ElementFactory::make("capsfilter") - .property( - "caps", - gst_video::VideoCapsBuilder::new() - .format(gst_video::VideoFormat::I420) - .width(self.width as i32) - .height(self.height as i32) - .framerate(30.into()) - .build(), - ) - .build()?; - let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?; - let enc = gst::ElementFactory::make("x264enc") - .property("bframes", 0u32) - .property("bitrate", self.bitrate as u32 / 1000u32) - .build()?; - let h264_capsfilter = gst::ElementFactory::make("capsfilter") - .property( - "caps", - gst::Caps::builder("video/x-h264") - .field("profile", "main") - .build(), - ) - .build()?; - let mux = gst::ElementFactory::make("cmafmux") - .property("fragment-duration", 2500.mseconds()) - .property_from_str("header-update-mode", "update") - .property("write-mehd", true) - .build()?; - let appsink = gst_app::AppSink::builder().buffer_list(true).build(); - - pipeline.add_many(&[ - &src, - &raw_capsfilter, - &timeoverlay, - &enc, - &h264_capsfilter, - &mux, - appsink.upcast_ref(), - ])?; - - gst::Element::link_many(&[ - &src, - &raw_capsfilter, - &timeoverlay, - &enc, - &h264_capsfilter, - &mux, - appsink.upcast_ref(), - ])?; - - probe_encoder(state, enc); - - setup_appsink(&appsink, &self.name, path, true); - - Ok(()) - } -} - -impl AudioStream { - fn setup( - &self, - state: Arc<Mutex<State>>, - pipeline: &gst::Pipeline, - path: &Path, - ) -> Result<(), Error> { - let src = gst::ElementFactory::make("audiotestsrc") - .property("num-buffers", 100) - .property("samplesperbuffer", 4410) - .property_from_str("wave", &self.wave) - .build()?; - let raw_capsfilter = gst::ElementFactory::make("capsfilter") - .property( - "caps", - gst_audio::AudioCapsBuilder::new().rate(44100).build(), - ) - .build()?; - let enc = gst::ElementFactory::make("avenc_aac").build()?; - let mux = gst::ElementFactory::make("cmafmux") - .property("fragment-duration", 2500.mseconds()) - .property_from_str("header-update-mode", "update") - .property("write-mehd", true) - .build()?; - let appsink = gst_app::AppSink::builder().buffer_list(true).build(); - - pipeline.add_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?; - - gst::Element::link_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?; - - probe_encoder(state, enc); - - setup_appsink(&appsink, &self.name, path, false); - - Ok(()) - } -} - -fn main() -> Result<(), Error> { - gst::init()?; - - gstfmp4::plugin_register_static()?; - - let path = PathBuf::from("hls_vod_stream"); - - let pipeline = gst::Pipeline::default(); - - std::fs::create_dir_all(&path).expect("failed to create directory"); - - let mut manifest_path = path.clone(); - manifest_path.push("manifest.m3u8"); - - let state = Arc::new(Mutex::new(State { - video_streams: vec![VideoStream { - name: "video_0".to_string(), - bitrate: 2_048_000, - width: 1280, - height: 720, - }], - audio_streams: vec![ - AudioStream { - name: "audio_0".to_string(), - lang: "eng".to_string(), - default: true, - wave: "sine".to_string(), - }, - AudioStream { - name: "audio_1".to_string(), - lang: "fre".to_string(), - default: false, - wave: "white-noise".to_string(), - }, - ], - all_mimes: vec![], - path: manifest_path.clone(), - wrote_manifest: false, - })); - - { - let state_lock = state.lock().unwrap(); - - for stream in &state_lock.video_streams { - stream.setup(state.clone(), &pipeline, &path)?; - } - - for stream in &state_lock.audio_streams { - stream.setup(state.clone(), &pipeline, &path)?; - } - } - - pipeline.set_state(gst::State::Playing)?; - - let bus = pipeline - .bus() - .expect("Pipeline without bus. Shouldn't happen!"); - - for msg in bus.iter_timed(gst::ClockTime::NONE) { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(..) => { - println!("EOS"); - break; - } - MessageView::Error(err) => { - pipeline.set_state(gst::State::Null)?; - eprintln!( - "Got error from {}: {} ({})", - msg.src() - .map(|s| String::from(s.path_string())) - .unwrap_or_else(|| "None".into()), - err.error(), - err.debug().unwrap_or_else(|| "".into()), - ); - break; - } - _ => (), - } - } - - pipeline.set_state(gst::State::Null)?; - - Ok(()) -} diff --git a/generic/fmp4/src/fmp4mux/boxes.rs b/generic/fmp4/src/fmp4mux/boxes.rs deleted file mode 100644 index 9b69fb36..00000000 --- a/generic/fmp4/src/fmp4mux/boxes.rs +++ /dev/null @@ -1,2073 +0,0 @@ -// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 - -use gst::prelude::*; - -use anyhow::{anyhow, bail, Context, Error}; - -use super::Buffer; - -fn write_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>( - vec: &mut Vec<u8>, - fourcc: impl std::borrow::Borrow<[u8; 4]>, - content_func: F, -) -> Result<T, Error> { - // Write zero size ... - let size_pos = vec.len(); - vec.extend([0u8; 4]); - vec.extend(fourcc.borrow()); - - let res = content_func(vec)?; - - // ... and update it here later. - let size: u32 = vec - .len() - .checked_sub(size_pos) - .expect("vector shrunk") - .try_into() - .context("too big box content")?; - vec[size_pos..][..4].copy_from_slice(&size.to_be_bytes()); - - Ok(res) -} - -const FULL_BOX_VERSION_0: u8 = 0; -const FULL_BOX_VERSION_1: u8 = 1; - -const FULL_BOX_FLAGS_NONE: u32 = 0; - -fn write_full_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>( - vec: &mut Vec<u8>, - fourcc: impl std::borrow::Borrow<[u8; 4]>, - version: u8, - flags: u32, - content_func: F, -) -> Result<T, Error> { - write_box(vec, fourcc, move |vec| { - assert_eq!(flags >> 24, 0); - vec.extend(((u32::from(version) << 24) | flags).to_be_bytes()); - content_func(vec) - }) -} - -fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'static [u8; 4]>) { - let s = caps.structure(0).unwrap(); - match s.name() { - "video/x-h264" => { - let width = s.get::<i32>("width").ok(); - let height = s.get::<i32>("height").ok(); - let fps = s.get::<gst::Fraction>("framerate").ok(); - let profile = s.get::<&str>("profile").ok(); - let level = s - .get::<&str>("level") - .ok() - .map(|l| l.split_once('.').unwrap_or((l, "0"))); - let colorimetry = s.get::<&str>("colorimetry").ok(); - - if let (Some(width), Some(height), Some(profile), Some(level), Some(fps)) = - (width, height, profile, level, fps) - { - if profile == "high" - || profile == "main" - || profile == "baseline" - || profile == "constrained-baseline" - { - if width <= 864 - && height <= 576 - && level <= ("3", "1") - && fps <= gst::Fraction::new(60, 1) - { - #[cfg(feature = "v1_18")] - { - if let Some(colorimetry) = colorimetry - .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - | gst_video::VideoColorPrimaries::Bt470bg - | gst_video::VideoColorPrimaries::Smpte170m - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - | gst_video::VideoTransferFunction::Bt601 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - | gst_video::VideoColorMatrix::Bt601 - ) { - compatible_brands.push(b"cfsd"); - } - } else { - // Assume it's OK - compatible_brands.push(b"cfsd"); - } - } - #[cfg(not(feature = "v1_18"))] - { - // Assume it's OK - compatible_brands.push(b"cfsd"); - } - } else if width <= 1920 - && height <= 1080 - && level <= ("4", "0") - && fps <= gst::Fraction::new(60, 1) - { - #[cfg(feature = "v1_18")] - { - if let Some(colorimetry) = colorimetry - .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - ) { - compatible_brands.push(b"cfhd"); - } - } else { - // Assume it's OK - compatible_brands.push(b"cfhd"); - } - } - #[cfg(not(feature = "v1_18"))] - { - // Assume it's OK - compatible_brands.push(b"cfhd"); - } - } else if width <= 1920 - && height <= 1080 - && level <= ("4", "2") - && fps <= gst::Fraction::new(60, 1) - { - if let Some(colorimetry) = - colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - ) { - compatible_brands.push(b"chdf"); - } - } else { - // Assume it's OK - compatible_brands.push(b"chdf"); - } - } - } - } - } - "audio/mpeg" => { - compatible_brands.push(b"caac"); - } - "video/x-h265" => { - let width = s.get::<i32>("width").ok(); - let height = s.get::<i32>("height").ok(); - let fps = s.get::<gst::Fraction>("framerate").ok(); - let profile = s.get::<&str>("profile").ok(); - let tier = s.get::<&str>("tier").ok(); - let level = s - .get::<&str>("level") - .ok() - .map(|l| l.split_once('.').unwrap_or((l, "0"))); - let colorimetry = s.get::<&str>("colorimetry").ok(); - - if let (Some(width), Some(height), Some(profile), Some(tier), Some(level), Some(fps)) = - (width, height, profile, tier, level, fps) - { - if profile == "main" && tier == "main" { - if width <= 1920 - && height <= 1080 - && level <= ("4", "1") - && fps <= gst::Fraction::new(60, 1) - { - if let Some(colorimetry) = - colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - ) { - compatible_brands.push(b"chhd"); - } - } else { - // Assume it's OK - compatible_brands.push(b"chhd"); - } - } else if width <= 3840 - && height <= 2160 - && level <= ("5", "0") - && fps <= gst::Fraction::new(60, 1) - { - if let Some(colorimetry) = - colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - ) { - compatible_brands.push(b"cud8"); - } - } else { - // Assume it's OK - compatible_brands.push(b"cud8"); - } - } - } else if profile == "main-10" && tier == "main-10" { - if width <= 1920 - && height <= 1080 - && level <= ("4", "1") - && fps <= gst::Fraction::new(60, 1) - { - if let Some(colorimetry) = - colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - ) { - compatible_brands.push(b"chh1"); - } - } else { - // Assume it's OK - compatible_brands.push(b"chh1"); - } - } else if width <= 3840 - && height <= 2160 - && level <= ("5", "1") - && fps <= gst::Fraction::new(60, 1) - { - #[cfg(feature = "v1_18")] - if let Some(colorimetry) = - colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt709 - | gst_video::VideoColorPrimaries::Bt2020 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Bt709 - | gst_video::VideoTransferFunction::Bt202010 - | gst_video::VideoTransferFunction::Bt202012 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt709 - | gst_video::VideoColorMatrix::Bt2020 - ) { - compatible_brands.push(b"cud1"); - } else if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt2020 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::Smpte2084 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt2020 - ) { - compatible_brands.push(b"chd1"); - } else if matches!( - colorimetry.primaries(), - gst_video::VideoColorPrimaries::Bt2020 - ) && matches!( - colorimetry.transfer(), - gst_video::VideoTransferFunction::AribStdB67 - ) && matches!( - colorimetry.matrix(), - gst_video::VideoColorMatrix::Bt2020 - ) { - compatible_brands.push(b"clg1"); - } - } else { - // Assume it's OK - compatible_brands.push(b"cud1"); - } - } - #[cfg(not(feature = "v1_18"))] - { - // Assume it's OK - compatible_brands.push(b"cud1"); - } - } - } - } - _ => (), - } -} - -fn brands_from_variant_and_caps<'a>( - variant: super::Variant, - mut caps: impl Iterator<Item = &'a gst::Caps>, -) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) { - match variant { - super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]), - super::Variant::DASH => { - // FIXME: `dsms` / `dash` brands, `msix` - (b"msdh", vec![b"dums", b"msdh", b"iso6"]) - } - super::Variant::CMAF => { - let mut compatible_brands = vec![b"iso6", b"cmfc"]; - - cmaf_brands_from_caps(caps.next().unwrap(), &mut compatible_brands); - assert_eq!(caps.next(), None); - - (b"cmf2", compatible_brands) - } - } -} - -/// Creates `ftyp` and `moov` boxes -pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> { - let mut v = vec![]; - - let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.streams.iter()); - - write_box(&mut v, b"ftyp", |v| { - // major brand - v.extend(brand); - // minor version - v.extend(0u32.to_be_bytes()); - // compatible brands - v.extend(compatible_brands.into_iter().flatten()); - - Ok(()) - })?; - - write_box(&mut v, b"moov", |v| write_moov(v, &cfg))?; - - if cfg.variant == super::Variant::ONVIF { - write_full_box( - &mut v, - b"meta", - FULL_BOX_VERSION_0, - FULL_BOX_FLAGS_NONE, - |v| { - write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - // Handler type - v.extend(b"null"); - - // Reserved - v.extend([0u8; 3 * 4]); - - // Name - v.extend(b"MetadataHandler"); - - Ok(()) - })?; - - write_box(v, b"cstb", |v| { - // entry count - v.extend(1u32.to_be_bytes()); - - // track id - v.extend(0u32.to_be_bytes()); - - // start UTC time in 100ns units since Jan 1 1601 - v.extend(cfg.start_utc_time.unwrap().to_be_bytes()); - - Ok(()) - }) - }, - )?; - } - - Ok(gst::Buffer::from_mut_slice(v)) -} - -fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { - use gst::glib; - - let base = glib::DateTime::from_utc(1904, 1, 1, 0, 0, 0.0)?; - let now = glib::DateTime::now_utc()?; - let creation_time = - u64::try_from(now.difference(&base).as_seconds()).expect("time before 1904"); - - write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { - write_mvhd(v, cfg, creation_time) - })?; - for (idx, caps) in cfg.streams.iter().enumerate() { - write_box(v, b"trak", |v| { - let mut references = vec![]; - - // Reference the video track for ONVIF metadata tracks - if cfg.variant == super::Variant::ONVIF - && caps.structure(0).unwrap().name() == "application/x-onvif-metadata" - { - // Find the first video track - for (idx, caps) in cfg.streams.iter().enumerate() { - let s = caps.structure(0).unwrap(); - - if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") { - references.push(TrackReference { - reference_type: *b"cdsc", - track_ids: vec![idx as u32 + 1], - }); - break; - } - } - } - - write_trak(v, cfg, idx, caps, creation_time, &references) - })?; - } - write_box(v, b"mvex", |v| write_mvex(v, cfg))?; - - Ok(()) -} - -fn caps_to_timescale(caps: &gst::CapsRef) -> u32 { - let s = caps.structure(0).unwrap(); - - if let Ok(fps) = s.get::<gst::Fraction>("framerate") { - if fps.numer() == 0 { - return 10_000; - } - - if fps.denom() != 1 && fps.denom() != 1001 { - if let Some(fps) = (fps.denom() as u64) - .nseconds() - .mul_div_round(1_000_000_000, fps.numer() as u64) - .and_then(gst_video::guess_framerate) - { - return (fps.numer() as u32) - .mul_div_round(100, fps.denom() as u32) - .unwrap_or(10_000); - } - } - - (fps.numer() as u32) - .mul_div_round(100, fps.denom() as u32) - .unwrap_or(10_000) - } else if let Ok(rate) = s.get::<i32>("rate") { - rate as u32 - } else { - 10_000 - } -} - -fn write_mvhd( - v: &mut Vec<u8>, - cfg: &super::HeaderConfiguration, - creation_time: u64, -) -> Result<(), Error> { - // Creation time - v.extend(creation_time.to_be_bytes()); - // Modification time - v.extend(creation_time.to_be_bytes()); - // Timescale: uses the reference track timescale - v.extend(caps_to_timescale(&cfg.streams[0]).to_be_bytes()); - // Duration - v.extend(0u64.to_be_bytes()); - - // Rate 1.0 - v.extend((1u32 << 16).to_be_bytes()); - // Volume 1.0 - v.extend((1u16 << 8).to_be_bytes()); - // Reserved - v.extend([0u8; 2 + 2 * 4]); - - // Matrix - v.extend( - [ - (1u32 << 16).to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - (1u32 << 16).to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - (16384u32 << 16).to_be_bytes(), - ] - .into_iter() - .flatten(), - ); - - // Pre defined - v.extend([0u8; 6 * 4]); - - // Next track id - v.extend((cfg.streams.len() as u32 + 1).to_be_bytes()); - - Ok(()) -} - -const TKHD_FLAGS_TRACK_ENABLED: u32 = 0x1; -const TKHD_FLAGS_TRACK_IN_MOVIE: u32 = 0x2; -const TKHD_FLAGS_TRACK_IN_PREVIEW: u32 = 0x4; - -struct TrackReference { - reference_type: [u8; 4], - track_ids: Vec<u32>, -} - -fn write_trak( - v: &mut Vec<u8>, - cfg: &super::HeaderConfiguration, - idx: usize, - caps: &gst::CapsRef, - creation_time: u64, - references: &[TrackReference], -) -> Result<(), Error> { - write_full_box( - v, - b"tkhd", - FULL_BOX_VERSION_1, - TKHD_FLAGS_TRACK_ENABLED | TKHD_FLAGS_TRACK_IN_MOVIE | TKHD_FLAGS_TRACK_IN_PREVIEW, - |v| write_tkhd(v, cfg, idx, caps, creation_time), - )?; - - // TODO: write edts if necessary: for audio tracks to remove initialization samples - // TODO: write edts optionally for negative DTS instead of offsetting the DTS - - write_box(v, b"mdia", |v| write_mdia(v, cfg, caps, creation_time))?; - - if !references.is_empty() { - write_box(v, b"tref", |v| write_tref(v, cfg, references))?; - } - - Ok(()) -} - -fn write_tkhd( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - idx: usize, - caps: &gst::CapsRef, - creation_time: u64, -) -> Result<(), Error> { - // Creation time - v.extend(creation_time.to_be_bytes()); - // Modification time - v.extend(creation_time.to_be_bytes()); - // Track ID - v.extend((idx as u32 + 1).to_be_bytes()); - // Reserved - v.extend(0u32.to_be_bytes()); - // Duration - v.extend(0u64.to_be_bytes()); - - // Reserved - v.extend([0u8; 2 * 4]); - - // Layer - v.extend(0u16.to_be_bytes()); - // Alternate group - v.extend(0u16.to_be_bytes()); - - // Volume - let s = caps.structure(0).unwrap(); - match s.name() { - "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { - v.extend((1u16 << 8).to_be_bytes()) - } - _ => v.extend(0u16.to_be_bytes()), - } - - // Reserved - v.extend([0u8; 2]); - - // Matrix - v.extend( - [ - (1u32 << 16).to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - (1u32 << 16).to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - 0u32.to_be_bytes(), - (16384u32 << 16).to_be_bytes(), - ] - .into_iter() - .flatten(), - ); - - // Width/height - match s.name() { - "video/x-h264" | "video/x-h265" | "image/jpeg" => { - let width = s.get::<i32>("width").context("video caps without width")? as u32; - let height = s - .get::<i32>("height") - .context("video caps without height")? as u32; - let par = s - .get::<gst::Fraction>("pixel-aspect-ratio") - .unwrap_or_else(|_| gst::Fraction::new(1, 1)); - - let width = std::cmp::min( - width - .mul_div_round(par.numer() as u32, par.denom() as u32) - .unwrap_or(u16::MAX as u32), - u16::MAX as u32, - ); - let height = std::cmp::min(height, u16::MAX as u32); - - v.extend((width << 16).to_be_bytes()); - v.extend((height << 16).to_be_bytes()); - } - _ => v.extend([0u8; 2 * 4]), - } - - Ok(()) -} - -fn write_mdia( - v: &mut Vec<u8>, - cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, - creation_time: u64, -) -> Result<(), Error> { - write_full_box(v, b"mdhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { - write_mdhd(v, cfg, caps, creation_time) - })?; - write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_hdlr(v, cfg, caps) - })?; - - // TODO: write elng if needed - - write_box(v, b"minf", |v| write_minf(v, cfg, caps))?; - - Ok(()) -} - -fn write_tref( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - references: &[TrackReference], -) -> Result<(), Error> { - for reference in references { - write_box(v, &reference.reference_type, |v| { - for track_id in &reference.track_ids { - v.extend(track_id.to_be_bytes()); - } - - Ok(()) - })?; - } - - Ok(()) -} - -fn language_code(lang: impl std::borrow::Borrow<[u8; 3]>) -> u16 { - let lang = lang.borrow(); - - // TODO: Need to relax this once we get the language code from tags - assert!(lang.iter().all(u8::is_ascii_lowercase)); - - (((lang[0] as u16 - 0x60) & 0x1F) << 10) - + (((lang[1] as u16 - 0x60) & 0x1F) << 5) - + ((lang[2] as u16 - 0x60) & 0x1F) -} - -fn write_mdhd( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, - creation_time: u64, -) -> Result<(), Error> { - // Creation time - v.extend(creation_time.to_be_bytes()); - // Modification time - v.extend(creation_time.to_be_bytes()); - // Timescale - v.extend(caps_to_timescale(caps).to_be_bytes()); - // Duration - v.extend(0u64.to_be_bytes()); - - // Language as ISO-639-2/T - // TODO: get actual language from the tags - v.extend(language_code(b"und").to_be_bytes()); - - // Pre-defined - v.extend([0u8; 2]); - - Ok(()) -} - -fn write_hdlr( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - // Pre-defined - v.extend([0u8; 4]); - - let s = caps.structure(0).unwrap(); - let (handler_type, name) = match s.name() { - "video/x-h264" | "video/x-h265" | "image/jpeg" => (b"vide", b"VideoHandler\0".as_slice()), - "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { - (b"soun", b"SoundHandler\0".as_slice()) - } - "application/x-onvif-metadata" => (b"meta", b"MetadataHandler\0".as_slice()), - _ => unreachable!(), - }; - - // Handler type - v.extend(handler_type); - - // Reserved - v.extend([0u8; 3 * 4]); - - // Name - v.extend(name); - - Ok(()) -} - -fn write_minf( - v: &mut Vec<u8>, - cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - let s = caps.structure(0).unwrap(); - - match s.name() { - "video/x-h264" | "video/x-h265" | "image/jpeg" => { - // Flags are always 1 for unspecified reasons - write_full_box(v, b"vmhd", FULL_BOX_VERSION_0, 1, |v| write_vmhd(v, cfg))? - } - "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { - write_full_box(v, b"smhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_smhd(v, cfg) - })? - } - "application/x-onvif-metadata" => { - write_full_box(v, b"nmhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |_v| { - Ok(()) - })? - } - _ => unreachable!(), - } - - write_box(v, b"dinf", |v| write_dinf(v, cfg))?; - - write_box(v, b"stbl", |v| write_stbl(v, cfg, caps))?; - - Ok(()) -} - -fn write_vmhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Graphics mode - v.extend([0u8; 2]); - - // opcolor - v.extend([0u8; 2 * 3]); - - Ok(()) -} - -fn write_smhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Balance - v.extend([0u8; 2]); - - // Reserved - v.extend([0u8; 2]); - - Ok(()) -} - -fn write_dinf(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { - write_full_box(v, b"dref", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_dref(v, cfg) - })?; - - Ok(()) -} - -const DREF_FLAGS_MEDIA_IN_SAME_FILE: u32 = 0x1; - -fn write_dref(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Entry count - v.extend(1u32.to_be_bytes()); - - write_full_box( - v, - b"url ", - FULL_BOX_VERSION_0, - DREF_FLAGS_MEDIA_IN_SAME_FILE, - |_v| Ok(()), - )?; - - Ok(()) -} - -fn write_stbl( - v: &mut Vec<u8>, - cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - write_full_box(v, b"stsd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_stsd(v, cfg, caps) - })?; - write_full_box(v, b"stts", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_stts(v, cfg) - })?; - write_full_box(v, b"stsc", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_stsc(v, cfg) - })?; - write_full_box(v, b"stsz", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_stsz(v, cfg) - })?; - - write_full_box(v, b"stco", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_stco(v, cfg) - })?; - - // For video write a sync sample box as indication that not all samples are sync samples - let s = caps.structure(0).unwrap(); - match s.name() { - "video/x-h264" | "video/x-h265" => { - write_full_box(v, b"stss", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_stss(v, cfg) - })? - } - _ => (), - } - - Ok(()) -} - -fn write_stsd( - v: &mut Vec<u8>, - cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - // Entry count - v.extend(1u32.to_be_bytes()); - - let s = caps.structure(0).unwrap(); - match s.name() { - "video/x-h264" | "video/x-h265" | "image/jpeg" => write_visual_sample_entry(v, cfg, caps)?, - "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { - write_audio_sample_entry(v, cfg, caps)? - } - "application/x-onvif-metadata" => write_xml_meta_data_sample_entry(v, cfg, caps)?, - _ => unreachable!(), - } - - Ok(()) -} - -fn write_sample_entry_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>( - v: &mut Vec<u8>, - fourcc: impl std::borrow::Borrow<[u8; 4]>, - content_func: F, -) -> Result<T, Error> { - write_box(v, fourcc, move |v| { - // Reserved - v.extend([0u8; 6]); - - // Data reference index - v.extend(1u16.to_be_bytes()); - - content_func(v) - }) -} - -fn write_visual_sample_entry( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - let s = caps.structure(0).unwrap(); - let fourcc = match s.name() { - "video/x-h264" => { - let stream_format = s.get::<&str>("stream-format").context("no stream-format")?; - match stream_format { - "avc" => b"avc1", - "avc3" => b"avc3", - _ => unreachable!(), - } - } - "video/x-h265" => { - let stream_format = s.get::<&str>("stream-format").context("no stream-format")?; - match stream_format { - "hvc1" => b"hvc1", - "hev1" => b"hev1", - _ => unreachable!(), - } - } - "image/jpeg" => b"jpeg", - _ => unreachable!(), - }; - - write_sample_entry_box(v, fourcc, move |v| { - // pre-defined - v.extend([0u8; 2]); - // Reserved - v.extend([0u8; 2]); - // pre-defined - v.extend([0u8; 3 * 4]); - - // Width - let width = - u16::try_from(s.get::<i32>("width").context("no width")?).context("too big width")?; - v.extend(width.to_be_bytes()); - - // Height - let height = u16::try_from(s.get::<i32>("height").context("no height")?) - .context("too big height")?; - v.extend(height.to_be_bytes()); - - // Horizontal resolution - v.extend(0x00480000u32.to_be_bytes()); - - // Vertical resolution - v.extend(0x00480000u32.to_be_bytes()); - - // Reserved - v.extend([0u8; 4]); - - // Frame count - v.extend(1u16.to_be_bytes()); - - // Compressor name - v.extend([0u8; 32]); - - // Depth - v.extend(0x0018u16.to_be_bytes()); - - // Pre-defined - v.extend((-1i16).to_be_bytes()); - - // Codec specific boxes - match s.name() { - "video/x-h264" => { - let codec_data = s - .get::<&gst::BufferRef>("codec_data") - .context("no codec_data")?; - let map = codec_data - .map_readable() - .context("codec_data not mappable")?; - write_box(v, b"avcC", move |v| { - v.extend_from_slice(&map); - Ok(()) - })?; - } - "video/x-h265" => { - let codec_data = s - .get::<&gst::BufferRef>("codec_data") - .context("no codec_data")?; - let map = codec_data - .map_readable() - .context("codec_data not mappable")?; - write_box(v, b"hvcC", move |v| { - v.extend_from_slice(&map); - Ok(()) - })?; - } - "image/jpeg" => { - // Nothing to do here - } - _ => unreachable!(), - } - - if let Ok(par) = s.get::<gst::Fraction>("pixel-aspect-ratio") { - write_box(v, b"pasp", move |v| { - v.extend((par.numer() as u32).to_be_bytes()); - v.extend((par.denom() as u32).to_be_bytes()); - Ok(()) - })?; - } - - if let Some(colorimetry) = s - .get::<&str>("colorimetry") - .ok() - .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok()) - { - write_box(v, b"colr", move |v| { - v.extend(b"nclx"); - let (primaries, transfer, matrix) = { - #[cfg(feature = "v1_18")] - { - ( - (colorimetry.primaries().to_iso() as u16), - (colorimetry.transfer().to_iso() as u16), - (colorimetry.matrix().to_iso() as u16), - ) - } - #[cfg(not(feature = "v1_18"))] - { - let primaries = match colorimetry.primaries() { - gst_video::VideoColorPrimaries::Bt709 => 1u16, - gst_video::VideoColorPrimaries::Bt470m => 4u16, - gst_video::VideoColorPrimaries::Bt470bg => 5u16, - gst_video::VideoColorPrimaries::Smpte170m => 6u16, - gst_video::VideoColorPrimaries::Smpte240m => 7u16, - gst_video::VideoColorPrimaries::Film => 8u16, - gst_video::VideoColorPrimaries::Bt2020 => 9u16, - _ => 2, - }; - let transfer = match colorimetry.transfer() { - gst_video::VideoTransferFunction::Bt709 => 1u16, - gst_video::VideoTransferFunction::Gamma22 => 4u16, - gst_video::VideoTransferFunction::Gamma28 => 5u16, - gst_video::VideoTransferFunction::Smpte240m => 7u16, - gst_video::VideoTransferFunction::Gamma10 => 8u16, - gst_video::VideoTransferFunction::Log100 => 9u16, - gst_video::VideoTransferFunction::Log316 => 10u16, - gst_video::VideoTransferFunction::Srgb => 13u16, - gst_video::VideoTransferFunction::Bt202012 => 15u16, - _ => 2, - }; - let matrix = match colorimetry.matrix() { - gst_video::VideoColorMatrix::Rgb => 0u16, - gst_video::VideoColorMatrix::Bt709 => 1u16, - gst_video::VideoColorMatrix::Fcc => 4u16, - gst_video::VideoColorMatrix::Bt601 => 6u16, - gst_video::VideoColorMatrix::Smpte240m => 7u16, - gst_video::VideoColorMatrix::Bt2020 => 9u16, - _ => 2, - }; - - (primaries, transfer, matrix) - } - }; - - let full_range = match colorimetry.range() { - gst_video::VideoColorRange::Range0_255 => 0x80u8, - gst_video::VideoColorRange::Range16_235 => 0x00u8, - _ => 0x00, - }; - - v.extend(primaries.to_be_bytes()); - v.extend(transfer.to_be_bytes()); - v.extend(matrix.to_be_bytes()); - v.push(full_range); - - Ok(()) - })?; - } - - #[cfg(feature = "v1_18")] - { - if let Ok(cll) = gst_video::VideoContentLightLevel::from_caps(caps) { - write_box(v, b"clli", move |v| { - v.extend((cll.max_content_light_level() as u16).to_be_bytes()); - v.extend((cll.max_frame_average_light_level() as u16).to_be_bytes()); - Ok(()) - })?; - } - - if let Ok(mastering) = gst_video::VideoMasteringDisplayInfo::from_caps(caps) { - write_box(v, b"mdcv", move |v| { - for primary in mastering.display_primaries() { - v.extend(primary.x.to_be_bytes()); - v.extend(primary.y.to_be_bytes()); - } - v.extend(mastering.white_point().x.to_be_bytes()); - v.extend(mastering.white_point().y.to_be_bytes()); - v.extend(mastering.max_display_mastering_luminance().to_be_bytes()); - v.extend(mastering.max_display_mastering_luminance().to_be_bytes()); - Ok(()) - })?; - } - } - - // Write fiel box for codecs that require it - if ["image/jpeg"].contains(&s.name()) { - let interlace_mode = s - .get::<&str>("interlace-mode") - .ok() - .map(gst_video::VideoInterlaceMode::from_string) - .unwrap_or(gst_video::VideoInterlaceMode::Progressive); - let field_order = s - .get::<&str>("field-order") - .ok() - .map(gst_video::VideoFieldOrder::from_string) - .unwrap_or(gst_video::VideoFieldOrder::Unknown); - - write_box(v, b"fiel", move |v| { - let (interlace, field_order) = match interlace_mode { - gst_video::VideoInterlaceMode::Progressive => (1, 0), - gst_video::VideoInterlaceMode::Interleaved - if field_order == gst_video::VideoFieldOrder::TopFieldFirst => - { - (2, 9) - } - gst_video::VideoInterlaceMode::Interleaved => (2, 14), - _ => (0, 0), - }; - - v.push(interlace); - v.push(field_order); - Ok(()) - })?; - } - - // TODO: write btrt bitrate box based on tags - - Ok(()) - })?; - - Ok(()) -} - -fn write_audio_sample_entry( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - let s = caps.structure(0).unwrap(); - let fourcc = match s.name() { - "audio/mpeg" => b"mp4a", - "audio/x-alaw" => b"alaw", - "audio/x-mulaw" => b"ulaw", - "audio/x-adpcm" => { - let layout = s.get::<&str>("layout").context("no ADPCM layout field")?; - - match layout { - "g726" => b"ms\x00\x45", - _ => unreachable!(), - } - } - _ => unreachable!(), - }; - - let sample_size = match s.name() { - "audio/x-adpcm" => { - let bitrate = s.get::<i32>("bitrate").context("no ADPCM bitrate field")?; - (bitrate / 8000) as u16 - } - _ => 16u16, - }; - - write_sample_entry_box(v, fourcc, move |v| { - // Reserved - v.extend([0u8; 2 * 4]); - - // Channel count - let channels = u16::try_from(s.get::<i32>("channels").context("no channels")?) - .context("too many channels")?; - v.extend(channels.to_be_bytes()); - - // Sample size - v.extend(sample_size.to_be_bytes()); - - // Pre-defined - v.extend([0u8; 2]); - - // Reserved - v.extend([0u8; 2]); - - // Sample rate - let rate = u16::try_from(s.get::<i32>("rate").context("no rate")?).unwrap_or(0); - v.extend((u32::from(rate) << 16).to_be_bytes()); - - // Codec specific boxes - match s.name() { - "audio/mpeg" => { - let codec_data = s - .get::<&gst::BufferRef>("codec_data") - .context("no codec_data")?; - let map = codec_data - .map_readable() - .context("codec_data not mappable")?; - if map.len() < 2 { - bail!("too small codec_data"); - } - write_esds_aac(v, &map)?; - } - "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { - // Nothing to do here - } - _ => unreachable!(), - } - - // If rate did not fit into 16 bits write a full `srat` box - if rate == 0 { - let rate = s.get::<i32>("rate").context("no rate")?; - // FIXME: This is defined as full box? - write_full_box( - v, - b"srat", - FULL_BOX_VERSION_0, - FULL_BOX_FLAGS_NONE, - move |v| { - v.extend((rate as u32).to_be_bytes()); - Ok(()) - }, - )?; - } - - // TODO: write btrt bitrate box based on tags - - // TODO: chnl box for channel ordering? probably not needed for AAC - - Ok(()) - })?; - - Ok(()) -} - -fn write_esds_aac(v: &mut Vec<u8>, codec_data: &[u8]) -> Result<(), Error> { - let calculate_len = |mut len| { - if len > 260144641 { - bail!("too big descriptor length"); - } - - if len == 0 { - return Ok(([0; 4], 1)); - } - - let mut idx = 0; - let mut lens = [0u8; 4]; - while len > 0 { - lens[idx] = ((if len > 0x7f { 0x80 } else { 0x00 }) | (len & 0x7f)) as u8; - idx += 1; - len >>= 7; - } - - Ok((lens, idx)) - }; - - write_full_box( - v, - b"esds", - FULL_BOX_VERSION_0, - FULL_BOX_FLAGS_NONE, - move |v| { - // Calculate all lengths bottom up - - // Decoder specific info - let decoder_specific_info_len = calculate_len(codec_data.len())?; - - // Decoder config - let decoder_config_len = - calculate_len(13 + 1 + decoder_specific_info_len.1 + codec_data.len())?; - - // SL config - let sl_config_len = calculate_len(1)?; - - // ES descriptor - let es_descriptor_len = calculate_len( - 3 + 1 - + decoder_config_len.1 - + 13 - + 1 - + decoder_specific_info_len.1 - + codec_data.len() - + 1 - + sl_config_len.1 - + 1, - )?; - - // ES descriptor tag - v.push(0x03); - - // Length - v.extend_from_slice(&es_descriptor_len.0[..(es_descriptor_len.1)]); - - // Track ID - v.extend(1u16.to_be_bytes()); - // Flags - v.push(0u8); - - // Decoder config descriptor - v.push(0x04); - - // Length - v.extend_from_slice(&decoder_config_len.0[..(decoder_config_len.1)]); - - // Object type ESDS_OBJECT_TYPE_MPEG4_P3 - v.push(0x40); - // Stream type ESDS_STREAM_TYPE_AUDIO - v.push((0x05 << 2) | 0x01); - - // Buffer size db? - v.extend([0u8; 3]); - - // Max bitrate - v.extend(0u32.to_be_bytes()); - - // Avg bitrate - v.extend(0u32.to_be_bytes()); - - // Decoder specific info - v.push(0x05); - - // Length - v.extend_from_slice(&decoder_specific_info_len.0[..(decoder_specific_info_len.1)]); - v.extend_from_slice(codec_data); - - // SL config descriptor - v.push(0x06); - - // Length: 1 (tag) + 1 (length) + 1 (predefined) - v.extend_from_slice(&sl_config_len.0[..(sl_config_len.1)]); - - // Predefined - v.push(0x02); - Ok(()) - }, - ) -} - -fn write_xml_meta_data_sample_entry( - v: &mut Vec<u8>, - _cfg: &super::HeaderConfiguration, - caps: &gst::CapsRef, -) -> Result<(), Error> { - let s = caps.structure(0).unwrap(); - let namespace = match s.name() { - "application/x-onvif-metadata" => b"http://www.onvif.org/ver10/schema", - _ => unreachable!(), - }; - - write_sample_entry_box(v, b"metx", move |v| { - // content_encoding, empty string - v.push(0); - - // namespace - v.extend_from_slice(namespace); - v.push(0); - - // schema_location, empty string list - v.push(0); - - Ok(()) - })?; - - Ok(()) -} - -fn write_stts(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Entry count - v.extend(0u32.to_be_bytes()); - - Ok(()) -} - -fn write_stsc(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Entry count - v.extend(0u32.to_be_bytes()); - - Ok(()) -} - -fn write_stsz(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Sample size - v.extend(0u32.to_be_bytes()); - - // Sample count - v.extend(0u32.to_be_bytes()); - - Ok(()) -} - -fn write_stco(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Entry count - v.extend(0u32.to_be_bytes()); - - Ok(()) -} - -fn write_stss(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Entry count - v.extend(0u32.to_be_bytes()); - - Ok(()) -} - -fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { - if cfg.write_mehd { - if cfg.update && cfg.duration.is_some() { - write_full_box(v, b"mehd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { - write_mehd(v, cfg) - })?; - } else { - write_box(v, b"free", |v| { - // version/flags of full box - v.extend(0u32.to_be_bytes()); - // mehd duration - v.extend(0u64.to_be_bytes()); - - Ok(()) - })?; - } - } - - for (idx, _caps) in cfg.streams.iter().enumerate() { - write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_trex(v, cfg, idx) - })?; - } - - Ok(()) -} - -fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { - // Use the reference track timescale - let timescale = caps_to_timescale(&cfg.streams[0]); - - let duration = cfg - .duration - .expect("no duration") - .mul_div_ceil(timescale as u64, gst::ClockTime::SECOND.nseconds()) - .context("too long duration")?; - - // Media duration in mvhd.timescale units - v.extend(duration.to_be_bytes()); - - Ok(()) -} - -fn write_trex(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration, idx: usize) -> Result<(), Error> { - // Track ID - v.extend((idx as u32 + 1).to_be_bytes()); - - // Default sample description index - v.extend(1u32.to_be_bytes()); - - // Default sample duration - v.extend(0u32.to_be_bytes()); - - // Default sample size - v.extend(0u32.to_be_bytes()); - - // Default sample flags - v.extend(0u32.to_be_bytes()); - - // Default sample duration/size/etc will be provided in the traf/trun if one can be determined - // for a whole fragment - - Ok(()) -} - -/// Creates `styp` and `moof` boxes and `mdat` header -pub(super) fn create_fmp4_fragment_header( - cfg: super::FragmentHeaderConfiguration, -) -> Result<(gst::Buffer, u64), Error> { - let mut v = vec![]; - - let (brand, compatible_brands) = - brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.0)); - - write_box(&mut v, b"styp", |v| { - // major brand - v.extend(brand); - // minor version - v.extend(0u32.to_be_bytes()); - // compatible brands - v.extend(compatible_brands.into_iter().flatten()); - - Ok(()) - })?; - - let styp_len = v.len(); - - let data_offset_offsets = write_box(&mut v, b"moof", |v| write_moof(v, &cfg))?; - - let size = cfg - .buffers - .iter() - .map(|buffer| buffer.buffer.size() as u64) - .sum::<u64>(); - if let Ok(size) = u32::try_from(size + 8) { - v.extend(size.to_be_bytes()); - v.extend(b"mdat"); - } else { - v.extend(1u32.to_be_bytes()); - v.extend(b"mdat"); - v.extend((size + 16).to_be_bytes()); - } - - let data_offset = v.len() - styp_len; - for data_offset_offset in data_offset_offsets { - let val = u32::from_be_bytes(v[data_offset_offset..][..4].try_into()?) - .checked_add(u32::try_from(data_offset)?) - .ok_or_else(|| anyhow!("can't calculate track run data offset"))?; - v[data_offset_offset..][..4].copy_from_slice(&val.to_be_bytes()); - } - - Ok((gst::Buffer::from_mut_slice(v), styp_len as u64)) -} - -fn write_moof( - v: &mut Vec<u8>, - cfg: &super::FragmentHeaderConfiguration, -) -> Result<Vec<usize>, Error> { - write_full_box(v, b"mfhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - write_mfhd(v, cfg) - })?; - - let mut data_offset_offsets = vec![]; - for (idx, (caps, timing_info)) in cfg.streams.iter().enumerate() { - // Skip tracks without any buffers for this fragment. - let timing_info = match timing_info { - None => continue, - Some(ref timing_info) => timing_info, - }; - - write_box(v, b"traf", |v| { - write_traf(v, cfg, &mut data_offset_offsets, idx, caps, timing_info) - })?; - } - - Ok(data_offset_offsets) -} - -fn write_mfhd(v: &mut Vec<u8>, cfg: &super::FragmentHeaderConfiguration) -> Result<(), Error> { - v.extend(cfg.sequence_number.to_be_bytes()); - - Ok(()) -} - -#[allow(clippy::identity_op)] -fn sample_flags_from_buffer( - timing_info: &super::FragmentTimingInfo, - buffer: &gst::BufferRef, -) -> u32 { - if timing_info.intra_only { - (0b00u32 << (16 + 10)) | // leading: unknown - (0b10u32 << (16 + 8)) | // depends: no - (0b10u32 << (16 + 6)) | // depended: no - (0b00u32 << (16 + 4)) | // redundancy: unknown - (0b000u32 << (16 + 1)) | // padding: no - (0b0u32 << 16) | // non-sync-sample: no - (0u32) // degradation priority - } else { - let depends = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - 0b01u32 - } else { - 0b10u32 - }; - let depended = if buffer.flags().contains(gst::BufferFlags::DROPPABLE) { - 0b10u32 - } else { - 0b00u32 - }; - let non_sync_sample = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - 0b1u32 - } else { - 0b0u32 - }; - - (0b00u32 << (16 + 10)) | // leading: unknown - (depends << (16 + 8)) | // depends - (depended << (16 + 6)) | // depended - (0b00u32 << (16 + 4)) | // redundancy: unknown - (0b000u32 << (16 + 1)) | // padding: no - (non_sync_sample << 16) | // non-sync-sample - (0u32) // degradation priority - } -} - -const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08; -const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10; -const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20; -const DEFAULT_BASE_IS_MOOF: u32 = 0x2_00_00; - -const DATA_OFFSET_PRESENT: u32 = 0x0_01; -const FIRST_SAMPLE_FLAGS_PRESENT: u32 = 0x0_04; -const SAMPLE_DURATION_PRESENT: u32 = 0x1_00; -const SAMPLE_SIZE_PRESENT: u32 = 0x2_00; -const SAMPLE_FLAGS_PRESENT: u32 = 0x4_00; -const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00; - -#[allow(clippy::type_complexity)] -fn analyze_buffers( - cfg: &super::FragmentHeaderConfiguration, - idx: usize, - timing_info: &super::FragmentTimingInfo, - timescale: u32, -) -> Result< - ( - // tf_flags - u32, - // tr_flags - u32, - // default size - Option<u32>, - // default duration - Option<u32>, - // default flags - Option<u32>, - // negative composition time offsets - bool, - ), - Error, -> { - let mut tf_flags = DEFAULT_BASE_IS_MOOF; - let mut tr_flags = DATA_OFFSET_PRESENT; - - let mut duration = None; - let mut size = None; - let mut first_buffer_flags = None; - let mut flags = None; - - let mut negative_composition_time_offsets = false; - - for Buffer { - idx: _idx, - buffer, - timestamp: _timestamp, - duration: sample_duration, - composition_time_offset, - } in cfg.buffers.iter().filter(|b| b.idx == idx) - { - if size.is_none() { - size = Some(buffer.size() as u32); - } - if Some(buffer.size() as u32) != size { - tr_flags |= SAMPLE_SIZE_PRESENT; - } - - let sample_duration = u32::try_from( - sample_duration - .nseconds() - .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) - .context("too big sample duration")?, - ) - .context("too big sample duration")?; - - if duration.is_none() { - duration = Some(sample_duration); - } - if Some(sample_duration) != duration { - tr_flags |= SAMPLE_DURATION_PRESENT; - } - - let f = sample_flags_from_buffer(timing_info, buffer); - if first_buffer_flags.is_none() { - first_buffer_flags = Some(f); - } else { - flags = Some(f); - if Some(f) != first_buffer_flags { - tr_flags |= FIRST_SAMPLE_FLAGS_PRESENT; - } - } - - if flags.is_some() && Some(f) != flags { - tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT; - tr_flags |= SAMPLE_FLAGS_PRESENT; - } - - if let Some(composition_time_offset) = *composition_time_offset { - assert!(!timing_info.intra_only); - if composition_time_offset != 0 { - tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT; - } - if composition_time_offset < 0 { - negative_composition_time_offsets = true; - } - } - } - - if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 { - tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT; - } else { - size = None; - } - - if (tr_flags & SAMPLE_DURATION_PRESENT) == 0 { - tf_flags |= DEFAULT_SAMPLE_DURATION_PRESENT; - } else { - duration = None; - } - - // If there is only a single buffer use its flags as default sample flags - // instead of first sample flags. - if flags.is_none() && first_buffer_flags.is_some() { - tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT; - flags = first_buffer_flags.take(); - } - - if (tr_flags & SAMPLE_FLAGS_PRESENT) == 0 { - tf_flags |= DEFAULT_SAMPLE_FLAGS_PRESENT; - } else { - flags = None; - } - - Ok(( - tf_flags, - tr_flags, - size, - duration, - flags, - negative_composition_time_offsets, - )) -} - -#[allow(clippy::ptr_arg)] -fn write_traf( - v: &mut Vec<u8>, - cfg: &super::FragmentHeaderConfiguration, - data_offset_offsets: &mut Vec<usize>, - idx: usize, - caps: &gst::CapsRef, - timing_info: &super::FragmentTimingInfo, -) -> Result<(), Error> { - let timescale = caps_to_timescale(caps); - - // Analyze all buffers to know what values can be put into the tfhd for all samples and what - // has to be stored for every single sample - let ( - tf_flags, - tr_flags, - default_size, - default_duration, - default_flags, - negative_composition_time_offsets, - ) = analyze_buffers(cfg, idx, timing_info, timescale)?; - - assert!((tf_flags & DEFAULT_SAMPLE_SIZE_PRESENT == 0) ^ default_size.is_some()); - assert!((tf_flags & DEFAULT_SAMPLE_DURATION_PRESENT == 0) ^ default_duration.is_some()); - assert!((tf_flags & DEFAULT_SAMPLE_FLAGS_PRESENT == 0) ^ default_flags.is_some()); - - write_full_box(v, b"tfhd", FULL_BOX_VERSION_0, tf_flags, |v| { - write_tfhd(v, cfg, idx, default_size, default_duration, default_flags) - })?; - write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { - write_tfdt(v, cfg, idx, timing_info, timescale) - })?; - - let mut current_data_offset = 0; - - for run in GroupBy::new(cfg.buffers, |a: &Buffer, b: &Buffer| a.idx == b.idx) { - if run[0].idx != idx { - // FIXME: What to do with >4GB offsets? - current_data_offset = (current_data_offset as u64 - + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>()) - .try_into()?; - continue; - } - - let data_offset_offset = write_full_box( - v, - b"trun", - if negative_composition_time_offsets { - FULL_BOX_VERSION_1 - } else { - FULL_BOX_VERSION_0 - }, - tr_flags, - |v| { - write_trun( - v, - cfg, - current_data_offset, - tr_flags, - timescale, - timing_info, - run, - ) - }, - )?; - data_offset_offsets.push(data_offset_offset); - - // FIXME: What to do with >4GB offsets? - current_data_offset = (current_data_offset as u64 - + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>()) - .try_into()?; - } - - // TODO: saio, saiz, sbgp, sgpd, subs? - - Ok(()) -} - -fn write_tfhd( - v: &mut Vec<u8>, - _cfg: &super::FragmentHeaderConfiguration, - idx: usize, - default_size: Option<u32>, - default_duration: Option<u32>, - default_flags: Option<u32>, -) -> Result<(), Error> { - // Track ID - v.extend((idx as u32 + 1).to_be_bytes()); - - // No base data offset, no sample description index - - if let Some(default_duration) = default_duration { - v.extend(default_duration.to_be_bytes()); - } - - if let Some(default_size) = default_size { - v.extend(default_size.to_be_bytes()); - } - - if let Some(default_flags) = default_flags { - v.extend(default_flags.to_be_bytes()); - } - - Ok(()) -} - -fn write_tfdt( - v: &mut Vec<u8>, - _cfg: &super::FragmentHeaderConfiguration, - _idx: usize, - timing_info: &super::FragmentTimingInfo, - timescale: u32, -) -> Result<(), Error> { - let base_time = timing_info - .start_time - .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds()) - .context("base time overflow")?; - - v.extend(base_time.to_be_bytes()); - - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -fn write_trun( - v: &mut Vec<u8>, - _cfg: &super::FragmentHeaderConfiguration, - current_data_offset: u32, - tr_flags: u32, - timescale: u32, - timing_info: &super::FragmentTimingInfo, - buffers: &[Buffer], -) -> Result<usize, Error> { - // Sample count - v.extend((buffers.len() as u32).to_be_bytes()); - - let data_offset_offset = v.len(); - // Data offset, will be rewritten later - v.extend(current_data_offset.to_be_bytes()); - - if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 { - v.extend(sample_flags_from_buffer(timing_info, &buffers[0].buffer).to_be_bytes()); - } - - for Buffer { - idx: _idx, - ref buffer, - timestamp: _timestamp, - duration, - composition_time_offset, - } in buffers.iter() - { - if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 { - // Sample duration - let sample_duration = u32::try_from( - duration - .nseconds() - .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) - .context("too big sample duration")?, - ) - .context("too big sample duration")?; - v.extend(sample_duration.to_be_bytes()); - } - - if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 { - // Sample size - v.extend((buffer.size() as u32).to_be_bytes()); - } - - if (tr_flags & SAMPLE_FLAGS_PRESENT) != 0 { - assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0); - - // Sample flags - v.extend(sample_flags_from_buffer(timing_info, buffer).to_be_bytes()); - } - - if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 { - // Sample composition time offset - let composition_time_offset = i32::try_from( - composition_time_offset - .unwrap_or(0) - .mul_div_round(timescale as i64, gst::ClockTime::SECOND.nseconds() as i64) - .context("too big composition time offset")?, - ) - .context("too big composition time offset")?; - v.extend(composition_time_offset.to_be_bytes()); - } - } - - Ok(data_offset_offset) -} - -/// Creates `mfra` box -pub(crate) fn create_mfra( - caps: &gst::CapsRef, - fragment_offsets: &[super::FragmentOffset], -) -> Result<gst::Buffer, Error> { - let timescale = caps_to_timescale(caps); - - let mut v = vec![]; - - let offset = write_box(&mut v, b"mfra", |v| { - write_full_box(v, b"tfra", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { - // Track ID - v.extend(1u32.to_be_bytes()); - - // Reserved / length of traf/trun/sample - v.extend(0u32.to_be_bytes()); - - // Number of entries - v.extend( - u32::try_from(fragment_offsets.len()) - .context("too many fragments")? - .to_be_bytes(), - ); - - for super::FragmentOffset { time, offset } in fragment_offsets { - // Time - let time = time - .nseconds() - .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) - .context("time overflow")?; - v.extend(time.to_be_bytes()); - - // moof offset - v.extend(offset.to_be_bytes()); - - // traf/trun/sample number - v.extend_from_slice(&[1u8; 3][..]); - } - - Ok(()) - })?; - - let offset = write_full_box(v, b"mfro", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { - let offset = v.len(); - // Parent size - v.extend(0u32.to_be_bytes()); - Ok(offset) - })?; - - Ok(offset) - })?; - - let len = u32::try_from(v.len() as u64).context("too big mfra")?; - v[offset..][..4].copy_from_slice(&len.to_be_bytes()); - - Ok(gst::Buffer::from_mut_slice(v)) -} - -// Copy from std while this is still nightly-only -use std::fmt; - -/// An iterator over slice in (non-overlapping) chunks separated by a predicate. -/// -/// This struct is created by the [`group_by`] method on [slices]. -/// -/// [`group_by`]: slice::group_by -/// [slices]: slice -struct GroupBy<'a, T: 'a, P> { - slice: &'a [T], - predicate: P, -} - -impl<'a, T: 'a, P> GroupBy<'a, T, P> { - fn new(slice: &'a [T], predicate: P) -> Self { - GroupBy { slice, predicate } - } -} - -impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P> -where - P: FnMut(&T, &T) -> bool, -{ - type Item = &'a [T]; - - #[inline] - fn next(&mut self) -> Option<Self::Item> { - if self.slice.is_empty() { - None - } else { - let mut len = 1; - let mut iter = self.slice.windows(2); - while let Some([l, r]) = iter.next() { - if (self.predicate)(l, r) { - len += 1 - } else { - break; - } - } - let (head, tail) = self.slice.split_at(len); - self.slice = tail; - Some(head) - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option<usize>) { - if self.slice.is_empty() { - (0, Some(0)) - } else { - (1, Some(self.slice.len())) - } - } - - #[inline] - fn last(mut self) -> Option<Self::Item> { - self.next_back() - } -} - -impl<'a, T: 'a, P> DoubleEndedIterator for GroupBy<'a, T, P> -where - P: FnMut(&T, &T) -> bool, -{ - #[inline] - fn next_back(&mut self) -> Option<Self::Item> { - if self.slice.is_empty() { - None - } else { - let mut len = 1; - let mut iter = self.slice.windows(2); - while let Some([l, r]) = iter.next_back() { - if (self.predicate)(l, r) { - len += 1 - } else { - break; - } - } - let (head, tail) = self.slice.split_at(self.slice.len() - len); - self.slice = head; - Some(tail) - } - } -} - -impl<'a, T: 'a, P> std::iter::FusedIterator for GroupBy<'a, T, P> where P: FnMut(&T, &T) -> bool {} - -impl<'a, T: 'a + fmt::Debug, P> fmt::Debug for GroupBy<'a, T, P> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GroupBy") - .field("slice", &self.slice) - .finish() - } -} diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs deleted file mode 100644 index a7a8932d..00000000 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ /dev/null @@ -1,2596 +0,0 @@ -// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 - -use gst::glib; -use gst::prelude::*; -use gst::subclass::prelude::*; -use gst_base::prelude::*; -use gst_base::subclass::prelude::*; - -use std::collections::VecDeque; -use std::sync::Mutex; - -use once_cell::sync::Lazy; - -use super::boxes; -use super::Buffer; - -/// Offset for the segment in non-single-stream variants. -const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); - -/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds. -/// 1601 = UNIX + UNIX_1601_OFFSET. -const UNIX_1601_OFFSET: u64 = 11_644_473_600; - -/// Offset between NTP and UNIX epoch in seconds. -/// NTP = UNIX + NTP_UNIX_OFFSET. -const NTP_UNIX_OFFSET: u64 = 2_208_988_800; - -/// Reference timestamp meta caps for NTP timestamps. -static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); - -/// Reference timestamp meta caps for UNIX timestamps. -static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); - -/// Returns the UTC time of the buffer in the UNIX epoch. -fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> { - buffer - .iter_meta::<gst::ReferenceTimestampMeta>() - .find_map(|meta| { - if meta.reference().can_intersect(&UNIX_CAPS) { - Some(meta.timestamp()) - } else if meta.reference().can_intersect(&NTP_CAPS) { - meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds()) - } else { - None - } - }) -} - -static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { - gst::DebugCategory::new( - "fmp4mux", - gst::DebugColorFlags::empty(), - Some("FMP4Mux Element"), - ) -}); - -const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10); -const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None; -const DEFAULT_WRITE_MFRA: bool = false; -const DEFAULT_WRITE_MEHD: bool = false; -const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None; -const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(250)); - -#[derive(Debug, Clone)] -struct Settings { - fragment_duration: gst::ClockTime, - header_update_mode: super::HeaderUpdateMode, - write_mfra: bool, - write_mehd: bool, - interleave_bytes: Option<u64>, - interleave_time: Option<gst::ClockTime>, -} - -impl Default for Settings { - fn default() -> Self { - Settings { - fragment_duration: DEFAULT_FRAGMENT_DURATION, - header_update_mode: DEFAULT_HEADER_UPDATE_MODE, - write_mfra: DEFAULT_WRITE_MFRA, - write_mehd: DEFAULT_WRITE_MEHD, - interleave_bytes: DEFAULT_INTERLEAVE_BYTES, - interleave_time: DEFAULT_INTERLEAVE_TIME, - } - } -} - -#[derive(Debug)] -struct GopBuffer { - buffer: gst::Buffer, - pts: gst::ClockTime, - dts: Option<gst::ClockTime>, -} - -#[derive(Debug)] -struct Gop { - // Running times - start_pts: gst::ClockTime, - start_dts: Option<gst::ClockTime>, - earliest_pts: gst::ClockTime, - // Once this is known to be the final earliest PTS/DTS - final_earliest_pts: bool, - // PTS plus duration of last buffer, or start of next GOP - end_pts: gst::ClockTime, - // Once this is known to be the final end PTS/DTS - final_end_pts: bool, - // DTS plus duration of last buffer, or start of next GOP - end_dts: Option<gst::ClockTime>, - - // Buffer positions - earliest_pts_position: gst::ClockTime, - start_dts_position: Option<gst::ClockTime>, - - // Buffer, PTS running time, DTS running time - buffers: Vec<GopBuffer>, -} - -struct Stream { - sinkpad: gst_base::AggregatorPad, - - caps: gst::Caps, - intra_only: bool, - - queued_gops: VecDeque<Gop>, - fragment_filled: bool, - - // Difference between the first DTS and 0 in case of negative DTS - dts_offset: Option<gst::ClockTime>, - - // Current position (DTS, or PTS for intra-only) to prevent - // timestamps from going backwards when queueing new buffers - current_position: gst::ClockTime, - - // Current UTC time in ONVIF mode to prevent timestamps from - // going backwards when draining a fragment. - // UNIX epoch. - current_utc_time: gst::ClockTime, -} - -#[derive(Default)] -struct State { - streams: Vec<Stream>, - - // Created once we received caps and kept up to date with the caps, - // sent as part of the buffer list for the first fragment. - stream_header: Option<gst::Buffer>, - - sequence_number: u32, - - // Fragment tracking for mfra - current_offset: u64, - fragment_offsets: Vec<super::FragmentOffset>, - - // Start / end PTS of the whole stream - earliest_pts: Option<gst::ClockTime>, - end_pts: Option<gst::ClockTime>, - - // Start PTS of the current fragment - fragment_start_pts: Option<gst::ClockTime>, - // Additional timeout delay in case GOPs are bigger than the fragment duration - timeout_delay: gst::ClockTime, - - // In ONVIF mode the UTC time corresponding to the beginning of the stream - // UNIX epoch. - start_utc_time: Option<gst::ClockTime>, - end_utc_time: Option<gst::ClockTime>, - - sent_headers: bool, -} - -#[derive(Default)] -pub(crate) struct FMP4Mux { - state: Mutex<State>, - settings: Mutex<Settings>, -} - -impl FMP4Mux { - fn find_earliest_stream<'a>( - &self, - state: &'a mut State, - timeout: bool, - ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> { - let mut earliest_stream = None; - let mut all_have_data_or_eos = true; - - for (idx, stream) in state.streams.iter_mut().enumerate() { - let buffer = match stream.sinkpad.peek_buffer() { - Some(buffer) => buffer, - None => { - if stream.sinkpad.is_eos() { - gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS"); - } else { - all_have_data_or_eos = false; - gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer"); - } - continue; - } - }; - - if stream.fragment_filled { - gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled"); - continue; - } - - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::<gst::ClockTime>() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; - - // If the stream has no valid running time, assume it's before everything else. - let running_time = match segment.to_running_time(buffer.dts_or_pts()) { - None => { - gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time"); - if earliest_stream - .as_ref() - .map_or(true, |(_, _, earliest_running_time)| { - *earliest_running_time > gst::ClockTime::ZERO - }) - { - earliest_stream = Some((idx, stream, gst::ClockTime::ZERO)); - } - continue; - } - Some(running_time) => running_time, - }; - - gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time); - - if earliest_stream - .as_ref() - .map_or(true, |(_idx, _stream, earliest_running_time)| { - *earliest_running_time > running_time - }) - { - earliest_stream = Some((idx, stream, running_time)); - } - } - - if !timeout && !all_have_data_or_eos { - gst::trace!( - CAT, - imp: self, - "No timeout and not all streams have a buffer or are EOS" - ); - Ok(None) - } else if let Some((idx, stream, earliest_running_time)) = earliest_stream { - gst::trace!( - CAT, - imp: self, - "Stream {} is earliest stream with running time {}", - stream.sinkpad.name(), - earliest_running_time - ); - Ok(Some((idx, stream))) - } else { - gst::trace!(CAT, imp: self, "No streams have data queued currently"); - Ok(None) - } - } - - // Queue incoming buffers as individual GOPs. - fn queue_gops( - &self, - _idx: usize, - stream: &mut Stream, - segment: &gst::FormattedSegment<gst::ClockTime>, - mut buffer: gst::Buffer, - ) -> Result<(), gst::FlowError> { - use gst::Signed::*; - - assert!(!stream.fragment_filled); - - gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer); - - let intra_only = stream.intra_only; - - if !intra_only && buffer.dts().is_none() { - gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams"); - return Err(gst::FlowError::Error); - } - - if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units"); - return Err(gst::FlowError::Error); - } - - let pts_position = buffer.pts().ok_or_else(|| { - gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers"); - gst::FlowError::Error - })?; - let duration = buffer.duration(); - let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position); - - let mut pts = segment - .to_running_time_full(pts_position) - .ok_or_else(|| { - gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time"); - gst::FlowError::Error - })? - .positive_or_else(|_| { - gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported"); - gst::FlowError::Error - })?; - - let mut end_pts = segment - .to_running_time_full(end_pts_position) - .ok_or_else(|| { - gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert end PTS to running time"); - gst::FlowError::Error - })? - .positive_or_else(|_| { - gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported"); - gst::FlowError::Error - })?; - - // Enforce monotonically increasing PTS for intra-only streams - if intra_only { - if pts < stream.current_position { - gst::warning!( - CAT, - obj: &stream.sinkpad, - "Decreasing PTS {} < {} for intra-only stream", - pts, - stream.current_position, - ); - pts = stream.current_position; - } else { - stream.current_position = pts; - } - end_pts = std::cmp::max(end_pts, pts); - } - - let (dts_position, dts, end_dts) = if intra_only { - (None, None, None) - } else { - // Negative DTS are handled via the dts_offset and by having negative composition time - // offsets in the `trun` box. The smallest DTS here is shifted to zero. - let dts_position = buffer.dts().expect("not DTS"); - let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position); - - let signed_dts = segment.to_running_time_full(dts_position).ok_or_else(|| { - gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time"); - gst::FlowError::Error - })?; - let mut dts = match signed_dts { - Positive(dts) => { - if let Some(dts_offset) = stream.dts_offset { - dts + dts_offset - } else { - dts - } - } - Negative(dts) => { - if stream.dts_offset.is_none() { - stream.dts_offset = Some(dts); - } - - let dts_offset = stream.dts_offset.unwrap(); - if dts > dts_offset { - gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS"); - gst::ClockTime::ZERO - } else { - dts_offset - dts - } - } - }; - - let signed_end_dts = - segment - .to_running_time_full(end_dts_position) - .ok_or_else(|| { - gst::error!( - CAT, - obj: &stream.sinkpad, - "Couldn't convert end DTS to running time" - ); - gst::FlowError::Error - })?; - let mut end_dts = match signed_end_dts { - Positive(dts) => { - if let Some(dts_offset) = stream.dts_offset { - dts + dts_offset - } else { - dts - } - } - Negative(dts) => { - if stream.dts_offset.is_none() { - stream.dts_offset = Some(dts); - } - - let dts_offset = stream.dts_offset.unwrap(); - if dts > dts_offset { - gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS"); - gst::ClockTime::ZERO - } else { - dts_offset - dts - } - } - }; - - // Enforce monotonically increasing DTS for intra-only streams - // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference - // FIXME: Is this correct? - if dts < stream.current_position { - gst::warning!( - CAT, - obj: &stream.sinkpad, - "Decreasing DTS {} < {}", - dts, - stream.current_position, - ); - dts = stream.current_position; - } else { - stream.current_position = dts; - } - end_dts = std::cmp::max(end_dts, dts); - - (Some(dts_position), Some(dts), Some(end_dts)) - }; - - // If this is a multi-stream element then we need to update the PTS/DTS positions according - // to the output segment, specifically to re-timestamp them with the running time and - // adjust for the segment shift to compensate for negative DTS. - let aggregator = self.instance(); - let class = aggregator.class(); - let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() { - (pts_position, dts_position) - } else { - let pts_position = pts + SEGMENT_OFFSET; - let dts_position = dts.map(|dts| { - dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO) - }); - - let buffer = buffer.make_mut(); - buffer.set_pts(pts_position); - buffer.set_dts(dts_position); - - (pts_position, dts_position) - }; - - if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Starting new GOP at PTS {} DTS {} (DTS offset {})", - pts, - dts.display(), - stream.dts_offset.display(), - ); - - let gop = Gop { - start_pts: pts, - start_dts: dts, - start_dts_position: if intra_only { None } else { dts_position }, - earliest_pts: pts, - earliest_pts_position: pts_position, - final_earliest_pts: intra_only, - end_pts, - end_dts, - final_end_pts: false, - buffers: vec![GopBuffer { buffer, pts, dts }], - }; - stream.queued_gops.push_front(gop); - - if let Some(prev_gop) = stream.queued_gops.get_mut(1) { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Updating previous GOP starting at PTS {} to end PTS {} DTS {}", - prev_gop.earliest_pts, - pts, - dts.display(), - ); - - prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts); - prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts); - - if intra_only { - prev_gop.final_end_pts = true; - } - - if !prev_gop.final_earliest_pts { - // Don't bother logging this for intra-only streams as it would be for every - // single buffer. - if !intra_only { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Previous GOP has final earliest PTS at {}", - prev_gop.earliest_pts - ); - } - - prev_gop.final_earliest_pts = true; - if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) { - prev_prev_gop.final_end_pts = true; - } - } - } - } else if let Some(gop) = stream.queued_gops.front_mut() { - assert!(!intra_only); - - // We require DTS for non-intra-only streams - let dts = dts.unwrap(); - let end_dts = end_dts.unwrap(); - - gop.end_pts = std::cmp::max(gop.end_pts, end_pts); - gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts)); - gop.buffers.push(GopBuffer { - buffer, - pts, - dts: Some(dts), - }); - - if gop.earliest_pts > pts && !gop.final_earliest_pts { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Updating current GOP earliest PTS from {} to {}", - gop.earliest_pts, - pts - ); - gop.earliest_pts = pts; - gop.earliest_pts_position = pts_position; - - if let Some(prev_gop) = stream.queued_gops.get_mut(1) { - if prev_gop.end_pts < pts { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Updating previous GOP starting PTS {} end time from {} to {}", - pts, - prev_gop.end_pts, - pts - ); - prev_gop.end_pts = pts; - } - } - } - - let gop = stream.queued_gops.front_mut().unwrap(); - - // The earliest PTS is known when the current DTS is bigger or equal to the first - // PTS that was observed in this GOP. If there was another frame later that had a - // lower PTS then it wouldn't be possible to display it in time anymore, i.e. the - // stream would be invalid. - if gop.start_pts <= dts && !gop.final_earliest_pts { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "GOP has final earliest PTS at {}", - gop.earliest_pts - ); - gop.final_earliest_pts = true; - - if let Some(prev_gop) = stream.queued_gops.get_mut(1) { - prev_gop.final_end_pts = true; - } - } - } else { - gst::warning!( - CAT, - obj: &stream.sinkpad, - "Waiting for keyframe at the beginning of the stream" - ); - } - - if let Some((prev_gop, first_gop)) = Option::zip( - stream.queued_gops.iter().find(|gop| gop.final_end_pts), - stream.queued_gops.back(), - ) { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Queued full GOPs duration updated to {}", - prev_gop.end_pts.saturating_sub(first_gop.earliest_pts), - ); - } - - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Queued duration updated to {}", - Option::zip(stream.queued_gops.front(), stream.queued_gops.back()) - .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts)) - .unwrap_or(gst::ClockTime::ZERO) - ); - - Ok(()) - } - - #[allow(clippy::type_complexity)] - fn drain_buffers( - &self, - state: &mut State, - settings: &Settings, - timeout: bool, - at_eos: bool, - ) -> Result< - ( - // Drained streams - Vec<( - gst::Caps, - Option<super::FragmentTimingInfo>, - VecDeque<Buffer>, - )>, - // Minimum earliest PTS position of all streams - Option<gst::ClockTime>, - // Minimum earliest PTS of all streams - Option<gst::ClockTime>, - // Minimum start DTS position of all streams (if any stream has DTS) - Option<gst::ClockTime>, - // End PTS of this drained fragment, i.e. start PTS of the next fragment - Option<gst::ClockTime>, - ), - gst::FlowError, - > { - let mut drained_streams = Vec::with_capacity(state.streams.len()); - - let mut min_earliest_pts_position = None; - let mut min_earliest_pts = None; - let mut min_start_dts_position = None; - let mut fragment_end_pts = None; - - // The first stream decides how much can be dequeued, if anything at all. - // - // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued - // but on timeout in live pipelines it might happen that the first stream does not have a - // complete GOP queued. In that case nothing is dequeued for any of the streams and the - // timeout is advanced by 1s until at least one complete GOP can be dequeued. - // - // If the first stream is already EOS then the next stream that is not EOS yet will be - // taken in its place. - let fragment_start_pts = state.fragment_start_pts.unwrap(); - gst::info!( - CAT, - imp: self, - "Starting to drain at {}", - fragment_start_pts - ); - - for (idx, stream) in state.streams.iter_mut().enumerate() { - assert!( - timeout - || at_eos - || stream.sinkpad.is_eos() - || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) - ); - - // Drain all complete GOPs until at most one fragment duration was dequeued for the - // first stream, or until the dequeued duration of the first stream. - let mut gops = Vec::with_capacity(stream.queued_gops.len()); - let dequeue_end_pts = - fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration); - gst::trace!( - CAT, - obj: &stream.sinkpad, - "Draining up to end PTS {} / duration {}", - dequeue_end_pts, - dequeue_end_pts - fragment_start_pts - ); - - while let Some(gop) = stream.queued_gops.back() { - // If this GOP is not complete then we can't pop it yet. - // - // If there was no complete GOP at all yet then it might be bigger than the - // fragment duration. In this case we might not be able to handle the latency - // requirements in a live pipeline. - if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() { - break; - } - - // If this GOP starts after the fragment end then don't dequeue it yet unless this is - // the first stream and no GOPs were dequeued at all yet. This would mean that the - // GOP is bigger than the fragment duration. - if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty()) - { - break; - } - - gops.push(stream.queued_gops.pop_back().unwrap()); - } - stream.fragment_filled = false; - - // If we don't have a next fragment start PTS then this is the first stream as above. - if fragment_end_pts.is_none() { - if let Some(last_gop) = gops.last() { - // Dequeued something so let's take the end PTS of the last GOP - fragment_end_pts = Some(last_gop.end_pts); - gst::info!( - CAT, - obj: &stream.sinkpad, - "Draining up to PTS {} for this fragment", - last_gop.end_pts, - ); - } else { - // If nothing was dequeued for the first stream then this is OK if we're at - // EOS: we just consider the next stream as first stream then. - if at_eos || stream.sinkpad.is_eos() { - // This is handled below generally if nothing was dequeued - } else { - // Otherwise this can only really happen on timeout in live pipelines. - assert!(timeout); - - gst::warning!( - CAT, - obj: &stream.sinkpad, - "Don't have a complete GOP for the first stream on timeout in a live pipeline", - ); - - // In this case we advance the timeout by 1s and hope that things are - // better then. - return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); - } - } - } - - if gops.is_empty() { - gst::info!( - CAT, - obj: &stream.sinkpad, - "Draining no buffers", - ); - - drained_streams.push((stream.caps.clone(), None, VecDeque::new())); - continue; - } - - assert!(fragment_end_pts.is_some()); - - let first_gop = gops.first().unwrap(); - let last_gop = gops.last().unwrap(); - let earliest_pts = first_gop.earliest_pts; - let earliest_pts_position = first_gop.earliest_pts_position; - let start_dts = first_gop.start_dts; - let start_dts_position = first_gop.start_dts_position; - let end_pts = last_gop.end_pts; - let dts_offset = stream.dts_offset; - - if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { - min_earliest_pts = Some(earliest_pts); - } - if min_earliest_pts_position - .opt_gt(earliest_pts_position) - .unwrap_or(true) - { - min_earliest_pts_position = Some(earliest_pts_position); - } - if let Some(start_dts_position) = start_dts_position { - if min_start_dts_position - .opt_gt(start_dts_position) - .unwrap_or(true) - { - min_start_dts_position = Some(start_dts_position); - } - } - - gst::info!( - CAT, - obj: &stream.sinkpad, - "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", - end_pts.saturating_sub(earliest_pts), - earliest_pts, - start_dts.display(), - dts_offset.display(), - ); - - if let Some((prev_gop, first_gop)) = Option::zip( - stream.queued_gops.iter().find(|gop| gop.final_end_pts), - stream.queued_gops.back(), - ) { - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Queued full GOPs duration updated to {}", - prev_gop.end_pts.saturating_sub(first_gop.earliest_pts), - ); - } - - gst::debug!( - CAT, - obj: &stream.sinkpad, - "Queued duration updated to {}", - Option::zip(stream.queued_gops.front(), stream.queued_gops.back()) - .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts)) - .unwrap_or(gst::ClockTime::ZERO) - ); - - let start_time = if stream.intra_only { - earliest_pts - } else { - start_dts.unwrap() - }; - - let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); - - for gop in gops { - let mut gop_buffers = gop.buffers.into_iter().peekable(); - while let Some(buffer) = gop_buffers.next() { - let timestamp = if stream.intra_only { - buffer.pts - } else { - buffer.dts.unwrap() - }; - - let end_timestamp = match gop_buffers.peek() { - Some(buffer) => { - if stream.intra_only { - buffer.pts - } else { - buffer.dts.unwrap() - } - } - None => { - if stream.intra_only { - gop.end_pts - } else { - gop.end_dts.unwrap() - } - } - }; - - // Timestamps are enforced to monotonically increase when queueing buffers - let duration = end_timestamp - .checked_sub(timestamp) - .expect("Timestamps going backwards"); - - let composition_time_offset = if stream.intra_only { - None - } else { - let pts = buffer.pts; - let dts = buffer.dts.unwrap(); - - if pts > dts { - Some( - i64::try_from((pts - dts).nseconds()) - .map_err(|_| { - gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?, - ) - } else { - let diff = i64::try_from((dts - pts).nseconds()) - .map_err(|_| { - gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?; - Some(-diff) - } - }; - - buffers.push_back(Buffer { - idx, - buffer: buffer.buffer, - timestamp, - duration, - composition_time_offset, - }); - } - } - - drained_streams.push(( - stream.caps.clone(), - Some(super::FragmentTimingInfo { - start_time, - intra_only: stream.intra_only, - }), - buffers, - )); - } - - Ok(( - drained_streams, - min_earliest_pts_position, - min_earliest_pts, - min_start_dts_position, - fragment_end_pts, - )) - } - - fn preprocess_drained_streams_onvif( - &self, - state: &mut State, - drained_streams: &mut [( - gst::Caps, - Option<super::FragmentTimingInfo>, - VecDeque<Buffer>, - )], - ) -> Result<Option<gst::ClockTime>, gst::FlowError> { - let aggregator = self.instance(); - if aggregator.class().as_ref().variant != super::Variant::ONVIF { - return Ok(None); - } - - let mut max_end_utc_time = None; - - let calculate_pts = |buffer: &Buffer| -> gst::ClockTime { - let composition_time_offset = buffer.composition_time_offset.unwrap_or(0); - if composition_time_offset > 0 { - buffer.timestamp + (composition_time_offset as u64).nseconds() - } else { - buffer - .timestamp - .checked_sub(((-composition_time_offset) as u64).nseconds()) - .unwrap() - } - }; - - // If this is the first fragment then allow the first buffers to not have a reference - // timestamp meta and backdate them - if state.stream_header.is_none() { - for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() { - let (buffer_idx, utc_time, buffer) = - match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| { - get_utc_time_from_buffer(&buffer.buffer) - .map(|timestamp| (idx, timestamp, buffer)) - }) { - None => { - gst::error!( - CAT, - obj: &state.streams[idx].sinkpad, - "No reference timestamp set on any buffers in the first fragment", - ); - return Err(gst::FlowError::Error); - } - Some(res) => res, - }; - - // Now do the backdating - if buffer_idx > 0 { - let utc_time_pts = calculate_pts(buffer); - - for buffer in drain_buffers.iter_mut().take(buffer_idx) { - let buffer_pts = calculate_pts(buffer); - let buffer_pts_diff = if utc_time_pts >= buffer_pts { - (utc_time_pts - buffer_pts).nseconds() as i64 - } else { - -((buffer_pts - utc_time_pts).nseconds() as i64) - }; - let buffer_utc_time = if buffer_pts_diff >= 0 { - utc_time - .checked_sub((buffer_pts_diff as u64).nseconds()) - .unwrap() - } else { - utc_time - .checked_add(((-buffer_pts_diff) as u64).nseconds()) - .unwrap() - }; - - let buffer = buffer.buffer.make_mut(); - gst::ReferenceTimestampMeta::add( - buffer, - &UNIX_CAPS, - buffer_utc_time, - gst::ClockTime::NONE, - ); - } - } - } - } - - // Calculate the minimum across all streams and remember that - if state.start_utc_time.is_none() { - let mut start_utc_time = None; - - for (idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() { - for buffer in drain_buffers { - let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { - None => { - gst::error!( - CAT, - obj: &state.streams[idx].sinkpad, - "No reference timestamp set on all buffers" - ); - return Err(gst::FlowError::Error); - } - Some(utc_time) => utc_time, - }; - - if start_utc_time.is_none() || start_utc_time > Some(utc_time) { - start_utc_time = Some(utc_time); - } - } - } - - gst::debug!( - CAT, - imp: self, - "Configuring start UTC time {}", - start_utc_time.unwrap() - ); - state.start_utc_time = start_utc_time; - } - - // Update all buffer timestamps based on the UTC time and offset to the start UTC time - let start_utc_time = state.start_utc_time.unwrap(); - for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() { - let mut start_time = None; - - for buffer in drain_buffers.iter_mut() { - let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { - None => { - gst::error!( - CAT, - obj: &state.streams[idx].sinkpad, - "No reference timestamp set on all buffers" - ); - return Err(gst::FlowError::Error); - } - Some(utc_time) => utc_time, - }; - - // Convert PTS UTC time to DTS - let mut utc_time_dts = - if let Some(composition_time_offset) = buffer.composition_time_offset { - if composition_time_offset >= 0 { - utc_time - .checked_sub((composition_time_offset as u64).nseconds()) - .unwrap() - } else { - utc_time - .checked_add(((-composition_time_offset) as u64).nseconds()) - .unwrap() - } - } else { - utc_time - }; - - // Enforce monotonically increasing timestamps - if utc_time_dts < state.streams[idx].current_utc_time { - gst::warning!( - CAT, - obj: &state.streams[idx].sinkpad, - "Decreasing UTC DTS timestamp for buffer {} < {}", - utc_time_dts, - state.streams[idx].current_utc_time, - ); - utc_time_dts = state.streams[idx].current_utc_time; - } else { - state.streams[idx].current_utc_time = utc_time_dts; - } - - let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap(); - - gst::trace!( - CAT, - obj: &state.streams[idx].sinkpad, - "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}", - buffer.timestamp, - timestamp, - utc_time_dts, - utc_time, - ); - - buffer.timestamp = timestamp; - if start_time.is_none() || start_time > Some(buffer.timestamp) { - start_time = Some(buffer.timestamp); - } - } - - // Update durations for all buffers except for the last in the fragment unless all - // have the same duration anyway - let mut common_duration = Ok(None); - let mut drain_buffers_iter = drain_buffers.iter_mut().peekable(); - while let Some(buffer) = drain_buffers_iter.next() { - let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp); - - if let Some(next_timestamp) = next_timestamp { - let duration = next_timestamp.saturating_sub(buffer.timestamp); - if common_duration == Ok(None) { - common_duration = Ok(Some(duration)); - } else if common_duration != Ok(Some(duration)) { - common_duration = Err(()); - } - - gst::trace!( - CAT, - obj: &state.streams[idx].sinkpad, - "Updating buffer with timestamp {} duration from {} to relative UTC duration {}", - buffer.timestamp, - buffer.duration, - duration, - ); - - buffer.duration = duration; - } else if let Ok(Some(common_duration)) = common_duration { - gst::trace!( - CAT, - obj: &state.streams[idx].sinkpad, - "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}", - buffer.timestamp, - buffer.duration, - common_duration, - ); - - buffer.duration = common_duration; - } else { - gst::trace!( - CAT, - obj: &state.streams[idx].sinkpad, - "Keeping last buffer with timestamp {} duration at {}", - buffer.timestamp, - buffer.duration, - ); - } - - let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration; - if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) { - max_end_utc_time = Some(end_utc_time); - } - } - - if let Some(start_time) = start_time { - gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time); - timing_info.as_mut().unwrap().start_time = start_time; - } else { - assert!(timing_info.is_none()); - } - } - - Ok(max_end_utc_time) - } - - #[allow(clippy::type_complexity)] - fn interleave_buffers( - &self, - settings: &Settings, - mut drained_streams: Vec<( - gst::Caps, - Option<super::FragmentTimingInfo>, - VecDeque<Buffer>, - )>, - ) -> Result< - ( - Vec<Buffer>, - Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>, - ), - gst::FlowError, - > { - let mut interleaved_buffers = - Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum()); - while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by( - |(a_idx, (_, _, a)), (b_idx, (_, _, b))| { - let (a, b) = match (a.front(), b.front()) { - (None, None) => return std::cmp::Ordering::Equal, - (None, _) => return std::cmp::Ordering::Greater, - (_, None) => return std::cmp::Ordering::Less, - (Some(a), Some(b)) => (a, b), - }; - - match a.timestamp.cmp(&b.timestamp) { - std::cmp::Ordering::Equal => a_idx.cmp(b_idx), - cmp => cmp, - } - }, - ) { - let start_time = match bufs.front() { - None => { - // No more buffers now - break; - } - Some(buf) => buf.timestamp, - }; - let mut current_end_time = start_time; - let mut dequeued_bytes = 0; - - while settings - .interleave_bytes - .opt_ge(dequeued_bytes) - .unwrap_or(true) - && settings - .interleave_time - .opt_ge(current_end_time.saturating_sub(start_time)) - .unwrap_or(true) - { - if let Some(buffer) = bufs.pop_front() { - current_end_time = buffer.timestamp + buffer.duration; - dequeued_bytes += buffer.buffer.size() as u64; - interleaved_buffers.push(buffer); - } else { - // No buffers left in this stream, go to next stream - break; - } - } - } - - // All buffers should be consumed now - assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty())); - - let streams = drained_streams - .into_iter() - .map(|(caps, timing_info, _)| (caps, timing_info)) - .collect::<Vec<_>>(); - - Ok((interleaved_buffers, streams)) - } - - fn drain( - &self, - state: &mut State, - settings: &Settings, - timeout: bool, - at_eos: bool, - upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>, - ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> { - if at_eos { - gst::info!(CAT, imp: self, "Draining at EOS"); - } else if timeout { - gst::info!(CAT, imp: self, "Draining at timeout"); - } else { - for stream in &state.streams { - if !stream.fragment_filled && !stream.sinkpad.is_eos() { - return Ok((None, None)); - } - } - } - - // Collect all buffers and their timing information that are to be drained right now. - let ( - mut drained_streams, - min_earliest_pts_position, - min_earliest_pts, - min_start_dts_position, - fragment_end_pts, - ) = self.drain_buffers(state, settings, timeout, at_eos)?; - - // Remove all GAP buffers before processing them further - for (_, _, buffers) in &mut drained_streams { - buffers.retain(|buf| { - !buf.buffer.flags().contains(gst::BufferFlags::GAP) - || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) - || buf.buffer.size() != 0 - }); - } - - // For ONVIF, replace all timestamps with timestamps based on UTC times. - let max_end_utc_time = - self.preprocess_drained_streams_onvif(state, &mut drained_streams)?; - - // Create header now if it was not created before and return the caps - let mut caps = None; - if state.stream_header.is_none() { - let (_, new_caps) = self.update_header(state, settings, false)?.unwrap(); - caps = Some(new_caps); - } - - // Interleave buffers according to the settings into a single vec - let (mut interleaved_buffers, streams) = - self.interleave_buffers(settings, drained_streams)?; - - let mut buffer_list = None; - if interleaved_buffers.is_empty() { - assert!(at_eos); - } else { - // If there are actual buffers to output then create headers as needed and create a - // bufferlist for all buffers that have to be output. - let min_earliest_pts_position = min_earliest_pts_position.unwrap(); - let min_earliest_pts = min_earliest_pts.unwrap(); - let fragment_end_pts = fragment_end_pts.unwrap(); - - let mut fmp4_header = None; - if !state.sent_headers { - let mut buffer = state.stream_header.as_ref().unwrap().copy(); - { - let buffer = buffer.get_mut().unwrap(); - - buffer.set_pts(min_earliest_pts_position); - buffer.set_dts(min_start_dts_position); - - // Header is DISCONT|HEADER - buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); - } - - fmp4_header = Some(buffer); - - state.sent_headers = true; - } - - // TODO: Write prft boxes before moof - // TODO: Write sidx boxes before moof and rewrite once offsets are known - - if state.sequence_number == 0 { - state.sequence_number = 1; - } - let sequence_number = state.sequence_number; - state.sequence_number += 1; - let (mut fmp4_fragment_header, moof_offset) = - boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { - variant: self.instance().class().as_ref().variant, - sequence_number, - streams: streams.as_slice(), - buffers: interleaved_buffers.as_slice(), - }) - .map_err(|err| { - gst::error!( - CAT, - imp: self, - "Failed to create FMP4 fragment header: {}", - err - ); - gst::FlowError::Error - })?; - - { - let buffer = fmp4_fragment_header.get_mut().unwrap(); - buffer.set_pts(min_earliest_pts_position); - buffer.set_dts(min_start_dts_position); - buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts)); - - // Fragment header is HEADER - buffer.set_flags(gst::BufferFlags::HEADER); - - // Copy metas from the first actual buffer to the fragment header. This allows - // getting things like the reference timestamp meta or the timecode meta to identify - // the fragment. - let _ = interleaved_buffers[0].buffer.copy_into( - buffer, - gst::BufferCopyFlags::META, - 0, - None, - ); - } - - let moof_offset = state.current_offset - + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 - + moof_offset; - - let buffers_len = interleaved_buffers.len(); - for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() { - // Fix up buffer flags, all other buffers are DELTA_UNIT - let buffer_ref = buffer.buffer.make_mut(); - buffer_ref.unset_flags(gst::BufferFlags::all()); - buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT); - - // Set the marker flag for the last buffer of the segment - if idx == buffers_len - 1 { - buffer_ref.set_flags(gst::BufferFlags::MARKER); - } - } - - buffer_list = Some( - fmp4_header - .into_iter() - .chain(Some(fmp4_fragment_header)) - .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer)) - .inspect(|b| { - state.current_offset += b.size() as u64; - }) - .collect::<gst::BufferList>(), - ); - - // Write mfra only for the main stream, and if there are no buffers for the main stream - // in this segment then don't write anything. - if let Some((_caps, Some(ref timing_info))) = streams.get(0) { - state.fragment_offsets.push(super::FragmentOffset { - time: timing_info.start_time, - offset: moof_offset, - }); - } - - state.end_pts = Some(fragment_end_pts); - state.end_utc_time = max_end_utc_time; - - // Update for the start PTS of the next fragment - gst::info!( - CAT, - imp: self, - "Starting new fragment at {}", - fragment_end_pts, - ); - state.fragment_start_pts = Some(fragment_end_pts); - - gst::debug!( - CAT, - imp: self, - "Sending force-keyunit events for running time {}", - fragment_end_pts + settings.fragment_duration, - ); - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fragment_end_pts + settings.fragment_duration) - .all_headers(true) - .build(); - - for stream in &state.streams { - upstream_events.push((stream.sinkpad.clone(), fku.clone())); - } - - // Reset timeout delay now that we've output an actual fragment - state.timeout_delay = gst::ClockTime::ZERO; - } - - if settings.write_mfra && at_eos { - match boxes::create_mfra(&streams[0].0, &state.fragment_offsets) { - Ok(mut mfra) => { - { - let mfra = mfra.get_mut().unwrap(); - // mfra is HEADER|DELTA_UNIT like other boxes - mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT); - } - - if buffer_list.is_none() { - buffer_list = Some(gst::BufferList::new_sized(1)); - } - buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); - } - Err(err) => { - gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); - } - } - } - - // TODO: Write edit list at EOS - // TODO: Rewrite bitrates at EOS - - Ok((caps, buffer_list)) - } - - fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> { - for pad in self - .instance() - .sink_pads() - .into_iter() - .map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap()) - { - let caps = match pad.current_caps() { - Some(caps) => caps, - None => { - gst::warning!(CAT, obj: &pad, "Skipping pad without caps"); - continue; - } - }; - - gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps); - - let s = caps.structure(0).unwrap(); - - let mut intra_only = false; - match s.name() { - "video/x-h264" | "video/x-h265" => { - if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { - gst::error!(CAT, obj: &pad, "Received caps without codec_data"); - return Err(gst::FlowError::NotNegotiated); - } - } - "image/jpeg" => { - intra_only = true; - } - "audio/mpeg" => { - if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { - gst::error!(CAT, obj: &pad, "Received caps without codec_data"); - return Err(gst::FlowError::NotNegotiated); - } - intra_only = true; - } - "audio/x-alaw" | "audio/x-mulaw" => { - intra_only = true; - } - "audio/x-adpcm" => { - intra_only = true; - } - "application/x-onvif-metadata" => { - intra_only = true; - } - _ => unreachable!(), - } - - state.streams.push(Stream { - sinkpad: pad, - caps, - intra_only, - queued_gops: VecDeque::new(), - fragment_filled: false, - dts_offset: None, - current_position: gst::ClockTime::ZERO, - current_utc_time: gst::ClockTime::ZERO, - }); - } - - if state.streams.is_empty() { - gst::error!(CAT, imp: self, "No streams available"); - return Err(gst::FlowError::Error); - } - - // Sort video streams first and then audio streams and then metadata streams, and each group by pad name. - state.streams.sort_by(|a, b| { - let order_of_caps = |caps: &gst::CapsRef| { - let s = caps.structure(0).unwrap(); - - if s.name().starts_with("video/") { - 0 - } else if s.name().starts_with("audio/") { - 1 - } else if s.name().starts_with("application/x-onvif-metadata") { - 2 - } else { - unimplemented!(); - } - }; - - let st_a = order_of_caps(&a.caps); - let st_b = order_of_caps(&b.caps); - - if st_a == st_b { - return a.sinkpad.name().cmp(&b.sinkpad.name()); - } - - st_a.cmp(&st_b) - }); - - Ok(()) - } - - fn update_header( - &self, - state: &mut State, - settings: &Settings, - at_eos: bool, - ) -> Result<Option<(gst::BufferList, gst::Caps)>, gst::FlowError> { - let aggregator = self.instance(); - let class = aggregator.class(); - let variant = class.as_ref().variant; - - if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos { - return Ok(None); - } - - assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); - - let duration = if variant == super::Variant::ONVIF { - state - .end_utc_time - .opt_checked_sub(state.start_utc_time) - .ok() - .flatten() - } else { - state - .end_pts - .opt_checked_sub(state.earliest_pts) - .ok() - .flatten() - }; - - let streams = state - .streams - .iter() - .map(|s| s.caps.clone()) - .collect::<Vec<_>>(); - - let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration { - variant, - update: at_eos, - streams: streams.as_slice(), - write_mehd: settings.write_mehd, - duration: if at_eos { duration } else { None }, - start_utc_time: state - .start_utc_time - .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000), - }) - .map_err(|err| { - gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err); - gst::FlowError::Error - })?; - - { - let buffer = buffer.get_mut().unwrap(); - - // No timestamps - - // Header is DISCONT|HEADER - buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); - } - - // Remember stream header for later - state.stream_header = Some(buffer.clone()); - - let variant = match variant { - super::Variant::ISO | super::Variant::DASH | super::Variant::ONVIF => "iso-fragmented", - super::Variant::CMAF => "cmaf", - }; - let caps = gst::Caps::builder("video/quicktime") - .field("variant", variant) - .field("streamheader", gst::Array::new(&[&buffer])) - .build(); - - let mut list = gst::BufferList::new_sized(1); - { - let list = list.get_mut().unwrap(); - list.add(buffer); - } - - Ok(Some((list, caps))) - } -} - -#[glib::object_subclass] -impl ObjectSubclass for FMP4Mux { - const NAME: &'static str = "GstFMP4Mux"; - type Type = super::FMP4Mux; - type ParentType = gst_base::Aggregator; - type Class = Class; -} - -impl ObjectImpl for FMP4Mux { - fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { - vec![ - // TODO: Add chunk-duration property separate from fragment-size - glib::ParamSpecUInt64::builder("fragment-duration") - .nick("Fragment Duration") - .blurb("Duration for each FMP4 fragment") - .default_value(DEFAULT_FRAGMENT_DURATION.nseconds()) - .mutable_ready() - .build(), - glib::ParamSpecEnum::builder::<super::HeaderUpdateMode>("header-update-mode", DEFAULT_HEADER_UPDATE_MODE) - .nick("Header update mode") - .blurb("Mode for updating the header at the end of the stream") - .mutable_ready() - .build(), - glib::ParamSpecBoolean::builder("write-mfra") - .nick("Write mfra box") - .blurb("Write fragment random access box at the end of the stream") - .default_value(DEFAULT_WRITE_MFRA) - .mutable_ready() - .build(), - glib::ParamSpecBoolean::builder("write-mehd") - .nick("Write mehd box") - .blurb("Write movie extends header box with the duration at the end of the stream (needs a header-update-mode enabled)") - .default_value(DEFAULT_WRITE_MFRA) - .mutable_ready() - .build(), - glib::ParamSpecUInt64::builder("interleave-bytes") - .nick("Interleave Bytes") - .blurb("Interleave between streams in bytes") - .default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0)) - .mutable_ready() - .build(), - glib::ParamSpecUInt64::builder("interleave-time") - .nick("Interleave Time") - .blurb("Interleave between streams in nanoseconds") - .default_value(DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX)) - .mutable_ready() - .build(), - ] - }); - - &*PROPERTIES - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - match pspec.name() { - "fragment-duration" => { - let mut settings = self.settings.lock().unwrap(); - let fragment_duration = value.get().expect("type checked upstream"); - if settings.fragment_duration != fragment_duration { - settings.fragment_duration = fragment_duration; - drop(settings); - self.instance().set_latency(fragment_duration, None); - } - } - - "header-update-mode" => { - let mut settings = self.settings.lock().unwrap(); - settings.header_update_mode = value.get().expect("type checked upstream"); - } - - "write-mfra" => { - let mut settings = self.settings.lock().unwrap(); - settings.write_mfra = value.get().expect("type checked upstream"); - } - - "write-mehd" => { - let mut settings = self.settings.lock().unwrap(); - settings.write_mehd = value.get().expect("type checked upstream"); - } - - "interleave-bytes" => { - let mut settings = self.settings.lock().unwrap(); - settings.interleave_bytes = match value.get().expect("type checked upstream") { - 0 => None, - v => Some(v), - }; - } - - "interleave-time" => { - let mut settings = self.settings.lock().unwrap(); - settings.interleave_time = match value.get().expect("type checked upstream") { - Some(gst::ClockTime::ZERO) | None => None, - v => v, - }; - } - - _ => unimplemented!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - match pspec.name() { - "fragment-duration" => { - let settings = self.settings.lock().unwrap(); - settings.fragment_duration.to_value() - } - - "header-update-mode" => { - let settings = self.settings.lock().unwrap(); - settings.header_update_mode.to_value() - } - - "write-mfra" => { - let settings = self.settings.lock().unwrap(); - settings.write_mfra.to_value() - } - - "write-mehd" => { - let settings = self.settings.lock().unwrap(); - settings.write_mehd.to_value() - } - - "interleave-bytes" => { - let settings = self.settings.lock().unwrap(); - settings.interleave_bytes.unwrap_or(0).to_value() - } - - "interleave-time" => { - let settings = self.settings.lock().unwrap(); - settings.interleave_time.to_value() - } - - _ => unimplemented!(), - } - } - - fn constructed(&self) { - self.parent_constructed(); - - let obj = self.instance(); - let class = obj.class(); - for templ in class.pad_template_list().filter(|templ| { - templ.presence() == gst::PadPresence::Always - && templ.direction() == gst::PadDirection::Sink - }) { - let sinkpad = - gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink")) - .flags(gst::PadFlags::ACCEPT_INTERSECT) - .build(); - - obj.add_pad(&sinkpad).unwrap(); - } - - obj.set_latency(Settings::default().fragment_duration, None); - } -} - -impl GstObjectImpl for FMP4Mux {} - -impl ElementImpl for FMP4Mux { - fn request_new_pad( - &self, - templ: &gst::PadTemplate, - name: Option<&str>, - caps: Option<&gst::Caps>, - ) -> Option<gst::Pad> { - let state = self.state.lock().unwrap(); - if state.stream_header.is_some() { - gst::error!( - CAT, - imp: self, - "Can't request new pads after header was generated" - ); - return None; - } - - self.parent_request_new_pad(templ, name, caps) - } -} - -impl AggregatorImpl for FMP4Mux { - fn next_time(&self) -> Option<gst::ClockTime> { - let state = self.state.lock().unwrap(); - state.fragment_start_pts.opt_add(state.timeout_delay) - } - - fn sink_query( - &self, - aggregator_pad: &gst_base::AggregatorPad, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - - gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query); - - match query.view_mut() { - QueryViewMut::Caps(q) => { - let allowed_caps = aggregator_pad - .current_caps() - .unwrap_or_else(|| aggregator_pad.pad_template_caps()); - - if let Some(filter_caps) = q.filter() { - let res = filter_caps - .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); - q.set_result(&res); - } else { - q.set_result(&allowed_caps); - } - - true - } - _ => self.parent_sink_query(aggregator_pad, query), - } - } - - fn sink_event_pre_queue( - &self, - aggregator_pad: &gst_base::AggregatorPad, - mut event: gst::Event, - ) -> Result<gst::FlowSuccess, gst::FlowError> { - use gst::EventView; - - gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); - - match event.view() { - EventView::Segment(ev) => { - if ev.segment().format() != gst::Format::Time { - gst::warning!( - CAT, - obj: aggregator_pad, - "Received non-TIME segment, replacing with default TIME segment" - ); - let segment = gst::FormattedSegment::<gst::ClockTime>::new(); - event = gst::event::Segment::builder(&segment) - .seqnum(event.seqnum()) - .build(); - } - self.parent_sink_event_pre_queue(aggregator_pad, event) - } - _ => self.parent_sink_event_pre_queue(aggregator_pad, event), - } - } - - fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { - use gst::EventView; - - gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); - - match event.view() { - EventView::Segment(ev) => { - // Already fixed-up above to always be a TIME segment - let segment = ev - .segment() - .clone() - .downcast::<gst::ClockTime>() - .expect("non-TIME segment"); - gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment); - - // Only forward the segment event verbatim if this is a single stream variant. - // Otherwise we have to produce a default segment and re-timestamp all buffers - // with their running time. - let aggregator = self.instance(); - let class = aggregator.class(); - if class.as_ref().variant.is_single_stream() { - aggregator.update_segment(&segment); - } - - self.parent_sink_event(aggregator_pad, event) - } - EventView::Tag(_ev) => { - // TODO: Maybe store for putting into the headers of the next fragment? - - self.parent_sink_event(aggregator_pad, event) - } - _ => self.parent_sink_event(aggregator_pad, event), - } - } - - fn src_query(&self, query: &mut gst::QueryRef) -> bool { - use gst::QueryViewMut; - - gst::trace!(CAT, imp: self, "Handling query {:?}", query); - - match query.view_mut() { - QueryViewMut::Seeking(q) => { - // We can't really handle seeking, it would break everything - q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); - true - } - _ => self.parent_src_query(query), - } - } - - fn src_event(&self, event: gst::Event) -> bool { - use gst::EventView; - - gst::trace!(CAT, imp: self, "Handling event {:?}", event); - - match event.view() { - EventView::Seek(_ev) => false, - _ => self.parent_src_event(event), - } - } - - fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> { - self.parent_flush()?; - - let mut state = self.state.lock().unwrap(); - - for stream in &mut state.streams { - stream.queued_gops.clear(); - stream.dts_offset = None; - stream.current_position = gst::ClockTime::ZERO; - stream.current_utc_time = gst::ClockTime::ZERO; - stream.fragment_filled = false; - } - - state.current_offset = 0; - state.fragment_offsets.clear(); - - Ok(gst::FlowSuccess::Ok) - } - - fn stop(&self) -> Result<(), gst::ErrorMessage> { - gst::trace!(CAT, imp: self, "Stopping"); - - let _ = self.parent_stop(); - - *self.state.lock().unwrap() = State::default(); - - Ok(()) - } - - fn start(&self) -> Result<(), gst::ErrorMessage> { - gst::trace!(CAT, imp: self, "Starting"); - - self.parent_start()?; - - // For non-single-stream variants configure a default segment that allows for negative - // DTS so that we can correctly re-timestamp buffers with their running times. - let aggregator = self.instance(); - let class = aggregator.class(); - if !class.as_ref().variant.is_single_stream() { - let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); - segment.set_start(SEGMENT_OFFSET); - segment.set_position(SEGMENT_OFFSET); - aggregator.update_segment(&segment); - } - - *self.state.lock().unwrap() = State::default(); - - Ok(()) - } - - fn negotiate(&self) -> bool { - true - } - - fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> { - let settings = self.settings.lock().unwrap().clone(); - - let mut upstream_events = vec![]; - - let all_eos; - let (caps, buffers) = { - let mut state = self.state.lock().unwrap(); - - // Create streams - if state.streams.is_empty() { - self.create_streams(&mut state)?; - } - - // Queue buffers from all streams that are not filled for the current fragment yet - // - // Always take a buffer from the stream with the earliest queued buffer to keep the - // fill-level at all sinkpads in sync. - let fragment_start_pts = state.fragment_start_pts; - - while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? { - // Can only happen if the stream was flushed in the meantime - let buffer = match stream.sinkpad.pop_buffer() { - Some(buffer) => buffer, - None => continue, - }; - - // Can only happen if the stream was flushed in the meantime - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::<gst::ClockTime>() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; - - // Queue up the buffer and update GOP tracking state - self.queue_gops(idx, stream, &segment, buffer)?; - - // Check if this stream is filled enough now. - if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( - stream - .queued_gops - .iter() - .find(|gop| gop.final_end_pts) - .map(|gop| gop.end_pts), - fragment_start_pts, - ) { - if queued_end_pts.saturating_sub(fragment_start_pts) - >= settings.fragment_duration - { - gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); - stream.fragment_filled = true; - } - } - } - - // Calculate the earliest PTS after queueing input if we can now. - if state.earliest_pts.is_none() { - let mut earliest_pts = None; - - for stream in &state.streams { - let stream_earliest_pts = match stream.queued_gops.back() { - None => { - earliest_pts = None; - break; - } - Some(oldest_gop) => { - if !timeout && !oldest_gop.final_earliest_pts { - earliest_pts = None; - break; - } - - oldest_gop.earliest_pts - } - }; - - if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) { - earliest_pts = Some(stream_earliest_pts); - } - } - - if let Some(earliest_pts) = earliest_pts { - gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts); - state.earliest_pts = Some(earliest_pts); - state.fragment_start_pts = Some(earliest_pts); - - gst::debug!( - CAT, - imp: self, - "Sending first force-keyunit event for running time {}", - earliest_pts + settings.fragment_duration, - ); - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(earliest_pts + settings.fragment_duration) - .all_headers(true) - .build(); - - for stream in &mut state.streams { - upstream_events.push((stream.sinkpad.clone(), fku.clone())); - - // Check if this stream is filled enough now. - if let Some(queued_end_pts) = stream - .queued_gops - .iter() - .find(|gop| gop.final_end_pts) - .map(|gop| gop.end_pts) - { - if queued_end_pts.saturating_sub(earliest_pts) - >= settings.fragment_duration - { - gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); - stream.fragment_filled = true; - } - } - } - } - } - - all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); - if all_eos { - gst::debug!(CAT, imp: self, "All streams are EOS now"); - } - - // If enough GOPs were queued, drain and create the output fragment - match self.drain( - &mut state, - &settings, - timeout, - all_eos, - &mut upstream_events, - ) { - Ok(res) => res, - Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { - gst::element_imp_warning!( - self, - gst::StreamError::Format, - ["Longer GOPs than fragment duration"] - ); - state.timeout_delay += 1.seconds(); - - drop(state); - for (sinkpad, event) in upstream_events { - sinkpad.push_event(event); - } - return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); - } - Err(err) => return Err(err), - } - }; - - for (sinkpad, event) in upstream_events { - sinkpad.push_event(event); - } - - if let Some(caps) = caps { - gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); - self.instance().set_src_caps(&caps); - } - - if let Some(buffers) = buffers { - gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers); - self.instance().finish_buffer_list(buffers)?; - } - - if all_eos { - gst::debug!(CAT, imp: self, "Doing EOS handling"); - - if settings.header_update_mode != super::HeaderUpdateMode::None { - let updated_header = - self.update_header(&mut self.state.lock().unwrap(), &settings, true); - match updated_header { - Ok(Some((buffer_list, caps))) => { - match settings.header_update_mode { - super::HeaderUpdateMode::None => unreachable!(), - super::HeaderUpdateMode::Rewrite => { - let mut q = gst::query::Seeking::new(gst::Format::Bytes); - if self.instance().src_pad().peer_query(&mut q) && q.result().0 { - let aggregator = self.instance(); - - aggregator.set_src_caps(&caps); - - // Seek to the beginning with a default bytes segment - aggregator - .update_segment( - &gst::FormattedSegment::<gst::format::Bytes>::new(), - ); - - if let Err(err) = aggregator.finish_buffer_list(buffer_list) { - gst::error!( - CAT, - imp: self, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } else { - gst::error!( - CAT, - imp: self, - "Can't rewrite header because downstream is not seekable" - ); - } - } - super::HeaderUpdateMode::Update => { - let aggregator = self.instance(); - - aggregator.set_src_caps(&caps); - if let Err(err) = aggregator.finish_buffer_list(buffer_list) { - gst::error!( - CAT, - imp: self, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } - } - } - Ok(None) => {} - Err(err) => { - gst::error!( - CAT, - imp: self, - "Failed to generate updated header: {:?}", - err - ); - } - } - } - - // Need to output new headers if started again after EOS - self.state.lock().unwrap().sent_headers = false; - - Err(gst::FlowError::Eos) - } else { - Ok(gst::FlowSuccess::Ok) - } - } -} - -#[repr(C)] -pub(crate) struct Class { - parent: gst_base::ffi::GstAggregatorClass, - variant: super::Variant, -} - -unsafe impl ClassStruct for Class { - type Type = FMP4Mux; -} - -impl std::ops::Deref for Class { - type Target = glib::Class<gst_base::Aggregator>; - - fn deref(&self) -> &Self::Target { - unsafe { &*(&self.parent as *const _ as *const _) } - } -} - -unsafe impl<T: FMP4MuxImpl> IsSubclassable<T> for super::FMP4Mux { - fn class_init(class: &mut glib::Class<Self>) { - Self::parent_class_init::<T>(class); - - let class = class.as_mut(); - class.variant = T::VARIANT; - } -} - -pub(crate) trait FMP4MuxImpl: AggregatorImpl { - const VARIANT: super::Variant; -} - -#[derive(Default)] -pub(crate) struct ISOFMP4Mux; - -#[glib::object_subclass] -impl ObjectSubclass for ISOFMP4Mux { - const NAME: &'static str = "GstISOFMP4Mux"; - type Type = super::ISOFMP4Mux; - type ParentType = super::FMP4Mux; -} - -impl ObjectImpl for ISOFMP4Mux {} - -impl GstObjectImpl for ISOFMP4Mux {} - -impl ElementImpl for ISOFMP4Mux { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { - gst::subclass::ElementMetadata::new( - "ISOFMP4Mux", - "Codec/Muxer", - "ISO fragmented MP4 muxer", - "Sebastian Dröge <sebastian@centricular.com>", - ) - }); - - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &gst::Caps::builder("video/quicktime") - .field("variant", "iso-fragmented") - .build(), - ) - .unwrap(); - - let sink_pad_template = gst::PadTemplate::new( - "sink_%u", - gst::PadDirection::Sink, - gst::PadPresence::Request, - &[ - gst::Structure::builder("video/x-h264") - .field("stream-format", gst::List::new(["avc", "avc3"])) - .field("alignment", "au") - .field("width", gst::IntRange::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("video/x-h265") - .field("stream-format", gst::List::new(["hvc1", "hev1"])) - .field("alignment", "au") - .field("width", gst::IntRange::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "raw") - .field("channels", gst::IntRange::new(1, u16::MAX as i32)) - .field("rate", gst::IntRange::new(1, i32::MAX)) - .build(), - ] - .into_iter() - .collect::<gst::Caps>(), - ) - .unwrap(); - - vec![src_pad_template, sink_pad_template] - }); - - PAD_TEMPLATES.as_ref() - } -} - -impl AggregatorImpl for ISOFMP4Mux {} - -impl FMP4MuxImpl for ISOFMP4Mux { - const VARIANT: super::Variant = super::Variant::ISO; -} - -#[derive(Default)] -pub(crate) struct CMAFMux; - -#[glib::object_subclass] -impl ObjectSubclass for CMAFMux { - const NAME: &'static str = "GstCMAFMux"; - type Type = super::CMAFMux; - type ParentType = super::FMP4Mux; -} - -impl ObjectImpl for CMAFMux {} - -impl GstObjectImpl for CMAFMux {} - -impl ElementImpl for CMAFMux { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { - gst::subclass::ElementMetadata::new( - "CMAFMux", - "Codec/Muxer", - "CMAF fragmented MP4 muxer", - "Sebastian Dröge <sebastian@centricular.com>", - ) - }); - - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &gst::Caps::builder("video/quicktime") - .field("variant", "cmaf") - .build(), - ) - .unwrap(); - - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &[ - gst::Structure::builder("video/x-h264") - .field("stream-format", gst::List::new(["avc", "avc3"])) - .field("alignment", "au") - .field("width", gst::IntRange::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("video/x-h265") - .field("stream-format", gst::List::new(["hvc1", "hev1"])) - .field("alignment", "au") - .field("width", gst::IntRange::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "raw") - .field("channels", gst::IntRange::new(1, u16::MAX as i32)) - .field("rate", gst::IntRange::new(1, i32::MAX)) - .build(), - ] - .into_iter() - .collect::<gst::Caps>(), - ) - .unwrap(); - - vec![src_pad_template, sink_pad_template] - }); - - PAD_TEMPLATES.as_ref() - } -} - -impl AggregatorImpl for CMAFMux {} - -impl FMP4MuxImpl for CMAFMux { - const VARIANT: super::Variant = super::Variant::CMAF; -} - -#[derive(Default)] -pub(crate) struct DASHMP4Mux; - -#[glib::object_subclass] -impl ObjectSubclass for DASHMP4Mux { - const NAME: &'static str = "GstDASHMP4Mux"; - type Type = super::DASHMP4Mux; - type ParentType = super::FMP4Mux; -} - -impl ObjectImpl for DASHMP4Mux {} - -impl GstObjectImpl for DASHMP4Mux {} - -impl ElementImpl for DASHMP4Mux { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { - gst::subclass::ElementMetadata::new( - "DASHMP4Mux", - "Codec/Muxer", - "DASH fragmented MP4 muxer", - "Sebastian Dröge <sebastian@centricular.com>", - ) - }); - - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &gst::Caps::builder("video/quicktime") - .field("variant", "iso-fragmented") - .build(), - ) - .unwrap(); - - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &[ - gst::Structure::builder("video/x-h264") - .field("stream-format", gst::List::new(&[&"avc", &"avc3"])) - .field("alignment", "au") - .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("video/x-h265") - .field("stream-format", gst::List::new(&[&"hvc1", &"hev1"])) - .field("alignment", "au") - .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "raw") - .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) - .build(), - ] - .into_iter() - .collect::<gst::Caps>(), - ) - .unwrap(); - - vec![src_pad_template, sink_pad_template] - }); - - PAD_TEMPLATES.as_ref() - } -} - -impl AggregatorImpl for DASHMP4Mux {} - -impl FMP4MuxImpl for DASHMP4Mux { - const VARIANT: super::Variant = super::Variant::DASH; -} - -#[derive(Default)] -pub(crate) struct ONVIFFMP4Mux; - -#[glib::object_subclass] -impl ObjectSubclass for ONVIFFMP4Mux { - const NAME: &'static str = "GstONVIFFMP4Mux"; - type Type = super::ONVIFFMP4Mux; - type ParentType = super::FMP4Mux; -} - -impl ObjectImpl for ONVIFFMP4Mux {} - -impl GstObjectImpl for ONVIFFMP4Mux {} - -impl ElementImpl for ONVIFFMP4Mux { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { - gst::subclass::ElementMetadata::new( - "ONVIFFMP4Mux", - "Codec/Muxer", - "ONVIF fragmented MP4 muxer", - "Sebastian Dröge <sebastian@centricular.com>", - ) - }); - - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &gst::Caps::builder("video/quicktime") - .field("variant", "iso-fragmented") - .build(), - ) - .unwrap(); - - let sink_pad_template = gst::PadTemplate::new( - "sink_%u", - gst::PadDirection::Sink, - gst::PadPresence::Request, - &[ - gst::Structure::builder("video/x-h264") - .field("stream-format", gst::List::new(&[&"avc", &"avc3"])) - .field("alignment", "au") - .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("video/x-h265") - .field("stream-format", gst::List::new(&[&"hvc1", &"hev1"])) - .field("alignment", "au") - .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("image/jpeg") - .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .build(), - gst::Structure::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "raw") - .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32)) - .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) - .build(), - gst::Structure::builder("audio/x-alaw") - .field("channels", gst::IntRange::<i32>::new(1, 2)) - .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) - .build(), - gst::Structure::builder("audio/x-mulaw") - .field("channels", gst::IntRange::<i32>::new(1, 2)) - .field("rate", gst::IntRange::<i32>::new(1, i32::MAX)) - .build(), - gst::Structure::builder("audio/x-adpcm") - .field("layout", "g726") - .field("channels", 1i32) - .field("rate", 8000i32) - .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000])) - .build(), - gst::Structure::builder("application/x-onvif-metadata") - .field("parsed", true) - .build(), - ] - .into_iter() - .collect::<gst::Caps>(), - ) - .unwrap(); - - vec![src_pad_template, sink_pad_template] - }); - - PAD_TEMPLATES.as_ref() - } -} - -impl AggregatorImpl for ONVIFFMP4Mux {} - -impl FMP4MuxImpl for ONVIFFMP4Mux { - const VARIANT: super::Variant = super::Variant::ONVIF; -} diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs deleted file mode 100644 index 76c2da9c..00000000 --- a/generic/fmp4/src/fmp4mux/mod.rs +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 - -use gst::glib; -use gst::prelude::*; - -mod boxes; -mod imp; - -glib::wrapper! { - pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object; -} - -glib::wrapper! { - pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; -} - -glib::wrapper! { - pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; -} - -glib::wrapper! { - pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; -} - -glib::wrapper! { - pub(crate) struct ONVIFFMP4Mux(ObjectSubclass<imp::ONVIFFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; -} - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - FMP4Mux::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); - HeaderUpdateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); - gst::Element::register( - Some(plugin), - "isofmp4mux", - gst::Rank::Primary, - ISOFMP4Mux::static_type(), - )?; - gst::Element::register( - Some(plugin), - "cmafmux", - gst::Rank::Primary, - CMAFMux::static_type(), - )?; - gst::Element::register( - Some(plugin), - "dashmp4mux", - gst::Rank::Primary, - DASHMP4Mux::static_type(), - )?; - gst::Element::register( - Some(plugin), - "onviffmp4mux", - gst::Rank::Primary, - ONVIFFMP4Mux::static_type(), - )?; - - Ok(()) -} - -#[derive(Debug)] -pub(crate) struct HeaderConfiguration<'a> { - variant: Variant, - update: bool, - /// First caps must be the video/reference stream. Must be in the order the tracks are going to - /// be used later for the fragments too. - streams: &'a [gst::Caps], - write_mehd: bool, - duration: Option<gst::ClockTime>, - /// Start UTC time in ONVIF mode. - /// Since Jan 1 1601 in 100ns units. - start_utc_time: Option<u64>, -} - -#[derive(Debug)] -pub(crate) struct FragmentHeaderConfiguration<'a> { - variant: Variant, - sequence_number: u32, - streams: &'a [(gst::Caps, Option<FragmentTimingInfo>)], - buffers: &'a [Buffer], -} - -#[derive(Debug)] -pub(crate) struct FragmentTimingInfo { - /// Start time of this fragment - start_time: gst::ClockTime, - /// Set if this is an intra-only stream - intra_only: bool, -} - -#[derive(Debug)] -pub(crate) struct Buffer { - /// Track index - idx: usize, - - /// Actual buffer - buffer: gst::Buffer, - - /// Timestamp - timestamp: gst::ClockTime, - - /// Sample duration - duration: gst::ClockTime, - - /// Composition time offset - composition_time_offset: Option<i64>, -} - -#[allow(clippy::upper_case_acronyms)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum Variant { - ISO, - CMAF, - DASH, - ONVIF, -} - -impl Variant { - pub(crate) fn is_single_stream(self) -> bool { - match self { - Variant::ISO | Variant::ONVIF => false, - Variant::CMAF | Variant::DASH => true, - } - } -} - -#[derive(Debug)] -pub(crate) struct FragmentOffset { - time: gst::ClockTime, - offset: u64, -} - -#[allow(clippy::upper_case_acronyms)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, glib::Enum)] -#[repr(i32)] -#[enum_type(name = "GstFMP4MuxHeaderUpdateMode")] -pub(crate) enum HeaderUpdateMode { - None, - Rewrite, - Update, -} diff --git a/generic/fmp4/src/lib.rs b/generic/fmp4/src/lib.rs deleted file mode 100644 index 697c7cbf..00000000 --- a/generic/fmp4/src/lib.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com> -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 -#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)] - -/** - * plugin-fmp4: - * - * Since: plugins-rs-0.8.0 - */ -use gst::glib; - -mod fmp4mux; - -fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - fmp4mux::register(plugin) -} - -gst::plugin_define!( - fmp4, - env!("CARGO_PKG_DESCRIPTION"), - plugin_init, - concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), - // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known) - "MPL", - env!("CARGO_PKG_NAME"), - env!("CARGO_PKG_NAME"), - env!("CARGO_PKG_REPOSITORY"), - env!("BUILD_REL_DATE") -); diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs deleted file mode 100644 index 073b6ae0..00000000 --- a/generic/fmp4/tests/tests.rs +++ /dev/null @@ -1,1241 +0,0 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// <https://mozilla.org/MPL/2.0/>. -// -// SPDX-License-Identifier: MPL-2.0 -// - -use gst::prelude::*; - -fn init() { - use std::sync::Once; - static INIT: Once = Once::new(); - - INIT.call_once(|| { - gst::init().unwrap(); - gstfmp4::plugin_register_static().unwrap(); - }); -} - -fn test_buffer_flags_single_stream(cmaf: bool) { - let mut h = if cmaf { - gst_check::Harness::new("cmafmux") - } else { - gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")) - }; - - // 5s fragment duration - h.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h.play(); - - let output_offset = if cmaf { - gst::ClockTime::ZERO - } else { - (60 * 60 * 1000).seconds() - }; - - // Push 7 buffers of 1s each, 1st and 6 buffer without DELTA_UNIT flag - for i in 0..7 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 5 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); - - if i == 2 { - let ev = loop { - let ev = h.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - } - } - - let header = h.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(5.seconds())); - - for i in 0..5 { - let buffer = h.pull().unwrap(); - if i == 4 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - - h.push_event(gst::event::Eos::new()); - - let fragment_header = h.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(2.seconds())); - - for i in 5..7 { - let buffer = h.pull().unwrap(); - if i == 6 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} - -#[test] -fn test_buffer_flags_single_stream_cmaf() { - init(); - - test_buffer_flags_single_stream(true); -} - -#[test] -fn test_buffer_flags_single_stream_iso() { - init(); - - test_buffer_flags_single_stream(false); -} - -#[test] -fn test_buffer_flags_multi_stream() { - init(); - - let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); - let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); - - // 5s fragment duration - h1.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h1.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h1.play(); - - h2.set_src_caps( - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("channels", 1i32) - .field("rate", 44100i32) - .field("stream-format", "raw") - .field("base-profile", "lc") - .field("profile", "lc") - .field("level", "2") - .field( - "codec_data", - gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), - ) - .build(), - ); - h2.play(); - - let output_offset = (60 * 60 * 1000).seconds(); - - // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag - for i in 0..7 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 5 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); - - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - } - assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); - - if i == 2 { - let ev = loop { - let ev = h1.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - - let ev = loop { - let ev = h2.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - } - } - - let header = h1.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(5.seconds())); - - for i in 0..5 { - for j in 0..2 { - let buffer = h1.pull().unwrap(); - if i == 4 && j == 1 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - h1.push_event(gst::event::Eos::new()); - h2.push_event(gst::event::Eos::new()); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(2.seconds())); - - for i in 5..7 { - for j in 0..2 { - let buffer = h1.pull().unwrap(); - if i == 6 && j == 1 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} - -#[test] -fn test_live_timeout() { - init(); - - let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); - let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); - - h1.use_testclock(); - - // 5s fragment duration - h1.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h1.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h1.play(); - - h2.set_src_caps( - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("channels", 1i32) - .field("rate", 44100i32) - .field("stream-format", "raw") - .field("base-profile", "lc") - .field("profile", "lc") - .field("level", "2") - .field( - "codec_data", - gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), - ) - .build(), - ); - h2.play(); - - let output_offset = (60 * 60 * 1000).seconds(); - - // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag - for i in 0..7 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 5 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); - - // Skip buffer 4th and 6th buffer (end of fragment / stream) - if i == 4 || i == 6 { - continue; - } else { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - } - assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); - } - - if i == 2 { - let ev = loop { - let ev = h1.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - - let ev = loop { - let ev = h2.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - } - } - - // Advance time and crank the clock: this should bring us to the end of the first fragment - h1.set_time(5.seconds()).unwrap(); - h1.crank_single_clock_wait().unwrap(); - - let header = h1.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(5.seconds())); - - for i in 0..5 { - for j in 0..2 { - // Skip gap events that don't result in buffers - if j == 1 && i == 4 { - // Advance time and crank the clock another time. This brings us at the end of the - // EOS. - h1.crank_single_clock_wait().unwrap(); - continue; - } - - let buffer = h1.pull().unwrap(); - if i == 4 && j == 0 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else if i == 5 && j == 0 { - assert_eq!(buffer.flags(), gst::BufferFlags::HEADER); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - h1.push_event(gst::event::Eos::new()); - h2.push_event(gst::event::Eos::new()); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(2.seconds())); - - for i in 5..7 { - for j in 0..2 { - // Skip gap events that don't result in buffers - if j == 1 && i == 6 { - continue; - } - - let buffer = h1.pull().unwrap(); - if i == 6 && j == 0 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} - -#[test] -fn test_gap_events() { - init(); - - let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); - let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); - - h1.use_testclock(); - - // 5s fragment duration - h1.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h1.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h1.play(); - - h2.set_src_caps( - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("channels", 1i32) - .field("rate", 44100i32) - .field("stream-format", "raw") - .field("base-profile", "lc") - .field("profile", "lc") - .field("level", "2") - .field( - "codec_data", - gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), - ) - .build(), - ); - h2.play(); - - let output_offset = (60 * 60 * 1000).seconds(); - - // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag - for i in 0..7 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 5 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); - - // Replace buffer 3 and 6 with a gap event - if i == 3 || i == 6 { - let ev = gst::event::Gap::builder(i.seconds()) - .duration(gst::ClockTime::SECOND) - .build(); - assert!(h2.push_event(ev)); - } else { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - } - assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); - } - - if i == 2 { - let ev = loop { - let ev = h1.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - - let ev = loop { - let ev = h2.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(5.seconds()), - all_headers: true, - count: 0 - } - ); - } - } - - // Advance time and crank the clock: this should bring us to the end of the first fragment - h1.set_time(5.seconds()).unwrap(); - h1.crank_single_clock_wait().unwrap(); - - let header = h1.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(5.seconds())); - - for i in 0..5 { - for j in 0..2 { - // Skip gap events that don't result in buffers - if j == 1 && i == 3 { - continue; - } - - let buffer = h1.pull().unwrap(); - if i == 4 && j == 1 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - h1.push_event(gst::event::Eos::new()); - h2.push_event(gst::event::Eos::new()); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(2.seconds())); - - for i in 5..7 { - for j in 0..2 { - // Skip gap events that don't result in buffers - if j == 1 && i == 6 { - continue; - } - - let buffer = h1.pull().unwrap(); - if i == 6 && j == 0 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} - -#[test] -fn test_single_stream_short_gops() { - init(); - - let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); - - // 5s fragment duration - h.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h.play(); - - let output_offset = (60 * 60 * 1000).seconds(); - - // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag - for i in 0..8 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 3 && i != 6 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); - - if i == 2 || i == 7 { - let ev = loop { - let ev = h.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(fku_time), - all_headers: true, - count: 0 - } - ); - } - } - - let header = h.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(3.seconds())); - - for i in 0..3 { - let buffer = h.pull().unwrap(); - if i == 2 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - - h.push_event(gst::event::Eos::new()); - - let fragment_header = h.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(5.seconds())); - - for i in 3..8 { - let buffer = h.pull().unwrap(); - if i == 7 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} - -#[test] -fn test_single_stream_long_gops() { - init(); - - let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); - - // 5s fragment duration - h.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h.play(); - - let output_offset = (60 * 60 * 1000).seconds(); - - // Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag - for i in 0..10 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 6 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); - - if i == 2 || i == 7 { - let ev = loop { - let ev = h.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - let fku_time = if i == 2 { 5.seconds() } else { 11.seconds() }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(fku_time), - all_headers: true, - count: 0 - } - ); - } - } - - let header = h.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(6.seconds())); - - for i in 0..6 { - let buffer = h.pull().unwrap(); - if i == 5 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - - h.push_event(gst::event::Eos::new()); - - let fragment_header = h.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(6.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(6.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(4.seconds())); - - for i in 6..10 { - let buffer = h.pull().unwrap(); - if i == 9 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} - -#[test] -fn test_buffer_multi_stream_short_gops() { - init(); - - let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); - let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); - - // 5s fragment duration - h1.element() - .unwrap() - .set_property("fragment-duration", 5.seconds()); - - h1.set_src_caps( - gst::Caps::builder("video/x-h264") - .field("width", 1920i32) - .field("height", 1080i32) - .field("framerate", gst::Fraction::new(30, 1)) - .field("stream-format", "avc") - .field("alignment", "au") - .field("codec_data", gst::Buffer::with_size(1).unwrap()) - .build(), - ); - h1.play(); - - h2.set_src_caps( - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("channels", 1i32) - .field("rate", 44100i32) - .field("stream-format", "raw") - .field("base-profile", "lc") - .field("profile", "lc") - .field("level", "2") - .field( - "codec_data", - gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), - ) - .build(), - ); - h2.play(); - - let output_offset = (60 * 60 * 1000).seconds(); - - // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag - for i in 0..8 { - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - if i != 0 && i != 3 && i != 6 { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - } - assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); - - let mut buffer = gst::Buffer::with_size(1).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(i.seconds()); - buffer.set_dts(i.seconds()); - buffer.set_duration(gst::ClockTime::SECOND); - } - assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); - - if i == 2 || i == 7 { - let ev = loop { - let ev = h1.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(fku_time), - all_headers: true, - count: 0 - } - ); - - let ev = loop { - let ev = h2.pull_upstream_event().unwrap(); - if ev.type_() != gst::EventType::Reconfigure - && ev.type_() != gst::EventType::Latency - { - break ev; - } - }; - - assert_eq!(ev.type_(), gst::EventType::CustomUpstream); - assert_eq!( - gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), - gst_video::UpstreamForceKeyUnitEvent { - running_time: Some(fku_time), - all_headers: true, - count: 0 - } - ); - } - } - - let header = h1.pull().unwrap(); - assert_eq!( - header.flags(), - gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT - ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!( - fragment_header.pts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!( - fragment_header.dts(), - Some(gst::ClockTime::ZERO + output_offset) - ); - assert_eq!(fragment_header.duration(), Some(3.seconds())); - - for i in 0..3 { - for j in 0..2 { - let buffer = h1.pull().unwrap(); - if i == 2 && j == 1 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - h1.push_event(gst::event::Eos::new()); - h2.push_event(gst::event::Eos::new()); - - let fragment_header = h1.pull().unwrap(); - assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset)); - assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset)); - assert_eq!(fragment_header.duration(), Some(5.seconds())); - - for i in 3..8 { - for j in 0..2 { - let buffer = h1.pull().unwrap(); - if i == 7 && j == 1 { - assert_eq!( - buffer.flags(), - gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER - ); - } else { - assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); - } - assert_eq!(buffer.pts(), Some(i.seconds() + output_offset)); - if j == 0 { - assert_eq!(buffer.dts(), Some(i.seconds() + output_offset)); - } else { - assert!(buffer.dts().is_none()); - } - assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); - } - } - - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::StreamStart); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Caps); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Segment); - let ev = h1.pull_event().unwrap(); - assert_eq!(ev.type_(), gst::EventType::Eos); -} |