diff options
author | Mathieu Duponchelle <mathieu@centricular.com> | 2022-05-24 23:59:29 +0300 |
---|---|---|
committer | GStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org> | 2022-05-25 20:35:04 +0300 |
commit | ab01fc614301b2be79975a535338037aac209764 (patch) | |
tree | 39290fa7fe6c91d186058a7404e0de88c6525e45 | |
parent | 77260a8442fea3db4c22ce81df9766493ebe8fc6 (diff) |
onvifaggregator: refactor to support duration-less media buffers
For instance when dealing with a variable framerate media stream,
input media buffers may not hold a duration, in which case we try
to calculate one by waiting for the following buffer.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/765>
-rw-r--r-- | net/onvif/src/onvifaggregator/imp.rs | 137 |
1 files changed, 113 insertions, 24 deletions
diff --git a/net/onvif/src/onvifaggregator/imp.rs b/net/onvif/src/onvifaggregator/imp.rs index 44de7464..35bc951c 100644 --- a/net/onvif/src/onvifaggregator/imp.rs +++ b/net/onvif/src/onvifaggregator/imp.rs @@ -45,6 +45,9 @@ impl PartialEq for MetaFrame { struct State { // FIFO of MetaFrames meta_frames: BTreeSet<MetaFrame>, + // We may store the next buffer we output here while waiting + // for a future buffer, when we need one to calculate its duration + current_media_buffer: Option<gst::Buffer>, } pub struct OnvifAggregator { @@ -277,7 +280,7 @@ impl OnvifAggregator { Ok(()) } - fn lookup_reference_timestamp(&self, buffer: gst::Buffer) -> Option<gst::ClockTime> { + fn lookup_reference_timestamp(&self, buffer: &gst::Buffer) -> Option<gst::ClockTime> { for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() { if meta.reference().is_subset(&NTP_CAPS) { return Some(meta.timestamp()); @@ -287,7 +290,66 @@ impl OnvifAggregator { None } - // Called after consuming metadata buffers, we peek the current media buffer + fn media_buffer_duration( + &self, + element: &super::OnvifAggregator, + current_media_buffer: &gst::Buffer, + timeout: bool, + ) -> Option<gst::ClockTime> { + match current_media_buffer.duration() { + Some(duration) => { + gst::log!( + CAT, + obj: element, + "Current media buffer has a duration, using it: {}", + duration + ); + Some(duration) + } + None => { + if let Some(next_buffer) = self.media_sink_pad.peek_buffer() { + match next_buffer.pts().zip(current_media_buffer.pts()) { + Some((next_pts, current_pts)) => { + let duration = next_pts.saturating_sub(current_pts); + + gst::log!( + CAT, + obj: element, + "calculated duration for current media buffer from next buffer: {}", + duration + ); + + Some(duration) + } + None => { + gst::log!( + CAT, + obj: element, + "could not calculate duration for current media buffer" + ); + Some(gst::ClockTime::from_nseconds(0)) + } + } + } else if timeout { + gst::log!( + CAT, + obj: element, + "could not calculate duration for current media buffer" + ); + Some(gst::ClockTime::from_nseconds(0)) + } else { + gst::trace!( + CAT, + obj: element, + "No next buffer to peek at yet to calculate duration" + ); + None + } + } + } + } + + // Called after consuming metadata buffers, we consume the current media buffer // and output it when: // // * it does not have a reference timestamp meta @@ -299,51 +361,71 @@ impl OnvifAggregator { element: &super::OnvifAggregator, timeout: bool, ) -> Result<Option<(gst::Buffer, Option<gst::ClockTime>)>, gst::FlowError> { - if let Some(media_buffer) = self.media_sink_pad.peek_buffer() { - let duration = media_buffer.duration().ok_or_else(|| { - gst::error!(CAT, obj: element, "Require buffers with duration"); - gst::FlowError::Error - })?; - - if let Some(start) = self.lookup_reference_timestamp(media_buffer) { - let end = start + duration; + if let Some(mut current_media_buffer) = state + .current_media_buffer + .take() + .or_else(|| self.media_sink_pad.pop_buffer()) + { + if let Some(current_media_start) = + self.lookup_reference_timestamp(¤t_media_buffer) + { + let duration = + match self.media_buffer_duration(element, ¤t_media_buffer, timeout) { + Some(duration) => { + // Update the buffer duration for good measure, in order to + // set a fully-accurate position later on in aggregate() + { + let buf_mut = current_media_buffer.make_mut(); + buf_mut.set_duration(duration); + } + + duration + } + None => { + state.current_media_buffer = Some(current_media_buffer); + return Ok(None); + } + }; + + let end = current_media_start + duration; if timeout { gst::debug!( CAT, obj: element, "Media buffer spanning {} -> {} is ready (timeout)", - start, + current_media_start, end ); - Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end)))) + Ok(Some((current_media_buffer, Some(end)))) } else if self.meta_sink_pad.is_eos() { gst::debug!( CAT, obj: element, "Media buffer spanning {} -> {} is ready (meta pad is EOS)", - start, + current_media_start, end ); - Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end)))) + Ok(Some((current_media_buffer, Some(end)))) } else if let Some(latest_frame) = state.meta_frames.iter().next_back() { if latest_frame.timestamp > end { gst::debug!( CAT, obj: element, "Media buffer spanning {} -> {} is ready", - start, + current_media_start, end ); - Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end)))) + Ok(Some((current_media_buffer, Some(end)))) } else { gst::trace!( CAT, obj: element, "Media buffer spanning {} -> {} isn't ready yet", - start, + current_media_start, end ); + state.current_media_buffer = Some(current_media_buffer); Ok(None) } } else { @@ -351,10 +433,10 @@ impl OnvifAggregator { CAT, obj: element, "Media buffer spanning {} -> {} isn't ready yet", - start, + current_media_start, end ); - + state.current_media_buffer = Some(current_media_buffer); Ok(None) } } else { @@ -364,10 +446,7 @@ impl OnvifAggregator { "Consuming media buffer with no reference NTP timestamp" ); - Ok(Some(( - self.media_sink_pad.pop_buffer().unwrap(), - gst::ClockTime::NONE, - ))) + Ok(Some((current_media_buffer, gst::ClockTime::NONE))) } } else { gst::trace!(CAT, obj: element, "No media buffer queued"); @@ -383,6 +462,8 @@ impl AggregatorImpl for OnvifAggregator { element: &Self::Type, timeout: bool, ) -> Result<gst::FlowSuccess, gst::FlowError> { + gst::trace!(CAT, obj: element, "aggregate, timeout: {}", timeout); + let mut state = self.state.lock().unwrap(); self.consume_meta(&mut state, element)?; @@ -434,7 +515,15 @@ impl AggregatorImpl for OnvifAggregator { s.set("frames", buflist); } - element.set_position(buffer.pts().opt_add(buffer.duration())); + let position = buffer.pts().opt_add( + buffer + .duration() + .unwrap_or_else(|| gst::ClockTime::from_nseconds(0)), + ); + + gst::log!(CAT, obj: element, "Updating position: {:?}", position); + + element.set_position(position); self.finish_buffer(element, buffer) } else if self.media_sink_pad.is_eos() { |