diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-05-05 15:09:19 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-05-12 18:31:02 +0300 |
commit | c09b7b9e41b9152ece6707b390e2bb93a1d8f50b (patch) | |
tree | 07ad7a425ea167086be565e7505249ba14f44845 | |
parent | 23cc00ce4b0856be5a2471dd163f746bddebaf8e (diff) |
fmp4mux: Port to aggregator as base class as preparation for supporting multiple streams
-rw-r--r-- | generic/fmp4/Cargo.toml | 3 | ||||
-rw-r--r-- | generic/fmp4/src/fmp4mux/imp.rs | 789 | ||||
-rw-r--r-- | generic/fmp4/src/fmp4mux/mod.rs | 8 |
3 files changed, 390 insertions, 410 deletions
diff --git a/generic/fmp4/Cargo.toml b/generic/fmp4/Cargo.toml index cd2d909b..ce80b4bd 100644 --- a/generic/fmp4/Cargo.toml +++ b/generic/fmp4/Cargo.toml @@ -11,6 +11,7 @@ rust-version = "1.57" [dependencies] anyhow = "1" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } once_cell = "1.0" @@ -47,4 +48,4 @@ install_subdir = "gstreamer-1.0" versioning = false [package.metadata.capi.pkg_config] -requires_private = "gstreamer-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0" +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index 96117f70..0b600f34 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -9,6 +9,8 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; use std::collections::VecDeque; use std::sync::Mutex; @@ -72,7 +74,6 @@ struct Gop { #[derive(Default)] struct State { - segment: Option<gst::FormattedSegment<gst::ClockTime>>, caps: Option<gst::Caps>, intra_only: bool, @@ -98,12 +99,11 @@ struct State { earliest_pts: Option<gst::ClockTime>, end_pts: Option<gst::ClockTime>, - headers_since_eos: bool, + generated_headers: bool, } pub(crate) struct FMP4Mux { - srcpad: gst::Pad, - sinkpad: gst::Pad, + sinkpad: gst_base::AggregatorPad, state: Mutex<State>, settings: Mutex<Settings>, } @@ -113,18 +113,11 @@ impl FMP4Mux { &self, element: &super::FMP4Mux, state: &mut State, + segment: &gst::FormattedSegment<gst::ClockTime>, buffer: gst::Buffer, ) -> Result<(), gst::FlowError> { gst::trace!(CAT, obj: element, "Handling buffer {:?}", buffer); - let segment = match state.segment { - Some(ref segment) => segment, - None => { - gst::error!(CAT, obj: element, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; - if state.caps.is_none() { gst::error!(CAT, obj: element, "Got buffer before caps"); return Err(gst::FlowError::NotNegotiated); @@ -393,10 +386,9 @@ impl FMP4Mux { element: &super::FMP4Mux, state: &mut State, settings: &Settings, + segment: &gst::FormattedSegment<gst::ClockTime>, pts: gst::ClockTime, ) -> Result<Option<gst::Event>, gst::FlowError> { - let segment = state.segment.as_ref().expect("no segment"); - // If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP // is known and send one now. // @@ -517,7 +509,7 @@ impl FMP4Mux { ); let mut fmp4_header = None; - if !state.headers_since_eos { + if !state.generated_headers { let mut buffer = state.stream_header.as_ref().unwrap().copy(); { let buffer = buffer.get_mut().unwrap(); @@ -532,7 +524,7 @@ impl FMP4Mux { fmp4_header = Some(buffer); state.earliest_pts = Some(earliest_pts); - state.headers_since_eos = true; + state.generated_headers = true; } let mut buffers = drain_gops @@ -719,389 +711,23 @@ impl FMP4Mux { Ok(Some((list, caps))) } - - fn sink_chain( - &self, - _pad: &gst::Pad, - element: &super::FMP4Mux, - buffer: gst::Buffer, - ) -> Result<gst::FlowSuccess, gst::FlowError> { - let settings = self.settings.lock().unwrap().clone(); - - let mut upstream_events = vec![]; - - let buffers = { - let mut state = self.state.lock().unwrap(); - - let pts = buffer.pts(); - - // Queue up the buffer and update GOP tracking state - self.queue_input(element, &mut state, buffer)?; - - // If we have a PTS with this buffer, check if a new force-keyunit event for the next - // fragment start has to be created - if let Some(pts) = pts { - if let Some(event) = - self.create_force_keyunit_event(element, &mut state, &settings, pts)? - { - upstream_events.push(event); - } - } - - // If enough GOPs were queued, drain and create the output fragment - self.drain(element, &mut state, &settings, false)? - }; - - for event in upstream_events { - self.sinkpad.push_event(event); - } - - if let Some(buffers) = buffers { - gst::trace!(CAT, obj: element, "Pushing buffer list {:?}", buffers); - self.srcpad.push_list(buffers)?; - } - - Ok(gst::FlowSuccess::Ok) - } - - fn sink_event(&self, pad: &gst::Pad, element: &super::FMP4Mux, mut event: gst::Event) -> bool { - use gst::EventView; - - gst::trace!(CAT, obj: pad, "Handling event {:?}", event); - - match event.view() { - EventView::Segment(ev) => { - let segment = match ev.segment().downcast_ref::<gst::ClockTime>() { - Some(segment) => { - gst::info!(CAT, obj: pad, "Received segment {:?}", segment); - segment.clone() - } - None => { - gst::warning!( - CAT, - obj: pad, - "Received non-TIME segment, replacing with default TIME segment" - ); - let segment = gst::FormattedSegment::new(); - event = gst::event::Segment::builder(&segment) - .seqnum(event.seqnum()) - .build(); - segment - } - }; - - self.state.lock().unwrap().segment = Some(segment); - - self.srcpad.push_event(event) - } - EventView::Caps(ev) => { - let caps = ev.caps_owned(); - - gst::info!(CAT, obj: pad, "Received caps {:?}", caps); - let caps = { - let settings = self.settings.lock().unwrap().clone(); - let mut state = self.state.lock().unwrap(); - - let s = caps.structure(0).unwrap(); - - match s.name() { - "video/x-h264" | "video/x-h265" => { - if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { - gst::error!(CAT, obj: pad, "Received caps without codec_data"); - return false; - } - } - "audio/mpeg" => { - if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { - gst::error!(CAT, obj: pad, "Received caps without codec_data"); - return false; - } - state.intra_only = true; - } - _ => unreachable!(), - } - - state.caps = Some(caps); - - let (_, caps) = match self.update_header(element, &mut state, &settings, false) - { - Ok(Some(res)) => res, - _ => { - return false; - } - }; - - caps - }; - - self.srcpad.push_event(gst::event::Caps::new(&caps)) - } - EventView::Tag(_ev) => { - // TODO: Maybe store for putting into the headers of the next fragment? - - pad.event_default(Some(element), event) - } - EventView::Gap(_ev) => { - // TODO: queue up and check if draining is needed now - // i.e. make the last sample much longer - true - } - EventView::Eos(_ev) => { - let settings = self.settings.lock().unwrap().clone(); - - let drained = self.drain(element, &mut self.state.lock().unwrap(), &settings, true); - let update_header = - drained.is_ok() && settings.header_update_mode != super::HeaderUpdateMode::None; - - match drained { - Ok(Some(buffers)) => { - gst::trace!(CAT, obj: element, "Pushing buffer list {:?}", buffers); - - if let Err(err) = self.srcpad.push_list(buffers) { - gst::error!( - CAT, - obj: element, - "Failed pushing EOS buffers downstream: {:?}", - err, - ); - } - } - Ok(None) => {} - Err(err) => { - gst::error!(CAT, obj: element, "Failed draining at EOS: {:?}", err); - } - } - - if update_header { - let updated_header = self.update_header( - element, - &mut self.state.lock().unwrap(), - &settings, - true, - ); - match updated_header { - Ok(Some((buffer_list, caps))) => { - match settings.header_update_mode { - super::HeaderUpdateMode::None => unreachable!(), - super::HeaderUpdateMode::Rewrite => { - let mut q = gst::query::Seeking::new(gst::Format::Bytes); - if self.srcpad.peer_query(&mut q) && q.result().0 { - // Seek to the beginning with a default bytes segment - self.srcpad.push_event(gst::event::Segment::new( - &gst::FormattedSegment::<gst::format::Bytes>::new(), - )); - - self.srcpad.push_event(gst::event::Caps::new(&caps)); - if let Err(err) = self.srcpad.push_list(buffer_list) { - gst::error!( - CAT, - obj: element, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } else { - gst::error!(CAT, obj: element, "Can't rewrite header because downstream is not seekable"); - } - } - super::HeaderUpdateMode::Update => { - self.srcpad.push_event(gst::event::Caps::new(&caps)); - if let Err(err) = self.srcpad.push_list(buffer_list) { - gst::error!( - CAT, - obj: element, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } - } - } - Ok(None) => {} - Err(err) => { - gst::error!( - CAT, - obj: element, - "Failed to generate updated header: {:?}", - err - ); - } - } - } - self.state.lock().unwrap().headers_since_eos = false; - - pad.event_default(Some(element), event) - } - EventView::FlushStop(_ev) => { - let mut state = self.state.lock().unwrap(); - - state.segment = None; - state.queued_gops.clear(); - state.queued_duration = gst::ClockTime::ZERO; - state.dts_offset = None; - state.last_force_keyunit_time = None; - state.current_offset = 0; - state.fragment_offsets.clear(); - - pad.event_default(Some(element), event) - } - _ => pad.event_default(Some(element), event), - } - } - - fn sink_query( - &self, - pad: &gst::Pad, - element: &super::FMP4Mux, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - - gst::trace!(CAT, obj: pad, "Handling query {:?}", query); - - match query.view_mut() { - QueryViewMut::Caps(q) => { - let state = self.state.lock().unwrap(); - - let allowed_caps = if let Some(ref caps) = state.caps { - // TODO: Maybe allow codec_data changes and similar? - caps.clone() - } else { - pad.pad_template_caps() - }; - - if let Some(filter_caps) = q.filter() { - let res = filter_caps - .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); - q.set_result(&res); - } else { - q.set_result(&allowed_caps); - } - - true - } - _ => pad.query_default(Some(element), query), - } - } - - fn src_event(&self, pad: &gst::Pad, element: &super::FMP4Mux, event: gst::Event) -> bool { - use gst::EventView; - - gst::trace!(CAT, obj: pad, "Handling event {:?}", event); - - match event.view() { - EventView::Seek(_ev) => false, - _ => pad.event_default(Some(element), event), - } - } - - fn src_query( - &self, - pad: &gst::Pad, - element: &super::FMP4Mux, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - - gst::trace!(CAT, obj: pad, "Handling query {:?}", query); - - match query.view_mut() { - QueryViewMut::Seeking(q) => { - // We can't really handle seeking, it would break everything - q.set(false, gst::ClockTime::ZERO.into(), gst::ClockTime::NONE); - true - } - QueryViewMut::Latency(q) => { - if !self.sinkpad.peer_query(q.query_mut()) { - return false; - } - - let settings = self.settings.lock().unwrap(); - let (live, min, max) = q.result(); - gst::info!( - CAT, - obj: pad, - "Upstream latency: live {}, min {}, max {}", - live, - min, - max.display() - ); - let (min, max) = ( - min + settings.fragment_duration, - max.opt_add(settings.fragment_duration), - ); - gst::info!( - CAT, - obj: pad, - "Returning latency: live {}, min {}, max {}", - live, - min, - max.display() - ); - q.set(live, min, max); - - true - } - _ => pad.query_default(Some(element), query), - } - } } #[glib::object_subclass] impl ObjectSubclass for FMP4Mux { const NAME: &'static str = "GstFMP4Mux"; type Type = super::FMP4Mux; - type ParentType = gst::Element; + type ParentType = gst_base::Aggregator; type Class = Class; fn with_class(klass: &Self::Class) -> Self { let templ = klass.pad_template("sink").unwrap(); - let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) - .chain_function(|pad, parent, buffer| { - FMP4Mux::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |fmp4mux, element| fmp4mux.sink_chain(pad, element, buffer), - ) - }) - .event_function(|pad, parent, event| { - FMP4Mux::catch_panic_pad_function( - parent, - || false, - |fmp4mux, element| fmp4mux.sink_event(pad, element, event), - ) - }) - .query_function(|pad, parent, query| { - FMP4Mux::catch_panic_pad_function( - parent, - || false, - |fmp4mux, element| fmp4mux.sink_query(pad, element, query), - ) - }) - .flags(gst::PadFlags::ACCEPT_INTERSECT) - .build(); - - let templ = klass.pad_template("src").unwrap(); - let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) - .event_function(|pad, parent, event| { - FMP4Mux::catch_panic_pad_function( - parent, - || false, - |fmp4mux, element| fmp4mux.src_event(pad, element, event), - ) - }) - .query_function(|pad, parent, query| { - FMP4Mux::catch_panic_pad_function( - parent, - || false, - |fmp4mux, element| fmp4mux.src_query(pad, element, query), - ) - }) - .flags(gst::PadFlags::FIXED_CAPS | gst::PadFlags::ACCEPT_TEMPLATE) - .build(); + let sinkpad = + gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink")) + .flags(gst::PadFlags::ACCEPT_INTERSECT) + .build(); Self { - srcpad, sinkpad, settings: Mutex::default(), state: Mutex::default(), @@ -1153,7 +779,7 @@ impl ObjectImpl for FMP4Mux { fn set_property( &self, - _obj: &Self::Type, + obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, @@ -1161,7 +787,12 @@ impl ObjectImpl for FMP4Mux { match pspec.name() { "fragment-duration" => { let mut settings = self.settings.lock().unwrap(); - settings.fragment_duration = value.get().expect("type checked upstream"); + let fragment_duration = value.get().expect("type checked upstream"); + if settings.fragment_duration != fragment_duration { + settings.fragment_duration = fragment_duration; + drop(settings); + obj.set_latency(fragment_duration, None); + } } "header-update-mode" => { @@ -1213,37 +844,379 @@ impl ObjectImpl for FMP4Mux { self.parent_constructed(obj); obj.add_pad(&self.sinkpad).unwrap(); - obj.add_pad(&self.srcpad).unwrap(); + obj.set_latency(Settings::default().fragment_duration, None); } } impl GstObjectImpl for FMP4Mux {} impl ElementImpl for FMP4Mux { - #[allow(clippy::single_match)] - fn change_state( + fn release_pad(&self, _element: &Self::Type, _pad: &gst::Pad) {} + + fn request_new_pad( &self, - element: &Self::Type, - transition: gst::StateChange, - ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + _element: &Self::Type, + _templ: &gst::PadTemplate, + _name: Option<String>, + _caps: Option<&gst::Caps>, + ) -> Option<gst::Pad> { + None + } +} + +impl AggregatorImpl for FMP4Mux { + fn sink_query( + &self, + aggregator: &Self::Type, + aggregator_pad: &gst_base::AggregatorPad, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryViewMut; - let res = self.parent_change_state(element, transition)?; + gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query); + + match query.view_mut() { + QueryViewMut::Caps(q) => { + let state = self.state.lock().unwrap(); + + let allowed_caps = if let Some(ref caps) = state.caps { + // TODO: Maybe allow codec_data changes and similar? + caps.clone() + } else { + aggregator_pad.pad_template_caps() + }; + + if let Some(filter_caps) = q.filter() { + let res = filter_caps + .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); + q.set_result(&res); + } else { + q.set_result(&allowed_caps); + } - match transition { - gst::StateChange::PausedToReady => { - *self.state.lock().unwrap() = State::default(); + true } - _ => (), + _ => self.parent_sink_query(aggregator, aggregator_pad, query), } + } + + fn sink_event_pre_queue( + &self, + aggregator: &Self::Type, + aggregator_pad: &gst_base::AggregatorPad, + mut event: gst::Event, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + use gst::EventView; - Ok(res) + gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); + + match event.view() { + EventView::Segment(ev) => { + if ev.segment().format() != gst::Format::Time { + gst::warning!( + CAT, + obj: aggregator_pad, + "Received non-TIME segment, replacing with default TIME segment" + ); + let segment = gst::FormattedSegment::<gst::ClockTime>::new(); + event = gst::event::Segment::builder(&segment) + .seqnum(event.seqnum()) + .build(); + } + self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event) + } + _ => self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event), + } + } + + fn sink_event( + &self, + aggregator: &Self::Type, + aggregator_pad: &gst_base::AggregatorPad, + event: gst::Event, + ) -> bool { + use gst::EventView; + + gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); + + match event.view() { + EventView::Segment(ev) => { + // Already fixed-up above to always be a TIME segment + let segment = ev + .segment() + .clone() + .downcast::<gst::ClockTime>() + .expect("non-TIME segment"); + gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment); + aggregator.update_segment(&segment); + + self.parent_sink_event(aggregator, aggregator_pad, event) + } + EventView::Caps(ev) => { + let caps = ev.caps_owned(); + + gst::info!(CAT, obj: aggregator_pad, "Received caps {:?}", caps); + let caps = { + let settings = self.settings.lock().unwrap().clone(); + let mut state = self.state.lock().unwrap(); + + let s = caps.structure(0).unwrap(); + + match s.name() { + "video/x-h264" | "video/x-h265" => { + if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { + gst::error!( + CAT, + obj: aggregator_pad, + "Received caps without codec_data" + ); + return false; + } + } + "audio/mpeg" => { + if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { + gst::error!( + CAT, + obj: aggregator_pad, + "Received caps without codec_data" + ); + return false; + } + state.intra_only = true; + } + _ => unreachable!(), + } + + state.caps = Some(caps); + + let (_, caps) = + match self.update_header(aggregator, &mut state, &settings, false) { + Ok(Some(res)) => res, + _ => { + return false; + } + }; + + caps + }; + + aggregator.set_src_caps(&caps); + + self.parent_sink_event(aggregator, aggregator_pad, event) + } + EventView::Tag(_ev) => { + // TODO: Maybe store for putting into the headers of the next fragment? + + self.parent_sink_event(aggregator, aggregator_pad, event) + } + EventView::Gap(_ev) => { + // TODO: queue up and check if draining is needed now + // i.e. make the last sample much longer + true + } + _ => self.parent_sink_event(aggregator, aggregator_pad, event), + } + } + + fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + gst::trace!(CAT, obj: aggregator, "Handling query {:?}", query); + + match query.view_mut() { + QueryViewMut::Seeking(q) => { + // We can't really handle seeking, it would break everything + q.set(false, gst::ClockTime::ZERO.into(), gst::ClockTime::NONE); + true + } + _ => self.parent_src_query(aggregator, query), + } + } + + fn src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool { + use gst::EventView; + + gst::trace!(CAT, obj: aggregator, "Handling event {:?}", event); + + match event.view() { + EventView::Seek(_ev) => false, + _ => self.parent_src_event(aggregator, event), + } + } + + fn flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> { + self.parent_flush(aggregator)?; + + let mut state = self.state.lock().unwrap(); + + state.queued_gops.clear(); + state.queued_duration = gst::ClockTime::ZERO; + state.dts_offset = None; + state.last_force_keyunit_time = None; + state.current_offset = 0; + state.fragment_offsets.clear(); + + Ok(gst::FlowSuccess::Ok) + } + + fn stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, obj: aggregator, "Stopping"); + + let _ = self.parent_stop(aggregator); + *self.state.lock().unwrap() = State::default(); + Ok(()) + } + + fn start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, obj: aggregator, "Starting"); + + self.parent_start(aggregator)?; + *self.state.lock().unwrap() = State::default(); + Ok(()) + } + + fn negotiate(&self, _aggregator: &Self::Type) -> bool { + true + } + + fn aggregate( + &self, + aggregator: &Self::Type, + _timeout: bool, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let settings = self.settings.lock().unwrap().clone(); + + let is_eos; + let mut upstream_events = vec![]; + + let buffers = { + let mut state = self.state.lock().unwrap(); + + let segment = match self + .sinkpad + .segment() + .clone() + .downcast::<gst::ClockTime>() + .ok() + { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: aggregator, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + let buffer = self.sinkpad.pop_buffer(); + is_eos = buffer.is_none() && self.sinkpad.is_eos(); + + if let Some(buffer) = buffer { + let pts = buffer.pts(); + + // Queue up the buffer and update GOP tracking state + self.queue_input(aggregator, &mut state, &segment, buffer)?; + + // If we have a PTS with this buffer, check if a new force-keyunit event for the next + // fragment start has to be created + if let Some(pts) = pts { + if let Some(event) = self.create_force_keyunit_event( + aggregator, &mut state, &settings, &segment, pts, + )? { + upstream_events.push(event); + } + } + } + + // If enough GOPs were queued, drain and create the output fragment + self.drain(aggregator, &mut state, &settings, is_eos)? + }; + + for event in upstream_events { + self.sinkpad.push_event(event); + } + + if let Some(buffers) = buffers { + gst::trace!(CAT, obj: aggregator, "Pushing buffer list {:?}", buffers); + aggregator.finish_buffer_list(buffers)?; + } + + if is_eos { + if settings.header_update_mode != super::HeaderUpdateMode::None { + let updated_header = self.update_header( + aggregator, + &mut self.state.lock().unwrap(), + &settings, + true, + ); + match updated_header { + Ok(Some((buffer_list, caps))) => { + match settings.header_update_mode { + super::HeaderUpdateMode::None => unreachable!(), + super::HeaderUpdateMode::Rewrite => { + let src_pad = aggregator.src_pad(); + let mut q = gst::query::Seeking::new(gst::Format::Bytes); + if src_pad.peer_query(&mut q) && q.result().0 { + // Seek to the beginning with a default bytes segment + aggregator + .update_segment( + &gst::FormattedSegment::<gst::format::Bytes>::new(), + ); + + aggregator.set_src_caps(&caps); + + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + obj: aggregator, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } else { + gst::error!( + CAT, + obj: aggregator, + "Can't rewrite header because downstream is not seekable" + ); + } + } + super::HeaderUpdateMode::Update => { + aggregator.set_src_caps(&caps); + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + obj: aggregator, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } + } + } + Ok(None) => {} + Err(err) => { + gst::error!( + CAT, + obj: aggregator, + "Failed to generate updated header: {:?}", + err + ); + } + } + } + + // Need to generate new headers if started again after EOS + self.state.lock().unwrap().generated_headers = false; + + Err(gst::FlowError::Eos) + } else { + Ok(gst::FlowSuccess::Ok) + } } } #[repr(C)] pub(crate) struct Class { - parent: gst::ffi::GstElementClass, + parent: gst_base::ffi::GstAggregatorClass, variant: super::Variant, } @@ -1252,7 +1225,7 @@ unsafe impl ClassStruct for Class { } impl std::ops::Deref for Class { - type Target = glib::Class<gst::Element>; + type Target = glib::Class<gst_base::Aggregator>; fn deref(&self) -> &Self::Target { unsafe { &*(&self.parent as *const _ as *const _) } @@ -1268,7 +1241,7 @@ unsafe impl<T: FMP4MuxImpl> IsSubclassable<T> for super::FMP4Mux { } } -pub(crate) trait FMP4MuxImpl: ElementImpl { +pub(crate) trait FMP4MuxImpl: AggregatorImpl { const VARIANT: super::Variant; } @@ -1348,6 +1321,8 @@ impl ElementImpl for ISOFMP4Mux { } } +impl AggregatorImpl for ISOFMP4Mux {} + impl FMP4MuxImpl for ISOFMP4Mux { const VARIANT: super::Variant = super::Variant::ISO; } @@ -1428,6 +1403,8 @@ impl ElementImpl for CMAFMux { } } +impl AggregatorImpl for CMAFMux {} + impl FMP4MuxImpl for CMAFMux { const VARIANT: super::Variant = super::Variant::CMAF; } @@ -1508,6 +1485,8 @@ impl ElementImpl for DASHMP4Mux { } } +impl AggregatorImpl for DASHMP4Mux {} + impl FMP4MuxImpl for DASHMP4Mux { const VARIANT: super::Variant = super::Variant::DASH; } diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs index 567195ff..0311c417 100644 --- a/generic/fmp4/src/fmp4mux/mod.rs +++ b/generic/fmp4/src/fmp4mux/mod.rs @@ -13,19 +13,19 @@ mod boxes; mod imp; glib::wrapper! { - pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst::Element, gst::Object; + pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object; } glib::wrapper! { - pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst::Element, gst::Object; + pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; } glib::wrapper! { - pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst::Element, gst::Object; + pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; } glib::wrapper! { - pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst::Element, gst::Object; + pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst_base::Aggregator, gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { |