Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathieu Duponchelle <mathieu@centricular.com>2022-05-24 23:59:29 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2022-05-25 20:35:04 +0300
commitab01fc614301b2be79975a535338037aac209764 (patch)
tree39290fa7fe6c91d186058a7404e0de88c6525e45
parent77260a8442fea3db4c22ce81df9766493ebe8fc6 (diff)
onvifaggregator: refactor to support duration-less media buffers
For instance when dealing with a variable framerate media stream, input media buffers may not hold a duration, in which case we try to calculate one by waiting for the following buffer. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/765>
-rw-r--r--net/onvif/src/onvifaggregator/imp.rs137
1 files changed, 113 insertions, 24 deletions
diff --git a/net/onvif/src/onvifaggregator/imp.rs b/net/onvif/src/onvifaggregator/imp.rs
index 44de7464..35bc951c 100644
--- a/net/onvif/src/onvifaggregator/imp.rs
+++ b/net/onvif/src/onvifaggregator/imp.rs
@@ -45,6 +45,9 @@ impl PartialEq for MetaFrame {
struct State {
// FIFO of MetaFrames
meta_frames: BTreeSet<MetaFrame>,
+ // We may store the next buffer we output here while waiting
+ // for a future buffer, when we need one to calculate its duration
+ current_media_buffer: Option<gst::Buffer>,
}
pub struct OnvifAggregator {
@@ -277,7 +280,7 @@ impl OnvifAggregator {
Ok(())
}
- fn lookup_reference_timestamp(&self, buffer: gst::Buffer) -> Option<gst::ClockTime> {
+ fn lookup_reference_timestamp(&self, buffer: &gst::Buffer) -> Option<gst::ClockTime> {
for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() {
if meta.reference().is_subset(&NTP_CAPS) {
return Some(meta.timestamp());
@@ -287,7 +290,66 @@ impl OnvifAggregator {
None
}
- // Called after consuming metadata buffers, we peek the current media buffer
+ fn media_buffer_duration(
+ &self,
+ element: &super::OnvifAggregator,
+ current_media_buffer: &gst::Buffer,
+ timeout: bool,
+ ) -> Option<gst::ClockTime> {
+ match current_media_buffer.duration() {
+ Some(duration) => {
+ gst::log!(
+ CAT,
+ obj: element,
+ "Current media buffer has a duration, using it: {}",
+ duration
+ );
+ Some(duration)
+ }
+ None => {
+ if let Some(next_buffer) = self.media_sink_pad.peek_buffer() {
+ match next_buffer.pts().zip(current_media_buffer.pts()) {
+ Some((next_pts, current_pts)) => {
+ let duration = next_pts.saturating_sub(current_pts);
+
+ gst::log!(
+ CAT,
+ obj: element,
+ "calculated duration for current media buffer from next buffer: {}",
+ duration
+ );
+
+ Some(duration)
+ }
+ None => {
+ gst::log!(
+ CAT,
+ obj: element,
+ "could not calculate duration for current media buffer"
+ );
+ Some(gst::ClockTime::from_nseconds(0))
+ }
+ }
+ } else if timeout {
+ gst::log!(
+ CAT,
+ obj: element,
+ "could not calculate duration for current media buffer"
+ );
+ Some(gst::ClockTime::from_nseconds(0))
+ } else {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "No next buffer to peek at yet to calculate duration"
+ );
+ None
+ }
+ }
+ }
+ }
+
+ // Called after consuming metadata buffers, we consume the current media buffer
// and output it when:
//
// * it does not have a reference timestamp meta
@@ -299,51 +361,71 @@ impl OnvifAggregator {
element: &super::OnvifAggregator,
timeout: bool,
) -> Result<Option<(gst::Buffer, Option<gst::ClockTime>)>, gst::FlowError> {
- if let Some(media_buffer) = self.media_sink_pad.peek_buffer() {
- let duration = media_buffer.duration().ok_or_else(|| {
- gst::error!(CAT, obj: element, "Require buffers with duration");
- gst::FlowError::Error
- })?;
-
- if let Some(start) = self.lookup_reference_timestamp(media_buffer) {
- let end = start + duration;
+ if let Some(mut current_media_buffer) = state
+ .current_media_buffer
+ .take()
+ .or_else(|| self.media_sink_pad.pop_buffer())
+ {
+ if let Some(current_media_start) =
+ self.lookup_reference_timestamp(&current_media_buffer)
+ {
+ let duration =
+ match self.media_buffer_duration(element, &current_media_buffer, timeout) {
+ Some(duration) => {
+ // Update the buffer duration for good measure, in order to
+ // set a fully-accurate position later on in aggregate()
+ {
+ let buf_mut = current_media_buffer.make_mut();
+ buf_mut.set_duration(duration);
+ }
+
+ duration
+ }
+ None => {
+ state.current_media_buffer = Some(current_media_buffer);
+ return Ok(None);
+ }
+ };
+
+ let end = current_media_start + duration;
if timeout {
gst::debug!(
CAT,
obj: element,
"Media buffer spanning {} -> {} is ready (timeout)",
- start,
+ current_media_start,
end
);
- Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
+ Ok(Some((current_media_buffer, Some(end))))
} else if self.meta_sink_pad.is_eos() {
gst::debug!(
CAT,
obj: element,
"Media buffer spanning {} -> {} is ready (meta pad is EOS)",
- start,
+ current_media_start,
end
);
- Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
+ Ok(Some((current_media_buffer, Some(end))))
} else if let Some(latest_frame) = state.meta_frames.iter().next_back() {
if latest_frame.timestamp > end {
gst::debug!(
CAT,
obj: element,
"Media buffer spanning {} -> {} is ready",
- start,
+ current_media_start,
end
);
- Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
+ Ok(Some((current_media_buffer, Some(end))))
} else {
gst::trace!(
CAT,
obj: element,
"Media buffer spanning {} -> {} isn't ready yet",
- start,
+ current_media_start,
end
);
+ state.current_media_buffer = Some(current_media_buffer);
Ok(None)
}
} else {
@@ -351,10 +433,10 @@ impl OnvifAggregator {
CAT,
obj: element,
"Media buffer spanning {} -> {} isn't ready yet",
- start,
+ current_media_start,
end
);
-
+ state.current_media_buffer = Some(current_media_buffer);
Ok(None)
}
} else {
@@ -364,10 +446,7 @@ impl OnvifAggregator {
"Consuming media buffer with no reference NTP timestamp"
);
- Ok(Some((
- self.media_sink_pad.pop_buffer().unwrap(),
- gst::ClockTime::NONE,
- )))
+ Ok(Some((current_media_buffer, gst::ClockTime::NONE)))
}
} else {
gst::trace!(CAT, obj: element, "No media buffer queued");
@@ -383,6 +462,8 @@ impl AggregatorImpl for OnvifAggregator {
element: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::trace!(CAT, obj: element, "aggregate, timeout: {}", timeout);
+
let mut state = self.state.lock().unwrap();
self.consume_meta(&mut state, element)?;
@@ -434,7 +515,15 @@ impl AggregatorImpl for OnvifAggregator {
s.set("frames", buflist);
}
- element.set_position(buffer.pts().opt_add(buffer.duration()));
+ let position = buffer.pts().opt_add(
+ buffer
+ .duration()
+ .unwrap_or_else(|| gst::ClockTime::from_nseconds(0)),
+ );
+
+ gst::log!(CAT, obj: element, "Updating position: {:?}", position);
+
+ element.set_position(position);
self.finish_buffer(element, buffer)
} else if self.media_sink_pad.is_eos() {