Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml2
-rw-r--r--ci/utils.py2
-rw-r--r--meson.build1
-rw-r--r--net/onvif/Cargo.toml47
l---------net/onvif/LICENSE-MPL-2.01
-rw-r--r--net/onvif/build.rs3
-rw-r--r--net/onvif/src/lib.rs38
-rw-r--r--net/onvif/src/onvifaggregator/imp.rs541
-rw-r--r--net/onvif/src/onvifaggregator/mod.rs17
-rw-r--r--net/onvif/src/onvifdepay/imp.rs204
-rw-r--r--net/onvif/src/onvifdepay/mod.rs17
-rw-r--r--net/onvif/src/onvifoverlay/imp.rs758
-rw-r--r--net/onvif/src/onvifoverlay/mod.rs17
-rw-r--r--net/onvif/src/onvifpay/imp.rs197
-rw-r--r--net/onvif/src/onvifpay/mod.rs17
15 files changed, 1861 insertions, 1 deletions
diff --git a/Cargo.toml b/Cargo.toml
index ea472bd4e..bfca285d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ members = [
"generic/sodium",
"generic/threadshare",
"net/hlssink3",
+ "net/onvif",
"net/reqwest",
"net/rusoto",
"utils/fallbackswitch",
@@ -45,6 +46,7 @@ default-members = [
"audio/lewton",
"generic/file",
"generic/threadshare",
+ "net/onvif",
"net/reqwest",
"net/rusoto",
"utils/fallbackswitch",
diff --git a/ci/utils.py b/ci/utils.py
index 1dd5a79a3..777535516 100644
--- a/ci/utils.py
+++ b/ci/utils.py
@@ -3,7 +3,7 @@ import os
DIRS = ['audio', 'generic', 'net', 'text', 'utils', 'video']
# Plugins whose name is prefixed by 'rs'
RS_PREFIXED = ['audiofx', 'closedcaption',
- 'dav1d', 'file', 'json', 'regex', 'webp']
+ 'dav1d', 'file', 'json', 'onvif', 'regex', 'webp']
OVERRIDE = {'wrap': 'rstextwrap', 'flavors': 'rsflv', 'ahead': 'textahead'}
diff --git a/meson.build b/meson.build
index a5a540a9c..1cc88311a 100644
--- a/meson.build
+++ b/meson.build
@@ -61,6 +61,7 @@ plugins = {
'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin',
'gst-plugin-spotify': 'libgstspotify',
'gst-plugin-textahead': 'libgsttextahead',
+ 'gst-plugin-onvif': 'libgstrsonvif',
}
extra_env = {}
diff --git a/net/onvif/Cargo.toml b/net/onvif/Cargo.toml
new file mode 100644
index 000000000..684904210
--- /dev/null
+++ b/net/onvif/Cargo.toml
@@ -0,0 +1,47 @@
+[package]
+name = "gst-plugin-onvif"
+version = "0.9.0"
+authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+license = "MPL-2.0"
+description = "Rust ONVIF Plugin"
+edition = "2021"
+rust-version = "1.57"
+
+[dependencies]
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
+gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
+gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
+once_cell = "1.0"
+xmlparser = "0.13"
+minidom = "0.14"
+chrono = "0.4"
+cairo-rs = { git = "https://github.com/gtk-rs/gtk-rs-core", features=["use_glib"] }
+pango = { git = "https://github.com/gtk-rs/gtk-rs-core" }
+pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core" }
+
+[lib]
+name = "gstrsonvif"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../version-helper" }
+
+[features]
+static = []
+capi = []
+
+[package.metadata.capi]
+min_version = "0.8.0"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/net/onvif/LICENSE-MPL-2.0 b/net/onvif/LICENSE-MPL-2.0
new file mode 120000
index 000000000..eb5d24fe9
--- /dev/null
+++ b/net/onvif/LICENSE-MPL-2.0
@@ -0,0 +1 @@
+../../LICENSE-MPL-2.0 \ No newline at end of file
diff --git a/net/onvif/build.rs b/net/onvif/build.rs
new file mode 100644
index 000000000..cda12e57e
--- /dev/null
+++ b/net/onvif/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+ gst_plugin_version_helper::info()
+}
diff --git a/net/onvif/src/lib.rs b/net/onvif/src/lib.rs
new file mode 100644
index 000000000..c62a3626e
--- /dev/null
+++ b/net/onvif/src/lib.rs
@@ -0,0 +1,38 @@
+// Copyright (C) 2022 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+#![allow(clippy::non_send_fields_in_send_ty)]
+
+use gst::glib;
+
+mod onvifaggregator;
+mod onvifdepay;
+mod onvifoverlay;
+mod onvifpay;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ onvifpay::register(plugin)?;
+ onvifdepay::register(plugin)?;
+ onvifaggregator::register(plugin)?;
+ onvifoverlay::register(plugin)?;
+
+ gst::meta::CustomMeta::register("OnvifXMLFrameMeta", &[], |_, _, _, _| true);
+
+ Ok(())
+}
+
+gst::plugin_define!(
+ rsonvif,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "MPL",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/net/onvif/src/onvifaggregator/imp.rs b/net/onvif/src/onvifaggregator/imp.rs
new file mode 100644
index 000000000..26de81a3d
--- /dev/null
+++ b/net/onvif/src/onvifaggregator/imp.rs
@@ -0,0 +1,541 @@
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_base::prelude::*;
+use gst_base::subclass::prelude::*;
+use gst_base::AGGREGATOR_FLOW_NEED_DATA;
+use minidom::Element;
+use once_cell::sync::Lazy;
+use std::collections::BTreeSet;
+use std::io::Cursor;
+use std::sync::Mutex;
+
+// Offset in nanoseconds from midnight 01-01-1900 (prime epoch) to
+// midnight 01-01-1970 (UNIX epoch)
+const PRIME_EPOCH_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(2_208_988_800);
+
+// Incoming metadata is split up frame-wise, and stored in a FIFO.
+#[derive(Eq, Clone)]
+struct MetaFrame {
+ // From UtcTime attribute, in nanoseconds since prime epoch
+ timestamp: gst::ClockTime,
+ // The frame element, dumped to XML
+ buffer: gst::Buffer,
+}
+
+impl Ord for MetaFrame {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.timestamp.cmp(&other.timestamp)
+ }
+}
+
+impl PartialOrd for MetaFrame {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl PartialEq for MetaFrame {
+ fn eq(&self, other: &Self) -> bool {
+ self.timestamp == other.timestamp
+ }
+}
+
+#[derive(Default)]
+struct State {
+ // FIFO of MetaFrames
+ meta_frames: BTreeSet<MetaFrame>,
+}
+
+pub struct OnvifAggregator {
+ // Input media stream, can be anything with a reference timestamp meta
+ media_sink_pad: gst_base::AggregatorPad,
+ // Input metadata stream, must be complete VideoAnalytics XML documents
+ // as output by onvifdepay
+ meta_sink_pad: gst_base::AggregatorPad,
+ state: Mutex<State>,
+}
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "onvifaggregator",
+ gst::DebugColorFlags::empty(),
+ Some("ONVIF metadata / video aggregator"),
+ )
+});
+
+static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
+
+#[glib::object_subclass]
+impl ObjectSubclass for OnvifAggregator {
+ const NAME: &'static str = "GstOnvifAggregator";
+ type Type = super::OnvifAggregator;
+ type ParentType = gst_base::Aggregator;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("media").unwrap();
+ let media_sink_pad =
+ gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("media"))
+ .build();
+
+ let templ = klass.pad_template("meta").unwrap();
+ let meta_sink_pad =
+ gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("meta")).build();
+
+ Self {
+ media_sink_pad,
+ meta_sink_pad,
+ state: Mutex::default(),
+ }
+ }
+}
+
+impl ObjectImpl for OnvifAggregator {
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(&self.media_sink_pad).unwrap();
+ obj.add_pad(&self.meta_sink_pad).unwrap();
+ }
+}
+
+impl GstObjectImpl for OnvifAggregator {}
+
+impl ElementImpl for OnvifAggregator {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "ONVIF metadata aggregator",
+ "Aggregator",
+ "ONVIF metadata aggregator",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let media_caps = gst::Caps::new_any();
+ let media_sink_pad_template = gst::PadTemplate::with_gtype(
+ "media",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &media_caps,
+ gst_base::AggregatorPad::static_type(),
+ )
+ .unwrap();
+
+ let meta_caps = gst::Caps::builder("application/x-onvif-metadata")
+ .field("encoding", "utf8")
+ .build();
+
+ let meta_sink_pad_template = gst::PadTemplate::with_gtype(
+ "meta",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &meta_caps,
+ gst_base::AggregatorPad::static_type(),
+ )
+ .unwrap();
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &media_caps,
+ )
+ .unwrap();
+
+ vec![
+ media_sink_pad_template,
+ meta_sink_pad_template,
+ src_pad_template,
+ ]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn request_new_pad(
+ &self,
+ element: &Self::Type,
+ _templ: &gst::PadTemplate,
+ _name: Option<String>,
+ _caps: Option<&gst::Caps>,
+ ) -> Option<gst::Pad> {
+ gst::error!(
+ CAT,
+ obj: element,
+ "onvifaggregator doesn't expose request pads"
+ );
+
+ None
+ }
+
+ fn release_pad(&self, element: &Self::Type, _pad: &gst::Pad) {
+ gst::error!(
+ CAT,
+ obj: element,
+ "onvifaggregator doesn't expose request pads"
+ );
+ }
+}
+
+impl OnvifAggregator {
+ // We simply consume all the incoming meta buffers and store them in a FIFO
+ // as they arrive
+ fn consume_meta(
+ &self,
+ state: &mut State,
+ element: &super::OnvifAggregator,
+ ) -> Result<(), gst::FlowError> {
+ while let Some(buffer) = self.meta_sink_pad.pop_buffer() {
+ let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to map buffer readable"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Failed to decode buffer as UTF-8: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ let root = utf8.parse::<Element>().map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to parse buffer as XML: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ if let Some(analytics) =
+ root.get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema")
+ {
+ for el in analytics.children() {
+ // We are only interested in associating Frame metadata with video frames
+ if el.is("Frame", "http://www.onvif.org/ver10/schema") {
+ let timestamp = el.attr("UtcTime").ok_or_else(|| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Frame element has no UtcTime attribute"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ let dt =
+ chrono::DateTime::parse_from_rfc3339(timestamp).map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to parse UtcTime {}: {}", timestamp, err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ let prime_dt_ns = PRIME_EPOCH_OFFSET
+ + gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64);
+
+ let mut writer = Cursor::new(Vec::new());
+ el.write_to(&mut writer).map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Write,
+ ["Failed to write back frame as XML: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns);
+
+ state.meta_frames.insert(MetaFrame {
+ timestamp: prime_dt_ns,
+ buffer: gst::Buffer::from_slice(writer.into_inner()),
+ });
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ fn lookup_reference_timestamp(&self, buffer: gst::Buffer) -> Option<gst::ClockTime> {
+ for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() {
+ if meta.reference().is_subset(&NTP_CAPS) {
+ return Some(meta.timestamp());
+ }
+ }
+
+ None
+ }
+
+ // Called after consuming metadata buffers, we peek the current media buffer
+ // and output it when:
+ //
+ // * it does not have a reference timestamp meta
+ // * we have timed out
+ // * we have consumed a metadata buffer for a future frame
+ fn consume_media(
+ &self,
+ state: &mut State,
+ element: &super::OnvifAggregator,
+ timeout: bool,
+ ) -> Result<Option<(gst::Buffer, Option<gst::ClockTime>)>, gst::FlowError> {
+ if let Some(media_buffer) = self.media_sink_pad.peek_buffer() {
+ let duration = media_buffer.duration().ok_or_else(|| {
+ gst::error!(CAT, obj: element, "Require buffers with duration");
+ gst::FlowError::Error
+ })?;
+
+ if let Some(start) = self.lookup_reference_timestamp(media_buffer) {
+ let end = start + duration;
+
+ if let Some(latest_frame) = state.meta_frames.iter().next_back() {
+ if latest_frame.timestamp > end || timeout {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Media buffer spanning {} -> {} is ready",
+ start,
+ end
+ );
+ Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
+ } else {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Media buffer spanning {} -> {} isn't ready yet",
+ start,
+ end
+ );
+ Ok(None)
+ }
+ } else {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Media buffer spanning {} -> {} isn't ready yet",
+ start,
+ end
+ );
+
+ Ok(None)
+ }
+ } else {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Consuming media buffer with no reference NTP timestamp"
+ );
+
+ Ok(Some((
+ self.media_sink_pad.pop_buffer().unwrap(),
+ gst::ClockTime::NONE,
+ )))
+ }
+ } else {
+ gst::trace!(CAT, obj: element, "No media buffer queued");
+
+ Ok(None)
+ }
+ }
+}
+
+impl AggregatorImpl for OnvifAggregator {
+ fn aggregate(
+ &self,
+ element: &Self::Type,
+ timeout: bool,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let mut state = self.state.lock().unwrap();
+
+ self.consume_meta(&mut state, element)?;
+
+ // When the current media buffer is ready, we attach all matching metadata buffers
+ // and push it out
+ if let Some((mut buffer, end)) = self.consume_media(&mut state, element, timeout)? {
+ let mut buflist = gst::BufferList::new();
+
+ if let Some(end) = end {
+ let mut split_at: Option<MetaFrame> = None;
+ let buflist_mut = buflist.get_mut().unwrap();
+
+ for frame in state.meta_frames.iter() {
+ if frame.timestamp > end {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "keeping metadata buffer at {} for next media buffer",
+ frame.timestamp
+ );
+ split_at = Some(frame.clone());
+ break;
+ } else {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Attaching meta buffer {}",
+ frame.timestamp
+ );
+ buflist_mut.add(frame.buffer.clone());
+ }
+ }
+
+ if let Some(split_at) = split_at {
+ state.meta_frames = state.meta_frames.split_off(&split_at);
+ } else {
+ state.meta_frames.clear();
+ }
+ }
+
+ drop(state);
+
+ {
+ let buf = buffer.make_mut();
+ let mut meta = gst::meta::CustomMeta::add(buf, "OnvifXMLFrameMeta").unwrap();
+
+ let s = meta.mut_structure();
+ s.set("frames", buflist);
+ }
+
+ element.set_position(buffer.pts().opt_add(buffer.duration()));
+
+ self.finish_buffer(element, buffer)
+ } else {
+ Err(AGGREGATOR_FLOW_NEED_DATA)
+ }
+ }
+
+ fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool {
+ use gst::QueryViewMut;
+
+ match query.view_mut() {
+ QueryViewMut::Position(..)
+ | QueryViewMut::Duration(..)
+ | QueryViewMut::Uri(..)
+ | QueryViewMut::Caps(..)
+ | QueryViewMut::Allocation(..) => self.media_sink_pad.peer_query(query),
+ QueryViewMut::AcceptCaps(q) => {
+ let caps = q.caps_owned();
+ let class = aggregator.class();
+ let templ_caps = class.pad_template("media").unwrap().caps();
+
+ q.set_result(caps.is_subset(&templ_caps));
+
+ true
+ }
+ _ => self.parent_src_query(aggregator, query),
+ }
+ }
+
+ fn sink_event(
+ &self,
+ aggregator: &Self::Type,
+ aggregator_pad: &gst_base::AggregatorPad,
+ event: gst::Event,
+ ) -> bool {
+ use gst::EventView;
+
+ match event.view() {
+ EventView::Caps(e) => {
+ if aggregator_pad.upcast_ref::<gst::Pad>() == &self.media_sink_pad {
+ gst::info!(CAT, obj: aggregator, "Pushing caps {}", e.caps());
+ aggregator.set_src_caps(&e.caps_owned());
+ }
+
+ true
+ }
+ EventView::Segment(e) => {
+ if aggregator_pad.upcast_ref::<gst::Pad>() == &self.media_sink_pad {
+ aggregator.update_segment(e.segment());
+ }
+ self.parent_sink_event(aggregator, aggregator_pad, event)
+ }
+ _ => self.parent_sink_event(aggregator, aggregator_pad, event),
+ }
+ }
+
+ fn sink_query(
+ &self,
+ aggregator: &Self::Type,
+ aggregator_pad: &gst_base::AggregatorPad,
+ query: &mut gst::QueryRef,
+ ) -> bool {
+ use gst::QueryViewMut;
+
+ match query.view_mut() {
+ QueryViewMut::Position(..)
+ | QueryViewMut::Duration(..)
+ | QueryViewMut::Uri(..)
+ | QueryViewMut::Allocation(..) => {
+ if aggregator_pad == &self.media_sink_pad {
+ let srcpad = aggregator.src_pad();
+ srcpad.peer_query(query)
+ } else {
+ self.parent_sink_query(aggregator, aggregator_pad, query)
+ }
+ }
+ QueryViewMut::Caps(q) => {
+ if aggregator_pad == &self.media_sink_pad {
+ let srcpad = aggregator.src_pad();
+ srcpad.peer_query(query)
+ } else {
+ let filter = q.filter_owned();
+ let class = aggregator.class();
+ let templ_caps = class.pad_template("meta").unwrap().caps();
+
+ if let Some(filter) = filter {
+ q.set_result(
+ &filter.intersect_with_mode(&templ_caps, gst::CapsIntersectMode::First),
+ );
+ } else {
+ q.set_result(&templ_caps);
+ }
+
+ true
+ }
+ }
+ QueryViewMut::AcceptCaps(q) => {
+ if aggregator_pad.upcast_ref::<gst::Pad>() == &self.media_sink_pad {
+ let srcpad = aggregator.src_pad();
+ srcpad.peer_query(query);
+ } else {
+ let caps = q.caps_owned();
+ let class = aggregator.class();
+ let templ_caps = class.pad_template("meta").unwrap().caps();
+
+ q.set_result(caps.is_subset(&templ_caps));
+ }
+
+ true
+ }
+ _ => self.parent_src_query(aggregator, query),
+ }
+ }
+
+ fn next_time(&self, aggregator: &Self::Type) -> Option<gst::ClockTime> {
+ aggregator.simple_get_next_time()
+ }
+
+ fn negotiate(&self, _aggregator: &Self::Type) -> bool {
+ true
+ }
+}
diff --git a/net/onvif/src/onvifaggregator/mod.rs b/net/onvif/src/onvifaggregator/mod.rs
new file mode 100644
index 000000000..50513f861
--- /dev/null
+++ b/net/onvif/src/onvifaggregator/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct OnvifAggregator(ObjectSubclass<imp::OnvifAggregator>) @extends gst_base::Aggregator, gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "onvifaggregator",
+ gst::Rank::Primary,
+ OnvifAggregator::static_type(),
+ )
+}
diff --git a/net/onvif/src/onvifdepay/imp.rs b/net/onvif/src/onvifdepay/imp.rs
new file mode 100644
index 000000000..6f36d3b0d
--- /dev/null
+++ b/net/onvif/src/onvifdepay/imp.rs
@@ -0,0 +1,204 @@
+use gst::glib;
+use gst::subclass::prelude::*;
+use gst_rtp::subclass::prelude::*;
+use once_cell::sync::Lazy;
+use std::sync::Mutex;
+
+#[derive(Default)]
+struct State {
+ // Aggregate payloads to form a complete XML document
+ adapter: gst_base::UniqueAdapter,
+}
+
+#[derive(Default)]
+pub struct OnvifDepay {
+ state: Mutex<State>,
+}
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "onvifdepay",
+ gst::DebugColorFlags::empty(),
+ Some("ONVIF metadata depayloader"),
+ )
+});
+
+#[glib::object_subclass]
+impl ObjectSubclass for OnvifDepay {
+ const NAME: &'static str = "GstOnvifDepay";
+ type Type = super::OnvifDepay;
+ type ParentType = gst_rtp::RTPBaseDepayload;
+}
+
+impl ObjectImpl for OnvifDepay {}
+
+impl GstObjectImpl for OnvifDepay {}
+
+impl ElementImpl for OnvifDepay {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "ONVIF metadata RTP depayloader",
+ "Depayloader/Network/RTP",
+ "ONVIF metadata RTP depayloader",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let sink_caps = gst::Caps::builder("application/x-rtp")
+ .field("media", "application")
+ .field("payload", gst::IntRange::new(96, 127))
+ .field("clock-rate", 90000)
+ .field("encoding-name", "VND.ONVIF.METADATA")
+ .build();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &sink_caps,
+ )
+ .unwrap();
+
+ let src_caps = gst::Caps::builder("application/x-onvif-metadata")
+ .field("encoding", "utf8")
+ .build();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &src_caps,
+ )
+ .unwrap();
+
+ vec![src_pad_template, sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+}
+
+impl RTPBaseDepayloadImpl for OnvifDepay {
+ fn process_rtp_packet(
+ &self,
+ element: &Self::Type,
+ rtp_buffer: &gst_rtp::RTPBuffer<gst_rtp::rtp_buffer::Readable>,
+ ) -> Option<gst::Buffer> {
+ // Retrieve the payload subbuffer
+ let payload_buffer = match rtp_buffer.payload_buffer() {
+ Ok(buffer) => buffer,
+ Err(..) => {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to retrieve RTP buffer payload"]
+ );
+
+ return None;
+ }
+ };
+
+ let mut state = self.state.lock().unwrap();
+
+ if rtp_buffer
+ .buffer()
+ .flags()
+ .contains(gst::BufferFlags::DISCONT)
+ {
+ gst::debug!(CAT, obj: element, "processing discont RTP buffer");
+ state.adapter.clear();
+ }
+
+ // Now store in the adapter
+ state.adapter.push(payload_buffer);
+
+ if !rtp_buffer.is_marker() {
+ return None;
+ }
+
+ // We have found the last chunk for this document, empty the adapter
+ let available = state.adapter.available();
+ let buffer = match state.adapter.take_buffer(available) {
+ Ok(buffer) => buffer,
+ Err(err) => {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to empty adapter: {}", err]
+ );
+
+ return None;
+ }
+ };
+
+ // Sanity check the document
+ let map = buffer.map_readable().unwrap();
+
+ let utf8 = match std::str::from_utf8(map.as_ref()) {
+ Ok(s) => s,
+ Err(err) => {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "Failed to decode payload as UTF-8: {}",
+ err
+ );
+
+ return None;
+ }
+ };
+
+ let forward = {
+ let mut forward = false;
+
+ for token in xmlparser::Tokenizer::from(utf8) {
+ match token {
+ Ok(token) => match token {
+ xmlparser::Token::Comment { .. } => {
+ continue;
+ }
+ xmlparser::Token::Declaration { .. } => {
+ continue;
+ }
+ xmlparser::Token::ElementStart { local, .. } => {
+ if local.as_str() == "MetadataStream" {
+ forward = true;
+ }
+ break;
+ }
+ _ => {
+ forward = false;
+ break;
+ }
+ },
+ Err(err) => {
+ gst::warning!(CAT, obj: element, "Invalid XML in payload: {}", err);
+
+ return None;
+ }
+ }
+ }
+
+ forward
+ };
+
+ // Incomplete, wait for the next document
+ if !forward {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "document must start with tt:MetadataStream element",
+ );
+
+ return None;
+ }
+
+ drop(map);
+
+ Some(buffer)
+ }
+}
diff --git a/net/onvif/src/onvifdepay/mod.rs b/net/onvif/src/onvifdepay/mod.rs
new file mode 100644
index 000000000..8c56bbbf6
--- /dev/null
+++ b/net/onvif/src/onvifdepay/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct OnvifDepay(ObjectSubclass<imp::OnvifDepay>) @extends gst_rtp::RTPBaseDepayload, gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rtponvifdepay",
+ gst::Rank::Primary,
+ OnvifDepay::static_type(),
+ )
+}
diff --git a/net/onvif/src/onvifoverlay/imp.rs b/net/onvif/src/onvifoverlay/imp.rs
new file mode 100644
index 000000000..07a8c336e
--- /dev/null
+++ b/net/onvif/src/onvifoverlay/imp.rs
@@ -0,0 +1,758 @@
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_video::prelude::*;
+use pango::prelude::*;
+
+use once_cell::sync::Lazy;
+
+use std::collections::HashSet;
+use std::sync::Mutex;
+
+use minidom::Element;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "onvifoverlay",
+ gst::DebugColorFlags::empty(),
+ Some("ONVIF overlay element"),
+ )
+});
+
+const DEFAULT_FONT_DESC: &str = "monospace 12";
+
+// Shape description in cairo coordinates (0, 0) is top left
+#[derive(Debug)]
+struct Shape {
+ x: u32,
+ y: u32,
+ width: u32,
+ height: u32,
+ // Optional text rendered from top left of rectangle
+ tag: Option<String>,
+}
+
+#[derive(Default)]
+struct State {
+ video_info: Option<gst_video::VideoInfo>,
+ composition: Option<gst_video::VideoOverlayComposition>,
+ layout: Option<pango::Layout>,
+ attach: bool,
+}
+
+// SAFETY: Required because `pango::Layout` is not `Send` but the whole `State` needs to be.
+// We ensure that no additional references to the layout are ever created, which makes it safe
+// to send it to other threads as long as only a single thread uses it concurrently.
+unsafe impl Send for State {}
+
+struct Settings {
+ font_desc: String,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Self {
+ font_desc: String::from(DEFAULT_FONT_DESC),
+ }
+ }
+}
+
+pub struct OnvifOverlay {
+ srcpad: gst::Pad,
+ sinkpad: gst::Pad,
+ state: Mutex<State>,
+ settings: Mutex<Settings>,
+}
+
+impl OnvifOverlay {
+ fn negotiate(&self, element: &super::OnvifOverlay) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let video_info = match self.state.lock().unwrap().video_info.as_ref() {
+ Some(video_info) => Ok(video_info.clone()),
+ None => {
+ gst::element_error!(
+ element,
+ gst::CoreError::Negotiation,
+ ["Element hasn't received valid video caps at negotiation time"]
+ );
+ Err(gst::FlowError::NotNegotiated)
+ }
+ }?;
+
+ let mut caps = video_info.to_caps().unwrap();
+ let mut downstream_accepts_meta = false;
+
+ let upstream_has_meta = caps
+ .features(0)
+ .map(|f| f.contains(&gst_video::CAPS_FEATURE_META_GST_VIDEO_OVERLAY_COMPOSITION))
+ .unwrap_or(false);
+
+ if !upstream_has_meta {
+ let mut caps_clone = caps.clone();
+ let overlay_caps = caps_clone.make_mut();
+
+ if let Some(features) = overlay_caps.features_mut(0) {
+ features.add(&gst_video::CAPS_FEATURE_META_GST_VIDEO_OVERLAY_COMPOSITION);
+ let peercaps = self.srcpad.peer_query_caps(Some(&caps_clone));
+ downstream_accepts_meta = !peercaps.is_empty();
+ if downstream_accepts_meta {
+ caps = caps_clone;
+ }
+ }
+ }
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "upstream has meta: {}, downstream accepts meta: {}",
+ upstream_has_meta,
+ downstream_accepts_meta
+ );
+
+ if upstream_has_meta || downstream_accepts_meta {
+ let mut query = gst::query::Allocation::new(&caps, false);
+
+ if !self.srcpad.push_event(gst::event::Caps::new(&caps)) {
+ return Err(gst::FlowError::NotNegotiated);
+ }
+
+ if !self.srcpad.peer_query(&mut query)
+ && self.srcpad.pad_flags().contains(gst::PadFlags::FLUSHING)
+ {
+ return Err(gst::FlowError::NotNegotiated);
+ }
+
+ let attach = query
+ .find_allocation_meta::<gst_video::VideoOverlayCompositionMeta>()
+ .is_some();
+
+ gst::debug!(CAT, obj: element, "attach meta: {}", attach);
+
+ self.state.lock().unwrap().attach = attach;
+
+ Ok(gst::FlowSuccess::Ok)
+ } else {
+ self.state.lock().unwrap().attach = false;
+
+ if !self.srcpad.push_event(gst::event::Caps::new(&caps)) {
+ Err(gst::FlowError::NotNegotiated)
+ } else {
+ Ok(gst::FlowSuccess::Ok)
+ }
+ }
+ }
+
+ fn render_shape_buffer(
+ &self,
+ state: &mut State,
+ width: u32,
+ height: u32,
+ tag: Option<&str>,
+ ) -> Option<(gst::Buffer, u32, u32)> {
+ let mut text_width = 0;
+ let mut text_height = 0;
+
+ // If we have text to render, update the layout first in order to compute
+ // the final size
+ let layout = tag.and_then(|tag| {
+ state.layout.as_ref().unwrap().set_text(tag);
+
+ state.layout.clone()
+ });
+
+ if let Some(ref layout) = layout {
+ let (_ink_rect, logical_rect) = layout.extents();
+
+ text_width = logical_rect.width() / pango::SCALE;
+ text_height = logical_rect.height() / pango::SCALE;
+ }
+
+ let total_height = height.max(text_height as u32);
+ let total_width = width.max(text_width as u32);
+
+ let mut buffer = gst::Buffer::with_size((total_width * total_height) as usize * 4).ok()?;
+
+ gst_video::VideoMeta::add(
+ buffer.get_mut().unwrap(),
+ gst_video::VideoFrameFlags::empty(),
+ #[cfg(target_endian = "little")]
+ gst_video::VideoFormat::Bgra,
+ #[cfg(target_endian = "big")]
+ gst_video::VideoFormat::Argb,
+ total_width as u32,
+ total_height as u32,
+ )
+ .ok()?;
+
+ let buffer = buffer.into_mapped_buffer_writable().unwrap();
+
+ // Pass ownership of the buffer to the cairo surface but keep around
+ // a raw pointer so we can later retrieve it again when the surface
+ // is done
+ let buffer_ptr = unsafe { buffer.buffer().as_ptr() };
+ let surface = cairo::ImageSurface::create_for_data(
+ buffer,
+ cairo::Format::ARgb32,
+ total_width as i32,
+ total_height as i32,
+ total_width as i32 * 4,
+ )
+ .ok()?;
+
+ let cr = cairo::Context::new(&surface).ok()?;
+ let line_width = 1.;
+
+ // Clear background
+ cr.set_operator(cairo::Operator::Source);
+ cr.set_source_rgba(0.0, 0.0, 0.0, 0.0);
+ cr.paint().ok()?;
+
+ cr.save().ok()?;
+
+ // Now render the rectangle
+ cr.move_to(line_width, line_width);
+ cr.line_to(line_width, height as f64 - line_width);
+ cr.line_to(width as f64 - line_width, height as f64 - line_width);
+ cr.line_to(width as f64 - line_width, line_width);
+ cr.close_path();
+ cr.set_source_rgba(1., 0., 0., 1.);
+ cr.set_line_width(line_width);
+ let _ = cr.stroke();
+
+ cr.restore().ok()?;
+
+ // Finally render the text, if any
+ if let Some(layout) = layout {
+ cr.save().ok()?;
+
+ cr.move_to(0., 0.);
+ cr.set_operator(cairo::Operator::Over);
+ cr.set_source_rgba(1.0, 1.0, 1.0, 1.0);
+ pangocairo::functions::layout_path(&cr, &layout);
+ cr.stroke().ok()?;
+
+ cr.restore().ok()?;
+ cr.save().ok()?;
+
+ cr.move_to(0., 0.);
+ cr.set_source_rgba(0.0, 0.0, 0.0, 1.0);
+ pangocairo::functions::show_layout(&cr, &layout);
+
+ cr.restore().ok()?;
+ }
+
+ drop(cr);
+
+ // Safety: The surface still owns a mutable reference to the buffer but our reference
+ // to the surface here is the last one. After dropping the surface the buffer would be
+ // freed, so we keep an additional strong reference here before dropping the surface,
+ // which is then returned. As such it's guaranteed that nothing is using the buffer
+ // anymore mutably.
+ unsafe {
+ assert_eq!(
+ cairo::ffi::cairo_surface_get_reference_count(surface.to_raw_none()),
+ 1
+ );
+ let buffer = glib::translate::from_glib_none(buffer_ptr);
+ drop(surface);
+ Some((buffer, total_width, total_height))
+ }
+ }
+
+ // Update our overlay composition with a set of rectangles
+ fn overlay_shapes(&self, state: &mut State, element: &super::OnvifOverlay, shapes: Vec<Shape>) {
+ if shapes.is_empty() {
+ state.composition = None;
+ return;
+ }
+
+ if state.layout.is_none() {
+ let fontmap = match pangocairo::FontMap::new() {
+ Some(fontmap) => Ok(fontmap),
+ None => {
+ gst::element_error!(
+ element,
+ gst::LibraryError::Failed,
+ ["Failed to create pangocairo font map"]
+ );
+ Err(gst::FlowError::Error)
+ }
+ }
+ .unwrap();
+ let context = match fontmap.create_context() {
+ Some(context) => Ok(context),
+ None => {
+ gst::element_error!(
+ element,
+ gst::LibraryError::Failed,
+ ["Failed to create font map context"]
+ );
+ Err(gst::FlowError::Error)
+ }
+ }
+ .unwrap();
+ context.set_language(&pango::Language::from_string("en_US"));
+ context.set_base_dir(pango::Direction::Ltr);
+ let layout = pango::Layout::new(&context);
+ layout.set_alignment(pango::Alignment::Left);
+ let font_desc =
+ pango::FontDescription::from_string(&self.settings.lock().unwrap().font_desc);
+ layout.set_font_description(Some(&font_desc));
+
+ state.layout = Some(layout);
+ }
+
+ let mut composition = gst_video::VideoOverlayComposition::default();
+ let composition_mut = composition.get_mut().unwrap();
+ for shape in &shapes {
+ // Sanity check: don't render 0-sized shapes
+ if shape.width == 0 || shape.height == 0 {
+ continue;
+ }
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Rendering shape with tag {:?} x {} y {} width {} height {}",
+ shape.tag,
+ shape.x,
+ shape.y,
+ shape.width,
+ shape.height
+ );
+
+ let (buffer, width, height) = match self.render_shape_buffer(
+ state,
+ shape.width,
+ shape.height,
+ shape.tag.as_deref(),
+ ) {
+ Some(ret) => ret,
+ None => {
+ gst::error!(CAT, obj: element, "Failed to render buffer");
+ state.composition = None;
+ return;
+ }
+ };
+
+ let rect = gst_video::VideoOverlayRectangle::new_raw(
+ &buffer,
+ shape.x as i32,
+ shape.y as i32,
+ width,
+ height,
+ gst_video::VideoOverlayFormatFlags::PREMULTIPLIED_ALPHA,
+ );
+
+ composition_mut.add_rectangle(&rect);
+ }
+
+ state.composition = Some(composition);
+ }
+
+ fn sink_chain(
+ &self,
+ pad: &gst::Pad,
+ element: &super::OnvifOverlay,
+ mut buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::trace!(CAT, obj: pad, "Handling buffer {:?}", buffer);
+
+ if self.srcpad.check_reconfigure() {
+ if let Err(err) = self.negotiate(element) {
+ if self.srcpad.pad_flags().contains(gst::PadFlags::FLUSHING) {
+ self.srcpad.mark_reconfigure();
+ return Ok(gst::FlowSuccess::Ok);
+ } else {
+ return Err(err);
+ }
+ }
+ }
+
+ let mut state = self.state.lock().unwrap();
+
+ let video_info = state.video_info.as_ref().unwrap();
+ let width = video_info.width() as i32;
+ let height = video_info.height() as i32;
+
+ if let Ok(meta) = gst::meta::CustomMeta::from_buffer(&buffer, "OnvifXMLFrameMeta") {
+ let s = meta.structure();
+ let mut shapes: Vec<Shape> = Vec::new();
+
+ if let Ok(frames) = s.get::<gst::BufferList>("frames") {
+ gst::log!(CAT, obj: element, "Overlaying {} frames", frames.len());
+
+ // Metadata for multiple frames may be attached to this frame, either because:
+ //
+ // * Multiple analytics modules are producing metadata
+ // * The metadata for two frames produced by the same module is attached
+ // to this frame, for instance because of resynchronization or other
+ // timing-related situations
+ //
+ // We want to display all detected objects for the first case, but only the
+ // latest version for the second case. As frames are sorted in increasing temporal
+ // order, we iterate them in reverse to start with the most recent, and deduplicate
+ // by object id.
+
+ let mut object_ids = HashSet::new();
+
+ for buffer in frames.iter().rev() {
+ let buffer = buffer.map_readable().map_err(|_| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to map buffer readable"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Failed to decode buffer as UTF-8: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ let root = utf8.parse::<Element>().map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to parse buffer as XML: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ for object in root.children() {
+ if object.is("Object", "http://www.onvif.org/ver10/schema") {
+ gst::trace!(CAT, obj: element, "Handling object {:?}", object);
+
+ let object_id = match object.attr("ObjectId") {
+ Some(id) => id.to_string(),
+ None => {
+ gst::warning!(CAT, obj: element, "XML Object with no ObjectId");
+ continue;
+ }
+ };
+
+ if !object_ids.insert(object_id.clone()) {
+ gst::debug!(CAT, "Skipping older version of object {}", object_id);
+ continue;
+ }
+
+ let appearance = match object
+ .get_child("Appearance", "http://www.onvif.org/ver10/schema")
+ {
+ Some(appearance) => appearance,
+ None => continue,
+ };
+
+ let shape = match appearance
+ .get_child("Shape", "http://www.onvif.org/ver10/schema")
+ {
+ Some(shape) => shape,
+ None => continue,
+ };
+
+ let tag = appearance
+ .get_child("Class", "http://www.onvif.org/ver10/schema")
+ .and_then(|class| {
+ class.get_child("Type", "http://www.onvif.org/ver10/schema")
+ })
+ .map(|t| t.text());
+
+ let bbox = match shape
+ .get_child("BoundingBox", "http://www.onvif.org/ver10/schema")
+ {
+ Some(bbox) => bbox,
+ None => {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "XML Shape with no BoundingBox"
+ );
+ continue;
+ }
+ };
+
+ let left: f64 = match bbox.attr("left").and_then(|val| val.parse().ok())
+ {
+ Some(val) => val,
+ None => {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "BoundingBox with no left attribute"
+ );
+ continue;
+ }
+ };
+
+ let right: f64 =
+ match bbox.attr("right").and_then(|val| val.parse().ok()) {
+ Some(val) => val,
+ None => {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "BoundingBox with no right attribute"
+ );
+ continue;
+ }
+ };
+
+ let top: f64 = match bbox.attr("top").and_then(|val| val.parse().ok()) {
+ Some(val) => val,
+ None => {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "BoundingBox with no top attribute"
+ );
+ continue;
+ }
+ };
+
+ let bottom: f64 =
+ match bbox.attr("bottom").and_then(|val| val.parse().ok()) {
+ Some(val) => val,
+ None => {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "BoundingBox with no bottom attribute"
+ );
+ continue;
+ }
+ };
+
+ let x1 = width / 2 + ((left * (width / 2) as f64) as i32);
+ let y1 = height / 2 - ((top * (height / 2) as f64) as i32);
+ let x2 = width / 2 + ((right * (width / 2) as f64) as i32);
+ let y2 = height / 2 - ((bottom * (height / 2) as f64) as i32);
+
+ shapes.push(Shape {
+ x: x1 as u32,
+ y: y1 as u32,
+ width: (x2 - x1) as u32,
+ height: (y2 - y1) as u32,
+ tag,
+ });
+ }
+ }
+ }
+
+ if !frames.is_empty() {
+ self.overlay_shapes(&mut state, element, shapes);
+ }
+ }
+ }
+
+ if let Some(composition) = &state.composition {
+ let buffer = buffer.make_mut();
+ if state.attach {
+ gst_video::VideoOverlayCompositionMeta::add(buffer, composition);
+ } else {
+ let mut frame = gst_video::VideoFrameRef::from_buffer_ref_writable(
+ buffer,
+ state.video_info.as_ref().unwrap(),
+ )
+ .unwrap();
+
+ if composition.blend(&mut frame).is_err() {
+ gst::error!(CAT, obj: pad, "Failed to blend composition");
+ }
+ }
+ }
+ drop(state);
+
+ self.srcpad.push(buffer)
+ }
+
+ fn sink_event(&self, pad: &gst::Pad, element: &super::OnvifOverlay, event: gst::Event) -> bool {
+ use gst::EventView;
+
+ gst::log!(CAT, obj: pad, "Handling event {:?}", event);
+
+ match event.view() {
+ EventView::Caps(c) => {
+ let mut state = self.state.lock().unwrap();
+ state.video_info = gst_video::VideoInfo::from_caps(c.caps()).ok();
+ drop(state);
+ self.srcpad.check_reconfigure();
+ match self.negotiate(element) {
+ Ok(_) => true,
+ Err(_) => {
+ self.srcpad.mark_reconfigure();
+ true
+ }
+ }
+ }
+ EventView::FlushStop(..) => {
+ let mut state = self.state.lock().unwrap();
+ state.composition = None;
+ pad.event_default(Some(element), event)
+ }
+ _ => pad.event_default(Some(element), event),
+ }
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for OnvifOverlay {
+ const NAME: &'static str = "GstOnvifOverlay";
+ type Type = super::OnvifOverlay;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("sink").unwrap();
+ let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
+ .chain_function(|pad, parent, buffer| {
+ OnvifOverlay::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |overlay, element| overlay.sink_chain(pad, element, buffer),
+ )
+ })
+ .event_function(|pad, parent, event| {
+ OnvifOverlay::catch_panic_pad_function(
+ parent,
+ || false,
+ |overlay, element| overlay.sink_event(pad, element, event),
+ )
+ })
+ .flags(gst::PadFlags::PROXY_CAPS)
+ .flags(gst::PadFlags::PROXY_ALLOCATION)
+ .build();
+
+ let templ = klass.pad_template("src").unwrap();
+ let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
+ .flags(gst::PadFlags::PROXY_CAPS)
+ .flags(gst::PadFlags::PROXY_ALLOCATION)
+ .build();
+
+ Self {
+ srcpad,
+ sinkpad,
+ state: Mutex::new(State::default()),
+ settings: Mutex::new(Settings::default()),
+ }
+ }
+}
+
+impl ObjectImpl for OnvifOverlay {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![glib::ParamSpecString::new(
+ "font-desc",
+ "Font Description",
+ "Pango font description of font to be used for rendering",
+ Some(DEFAULT_FONT_DESC),
+ glib::ParamFlags::READWRITE,
+ )]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(
+ &self,
+ _obj: &Self::Type,
+ _id: usize,
+ value: &glib::Value,
+ pspec: &glib::ParamSpec,
+ ) {
+ match pspec.name() {
+ "font-desc" => {
+ self.settings.lock().unwrap().font_desc = value
+ .get::<Option<String>>()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| DEFAULT_FONT_DESC.into());
+ self.state.lock().unwrap().layout.take();
+ }
+ _ => unimplemented!(),
+ };
+ }
+
+ fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "font-desc" => self.settings.lock().unwrap().font_desc.to_value(),
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.srcpad).unwrap();
+ }
+}
+
+impl GstObjectImpl for OnvifOverlay {}
+
+impl ElementImpl for OnvifOverlay {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "ONVIF overlay",
+ "Video/Overlay",
+ "Renders ONVIF analytics meta over raw video frames",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst_video::VideoFormat::iter_raw()
+ .into_video_caps()
+ .unwrap()
+ .build();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![src_pad_template, sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => {
+ // Reset the whole state
+ let mut state = self.state.lock().unwrap();
+ *state = State::default();
+ }
+ _ => (),
+ }
+
+ self.parent_change_state(element, transition)
+ }
+}
diff --git a/net/onvif/src/onvifoverlay/mod.rs b/net/onvif/src/onvifoverlay/mod.rs
new file mode 100644
index 000000000..d37635e50
--- /dev/null
+++ b/net/onvif/src/onvifoverlay/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct OnvifOverlay(ObjectSubclass<imp::OnvifOverlay>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "onvifoverlay",
+ gst::Rank::Primary,
+ OnvifOverlay::static_type(),
+ )
+}
diff --git a/net/onvif/src/onvifpay/imp.rs b/net/onvif/src/onvifpay/imp.rs
new file mode 100644
index 000000000..59ac47a09
--- /dev/null
+++ b/net/onvif/src/onvifpay/imp.rs
@@ -0,0 +1,197 @@
+use gst::glib;
+use gst::subclass::prelude::*;
+use gst_rtp::prelude::*;
+use gst_rtp::subclass::prelude::*;
+use once_cell::sync::Lazy;
+
+#[derive(Default)]
+pub struct OnvifPay {}
+
+#[glib::object_subclass]
+impl ObjectSubclass for OnvifPay {
+ const NAME: &'static str = "GstOnvifPay";
+ type Type = super::OnvifPay;
+ type ParentType = gst_rtp::RTPBasePayload;
+}
+
+impl ObjectImpl for OnvifPay {}
+
+impl GstObjectImpl for OnvifPay {}
+
+impl ElementImpl for OnvifPay {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "ONVIF metadata RTP payloader",
+ "Payloader/Network/RTP",
+ "ONVIF metadata RTP payloader",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let sink_caps = gst::Caps::builder("application/x-onvif-metadata")
+ .field("encoding", "utf8")
+ .build();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &sink_caps,
+ )
+ .unwrap();
+
+ let src_caps = gst::Caps::builder("application/x-rtp")
+ .field("media", "application")
+ .field("payload", gst::IntRange::new(96, 127))
+ .field("clock-rate", 90000)
+ .field("encoding-name", "VND.ONVIF.METADATA")
+ .build();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &src_caps,
+ )
+ .unwrap();
+
+ vec![src_pad_template, sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+}
+
+impl RTPBasePayloadImpl for OnvifPay {
+ fn handle_buffer(
+ &self,
+ element: &Self::Type,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let pts = buffer.pts();
+ let dts = buffer.dts();
+
+ // Input buffer must be readable
+ let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Read,
+ ["Failed to map buffer readable"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ // Input buffer must be valid UTF-8
+ let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Failed to decode buffer as UTF-8: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ // Input buffer must start with a tt:MetadataStream node
+ let process = {
+ let mut process = false;
+
+ for token in xmlparser::Tokenizer::from(utf8) {
+ match token {
+ Ok(token) => match token {
+ xmlparser::Token::Comment { .. } => {
+ continue;
+ }
+ xmlparser::Token::Declaration { .. } => {
+ continue;
+ }
+ xmlparser::Token::ElementStart { local, .. } => {
+ if local.as_str() == "MetadataStream" {
+ process = true;
+ }
+ break;
+ }
+ _ => {
+ process = false;
+ break;
+ }
+ },
+ Err(err) => {
+ gst::element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Invalid XML: {}", err]
+ );
+
+ return Err(gst::FlowError::Error);
+ }
+ }
+ }
+
+ process
+ };
+
+ if !process {
+ gst::element_error!(
+ element,
+ gst::StreamError::Format,
+ ["document must start with tt:MetadataStream element"]
+ );
+
+ return Err(gst::FlowError::Error);
+ }
+
+ let mtu = element.mtu();
+ let payload_size = gst_rtp::RTPBuffer::<()>::calc_payload_len(mtu, 0, 0) as usize;
+
+ let mut chunks = utf8.as_bytes().chunks(payload_size).peekable();
+ let mut buflist = gst::BufferList::new_sized((utf8.len() / payload_size) + 1);
+
+ {
+ let buflist_mut = buflist.get_mut().unwrap();
+
+ while let Some(chunk) = chunks.next() {
+ let mut outbuf = gst::Buffer::new_rtp_with_sizes(chunk.len() as u32, 0, 0)
+ .map_err(|err| {
+ gst::element_error!(
+ element,
+ gst::ResourceError::Write,
+ ["Failed to allocate output buffer: {}", err]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ {
+ let outbuf_mut = outbuf.get_mut().unwrap();
+ outbuf_mut.set_pts(pts);
+ outbuf_mut.set_dts(dts);
+
+ let mut outrtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut).unwrap();
+ let payload = outrtp.payload_mut().unwrap();
+ payload.copy_from_slice(chunk);
+
+ // Last chunk, set marker bit
+ if chunks.peek().is_none() {
+ outrtp.set_marker(true);
+ }
+ }
+
+ buflist_mut.add(outbuf);
+ }
+ }
+
+ element.push_list(buflist)
+ }
+
+ fn set_caps(&self, element: &Self::Type, _caps: &gst::Caps) -> Result<(), gst::LoggableError> {
+ element.set_options("application", true, "VND.ONVIF.METADATA", 90000);
+
+ Ok(())
+ }
+}
diff --git a/net/onvif/src/onvifpay/mod.rs b/net/onvif/src/onvifpay/mod.rs
new file mode 100644
index 000000000..18b4a8a30
--- /dev/null
+++ b/net/onvif/src/onvifpay/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct OnvifPay(ObjectSubclass<imp::OnvifPay>) @extends gst_rtp::RTPBasePayload, gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rtponvifpay",
+ gst::Rank::Primary,
+ OnvifPay::static_type(),
+ )
+}