diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-06-03 10:54:37 +0300 |
---|---|---|
committer | GStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org> | 2022-08-12 17:00:27 +0300 |
commit | 35b42b88d9d4f797c9f2b8dbbc9da0e5089c0820 (patch) | |
tree | 0f3f836c67ba01bf4caac7964863e16707eebde2 /net/onvif/src | |
parent | ef7ed2d953093e191a8afcb9799286b12c55b447 (diff) |
onvif: Add onvifmetadataparse element
This splits XML metadata into separate frames and ensures properly
timestamped metadata.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/781>
Diffstat (limited to 'net/onvif/src')
-rw-r--r-- | net/onvif/src/lib.rs | 87 | ||||
-rw-r--r-- | net/onvif/src/onvifaggregator/imp.rs | 115 | ||||
-rw-r--r-- | net/onvif/src/onvifmetadataparse/imp.rs | 636 | ||||
-rw-r--r-- | net/onvif/src/onvifmetadataparse/mod.rs | 25 |
4 files changed, 772 insertions, 91 deletions
diff --git a/net/onvif/src/lib.rs b/net/onvif/src/lib.rs index c62a3626e..8c0b7c1fc 100644 --- a/net/onvif/src/lib.rs +++ b/net/onvif/src/lib.rs @@ -8,17 +8,104 @@ #![allow(clippy::non_send_fields_in_send_ty)] use gst::glib; +use once_cell::sync::Lazy; mod onvifaggregator; mod onvifdepay; +mod onvifmetadataparse; mod onvifoverlay; mod onvifpay; +// Offset in nanoseconds from midnight 01-01-1900 (prime epoch) to +// midnight 01-01-1970 (UNIX epoch) +pub(crate) const PRIME_EPOCH_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(2_208_988_800); + +pub(crate) static NTP_CAPS: Lazy<gst::Caps> = + Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); +pub(crate) static UNIX_CAPS: Lazy<gst::Caps> = + Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); + +pub(crate) fn lookup_reference_timestamp(buffer: &gst::Buffer) -> Option<gst::ClockTime> { + for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() { + if meta.reference().is_subset(&NTP_CAPS) { + return Some(meta.timestamp()); + } + if meta.reference().is_subset(&UNIX_CAPS) { + return Some(meta.timestamp() + PRIME_EPOCH_OFFSET); + } + } + + None +} + +pub(crate) fn xml_from_buffer(buffer: &gst::Buffer) -> Result<minidom::Element, gst::ErrorMessage> { + let map = buffer.map_readable().map_err(|_| { + gst::error_msg!(gst::ResourceError::Read, ["Failed to map buffer readable"]) + })?; + + let utf8 = std::str::from_utf8(&map).map_err(|err| { + gst::error_msg!( + gst::StreamError::Format, + ["Failed to decode buffer as UTF-8: {}", err] + ) + })?; + + let root = utf8.parse::<minidom::Element>().map_err(|err| { + gst::error_msg!( + gst::ResourceError::Read, + ["Failed to parse buffer as XML: {}", err] + ) + })?; + + Ok(root) +} + +pub(crate) fn iterate_video_analytics_frames( + root: &minidom::Element, +) -> impl Iterator< + Item = Result<(chrono::DateTime<chrono::FixedOffset>, &minidom::Element), gst::ErrorMessage>, +> { + root.get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema") + .map(|analytics| { + analytics.children().filter_map(|el| { + // We are only interested in associating Frame metadata with video frames + if el.is("Frame", "http://www.onvif.org/ver10/schema") { + let timestamp = match el.attr("UtcTime") { + Some(timestamp) => timestamp, + None => { + return Some(Err(gst::error_msg!( + gst::ResourceError::Read, + ["Frame element has no UtcTime attribute"] + ))); + } + }; + + let dt = match chrono::DateTime::parse_from_rfc3339(timestamp) { + Ok(dt) => dt, + Err(err) => { + return Some(Err(gst::error_msg!( + gst::ResourceError::Read, + ["Failed to parse UtcTime {}: {}", timestamp, err] + ))); + } + }; + + Some(Ok((dt, el))) + } else { + None + } + }) + }) + .into_iter() + .flatten() +} + fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { onvifpay::register(plugin)?; onvifdepay::register(plugin)?; onvifaggregator::register(plugin)?; onvifoverlay::register(plugin)?; + onvifmetadataparse::register(plugin)?; gst::meta::CustomMeta::register("OnvifXMLFrameMeta", &[], |_, _, _, _| true); diff --git a/net/onvif/src/onvifaggregator/imp.rs b/net/onvif/src/onvifaggregator/imp.rs index e1c498300..a82ea636f 100644 --- a/net/onvif/src/onvifaggregator/imp.rs +++ b/net/onvif/src/onvifaggregator/imp.rs @@ -4,16 +4,11 @@ use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::prelude::*; use gst_base::AGGREGATOR_FLOW_NEED_DATA; -use minidom::Element; use once_cell::sync::Lazy; use std::collections::BTreeSet; use std::io::Cursor; use std::sync::Mutex; -// Offset in nanoseconds from midnight 01-01-1900 (prime epoch) to -// midnight 01-01-1970 (UNIX epoch) -const PRIME_EPOCH_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(2_208_988_800); - // Incoming metadata is split up frame-wise, and stored in a FIFO. #[derive(Eq, Clone)] struct MetaFrame { @@ -67,9 +62,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { ) }); -static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); -static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); - #[glib::object_subclass] impl ObjectSubclass for OnvifAggregator { const NAME: &'static str = "GstOnvifAggregator"; @@ -196,104 +188,45 @@ impl OnvifAggregator { element: &super::OnvifAggregator, ) -> Result<(), gst::FlowError> { while let Some(buffer) = self.meta_sink_pad.pop_buffer() { - let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { - gst::element_error!( - element, - gst::ResourceError::Read, - ["Failed to map buffer readable"] - ); - - gst::FlowError::Error - })?; - - let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| { - gst::element_error!( - element, - gst::StreamError::Format, - ["Failed to decode buffer as UTF-8: {}", err] - ); + let root = crate::xml_from_buffer(&buffer).map_err(|err| { + element.post_error_message(err); gst::FlowError::Error })?; - let root = utf8.parse::<Element>().map_err(|err| { - gst::element_error!( - element, - gst::ResourceError::Read, - ["Failed to parse buffer as XML: {}", err] - ); + for res in crate::iterate_video_analytics_frames(&root) { + let (dt, el) = res.map_err(|err| { + element.post_error_message(err); - gst::FlowError::Error - })?; + gst::FlowError::Error + })?; - if let Some(analytics) = - root.get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema") - { - for el in analytics.children() { - // We are only interested in associating Frame metadata with video frames - if el.is("Frame", "http://www.onvif.org/ver10/schema") { - let timestamp = el.attr("UtcTime").ok_or_else(|| { - gst::element_error!( - element, - gst::ResourceError::Read, - ["Frame element has no UtcTime attribute"] - ); + let prime_dt_ns = crate::PRIME_EPOCH_OFFSET + + gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64); - gst::FlowError::Error - })?; - - let dt = - chrono::DateTime::parse_from_rfc3339(timestamp).map_err(|err| { - gst::element_error!( - element, - gst::ResourceError::Read, - ["Failed to parse UtcTime {}: {}", timestamp, err] - ); - - gst::FlowError::Error - })?; - - let prime_dt_ns = PRIME_EPOCH_OFFSET - + gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64); - - let mut writer = Cursor::new(Vec::new()); - el.write_to(&mut writer).map_err(|err| { - gst::element_error!( - element, - gst::ResourceError::Write, - ["Failed to write back frame as XML: {}", err] - ); + let mut writer = Cursor::new(Vec::new()); + el.write_to(&mut writer).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Write, + ["Failed to write back frame as XML: {}", err] + ); - gst::FlowError::Error - })?; + gst::FlowError::Error + })?; - gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns); + gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns); - state.meta_frames.insert(MetaFrame { - timestamp: prime_dt_ns, - buffer: gst::Buffer::from_slice(writer.into_inner()), - }); - } - } + state.meta_frames.insert(MetaFrame { + timestamp: prime_dt_ns, + buffer: gst::Buffer::from_slice(writer.into_inner()), + }); } } Ok(()) } - fn lookup_reference_timestamp(&self, buffer: &gst::Buffer) -> Option<gst::ClockTime> { - for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() { - if meta.reference().is_subset(&NTP_CAPS) { - return Some(meta.timestamp()); - } - if meta.reference().is_subset(&UNIX_CAPS) { - return Some(meta.timestamp() + PRIME_EPOCH_OFFSET); - } - } - - None - } - fn media_buffer_duration( &self, element: &super::OnvifAggregator, @@ -371,7 +304,7 @@ impl OnvifAggregator { .or_else(|| self.media_sink_pad.pop_buffer()) { if let Some(current_media_start) = - self.lookup_reference_timestamp(¤t_media_buffer) + crate::lookup_reference_timestamp(¤t_media_buffer) { let duration = match self.media_buffer_duration(element, ¤t_media_buffer, timeout) { diff --git a/net/onvif/src/onvifmetadataparse/imp.rs b/net/onvif/src/onvifmetadataparse/imp.rs new file mode 100644 index 000000000..ca09e90e9 --- /dev/null +++ b/net/onvif/src/onvifmetadataparse/imp.rs @@ -0,0 +1,636 @@ +// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use once_cell::sync::Lazy; + +use minidom::Element; + +use std::collections::BTreeMap; +use std::sync::Mutex; + +static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { + gst::DebugCategory::new( + "onvifmetadataparse", + gst::DebugColorFlags::empty(), + Some("ONVIF Metadata Parser Element"), + ) +}); + +#[derive(Clone, Debug, Default)] +struct Settings { + latency: Option<gst::ClockTime>, +} + +#[derive(Default, Debug)] +struct State { + // Initially queued buffers until we have a UTC time / PTS mapping + pre_queued_buffers: Vec<gst::Buffer>, + // Mapping of UTC time to PTS + utc_time_pts_mapping: Option<(gst::ClockTime, gst::ClockTime)>, + // UTC time -> XML + queued_frames: BTreeMap<gst::ClockTime, Element>, + // Configured latency + configured_latency: gst::ClockTime, +} + +pub struct OnvifMetadataParse { + srcpad: gst::Pad, + sinkpad: gst::Pad, + settings: Mutex<Settings>, + state: Mutex<State>, +} + +impl OnvifMetadataParse { + fn sink_chain( + &self, + _pad: &gst::Pad, + element: &super::OnvifMetadataParse, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + gst::log!(CAT, obj: element, "Handling buffer {:?}", buffer); + + let mut state = self.state.lock().unwrap(); + + let pts = match buffer.pts() { + Some(pts) => pts, + None => { + gst::error!(CAT, obj: element, "Need buffers with PTS"); + return Err(gst::FlowError::Error); + } + }; + // First we need to get an UTC/PTS mapping. We wait up to the latency + // for that and otherwise error out. + if state.utc_time_pts_mapping.is_none() { + let utc_time = crate::lookup_reference_timestamp(&buffer); + if let Some(utc_time) = utc_time { + let initial_pts = state + .pre_queued_buffers + .first() + .map(|b| b.pts().unwrap()) + .unwrap_or(pts); + let diff = pts.saturating_sub(initial_pts); + let initial_utc_time = match utc_time.checked_sub(diff) { + Some(initial_utc_time) => initial_utc_time, + None => { + gst::error!(CAT, obj: element, "Can't calculate initial UTC time"); + return Err(gst::FlowError::Error); + } + }; + + gst::info!( + CAT, + obj: element, + "Calculated initial UTC/PTS mapping: {}/{}", + initial_utc_time, + initial_pts + ); + state.utc_time_pts_mapping = Some((initial_utc_time, initial_pts)); + } else { + state.pre_queued_buffers.push(buffer); + + if let Some(front_pts) = state.pre_queued_buffers.first().map(|b| b.pts().unwrap()) + { + if pts.saturating_sub(front_pts) >= state.configured_latency { + gst::error!( + CAT, + obj: element, + "Received no UTC time in the first {}", + state.configured_latency + ); + return Err(gst::FlowError::Error); + } + } + + return Ok(gst::FlowSuccess::Ok); + } + } + + self.queue(element, &mut state, buffer)?; + let buffers = self.drain(element, &mut state, Some(pts))?; + + if let Some(buffers) = buffers { + drop(state); + self.srcpad.push_list(buffers) + } else { + Ok(gst::FlowSuccess::Ok) + } + } + + fn queue( + &self, + element: &super::OnvifMetadataParse, + state: &mut State, + buffer: gst::Buffer, + ) -> Result<(), gst::FlowError> { + let State { + ref mut pre_queued_buffers, + ref mut queued_frames, + .. + } = &mut *state; + + for buffer in pre_queued_buffers.drain(..).chain(std::iter::once(buffer)) { + let root = crate::xml_from_buffer(&buffer).map_err(|err| { + element.post_error_message(err); + + gst::FlowError::Error + })?; + + for res in crate::iterate_video_analytics_frames(&root) { + let (dt, el) = res.map_err(|err| { + element.post_error_message(err); + + gst::FlowError::Error + })?; + + let dt_unix_ns = gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64); + + gst::trace!( + CAT, + obj: element, + "Queueing frame with UTC time {}", + dt_unix_ns + ); + + let xml = queued_frames.entry(dt_unix_ns).or_insert_with(|| { + Element::builder("VideoAnalytics", "http://www.onvif.org/ver10/schema") + .prefix(Some("tt".into()), "http://www.onvif.org/ver10/schema") + .unwrap() + .build() + }); + + xml.append_child(el.clone()); + } + } + + Ok(()) + } + + fn drain( + &self, + element: &super::OnvifMetadataParse, + state: &mut State, + pts: Option<gst::ClockTime>, + ) -> Result<Option<gst::BufferList>, gst::FlowError> { + let State { + ref mut queued_frames, + utc_time_pts_mapping, + configured_latency, + .. + } = &mut *state; + + let utc_time_pts_mapping = match utc_time_pts_mapping { + Some(utc_time_pts_mapping) => utc_time_pts_mapping, + None => return Ok(None), + }; + + let utc_time_to_pts = |utc_time: gst::ClockTime| { + if utc_time < utc_time_pts_mapping.0 { + let diff = utc_time_pts_mapping.0 - utc_time; + utc_time_pts_mapping.1.checked_sub(diff) + } else { + let diff = utc_time - utc_time_pts_mapping.0; + Some(utc_time_pts_mapping.1 + diff) + } + }; + + let mut buffers = Vec::new(); + + while !queued_frames.is_empty() { + let utc_time = *queued_frames.iter().next().unwrap().0; + let frame_pts = match utc_time_to_pts(utc_time) { + Some(frame_pts) => frame_pts, + None => { + gst::warning!(CAT, obj: element, "UTC time {} outside segment", utc_time); + gst::ClockTime::ZERO + } + }; + + // Not at EOS and not above the latency yet + if pts.map_or(false, |pts| { + pts.saturating_sub(frame_pts) < *configured_latency + }) { + break; + } + + let frame = queued_frames.remove(&utc_time).unwrap(); + + gst::trace!( + CAT, + obj: element, + "Dequeueing frame with UTC time {} / PTS {}", + utc_time, + frame_pts + ); + + let xml = Element::builder("MetadataStream", "http://www.onvif.org/ver10/schema") + .prefix(Some("tt".into()), "http://www.onvif.org/ver10/schema") + .unwrap() + .append(frame) + .build(); + + let mut vec = Vec::new(); + if let Err(err) = xml.write_to_decl(&mut vec) { + gst::error!(CAT, obj: element, "Can't serialize XML element: {}", err); + continue; + } + + let mut buffer = gst::Buffer::from_mut_slice(vec); + let buffer_ref = buffer.get_mut().unwrap(); + buffer_ref.set_pts(frame_pts); + + gst::ReferenceTimestampMeta::add( + buffer_ref, + &crate::UNIX_CAPS, + utc_time, + gst::ClockTime::NONE, + ); + + buffers.push(buffer); + } + + buffers.sort_by_key(|b| b.pts()); + + if !buffers.is_empty() { + let mut buffer_list = gst::BufferList::new_sized(buffers.len()); + let buffer_list_ref = buffer_list.get_mut().unwrap(); + buffer_list_ref.extend(buffers); + + Ok(Some(buffer_list)) + } else { + Ok(None) + } + } + + fn sink_event( + &self, + pad: &gst::Pad, + element: &super::OnvifMetadataParse, + event: gst::Event, + ) -> bool { + gst::log!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + gst::EventView::Segment(_) | gst::EventView::Eos(_) => { + let mut state = self.state.lock().unwrap(); + let buffers = self.drain(element, &mut state, None).ok().flatten(); + state.pre_queued_buffers.clear(); + state.utc_time_pts_mapping = None; + state.queued_frames.clear(); + drop(state); + + if let Some(buffers) = buffers { + if let Err(err) = self.srcpad.push_list(buffers) { + gst::error!(CAT, obj: element, "Failed to drain frames: {}", err); + } + } + + pad.event_default(Some(element), event) + } + gst::EventView::FlushStop(_) => { + let mut state = self.state.lock().unwrap(); + state.pre_queued_buffers.clear(); + state.queued_frames.clear(); + state.utc_time_pts_mapping = None; + drop(state); + pad.event_default(Some(element), event) + } + gst::EventView::Caps(ev) => { + let settings = self.settings.lock().unwrap().clone(); + + let mut state = self.state.lock().unwrap(); + let previous_latency = state.configured_latency; + let latency = if let Some(latency) = settings.latency { + latency + } else { + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + let parsed = Some(true) == s.get("parsed").ok(); + + if parsed { + gst::ClockTime::ZERO + } else { + gst::ClockTime::from_seconds(6) + } + }; + state.configured_latency = latency; + drop(state); + + gst::debug!(CAT, obj: element, "Configuring latency of {}", latency); + if previous_latency != latency { + let _ = + element.post_message(gst::message::Latency::builder().src(element).build()); + } + + let caps = self.srcpad.pad_template_caps(); + self.srcpad + .push_event(gst::event::Caps::builder(&caps).build()) + } + _ => pad.event_default(Some(element), event), + } + } + + fn sink_query( + &self, + pad: &gst::Pad, + element: &super::OnvifMetadataParse, + query: &mut gst::QueryRef, + ) -> bool { + gst::log!(CAT, obj: pad, "Handling query {:?}", query); + + match query.view_mut() { + gst::QueryViewMut::Caps(q) => { + let caps = pad.pad_template_caps(); + let res = if let Some(filter) = q.filter() { + filter.intersect_with_mode(&caps, gst::CapsIntersectMode::First) + } else { + caps + }; + + q.set_result(&res); + + true + } + gst::QueryViewMut::AcceptCaps(q) => { + let caps = q.caps(); + let res = caps.can_intersect(&pad.pad_template_caps()); + q.set_result(res); + + true + } + _ => pad.query_default(Some(element), query), + } + } + + fn src_event( + &self, + pad: &gst::Pad, + element: &super::OnvifMetadataParse, + event: gst::Event, + ) -> bool { + gst::log!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + gst::EventView::FlushStop(_) => { + let mut state = self.state.lock().unwrap(); + state.pre_queued_buffers.clear(); + state.queued_frames.clear(); + state.utc_time_pts_mapping = None; + drop(state); + pad.event_default(Some(element), event) + } + _ => pad.event_default(Some(element), event), + } + } + + fn src_query( + &self, + pad: &gst::Pad, + element: &super::OnvifMetadataParse, + query: &mut gst::QueryRef, + ) -> bool { + gst::log!(CAT, obj: pad, "Handling query {:?}", query); + + match query.view_mut() { + gst::QueryViewMut::Caps(q) => { + let caps = pad.pad_template_caps(); + let res = if let Some(filter) = q.filter() { + filter.intersect_with_mode(&caps, gst::CapsIntersectMode::First) + } else { + caps + }; + + q.set_result(&res); + + true + } + gst::QueryViewMut::AcceptCaps(q) => { + let caps = q.caps(); + let res = caps.can_intersect(&pad.pad_template_caps()); + q.set_result(res); + + true + } + gst::QueryViewMut::Latency(q) => { + let mut upstream_query = gst::query::Latency::new(); + + let ret = self.sinkpad.peer_query(&mut upstream_query); + + if ret { + let (live, mut min, mut max) = upstream_query.result(); + + let state = self.state.lock().unwrap(); + min += state.configured_latency; + max = max.map(|max| max + state.configured_latency); + + q.set(live, min, max); + + gst::debug!( + CAT, + obj: pad, + "Latency query response: live {} min {} max {}", + live, + min, + max.display() + ); + } + + ret + } + _ => pad.query_default(Some(element), query), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for OnvifMetadataParse { + const NAME: &'static str = "OnvifMetadataParse"; + type Type = super::OnvifMetadataParse; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .chain_function(|pad, parent, buffer| { + OnvifMetadataParse::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |parse, element| parse.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + OnvifMetadataParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.sink_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + OnvifMetadataParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.sink_query(pad, element, query), + ) + }) + .flags(gst::PadFlags::PROXY_ALLOCATION) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .event_function(|pad, parent, event| { + OnvifMetadataParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.src_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + OnvifMetadataParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.src_query(pad, element, query), + ) + }) + .flags(gst::PadFlags::PROXY_ALLOCATION) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + + Self { + srcpad, + sinkpad, + settings: Mutex::default(), + state: Mutex::default(), + } + } +} + +impl ObjectImpl for OnvifMetadataParse { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { + vec![glib::ParamSpecUInt64::new( + "latency", + "Latency", + "Maximum latency to introduce for reordering metadata \ + (max=auto: 6s if unparsed input, 0s if parsed input)", + 0, + u64::MAX, + Settings::default() + .latency + .map(|l| l.nseconds()) + .unwrap_or(u64::MAX), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + )] + }); + + PROPERTIES.as_ref() + } + + fn set_property( + &self, + obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "latency" => { + self.settings.lock().unwrap().latency = value.get().expect("type checked upstream"); + + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "latency" => self.settings.lock().unwrap().latency.to_value(), + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl GstObjectImpl for OnvifMetadataParse {} + +impl ElementImpl for OnvifMetadataParse { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIF Metadata Parser", + "Metadata/Parser", + "Parses ONVIF Timed XML Metadata", + "Sebastian Dröge <sebastian@centricular.com>", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { + let caps = gst::Caps::builder("application/x-onvif-metadata") + .field("encoding", "utf8") + .field("parsed", true) + .build(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let caps = gst::Caps::builder("application/x-onvif-metadata") + .field("encoding", "utf8") + .build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { + gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + + if transition == gst::StateChange::PausedToReady { + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + + let ret = self.parent_change_state(element, transition)?; + + if transition == gst::StateChange::ReadyToPaused { + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + + Ok(ret) + } +} diff --git a/net/onvif/src/onvifmetadataparse/mod.rs b/net/onvif/src/onvifmetadataparse/mod.rs new file mode 100644 index 000000000..a9ca306ad --- /dev/null +++ b/net/onvif/src/onvifmetadataparse/mod.rs @@ -0,0 +1,25 @@ +// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct OnvifMetadataParse(ObjectSubclass<imp::OnvifMetadataParse>) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "onvifmetadataparse", + gst::Rank::None, + OnvifMetadataParse::static_type(), + ) +} |