Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-05-05 15:09:19 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-05-12 18:31:02 +0300
commitc09b7b9e41b9152ece6707b390e2bb93a1d8f50b (patch)
tree07ad7a425ea167086be565e7505249ba14f44845
parent23cc00ce4b0856be5a2471dd163f746bddebaf8e (diff)
fmp4mux: Port to aggregator as base class as preparation for supporting multiple streams
-rw-r--r--generic/fmp4/Cargo.toml3
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs789
-rw-r--r--generic/fmp4/src/fmp4mux/mod.rs8
3 files changed, 390 insertions, 410 deletions
diff --git a/generic/fmp4/Cargo.toml b/generic/fmp4/Cargo.toml
index cd2d909b..ce80b4bd 100644
--- a/generic/fmp4/Cargo.toml
+++ b/generic/fmp4/Cargo.toml
@@ -11,6 +11,7 @@ rust-version = "1.57"
[dependencies]
anyhow = "1"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
once_cell = "1.0"
@@ -47,4 +48,4 @@ install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
-requires_private = "gstreamer-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
+requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 96117f70..0b600f34 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -9,6 +9,8 @@
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
+use gst_base::prelude::*;
+use gst_base::subclass::prelude::*;
use std::collections::VecDeque;
use std::sync::Mutex;
@@ -72,7 +74,6 @@ struct Gop {
#[derive(Default)]
struct State {
- segment: Option<gst::FormattedSegment<gst::ClockTime>>,
caps: Option<gst::Caps>,
intra_only: bool,
@@ -98,12 +99,11 @@ struct State {
earliest_pts: Option<gst::ClockTime>,
end_pts: Option<gst::ClockTime>,
- headers_since_eos: bool,
+ generated_headers: bool,
}
pub(crate) struct FMP4Mux {
- srcpad: gst::Pad,
- sinkpad: gst::Pad,
+ sinkpad: gst_base::AggregatorPad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
@@ -113,18 +113,11 @@ impl FMP4Mux {
&self,
element: &super::FMP4Mux,
state: &mut State,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
buffer: gst::Buffer,
) -> Result<(), gst::FlowError> {
gst::trace!(CAT, obj: element, "Handling buffer {:?}", buffer);
- let segment = match state.segment {
- Some(ref segment) => segment,
- None => {
- gst::error!(CAT, obj: element, "Got buffer before segment");
- return Err(gst::FlowError::Error);
- }
- };
-
if state.caps.is_none() {
gst::error!(CAT, obj: element, "Got buffer before caps");
return Err(gst::FlowError::NotNegotiated);
@@ -393,10 +386,9 @@ impl FMP4Mux {
element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
pts: gst::ClockTime,
) -> Result<Option<gst::Event>, gst::FlowError> {
- let segment = state.segment.as_ref().expect("no segment");
-
// If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP
// is known and send one now.
//
@@ -517,7 +509,7 @@ impl FMP4Mux {
);
let mut fmp4_header = None;
- if !state.headers_since_eos {
+ if !state.generated_headers {
let mut buffer = state.stream_header.as_ref().unwrap().copy();
{
let buffer = buffer.get_mut().unwrap();
@@ -532,7 +524,7 @@ impl FMP4Mux {
fmp4_header = Some(buffer);
state.earliest_pts = Some(earliest_pts);
- state.headers_since_eos = true;
+ state.generated_headers = true;
}
let mut buffers = drain_gops
@@ -719,389 +711,23 @@ impl FMP4Mux {
Ok(Some((list, caps)))
}
-
- fn sink_chain(
- &self,
- _pad: &gst::Pad,
- element: &super::FMP4Mux,
- buffer: gst::Buffer,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let settings = self.settings.lock().unwrap().clone();
-
- let mut upstream_events = vec![];
-
- let buffers = {
- let mut state = self.state.lock().unwrap();
-
- let pts = buffer.pts();
-
- // Queue up the buffer and update GOP tracking state
- self.queue_input(element, &mut state, buffer)?;
-
- // If we have a PTS with this buffer, check if a new force-keyunit event for the next
- // fragment start has to be created
- if let Some(pts) = pts {
- if let Some(event) =
- self.create_force_keyunit_event(element, &mut state, &settings, pts)?
- {
- upstream_events.push(event);
- }
- }
-
- // If enough GOPs were queued, drain and create the output fragment
- self.drain(element, &mut state, &settings, false)?
- };
-
- for event in upstream_events {
- self.sinkpad.push_event(event);
- }
-
- if let Some(buffers) = buffers {
- gst::trace!(CAT, obj: element, "Pushing buffer list {:?}", buffers);
- self.srcpad.push_list(buffers)?;
- }
-
- Ok(gst::FlowSuccess::Ok)
- }
-
- fn sink_event(&self, pad: &gst::Pad, element: &super::FMP4Mux, mut event: gst::Event) -> bool {
- use gst::EventView;
-
- gst::trace!(CAT, obj: pad, "Handling event {:?}", event);
-
- match event.view() {
- EventView::Segment(ev) => {
- let segment = match ev.segment().downcast_ref::<gst::ClockTime>() {
- Some(segment) => {
- gst::info!(CAT, obj: pad, "Received segment {:?}", segment);
- segment.clone()
- }
- None => {
- gst::warning!(
- CAT,
- obj: pad,
- "Received non-TIME segment, replacing with default TIME segment"
- );
- let segment = gst::FormattedSegment::new();
- event = gst::event::Segment::builder(&segment)
- .seqnum(event.seqnum())
- .build();
- segment
- }
- };
-
- self.state.lock().unwrap().segment = Some(segment);
-
- self.srcpad.push_event(event)
- }
- EventView::Caps(ev) => {
- let caps = ev.caps_owned();
-
- gst::info!(CAT, obj: pad, "Received caps {:?}", caps);
- let caps = {
- let settings = self.settings.lock().unwrap().clone();
- let mut state = self.state.lock().unwrap();
-
- let s = caps.structure(0).unwrap();
-
- match s.name() {
- "video/x-h264" | "video/x-h265" => {
- if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
- gst::error!(CAT, obj: pad, "Received caps without codec_data");
- return false;
- }
- }
- "audio/mpeg" => {
- if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
- gst::error!(CAT, obj: pad, "Received caps without codec_data");
- return false;
- }
- state.intra_only = true;
- }
- _ => unreachable!(),
- }
-
- state.caps = Some(caps);
-
- let (_, caps) = match self.update_header(element, &mut state, &settings, false)
- {
- Ok(Some(res)) => res,
- _ => {
- return false;
- }
- };
-
- caps
- };
-
- self.srcpad.push_event(gst::event::Caps::new(&caps))
- }
- EventView::Tag(_ev) => {
- // TODO: Maybe store for putting into the headers of the next fragment?
-
- pad.event_default(Some(element), event)
- }
- EventView::Gap(_ev) => {
- // TODO: queue up and check if draining is needed now
- // i.e. make the last sample much longer
- true
- }
- EventView::Eos(_ev) => {
- let settings = self.settings.lock().unwrap().clone();
-
- let drained = self.drain(element, &mut self.state.lock().unwrap(), &settings, true);
- let update_header =
- drained.is_ok() && settings.header_update_mode != super::HeaderUpdateMode::None;
-
- match drained {
- Ok(Some(buffers)) => {
- gst::trace!(CAT, obj: element, "Pushing buffer list {:?}", buffers);
-
- if let Err(err) = self.srcpad.push_list(buffers) {
- gst::error!(
- CAT,
- obj: element,
- "Failed pushing EOS buffers downstream: {:?}",
- err,
- );
- }
- }
- Ok(None) => {}
- Err(err) => {
- gst::error!(CAT, obj: element, "Failed draining at EOS: {:?}", err);
- }
- }
-
- if update_header {
- let updated_header = self.update_header(
- element,
- &mut self.state.lock().unwrap(),
- &settings,
- true,
- );
- match updated_header {
- Ok(Some((buffer_list, caps))) => {
- match settings.header_update_mode {
- super::HeaderUpdateMode::None => unreachable!(),
- super::HeaderUpdateMode::Rewrite => {
- let mut q = gst::query::Seeking::new(gst::Format::Bytes);
- if self.srcpad.peer_query(&mut q) && q.result().0 {
- // Seek to the beginning with a default bytes segment
- self.srcpad.push_event(gst::event::Segment::new(
- &gst::FormattedSegment::<gst::format::Bytes>::new(),
- ));
-
- self.srcpad.push_event(gst::event::Caps::new(&caps));
- if let Err(err) = self.srcpad.push_list(buffer_list) {
- gst::error!(
- CAT,
- obj: element,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- } else {
- gst::error!(CAT, obj: element, "Can't rewrite header because downstream is not seekable");
- }
- }
- super::HeaderUpdateMode::Update => {
- self.srcpad.push_event(gst::event::Caps::new(&caps));
- if let Err(err) = self.srcpad.push_list(buffer_list) {
- gst::error!(
- CAT,
- obj: element,
- "Failed pushing updated header buffer downstream: {:?}",
- err,
- );
- }
- }
- }
- }
- Ok(None) => {}
- Err(err) => {
- gst::error!(
- CAT,
- obj: element,
- "Failed to generate updated header: {:?}",
- err
- );
- }
- }
- }
- self.state.lock().unwrap().headers_since_eos = false;
-
- pad.event_default(Some(element), event)
- }
- EventView::FlushStop(_ev) => {
- let mut state = self.state.lock().unwrap();
-
- state.segment = None;
- state.queued_gops.clear();
- state.queued_duration = gst::ClockTime::ZERO;
- state.dts_offset = None;
- state.last_force_keyunit_time = None;
- state.current_offset = 0;
- state.fragment_offsets.clear();
-
- pad.event_default(Some(element), event)
- }
- _ => pad.event_default(Some(element), event),
- }
- }
-
- fn sink_query(
- &self,
- pad: &gst::Pad,
- element: &super::FMP4Mux,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
- gst::trace!(CAT, obj: pad, "Handling query {:?}", query);
-
- match query.view_mut() {
- QueryViewMut::Caps(q) => {
- let state = self.state.lock().unwrap();
-
- let allowed_caps = if let Some(ref caps) = state.caps {
- // TODO: Maybe allow codec_data changes and similar?
- caps.clone()
- } else {
- pad.pad_template_caps()
- };
-
- if let Some(filter_caps) = q.filter() {
- let res = filter_caps
- .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First);
- q.set_result(&res);
- } else {
- q.set_result(&allowed_caps);
- }
-
- true
- }
- _ => pad.query_default(Some(element), query),
- }
- }
-
- fn src_event(&self, pad: &gst::Pad, element: &super::FMP4Mux, event: gst::Event) -> bool {
- use gst::EventView;
-
- gst::trace!(CAT, obj: pad, "Handling event {:?}", event);
-
- match event.view() {
- EventView::Seek(_ev) => false,
- _ => pad.event_default(Some(element), event),
- }
- }
-
- fn src_query(
- &self,
- pad: &gst::Pad,
- element: &super::FMP4Mux,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryViewMut;
-
- gst::trace!(CAT, obj: pad, "Handling query {:?}", query);
-
- match query.view_mut() {
- QueryViewMut::Seeking(q) => {
- // We can't really handle seeking, it would break everything
- q.set(false, gst::ClockTime::ZERO.into(), gst::ClockTime::NONE);
- true
- }
- QueryViewMut::Latency(q) => {
- if !self.sinkpad.peer_query(q.query_mut()) {
- return false;
- }
-
- let settings = self.settings.lock().unwrap();
- let (live, min, max) = q.result();
- gst::info!(
- CAT,
- obj: pad,
- "Upstream latency: live {}, min {}, max {}",
- live,
- min,
- max.display()
- );
- let (min, max) = (
- min + settings.fragment_duration,
- max.opt_add(settings.fragment_duration),
- );
- gst::info!(
- CAT,
- obj: pad,
- "Returning latency: live {}, min {}, max {}",
- live,
- min,
- max.display()
- );
- q.set(live, min, max);
-
- true
- }
- _ => pad.query_default(Some(element), query),
- }
- }
}
#[glib::object_subclass]
impl ObjectSubclass for FMP4Mux {
const NAME: &'static str = "GstFMP4Mux";
type Type = super::FMP4Mux;
- type ParentType = gst::Element;
+ type ParentType = gst_base::Aggregator;
type Class = Class;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
- let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
- .chain_function(|pad, parent, buffer| {
- FMP4Mux::catch_panic_pad_function(
- parent,
- || Err(gst::FlowError::Error),
- |fmp4mux, element| fmp4mux.sink_chain(pad, element, buffer),
- )
- })
- .event_function(|pad, parent, event| {
- FMP4Mux::catch_panic_pad_function(
- parent,
- || false,
- |fmp4mux, element| fmp4mux.sink_event(pad, element, event),
- )
- })
- .query_function(|pad, parent, query| {
- FMP4Mux::catch_panic_pad_function(
- parent,
- || false,
- |fmp4mux, element| fmp4mux.sink_query(pad, element, query),
- )
- })
- .flags(gst::PadFlags::ACCEPT_INTERSECT)
- .build();
-
- let templ = klass.pad_template("src").unwrap();
- let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
- .event_function(|pad, parent, event| {
- FMP4Mux::catch_panic_pad_function(
- parent,
- || false,
- |fmp4mux, element| fmp4mux.src_event(pad, element, event),
- )
- })
- .query_function(|pad, parent, query| {
- FMP4Mux::catch_panic_pad_function(
- parent,
- || false,
- |fmp4mux, element| fmp4mux.src_query(pad, element, query),
- )
- })
- .flags(gst::PadFlags::FIXED_CAPS | gst::PadFlags::ACCEPT_TEMPLATE)
- .build();
+ let sinkpad =
+ gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink"))
+ .flags(gst::PadFlags::ACCEPT_INTERSECT)
+ .build();
Self {
- srcpad,
sinkpad,
settings: Mutex::default(),
state: Mutex::default(),
@@ -1153,7 +779,7 @@ impl ObjectImpl for FMP4Mux {
fn set_property(
&self,
- _obj: &Self::Type,
+ obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
@@ -1161,7 +787,12 @@ impl ObjectImpl for FMP4Mux {
match pspec.name() {
"fragment-duration" => {
let mut settings = self.settings.lock().unwrap();
- settings.fragment_duration = value.get().expect("type checked upstream");
+ let fragment_duration = value.get().expect("type checked upstream");
+ if settings.fragment_duration != fragment_duration {
+ settings.fragment_duration = fragment_duration;
+ drop(settings);
+ obj.set_latency(fragment_duration, None);
+ }
}
"header-update-mode" => {
@@ -1213,37 +844,379 @@ impl ObjectImpl for FMP4Mux {
self.parent_constructed(obj);
obj.add_pad(&self.sinkpad).unwrap();
- obj.add_pad(&self.srcpad).unwrap();
+ obj.set_latency(Settings::default().fragment_duration, None);
}
}
impl GstObjectImpl for FMP4Mux {}
impl ElementImpl for FMP4Mux {
- #[allow(clippy::single_match)]
- fn change_state(
+ fn release_pad(&self, _element: &Self::Type, _pad: &gst::Pad) {}
+
+ fn request_new_pad(
&self,
- element: &Self::Type,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ _element: &Self::Type,
+ _templ: &gst::PadTemplate,
+ _name: Option<String>,
+ _caps: Option<&gst::Caps>,
+ ) -> Option<gst::Pad> {
+ None
+ }
+}
+
+impl AggregatorImpl for FMP4Mux {
+ fn sink_query(
+ &self,
+ aggregator: &Self::Type,
+ aggregator_pad: &gst_base::AggregatorPad,
+ query: &mut gst::QueryRef,
+ ) -> bool {
+ use gst::QueryViewMut;
- let res = self.parent_change_state(element, transition)?;
+ gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query);
+
+ match query.view_mut() {
+ QueryViewMut::Caps(q) => {
+ let state = self.state.lock().unwrap();
+
+ let allowed_caps = if let Some(ref caps) = state.caps {
+ // TODO: Maybe allow codec_data changes and similar?
+ caps.clone()
+ } else {
+ aggregator_pad.pad_template_caps()
+ };
+
+ if let Some(filter_caps) = q.filter() {
+ let res = filter_caps
+ .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First);
+ q.set_result(&res);
+ } else {
+ q.set_result(&allowed_caps);
+ }
- match transition {
- gst::StateChange::PausedToReady => {
- *self.state.lock().unwrap() = State::default();
+ true
}
- _ => (),
+ _ => self.parent_sink_query(aggregator, aggregator_pad, query),
}
+ }
+
+ fn sink_event_pre_queue(
+ &self,
+ aggregator: &Self::Type,
+ aggregator_pad: &gst_base::AggregatorPad,
+ mut event: gst::Event,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ use gst::EventView;
- Ok(res)
+ gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+
+ match event.view() {
+ EventView::Segment(ev) => {
+ if ev.segment().format() != gst::Format::Time {
+ gst::warning!(
+ CAT,
+ obj: aggregator_pad,
+ "Received non-TIME segment, replacing with default TIME segment"
+ );
+ let segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ event = gst::event::Segment::builder(&segment)
+ .seqnum(event.seqnum())
+ .build();
+ }
+ self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event)
+ }
+ _ => self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event),
+ }
+ }
+
+ fn sink_event(
+ &self,
+ aggregator: &Self::Type,
+ aggregator_pad: &gst_base::AggregatorPad,
+ event: gst::Event,
+ ) -> bool {
+ use gst::EventView;
+
+ gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
+
+ match event.view() {
+ EventView::Segment(ev) => {
+ // Already fixed-up above to always be a TIME segment
+ let segment = ev
+ .segment()
+ .clone()
+ .downcast::<gst::ClockTime>()
+ .expect("non-TIME segment");
+ gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment);
+ aggregator.update_segment(&segment);
+
+ self.parent_sink_event(aggregator, aggregator_pad, event)
+ }
+ EventView::Caps(ev) => {
+ let caps = ev.caps_owned();
+
+ gst::info!(CAT, obj: aggregator_pad, "Received caps {:?}", caps);
+ let caps = {
+ let settings = self.settings.lock().unwrap().clone();
+ let mut state = self.state.lock().unwrap();
+
+ let s = caps.structure(0).unwrap();
+
+ match s.name() {
+ "video/x-h264" | "video/x-h265" => {
+ if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
+ gst::error!(
+ CAT,
+ obj: aggregator_pad,
+ "Received caps without codec_data"
+ );
+ return false;
+ }
+ }
+ "audio/mpeg" => {
+ if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
+ gst::error!(
+ CAT,
+ obj: aggregator_pad,
+ "Received caps without codec_data"
+ );
+ return false;
+ }
+ state.intra_only = true;
+ }
+ _ => unreachable!(),
+ }
+
+ state.caps = Some(caps);
+
+ let (_, caps) =
+ match self.update_header(aggregator, &mut state, &settings, false) {
+ Ok(Some(res)) => res,
+ _ => {
+ return false;
+ }
+ };
+
+ caps
+ };
+
+ aggregator.set_src_caps(&caps);
+
+ self.parent_sink_event(aggregator, aggregator_pad, event)
+ }
+ EventView::Tag(_ev) => {
+ // TODO: Maybe store for putting into the headers of the next fragment?
+
+ self.parent_sink_event(aggregator, aggregator_pad, event)
+ }
+ EventView::Gap(_ev) => {
+ // TODO: queue up and check if draining is needed now
+ // i.e. make the last sample much longer
+ true
+ }
+ _ => self.parent_sink_event(aggregator, aggregator_pad, event),
+ }
+ }
+
+ fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool {
+ use gst::QueryViewMut;
+
+ gst::trace!(CAT, obj: aggregator, "Handling query {:?}", query);
+
+ match query.view_mut() {
+ QueryViewMut::Seeking(q) => {
+ // We can't really handle seeking, it would break everything
+ q.set(false, gst::ClockTime::ZERO.into(), gst::ClockTime::NONE);
+ true
+ }
+ _ => self.parent_src_query(aggregator, query),
+ }
+ }
+
+ fn src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool {
+ use gst::EventView;
+
+ gst::trace!(CAT, obj: aggregator, "Handling event {:?}", event);
+
+ match event.view() {
+ EventView::Seek(_ev) => false,
+ _ => self.parent_src_event(aggregator, event),
+ }
+ }
+
+ fn flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> {
+ self.parent_flush(aggregator)?;
+
+ let mut state = self.state.lock().unwrap();
+
+ state.queued_gops.clear();
+ state.queued_duration = gst::ClockTime::ZERO;
+ state.dts_offset = None;
+ state.last_force_keyunit_time = None;
+ state.current_offset = 0;
+ state.fragment_offsets.clear();
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ gst::trace!(CAT, obj: aggregator, "Stopping");
+
+ let _ = self.parent_stop(aggregator);
+ *self.state.lock().unwrap() = State::default();
+ Ok(())
+ }
+
+ fn start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ gst::trace!(CAT, obj: aggregator, "Starting");
+
+ self.parent_start(aggregator)?;
+ *self.state.lock().unwrap() = State::default();
+ Ok(())
+ }
+
+ fn negotiate(&self, _aggregator: &Self::Type) -> bool {
+ true
+ }
+
+ fn aggregate(
+ &self,
+ aggregator: &Self::Type,
+ _timeout: bool,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let settings = self.settings.lock().unwrap().clone();
+
+ let is_eos;
+ let mut upstream_events = vec![];
+
+ let buffers = {
+ let mut state = self.state.lock().unwrap();
+
+ let segment = match self
+ .sinkpad
+ .segment()
+ .clone()
+ .downcast::<gst::ClockTime>()
+ .ok()
+ {
+ Some(segment) => segment,
+ None => {
+ gst::error!(CAT, obj: aggregator, "Got buffer before segment");
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ let buffer = self.sinkpad.pop_buffer();
+ is_eos = buffer.is_none() && self.sinkpad.is_eos();
+
+ if let Some(buffer) = buffer {
+ let pts = buffer.pts();
+
+ // Queue up the buffer and update GOP tracking state
+ self.queue_input(aggregator, &mut state, &segment, buffer)?;
+
+ // If we have a PTS with this buffer, check if a new force-keyunit event for the next
+ // fragment start has to be created
+ if let Some(pts) = pts {
+ if let Some(event) = self.create_force_keyunit_event(
+ aggregator, &mut state, &settings, &segment, pts,
+ )? {
+ upstream_events.push(event);
+ }
+ }
+ }
+
+ // If enough GOPs were queued, drain and create the output fragment
+ self.drain(aggregator, &mut state, &settings, is_eos)?
+ };
+
+ for event in upstream_events {
+ self.sinkpad.push_event(event);
+ }
+
+ if let Some(buffers) = buffers {
+ gst::trace!(CAT, obj: aggregator, "Pushing buffer list {:?}", buffers);
+ aggregator.finish_buffer_list(buffers)?;
+ }
+
+ if is_eos {
+ if settings.header_update_mode != super::HeaderUpdateMode::None {
+ let updated_header = self.update_header(
+ aggregator,
+ &mut self.state.lock().unwrap(),
+ &settings,
+ true,
+ );
+ match updated_header {
+ Ok(Some((buffer_list, caps))) => {
+ match settings.header_update_mode {
+ super::HeaderUpdateMode::None => unreachable!(),
+ super::HeaderUpdateMode::Rewrite => {
+ let src_pad = aggregator.src_pad();
+ let mut q = gst::query::Seeking::new(gst::Format::Bytes);
+ if src_pad.peer_query(&mut q) && q.result().0 {
+ // Seek to the beginning with a default bytes segment
+ aggregator
+ .update_segment(
+ &gst::FormattedSegment::<gst::format::Bytes>::new(),
+ );
+
+ aggregator.set_src_caps(&caps);
+
+ if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+ gst::error!(
+ CAT,
+ obj: aggregator,
+ "Failed pushing updated header buffer downstream: {:?}",
+ err,
+ );
+ }
+ } else {
+ gst::error!(
+ CAT,
+ obj: aggregator,
+ "Can't rewrite header because downstream is not seekable"
+ );
+ }
+ }
+ super::HeaderUpdateMode::Update => {
+ aggregator.set_src_caps(&caps);
+ if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
+ gst::error!(
+ CAT,
+ obj: aggregator,
+ "Failed pushing updated header buffer downstream: {:?}",
+ err,
+ );
+ }
+ }
+ }
+ }
+ Ok(None) => {}
+ Err(err) => {
+ gst::error!(
+ CAT,
+ obj: aggregator,
+ "Failed to generate updated header: {:?}",
+ err
+ );
+ }
+ }
+ }
+
+ // Need to generate new headers if started again after EOS
+ self.state.lock().unwrap().generated_headers = false;
+
+ Err(gst::FlowError::Eos)
+ } else {
+ Ok(gst::FlowSuccess::Ok)
+ }
}
}
#[repr(C)]
pub(crate) struct Class {
- parent: gst::ffi::GstElementClass,
+ parent: gst_base::ffi::GstAggregatorClass,
variant: super::Variant,
}
@@ -1252,7 +1225,7 @@ unsafe impl ClassStruct for Class {
}
impl std::ops::Deref for Class {
- type Target = glib::Class<gst::Element>;
+ type Target = glib::Class<gst_base::Aggregator>;
fn deref(&self) -> &Self::Target {
unsafe { &*(&self.parent as *const _ as *const _) }
@@ -1268,7 +1241,7 @@ unsafe impl<T: FMP4MuxImpl> IsSubclassable<T> for super::FMP4Mux {
}
}
-pub(crate) trait FMP4MuxImpl: ElementImpl {
+pub(crate) trait FMP4MuxImpl: AggregatorImpl {
const VARIANT: super::Variant;
}
@@ -1348,6 +1321,8 @@ impl ElementImpl for ISOFMP4Mux {
}
}
+impl AggregatorImpl for ISOFMP4Mux {}
+
impl FMP4MuxImpl for ISOFMP4Mux {
const VARIANT: super::Variant = super::Variant::ISO;
}
@@ -1428,6 +1403,8 @@ impl ElementImpl for CMAFMux {
}
}
+impl AggregatorImpl for CMAFMux {}
+
impl FMP4MuxImpl for CMAFMux {
const VARIANT: super::Variant = super::Variant::CMAF;
}
@@ -1508,6 +1485,8 @@ impl ElementImpl for DASHMP4Mux {
}
}
+impl AggregatorImpl for DASHMP4Mux {}
+
impl FMP4MuxImpl for DASHMP4Mux {
const VARIANT: super::Variant = super::Variant::DASH;
}
diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs
index 567195ff..0311c417 100644
--- a/generic/fmp4/src/fmp4mux/mod.rs
+++ b/generic/fmp4/src/fmp4mux/mod.rs
@@ -13,19 +13,19 @@ mod boxes;
mod imp;
glib::wrapper! {
- pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst::Element, gst::Object;
+ pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object;
}
glib::wrapper! {
- pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst::Element, gst::Object;
+ pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
}
glib::wrapper! {
- pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst::Element, gst::Object;
+ pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
}
glib::wrapper! {
- pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst::Element, gst::Object;
+ pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {