Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-10-23 18:23:45 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-10-23 20:25:08 +0300
commit211cd095d69726a3a2208feddd921d05b60c6540 (patch)
tree648bbbe80a68a68b3f6315fcf580cae146d0249e /generic
parent5d44e0eb3c309ce7ad0cfb378d0169d8ce3305b3 (diff)
Add new mux subdirectory for container formats
Contains the (incomplete) flavors FLV demuxer and the fragmented MP4 muxer for now. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/173
Diffstat (limited to 'generic')
-rw-r--r--generic/fmp4/Cargo.toml52
l---------generic/fmp4/LICENSE1
-rw-r--r--generic/fmp4/build.rs3
-rw-r--r--generic/fmp4/examples/dash_vod.rs266
-rw-r--r--generic/fmp4/examples/hls_live.rs572
-rw-r--r--generic/fmp4/examples/hls_vod.rs472
-rw-r--r--generic/fmp4/src/fmp4mux/boxes.rs2073
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs2596
-rw-r--r--generic/fmp4/src/fmp4mux/mod.rs146
-rw-r--r--generic/fmp4/src/lib.rs34
-rw-r--r--generic/fmp4/tests/tests.rs1241
11 files changed, 0 insertions, 7456 deletions
diff --git a/generic/fmp4/Cargo.toml b/generic/fmp4/Cargo.toml
deleted file mode 100644
index f72c84a7..00000000
--- a/generic/fmp4/Cargo.toml
+++ /dev/null
@@ -1,52 +0,0 @@
-[package]
-name = "gst-plugin-fmp4"
-version = "0.9.0-alpha.1"
-authors = ["Sebastian Dröge <sebastian@centricular.com>"]
-license = "MPL-2.0"
-description = "GStreamer Fragmented MP4 Plugin"
-repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
-edition = "2021"
-rust-version = "1.63"
-
-[dependencies]
-anyhow = "1"
-gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-once_cell = "1.0"
-uuid = { version = "1", features = ["v4"] }
-
-[lib]
-name = "gstfmp4"
-crate-type = ["cdylib", "rlib"]
-path = "src/lib.rs"
-
-[dev-dependencies]
-gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
-gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
-gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
-m3u8-rs = "5.0"
-chrono = "0.4"
-
-[build-dependencies]
-gst-plugin-version-helper = { path="../../version-helper" }
-
-[features]
-default = ["v1_18"]
-static = []
-capi = []
-v1_18 = ["gst-video/v1_18"]
-
-[package.metadata.capi]
-min_version = "0.8.0"
-
-[package.metadata.capi.header]
-enabled = false
-
-[package.metadata.capi.library]
-install_subdir = "gstreamer-1.0"
-versioning = false
-
-[package.metadata.capi.pkg_config]
-requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/generic/fmp4/LICENSE b/generic/fmp4/LICENSE
deleted file mode 120000
index eb5d24fe..00000000
--- a/generic/fmp4/LICENSE
+++ /dev/null
@@ -1 +0,0 @@
-../../LICENSE-MPL-2.0 \ No newline at end of file
diff --git a/generic/fmp4/build.rs b/generic/fmp4/build.rs
deleted file mode 100644
index cda12e57..00000000
--- a/generic/fmp4/build.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-fn main() {
- gst_plugin_version_helper::info()
-}
diff --git a/generic/fmp4/examples/dash_vod.rs b/generic/fmp4/examples/dash_vod.rs
deleted file mode 100644
index eb291d6e..00000000
--- a/generic/fmp4/examples/dash_vod.rs
+++ /dev/null
@@ -1,266 +0,0 @@
-// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header
-// ("initialization segment") is written into a separate file as the segments, and each segment is
-// its own file too.
-//
-// All segments that are created are exactly 10s, expect for the last one which is only 3.333s.
-
-use gst::prelude::*;
-
-use std::fmt::Write;
-use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
-
-use anyhow::Error;
-
-struct Segment {
- start_time: gst::ClockTime,
- duration: gst::ClockTime,
-}
-
-struct State {
- start_time: Option<gst::ClockTime>,
- end_time: Option<gst::ClockTime>,
- segments: Vec<Segment>,
- path: PathBuf,
-}
-
-fn main() -> Result<(), Error> {
- gst::init()?;
-
- gstfmp4::plugin_register_static()?;
-
- let state = Arc::new(Mutex::new(State {
- start_time: None,
- end_time: None,
- segments: Vec::new(),
- path: PathBuf::from("dash_stream"),
- }));
-
- let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
-
- let sink = pipeline
- .by_name("sink")
- .unwrap()
- .dynamic_cast::<gst_app::AppSink>()
- .unwrap();
- sink.set_buffer_list(true);
-
- let state_clone = state.clone();
- sink.set_callbacks(
- gst_app::AppSinkCallbacks::builder()
- .new_sample(move |sink| {
- let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
- let mut state = state.lock().unwrap();
-
- // The muxer only outputs non-empty buffer lists
- let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
- assert!(!buffer_list.is_empty());
-
- let mut first = buffer_list.get(0).unwrap();
-
- // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
- assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
-
- // If the buffer has the DISCONT and HEADER flag set then it contains the media
- // header, i.e. the `ftyp`, `moov` and other media boxes.
- //
- // This might be the initial header or the updated header at the end of the stream.
- if first.flags().contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) {
- let mut path = state.path.clone();
- std::fs::create_dir_all(&path).expect("failed to create directory");
- path.push("init.cmfi");
-
- println!("writing header to {}", path.display());
- let map = first.map_readable().unwrap();
- std::fs::write(path, &map).expect("failed to write header");
- drop(map);
-
- // Remove the header from the buffer list
- buffer_list.make_mut().remove(0, 1);
-
- // If the list is now empty then it only contained the media header and nothing
- // else.
- if buffer_list.is_empty() {
- return Ok(gst::FlowSuccess::Ok);
- }
-
- // Otherwise get the next buffer and continue working with that.
- first = buffer_list.get(0).unwrap();
- }
-
- // If the buffer only has the HEADER flag set then this is a segment header that is
- // followed by one or more actual media buffers.
- assert!(first.flags().contains(gst::BufferFlags::HEADER));
-
- let segment = sample.segment().expect("no segment")
- .downcast_ref::<gst::ClockTime>().expect("no time segment");
-
- // Initialize the start time with the first PTS we observed. This will be used
- // later for calculating the duration of the whole media for the DASH manifest.
- //
- // The PTS of the segment header is equivalent to the earliest PTS of the whole
- // segment.
- let pts = segment.to_running_time(first.pts().unwrap()).expect("can't get running time");
- if state.start_time.is_none() {
- state.start_time = Some(pts);
- }
-
- // The metadata of the first media buffer is duplicated to the segment header.
- // Based on this we can know the timecode of the first frame in this segment.
- let meta = first.meta::<gst_video::VideoTimeCodeMeta>().expect("no timecode meta");
-
- let mut path = state.path.clone();
- path.push(format!("segment_{}.cmfv", state.segments.len() + 1));
- println!("writing segment with timecode {} to {}", meta.tc(), path.display());
-
- // Calculate the end time at this point. The duration of the segment header is set
- // to the whole duration of this segment.
- let duration = first.duration().unwrap();
- let end_time = first.pts().unwrap() + first.duration().unwrap();
- state.end_time = Some(segment.to_running_time(end_time).expect("can't get running time"));
-
- let mut file = std::fs::File::create(path).expect("failed to open fragment");
- for buffer in &*buffer_list {
- use std::io::prelude::*;
-
- let map = buffer.map_readable().unwrap();
- file.write_all(&map).expect("failed to write fragment");
- }
-
- state.segments.push(Segment {
- start_time: pts,
- duration,
- });
-
- Ok(gst::FlowSuccess::Ok)
- })
- .eos(move |_sink| {
- let state = state_clone.lock().unwrap();
-
- // Now write the manifest
- let mut path = state.path.clone();
- path.push("manifest.mpd");
-
- println!("writing manifest to {}", path.display());
-
- let duration = state.end_time.opt_checked_sub(state.start_time).ok().flatten().unwrap().mseconds() as f64 / 1000.0;
-
- // Write the whole segment timeline out here, compressing multiple segments with
- // the same duration to a repeated segment.
- let mut segment_timeline = String::new();
- let mut write_segment = |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| {
- if repeat > 0 {
- writeln!(
- &mut segment_timeline,
- " <S t=\"{time}\" d=\"{duration}\" r=\"{repeat}\" />",
- time = start.mseconds(),
- duration = duration.mseconds(),
- repeat = repeat
- ).unwrap();
- } else {
- writeln!(
- &mut segment_timeline,
- " <S t=\"{time}\" d=\"{duration}\" />",
- time = start.mseconds(),
- duration = duration.mseconds()
- ).unwrap();
- }
- };
-
- let mut start = None;
- let mut num_segments = 0;
- let mut last_duration = None;
- for segment in &state.segments {
- if start.is_none() {
- start = Some(segment.start_time);
- }
- if last_duration.is_none() {
- last_duration = Some(segment.duration);
- }
-
- // If the duration of this segment is different from the previous one then we
- // have to write out the segment now.
- if last_duration != Some(segment.duration) {
- write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
- start = Some(segment.start_time);
- last_duration = Some(segment.duration);
- num_segments = 1;
- } else {
- num_segments += 1;
- }
- }
-
- // Write the last segment if any
- if num_segments > 0 {
- write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
- }
-
- let manifest = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
-<MPD
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="urn:mpeg:dash:schema:mpd:2011"
- xsi:schemaLocation="urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd"
- type="static"
- mediaPresentationDuration="PT{duration:.3}S"
- profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">
- <Period>
- <AdaptationSet mimeType="video/mp4" codecs="avc1.4d0228" frameRate="30/1" segmentAlignment="true" startWithSAP="1">
- <Representation id="A" bandwidth="2048000" with="1280" height="720">
- <SegmentTemplate timescale="1000" initialization="init.cmfi" media="segment_$Number$.cmfv">
- <SegmentTimeline>
-{segment_timeline} </SegmentTimeline>
- </SegmentTemplate>
- </Representation>
- </AdaptationSet>
- </Period>
-</MPD>
-"###,
- duration = duration, segment_timeline = segment_timeline);
-
- std::fs::write(path, &manifest).expect("failed to write manifest");
- })
- .build(),
- );
-
- pipeline.set_state(gst::State::Playing)?;
-
- let bus = pipeline
- .bus()
- .expect("Pipeline without bus. Shouldn't happen!");
-
- for msg in bus.iter_timed(gst::ClockTime::NONE) {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::Eos(..) => {
- println!("EOS");
- break;
- }
- MessageView::Error(err) => {
- pipeline.set_state(gst::State::Null)?;
- eprintln!(
- "Got error from {}: {} ({})",
- msg.src()
- .map(|s| String::from(s.path_string()))
- .unwrap_or_else(|| "None".into()),
- err.error(),
- err.debug().unwrap_or_else(|| "".into()),
- );
- break;
- }
- _ => (),
- }
- }
-
- pipeline.set_state(gst::State::Null)?;
-
- Ok(())
-}
diff --git a/generic/fmp4/examples/hls_live.rs b/generic/fmp4/examples/hls_live.rs
deleted file mode 100644
index f1d9ed6d..00000000
--- a/generic/fmp4/examples/hls_live.rs
+++ /dev/null
@@ -1,572 +0,0 @@
-// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-// This creates a live HLS stream with one video playlist and two video playlists.
-// Basic trimming is implemented
-
-use gst::prelude::*;
-
-use std::collections::VecDeque;
-use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
-
-use anyhow::Error;
-use chrono::{DateTime, Duration, Utc};
-use m3u8_rs::{
- AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaSegment,
- VariantStream,
-};
-
-struct State {
- video_streams: Vec<VideoStream>,
- audio_streams: Vec<AudioStream>,
- all_mimes: Vec<String>,
- path: PathBuf,
- wrote_manifest: bool,
-}
-
-impl State {
- fn maybe_write_manifest(&mut self) {
- if self.wrote_manifest {
- return;
- }
-
- if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() {
- return;
- }
-
- let mut all_mimes = self.all_mimes.clone();
- all_mimes.sort();
- all_mimes.dedup();
-
- let playlist = MasterPlaylist {
- version: Some(7),
- variants: self
- .video_streams
- .iter()
- .map(|stream| {
- let mut path = PathBuf::new();
-
- path.push(&stream.name);
- path.push("manifest.m3u8");
-
- VariantStream {
- uri: path.as_path().display().to_string(),
- bandwidth: stream.bitrate,
- codecs: Some(all_mimes.join(",")),
- resolution: Some(m3u8_rs::Resolution {
- width: stream.width,
- height: stream.height,
- }),
- audio: Some("audio".to_string()),
- ..Default::default()
- }
- })
- .collect(),
- alternatives: self
- .audio_streams
- .iter()
- .map(|stream| {
- let mut path = PathBuf::new();
- path.push(&stream.name);
- path.push("manifest.m3u8");
-
- AlternativeMedia {
- media_type: AlternativeMediaType::Audio,
- uri: Some(path.as_path().display().to_string()),
- group_id: "audio".to_string(),
- language: Some(stream.lang.clone()),
- name: stream.name.clone(),
- default: stream.default,
- autoselect: stream.default,
- channels: Some("2".to_string()),
- ..Default::default()
- }
- })
- .collect(),
- independent_segments: true,
- ..Default::default()
- };
-
- println!("Writing master manifest to {}", self.path.display());
-
- let mut file = std::fs::File::create(&self.path).unwrap();
- playlist
- .write_to(&mut file)
- .expect("Failed to write master playlist");
-
- self.wrote_manifest = true;
- }
-}
-
-struct Segment {
- date_time: DateTime<Utc>,
- duration: gst::ClockTime,
- path: String,
-}
-
-struct UnreffedSegment {
- removal_time: DateTime<Utc>,
- path: String,
-}
-
-struct StreamState {
- path: PathBuf,
- segments: VecDeque<Segment>,
- trimmed_segments: VecDeque<UnreffedSegment>,
- start_date_time: Option<DateTime<Utc>>,
- start_time: Option<gst::ClockTime>,
- media_sequence: u64,
- segment_index: u32,
-}
-
-struct VideoStream {
- name: String,
- bitrate: u64,
- width: u64,
- height: u64,
-}
-
-struct AudioStream {
- name: String,
- lang: String,
- default: bool,
- wave: String,
-}
-
-fn trim_segments(state: &mut StreamState) {
- // Arbitrary 5 segments window
- while state.segments.len() > 5 {
- let segment = state.segments.pop_front().unwrap();
-
- state.media_sequence += 1;
-
- state.trimmed_segments.push_back(UnreffedSegment {
- // HLS spec mandates that segments are removed from the filesystem no sooner
- // than the duration of the longest playlist + duration of the segment.
- // This is 15 seconds (12.5 + 2.5) in our case, we use 20 seconds to be on the
- // safe side
- removal_time: segment
- .date_time
- .checked_add_signed(Duration::seconds(20))
- .unwrap(),
- path: segment.path.clone(),
- });
- }
-
- while let Some(segment) = state.trimmed_segments.front() {
- if segment.removal_time < state.segments.front().unwrap().date_time {
- let segment = state.trimmed_segments.pop_front().unwrap();
-
- let mut path = state.path.clone();
- path.push(segment.path);
- println!("Removing {}", path.display());
- std::fs::remove_file(path).expect("Failed to remove old segment");
- } else {
- break;
- }
- }
-}
-
-fn update_manifest(state: &mut StreamState) {
- // Now write the manifest
- let mut path = state.path.clone();
- path.push("manifest.m3u8");
-
- println!("writing manifest to {}", path.display());
-
- trim_segments(state);
-
- let playlist = MediaPlaylist {
- version: Some(7),
- target_duration: 2.5,
- media_sequence: state.media_sequence,
- segments: state
- .segments
- .iter()
- .enumerate()
- .map(|(idx, segment)| MediaSegment {
- uri: segment.path.to_string(),
- duration: (segment.duration.nseconds() as f64
- / gst::ClockTime::SECOND.nseconds() as f64) as f32,
- map: Some(m3u8_rs::Map {
- uri: "init.cmfi".into(),
- ..Default::default()
- }),
- program_date_time: if idx == 0 {
- Some(segment.date_time.into())
- } else {
- None
- },
- ..Default::default()
- })
- .collect(),
- end_list: false,
- playlist_type: None,
- i_frames_only: false,
- start: None,
- independent_segments: true,
- ..Default::default()
- };
-
- let mut file = std::fs::File::create(path).unwrap();
- playlist
- .write_to(&mut file)
- .expect("Failed to write media playlist");
-}
-
-fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) {
- let mut path: PathBuf = path.into();
- path.push(name);
-
- let state = Arc::new(Mutex::new(StreamState {
- segments: VecDeque::new(),
- trimmed_segments: VecDeque::new(),
- path,
- start_date_time: None,
- start_time: gst::ClockTime::NONE,
- media_sequence: 0,
- segment_index: 0,
- }));
-
- appsink.set_callbacks(
- gst_app::AppSinkCallbacks::builder()
- .new_sample(move |sink| {
- let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
- let mut state = state.lock().unwrap();
-
- // The muxer only outputs non-empty buffer lists
- let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
- assert!(!buffer_list.is_empty());
-
- let mut first = buffer_list.get(0).unwrap();
-
- // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
- assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
-
- // If the buffer has the DISCONT and HEADER flag set then it contains the media
- // header, i.e. the `ftyp`, `moov` and other media boxes.
- //
- // This might be the initial header or the updated header at the end of the stream.
- if first
- .flags()
- .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
- {
- let mut path = state.path.clone();
- std::fs::create_dir_all(&path).expect("failed to create directory");
- path.push("init.cmfi");
-
- println!("writing header to {}", path.display());
- let map = first.map_readable().unwrap();
- std::fs::write(path, &map).expect("failed to write header");
- drop(map);
-
- // Remove the header from the buffer list
- buffer_list.make_mut().remove(0, 1);
-
- // If the list is now empty then it only contained the media header and nothing
- // else.
- if buffer_list.is_empty() {
- return Ok(gst::FlowSuccess::Ok);
- }
-
- // Otherwise get the next buffer and continue working with that.
- first = buffer_list.get(0).unwrap();
- }
-
- // If the buffer only has the HEADER flag set then this is a segment header that is
- // followed by one or more actual media buffers.
- assert!(first.flags().contains(gst::BufferFlags::HEADER));
-
- let mut path = state.path.clone();
- let basename = format!(
- "segment_{}.{}",
- state.segment_index,
- if is_video { "cmfv" } else { "cmfa" }
- );
- state.segment_index += 1;
- path.push(&basename);
-
- let segment = sample
- .segment()
- .expect("no segment")
- .downcast_ref::<gst::ClockTime>()
- .expect("no time segment");
- let pts = segment
- .to_running_time(first.pts().unwrap())
- .expect("can't get running time");
-
- if state.start_time.is_none() {
- state.start_time = Some(pts);
- }
-
- if state.start_date_time.is_none() {
- let now_utc = Utc::now();
- let now_gst = sink.clock().unwrap().time().unwrap();
- let pts_clock_time = pts + sink.base_time().unwrap();
-
- let diff = now_gst.checked_sub(pts_clock_time).unwrap();
- let pts_utc = now_utc
- .checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64))
- .unwrap();
-
- state.start_date_time = Some(pts_utc);
- }
-
- let duration = first.duration().unwrap();
-
- let mut file = std::fs::File::create(&path).expect("failed to open fragment");
- for buffer in &*buffer_list {
- use std::io::prelude::*;
-
- let map = buffer.map_readable().unwrap();
- file.write_all(&map).expect("failed to write fragment");
- }
-
- let date_time = state
- .start_date_time
- .unwrap()
- .checked_add_signed(Duration::nanoseconds(
- pts.opt_checked_sub(state.start_time)
- .unwrap()
- .unwrap()
- .nseconds() as i64,
- ))
- .unwrap();
-
- println!(
- "wrote segment with date time {} to {}",
- date_time,
- path.display()
- );
-
- state.segments.push_back(Segment {
- duration,
- path: basename.to_string(),
- date_time,
- });
-
- update_manifest(&mut state);
-
- Ok(gst::FlowSuccess::Ok)
- })
- .eos(move |_sink| {
- unreachable!();
- })
- .build(),
- );
-}
-
-fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) {
- enc.static_pad("src").unwrap().add_probe(
- gst::PadProbeType::EVENT_DOWNSTREAM,
- move |_pad, info| match info.data {
- Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
- gst::EventView::Caps(e) => {
- let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned());
-
- let mut state = state.lock().unwrap();
- state.all_mimes.push(mime.unwrap().into());
- state.maybe_write_manifest();
- gst::PadProbeReturn::Remove
- }
- _ => gst::PadProbeReturn::Ok,
- },
- _ => gst::PadProbeReturn::Ok,
- },
- );
-}
-
-impl VideoStream {
- fn setup(
- &self,
- state: Arc<Mutex<State>>,
- pipeline: &gst::Pipeline,
- path: &Path,
- ) -> Result<(), Error> {
- let src = gst::ElementFactory::make("videotestsrc")
- .property("is-live", true)
- .build()?;
-
- let raw_capsfilter = gst::ElementFactory::make("capsfilter")
- .property(
- "caps",
- gst_video::VideoCapsBuilder::new()
- .format(gst_video::VideoFormat::I420)
- .width(self.width as i32)
- .height(self.height as i32)
- .framerate(30.into())
- .build(),
- )
- .build()?;
- let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
- let enc = gst::ElementFactory::make("x264enc")
- .property("bframes", 0u32)
- .property("bitrate", self.bitrate as u32 / 1000u32)
- .property_from_str("tune", "zerolatency")
- .build()?;
- let h264_capsfilter = gst::ElementFactory::make("capsfilter")
- .property(
- "caps",
- gst::Caps::builder("video/x-h264")
- .field("profile", "main")
- .build(),
- )
- .build()?;
- let mux = gst::ElementFactory::make("cmafmux")
- .property("fragment-duration", 2500.mseconds())
- .property_from_str("header-update-mode", "update")
- .property("write-mehd", true)
- .build()?;
- let appsink = gst_app::AppSink::builder().buffer_list(true).build();
-
- pipeline.add_many(&[
- &src,
- &raw_capsfilter,
- &timeoverlay,
- &enc,
- &h264_capsfilter,
- &mux,
- appsink.upcast_ref(),
- ])?;
-
- gst::Element::link_many(&[
- &src,
- &raw_capsfilter,
- &timeoverlay,
- &enc,
- &h264_capsfilter,
- &mux,
- appsink.upcast_ref(),
- ])?;
-
- probe_encoder(state, enc);
-
- setup_appsink(&appsink, &self.name, path, true);
-
- Ok(())
- }
-}
-
-impl AudioStream {
- fn setup(
- &self,
- state: Arc<Mutex<State>>,
- pipeline: &gst::Pipeline,
- path: &Path,
- ) -> Result<(), Error> {
- let src = gst::ElementFactory::make("audiotestsrc")
- .property("is-live", true)
- .property_from_str("wave", &self.wave)
- .property("fragment-duration", 2500.mseconds())
- .build()?;
- let enc = gst::ElementFactory::make("avenc_aac").build()?;
- let mux = gst::ElementFactory::make("cmafmux")
- .property_from_str("header-update-mode", "update")
- .property("write-mehd", true)
- .build()?;
- let appsink = gst_app::AppSink::builder().buffer_list(true).build();
-
- pipeline.add_many(&[&src, &enc, &mux, appsink.upcast_ref()])?;
-
- gst::Element::link_many(&[&src, &enc, &mux, appsink.upcast_ref()])?;
-
- probe_encoder(state, enc);
-
- setup_appsink(&appsink, &self.name, path, false);
-
- Ok(())
- }
-}
-
-fn main() -> Result<(), Error> {
- gst::init()?;
-
- gstfmp4::plugin_register_static()?;
-
- let path = PathBuf::from("hls_live_stream");
-
- let pipeline = gst::Pipeline::default();
-
- std::fs::create_dir_all(&path).expect("failed to create directory");
-
- let mut manifest_path = path.clone();
- manifest_path.push("manifest.m3u8");
-
- let state = Arc::new(Mutex::new(State {
- video_streams: vec![VideoStream {
- name: "video_0".to_string(),
- bitrate: 2_048_000,
- width: 1280,
- height: 720,
- }],
- audio_streams: vec![
- AudioStream {
- name: "audio_0".to_string(),
- lang: "eng".to_string(),
- default: true,
- wave: "sine".to_string(),
- },
- AudioStream {
- name: "audio_1".to_string(),
- lang: "fre".to_string(),
- default: false,
- wave: "white-noise".to_string(),
- },
- ],
- all_mimes: vec![],
- path: manifest_path.clone(),
- wrote_manifest: false,
- }));
-
- {
- let state_lock = state.lock().unwrap();
-
- for stream in &state_lock.video_streams {
- stream.setup(state.clone(), &pipeline, &path)?;
- }
-
- for stream in &state_lock.audio_streams {
- stream.setup(state.clone(), &pipeline, &path)?;
- }
- }
-
- pipeline.set_state(gst::State::Playing)?;
-
- let bus = pipeline
- .bus()
- .expect("Pipeline without bus. Shouldn't happen!");
-
- for msg in bus.iter_timed(gst::ClockTime::NONE) {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::Eos(..) => {
- println!("EOS");
- break;
- }
- MessageView::Error(err) => {
- pipeline.set_state(gst::State::Null)?;
- eprintln!(
- "Got error from {}: {} ({})",
- msg.src()
- .map(|s| String::from(s.path_string()))
- .unwrap_or_else(|| "None".into()),
- err.error(),
- err.debug().unwrap_or_else(|| "".into()),
- );
- break;
- }
- _ => (),
- }
- }
-
- pipeline.set_state(gst::State::Null)?;
-
- Ok(())
-}
diff --git a/generic/fmp4/examples/hls_vod.rs b/generic/fmp4/examples/hls_vod.rs
deleted file mode 100644
index 4a2533cd..00000000
--- a/generic/fmp4/examples/hls_vod.rs
+++ /dev/null
@@ -1,472 +0,0 @@
-// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-// This creates a 10 second VOD HLS stream with one video playlist and two audio
-// playlists. Each segment is 2.5 second long.
-
-use gst::prelude::*;
-
-use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
-
-use anyhow::Error;
-
-use m3u8_rs::{
- AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaPlaylistType,
- MediaSegment, VariantStream,
-};
-
-struct Segment {
- duration: gst::ClockTime,
- path: String,
-}
-
-struct StreamState {
- path: PathBuf,
- segments: Vec<Segment>,
-}
-
-struct VideoStream {
- name: String,
- bitrate: u64,
- width: u64,
- height: u64,
-}
-
-struct AudioStream {
- name: String,
- lang: String,
- default: bool,
- wave: String,
-}
-
-struct State {
- video_streams: Vec<VideoStream>,
- audio_streams: Vec<AudioStream>,
- all_mimes: Vec<String>,
- path: PathBuf,
- wrote_manifest: bool,
-}
-
-impl State {
- fn maybe_write_manifest(&mut self) {
- if self.wrote_manifest {
- return;
- }
-
- if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() {
- return;
- }
-
- let mut all_mimes = self.all_mimes.clone();
- all_mimes.sort();
- all_mimes.dedup();
-
- let playlist = MasterPlaylist {
- version: Some(7),
- variants: self
- .video_streams
- .iter()
- .map(|stream| {
- let mut path = PathBuf::new();
-
- path.push(&stream.name);
- path.push("manifest.m3u8");
-
- VariantStream {
- uri: path.as_path().display().to_string(),
- bandwidth: stream.bitrate,
- codecs: Some(all_mimes.join(",")),
- resolution: Some(m3u8_rs::Resolution {
- width: stream.width,
- height: stream.height,
- }),
- audio: Some("audio".to_string()),
- ..Default::default()
- }
- })
- .collect(),
- alternatives: self
- .audio_streams
- .iter()
- .map(|stream| {
- let mut path = PathBuf::new();
- path.push(&stream.name);
- path.push("manifest.m3u8");
-
- AlternativeMedia {
- media_type: AlternativeMediaType::Audio,
- uri: Some(path.as_path().display().to_string()),
- group_id: "audio".to_string(),
- language: Some(stream.lang.clone()),
- name: stream.name.clone(),
- default: stream.default,
- autoselect: stream.default,
- channels: Some("2".to_string()),
- ..Default::default()
- }
- })
- .collect(),
- independent_segments: true,
- ..Default::default()
- };
-
- println!("Writing master manifest to {}", self.path.display());
-
- let mut file = std::fs::File::create(&self.path).unwrap();
- playlist
- .write_to(&mut file)
- .expect("Failed to write master playlist");
-
- self.wrote_manifest = true;
- }
-}
-
-fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) {
- let mut path: PathBuf = path.into();
- path.push(name);
-
- let state = Arc::new(Mutex::new(StreamState {
- segments: Vec::new(),
- path,
- }));
-
- let state_clone = state.clone();
- appsink.set_callbacks(
- gst_app::AppSinkCallbacks::builder()
- .new_sample(move |sink| {
- let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
- let mut state = state.lock().unwrap();
-
- // The muxer only outputs non-empty buffer lists
- let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
- assert!(!buffer_list.is_empty());
-
- let mut first = buffer_list.get(0).unwrap();
-
- // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
- assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
-
- // If the buffer has the DISCONT and HEADER flag set then it contains the media
- // header, i.e. the `ftyp`, `moov` and other media boxes.
- //
- // This might be the initial header or the updated header at the end of the stream.
- if first
- .flags()
- .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
- {
- let mut path = state.path.clone();
- std::fs::create_dir_all(&path).expect("failed to create directory");
- path.push("init.cmfi");
-
- println!("writing header to {}", path.display());
- let map = first.map_readable().unwrap();
- std::fs::write(path, &map).expect("failed to write header");
- drop(map);
-
- // Remove the header from the buffer list
- buffer_list.make_mut().remove(0, 1);
-
- // If the list is now empty then it only contained the media header and nothing
- // else.
- if buffer_list.is_empty() {
- return Ok(gst::FlowSuccess::Ok);
- }
-
- // Otherwise get the next buffer and continue working with that.
- first = buffer_list.get(0).unwrap();
- }
-
- // If the buffer only has the HEADER flag set then this is a segment header that is
- // followed by one or more actual media buffers.
- assert!(first.flags().contains(gst::BufferFlags::HEADER));
-
- let mut path = state.path.clone();
- let basename = format!(
- "segment_{}.{}",
- state.segments.len() + 1,
- if is_video { "cmfv" } else { "cmfa" }
- );
- path.push(&basename);
- println!("writing segment to {}", path.display());
-
- let duration = first.duration().unwrap();
-
- let mut file = std::fs::File::create(path).expect("failed to open fragment");
- for buffer in &*buffer_list {
- use std::io::prelude::*;
-
- let map = buffer.map_readable().unwrap();
- file.write_all(&map).expect("failed to write fragment");
- }
-
- state.segments.push(Segment {
- duration,
- path: basename.to_string(),
- });
-
- Ok(gst::FlowSuccess::Ok)
- })
- .eos(move |_sink| {
- let state = state_clone.lock().unwrap();
-
- // Now write the manifest
- let mut path = state.path.clone();
- path.push("manifest.m3u8");
-
- println!("writing manifest to {}", path.display());
-
- let playlist = MediaPlaylist {
- version: Some(7),
- target_duration: 2.5,
- media_sequence: 0,
- segments: state
- .segments
- .iter()
- .map(|segment| MediaSegment {
- uri: segment.path.to_string(),
- duration: (segment.duration.nseconds() as f64
- / gst::ClockTime::SECOND.nseconds() as f64)
- as f32,
- map: Some(m3u8_rs::Map {
- uri: "init.cmfi".into(),
- ..Default::default()
- }),
- ..Default::default()
- })
- .collect(),
- end_list: true,
- playlist_type: Some(MediaPlaylistType::Vod),
- i_frames_only: false,
- start: None,
- independent_segments: true,
- ..Default::default()
- };
-
- let mut file = std::fs::File::create(path).unwrap();
- playlist
- .write_to(&mut file)
- .expect("Failed to write media playlist");
- })
- .build(),
- );
-}
-
-fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) {
- enc.static_pad("src").unwrap().add_probe(
- gst::PadProbeType::EVENT_DOWNSTREAM,
- move |_pad, info| match info.data {
- Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
- gst::EventView::Caps(e) => {
- let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned());
-
- let mut state = state.lock().unwrap();
- state.all_mimes.push(mime.unwrap().into());
- state.maybe_write_manifest();
- gst::PadProbeReturn::Remove
- }
- _ => gst::PadProbeReturn::Ok,
- },
- _ => gst::PadProbeReturn::Ok,
- },
- );
-}
-
-impl VideoStream {
- fn setup(
- &self,
- state: Arc<Mutex<State>>,
- pipeline: &gst::Pipeline,
- path: &Path,
- ) -> Result<(), Error> {
- let src = gst::ElementFactory::make("videotestsrc")
- .property("num-buffers", 300)
- .build()?;
- let raw_capsfilter = gst::ElementFactory::make("capsfilter")
- .property(
- "caps",
- gst_video::VideoCapsBuilder::new()
- .format(gst_video::VideoFormat::I420)
- .width(self.width as i32)
- .height(self.height as i32)
- .framerate(30.into())
- .build(),
- )
- .build()?;
- let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
- let enc = gst::ElementFactory::make("x264enc")
- .property("bframes", 0u32)
- .property("bitrate", self.bitrate as u32 / 1000u32)
- .build()?;
- let h264_capsfilter = gst::ElementFactory::make("capsfilter")
- .property(
- "caps",
- gst::Caps::builder("video/x-h264")
- .field("profile", "main")
- .build(),
- )
- .build()?;
- let mux = gst::ElementFactory::make("cmafmux")
- .property("fragment-duration", 2500.mseconds())
- .property_from_str("header-update-mode", "update")
- .property("write-mehd", true)
- .build()?;
- let appsink = gst_app::AppSink::builder().buffer_list(true).build();
-
- pipeline.add_many(&[
- &src,
- &raw_capsfilter,
- &timeoverlay,
- &enc,
- &h264_capsfilter,
- &mux,
- appsink.upcast_ref(),
- ])?;
-
- gst::Element::link_many(&[
- &src,
- &raw_capsfilter,
- &timeoverlay,
- &enc,
- &h264_capsfilter,
- &mux,
- appsink.upcast_ref(),
- ])?;
-
- probe_encoder(state, enc);
-
- setup_appsink(&appsink, &self.name, path, true);
-
- Ok(())
- }
-}
-
-impl AudioStream {
- fn setup(
- &self,
- state: Arc<Mutex<State>>,
- pipeline: &gst::Pipeline,
- path: &Path,
- ) -> Result<(), Error> {
- let src = gst::ElementFactory::make("audiotestsrc")
- .property("num-buffers", 100)
- .property("samplesperbuffer", 4410)
- .property_from_str("wave", &self.wave)
- .build()?;
- let raw_capsfilter = gst::ElementFactory::make("capsfilter")
- .property(
- "caps",
- gst_audio::AudioCapsBuilder::new().rate(44100).build(),
- )
- .build()?;
- let enc = gst::ElementFactory::make("avenc_aac").build()?;
- let mux = gst::ElementFactory::make("cmafmux")
- .property("fragment-duration", 2500.mseconds())
- .property_from_str("header-update-mode", "update")
- .property("write-mehd", true)
- .build()?;
- let appsink = gst_app::AppSink::builder().buffer_list(true).build();
-
- pipeline.add_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?;
-
- gst::Element::link_many(&[&src, &raw_capsfilter, &enc, &mux, appsink.upcast_ref()])?;
-
- probe_encoder(state, enc);
-
- setup_appsink(&appsink, &self.name, path, false);
-
- Ok(())
- }
-}
-
-fn main() -> Result<(), Error> {
- gst::init()?;
-
- gstfmp4::plugin_register_static()?;
-
- let path = PathBuf::from("hls_vod_stream");
-
- let pipeline = gst::Pipeline::default();
-
- std::fs::create_dir_all(&path).expect("failed to create directory");
-
- let mut manifest_path = path.clone();
- manifest_path.push("manifest.m3u8");
-
- let state = Arc::new(Mutex::new(State {
- video_streams: vec![VideoStream {
- name: "video_0".to_string(),
- bitrate: 2_048_000,
- width: 1280,
- height: 720,
- }],
- audio_streams: vec![
- AudioStream {
- name: "audio_0".to_string(),
- lang: "eng".to_string(),
- default: true,
- wave: "sine".to_string(),
- },
- AudioStream {
- name: "audio_1".to_string(),
- lang: "fre".to_string(),
- default: false,
- wave: "white-noise".to_string(),
- },
- ],
- all_mimes: vec![],
- path: manifest_path.clone(),
- wrote_manifest: false,
- }));
-
- {
- let state_lock = state.lock().unwrap();
-
- for stream in &state_lock.video_streams {
- stream.setup(state.clone(), &pipeline, &path)?;
- }
-
- for stream in &state_lock.audio_streams {
- stream.setup(state.clone(), &pipeline, &path)?;
- }
- }
-
- pipeline.set_state(gst::State::Playing)?;
-
- let bus = pipeline
- .bus()
- .expect("Pipeline without bus. Shouldn't happen!");
-
- for msg in bus.iter_timed(gst::ClockTime::NONE) {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::Eos(..) => {
- println!("EOS");
- break;
- }
- MessageView::Error(err) => {
- pipeline.set_state(gst::State::Null)?;
- eprintln!(
- "Got error from {}: {} ({})",
- msg.src()
- .map(|s| String::from(s.path_string()))
- .unwrap_or_else(|| "None".into()),
- err.error(),
- err.debug().unwrap_or_else(|| "".into()),
- );
- break;
- }
- _ => (),
- }
- }
-
- pipeline.set_state(gst::State::Null)?;
-
- Ok(())
-}
diff --git a/generic/fmp4/src/fmp4mux/boxes.rs b/generic/fmp4/src/fmp4mux/boxes.rs
deleted file mode 100644
index 9b69fb36..00000000
--- a/generic/fmp4/src/fmp4mux/boxes.rs
+++ /dev/null
@@ -1,2073 +0,0 @@
-// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-use gst::prelude::*;
-
-use anyhow::{anyhow, bail, Context, Error};
-
-use super::Buffer;
-
-fn write_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>(
- vec: &mut Vec<u8>,
- fourcc: impl std::borrow::Borrow<[u8; 4]>,
- content_func: F,
-) -> Result<T, Error> {
- // Write zero size ...
- let size_pos = vec.len();
- vec.extend([0u8; 4]);
- vec.extend(fourcc.borrow());
-
- let res = content_func(vec)?;
-
- // ... and update it here later.
- let size: u32 = vec
- .len()
- .checked_sub(size_pos)
- .expect("vector shrunk")
- .try_into()
- .context("too big box content")?;
- vec[size_pos..][..4].copy_from_slice(&size.to_be_bytes());
-
- Ok(res)
-}
-
-const FULL_BOX_VERSION_0: u8 = 0;
-const FULL_BOX_VERSION_1: u8 = 1;
-
-const FULL_BOX_FLAGS_NONE: u32 = 0;
-
-fn write_full_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>(
- vec: &mut Vec<u8>,
- fourcc: impl std::borrow::Borrow<[u8; 4]>,
- version: u8,
- flags: u32,
- content_func: F,
-) -> Result<T, Error> {
- write_box(vec, fourcc, move |vec| {
- assert_eq!(flags >> 24, 0);
- vec.extend(((u32::from(version) << 24) | flags).to_be_bytes());
- content_func(vec)
- })
-}
-
-fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'static [u8; 4]>) {
- let s = caps.structure(0).unwrap();
- match s.name() {
- "video/x-h264" => {
- let width = s.get::<i32>("width").ok();
- let height = s.get::<i32>("height").ok();
- let fps = s.get::<gst::Fraction>("framerate").ok();
- let profile = s.get::<&str>("profile").ok();
- let level = s
- .get::<&str>("level")
- .ok()
- .map(|l| l.split_once('.').unwrap_or((l, "0")));
- let colorimetry = s.get::<&str>("colorimetry").ok();
-
- if let (Some(width), Some(height), Some(profile), Some(level), Some(fps)) =
- (width, height, profile, level, fps)
- {
- if profile == "high"
- || profile == "main"
- || profile == "baseline"
- || profile == "constrained-baseline"
- {
- if width <= 864
- && height <= 576
- && level <= ("3", "1")
- && fps <= gst::Fraction::new(60, 1)
- {
- #[cfg(feature = "v1_18")]
- {
- if let Some(colorimetry) = colorimetry
- .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- | gst_video::VideoColorPrimaries::Bt470bg
- | gst_video::VideoColorPrimaries::Smpte170m
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- | gst_video::VideoTransferFunction::Bt601
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- | gst_video::VideoColorMatrix::Bt601
- ) {
- compatible_brands.push(b"cfsd");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"cfsd");
- }
- }
- #[cfg(not(feature = "v1_18"))]
- {
- // Assume it's OK
- compatible_brands.push(b"cfsd");
- }
- } else if width <= 1920
- && height <= 1080
- && level <= ("4", "0")
- && fps <= gst::Fraction::new(60, 1)
- {
- #[cfg(feature = "v1_18")]
- {
- if let Some(colorimetry) = colorimetry
- .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- ) {
- compatible_brands.push(b"cfhd");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"cfhd");
- }
- }
- #[cfg(not(feature = "v1_18"))]
- {
- // Assume it's OK
- compatible_brands.push(b"cfhd");
- }
- } else if width <= 1920
- && height <= 1080
- && level <= ("4", "2")
- && fps <= gst::Fraction::new(60, 1)
- {
- if let Some(colorimetry) =
- colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- ) {
- compatible_brands.push(b"chdf");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"chdf");
- }
- }
- }
- }
- }
- "audio/mpeg" => {
- compatible_brands.push(b"caac");
- }
- "video/x-h265" => {
- let width = s.get::<i32>("width").ok();
- let height = s.get::<i32>("height").ok();
- let fps = s.get::<gst::Fraction>("framerate").ok();
- let profile = s.get::<&str>("profile").ok();
- let tier = s.get::<&str>("tier").ok();
- let level = s
- .get::<&str>("level")
- .ok()
- .map(|l| l.split_once('.').unwrap_or((l, "0")));
- let colorimetry = s.get::<&str>("colorimetry").ok();
-
- if let (Some(width), Some(height), Some(profile), Some(tier), Some(level), Some(fps)) =
- (width, height, profile, tier, level, fps)
- {
- if profile == "main" && tier == "main" {
- if width <= 1920
- && height <= 1080
- && level <= ("4", "1")
- && fps <= gst::Fraction::new(60, 1)
- {
- if let Some(colorimetry) =
- colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- ) {
- compatible_brands.push(b"chhd");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"chhd");
- }
- } else if width <= 3840
- && height <= 2160
- && level <= ("5", "0")
- && fps <= gst::Fraction::new(60, 1)
- {
- if let Some(colorimetry) =
- colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- ) {
- compatible_brands.push(b"cud8");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"cud8");
- }
- }
- } else if profile == "main-10" && tier == "main-10" {
- if width <= 1920
- && height <= 1080
- && level <= ("4", "1")
- && fps <= gst::Fraction::new(60, 1)
- {
- if let Some(colorimetry) =
- colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- ) {
- compatible_brands.push(b"chh1");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"chh1");
- }
- } else if width <= 3840
- && height <= 2160
- && level <= ("5", "1")
- && fps <= gst::Fraction::new(60, 1)
- {
- #[cfg(feature = "v1_18")]
- if let Some(colorimetry) =
- colorimetry.and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt709
- | gst_video::VideoColorPrimaries::Bt2020
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Bt709
- | gst_video::VideoTransferFunction::Bt202010
- | gst_video::VideoTransferFunction::Bt202012
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt709
- | gst_video::VideoColorMatrix::Bt2020
- ) {
- compatible_brands.push(b"cud1");
- } else if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt2020
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::Smpte2084
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt2020
- ) {
- compatible_brands.push(b"chd1");
- } else if matches!(
- colorimetry.primaries(),
- gst_video::VideoColorPrimaries::Bt2020
- ) && matches!(
- colorimetry.transfer(),
- gst_video::VideoTransferFunction::AribStdB67
- ) && matches!(
- colorimetry.matrix(),
- gst_video::VideoColorMatrix::Bt2020
- ) {
- compatible_brands.push(b"clg1");
- }
- } else {
- // Assume it's OK
- compatible_brands.push(b"cud1");
- }
- }
- #[cfg(not(feature = "v1_18"))]
- {
- // Assume it's OK
- compatible_brands.push(b"cud1");
- }
- }
- }
- }
- _ => (),
- }
-}
-
-fn brands_from_variant_and_caps<'a>(
- variant: super::Variant,
- mut caps: impl Iterator<Item = &'a gst::Caps>,
-) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) {
- match variant {
- super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]),
- super::Variant::DASH => {
- // FIXME: `dsms` / `dash` brands, `msix`
- (b"msdh", vec![b"dums", b"msdh", b"iso6"])
- }
- super::Variant::CMAF => {
- let mut compatible_brands = vec![b"iso6", b"cmfc"];
-
- cmaf_brands_from_caps(caps.next().unwrap(), &mut compatible_brands);
- assert_eq!(caps.next(), None);
-
- (b"cmf2", compatible_brands)
- }
- }
-}
-
-/// Creates `ftyp` and `moov` boxes
-pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> {
- let mut v = vec![];
-
- let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.streams.iter());
-
- write_box(&mut v, b"ftyp", |v| {
- // major brand
- v.extend(brand);
- // minor version
- v.extend(0u32.to_be_bytes());
- // compatible brands
- v.extend(compatible_brands.into_iter().flatten());
-
- Ok(())
- })?;
-
- write_box(&mut v, b"moov", |v| write_moov(v, &cfg))?;
-
- if cfg.variant == super::Variant::ONVIF {
- write_full_box(
- &mut v,
- b"meta",
- FULL_BOX_VERSION_0,
- FULL_BOX_FLAGS_NONE,
- |v| {
- write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- // Handler type
- v.extend(b"null");
-
- // Reserved
- v.extend([0u8; 3 * 4]);
-
- // Name
- v.extend(b"MetadataHandler");
-
- Ok(())
- })?;
-
- write_box(v, b"cstb", |v| {
- // entry count
- v.extend(1u32.to_be_bytes());
-
- // track id
- v.extend(0u32.to_be_bytes());
-
- // start UTC time in 100ns units since Jan 1 1601
- v.extend(cfg.start_utc_time.unwrap().to_be_bytes());
-
- Ok(())
- })
- },
- )?;
- }
-
- Ok(gst::Buffer::from_mut_slice(v))
-}
-
-fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- use gst::glib;
-
- let base = glib::DateTime::from_utc(1904, 1, 1, 0, 0, 0.0)?;
- let now = glib::DateTime::now_utc()?;
- let creation_time =
- u64::try_from(now.difference(&base).as_seconds()).expect("time before 1904");
-
- write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
- write_mvhd(v, cfg, creation_time)
- })?;
- for (idx, caps) in cfg.streams.iter().enumerate() {
- write_box(v, b"trak", |v| {
- let mut references = vec![];
-
- // Reference the video track for ONVIF metadata tracks
- if cfg.variant == super::Variant::ONVIF
- && caps.structure(0).unwrap().name() == "application/x-onvif-metadata"
- {
- // Find the first video track
- for (idx, caps) in cfg.streams.iter().enumerate() {
- let s = caps.structure(0).unwrap();
-
- if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") {
- references.push(TrackReference {
- reference_type: *b"cdsc",
- track_ids: vec![idx as u32 + 1],
- });
- break;
- }
- }
- }
-
- write_trak(v, cfg, idx, caps, creation_time, &references)
- })?;
- }
- write_box(v, b"mvex", |v| write_mvex(v, cfg))?;
-
- Ok(())
-}
-
-fn caps_to_timescale(caps: &gst::CapsRef) -> u32 {
- let s = caps.structure(0).unwrap();
-
- if let Ok(fps) = s.get::<gst::Fraction>("framerate") {
- if fps.numer() == 0 {
- return 10_000;
- }
-
- if fps.denom() != 1 && fps.denom() != 1001 {
- if let Some(fps) = (fps.denom() as u64)
- .nseconds()
- .mul_div_round(1_000_000_000, fps.numer() as u64)
- .and_then(gst_video::guess_framerate)
- {
- return (fps.numer() as u32)
- .mul_div_round(100, fps.denom() as u32)
- .unwrap_or(10_000);
- }
- }
-
- (fps.numer() as u32)
- .mul_div_round(100, fps.denom() as u32)
- .unwrap_or(10_000)
- } else if let Ok(rate) = s.get::<i32>("rate") {
- rate as u32
- } else {
- 10_000
- }
-}
-
-fn write_mvhd(
- v: &mut Vec<u8>,
- cfg: &super::HeaderConfiguration,
- creation_time: u64,
-) -> Result<(), Error> {
- // Creation time
- v.extend(creation_time.to_be_bytes());
- // Modification time
- v.extend(creation_time.to_be_bytes());
- // Timescale: uses the reference track timescale
- v.extend(caps_to_timescale(&cfg.streams[0]).to_be_bytes());
- // Duration
- v.extend(0u64.to_be_bytes());
-
- // Rate 1.0
- v.extend((1u32 << 16).to_be_bytes());
- // Volume 1.0
- v.extend((1u16 << 8).to_be_bytes());
- // Reserved
- v.extend([0u8; 2 + 2 * 4]);
-
- // Matrix
- v.extend(
- [
- (1u32 << 16).to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- (1u32 << 16).to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- (16384u32 << 16).to_be_bytes(),
- ]
- .into_iter()
- .flatten(),
- );
-
- // Pre defined
- v.extend([0u8; 6 * 4]);
-
- // Next track id
- v.extend((cfg.streams.len() as u32 + 1).to_be_bytes());
-
- Ok(())
-}
-
-const TKHD_FLAGS_TRACK_ENABLED: u32 = 0x1;
-const TKHD_FLAGS_TRACK_IN_MOVIE: u32 = 0x2;
-const TKHD_FLAGS_TRACK_IN_PREVIEW: u32 = 0x4;
-
-struct TrackReference {
- reference_type: [u8; 4],
- track_ids: Vec<u32>,
-}
-
-fn write_trak(
- v: &mut Vec<u8>,
- cfg: &super::HeaderConfiguration,
- idx: usize,
- caps: &gst::CapsRef,
- creation_time: u64,
- references: &[TrackReference],
-) -> Result<(), Error> {
- write_full_box(
- v,
- b"tkhd",
- FULL_BOX_VERSION_1,
- TKHD_FLAGS_TRACK_ENABLED | TKHD_FLAGS_TRACK_IN_MOVIE | TKHD_FLAGS_TRACK_IN_PREVIEW,
- |v| write_tkhd(v, cfg, idx, caps, creation_time),
- )?;
-
- // TODO: write edts if necessary: for audio tracks to remove initialization samples
- // TODO: write edts optionally for negative DTS instead of offsetting the DTS
-
- write_box(v, b"mdia", |v| write_mdia(v, cfg, caps, creation_time))?;
-
- if !references.is_empty() {
- write_box(v, b"tref", |v| write_tref(v, cfg, references))?;
- }
-
- Ok(())
-}
-
-fn write_tkhd(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- idx: usize,
- caps: &gst::CapsRef,
- creation_time: u64,
-) -> Result<(), Error> {
- // Creation time
- v.extend(creation_time.to_be_bytes());
- // Modification time
- v.extend(creation_time.to_be_bytes());
- // Track ID
- v.extend((idx as u32 + 1).to_be_bytes());
- // Reserved
- v.extend(0u32.to_be_bytes());
- // Duration
- v.extend(0u64.to_be_bytes());
-
- // Reserved
- v.extend([0u8; 2 * 4]);
-
- // Layer
- v.extend(0u16.to_be_bytes());
- // Alternate group
- v.extend(0u16.to_be_bytes());
-
- // Volume
- let s = caps.structure(0).unwrap();
- match s.name() {
- "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
- v.extend((1u16 << 8).to_be_bytes())
- }
- _ => v.extend(0u16.to_be_bytes()),
- }
-
- // Reserved
- v.extend([0u8; 2]);
-
- // Matrix
- v.extend(
- [
- (1u32 << 16).to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- (1u32 << 16).to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- 0u32.to_be_bytes(),
- (16384u32 << 16).to_be_bytes(),
- ]
- .into_iter()
- .flatten(),
- );
-
- // Width/height
- match s.name() {
- "video/x-h264" | "video/x-h265" | "image/jpeg" => {
- let width = s.get::<i32>("width").context("video caps without width")? as u32;
- let height = s
- .get::<i32>("height")
- .context("video caps without height")? as u32;
- let par = s
- .get::<gst::Fraction>("pixel-aspect-ratio")
- .unwrap_or_else(|_| gst::Fraction::new(1, 1));
-
- let width = std::cmp::min(
- width
- .mul_div_round(par.numer() as u32, par.denom() as u32)
- .unwrap_or(u16::MAX as u32),
- u16::MAX as u32,
- );
- let height = std::cmp::min(height, u16::MAX as u32);
-
- v.extend((width << 16).to_be_bytes());
- v.extend((height << 16).to_be_bytes());
- }
- _ => v.extend([0u8; 2 * 4]),
- }
-
- Ok(())
-}
-
-fn write_mdia(
- v: &mut Vec<u8>,
- cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
- creation_time: u64,
-) -> Result<(), Error> {
- write_full_box(v, b"mdhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
- write_mdhd(v, cfg, caps, creation_time)
- })?;
- write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_hdlr(v, cfg, caps)
- })?;
-
- // TODO: write elng if needed
-
- write_box(v, b"minf", |v| write_minf(v, cfg, caps))?;
-
- Ok(())
-}
-
-fn write_tref(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- references: &[TrackReference],
-) -> Result<(), Error> {
- for reference in references {
- write_box(v, &reference.reference_type, |v| {
- for track_id in &reference.track_ids {
- v.extend(track_id.to_be_bytes());
- }
-
- Ok(())
- })?;
- }
-
- Ok(())
-}
-
-fn language_code(lang: impl std::borrow::Borrow<[u8; 3]>) -> u16 {
- let lang = lang.borrow();
-
- // TODO: Need to relax this once we get the language code from tags
- assert!(lang.iter().all(u8::is_ascii_lowercase));
-
- (((lang[0] as u16 - 0x60) & 0x1F) << 10)
- + (((lang[1] as u16 - 0x60) & 0x1F) << 5)
- + ((lang[2] as u16 - 0x60) & 0x1F)
-}
-
-fn write_mdhd(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
- creation_time: u64,
-) -> Result<(), Error> {
- // Creation time
- v.extend(creation_time.to_be_bytes());
- // Modification time
- v.extend(creation_time.to_be_bytes());
- // Timescale
- v.extend(caps_to_timescale(caps).to_be_bytes());
- // Duration
- v.extend(0u64.to_be_bytes());
-
- // Language as ISO-639-2/T
- // TODO: get actual language from the tags
- v.extend(language_code(b"und").to_be_bytes());
-
- // Pre-defined
- v.extend([0u8; 2]);
-
- Ok(())
-}
-
-fn write_hdlr(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- // Pre-defined
- v.extend([0u8; 4]);
-
- let s = caps.structure(0).unwrap();
- let (handler_type, name) = match s.name() {
- "video/x-h264" | "video/x-h265" | "image/jpeg" => (b"vide", b"VideoHandler\0".as_slice()),
- "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
- (b"soun", b"SoundHandler\0".as_slice())
- }
- "application/x-onvif-metadata" => (b"meta", b"MetadataHandler\0".as_slice()),
- _ => unreachable!(),
- };
-
- // Handler type
- v.extend(handler_type);
-
- // Reserved
- v.extend([0u8; 3 * 4]);
-
- // Name
- v.extend(name);
-
- Ok(())
-}
-
-fn write_minf(
- v: &mut Vec<u8>,
- cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- let s = caps.structure(0).unwrap();
-
- match s.name() {
- "video/x-h264" | "video/x-h265" | "image/jpeg" => {
- // Flags are always 1 for unspecified reasons
- write_full_box(v, b"vmhd", FULL_BOX_VERSION_0, 1, |v| write_vmhd(v, cfg))?
- }
- "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
- write_full_box(v, b"smhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_smhd(v, cfg)
- })?
- }
- "application/x-onvif-metadata" => {
- write_full_box(v, b"nmhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |_v| {
- Ok(())
- })?
- }
- _ => unreachable!(),
- }
-
- write_box(v, b"dinf", |v| write_dinf(v, cfg))?;
-
- write_box(v, b"stbl", |v| write_stbl(v, cfg, caps))?;
-
- Ok(())
-}
-
-fn write_vmhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Graphics mode
- v.extend([0u8; 2]);
-
- // opcolor
- v.extend([0u8; 2 * 3]);
-
- Ok(())
-}
-
-fn write_smhd(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Balance
- v.extend([0u8; 2]);
-
- // Reserved
- v.extend([0u8; 2]);
-
- Ok(())
-}
-
-fn write_dinf(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- write_full_box(v, b"dref", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_dref(v, cfg)
- })?;
-
- Ok(())
-}
-
-const DREF_FLAGS_MEDIA_IN_SAME_FILE: u32 = 0x1;
-
-fn write_dref(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Entry count
- v.extend(1u32.to_be_bytes());
-
- write_full_box(
- v,
- b"url ",
- FULL_BOX_VERSION_0,
- DREF_FLAGS_MEDIA_IN_SAME_FILE,
- |_v| Ok(()),
- )?;
-
- Ok(())
-}
-
-fn write_stbl(
- v: &mut Vec<u8>,
- cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- write_full_box(v, b"stsd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_stsd(v, cfg, caps)
- })?;
- write_full_box(v, b"stts", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_stts(v, cfg)
- })?;
- write_full_box(v, b"stsc", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_stsc(v, cfg)
- })?;
- write_full_box(v, b"stsz", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_stsz(v, cfg)
- })?;
-
- write_full_box(v, b"stco", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_stco(v, cfg)
- })?;
-
- // For video write a sync sample box as indication that not all samples are sync samples
- let s = caps.structure(0).unwrap();
- match s.name() {
- "video/x-h264" | "video/x-h265" => {
- write_full_box(v, b"stss", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_stss(v, cfg)
- })?
- }
- _ => (),
- }
-
- Ok(())
-}
-
-fn write_stsd(
- v: &mut Vec<u8>,
- cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- // Entry count
- v.extend(1u32.to_be_bytes());
-
- let s = caps.structure(0).unwrap();
- match s.name() {
- "video/x-h264" | "video/x-h265" | "image/jpeg" => write_visual_sample_entry(v, cfg, caps)?,
- "audio/mpeg" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
- write_audio_sample_entry(v, cfg, caps)?
- }
- "application/x-onvif-metadata" => write_xml_meta_data_sample_entry(v, cfg, caps)?,
- _ => unreachable!(),
- }
-
- Ok(())
-}
-
-fn write_sample_entry_box<T, F: FnOnce(&mut Vec<u8>) -> Result<T, Error>>(
- v: &mut Vec<u8>,
- fourcc: impl std::borrow::Borrow<[u8; 4]>,
- content_func: F,
-) -> Result<T, Error> {
- write_box(v, fourcc, move |v| {
- // Reserved
- v.extend([0u8; 6]);
-
- // Data reference index
- v.extend(1u16.to_be_bytes());
-
- content_func(v)
- })
-}
-
-fn write_visual_sample_entry(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- let s = caps.structure(0).unwrap();
- let fourcc = match s.name() {
- "video/x-h264" => {
- let stream_format = s.get::<&str>("stream-format").context("no stream-format")?;
- match stream_format {
- "avc" => b"avc1",
- "avc3" => b"avc3",
- _ => unreachable!(),
- }
- }
- "video/x-h265" => {
- let stream_format = s.get::<&str>("stream-format").context("no stream-format")?;
- match stream_format {
- "hvc1" => b"hvc1",
- "hev1" => b"hev1",
- _ => unreachable!(),
- }
- }
- "image/jpeg" => b"jpeg",
- _ => unreachable!(),
- };
-
- write_sample_entry_box(v, fourcc, move |v| {
- // pre-defined
- v.extend([0u8; 2]);
- // Reserved
- v.extend([0u8; 2]);
- // pre-defined
- v.extend([0u8; 3 * 4]);
-
- // Width
- let width =
- u16::try_from(s.get::<i32>("width").context("no width")?).context("too big width")?;
- v.extend(width.to_be_bytes());
-
- // Height
- let height = u16::try_from(s.get::<i32>("height").context("no height")?)
- .context("too big height")?;
- v.extend(height.to_be_bytes());
-
- // Horizontal resolution
- v.extend(0x00480000u32.to_be_bytes());
-
- // Vertical resolution
- v.extend(0x00480000u32.to_be_bytes());
-
- // Reserved
- v.extend([0u8; 4]);
-
- // Frame count
- v.extend(1u16.to_be_bytes());
-
- // Compressor name
- v.extend([0u8; 32]);
-
- // Depth
- v.extend(0x0018u16.to_be_bytes());
-
- // Pre-defined
- v.extend((-1i16).to_be_bytes());
-
- // Codec specific boxes
- match s.name() {
- "video/x-h264" => {
- let codec_data = s
- .get::<&gst::BufferRef>("codec_data")
- .context("no codec_data")?;
- let map = codec_data
- .map_readable()
- .context("codec_data not mappable")?;
- write_box(v, b"avcC", move |v| {
- v.extend_from_slice(&map);
- Ok(())
- })?;
- }
- "video/x-h265" => {
- let codec_data = s
- .get::<&gst::BufferRef>("codec_data")
- .context("no codec_data")?;
- let map = codec_data
- .map_readable()
- .context("codec_data not mappable")?;
- write_box(v, b"hvcC", move |v| {
- v.extend_from_slice(&map);
- Ok(())
- })?;
- }
- "image/jpeg" => {
- // Nothing to do here
- }
- _ => unreachable!(),
- }
-
- if let Ok(par) = s.get::<gst::Fraction>("pixel-aspect-ratio") {
- write_box(v, b"pasp", move |v| {
- v.extend((par.numer() as u32).to_be_bytes());
- v.extend((par.denom() as u32).to_be_bytes());
- Ok(())
- })?;
- }
-
- if let Some(colorimetry) = s
- .get::<&str>("colorimetry")
- .ok()
- .and_then(|c| c.parse::<gst_video::VideoColorimetry>().ok())
- {
- write_box(v, b"colr", move |v| {
- v.extend(b"nclx");
- let (primaries, transfer, matrix) = {
- #[cfg(feature = "v1_18")]
- {
- (
- (colorimetry.primaries().to_iso() as u16),
- (colorimetry.transfer().to_iso() as u16),
- (colorimetry.matrix().to_iso() as u16),
- )
- }
- #[cfg(not(feature = "v1_18"))]
- {
- let primaries = match colorimetry.primaries() {
- gst_video::VideoColorPrimaries::Bt709 => 1u16,
- gst_video::VideoColorPrimaries::Bt470m => 4u16,
- gst_video::VideoColorPrimaries::Bt470bg => 5u16,
- gst_video::VideoColorPrimaries::Smpte170m => 6u16,
- gst_video::VideoColorPrimaries::Smpte240m => 7u16,
- gst_video::VideoColorPrimaries::Film => 8u16,
- gst_video::VideoColorPrimaries::Bt2020 => 9u16,
- _ => 2,
- };
- let transfer = match colorimetry.transfer() {
- gst_video::VideoTransferFunction::Bt709 => 1u16,
- gst_video::VideoTransferFunction::Gamma22 => 4u16,
- gst_video::VideoTransferFunction::Gamma28 => 5u16,
- gst_video::VideoTransferFunction::Smpte240m => 7u16,
- gst_video::VideoTransferFunction::Gamma10 => 8u16,
- gst_video::VideoTransferFunction::Log100 => 9u16,
- gst_video::VideoTransferFunction::Log316 => 10u16,
- gst_video::VideoTransferFunction::Srgb => 13u16,
- gst_video::VideoTransferFunction::Bt202012 => 15u16,
- _ => 2,
- };
- let matrix = match colorimetry.matrix() {
- gst_video::VideoColorMatrix::Rgb => 0u16,
- gst_video::VideoColorMatrix::Bt709 => 1u16,
- gst_video::VideoColorMatrix::Fcc => 4u16,
- gst_video::VideoColorMatrix::Bt601 => 6u16,
- gst_video::VideoColorMatrix::Smpte240m => 7u16,
- gst_video::VideoColorMatrix::Bt2020 => 9u16,
- _ => 2,
- };
-
- (primaries, transfer, matrix)
- }
- };
-
- let full_range = match colorimetry.range() {
- gst_video::VideoColorRange::Range0_255 => 0x80u8,
- gst_video::VideoColorRange::Range16_235 => 0x00u8,
- _ => 0x00,
- };
-
- v.extend(primaries.to_be_bytes());
- v.extend(transfer.to_be_bytes());
- v.extend(matrix.to_be_bytes());
- v.push(full_range);
-
- Ok(())
- })?;
- }
-
- #[cfg(feature = "v1_18")]
- {
- if let Ok(cll) = gst_video::VideoContentLightLevel::from_caps(caps) {
- write_box(v, b"clli", move |v| {
- v.extend((cll.max_content_light_level() as u16).to_be_bytes());
- v.extend((cll.max_frame_average_light_level() as u16).to_be_bytes());
- Ok(())
- })?;
- }
-
- if let Ok(mastering) = gst_video::VideoMasteringDisplayInfo::from_caps(caps) {
- write_box(v, b"mdcv", move |v| {
- for primary in mastering.display_primaries() {
- v.extend(primary.x.to_be_bytes());
- v.extend(primary.y.to_be_bytes());
- }
- v.extend(mastering.white_point().x.to_be_bytes());
- v.extend(mastering.white_point().y.to_be_bytes());
- v.extend(mastering.max_display_mastering_luminance().to_be_bytes());
- v.extend(mastering.max_display_mastering_luminance().to_be_bytes());
- Ok(())
- })?;
- }
- }
-
- // Write fiel box for codecs that require it
- if ["image/jpeg"].contains(&s.name()) {
- let interlace_mode = s
- .get::<&str>("interlace-mode")
- .ok()
- .map(gst_video::VideoInterlaceMode::from_string)
- .unwrap_or(gst_video::VideoInterlaceMode::Progressive);
- let field_order = s
- .get::<&str>("field-order")
- .ok()
- .map(gst_video::VideoFieldOrder::from_string)
- .unwrap_or(gst_video::VideoFieldOrder::Unknown);
-
- write_box(v, b"fiel", move |v| {
- let (interlace, field_order) = match interlace_mode {
- gst_video::VideoInterlaceMode::Progressive => (1, 0),
- gst_video::VideoInterlaceMode::Interleaved
- if field_order == gst_video::VideoFieldOrder::TopFieldFirst =>
- {
- (2, 9)
- }
- gst_video::VideoInterlaceMode::Interleaved => (2, 14),
- _ => (0, 0),
- };
-
- v.push(interlace);
- v.push(field_order);
- Ok(())
- })?;
- }
-
- // TODO: write btrt bitrate box based on tags
-
- Ok(())
- })?;
-
- Ok(())
-}
-
-fn write_audio_sample_entry(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- let s = caps.structure(0).unwrap();
- let fourcc = match s.name() {
- "audio/mpeg" => b"mp4a",
- "audio/x-alaw" => b"alaw",
- "audio/x-mulaw" => b"ulaw",
- "audio/x-adpcm" => {
- let layout = s.get::<&str>("layout").context("no ADPCM layout field")?;
-
- match layout {
- "g726" => b"ms\x00\x45",
- _ => unreachable!(),
- }
- }
- _ => unreachable!(),
- };
-
- let sample_size = match s.name() {
- "audio/x-adpcm" => {
- let bitrate = s.get::<i32>("bitrate").context("no ADPCM bitrate field")?;
- (bitrate / 8000) as u16
- }
- _ => 16u16,
- };
-
- write_sample_entry_box(v, fourcc, move |v| {
- // Reserved
- v.extend([0u8; 2 * 4]);
-
- // Channel count
- let channels = u16::try_from(s.get::<i32>("channels").context("no channels")?)
- .context("too many channels")?;
- v.extend(channels.to_be_bytes());
-
- // Sample size
- v.extend(sample_size.to_be_bytes());
-
- // Pre-defined
- v.extend([0u8; 2]);
-
- // Reserved
- v.extend([0u8; 2]);
-
- // Sample rate
- let rate = u16::try_from(s.get::<i32>("rate").context("no rate")?).unwrap_or(0);
- v.extend((u32::from(rate) << 16).to_be_bytes());
-
- // Codec specific boxes
- match s.name() {
- "audio/mpeg" => {
- let codec_data = s
- .get::<&gst::BufferRef>("codec_data")
- .context("no codec_data")?;
- let map = codec_data
- .map_readable()
- .context("codec_data not mappable")?;
- if map.len() < 2 {
- bail!("too small codec_data");
- }
- write_esds_aac(v, &map)?;
- }
- "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => {
- // Nothing to do here
- }
- _ => unreachable!(),
- }
-
- // If rate did not fit into 16 bits write a full `srat` box
- if rate == 0 {
- let rate = s.get::<i32>("rate").context("no rate")?;
- // FIXME: This is defined as full box?
- write_full_box(
- v,
- b"srat",
- FULL_BOX_VERSION_0,
- FULL_BOX_FLAGS_NONE,
- move |v| {
- v.extend((rate as u32).to_be_bytes());
- Ok(())
- },
- )?;
- }
-
- // TODO: write btrt bitrate box based on tags
-
- // TODO: chnl box for channel ordering? probably not needed for AAC
-
- Ok(())
- })?;
-
- Ok(())
-}
-
-fn write_esds_aac(v: &mut Vec<u8>, codec_data: &[u8]) -> Result<(), Error> {
- let calculate_len = |mut len| {
- if len > 260144641 {
- bail!("too big descriptor length");
- }
-
- if len == 0 {
- return Ok(([0; 4], 1));
- }
-
- let mut idx = 0;
- let mut lens = [0u8; 4];
- while len > 0 {
- lens[idx] = ((if len > 0x7f { 0x80 } else { 0x00 }) | (len & 0x7f)) as u8;
- idx += 1;
- len >>= 7;
- }
-
- Ok((lens, idx))
- };
-
- write_full_box(
- v,
- b"esds",
- FULL_BOX_VERSION_0,
- FULL_BOX_FLAGS_NONE,
- move |v| {
- // Calculate all lengths bottom up
-
- // Decoder specific info
- let decoder_specific_info_len = calculate_len(codec_data.len())?;
-
- // Decoder config
- let decoder_config_len =
- calculate_len(13 + 1 + decoder_specific_info_len.1 + codec_data.len())?;
-
- // SL config
- let sl_config_len = calculate_len(1)?;
-
- // ES descriptor
- let es_descriptor_len = calculate_len(
- 3 + 1
- + decoder_config_len.1
- + 13
- + 1
- + decoder_specific_info_len.1
- + codec_data.len()
- + 1
- + sl_config_len.1
- + 1,
- )?;
-
- // ES descriptor tag
- v.push(0x03);
-
- // Length
- v.extend_from_slice(&es_descriptor_len.0[..(es_descriptor_len.1)]);
-
- // Track ID
- v.extend(1u16.to_be_bytes());
- // Flags
- v.push(0u8);
-
- // Decoder config descriptor
- v.push(0x04);
-
- // Length
- v.extend_from_slice(&decoder_config_len.0[..(decoder_config_len.1)]);
-
- // Object type ESDS_OBJECT_TYPE_MPEG4_P3
- v.push(0x40);
- // Stream type ESDS_STREAM_TYPE_AUDIO
- v.push((0x05 << 2) | 0x01);
-
- // Buffer size db?
- v.extend([0u8; 3]);
-
- // Max bitrate
- v.extend(0u32.to_be_bytes());
-
- // Avg bitrate
- v.extend(0u32.to_be_bytes());
-
- // Decoder specific info
- v.push(0x05);
-
- // Length
- v.extend_from_slice(&decoder_specific_info_len.0[..(decoder_specific_info_len.1)]);
- v.extend_from_slice(codec_data);
-
- // SL config descriptor
- v.push(0x06);
-
- // Length: 1 (tag) + 1 (length) + 1 (predefined)
- v.extend_from_slice(&sl_config_len.0[..(sl_config_len.1)]);
-
- // Predefined
- v.push(0x02);
- Ok(())
- },
- )
-}
-
-fn write_xml_meta_data_sample_entry(
- v: &mut Vec<u8>,
- _cfg: &super::HeaderConfiguration,
- caps: &gst::CapsRef,
-) -> Result<(), Error> {
- let s = caps.structure(0).unwrap();
- let namespace = match s.name() {
- "application/x-onvif-metadata" => b"http://www.onvif.org/ver10/schema",
- _ => unreachable!(),
- };
-
- write_sample_entry_box(v, b"metx", move |v| {
- // content_encoding, empty string
- v.push(0);
-
- // namespace
- v.extend_from_slice(namespace);
- v.push(0);
-
- // schema_location, empty string list
- v.push(0);
-
- Ok(())
- })?;
-
- Ok(())
-}
-
-fn write_stts(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Entry count
- v.extend(0u32.to_be_bytes());
-
- Ok(())
-}
-
-fn write_stsc(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Entry count
- v.extend(0u32.to_be_bytes());
-
- Ok(())
-}
-
-fn write_stsz(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Sample size
- v.extend(0u32.to_be_bytes());
-
- // Sample count
- v.extend(0u32.to_be_bytes());
-
- Ok(())
-}
-
-fn write_stco(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Entry count
- v.extend(0u32.to_be_bytes());
-
- Ok(())
-}
-
-fn write_stss(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Entry count
- v.extend(0u32.to_be_bytes());
-
- Ok(())
-}
-
-fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- if cfg.write_mehd {
- if cfg.update && cfg.duration.is_some() {
- write_full_box(v, b"mehd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
- write_mehd(v, cfg)
- })?;
- } else {
- write_box(v, b"free", |v| {
- // version/flags of full box
- v.extend(0u32.to_be_bytes());
- // mehd duration
- v.extend(0u64.to_be_bytes());
-
- Ok(())
- })?;
- }
- }
-
- for (idx, _caps) in cfg.streams.iter().enumerate() {
- write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_trex(v, cfg, idx)
- })?;
- }
-
- Ok(())
-}
-
-fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
- // Use the reference track timescale
- let timescale = caps_to_timescale(&cfg.streams[0]);
-
- let duration = cfg
- .duration
- .expect("no duration")
- .mul_div_ceil(timescale as u64, gst::ClockTime::SECOND.nseconds())
- .context("too long duration")?;
-
- // Media duration in mvhd.timescale units
- v.extend(duration.to_be_bytes());
-
- Ok(())
-}
-
-fn write_trex(v: &mut Vec<u8>, _cfg: &super::HeaderConfiguration, idx: usize) -> Result<(), Error> {
- // Track ID
- v.extend((idx as u32 + 1).to_be_bytes());
-
- // Default sample description index
- v.extend(1u32.to_be_bytes());
-
- // Default sample duration
- v.extend(0u32.to_be_bytes());
-
- // Default sample size
- v.extend(0u32.to_be_bytes());
-
- // Default sample flags
- v.extend(0u32.to_be_bytes());
-
- // Default sample duration/size/etc will be provided in the traf/trun if one can be determined
- // for a whole fragment
-
- Ok(())
-}
-
-/// Creates `styp` and `moof` boxes and `mdat` header
-pub(super) fn create_fmp4_fragment_header(
- cfg: super::FragmentHeaderConfiguration,
-) -> Result<(gst::Buffer, u64), Error> {
- let mut v = vec![];
-
- let (brand, compatible_brands) =
- brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.0));
-
- write_box(&mut v, b"styp", |v| {
- // major brand
- v.extend(brand);
- // minor version
- v.extend(0u32.to_be_bytes());
- // compatible brands
- v.extend(compatible_brands.into_iter().flatten());
-
- Ok(())
- })?;
-
- let styp_len = v.len();
-
- let data_offset_offsets = write_box(&mut v, b"moof", |v| write_moof(v, &cfg))?;
-
- let size = cfg
- .buffers
- .iter()
- .map(|buffer| buffer.buffer.size() as u64)
- .sum::<u64>();
- if let Ok(size) = u32::try_from(size + 8) {
- v.extend(size.to_be_bytes());
- v.extend(b"mdat");
- } else {
- v.extend(1u32.to_be_bytes());
- v.extend(b"mdat");
- v.extend((size + 16).to_be_bytes());
- }
-
- let data_offset = v.len() - styp_len;
- for data_offset_offset in data_offset_offsets {
- let val = u32::from_be_bytes(v[data_offset_offset..][..4].try_into()?)
- .checked_add(u32::try_from(data_offset)?)
- .ok_or_else(|| anyhow!("can't calculate track run data offset"))?;
- v[data_offset_offset..][..4].copy_from_slice(&val.to_be_bytes());
- }
-
- Ok((gst::Buffer::from_mut_slice(v), styp_len as u64))
-}
-
-fn write_moof(
- v: &mut Vec<u8>,
- cfg: &super::FragmentHeaderConfiguration,
-) -> Result<Vec<usize>, Error> {
- write_full_box(v, b"mfhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- write_mfhd(v, cfg)
- })?;
-
- let mut data_offset_offsets = vec![];
- for (idx, (caps, timing_info)) in cfg.streams.iter().enumerate() {
- // Skip tracks without any buffers for this fragment.
- let timing_info = match timing_info {
- None => continue,
- Some(ref timing_info) => timing_info,
- };
-
- write_box(v, b"traf", |v| {
- write_traf(v, cfg, &mut data_offset_offsets, idx, caps, timing_info)
- })?;
- }
-
- Ok(data_offset_offsets)
-}
-
-fn write_mfhd(v: &mut Vec<u8>, cfg: &super::FragmentHeaderConfiguration) -> Result<(), Error> {
- v.extend(cfg.sequence_number.to_be_bytes());
-
- Ok(())
-}
-
-#[allow(clippy::identity_op)]
-fn sample_flags_from_buffer(
- timing_info: &super::FragmentTimingInfo,
- buffer: &gst::BufferRef,
-) -> u32 {
- if timing_info.intra_only {
- (0b00u32 << (16 + 10)) | // leading: unknown
- (0b10u32 << (16 + 8)) | // depends: no
- (0b10u32 << (16 + 6)) | // depended: no
- (0b00u32 << (16 + 4)) | // redundancy: unknown
- (0b000u32 << (16 + 1)) | // padding: no
- (0b0u32 << 16) | // non-sync-sample: no
- (0u32) // degradation priority
- } else {
- let depends = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
- 0b01u32
- } else {
- 0b10u32
- };
- let depended = if buffer.flags().contains(gst::BufferFlags::DROPPABLE) {
- 0b10u32
- } else {
- 0b00u32
- };
- let non_sync_sample = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
- 0b1u32
- } else {
- 0b0u32
- };
-
- (0b00u32 << (16 + 10)) | // leading: unknown
- (depends << (16 + 8)) | // depends
- (depended << (16 + 6)) | // depended
- (0b00u32 << (16 + 4)) | // redundancy: unknown
- (0b000u32 << (16 + 1)) | // padding: no
- (non_sync_sample << 16) | // non-sync-sample
- (0u32) // degradation priority
- }
-}
-
-const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08;
-const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10;
-const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20;
-const DEFAULT_BASE_IS_MOOF: u32 = 0x2_00_00;
-
-const DATA_OFFSET_PRESENT: u32 = 0x0_01;
-const FIRST_SAMPLE_FLAGS_PRESENT: u32 = 0x0_04;
-const SAMPLE_DURATION_PRESENT: u32 = 0x1_00;
-const SAMPLE_SIZE_PRESENT: u32 = 0x2_00;
-const SAMPLE_FLAGS_PRESENT: u32 = 0x4_00;
-const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00;
-
-#[allow(clippy::type_complexity)]
-fn analyze_buffers(
- cfg: &super::FragmentHeaderConfiguration,
- idx: usize,
- timing_info: &super::FragmentTimingInfo,
- timescale: u32,
-) -> Result<
- (
- // tf_flags
- u32,
- // tr_flags
- u32,
- // default size
- Option<u32>,
- // default duration
- Option<u32>,
- // default flags
- Option<u32>,
- // negative composition time offsets
- bool,
- ),
- Error,
-> {
- let mut tf_flags = DEFAULT_BASE_IS_MOOF;
- let mut tr_flags = DATA_OFFSET_PRESENT;
-
- let mut duration = None;
- let mut size = None;
- let mut first_buffer_flags = None;
- let mut flags = None;
-
- let mut negative_composition_time_offsets = false;
-
- for Buffer {
- idx: _idx,
- buffer,
- timestamp: _timestamp,
- duration: sample_duration,
- composition_time_offset,
- } in cfg.buffers.iter().filter(|b| b.idx == idx)
- {
- if size.is_none() {
- size = Some(buffer.size() as u32);
- }
- if Some(buffer.size() as u32) != size {
- tr_flags |= SAMPLE_SIZE_PRESENT;
- }
-
- let sample_duration = u32::try_from(
- sample_duration
- .nseconds()
- .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
- .context("too big sample duration")?,
- )
- .context("too big sample duration")?;
-
- if duration.is_none() {
- duration = Some(sample_duration);
- }
- if Some(sample_duration) != duration {
- tr_flags |= SAMPLE_DURATION_PRESENT;
- }
-
- let f = sample_flags_from_buffer(timing_info, buffer);
- if first_buffer_flags.is_none() {
- first_buffer_flags = Some(f);
- } else {
- flags = Some(f);
- if Some(f) != first_buffer_flags {
- tr_flags |= FIRST_SAMPLE_FLAGS_PRESENT;
- }
- }
-
- if flags.is_some() && Some(f) != flags {
- tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT;
- tr_flags |= SAMPLE_FLAGS_PRESENT;
- }
-
- if let Some(composition_time_offset) = *composition_time_offset {
- assert!(!timing_info.intra_only);
- if composition_time_offset != 0 {
- tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT;
- }
- if composition_time_offset < 0 {
- negative_composition_time_offsets = true;
- }
- }
- }
-
- if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 {
- tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT;
- } else {
- size = None;
- }
-
- if (tr_flags & SAMPLE_DURATION_PRESENT) == 0 {
- tf_flags |= DEFAULT_SAMPLE_DURATION_PRESENT;
- } else {
- duration = None;
- }
-
- // If there is only a single buffer use its flags as default sample flags
- // instead of first sample flags.
- if flags.is_none() && first_buffer_flags.is_some() {
- tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT;
- flags = first_buffer_flags.take();
- }
-
- if (tr_flags & SAMPLE_FLAGS_PRESENT) == 0 {
- tf_flags |= DEFAULT_SAMPLE_FLAGS_PRESENT;
- } else {
- flags = None;
- }
-
- Ok((
- tf_flags,
- tr_flags,
- size,
- duration,
- flags,
- negative_composition_time_offsets,
- ))
-}
-
-#[allow(clippy::ptr_arg)]
-fn write_traf(
- v: &mut Vec<u8>,
- cfg: &super::FragmentHeaderConfiguration,
- data_offset_offsets: &mut Vec<usize>,
- idx: usize,
- caps: &gst::CapsRef,
- timing_info: &super::FragmentTimingInfo,
-) -> Result<(), Error> {
- let timescale = caps_to_timescale(caps);
-
- // Analyze all buffers to know what values can be put into the tfhd for all samples and what
- // has to be stored for every single sample
- let (
- tf_flags,
- tr_flags,
- default_size,
- default_duration,
- default_flags,
- negative_composition_time_offsets,
- ) = analyze_buffers(cfg, idx, timing_info, timescale)?;
-
- assert!((tf_flags & DEFAULT_SAMPLE_SIZE_PRESENT == 0) ^ default_size.is_some());
- assert!((tf_flags & DEFAULT_SAMPLE_DURATION_PRESENT == 0) ^ default_duration.is_some());
- assert!((tf_flags & DEFAULT_SAMPLE_FLAGS_PRESENT == 0) ^ default_flags.is_some());
-
- write_full_box(v, b"tfhd", FULL_BOX_VERSION_0, tf_flags, |v| {
- write_tfhd(v, cfg, idx, default_size, default_duration, default_flags)
- })?;
- write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
- write_tfdt(v, cfg, idx, timing_info, timescale)
- })?;
-
- let mut current_data_offset = 0;
-
- for run in GroupBy::new(cfg.buffers, |a: &Buffer, b: &Buffer| a.idx == b.idx) {
- if run[0].idx != idx {
- // FIXME: What to do with >4GB offsets?
- current_data_offset = (current_data_offset as u64
- + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>())
- .try_into()?;
- continue;
- }
-
- let data_offset_offset = write_full_box(
- v,
- b"trun",
- if negative_composition_time_offsets {
- FULL_BOX_VERSION_1
- } else {
- FULL_BOX_VERSION_0
- },
- tr_flags,
- |v| {
- write_trun(
- v,
- cfg,
- current_data_offset,
- tr_flags,
- timescale,
- timing_info,
- run,
- )
- },
- )?;
- data_offset_offsets.push(data_offset_offset);
-
- // FIXME: What to do with >4GB offsets?
- current_data_offset = (current_data_offset as u64
- + run.iter().map(|b| b.buffer.size() as u64).sum::<u64>())
- .try_into()?;
- }
-
- // TODO: saio, saiz, sbgp, sgpd, subs?
-
- Ok(())
-}
-
-fn write_tfhd(
- v: &mut Vec<u8>,
- _cfg: &super::FragmentHeaderConfiguration,
- idx: usize,
- default_size: Option<u32>,
- default_duration: Option<u32>,
- default_flags: Option<u32>,
-) -> Result<(), Error> {
- // Track ID
- v.extend((idx as u32 + 1).to_be_bytes());
-
- // No base data offset, no sample description index
-
- if let Some(default_duration) = default_duration {
- v.extend(default_duration.to_be_bytes());
- }
-
- if let Some(default_size) = default_size {
- v.extend(default_size.to_be_bytes());
- }
-
- if let Some(default_flags) = default_flags {
- v.extend(default_flags.to_be_bytes());
- }
-
- Ok(())
-}
-
-fn write_tfdt(
- v: &mut Vec<u8>,
- _cfg: &super::FragmentHeaderConfiguration,
- _idx: usize,
- timing_info: &super::FragmentTimingInfo,
- timescale: u32,
-) -> Result<(), Error> {
- let base_time = timing_info
- .start_time
- .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds())
- .context("base time overflow")?;
-
- v.extend(base_time.to_be_bytes());
-
- Ok(())
-}
-
-#[allow(clippy::too_many_arguments)]
-fn write_trun(
- v: &mut Vec<u8>,
- _cfg: &super::FragmentHeaderConfiguration,
- current_data_offset: u32,
- tr_flags: u32,
- timescale: u32,
- timing_info: &super::FragmentTimingInfo,
- buffers: &[Buffer],
-) -> Result<usize, Error> {
- // Sample count
- v.extend((buffers.len() as u32).to_be_bytes());
-
- let data_offset_offset = v.len();
- // Data offset, will be rewritten later
- v.extend(current_data_offset.to_be_bytes());
-
- if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 {
- v.extend(sample_flags_from_buffer(timing_info, &buffers[0].buffer).to_be_bytes());
- }
-
- for Buffer {
- idx: _idx,
- ref buffer,
- timestamp: _timestamp,
- duration,
- composition_time_offset,
- } in buffers.iter()
- {
- if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 {
- // Sample duration
- let sample_duration = u32::try_from(
- duration
- .nseconds()
- .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
- .context("too big sample duration")?,
- )
- .context("too big sample duration")?;
- v.extend(sample_duration.to_be_bytes());
- }
-
- if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 {
- // Sample size
- v.extend((buffer.size() as u32).to_be_bytes());
- }
-
- if (tr_flags & SAMPLE_FLAGS_PRESENT) != 0 {
- assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0);
-
- // Sample flags
- v.extend(sample_flags_from_buffer(timing_info, buffer).to_be_bytes());
- }
-
- if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 {
- // Sample composition time offset
- let composition_time_offset = i32::try_from(
- composition_time_offset
- .unwrap_or(0)
- .mul_div_round(timescale as i64, gst::ClockTime::SECOND.nseconds() as i64)
- .context("too big composition time offset")?,
- )
- .context("too big composition time offset")?;
- v.extend(composition_time_offset.to_be_bytes());
- }
- }
-
- Ok(data_offset_offset)
-}
-
-/// Creates `mfra` box
-pub(crate) fn create_mfra(
- caps: &gst::CapsRef,
- fragment_offsets: &[super::FragmentOffset],
-) -> Result<gst::Buffer, Error> {
- let timescale = caps_to_timescale(caps);
-
- let mut v = vec![];
-
- let offset = write_box(&mut v, b"mfra", |v| {
- write_full_box(v, b"tfra", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
- // Track ID
- v.extend(1u32.to_be_bytes());
-
- // Reserved / length of traf/trun/sample
- v.extend(0u32.to_be_bytes());
-
- // Number of entries
- v.extend(
- u32::try_from(fragment_offsets.len())
- .context("too many fragments")?
- .to_be_bytes(),
- );
-
- for super::FragmentOffset { time, offset } in fragment_offsets {
- // Time
- let time = time
- .nseconds()
- .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
- .context("time overflow")?;
- v.extend(time.to_be_bytes());
-
- // moof offset
- v.extend(offset.to_be_bytes());
-
- // traf/trun/sample number
- v.extend_from_slice(&[1u8; 3][..]);
- }
-
- Ok(())
- })?;
-
- let offset = write_full_box(v, b"mfro", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
- let offset = v.len();
- // Parent size
- v.extend(0u32.to_be_bytes());
- Ok(offset)
- })?;
-
- Ok(offset)
- })?;
-
- let len = u32::try_from(v.len() as u64).context("too big mfra")?;
- v[offset..][..4].copy_from_slice(&len.to_be_bytes());
-
- Ok(gst::Buffer::from_mut_slice(v))
-}
-
-// Copy from std while this is still nightly-only
-use std::fmt;
-
-/// An iterator over slice in (non-overlapping) chunks separated by a predicate.
-///
-/// This struct is created by the [`group_by`] method on [slices].
-///
-/// [`group_by`]: slice::group_by
-/// [slices]: slice
-struct GroupBy<'a, T: 'a, P> {
- slice: &'a [T],
- predicate: P,
-}
-
-impl<'a, T: 'a, P> GroupBy<'a, T, P> {
- fn new(slice: &'a [T], predicate: P) -> Self {
- GroupBy { slice, predicate }
- }
-}
-
-impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P>
-where
- P: FnMut(&T, &T) -> bool,
-{
- type Item = &'a [T];
-
- #[inline]
- fn next(&mut self) -> Option<Self::Item> {
- if self.slice.is_empty() {
- None
- } else {
- let mut len = 1;
- let mut iter = self.slice.windows(2);
- while let Some([l, r]) = iter.next() {
- if (self.predicate)(l, r) {
- len += 1
- } else {
- break;
- }
- }
- let (head, tail) = self.slice.split_at(len);
- self.slice = tail;
- Some(head)
- }
- }
-
- #[inline]
- fn size_hint(&self) -> (usize, Option<usize>) {
- if self.slice.is_empty() {
- (0, Some(0))
- } else {
- (1, Some(self.slice.len()))
- }
- }
-
- #[inline]
- fn last(mut self) -> Option<Self::Item> {
- self.next_back()
- }
-}
-
-impl<'a, T: 'a, P> DoubleEndedIterator for GroupBy<'a, T, P>
-where
- P: FnMut(&T, &T) -> bool,
-{
- #[inline]
- fn next_back(&mut self) -> Option<Self::Item> {
- if self.slice.is_empty() {
- None
- } else {
- let mut len = 1;
- let mut iter = self.slice.windows(2);
- while let Some([l, r]) = iter.next_back() {
- if (self.predicate)(l, r) {
- len += 1
- } else {
- break;
- }
- }
- let (head, tail) = self.slice.split_at(self.slice.len() - len);
- self.slice = head;
- Some(tail)
- }
- }
-}
-
-impl<'a, T: 'a, P> std::iter::FusedIterator for GroupBy<'a, T, P> where P: FnMut(&T, &T) -> bool {}
-
-impl<'a, T: 'a + fmt::Debug, P> fmt::Debug for GroupBy<'a, T, P> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("GroupBy")
- .field("slice", &self.slice)
- .finish()
- }
-}
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
deleted file mode 100644
index a7a8932d..00000000
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ /dev/null
@@ -1,2596 +0,0 @@
-// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-use gst::glib;
-use gst::prelude::*;
-use gst::subclass::prelude::*;
-use gst_base::prelude::*;
-use gst_base::subclass::prelude::*;
-
-use std::collections::VecDeque;
-use std::sync::Mutex;
-
-use once_cell::sync::Lazy;
-
-use super::boxes;
-use super::Buffer;
-
-/// Offset for the segment in non-single-stream variants.
-const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
-
-/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds.
-/// 1601 = UNIX + UNIX_1601_OFFSET.
-const UNIX_1601_OFFSET: u64 = 11_644_473_600;
-
-/// Offset between NTP and UNIX epoch in seconds.
-/// NTP = UNIX + NTP_UNIX_OFFSET.
-const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
-
-/// Reference timestamp meta caps for NTP timestamps.
-static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
-
-/// Reference timestamp meta caps for UNIX timestamps.
-static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
-
-/// Returns the UTC time of the buffer in the UNIX epoch.
-fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
- buffer
- .iter_meta::<gst::ReferenceTimestampMeta>()
- .find_map(|meta| {
- if meta.reference().can_intersect(&UNIX_CAPS) {
- Some(meta.timestamp())
- } else if meta.reference().can_intersect(&NTP_CAPS) {
- meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds())
- } else {
- None
- }
- })
-}
-
-static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
- gst::DebugCategory::new(
- "fmp4mux",
- gst::DebugColorFlags::empty(),
- Some("FMP4Mux Element"),
- )
-});
-
-const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10);
-const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None;
-const DEFAULT_WRITE_MFRA: bool = false;
-const DEFAULT_WRITE_MEHD: bool = false;
-const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
-const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(250));
-
-#[derive(Debug, Clone)]
-struct Settings {
- fragment_duration: gst::ClockTime,
- header_update_mode: super::HeaderUpdateMode,
- write_mfra: bool,
- write_mehd: bool,
- interleave_bytes: Option<u64>,
- interleave_time: Option<gst::ClockTime>,
-}
-
-impl Default for Settings {
- fn default() -> Self {
- Settings {
- fragment_duration: DEFAULT_FRAGMENT_DURATION,
- header_update_mode: DEFAULT_HEADER_UPDATE_MODE,
- write_mfra: DEFAULT_WRITE_MFRA,
- write_mehd: DEFAULT_WRITE_MEHD,
- interleave_bytes: DEFAULT_INTERLEAVE_BYTES,
- interleave_time: DEFAULT_INTERLEAVE_TIME,
- }
- }
-}
-
-#[derive(Debug)]
-struct GopBuffer {
- buffer: gst::Buffer,
- pts: gst::ClockTime,
- dts: Option<gst::ClockTime>,
-}
-
-#[derive(Debug)]
-struct Gop {
- // Running times
- start_pts: gst::ClockTime,
- start_dts: Option<gst::ClockTime>,
- earliest_pts: gst::ClockTime,
- // Once this is known to be the final earliest PTS/DTS
- final_earliest_pts: bool,
- // PTS plus duration of last buffer, or start of next GOP
- end_pts: gst::ClockTime,
- // Once this is known to be the final end PTS/DTS
- final_end_pts: bool,
- // DTS plus duration of last buffer, or start of next GOP
- end_dts: Option<gst::ClockTime>,
-
- // Buffer positions
- earliest_pts_position: gst::ClockTime,
- start_dts_position: Option<gst::ClockTime>,
-
- // Buffer, PTS running time, DTS running time
- buffers: Vec<GopBuffer>,
-}
-
-struct Stream {
- sinkpad: gst_base::AggregatorPad,
-
- caps: gst::Caps,
- intra_only: bool,
-
- queued_gops: VecDeque<Gop>,
- fragment_filled: bool,
-
- // Difference between the first DTS and 0 in case of negative DTS
- dts_offset: Option<gst::ClockTime>,
-
- // Current position (DTS, or PTS for intra-only) to prevent
- // timestamps from going backwards when queueing new buffers
- current_position: gst::ClockTime,
-
- // Current UTC time in ONVIF mode to prevent timestamps from
- // going backwards when draining a fragment.
- // UNIX epoch.
- current_utc_time: gst::ClockTime,
-}
-
-#[derive(Default)]
-struct State {
- streams: Vec<Stream>,
-
- // Created once we received caps and kept up to date with the caps,
- // sent as part of the buffer list for the first fragment.
- stream_header: Option<gst::Buffer>,
-
- sequence_number: u32,
-
- // Fragment tracking for mfra
- current_offset: u64,
- fragment_offsets: Vec<super::FragmentOffset>,
-
- // Start / end PTS of the whole stream
- earliest_pts: Option<gst::ClockTime>,
- end_pts: Option<gst::ClockTime>,
-
- // Start PTS of the current fragment
- fragment_start_pts: Option<gst::ClockTime>,
- // Additional timeout delay in case GOPs are bigger than the fragment duration
- timeout_delay: gst::ClockTime,
-
- // In ONVIF mode the UTC time corresponding to the beginning of the stream
- // UNIX epoch.
- start_utc_time: Option<gst::ClockTime>,
- end_utc_time: Option<gst::ClockTime>,
-
- sent_headers: bool,
-}
-
-#[derive(Default)]
-pub(crate) struct FMP4Mux {
- state: Mutex<State>,
- settings: Mutex<Settings>,
-}
-
-impl FMP4Mux {
- fn find_earliest_stream<'a>(
- &self,
- state: &'a mut State,
- timeout: bool,
- ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
- let mut earliest_stream = None;
- let mut all_have_data_or_eos = true;
-
- for (idx, stream) in state.streams.iter_mut().enumerate() {
- let buffer = match stream.sinkpad.peek_buffer() {
- Some(buffer) => buffer,
- None => {
- if stream.sinkpad.is_eos() {
- gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS");
- } else {
- all_have_data_or_eos = false;
- gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer");
- }
- continue;
- }
- };
-
- if stream.fragment_filled {
- gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled");
- continue;
- }
-
- let segment = match stream
- .sinkpad
- .segment()
- .clone()
- .downcast::<gst::ClockTime>()
- .ok()
- {
- Some(segment) => segment,
- None => {
- gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
- return Err(gst::FlowError::Error);
- }
- };
-
- // If the stream has no valid running time, assume it's before everything else.
- let running_time = match segment.to_running_time(buffer.dts_or_pts()) {
- None => {
- gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time");
- if earliest_stream
- .as_ref()
- .map_or(true, |(_, _, earliest_running_time)| {
- *earliest_running_time > gst::ClockTime::ZERO
- })
- {
- earliest_stream = Some((idx, stream, gst::ClockTime::ZERO));
- }
- continue;
- }
- Some(running_time) => running_time,
- };
-
- gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time);
-
- if earliest_stream
- .as_ref()
- .map_or(true, |(_idx, _stream, earliest_running_time)| {
- *earliest_running_time > running_time
- })
- {
- earliest_stream = Some((idx, stream, running_time));
- }
- }
-
- if !timeout && !all_have_data_or_eos {
- gst::trace!(
- CAT,
- imp: self,
- "No timeout and not all streams have a buffer or are EOS"
- );
- Ok(None)
- } else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
- gst::trace!(
- CAT,
- imp: self,
- "Stream {} is earliest stream with running time {}",
- stream.sinkpad.name(),
- earliest_running_time
- );
- Ok(Some((idx, stream)))
- } else {
- gst::trace!(CAT, imp: self, "No streams have data queued currently");
- Ok(None)
- }
- }
-
- // Queue incoming buffers as individual GOPs.
- fn queue_gops(
- &self,
- _idx: usize,
- stream: &mut Stream,
- segment: &gst::FormattedSegment<gst::ClockTime>,
- mut buffer: gst::Buffer,
- ) -> Result<(), gst::FlowError> {
- use gst::Signed::*;
-
- assert!(!stream.fragment_filled);
-
- gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer);
-
- let intra_only = stream.intra_only;
-
- if !intra_only && buffer.dts().is_none() {
- gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams");
- return Err(gst::FlowError::Error);
- }
-
- if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
- gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units");
- return Err(gst::FlowError::Error);
- }
-
- let pts_position = buffer.pts().ok_or_else(|| {
- gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers");
- gst::FlowError::Error
- })?;
- let duration = buffer.duration();
- let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position);
-
- let mut pts = segment
- .to_running_time_full(pts_position)
- .ok_or_else(|| {
- gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time");
- gst::FlowError::Error
- })?
- .positive_or_else(|_| {
- gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
- gst::FlowError::Error
- })?;
-
- let mut end_pts = segment
- .to_running_time_full(end_pts_position)
- .ok_or_else(|| {
- gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert end PTS to running time");
- gst::FlowError::Error
- })?
- .positive_or_else(|_| {
- gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
- gst::FlowError::Error
- })?;
-
- // Enforce monotonically increasing PTS for intra-only streams
- if intra_only {
- if pts < stream.current_position {
- gst::warning!(
- CAT,
- obj: &stream.sinkpad,
- "Decreasing PTS {} < {} for intra-only stream",
- pts,
- stream.current_position,
- );
- pts = stream.current_position;
- } else {
- stream.current_position = pts;
- }
- end_pts = std::cmp::max(end_pts, pts);
- }
-
- let (dts_position, dts, end_dts) = if intra_only {
- (None, None, None)
- } else {
- // Negative DTS are handled via the dts_offset and by having negative composition time
- // offsets in the `trun` box. The smallest DTS here is shifted to zero.
- let dts_position = buffer.dts().expect("not DTS");
- let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position);
-
- let signed_dts = segment.to_running_time_full(dts_position).ok_or_else(|| {
- gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time");
- gst::FlowError::Error
- })?;
- let mut dts = match signed_dts {
- Positive(dts) => {
- if let Some(dts_offset) = stream.dts_offset {
- dts + dts_offset
- } else {
- dts
- }
- }
- Negative(dts) => {
- if stream.dts_offset.is_none() {
- stream.dts_offset = Some(dts);
- }
-
- let dts_offset = stream.dts_offset.unwrap();
- if dts > dts_offset {
- gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS");
- gst::ClockTime::ZERO
- } else {
- dts_offset - dts
- }
- }
- };
-
- let signed_end_dts =
- segment
- .to_running_time_full(end_dts_position)
- .ok_or_else(|| {
- gst::error!(
- CAT,
- obj: &stream.sinkpad,
- "Couldn't convert end DTS to running time"
- );
- gst::FlowError::Error
- })?;
- let mut end_dts = match signed_end_dts {
- Positive(dts) => {
- if let Some(dts_offset) = stream.dts_offset {
- dts + dts_offset
- } else {
- dts
- }
- }
- Negative(dts) => {
- if stream.dts_offset.is_none() {
- stream.dts_offset = Some(dts);
- }
-
- let dts_offset = stream.dts_offset.unwrap();
- if dts > dts_offset {
- gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS");
- gst::ClockTime::ZERO
- } else {
- dts_offset - dts
- }
- }
- };
-
- // Enforce monotonically increasing DTS for intra-only streams
- // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference
- // FIXME: Is this correct?
- if dts < stream.current_position {
- gst::warning!(
- CAT,
- obj: &stream.sinkpad,
- "Decreasing DTS {} < {}",
- dts,
- stream.current_position,
- );
- dts = stream.current_position;
- } else {
- stream.current_position = dts;
- }
- end_dts = std::cmp::max(end_dts, dts);
-
- (Some(dts_position), Some(dts), Some(end_dts))
- };
-
- // If this is a multi-stream element then we need to update the PTS/DTS positions according
- // to the output segment, specifically to re-timestamp them with the running time and
- // adjust for the segment shift to compensate for negative DTS.
- let aggregator = self.instance();
- let class = aggregator.class();
- let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() {
- (pts_position, dts_position)
- } else {
- let pts_position = pts + SEGMENT_OFFSET;
- let dts_position = dts.map(|dts| {
- dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO)
- });
-
- let buffer = buffer.make_mut();
- buffer.set_pts(pts_position);
- buffer.set_dts(dts_position);
-
- (pts_position, dts_position)
- };
-
- if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Starting new GOP at PTS {} DTS {} (DTS offset {})",
- pts,
- dts.display(),
- stream.dts_offset.display(),
- );
-
- let gop = Gop {
- start_pts: pts,
- start_dts: dts,
- start_dts_position: if intra_only { None } else { dts_position },
- earliest_pts: pts,
- earliest_pts_position: pts_position,
- final_earliest_pts: intra_only,
- end_pts,
- end_dts,
- final_end_pts: false,
- buffers: vec![GopBuffer { buffer, pts, dts }],
- };
- stream.queued_gops.push_front(gop);
-
- if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Updating previous GOP starting at PTS {} to end PTS {} DTS {}",
- prev_gop.earliest_pts,
- pts,
- dts.display(),
- );
-
- prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts);
- prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts);
-
- if intra_only {
- prev_gop.final_end_pts = true;
- }
-
- if !prev_gop.final_earliest_pts {
- // Don't bother logging this for intra-only streams as it would be for every
- // single buffer.
- if !intra_only {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Previous GOP has final earliest PTS at {}",
- prev_gop.earliest_pts
- );
- }
-
- prev_gop.final_earliest_pts = true;
- if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) {
- prev_prev_gop.final_end_pts = true;
- }
- }
- }
- } else if let Some(gop) = stream.queued_gops.front_mut() {
- assert!(!intra_only);
-
- // We require DTS for non-intra-only streams
- let dts = dts.unwrap();
- let end_dts = end_dts.unwrap();
-
- gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
- gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts));
- gop.buffers.push(GopBuffer {
- buffer,
- pts,
- dts: Some(dts),
- });
-
- if gop.earliest_pts > pts && !gop.final_earliest_pts {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Updating current GOP earliest PTS from {} to {}",
- gop.earliest_pts,
- pts
- );
- gop.earliest_pts = pts;
- gop.earliest_pts_position = pts_position;
-
- if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
- if prev_gop.end_pts < pts {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Updating previous GOP starting PTS {} end time from {} to {}",
- pts,
- prev_gop.end_pts,
- pts
- );
- prev_gop.end_pts = pts;
- }
- }
- }
-
- let gop = stream.queued_gops.front_mut().unwrap();
-
- // The earliest PTS is known when the current DTS is bigger or equal to the first
- // PTS that was observed in this GOP. If there was another frame later that had a
- // lower PTS then it wouldn't be possible to display it in time anymore, i.e. the
- // stream would be invalid.
- if gop.start_pts <= dts && !gop.final_earliest_pts {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "GOP has final earliest PTS at {}",
- gop.earliest_pts
- );
- gop.final_earliest_pts = true;
-
- if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
- prev_gop.final_end_pts = true;
- }
- }
- } else {
- gst::warning!(
- CAT,
- obj: &stream.sinkpad,
- "Waiting for keyframe at the beginning of the stream"
- );
- }
-
- if let Some((prev_gop, first_gop)) = Option::zip(
- stream.queued_gops.iter().find(|gop| gop.final_end_pts),
- stream.queued_gops.back(),
- ) {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Queued full GOPs duration updated to {}",
- prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
- );
- }
-
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Queued duration updated to {}",
- Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
- .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
- .unwrap_or(gst::ClockTime::ZERO)
- );
-
- Ok(())
- }
-
- #[allow(clippy::type_complexity)]
- fn drain_buffers(
- &self,
- state: &mut State,
- settings: &Settings,
- timeout: bool,
- at_eos: bool,
- ) -> Result<
- (
- // Drained streams
- Vec<(
- gst::Caps,
- Option<super::FragmentTimingInfo>,
- VecDeque<Buffer>,
- )>,
- // Minimum earliest PTS position of all streams
- Option<gst::ClockTime>,
- // Minimum earliest PTS of all streams
- Option<gst::ClockTime>,
- // Minimum start DTS position of all streams (if any stream has DTS)
- Option<gst::ClockTime>,
- // End PTS of this drained fragment, i.e. start PTS of the next fragment
- Option<gst::ClockTime>,
- ),
- gst::FlowError,
- > {
- let mut drained_streams = Vec::with_capacity(state.streams.len());
-
- let mut min_earliest_pts_position = None;
- let mut min_earliest_pts = None;
- let mut min_start_dts_position = None;
- let mut fragment_end_pts = None;
-
- // The first stream decides how much can be dequeued, if anything at all.
- //
- // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
- // but on timeout in live pipelines it might happen that the first stream does not have a
- // complete GOP queued. In that case nothing is dequeued for any of the streams and the
- // timeout is advanced by 1s until at least one complete GOP can be dequeued.
- //
- // If the first stream is already EOS then the next stream that is not EOS yet will be
- // taken in its place.
- let fragment_start_pts = state.fragment_start_pts.unwrap();
- gst::info!(
- CAT,
- imp: self,
- "Starting to drain at {}",
- fragment_start_pts
- );
-
- for (idx, stream) in state.streams.iter_mut().enumerate() {
- assert!(
- timeout
- || at_eos
- || stream.sinkpad.is_eos()
- || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
- );
-
- // Drain all complete GOPs until at most one fragment duration was dequeued for the
- // first stream, or until the dequeued duration of the first stream.
- let mut gops = Vec::with_capacity(stream.queued_gops.len());
- let dequeue_end_pts =
- fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
- gst::trace!(
- CAT,
- obj: &stream.sinkpad,
- "Draining up to end PTS {} / duration {}",
- dequeue_end_pts,
- dequeue_end_pts - fragment_start_pts
- );
-
- while let Some(gop) = stream.queued_gops.back() {
- // If this GOP is not complete then we can't pop it yet.
- //
- // If there was no complete GOP at all yet then it might be bigger than the
- // fragment duration. In this case we might not be able to handle the latency
- // requirements in a live pipeline.
- if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
- break;
- }
-
- // If this GOP starts after the fragment end then don't dequeue it yet unless this is
- // the first stream and no GOPs were dequeued at all yet. This would mean that the
- // GOP is bigger than the fragment duration.
- if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty())
- {
- break;
- }
-
- gops.push(stream.queued_gops.pop_back().unwrap());
- }
- stream.fragment_filled = false;
-
- // If we don't have a next fragment start PTS then this is the first stream as above.
- if fragment_end_pts.is_none() {
- if let Some(last_gop) = gops.last() {
- // Dequeued something so let's take the end PTS of the last GOP
- fragment_end_pts = Some(last_gop.end_pts);
- gst::info!(
- CAT,
- obj: &stream.sinkpad,
- "Draining up to PTS {} for this fragment",
- last_gop.end_pts,
- );
- } else {
- // If nothing was dequeued for the first stream then this is OK if we're at
- // EOS: we just consider the next stream as first stream then.
- if at_eos || stream.sinkpad.is_eos() {
- // This is handled below generally if nothing was dequeued
- } else {
- // Otherwise this can only really happen on timeout in live pipelines.
- assert!(timeout);
-
- gst::warning!(
- CAT,
- obj: &stream.sinkpad,
- "Don't have a complete GOP for the first stream on timeout in a live pipeline",
- );
-
- // In this case we advance the timeout by 1s and hope that things are
- // better then.
- return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
- }
- }
- }
-
- if gops.is_empty() {
- gst::info!(
- CAT,
- obj: &stream.sinkpad,
- "Draining no buffers",
- );
-
- drained_streams.push((stream.caps.clone(), None, VecDeque::new()));
- continue;
- }
-
- assert!(fragment_end_pts.is_some());
-
- let first_gop = gops.first().unwrap();
- let last_gop = gops.last().unwrap();
- let earliest_pts = first_gop.earliest_pts;
- let earliest_pts_position = first_gop.earliest_pts_position;
- let start_dts = first_gop.start_dts;
- let start_dts_position = first_gop.start_dts_position;
- let end_pts = last_gop.end_pts;
- let dts_offset = stream.dts_offset;
-
- if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
- min_earliest_pts = Some(earliest_pts);
- }
- if min_earliest_pts_position
- .opt_gt(earliest_pts_position)
- .unwrap_or(true)
- {
- min_earliest_pts_position = Some(earliest_pts_position);
- }
- if let Some(start_dts_position) = start_dts_position {
- if min_start_dts_position
- .opt_gt(start_dts_position)
- .unwrap_or(true)
- {
- min_start_dts_position = Some(start_dts_position);
- }
- }
-
- gst::info!(
- CAT,
- obj: &stream.sinkpad,
- "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
- end_pts.saturating_sub(earliest_pts),
- earliest_pts,
- start_dts.display(),
- dts_offset.display(),
- );
-
- if let Some((prev_gop, first_gop)) = Option::zip(
- stream.queued_gops.iter().find(|gop| gop.final_end_pts),
- stream.queued_gops.back(),
- ) {
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Queued full GOPs duration updated to {}",
- prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
- );
- }
-
- gst::debug!(
- CAT,
- obj: &stream.sinkpad,
- "Queued duration updated to {}",
- Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
- .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
- .unwrap_or(gst::ClockTime::ZERO)
- );
-
- let start_time = if stream.intra_only {
- earliest_pts
- } else {
- start_dts.unwrap()
- };
-
- let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
-
- for gop in gops {
- let mut gop_buffers = gop.buffers.into_iter().peekable();
- while let Some(buffer) = gop_buffers.next() {
- let timestamp = if stream.intra_only {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- };
-
- let end_timestamp = match gop_buffers.peek() {
- Some(buffer) => {
- if stream.intra_only {
- buffer.pts
- } else {
- buffer.dts.unwrap()
- }
- }
- None => {
- if stream.intra_only {
- gop.end_pts
- } else {
- gop.end_dts.unwrap()
- }
- }
- };
-
- // Timestamps are enforced to monotonically increase when queueing buffers
- let duration = end_timestamp
- .checked_sub(timestamp)
- .expect("Timestamps going backwards");
-
- let composition_time_offset = if stream.intra_only {
- None
- } else {
- let pts = buffer.pts;
- let dts = buffer.dts.unwrap();
-
- if pts > dts {
- Some(
- i64::try_from((pts - dts).nseconds())
- .map_err(|_| {
- gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
- gst::FlowError::Error
- })?,
- )
- } else {
- let diff = i64::try_from((dts - pts).nseconds())
- .map_err(|_| {
- gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
- gst::FlowError::Error
- })?;
- Some(-diff)
- }
- };
-
- buffers.push_back(Buffer {
- idx,
- buffer: buffer.buffer,
- timestamp,
- duration,
- composition_time_offset,
- });
- }
- }
-
- drained_streams.push((
- stream.caps.clone(),
- Some(super::FragmentTimingInfo {
- start_time,
- intra_only: stream.intra_only,
- }),
- buffers,
- ));
- }
-
- Ok((
- drained_streams,
- min_earliest_pts_position,
- min_earliest_pts,
- min_start_dts_position,
- fragment_end_pts,
- ))
- }
-
- fn preprocess_drained_streams_onvif(
- &self,
- state: &mut State,
- drained_streams: &mut [(
- gst::Caps,
- Option<super::FragmentTimingInfo>,
- VecDeque<Buffer>,
- )],
- ) -> Result<Option<gst::ClockTime>, gst::FlowError> {
- let aggregator = self.instance();
- if aggregator.class().as_ref().variant != super::Variant::ONVIF {
- return Ok(None);
- }
-
- let mut max_end_utc_time = None;
-
- let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
- let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
- if composition_time_offset > 0 {
- buffer.timestamp + (composition_time_offset as u64).nseconds()
- } else {
- buffer
- .timestamp
- .checked_sub(((-composition_time_offset) as u64).nseconds())
- .unwrap()
- }
- };
-
- // If this is the first fragment then allow the first buffers to not have a reference
- // timestamp meta and backdate them
- if state.stream_header.is_none() {
- for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() {
- let (buffer_idx, utc_time, buffer) =
- match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| {
- get_utc_time_from_buffer(&buffer.buffer)
- .map(|timestamp| (idx, timestamp, buffer))
- }) {
- None => {
- gst::error!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "No reference timestamp set on any buffers in the first fragment",
- );
- return Err(gst::FlowError::Error);
- }
- Some(res) => res,
- };
-
- // Now do the backdating
- if buffer_idx > 0 {
- let utc_time_pts = calculate_pts(buffer);
-
- for buffer in drain_buffers.iter_mut().take(buffer_idx) {
- let buffer_pts = calculate_pts(buffer);
- let buffer_pts_diff = if utc_time_pts >= buffer_pts {
- (utc_time_pts - buffer_pts).nseconds() as i64
- } else {
- -((buffer_pts - utc_time_pts).nseconds() as i64)
- };
- let buffer_utc_time = if buffer_pts_diff >= 0 {
- utc_time
- .checked_sub((buffer_pts_diff as u64).nseconds())
- .unwrap()
- } else {
- utc_time
- .checked_add(((-buffer_pts_diff) as u64).nseconds())
- .unwrap()
- };
-
- let buffer = buffer.buffer.make_mut();
- gst::ReferenceTimestampMeta::add(
- buffer,
- &UNIX_CAPS,
- buffer_utc_time,
- gst::ClockTime::NONE,
- );
- }
- }
- }
- }
-
- // Calculate the minimum across all streams and remember that
- if state.start_utc_time.is_none() {
- let mut start_utc_time = None;
-
- for (idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() {
- for buffer in drain_buffers {
- let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
- None => {
- gst::error!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "No reference timestamp set on all buffers"
- );
- return Err(gst::FlowError::Error);
- }
- Some(utc_time) => utc_time,
- };
-
- if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
- start_utc_time = Some(utc_time);
- }
- }
- }
-
- gst::debug!(
- CAT,
- imp: self,
- "Configuring start UTC time {}",
- start_utc_time.unwrap()
- );
- state.start_utc_time = start_utc_time;
- }
-
- // Update all buffer timestamps based on the UTC time and offset to the start UTC time
- let start_utc_time = state.start_utc_time.unwrap();
- for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() {
- let mut start_time = None;
-
- for buffer in drain_buffers.iter_mut() {
- let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
- None => {
- gst::error!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "No reference timestamp set on all buffers"
- );
- return Err(gst::FlowError::Error);
- }
- Some(utc_time) => utc_time,
- };
-
- // Convert PTS UTC time to DTS
- let mut utc_time_dts =
- if let Some(composition_time_offset) = buffer.composition_time_offset {
- if composition_time_offset >= 0 {
- utc_time
- .checked_sub((composition_time_offset as u64).nseconds())
- .unwrap()
- } else {
- utc_time
- .checked_add(((-composition_time_offset) as u64).nseconds())
- .unwrap()
- }
- } else {
- utc_time
- };
-
- // Enforce monotonically increasing timestamps
- if utc_time_dts < state.streams[idx].current_utc_time {
- gst::warning!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Decreasing UTC DTS timestamp for buffer {} < {}",
- utc_time_dts,
- state.streams[idx].current_utc_time,
- );
- utc_time_dts = state.streams[idx].current_utc_time;
- } else {
- state.streams[idx].current_utc_time = utc_time_dts;
- }
-
- let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
-
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
- buffer.timestamp,
- timestamp,
- utc_time_dts,
- utc_time,
- );
-
- buffer.timestamp = timestamp;
- if start_time.is_none() || start_time > Some(buffer.timestamp) {
- start_time = Some(buffer.timestamp);
- }
- }
-
- // Update durations for all buffers except for the last in the fragment unless all
- // have the same duration anyway
- let mut common_duration = Ok(None);
- let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
- while let Some(buffer) = drain_buffers_iter.next() {
- let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
-
- if let Some(next_timestamp) = next_timestamp {
- let duration = next_timestamp.saturating_sub(buffer.timestamp);
- if common_duration == Ok(None) {
- common_duration = Ok(Some(duration));
- } else if common_duration != Ok(Some(duration)) {
- common_duration = Err(());
- }
-
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
- buffer.timestamp,
- buffer.duration,
- duration,
- );
-
- buffer.duration = duration;
- } else if let Ok(Some(common_duration)) = common_duration {
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
- buffer.timestamp,
- buffer.duration,
- common_duration,
- );
-
- buffer.duration = common_duration;
- } else {
- gst::trace!(
- CAT,
- obj: &state.streams[idx].sinkpad,
- "Keeping last buffer with timestamp {} duration at {}",
- buffer.timestamp,
- buffer.duration,
- );
- }
-
- let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
- if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
- max_end_utc_time = Some(end_utc_time);
- }
- }
-
- if let Some(start_time) = start_time {
- gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
- timing_info.as_mut().unwrap().start_time = start_time;
- } else {
- assert!(timing_info.is_none());
- }
- }
-
- Ok(max_end_utc_time)
- }
-
- #[allow(clippy::type_complexity)]
- fn interleave_buffers(
- &self,
- settings: &Settings,
- mut drained_streams: Vec<(
- gst::Caps,
- Option<super::FragmentTimingInfo>,
- VecDeque<Buffer>,
- )>,
- ) -> Result<
- (
- Vec<Buffer>,
- Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>,
- ),
- gst::FlowError,
- > {
- let mut interleaved_buffers =
- Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum());
- while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by(
- |(a_idx, (_, _, a)), (b_idx, (_, _, b))| {
- let (a, b) = match (a.front(), b.front()) {
- (None, None) => return std::cmp::Ordering::Equal,
- (None, _) => return std::cmp::Ordering::Greater,
- (_, None) => return std::cmp::Ordering::Less,
- (Some(a), Some(b)) => (a, b),
- };
-
- match a.timestamp.cmp(&b.timestamp) {
- std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
- cmp => cmp,
- }
- },
- ) {
- let start_time = match bufs.front() {
- None => {
- // No more buffers now
- break;
- }
- Some(buf) => buf.timestamp,
- };
- let mut current_end_time = start_time;
- let mut dequeued_bytes = 0;
-
- while settings
- .interleave_bytes
- .opt_ge(dequeued_bytes)
- .unwrap_or(true)
- && settings
- .interleave_time
- .opt_ge(current_end_time.saturating_sub(start_time))
- .unwrap_or(true)
- {
- if let Some(buffer) = bufs.pop_front() {
- current_end_time = buffer.timestamp + buffer.duration;
- dequeued_bytes += buffer.buffer.size() as u64;
- interleaved_buffers.push(buffer);
- } else {
- // No buffers left in this stream, go to next stream
- break;
- }
- }
- }
-
- // All buffers should be consumed now
- assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty()));
-
- let streams = drained_streams
- .into_iter()
- .map(|(caps, timing_info, _)| (caps, timing_info))
- .collect::<Vec<_>>();
-
- Ok((interleaved_buffers, streams))
- }
-
- fn drain(
- &self,
- state: &mut State,
- settings: &Settings,
- timeout: bool,
- at_eos: bool,
- upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>,
- ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
- if at_eos {
- gst::info!(CAT, imp: self, "Draining at EOS");
- } else if timeout {
- gst::info!(CAT, imp: self, "Draining at timeout");
- } else {
- for stream in &state.streams {
- if !stream.fragment_filled && !stream.sinkpad.is_eos() {
- return Ok((None, None));
- }
- }
- }
-
- // Collect all buffers and their timing information that are to be drained right now.
- let (
- mut drained_streams,
- min_earliest_pts_position,
- min_earliest_pts,
- min_start_dts_position,
- fragment_end_pts,
- ) = self.drain_buffers(state, settings, timeout, at_eos)?;
-
- // Remove all GAP buffers before processing them further
- for (_, _, buffers) in &mut drained_streams {
- buffers.retain(|buf| {
- !buf.buffer.flags().contains(gst::BufferFlags::GAP)
- || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
- || buf.buffer.size() != 0
- });
- }
-
- // For ONVIF, replace all timestamps with timestamps based on UTC times.
- let max_end_utc_time =
- self.preprocess_drained_streams_onvif(state, &mut drained_streams)?;
-
- // Create header now if it was not created before and return the caps
- let mut caps = None;
- if state.stream_header.is_none() {
- let (_, new_caps) = self.update_header(state, settings, false)?.unwrap();
- caps = Some(new_caps);
- }
-
- // Interleave buffers according to the settings into a single vec
- let (mut interleaved_buffers, streams) =
- self.interleave_buffers(settings, drained_streams)?;
-
- let mut buffer_list = None;
- if interleaved_buffers.is_empty() {
- assert!(at_eos);
- } else {
- // If there are actual buffers to output then create headers as needed and create a
- // bufferlist for all buffers that have to be output.
- let min_earliest_pts_position = min_earliest_pts_position.unwrap();
- let min_earliest_pts = min_earliest_pts.unwrap();
- let fragment_end_pts = fragment_end_pts.unwrap();
-
- let mut fmp4_header = None;
- if !state.sent_headers {
- let mut buffer = state.stream_header.as_ref().unwrap().copy();
- {
- let buffer = buffer.get_mut().unwrap();
-
- buffer.set_pts(min_earliest_pts_position);
- buffer.set_dts(min_start_dts_position);
-
- // Header is DISCONT|HEADER
- buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
- }
-
- fmp4_header = Some(buffer);
-
- state.sent_headers = true;
- }
-
- // TODO: Write prft boxes before moof
- // TODO: Write sidx boxes before moof and rewrite once offsets are known
-
- if state.sequence_number == 0 {
- state.sequence_number = 1;
- }
- let sequence_number = state.sequence_number;
- state.sequence_number += 1;
- let (mut fmp4_fragment_header, moof_offset) =
- boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
- variant: self.instance().class().as_ref().variant,
- sequence_number,
- streams: streams.as_slice(),
- buffers: interleaved_buffers.as_slice(),
- })
- .map_err(|err| {
- gst::error!(
- CAT,
- imp: self,
- "Failed to create FMP4 fragment header: {}",
- err
- );
- gst::FlowError::Error
- })?;
-
- {
- let buffer = fmp4_fragment_header.get_mut().unwrap();
- buffer.set_pts(min_earliest_pts_position);
- buffer.set_dts(min_start_dts_position);
- buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts));
-
- // Fragment header is HEADER
- buffer.set_flags(gst::BufferFlags::HEADER);
-
- // Copy metas from the first actual buffer to the fragment header. This allows
- // getting things like the reference timestamp meta or the timecode meta to identify
- // the fragment.
- let _ = interleaved_buffers[0].buffer.copy_into(
- buffer,
- gst::BufferCopyFlags::META,
- 0,
- None,
- );
- }
-
- let moof_offset = state.current_offset
- + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64
- + moof_offset;
-
- let buffers_len = interleaved_buffers.len();
- for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() {
- // Fix up buffer flags, all other buffers are DELTA_UNIT
- let buffer_ref = buffer.buffer.make_mut();
- buffer_ref.unset_flags(gst::BufferFlags::all());
- buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT);
-
- // Set the marker flag for the last buffer of the segment
- if idx == buffers_len - 1 {
- buffer_ref.set_flags(gst::BufferFlags::MARKER);
- }
- }
-
- buffer_list = Some(
- fmp4_header
- .into_iter()
- .chain(Some(fmp4_fragment_header))
- .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer))
- .inspect(|b| {
- state.current_offset += b.size() as u64;
- })
- .collect::<gst::BufferList>(),
- );
-
- // Write mfra only for the main stream, and if there are no buffers for the main stream
- // in this segment then don't write anything.
- if let Some((_caps, Some(ref timing_info))) = streams.get(0) {
- state.fragment_offsets.push(super::FragmentOffset {
- time: timing_info.start_time,
- offset: moof_offset,
- });
- }
-
- state.end_pts = Some(fragment_end_pts);
- state.end_utc_time = max_end_utc_time;
-
- // Update for the start PTS of the next fragment
- gst::info!(
- CAT,
- imp: self,
- "Starting new fragment at {}",
- fragment_end_pts,
- );
- state.fragment_start_pts = Some(fragment_end_pts);
-
- gst::debug!(
- CAT,
- imp: self,
- "Sending force-keyunit events for running time {}",
- fragment_end_pts + settings.fragment_duration,
- );
-
- let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(fragment_end_pts + settings.fragment_duration)
- .all_headers(true)
- .build();
-
- for stream in &state.streams {
- upstream_events.push((stream.sinkpad.clone(), fku.clone()));
- }
-
- // Reset timeout delay now that we've output an actual fragment
- state.timeout_delay = gst::ClockTime::ZERO;
- }
-
- if settings.write_mfra && at_eos {
- match boxes::create_mfra(&streams[0].0, &state.fragment_offsets) {
- Ok(mut mfra) => {
- {
- let mfra = mfra.get_mut().unwrap();
- // mfra is HEADER|DELTA_UNIT like other boxes
- mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT);
- }
-
- if buffer_list.is_none() {
- buffer_list = Some(gst::BufferList::new_sized(1));
- }
- buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
- }
- Err(err) => {
- gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
- }
- }
- }
-
- // TODO: Write edit list at EOS
- // TODO: Rewrite bitrates at EOS
-
- Ok((caps, buffer_list))
- }
-
- fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> {
- for pad in self
- .instance()
- .sink_pads()
- .into_iter()
- .map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap())
- {
- let caps = match pad.current_caps() {
- Some(caps) => caps,
- None => {
- gst::warning!(CAT, obj: &pad, "Skipping pad without caps");
- continue;
- }
- };
-
- gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps);
-
- let s = caps.structure(0).unwrap();
-
- let mut intra_only = false;
- match s.name() {
- "video/x-h264" | "video/x-h265" => {
- if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
- gst::error!(CAT, obj: &pad, "Received caps without codec_data");
- return Err(gst::FlowError::NotNegotiated);
- }
- }
- "image/jpeg" => {
- intra_only = true;
- }
- "audio/mpeg" => {
- if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
- gst::error!(CAT, obj: &pad, "Received caps without codec_data");
- return Err(gst::FlowError::NotNegotiated);
- }
- intra_only = true;
- }
- "audio/x-alaw" | "audio/x-mulaw" => {
- intra_only = true;
- }
- "audio/x-adpcm" => {
- intra_only = true;
- }
- "application/x-onvif-metadata" => {
- intra_only = true;
- }
- _ => unreachable!(),
- }
-
- state.streams.push(Stream {
- sinkpad: pad,
- caps,
- intra_only,
- queued_gops: VecDeque::new(),
- fragment_filled: false,
- dts_offset: None,
- current_position: gst::ClockTime::ZERO,
- current_utc_time: gst::ClockTime::ZERO,
- });
- }
-
- if state.streams.is_empty() {
- gst::error!(CAT, imp: self, "No streams available");
- return Err(gst::FlowError::Error);
- }
-
- // Sort video streams first and then audio streams and then metadata streams, and each group by pad name.
- state.streams.sort_by(|a, b| {
- let order_of_caps = |caps: &gst::CapsRef| {
- let s = caps.structure(0).unwrap();
-
- if s.name().starts_with("video/") {
- 0
- } else if s.name().starts_with("audio/") {
- 1
- } else if s.name().starts_with("application/x-onvif-metadata") {
- 2
- } else {
- unimplemented!();
- }
- };
-
- let st_a = order_of_caps(&a.caps);
- let st_b = order_of_caps(&b.caps);
-
- if st_a == st_b {
- return a.sinkpad.name().cmp(&b.sinkpad.name());
- }
-
- st_a.cmp(&st_b)
- });
-
- Ok(())
- }
-
- fn update_header(
- &self,
- state: &mut State,
- settings: &Settings,
- at_eos: bool,
- ) -> Result<Option<(gst::BufferList, gst::Caps)>, gst::FlowError> {
- let aggregator = self.instance();
- let class = aggregator.class();
- let variant = class.as_ref().variant;
-
- if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos {
- return Ok(None);
- }
-
- assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty()));
-
- let duration = if variant == super::Variant::ONVIF {
- state
- .end_utc_time
- .opt_checked_sub(state.start_utc_time)
- .ok()
- .flatten()
- } else {
- state
- .end_pts
- .opt_checked_sub(state.earliest_pts)
- .ok()
- .flatten()
- };
-
- let streams = state
- .streams
- .iter()
- .map(|s| s.caps.clone())
- .collect::<Vec<_>>();
-
- let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration {
- variant,
- update: at_eos,
- streams: streams.as_slice(),
- write_mehd: settings.write_mehd,
- duration: if at_eos { duration } else { None },
- start_utc_time: state
- .start_utc_time
- .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000),
- })
- .map_err(|err| {
- gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err);
- gst::FlowError::Error
- })?;
-
- {
- let buffer = buffer.get_mut().unwrap();
-
- // No timestamps
-
- // Header is DISCONT|HEADER
- buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER);
- }
-
- // Remember stream header for later
- state.stream_header = Some(buffer.clone());
-
- let variant = match variant {
- super::Variant::ISO | super::Variant::DASH | super::Variant::ONVIF => "iso-fragmented",
- super::Variant::CMAF => "cmaf",
- };
- let caps = gst::Caps::builder("video/quicktime")
- .field("variant", variant)
- .field("streamheader", gst::Array::new(&[&buffer]))
- .build();
-
- let mut list = gst::BufferList::new_sized(1);
- {
- let list = list.get_mut().unwrap();
- list.add(buffer);
- }
-
- Ok(Some((list, caps)))
- }
-}
-
-#[glib::object_subclass]
-impl ObjectSubclass for FMP4Mux {
- const NAME: &'static str = "GstFMP4Mux";
- type Type = super::FMP4Mux;
- type ParentType = gst_base::Aggregator;
- type Class = Class;
-}
-
-impl ObjectImpl for FMP4Mux {
- fn properties() -> &'static [glib::ParamSpec] {
- static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
- vec![
- // TODO: Add chunk-duration property separate from fragment-size
- glib::ParamSpecUInt64::builder("fragment-duration")
- .nick("Fragment Duration")
- .blurb("Duration for each FMP4 fragment")
- .default_value(DEFAULT_FRAGMENT_DURATION.nseconds())
- .mutable_ready()
- .build(),
- glib::ParamSpecEnum::builder::<super::HeaderUpdateMode>("header-update-mode", DEFAULT_HEADER_UPDATE_MODE)
- .nick("Header update mode")
- .blurb("Mode for updating the header at the end of the stream")
- .mutable_ready()
- .build(),
- glib::ParamSpecBoolean::builder("write-mfra")
- .nick("Write mfra box")
- .blurb("Write fragment random access box at the end of the stream")
- .default_value(DEFAULT_WRITE_MFRA)
- .mutable_ready()
- .build(),
- glib::ParamSpecBoolean::builder("write-mehd")
- .nick("Write mehd box")
- .blurb("Write movie extends header box with the duration at the end of the stream (needs a header-update-mode enabled)")
- .default_value(DEFAULT_WRITE_MFRA)
- .mutable_ready()
- .build(),
- glib::ParamSpecUInt64::builder("interleave-bytes")
- .nick("Interleave Bytes")
- .blurb("Interleave between streams in bytes")
- .default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0))
- .mutable_ready()
- .build(),
- glib::ParamSpecUInt64::builder("interleave-time")
- .nick("Interleave Time")
- .blurb("Interleave between streams in nanoseconds")
- .default_value(DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX))
- .mutable_ready()
- .build(),
- ]
- });
-
- &*PROPERTIES
- }
-
- fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
- match pspec.name() {
- "fragment-duration" => {
- let mut settings = self.settings.lock().unwrap();
- let fragment_duration = value.get().expect("type checked upstream");
- if settings.fragment_duration != fragment_duration {
- settings.fragment_duration = fragment_duration;
- drop(settings);
- self.instance().set_latency(fragment_duration, None);
- }
- }
-
- "header-update-mode" => {
- let mut settings = self.settings.lock().unwrap();
- settings.header_update_mode = value.get().expect("type checked upstream");
- }
-
- "write-mfra" => {
- let mut settings = self.settings.lock().unwrap();
- settings.write_mfra = value.get().expect("type checked upstream");
- }
-
- "write-mehd" => {
- let mut settings = self.settings.lock().unwrap();
- settings.write_mehd = value.get().expect("type checked upstream");
- }
-
- "interleave-bytes" => {
- let mut settings = self.settings.lock().unwrap();
- settings.interleave_bytes = match value.get().expect("type checked upstream") {
- 0 => None,
- v => Some(v),
- };
- }
-
- "interleave-time" => {
- let mut settings = self.settings.lock().unwrap();
- settings.interleave_time = match value.get().expect("type checked upstream") {
- Some(gst::ClockTime::ZERO) | None => None,
- v => v,
- };
- }
-
- _ => unimplemented!(),
- }
- }
-
- fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
- match pspec.name() {
- "fragment-duration" => {
- let settings = self.settings.lock().unwrap();
- settings.fragment_duration.to_value()
- }
-
- "header-update-mode" => {
- let settings = self.settings.lock().unwrap();
- settings.header_update_mode.to_value()
- }
-
- "write-mfra" => {
- let settings = self.settings.lock().unwrap();
- settings.write_mfra.to_value()
- }
-
- "write-mehd" => {
- let settings = self.settings.lock().unwrap();
- settings.write_mehd.to_value()
- }
-
- "interleave-bytes" => {
- let settings = self.settings.lock().unwrap();
- settings.interleave_bytes.unwrap_or(0).to_value()
- }
-
- "interleave-time" => {
- let settings = self.settings.lock().unwrap();
- settings.interleave_time.to_value()
- }
-
- _ => unimplemented!(),
- }
- }
-
- fn constructed(&self) {
- self.parent_constructed();
-
- let obj = self.instance();
- let class = obj.class();
- for templ in class.pad_template_list().filter(|templ| {
- templ.presence() == gst::PadPresence::Always
- && templ.direction() == gst::PadDirection::Sink
- }) {
- let sinkpad =
- gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink"))
- .flags(gst::PadFlags::ACCEPT_INTERSECT)
- .build();
-
- obj.add_pad(&sinkpad).unwrap();
- }
-
- obj.set_latency(Settings::default().fragment_duration, None);
- }
-}
-
-impl GstObjectImpl for FMP4Mux {}
-
-impl ElementImpl for FMP4Mux {
- fn request_new_pad(
- &self,
- templ: &gst::PadTemplate,
- name: Option<&str>,
- caps: Option<&gst::Caps>,
- ) -> Option<gst::Pad> {
- let state = self.state.lock().unwrap();
- if state.stream_header.is_some() {
- gst::error!(
- CAT,
- imp: self,
- "Can't request new pads after header was generated"
- );
- return None;
- }
-
- self.parent_request_new_pad(templ, name, caps)
- }
-}
-
-impl AggregatorImpl for FMP4Mux {
- fn next_time(&self) -> Option<gst::ClockTime> {
- let state = self.state.lock().unwrap();
- state.fragment_start_pts.opt_add(state.timeout_delay)
- }
-
- fn sink_query(
- &self,
- aggregator_pad: &gst_base::AggregatorPad,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
- gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query);
-
- match query.view_mut() {
- QueryViewMut::Caps(q) => {
- let allowed_caps = aggregator_pad
- .current_caps()
- .unwrap_or_else(|| aggregator_pad.pad_template_caps());
-
- if let Some(filter_caps) = q.filter() {
- let res = filter_caps
- .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First);
- q.set_result(&res);
- } else {
- q.set_result(&allowed_caps);
- }
-
- true
- }
- _ => self.parent_sink_query(aggregator_pad, query),
- }
- }
-
- fn sink_event_pre_queue(
- &self,
- aggregator_pad: &gst_base::AggregatorPad,
- mut event: gst::Event,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- use gst::EventView;
-
- gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
-
- match event.view() {
- EventView::Segment(ev) => {
- if ev.segment().format() != gst::Format::Time {
- gst::warning!(
- CAT,
- obj: aggregator_pad,
- "Received non-TIME segment, replacing with default TIME segment"
- );
- let segment = gst::FormattedSegment::<gst::ClockTime>::new();
- event = gst::event::Segment::builder(&segment)
- .seqnum(event.seqnum())
- .build();
- }
- self.parent_sink_event_pre_queue(aggregator_pad, event)
- }
- _ => self.parent_sink_event_pre_queue(aggregator_pad, event),
- }
- }
-
- fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
- use gst::EventView;
-
- gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
-
- match event.view() {
- EventView::Segment(ev) => {
- // Already fixed-up above to always be a TIME segment
- let segment = ev
- .segment()
- .clone()
- .downcast::<gst::ClockTime>()
- .expect("non-TIME segment");
- gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment);
-
- // Only forward the segment event verbatim if this is a single stream variant.
- // Otherwise we have to produce a default segment and re-timestamp all buffers
- // with their running time.
- let aggregator = self.instance();
- let class = aggregator.class();
- if class.as_ref().variant.is_single_stream() {
- aggregator.update_segment(&segment);
- }
-
- self.parent_sink_event(aggregator_pad, event)
- }
- EventView::Tag(_ev) => {
- // TODO: Maybe store for putting into the headers of the next fragment?
-
- self.parent_sink_event(aggregator_pad, event)
- }
- _ => self.parent_sink_event(aggregator_pad, event),
- }
- }
-
- fn src_query(&self, query: &mut gst::QueryRef) -> bool {
- use gst::QueryViewMut;
-
- gst::trace!(CAT, imp: self, "Handling query {:?}", query);
-
- match query.view_mut() {
- QueryViewMut::Seeking(q) => {
- // We can't really handle seeking, it would break everything
- q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
- true
- }
- _ => self.parent_src_query(query),
- }
- }
-
- fn src_event(&self, event: gst::Event) -> bool {
- use gst::EventView;
-
- gst::trace!(CAT, imp: self, "Handling event {:?}", event);
-
- match event.view() {
- EventView::Seek(_ev) => false,
- _ => self.parent_src_event(event),
- }
- }
-
- fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
- self.parent_flush()?;
-
- let mut state = self.state.lock().unwrap();
-
- for stream in &mut state.streams {
- stream.queued_gops.clear();
- stream.dts_offset = None;
- stream.current_position = gst::ClockTime::ZERO;
- stream.current_utc_time = gst::ClockTime::ZERO;
- stream.fragment_filled = false;
- }
-
- state.current_offset = 0;
- state.fragment_offsets.clear();
-
- Ok(gst::FlowSuccess::Ok)
- }
-
- fn stop(&self) -> Result<(), gst::ErrorMessage> {
- gst::trace!(CAT, imp: self, "Stopping");
-
- let _ = self.parent_stop();
-
- *self.state.lock().unwrap() = State::default();
-
- Ok(())
- }
-
- fn start(&self) -> Result<(), gst::ErrorMessage> {
- gst::trace!(CAT, imp: self, "Starting");
-
- self.parent_start()?;
-
- // For non-single-stream variants configure a default segment that allows for negative
- // DTS so that we can correctly re-timestamp buffers with their running times.
- let aggregator = self.instance();
- let class = aggregator.class();
- if !class.as_ref().variant.is_single_stream() {
- let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
- segment.set_start(SEGMENT_OFFSET);
- segment.set_position(SEGMENT_OFFSET);
- aggregator.update_segment(&segment);
- }
-
- *self.state.lock().unwrap() = State::default();
-
- Ok(())
- }
-
- fn negotiate(&self) -> bool {
- true
- }
-
- fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
- let settings = self.settings.lock().unwrap().clone();
-
- let mut upstream_events = vec![];
-
- let all_eos;
- let (caps, buffers) = {
- let mut state = self.state.lock().unwrap();
-
- // Create streams
- if state.streams.is_empty() {
- self.create_streams(&mut state)?;
- }
-
- // Queue buffers from all streams that are not filled for the current fragment yet
- //
- // Always take a buffer from the stream with the earliest queued buffer to keep the
- // fill-level at all sinkpads in sync.
- let fragment_start_pts = state.fragment_start_pts;
-
- while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? {
- // Can only happen if the stream was flushed in the meantime
- let buffer = match stream.sinkpad.pop_buffer() {
- Some(buffer) => buffer,
- None => continue,
- };
-
- // Can only happen if the stream was flushed in the meantime
- let segment = match stream
- .sinkpad
- .segment()
- .clone()
- .downcast::<gst::ClockTime>()
- .ok()
- {
- Some(segment) => segment,
- None => {
- gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
- return Err(gst::FlowError::Error);
- }
- };
-
- // Queue up the buffer and update GOP tracking state
- self.queue_gops(idx, stream, &segment, buffer)?;
-
- // Check if this stream is filled enough now.
- if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
- stream
- .queued_gops
- .iter()
- .find(|gop| gop.final_end_pts)
- .map(|gop| gop.end_pts),
- fragment_start_pts,
- ) {
- if queued_end_pts.saturating_sub(fragment_start_pts)
- >= settings.fragment_duration
- {
- gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
- stream.fragment_filled = true;
- }
- }
- }
-
- // Calculate the earliest PTS after queueing input if we can now.
- if state.earliest_pts.is_none() {
- let mut earliest_pts = None;
-
- for stream in &state.streams {
- let stream_earliest_pts = match stream.queued_gops.back() {
- None => {
- earliest_pts = None;
- break;
- }
- Some(oldest_gop) => {
- if !timeout && !oldest_gop.final_earliest_pts {
- earliest_pts = None;
- break;
- }
-
- oldest_gop.earliest_pts
- }
- };
-
- if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) {
- earliest_pts = Some(stream_earliest_pts);
- }
- }
-
- if let Some(earliest_pts) = earliest_pts {
- gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts);
- state.earliest_pts = Some(earliest_pts);
- state.fragment_start_pts = Some(earliest_pts);
-
- gst::debug!(
- CAT,
- imp: self,
- "Sending first force-keyunit event for running time {}",
- earliest_pts + settings.fragment_duration,
- );
-
- let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
- .running_time(earliest_pts + settings.fragment_duration)
- .all_headers(true)
- .build();
-
- for stream in &mut state.streams {
- upstream_events.push((stream.sinkpad.clone(), fku.clone()));
-
- // Check if this stream is filled enough now.
- if let Some(queued_end_pts) = stream
- .queued_gops
- .iter()
- .find(|gop| gop.final_end_pts)
- .map(|gop| gop.end_pts)
- {
- if queued_end_pts.saturating_sub(earliest_pts)
- >= settings.fragment_duration
- {
- gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
- stream.fragment_filled = true;
- }
- }
- }
- }
- }
-
- all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
- if all_eos {
- gst::debug!(CAT, imp: self, "All streams are EOS now");
- }
-
- // If enough GOPs were queued, drain and create the output fragment
- match self.drain(
- &mut state,
- &settings,
- timeout,
- all_eos,
- &mut upstream_events,
- ) {
- Ok(res) => res,
- Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
- gst::element_imp_warning!(
- self,
- gst::StreamError::Format,
- ["Longer GOPs than fragment duration"]
- );
- state.timeout_delay += 1.seconds();
-
- drop(state);
- for (sinkpad, event) in upstream_events {
- sinkpad.push_event(event);
- }
- return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
- }
- Err(err) => return Err(err),
- }
- };
-
- for (sinkpad, event) in upstream_events {
- sinkpad.push_event(event);
- }
-
- if let Some(caps) = caps {
- gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps);
- self.instance().set_src_caps(&caps);
- }
-
- if let Some(buffers) = buffers {
- gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers);
- self.instance().finish_buffer_list(buffers)?;
- }
-
- if all_eos {
- gst::debug!(CAT, imp: self, "Doing EOS handling");
-
- if settings.header_update_mode != super::HeaderUpdateMode::None {
- let updated_header =
- self.update_header(&mut self.state.lock().unwrap(), &settings, true);
- match updated_header {
- Ok(Some((buffer_list, caps))) => {
- match settings.header_update_mode {
- super::HeaderUpdateMode::None => unreachable!(),
- super::HeaderUpdateMode::Rewrite => {
- let mut q = gst::query::Seeking::new(gst::Format::Bytes);
- if self.instance().src_pad().peer_query(&mut q) && q.result().0 {
- let aggregator = self.instance();
-
- aggregator.set_src_caps(&caps);
-
- // Seek to the beginning with a default bytes segment
- aggregator
- .update_segment(
- &gst::FormattedSegment::<gst::format::Bytes>::new(),
- );
-
- if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
- gst::error!(
- CAT,
- imp: self,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- } else {
- gst::error!(
- CAT,
- imp: self,
- "Can't rewrite header because downstream is not seekable"
- );
- }
- }
- super::HeaderUpdateMode::Update => {
- let aggregator = self.instance();
-
- aggregator.set_src_caps(&caps);
- if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
- gst::error!(
- CAT,
- imp: self,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- }
- }
- }
- Ok(None) => {}
- Err(err) => {
- gst::error!(
- CAT,
- imp: self,
- "Failed to generate updated header: {:?}",
- err
- );
- }
- }
- }
-
- // Need to output new headers if started again after EOS
- self.state.lock().unwrap().sent_headers = false;
-
- Err(gst::FlowError::Eos)
- } else {
- Ok(gst::FlowSuccess::Ok)
- }
- }
-}
-
-#[repr(C)]
-pub(crate) struct Class {
- parent: gst_base::ffi::GstAggregatorClass,
- variant: super::Variant,
-}
-
-unsafe impl ClassStruct for Class {
- type Type = FMP4Mux;
-}
-
-impl std::ops::Deref for Class {
- type Target = glib::Class<gst_base::Aggregator>;
-
- fn deref(&self) -> &Self::Target {
- unsafe { &*(&self.parent as *const _ as *const _) }
- }
-}
-
-unsafe impl<T: FMP4MuxImpl> IsSubclassable<T> for super::FMP4Mux {
- fn class_init(class: &mut glib::Class<Self>) {
- Self::parent_class_init::<T>(class);
-
- let class = class.as_mut();
- class.variant = T::VARIANT;
- }
-}
-
-pub(crate) trait FMP4MuxImpl: AggregatorImpl {
- const VARIANT: super::Variant;
-}
-
-#[derive(Default)]
-pub(crate) struct ISOFMP4Mux;
-
-#[glib::object_subclass]
-impl ObjectSubclass for ISOFMP4Mux {
- const NAME: &'static str = "GstISOFMP4Mux";
- type Type = super::ISOFMP4Mux;
- type ParentType = super::FMP4Mux;
-}
-
-impl ObjectImpl for ISOFMP4Mux {}
-
-impl GstObjectImpl for ISOFMP4Mux {}
-
-impl ElementImpl for ISOFMP4Mux {
- fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
- static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
- gst::subclass::ElementMetadata::new(
- "ISOFMP4Mux",
- "Codec/Muxer",
- "ISO fragmented MP4 muxer",
- "Sebastian Dröge <sebastian@centricular.com>",
- )
- });
-
- Some(&*ELEMENT_METADATA)
- }
-
- fn pad_templates() -> &'static [gst::PadTemplate] {
- static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &gst::Caps::builder("video/quicktime")
- .field("variant", "iso-fragmented")
- .build(),
- )
- .unwrap();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink_%u",
- gst::PadDirection::Sink,
- gst::PadPresence::Request,
- &[
- gst::Structure::builder("video/x-h264")
- .field("stream-format", gst::List::new(["avc", "avc3"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("video/x-h265")
- .field("stream-format", gst::List::new(["hvc1", "hev1"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("stream-format", "raw")
- .field("channels", gst::IntRange::new(1, u16::MAX as i32))
- .field("rate", gst::IntRange::new(1, i32::MAX))
- .build(),
- ]
- .into_iter()
- .collect::<gst::Caps>(),
- )
- .unwrap();
-
- vec![src_pad_template, sink_pad_template]
- });
-
- PAD_TEMPLATES.as_ref()
- }
-}
-
-impl AggregatorImpl for ISOFMP4Mux {}
-
-impl FMP4MuxImpl for ISOFMP4Mux {
- const VARIANT: super::Variant = super::Variant::ISO;
-}
-
-#[derive(Default)]
-pub(crate) struct CMAFMux;
-
-#[glib::object_subclass]
-impl ObjectSubclass for CMAFMux {
- const NAME: &'static str = "GstCMAFMux";
- type Type = super::CMAFMux;
- type ParentType = super::FMP4Mux;
-}
-
-impl ObjectImpl for CMAFMux {}
-
-impl GstObjectImpl for CMAFMux {}
-
-impl ElementImpl for CMAFMux {
- fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
- static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
- gst::subclass::ElementMetadata::new(
- "CMAFMux",
- "Codec/Muxer",
- "CMAF fragmented MP4 muxer",
- "Sebastian Dröge <sebastian@centricular.com>",
- )
- });
-
- Some(&*ELEMENT_METADATA)
- }
-
- fn pad_templates() -> &'static [gst::PadTemplate] {
- static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &gst::Caps::builder("video/quicktime")
- .field("variant", "cmaf")
- .build(),
- )
- .unwrap();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &[
- gst::Structure::builder("video/x-h264")
- .field("stream-format", gst::List::new(["avc", "avc3"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("video/x-h265")
- .field("stream-format", gst::List::new(["hvc1", "hev1"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("stream-format", "raw")
- .field("channels", gst::IntRange::new(1, u16::MAX as i32))
- .field("rate", gst::IntRange::new(1, i32::MAX))
- .build(),
- ]
- .into_iter()
- .collect::<gst::Caps>(),
- )
- .unwrap();
-
- vec![src_pad_template, sink_pad_template]
- });
-
- PAD_TEMPLATES.as_ref()
- }
-}
-
-impl AggregatorImpl for CMAFMux {}
-
-impl FMP4MuxImpl for CMAFMux {
- const VARIANT: super::Variant = super::Variant::CMAF;
-}
-
-#[derive(Default)]
-pub(crate) struct DASHMP4Mux;
-
-#[glib::object_subclass]
-impl ObjectSubclass for DASHMP4Mux {
- const NAME: &'static str = "GstDASHMP4Mux";
- type Type = super::DASHMP4Mux;
- type ParentType = super::FMP4Mux;
-}
-
-impl ObjectImpl for DASHMP4Mux {}
-
-impl GstObjectImpl for DASHMP4Mux {}
-
-impl ElementImpl for DASHMP4Mux {
- fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
- static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
- gst::subclass::ElementMetadata::new(
- "DASHMP4Mux",
- "Codec/Muxer",
- "DASH fragmented MP4 muxer",
- "Sebastian Dröge <sebastian@centricular.com>",
- )
- });
-
- Some(&*ELEMENT_METADATA)
- }
-
- fn pad_templates() -> &'static [gst::PadTemplate] {
- static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &gst::Caps::builder("video/quicktime")
- .field("variant", "iso-fragmented")
- .build(),
- )
- .unwrap();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &[
- gst::Structure::builder("video/x-h264")
- .field("stream-format", gst::List::new(&[&"avc", &"avc3"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("video/x-h265")
- .field("stream-format", gst::List::new(&[&"hvc1", &"hev1"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("stream-format", "raw")
- .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
- .build(),
- ]
- .into_iter()
- .collect::<gst::Caps>(),
- )
- .unwrap();
-
- vec![src_pad_template, sink_pad_template]
- });
-
- PAD_TEMPLATES.as_ref()
- }
-}
-
-impl AggregatorImpl for DASHMP4Mux {}
-
-impl FMP4MuxImpl for DASHMP4Mux {
- const VARIANT: super::Variant = super::Variant::DASH;
-}
-
-#[derive(Default)]
-pub(crate) struct ONVIFFMP4Mux;
-
-#[glib::object_subclass]
-impl ObjectSubclass for ONVIFFMP4Mux {
- const NAME: &'static str = "GstONVIFFMP4Mux";
- type Type = super::ONVIFFMP4Mux;
- type ParentType = super::FMP4Mux;
-}
-
-impl ObjectImpl for ONVIFFMP4Mux {}
-
-impl GstObjectImpl for ONVIFFMP4Mux {}
-
-impl ElementImpl for ONVIFFMP4Mux {
- fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
- static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
- gst::subclass::ElementMetadata::new(
- "ONVIFFMP4Mux",
- "Codec/Muxer",
- "ONVIF fragmented MP4 muxer",
- "Sebastian Dröge <sebastian@centricular.com>",
- )
- });
-
- Some(&*ELEMENT_METADATA)
- }
-
- fn pad_templates() -> &'static [gst::PadTemplate] {
- static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &gst::Caps::builder("video/quicktime")
- .field("variant", "iso-fragmented")
- .build(),
- )
- .unwrap();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink_%u",
- gst::PadDirection::Sink,
- gst::PadPresence::Request,
- &[
- gst::Structure::builder("video/x-h264")
- .field("stream-format", gst::List::new(&[&"avc", &"avc3"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("video/x-h265")
- .field("stream-format", gst::List::new(&[&"hvc1", &"hev1"]))
- .field("alignment", "au")
- .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("image/jpeg")
- .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .build(),
- gst::Structure::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("stream-format", "raw")
- .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32))
- .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
- .build(),
- gst::Structure::builder("audio/x-alaw")
- .field("channels", gst::IntRange::<i32>::new(1, 2))
- .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
- .build(),
- gst::Structure::builder("audio/x-mulaw")
- .field("channels", gst::IntRange::<i32>::new(1, 2))
- .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
- .build(),
- gst::Structure::builder("audio/x-adpcm")
- .field("layout", "g726")
- .field("channels", 1i32)
- .field("rate", 8000i32)
- .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000]))
- .build(),
- gst::Structure::builder("application/x-onvif-metadata")
- .field("parsed", true)
- .build(),
- ]
- .into_iter()
- .collect::<gst::Caps>(),
- )
- .unwrap();
-
- vec![src_pad_template, sink_pad_template]
- });
-
- PAD_TEMPLATES.as_ref()
- }
-}
-
-impl AggregatorImpl for ONVIFFMP4Mux {}
-
-impl FMP4MuxImpl for ONVIFFMP4Mux {
- const VARIANT: super::Variant = super::Variant::ONVIF;
-}
diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs
deleted file mode 100644
index 76c2da9c..00000000
--- a/generic/fmp4/src/fmp4mux/mod.rs
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-
-use gst::glib;
-use gst::prelude::*;
-
-mod boxes;
-mod imp;
-
-glib::wrapper! {
- pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object;
-}
-
-glib::wrapper! {
- pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
-}
-
-glib::wrapper! {
- pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
-}
-
-glib::wrapper! {
- pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
-}
-
-glib::wrapper! {
- pub(crate) struct ONVIFFMP4Mux(ObjectSubclass<imp::ONVIFFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
-}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- FMP4Mux::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
- HeaderUpdateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
- gst::Element::register(
- Some(plugin),
- "isofmp4mux",
- gst::Rank::Primary,
- ISOFMP4Mux::static_type(),
- )?;
- gst::Element::register(
- Some(plugin),
- "cmafmux",
- gst::Rank::Primary,
- CMAFMux::static_type(),
- )?;
- gst::Element::register(
- Some(plugin),
- "dashmp4mux",
- gst::Rank::Primary,
- DASHMP4Mux::static_type(),
- )?;
- gst::Element::register(
- Some(plugin),
- "onviffmp4mux",
- gst::Rank::Primary,
- ONVIFFMP4Mux::static_type(),
- )?;
-
- Ok(())
-}
-
-#[derive(Debug)]
-pub(crate) struct HeaderConfiguration<'a> {
- variant: Variant,
- update: bool,
- /// First caps must be the video/reference stream. Must be in the order the tracks are going to
- /// be used later for the fragments too.
- streams: &'a [gst::Caps],
- write_mehd: bool,
- duration: Option<gst::ClockTime>,
- /// Start UTC time in ONVIF mode.
- /// Since Jan 1 1601 in 100ns units.
- start_utc_time: Option<u64>,
-}
-
-#[derive(Debug)]
-pub(crate) struct FragmentHeaderConfiguration<'a> {
- variant: Variant,
- sequence_number: u32,
- streams: &'a [(gst::Caps, Option<FragmentTimingInfo>)],
- buffers: &'a [Buffer],
-}
-
-#[derive(Debug)]
-pub(crate) struct FragmentTimingInfo {
- /// Start time of this fragment
- start_time: gst::ClockTime,
- /// Set if this is an intra-only stream
- intra_only: bool,
-}
-
-#[derive(Debug)]
-pub(crate) struct Buffer {
- /// Track index
- idx: usize,
-
- /// Actual buffer
- buffer: gst::Buffer,
-
- /// Timestamp
- timestamp: gst::ClockTime,
-
- /// Sample duration
- duration: gst::ClockTime,
-
- /// Composition time offset
- composition_time_offset: Option<i64>,
-}
-
-#[allow(clippy::upper_case_acronyms)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub(crate) enum Variant {
- ISO,
- CMAF,
- DASH,
- ONVIF,
-}
-
-impl Variant {
- pub(crate) fn is_single_stream(self) -> bool {
- match self {
- Variant::ISO | Variant::ONVIF => false,
- Variant::CMAF | Variant::DASH => true,
- }
- }
-}
-
-#[derive(Debug)]
-pub(crate) struct FragmentOffset {
- time: gst::ClockTime,
- offset: u64,
-}
-
-#[allow(clippy::upper_case_acronyms)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq, glib::Enum)]
-#[repr(i32)]
-#[enum_type(name = "GstFMP4MuxHeaderUpdateMode")]
-pub(crate) enum HeaderUpdateMode {
- None,
- Rewrite,
- Update,
-}
diff --git a/generic/fmp4/src/lib.rs b/generic/fmp4/src/lib.rs
deleted file mode 100644
index 697c7cbf..00000000
--- a/generic/fmp4/src/lib.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
-
-/**
- * plugin-fmp4:
- *
- * Since: plugins-rs-0.8.0
- */
-use gst::glib;
-
-mod fmp4mux;
-
-fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- fmp4mux::register(plugin)
-}
-
-gst::plugin_define!(
- fmp4,
- env!("CARGO_PKG_DESCRIPTION"),
- plugin_init,
- concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
- // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known)
- "MPL",
- env!("CARGO_PKG_NAME"),
- env!("CARGO_PKG_NAME"),
- env!("CARGO_PKG_REPOSITORY"),
- env!("BUILD_REL_DATE")
-);
diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs
deleted file mode 100644
index 073b6ae0..00000000
--- a/generic/fmp4/tests/tests.rs
+++ /dev/null
@@ -1,1241 +0,0 @@
-//
-// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
-// If a copy of the MPL was not distributed with this file, You can obtain one at
-// <https://mozilla.org/MPL/2.0/>.
-//
-// SPDX-License-Identifier: MPL-2.0
-//
-
-use gst::prelude::*;
-
-fn init() {
- use std::sync::Once;
- static INIT: Once = Once::new();
-
- INIT.call_once(|| {
- gst::init().unwrap();
- gstfmp4::plugin_register_static().unwrap();
- });
-}
-
-fn test_buffer_flags_single_stream(cmaf: bool) {
- let mut h = if cmaf {
- gst_check::Harness::new("cmafmux")
- } else {
- gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"))
- };
-
- // 5s fragment duration
- h.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h.play();
-
- let output_offset = if cmaf {
- gst::ClockTime::ZERO
- } else {
- (60 * 60 * 1000).seconds()
- };
-
- // Push 7 buffers of 1s each, 1st and 6 buffer without DELTA_UNIT flag
- for i in 0..7 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 5 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- if i == 2 {
- let ev = loop {
- let ev = h.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- let header = h.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(5.seconds()));
-
- for i in 0..5 {
- let buffer = h.pull().unwrap();
- if i == 4 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
-
- h.push_event(gst::event::Eos::new());
-
- let fragment_header = h.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(2.seconds()));
-
- for i in 5..7 {
- let buffer = h.pull().unwrap();
- if i == 6 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
-
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}
-
-#[test]
-fn test_buffer_flags_single_stream_cmaf() {
- init();
-
- test_buffer_flags_single_stream(true);
-}
-
-#[test]
-fn test_buffer_flags_single_stream_iso() {
- init();
-
- test_buffer_flags_single_stream(false);
-}
-
-#[test]
-fn test_buffer_flags_multi_stream() {
- init();
-
- let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
- let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
-
- // 5s fragment duration
- h1.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h1.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h1.play();
-
- h2.set_src_caps(
- gst::Caps::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("channels", 1i32)
- .field("rate", 44100i32)
- .field("stream-format", "raw")
- .field("base-profile", "lc")
- .field("profile", "lc")
- .field("level", "2")
- .field(
- "codec_data",
- gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
- )
- .build(),
- );
- h2.play();
-
- let output_offset = (60 * 60 * 1000).seconds();
-
- // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
- for i in 0..7 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 5 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- }
- assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- if i == 2 {
- let ev = loop {
- let ev = h1.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
-
- let ev = loop {
- let ev = h2.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- let header = h1.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(5.seconds()));
-
- for i in 0..5 {
- for j in 0..2 {
- let buffer = h1.pull().unwrap();
- if i == 4 && j == 1 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
-
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
-
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- h1.push_event(gst::event::Eos::new());
- h2.push_event(gst::event::Eos::new());
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(2.seconds()));
-
- for i in 5..7 {
- for j in 0..2 {
- let buffer = h1.pull().unwrap();
- if i == 6 && j == 1 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}
-
-#[test]
-fn test_live_timeout() {
- init();
-
- let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
- let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
-
- h1.use_testclock();
-
- // 5s fragment duration
- h1.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h1.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h1.play();
-
- h2.set_src_caps(
- gst::Caps::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("channels", 1i32)
- .field("rate", 44100i32)
- .field("stream-format", "raw")
- .field("base-profile", "lc")
- .field("profile", "lc")
- .field("level", "2")
- .field(
- "codec_data",
- gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
- )
- .build(),
- );
- h2.play();
-
- let output_offset = (60 * 60 * 1000).seconds();
-
- // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
- for i in 0..7 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 5 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- // Skip buffer 4th and 6th buffer (end of fragment / stream)
- if i == 4 || i == 6 {
- continue;
- } else {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- }
- assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
- }
-
- if i == 2 {
- let ev = loop {
- let ev = h1.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
-
- let ev = loop {
- let ev = h2.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- // Advance time and crank the clock: this should bring us to the end of the first fragment
- h1.set_time(5.seconds()).unwrap();
- h1.crank_single_clock_wait().unwrap();
-
- let header = h1.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(5.seconds()));
-
- for i in 0..5 {
- for j in 0..2 {
- // Skip gap events that don't result in buffers
- if j == 1 && i == 4 {
- // Advance time and crank the clock another time. This brings us at the end of the
- // EOS.
- h1.crank_single_clock_wait().unwrap();
- continue;
- }
-
- let buffer = h1.pull().unwrap();
- if i == 4 && j == 0 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else if i == 5 && j == 0 {
- assert_eq!(buffer.flags(), gst::BufferFlags::HEADER);
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
-
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
-
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- h1.push_event(gst::event::Eos::new());
- h2.push_event(gst::event::Eos::new());
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(2.seconds()));
-
- for i in 5..7 {
- for j in 0..2 {
- // Skip gap events that don't result in buffers
- if j == 1 && i == 6 {
- continue;
- }
-
- let buffer = h1.pull().unwrap();
- if i == 6 && j == 0 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}
-
-#[test]
-fn test_gap_events() {
- init();
-
- let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
- let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
-
- h1.use_testclock();
-
- // 5s fragment duration
- h1.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h1.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h1.play();
-
- h2.set_src_caps(
- gst::Caps::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("channels", 1i32)
- .field("rate", 44100i32)
- .field("stream-format", "raw")
- .field("base-profile", "lc")
- .field("profile", "lc")
- .field("level", "2")
- .field(
- "codec_data",
- gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
- )
- .build(),
- );
- h2.play();
-
- let output_offset = (60 * 60 * 1000).seconds();
-
- // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
- for i in 0..7 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 5 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- // Replace buffer 3 and 6 with a gap event
- if i == 3 || i == 6 {
- let ev = gst::event::Gap::builder(i.seconds())
- .duration(gst::ClockTime::SECOND)
- .build();
- assert!(h2.push_event(ev));
- } else {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- }
- assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
- }
-
- if i == 2 {
- let ev = loop {
- let ev = h1.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
-
- let ev = loop {
- let ev = h2.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(5.seconds()),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- // Advance time and crank the clock: this should bring us to the end of the first fragment
- h1.set_time(5.seconds()).unwrap();
- h1.crank_single_clock_wait().unwrap();
-
- let header = h1.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(5.seconds()));
-
- for i in 0..5 {
- for j in 0..2 {
- // Skip gap events that don't result in buffers
- if j == 1 && i == 3 {
- continue;
- }
-
- let buffer = h1.pull().unwrap();
- if i == 4 && j == 1 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
-
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
-
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- h1.push_event(gst::event::Eos::new());
- h2.push_event(gst::event::Eos::new());
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(5.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(2.seconds()));
-
- for i in 5..7 {
- for j in 0..2 {
- // Skip gap events that don't result in buffers
- if j == 1 && i == 6 {
- continue;
- }
-
- let buffer = h1.pull().unwrap();
- if i == 6 && j == 0 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}
-
-#[test]
-fn test_single_stream_short_gops() {
- init();
-
- let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
-
- // 5s fragment duration
- h.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h.play();
-
- let output_offset = (60 * 60 * 1000).seconds();
-
- // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
- for i in 0..8 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 3 && i != 6 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- if i == 2 || i == 7 {
- let ev = loop {
- let ev = h.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(fku_time),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- let header = h.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(3.seconds()));
-
- for i in 0..3 {
- let buffer = h.pull().unwrap();
- if i == 2 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
-
- h.push_event(gst::event::Eos::new());
-
- let fragment_header = h.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(5.seconds()));
-
- for i in 3..8 {
- let buffer = h.pull().unwrap();
- if i == 7 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
-
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}
-
-#[test]
-fn test_single_stream_long_gops() {
- init();
-
- let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
-
- // 5s fragment duration
- h.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h.play();
-
- let output_offset = (60 * 60 * 1000).seconds();
-
- // Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag
- for i in 0..10 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 6 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- if i == 2 || i == 7 {
- let ev = loop {
- let ev = h.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- let fku_time = if i == 2 { 5.seconds() } else { 11.seconds() };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(fku_time),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- let header = h.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(6.seconds()));
-
- for i in 0..6 {
- let buffer = h.pull().unwrap();
- if i == 5 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
-
- h.push_event(gst::event::Eos::new());
-
- let fragment_header = h.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(6.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(6.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(4.seconds()));
-
- for i in 6..10 {
- let buffer = h.pull().unwrap();
- if i == 9 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
-
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}
-
-#[test]
-fn test_buffer_multi_stream_short_gops() {
- init();
-
- let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
- let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
-
- // 5s fragment duration
- h1.element()
- .unwrap()
- .set_property("fragment-duration", 5.seconds());
-
- h1.set_src_caps(
- gst::Caps::builder("video/x-h264")
- .field("width", 1920i32)
- .field("height", 1080i32)
- .field("framerate", gst::Fraction::new(30, 1))
- .field("stream-format", "avc")
- .field("alignment", "au")
- .field("codec_data", gst::Buffer::with_size(1).unwrap())
- .build(),
- );
- h1.play();
-
- h2.set_src_caps(
- gst::Caps::builder("audio/mpeg")
- .field("mpegversion", 4i32)
- .field("channels", 1i32)
- .field("rate", 44100i32)
- .field("stream-format", "raw")
- .field("base-profile", "lc")
- .field("profile", "lc")
- .field("level", "2")
- .field(
- "codec_data",
- gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
- )
- .build(),
- );
- h2.play();
-
- let output_offset = (60 * 60 * 1000).seconds();
-
- // Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
- for i in 0..8 {
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- if i != 0 && i != 3 && i != 6 {
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
- }
- assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- let mut buffer = gst::Buffer::with_size(1).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(i.seconds());
- buffer.set_dts(i.seconds());
- buffer.set_duration(gst::ClockTime::SECOND);
- }
- assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
-
- if i == 2 || i == 7 {
- let ev = loop {
- let ev = h1.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- let fku_time = if i == 2 { 5.seconds() } else { 8.seconds() };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(fku_time),
- all_headers: true,
- count: 0
- }
- );
-
- let ev = loop {
- let ev = h2.pull_upstream_event().unwrap();
- if ev.type_() != gst::EventType::Reconfigure
- && ev.type_() != gst::EventType::Latency
- {
- break ev;
- }
- };
-
- assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
- assert_eq!(
- gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
- gst_video::UpstreamForceKeyUnitEvent {
- running_time: Some(fku_time),
- all_headers: true,
- count: 0
- }
- );
- }
- }
-
- let header = h1.pull().unwrap();
- assert_eq!(
- header.flags(),
- gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
- );
- assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
- assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(
- fragment_header.pts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(
- fragment_header.dts(),
- Some(gst::ClockTime::ZERO + output_offset)
- );
- assert_eq!(fragment_header.duration(), Some(3.seconds()));
-
- for i in 0..3 {
- for j in 0..2 {
- let buffer = h1.pull().unwrap();
- if i == 2 && j == 1 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
-
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
-
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- h1.push_event(gst::event::Eos::new());
- h2.push_event(gst::event::Eos::new());
-
- let fragment_header = h1.pull().unwrap();
- assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
- assert_eq!(fragment_header.pts(), Some(3.seconds() + output_offset));
- assert_eq!(fragment_header.dts(), Some(3.seconds() + output_offset));
- assert_eq!(fragment_header.duration(), Some(5.seconds()));
-
- for i in 3..8 {
- for j in 0..2 {
- let buffer = h1.pull().unwrap();
- if i == 7 && j == 1 {
- assert_eq!(
- buffer.flags(),
- gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
- );
- } else {
- assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
- }
- assert_eq!(buffer.pts(), Some(i.seconds() + output_offset));
- if j == 0 {
- assert_eq!(buffer.dts(), Some(i.seconds() + output_offset));
- } else {
- assert!(buffer.dts().is_none());
- }
- assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
- }
- }
-
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::StreamStart);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Caps);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Segment);
- let ev = h1.pull_event().unwrap();
- assert_eq!(ev.type_(), gst::EventType::Eos);
-}