Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathieu Duponchelle <mathieu@centricular.com>2023-06-21 20:55:27 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2023-08-14 11:13:12 +0300
commite905299ebaac0630758214c43553bb535c4b64c6 (patch)
tree2fc6b3f1fcd64f94129a8ab488a5d80c351a6dd6 /generic
parent6523a07a9fe4737bf7085e5f2d3cbb79a7e41a78 (diff)
generic: expose inter plugin
This new plugin exposes two elements, intersink and intersrc. These act as wormholes for data in the same process and can be used to forward data from one pipeline to another. The implementation makes use of gstreamer-utils' StreamProducer, and supports dynamically adding and removing consumers, before and after producers, and changing producer names while PLAYING, both on the sink and the src. This initial implementation comes with a small demo, and a few tests. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1257>
Diffstat (limited to 'generic')
-rw-r--r--generic/inter/Cargo.toml53
-rw-r--r--generic/inter/build.rs5
-rw-r--r--generic/inter/examples/basic.rs74
-rw-r--r--generic/inter/examples/plug-and-play.rs325
-rw-r--r--generic/inter/src/lib.rs44
-rw-r--r--generic/inter/src/sink/imp.rs217
-rw-r--r--generic/inter/src/sink/mod.rs35
-rw-r--r--generic/inter/src/src/imp.rs203
-rw-r--r--generic/inter/src/src/mod.rs34
-rw-r--r--generic/inter/src/streamproducer/mod.rs159
-rw-r--r--generic/inter/tests/inter.rs138
11 files changed, 1287 insertions, 0 deletions
diff --git a/generic/inter/Cargo.toml b/generic/inter/Cargo.toml
new file mode 100644
index 000000000..096575421
--- /dev/null
+++ b/generic/inter/Cargo.toml
@@ -0,0 +1,53 @@
+[package]
+name = "gst-plugin-inter"
+version = "0.11.0-alpha.1"
+authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
+license = "MPL-2.0"
+description = "GStreamer Inter Plugin"
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+edition = "2021"
+rust-version = "1.66"
+
+[dependencies]
+anyhow = "1"
+gst = { package = "gstreamer", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
+gst_utils = { package = "gstreamer-utils", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst_app = { package = "gstreamer-app", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+once_cell = "1.0"
+
+[dev-dependencies]
+pretty_assertions = "1"
+gst-check = { package = "gstreamer-check", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+futures = "0.3"
+tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] }
+tokio-stream = "0.1.11"
+serial_test = "2"
+
+[lib]
+name = "gstrsinter"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../version-helper" }
+
+[features]
+static = []
+capi = []
+doc = ["gst/v1_18"]
+
+[package.metadata.capi]
+min_version = "0.8.0"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
+
+[[example]]
+name = "plug-and-play"
diff --git a/generic/inter/build.rs b/generic/inter/build.rs
new file mode 100644
index 000000000..76b2a7c23
--- /dev/null
+++ b/generic/inter/build.rs
@@ -0,0 +1,5 @@
+// SPDX-License-Identifier: MPL-2.0
+
+fn main() {
+ gst_plugin_version_helper::info()
+}
diff --git a/generic/inter/examples/basic.rs b/generic/inter/examples/basic.rs
new file mode 100644
index 000000000..1afbe8a4c
--- /dev/null
+++ b/generic/inter/examples/basic.rs
@@ -0,0 +1,74 @@
+use anyhow::Error;
+use futures::prelude::*;
+use futures::stream::select_all;
+use gst::prelude::*;
+
+fn toplevel(obj: &gst::Object) -> gst::Object {
+ if let Some(parent) = obj.parent() {
+ toplevel(&parent)
+ } else {
+ obj.clone()
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+ gst::init()?;
+
+ let src_pipeline = gst::parse_launch("videotestsrc is-live=true ! intersink")?;
+ let sink_pipeline = gst::parse_launch("intersrc ! videoconvert ! autovideosink")?;
+
+ let mut stream = select_all([
+ src_pipeline.bus().unwrap().stream(),
+ sink_pipeline.bus().unwrap().stream(),
+ ]);
+
+ let base_time = gst::SystemClock::obtain().time().unwrap();
+
+ src_pipeline.set_clock(Some(&gst::SystemClock::obtain()))?;
+ src_pipeline.set_start_time(gst::ClockTime::NONE);
+ src_pipeline.set_base_time(base_time);
+
+ sink_pipeline.set_clock(Some(&gst::SystemClock::obtain()))?;
+ sink_pipeline.set_start_time(gst::ClockTime::NONE);
+ sink_pipeline.set_base_time(base_time);
+
+ src_pipeline.set_state(gst::State::Playing)?;
+ sink_pipeline.set_state(gst::State::Playing)?;
+
+ while let Some(msg) = stream.next().await {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Latency(..) => {
+ if let Some(o) = msg.src() {
+ if let Ok(pipeline) = toplevel(o).downcast::<gst::Pipeline>() {
+ eprintln!("Recalculating latency {:?}", pipeline);
+ let _ = pipeline.recalculate_latency();
+ }
+ }
+ }
+ MessageView::Eos(..) => {
+ eprintln!("Unexpected EOS");
+ break;
+ }
+ MessageView::Error(err) => {
+ eprintln!(
+ "Got error from {}: {} ({})",
+ msg.src()
+ .map(|s| String::from(s.path_string()))
+ .unwrap_or_else(|| "None".into()),
+ err.error(),
+ err.debug().unwrap_or_else(|| "".into()),
+ );
+ break;
+ }
+ _ => (),
+ }
+ }
+
+ src_pipeline.set_state(gst::State::Null)?;
+ sink_pipeline.set_state(gst::State::Null)?;
+
+ Ok(())
+}
diff --git a/generic/inter/examples/plug-and-play.rs b/generic/inter/examples/plug-and-play.rs
new file mode 100644
index 000000000..f797e0b43
--- /dev/null
+++ b/generic/inter/examples/plug-and-play.rs
@@ -0,0 +1,325 @@
+use anyhow::Error;
+use futures::prelude::*;
+use gst::prelude::*;
+use std::collections::HashMap;
+use std::io::prelude::*;
+use tokio::task;
+
+struct Producer {
+ pipeline: gst::Pipeline,
+ sink: gst::Element,
+ overlay: gst::Element,
+}
+
+struct Consumer {
+ pipeline: gst::Pipeline,
+ src: gst::Element,
+}
+
+fn create_sink_pipeline(producer_name: &str) -> Result<Producer, Error> {
+ let pipeline = gst::Pipeline::builder()
+ .name(format!("producer-{producer_name}"))
+ .build();
+
+ let videotestsrc = gst::ElementFactory::make("videotestsrc")
+ .property_from_str("pattern", "ball")
+ .property("is-live", true)
+ .build()?;
+ let capsfilter = gst::ElementFactory::make("capsfilter")
+ .property(
+ "caps",
+ gst::Caps::builder("video/x-raw")
+ .field("framerate", gst::Fraction::new(50, 1))
+ .build(),
+ )
+ .build()?;
+ let queue = gst::ElementFactory::make("queue").build()?;
+ let overlay = gst::ElementFactory::make("textoverlay")
+ .property("font-desc", "Sans 30")
+ .property("text", format!("Producer: {producer_name}"))
+ .property_from_str("valignment", "top")
+ .build()?;
+ let timeoverlay = gst::ElementFactory::make("timeoverlay")
+ .property("font-desc", "Sans 30")
+ .property_from_str("valignment", "center")
+ .property_from_str("halignment", "center")
+ .build()?;
+ let sink = gst::ElementFactory::make("intersink")
+ .property("producer-name", producer_name)
+ .build()?;
+
+ pipeline.add_many([
+ &videotestsrc,
+ &capsfilter,
+ &queue,
+ &overlay,
+ &timeoverlay,
+ &sink,
+ ])?;
+ gst::Element::link_many([
+ &videotestsrc,
+ &capsfilter,
+ &queue,
+ &overlay,
+ &timeoverlay,
+ &sink,
+ ])?;
+
+ Ok(Producer {
+ pipeline,
+ sink,
+ overlay,
+ })
+}
+
+fn create_src_pipeline(producer_name: &str, consumer_name: &str) -> Result<Consumer, Error> {
+ let pipeline = gst::Pipeline::builder()
+ .name(format!("consumer-{consumer_name}"))
+ .build();
+
+ let src = gst::ElementFactory::make("intersrc")
+ .property("producer-name", producer_name)
+ .build()?;
+ let queue = gst::ElementFactory::make("queue").build()?;
+ let vconv = gst::ElementFactory::make("videoconvert").build()?;
+ let overlay = gst::ElementFactory::make("textoverlay")
+ .property("font-desc", "Sans 30")
+ .property("text", format!("Consumer: {consumer_name}"))
+ .property_from_str("valignment", "bottom")
+ .build()?;
+ let vconv2 = gst::ElementFactory::make("videoconvert").build()?;
+ let sink = gst::ElementFactory::make("autovideosink").build()?;
+
+ pipeline.add_many([&src, &queue, &vconv, &overlay, &vconv2, &sink])?;
+ gst::Element::link_many([&src, &queue, &vconv, &overlay, &vconv2, &sink])?;
+
+ Ok(Consumer { pipeline, src })
+}
+
+fn prompt_on() {
+ print!("$ ");
+ let _ = std::io::stdout().flush();
+}
+
+fn monitor_pipeline(pipeline: &gst::Pipeline, base_time: gst::ClockTime) -> Result<(), Error> {
+ pipeline.set_clock(Some(&gst::SystemClock::obtain()))?;
+ pipeline.set_start_time(gst::ClockTime::NONE);
+ pipeline.set_base_time(base_time);
+
+ pipeline.set_state(gst::State::Playing)?;
+
+ let mut bus_stream = pipeline.bus().expect("Pipeline should have a bus").stream();
+
+ let pipeline_clone = pipeline.downgrade();
+ task::spawn(async move {
+ while let Some(msg) = bus_stream.next().await {
+ use gst::MessageView;
+
+ if let Some(pipeline) = pipeline_clone.upgrade() {
+ match msg.view() {
+ MessageView::Latency(..) => {
+ let _ = pipeline.recalculate_latency();
+ }
+ MessageView::Eos(..) => {
+ println!(
+ "EOS from {}",
+ msg.src()
+ .map(|s| String::from(s.path_string()))
+ .unwrap_or_else(|| "None".into())
+ );
+ prompt_on();
+ break;
+ }
+ MessageView::Error(err) => {
+ let _ = pipeline.set_state(gst::State::Null);
+ println!(
+ "Got error from {}: {} ({})",
+ msg.src()
+ .map(|s| String::from(s.path_string()))
+ .unwrap_or_else(|| "None".into()),
+ err.error(),
+ err.debug().unwrap_or_else(|| "".into()),
+ );
+ prompt_on();
+ break;
+ }
+ MessageView::StateChanged(sc) => {
+ if msg.src() == Some(pipeline.upcast_ref()) {
+ gst::debug_bin_to_dot_file(
+ pipeline.upcast_ref::<gst::Bin>(),
+ gst::DebugGraphDetails::all(),
+ format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()),
+ );
+ }
+ }
+ _ => (),
+ }
+ } else {
+ break;
+ }
+ }
+ });
+
+ Ok(())
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+ gst::init()?;
+
+ println!("h for help");
+
+ let base_time = gst::SystemClock::obtain().time().unwrap();
+
+ let mut producers: HashMap<String, Producer> = HashMap::new();
+ let mut consumers: HashMap<String, Consumer> = HashMap::new();
+
+ let mut stdin = std::io::stdin().lock();
+ loop {
+ let mut buf = String::new();
+
+ prompt_on();
+
+ match stdin.read_line(&mut buf)? {
+ 0 => {
+ eprintln!("EOF!");
+ break;
+ }
+ _ => {
+ let command: Vec<_> = buf.split_whitespace().collect();
+
+ match command.first() {
+ Some(&"ap") => {
+ if command.len() != 2 {
+ println!("ap <producer_name>: Add a producer");
+ } else {
+ let producer_name = command.get(1).unwrap().to_string();
+
+ if producers.contains_key(&producer_name) {
+ println!("Producer with name {producer_name} already exists!");
+ continue;
+ }
+
+ let producer = create_sink_pipeline(&producer_name)?;
+ monitor_pipeline(&producer.pipeline, base_time)?;
+
+ println!("Added producer with name {producer_name}");
+
+ producers.insert(producer_name, producer);
+ }
+ }
+ Some(&"ac") => {
+ if command.len() != 3 {
+ println!("ac <consumer_name> <producer_name>: Add a consumer");
+ } else {
+ let consumer_name = command.get(1).unwrap().to_string();
+ let producer_name = command.get(2).unwrap().to_string();
+
+ if consumers.contains_key(&consumer_name) {
+ println!("Consumer with name {consumer_name} already exists!");
+ continue;
+ }
+
+ let consumer = create_src_pipeline(&producer_name, &consumer_name)?;
+ monitor_pipeline(&consumer.pipeline, base_time)?;
+
+ println!("Added consumer with name {consumer_name} and producer name {producer_name}");
+
+ consumers.insert(consumer_name, consumer);
+ }
+ }
+ Some(&"rp") => {
+ if command.len() != 2 {
+ println!("rp <producer_name>: Remove a producer");
+ } else {
+ let producer_name = command.get(1).unwrap().to_string();
+ if let Some(producer) = producers.remove(&producer_name) {
+ let _ = producer.pipeline.set_state(gst::State::Null);
+ println!("Removed producer with name {producer_name}");
+ } else {
+ println!("No producer with name {producer_name}");
+ }
+ }
+ }
+ Some(&"rc") => {
+ if command.len() != 2 {
+ println!("rc <consumer_name>: Remove a consumer");
+ } else {
+ let consumer_name = command.get(1).unwrap().to_string();
+ if let Some(consumer) = consumers.remove(&consumer_name) {
+ let _ = consumer.pipeline.set_state(gst::State::Null);
+ println!("Removed consumer with name {consumer_name}");
+ } else {
+ println!("No consumer with name {consumer_name}");
+ }
+ }
+ }
+ Some(&"cnp") => {
+ if command.len() != 3 {
+ println!("cnp <old_producer_name> <new_producer_name>: Change the name of a producer");
+ } else {
+ let old_producer_name = command.get(1).unwrap().to_string();
+ let producer_name = command.get(2).unwrap().to_string();
+
+ if producers.contains_key(&producer_name) {
+ println!("Producer with name {producer_name} already exists!");
+ continue;
+ }
+
+ if let Some(producer) = producers.remove(&old_producer_name) {
+ producer.sink.set_property("producer-name", &producer_name);
+ producer
+ .overlay
+ .set_property("text", format!("Producer: {producer_name}"));
+ println!(
+ "Changed producer name {old_producer_name} -> {producer_name}"
+ );
+ producers.insert(producer_name, producer);
+ } else {
+ println!("No producer with name {old_producer_name}");
+ }
+ }
+ }
+ Some(&"cpn") => {
+ if command.len() != 3 {
+ println!("cpn <consumer_name> <new_producer_name>: Change the producer name for a consumer");
+ } else {
+ let consumer_name = command.get(1).unwrap().to_string();
+ let producer_name = command.get(2).unwrap().to_string();
+
+ if let Some(consumer) = consumers.get_mut(&consumer_name) {
+ consumer.src.set_property("producer-name", &producer_name);
+ println!("Changed producer name for consumer {consumer_name} to {producer_name}");
+ } else {
+ println!("No consumer with name {consumer_name}");
+ }
+ }
+ }
+ Some(&"h") => {
+ println!("h: show this help");
+ println!("ap <producer_name>: Add a producer");
+ println!("ac <consumer_name> <producer_name>: Add a consumer");
+ println!("rp <producer_name>: Remove a producer");
+ println!("rc <consumer_name>: Remove a consumer");
+ println!("cnp <old_producer_name> <new_producer_name>: Change the name of a producer");
+ println!("cpn <consumer_name> <new_producer_name>: Change the producer name for a consumer");
+ }
+ _ => {
+ println!("Unknown command");
+ }
+ }
+ }
+ }
+ buf.clear();
+ }
+
+ for (_, producer) in producers {
+ let _ = producer.pipeline.set_state(gst::State::Null);
+ }
+
+ for (_, consumer) in consumers {
+ let _ = consumer.pipeline.set_state(gst::State::Null);
+ }
+
+ Ok(())
+}
diff --git a/generic/inter/src/lib.rs b/generic/inter/src/lib.rs
new file mode 100644
index 000000000..93ed1c117
--- /dev/null
+++ b/generic/inter/src/lib.rs
@@ -0,0 +1,44 @@
+// Copyright (C) 2023 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// Take a look at the license at the top of the repository in the LICENSE file.
+#![allow(unused_doc_comments)]
+
+//! GStreamer elements for connecting pipelines in the same process
+
+mod sink;
+mod src;
+mod streamproducer;
+/**
+ * plugin-rsinter:
+ * @title: Rust inter elements
+ * @short_description: A set of elements for transferring data between pipelines
+ *
+ * This plugin exposes two elements, `intersink` and `intersrc`, that can be
+ * used to transfer data from one pipeline to multiple others in the same
+ * process.
+ *
+ * The elements are implemented using the `StreamProducer` API from
+ * gstreamer-utils.
+ *
+ * Since: plugins-rs-0.11.0
+ */
+use gst::glib;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ sink::register(plugin)?;
+ src::register(plugin)?;
+
+ Ok(())
+}
+
+gst::plugin_define!(
+ rsinter,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "MPL-2.0",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/generic/inter/src/sink/imp.rs b/generic/inter/src/sink/imp.rs
new file mode 100644
index 000000000..25e2fb3a1
--- /dev/null
+++ b/generic/inter/src/sink/imp.rs
@@ -0,0 +1,217 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use crate::streamproducer::InterStreamProducer;
+use anyhow::Error;
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use std::sync::Mutex;
+
+use once_cell::sync::Lazy;
+
+const DEFAULT_PRODUCER_NAME: &str = "default";
+
+#[derive(Debug)]
+struct Settings {
+ producer_name: String,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ producer_name: DEFAULT_PRODUCER_NAME.to_string(),
+ }
+ }
+}
+
+struct State {
+ appsink: gst_app::AppSink,
+ sinkpad: gst::GhostPad,
+}
+
+/* Locking order is field order */
+pub struct InterSink {
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+}
+
+impl InterSink {
+ fn prepare(&self) -> Result<(), Error> {
+ let settings = self.settings.lock().unwrap();
+ let state = self.state.lock().unwrap();
+
+ InterStreamProducer::acquire(&settings.producer_name, &state.appsink)?;
+
+ Ok(())
+ }
+
+ fn unprepare(&self) {
+ let settings = self.settings.lock().unwrap();
+ InterStreamProducer::release(&settings.producer_name);
+ }
+}
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "intersink",
+ gst::DebugColorFlags::empty(),
+ Some("Inter Sink"),
+ )
+});
+
+#[glib::object_subclass]
+impl ObjectSubclass for InterSink {
+ const NAME: &'static str = "GstInterSink";
+ type Type = super::InterSink;
+ type ParentType = gst::Bin;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("sink").unwrap();
+ let sinkpad = gst::GhostPad::from_template(&templ);
+
+ Self {
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(State {
+ appsink: gst_app::AppSink::builder().name("appsink").build(),
+ sinkpad: sinkpad.upcast(),
+ }),
+ }
+ }
+}
+
+impl ObjectImpl for InterSink {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![glib::ParamSpecString::builder("producer-name")
+ .nick("Producer Name")
+ .blurb("Producer Name to use")
+ .doc_show_default()
+ .mutable_playing()
+ .build()]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ match pspec.name() {
+ "producer-name" => {
+ let mut settings = self.settings.lock().unwrap();
+ let old_producer_name = settings.producer_name.clone();
+ settings.producer_name = value
+ .get::<String>()
+ .unwrap_or_else(|_| DEFAULT_PRODUCER_NAME.to_string());
+
+ if let Some(appsink) = InterStreamProducer::release(&old_producer_name) {
+ if let Err(err) =
+ InterStreamProducer::acquire(&settings.producer_name, &appsink)
+ {
+ drop(settings);
+ gst::error!(CAT, imp: self, "{err}");
+ self.post_error_message(gst::error_msg!(
+ gst::StreamError::Failed,
+ ["{err}"]
+ ))
+ } else {
+ drop(settings);
+ // This is required because StreamProducer obtains the latency
+ // it needs to forward from Latency events, and we need to let the
+ // application know it should recalculate latency to get the event
+ // to travel upstream again
+ self.post_message(gst::message::Latency::new());
+ }
+ }
+ }
+ _ => unimplemented!(),
+ };
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "producer-name" => {
+ let settings = self.settings.lock().unwrap();
+ settings.producer_name.to_value()
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.obj();
+
+ obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SINK);
+
+ let state = self.state.lock().unwrap();
+ obj.add(&state.appsink).unwrap();
+ obj.add_pad(&state.sinkpad).unwrap();
+ state
+ .sinkpad
+ .set_target(Some(&state.appsink.static_pad("sink").unwrap()))
+ .unwrap();
+ }
+}
+
+impl GstObjectImpl for InterSink {}
+
+impl ElementImpl for InterSink {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Inter Sink",
+ "Generic/Sink",
+ "Inter Sink",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ vec![sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
+
+ if transition == gst::StateChange::ReadyToPaused {
+ if let Err(err) = self.prepare() {
+ gst::element_error!(
+ self.obj(),
+ gst::StreamError::Failed,
+ ["Failed to prepare: {}", err]
+ );
+ return Err(gst::StateChangeError);
+ }
+ }
+
+ let ret = self.parent_change_state(transition)?;
+
+ if transition == gst::StateChange::PausedToReady {
+ self.unprepare();
+ }
+
+ Ok(ret)
+ }
+}
+
+impl BinImpl for InterSink {}
diff --git a/generic/inter/src/sink/mod.rs b/generic/inter/src/sink/mod.rs
new file mode 100644
index 000000000..e71f7cf35
--- /dev/null
+++ b/generic/inter/src/sink/mod.rs
@@ -0,0 +1,35 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use glib::prelude::*;
+use gst::glib;
+
+mod imp;
+
+/**
+ * SECTION:element-intersink
+ *
+ * #intersink is an element that can be used to produce data for
+ * multiple #intersrc elements to consume.
+ *
+ * You can access the underlying appsink element through the static name
+ * "appsink".
+ *
+ * #intersink should not reside in the same pipeline as the #intersrc
+ * that consumes from it, here is an example of how to use those elements
+ * in separate pipelines:
+ *
+ * {{ generic/inter/examples/basic.rs }}
+ */
+
+glib::wrapper! {
+ pub struct InterSink(ObjectSubclass<imp::InterSink>) @extends gst::Bin, gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "intersink",
+ gst::Rank::None,
+ InterSink::static_type(),
+ )
+}
diff --git a/generic/inter/src/src/imp.rs b/generic/inter/src/src/imp.rs
new file mode 100644
index 000000000..211b9cba7
--- /dev/null
+++ b/generic/inter/src/src/imp.rs
@@ -0,0 +1,203 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use crate::streamproducer::InterStreamProducer;
+use anyhow::Error;
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use std::sync::Mutex;
+
+use once_cell::sync::Lazy;
+
+const DEFAULT_PRODUCER_NAME: &str = "default";
+
+#[derive(Debug)]
+struct Settings {
+ producer_name: String,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ producer_name: DEFAULT_PRODUCER_NAME.to_string(),
+ }
+ }
+}
+
+struct State {
+ srcpad: gst::GhostPad,
+ appsrc: gst_app::AppSrc,
+}
+
+/* Locking order is field order */
+pub struct InterSrc {
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+}
+
+impl InterSrc {
+ fn prepare(&self) -> Result<(), Error> {
+ let settings = self.settings.lock().unwrap();
+ let state = self.state.lock().unwrap();
+
+ InterStreamProducer::subscribe(&settings.producer_name, &state.appsrc);
+
+ Ok(())
+ }
+
+ fn unprepare(&self) {
+ let settings = self.settings.lock().unwrap();
+ let state = self.state.lock().unwrap();
+
+ InterStreamProducer::unsubscribe(&settings.producer_name, &state.appsrc);
+ }
+}
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new("intersrc", gst::DebugColorFlags::empty(), Some("Inter Src"))
+});
+
+#[glib::object_subclass]
+impl ObjectSubclass for InterSrc {
+ const NAME: &'static str = "GstInterSrc";
+
+ type Type = super::InterSrc;
+ type ParentType = gst::Bin;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("src").unwrap();
+ let srcpad = gst::GhostPad::from_template(&templ);
+
+ Self {
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(State {
+ srcpad: srcpad.upcast(),
+ appsrc: gst_app::AppSrc::builder().name("appsrc").build(),
+ }),
+ }
+ }
+}
+
+impl ObjectImpl for InterSrc {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![glib::ParamSpecString::builder("producer-name")
+ .nick("Producer Name")
+ .blurb("Producer Name to consume from")
+ .doc_show_default()
+ .mutable_playing()
+ .build()]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ match pspec.name() {
+ "producer-name" => {
+ let mut settings = self.settings.lock().unwrap();
+ let old_producer_name = settings.producer_name.clone();
+ settings.producer_name = value
+ .get::<String>()
+ .unwrap_or_else(|_| DEFAULT_PRODUCER_NAME.to_string());
+
+ let state = self.state.lock().unwrap();
+
+ if InterStreamProducer::unsubscribe(&old_producer_name, &state.appsrc) {
+ InterStreamProducer::subscribe(&settings.producer_name, &state.appsrc);
+ }
+ }
+ _ => unimplemented!(),
+ };
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "producer-name" => {
+ let settings = self.settings.lock().unwrap();
+ settings.producer_name.to_value()
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.obj();
+
+ obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
+
+ let state = self.state.lock().unwrap();
+ gst_utils::StreamProducer::configure_consumer(&state.appsrc);
+ obj.add(&state.appsrc).unwrap();
+ obj.add_pad(&state.srcpad).unwrap();
+ state
+ .srcpad
+ .set_target(Some(&state.appsrc.static_pad("src").unwrap()))
+ .unwrap();
+ }
+}
+
+impl GstObjectImpl for InterSrc {}
+
+impl ElementImpl for InterSrc {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Inter Src",
+ "Generic/Src",
+ "Inter Src",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ vec![src_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
+
+ if transition == gst::StateChange::ReadyToPaused {
+ if let Err(err) = self.prepare() {
+ gst::element_error!(
+ self.obj(),
+ gst::StreamError::Failed,
+ ["Failed to prepare: {}", err]
+ );
+ return Err(gst::StateChangeError);
+ }
+ }
+
+ let ret = self.parent_change_state(transition)?;
+
+ if transition == gst::StateChange::PausedToReady {
+ self.unprepare();
+ }
+
+ Ok(ret)
+ }
+}
+
+impl BinImpl for InterSrc {}
diff --git a/generic/inter/src/src/mod.rs b/generic/inter/src/src/mod.rs
new file mode 100644
index 000000000..4d3f2beb6
--- /dev/null
+++ b/generic/inter/src/src/mod.rs
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use glib::prelude::*;
+use gst::glib;
+
+mod imp;
+
+/**
+ * SECTION:element-intersrc
+ *
+ * #intersrc is an element that can be used to consume data from an #intersink.
+ *
+ * You can access the underlying appsrc element through the static name
+ * "appsrc".
+ *
+ * #intersrc should not reside in the same pipeline as the #intersink
+ * that it consumes from, here is an example of how to use those elements
+ * in separate pipelines:
+ *
+ * {{ generic/inter/examples/basic.rs }}
+ */
+
+glib::wrapper! {
+ pub struct InterSrc(ObjectSubclass<imp::InterSrc>) @extends gst::Bin, gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "intersrc",
+ gst::Rank::None,
+ InterSrc::static_type(),
+ )
+}
diff --git a/generic/inter/src/streamproducer/mod.rs b/generic/inter/src/streamproducer/mod.rs
new file mode 100644
index 000000000..fc83e1a01
--- /dev/null
+++ b/generic/inter/src/streamproducer/mod.rs
@@ -0,0 +1,159 @@
+use gst::prelude::*;
+use std::collections::{HashMap, HashSet};
+use std::sync::Mutex;
+
+use anyhow::{anyhow, Error};
+use once_cell::sync::Lazy;
+
+pub enum InterStreamProducer {
+ Pending {
+ consumers: HashSet<gst_app::AppSrc>,
+ },
+ Active {
+ producer: gst_utils::StreamProducer,
+ links: HashMap<gst_app::AppSrc, gst_utils::ConsumptionLink>,
+ },
+}
+
+static PRODUCERS: Lazy<Mutex<HashMap<String, InterStreamProducer>>> =
+ Lazy::new(|| Mutex::new(HashMap::new()));
+
+fn toplevel(obj: &gst::Object) -> gst::Object {
+ if let Some(parent) = obj.parent() {
+ toplevel(&parent)
+ } else {
+ obj.clone()
+ }
+}
+
+fn ensure_different_toplevel(producer: &gst_app::AppSink, consumer: &gst_app::AppSrc) {
+ let top_a = toplevel(producer.upcast_ref());
+ let top_b = toplevel(consumer.upcast_ref());
+
+ if top_a == top_b {
+ gst::glib::g_critical!(
+ "gstrsinter",
+ "Intersink with appsink {} should not share the same toplevel bin \
+ as intersrc with appsrc {}, this results in loops in latency calculation",
+ producer.name(),
+ consumer.name()
+ );
+ }
+}
+
+impl InterStreamProducer {
+ pub fn acquire(
+ name: &str,
+ appsink: &gst_app::AppSink,
+ ) -> Result<gst_utils::StreamProducer, Error> {
+ let mut producers = PRODUCERS.lock().unwrap();
+
+ if let Some(producer) = producers.remove(name) {
+ match producer {
+ InterStreamProducer::Pending { consumers } => {
+ let producer = gst_utils::StreamProducer::from(appsink);
+ let mut links = HashMap::new();
+
+ for consumer in consumers {
+ ensure_different_toplevel(appsink, &consumer);
+
+ let link = producer
+ .add_consumer(&consumer)
+ .expect("consumer should not have already been added");
+ links.insert(consumer, link);
+ }
+
+ producers.insert(
+ name.to_string(),
+ InterStreamProducer::Active {
+ producer: producer.clone(),
+ links,
+ },
+ );
+
+ Ok(producer)
+ }
+ InterStreamProducer::Active { .. } => {
+ producers.insert(name.to_string(), producer);
+
+ Err(anyhow!(
+ "An active producer already exists with name {}",
+ name
+ ))
+ }
+ }
+ } else {
+ let producer = gst_utils::StreamProducer::from(appsink);
+
+ producers.insert(
+ name.to_string(),
+ InterStreamProducer::Active {
+ producer: producer.clone(),
+ links: HashMap::new(),
+ },
+ );
+
+ Ok(producer)
+ }
+ }
+
+ pub fn release(name: &str) -> Option<gst_app::AppSink> {
+ let mut producers = PRODUCERS.lock().unwrap();
+
+ if let Some(producer) = producers.remove(name) {
+ match producer {
+ InterStreamProducer::Pending { .. } => None,
+ InterStreamProducer::Active { links, producer } => {
+ producers.insert(
+ name.to_string(),
+ InterStreamProducer::Pending {
+ consumers: links.into_keys().collect(),
+ },
+ );
+
+ Some(producer.appsink().clone())
+ }
+ }
+ } else {
+ None
+ }
+ }
+
+ pub fn subscribe(name: &str, consumer: &gst_app::AppSrc) {
+ let mut producers = PRODUCERS.lock().unwrap();
+
+ if let Some(producer) = producers.get_mut(name) {
+ match producer {
+ InterStreamProducer::Pending { consumers } => {
+ consumers.insert(consumer.clone());
+ }
+ InterStreamProducer::Active { producer, links } => {
+ ensure_different_toplevel(producer.appsink(), consumer);
+
+ let link = producer
+ .add_consumer(consumer)
+ .expect("consumer should not already have been added");
+ links.insert(consumer.clone(), link);
+ }
+ }
+ } else {
+ let producer = InterStreamProducer::Pending {
+ consumers: [consumer.clone()].into(),
+ };
+ producers.insert(name.to_string(), producer);
+ }
+ }
+
+ pub fn unsubscribe(name: &str, consumer: &gst_app::AppSrc) -> bool {
+ let mut producers = PRODUCERS.lock().unwrap();
+
+ if let Some(producer) = producers.get_mut(name) {
+ match producer {
+ InterStreamProducer::Pending { consumers } => consumers.remove(consumer),
+ InterStreamProducer::Active { links, .. } => links.remove(consumer).is_some(),
+ }
+ } else {
+ false
+ }
+ }
+}
diff --git a/generic/inter/tests/inter.rs b/generic/inter/tests/inter.rs
new file mode 100644
index 000000000..bc7e746f9
--- /dev/null
+++ b/generic/inter/tests/inter.rs
@@ -0,0 +1,138 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::prelude::*;
+use serial_test::serial;
+
+use pretty_assertions::assert_eq;
+
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gstrsinter::plugin_register_static().unwrap();
+ });
+}
+
+fn start_consumer(producer_name: &str) -> gst_check::Harness {
+ let mut hc = gst_check::Harness::new("intersrc");
+
+ hc.element()
+ .unwrap()
+ .set_property("producer-name", producer_name);
+ hc.play();
+
+ hc
+}
+
+fn start_producer(producer_name: &str) -> (gst::Pad, gst::Element) {
+ let element = gst::ElementFactory::make("intersink").build().unwrap();
+
+ element.set_property("producer-name", producer_name);
+ element.set_state(gst::State::Playing).unwrap();
+
+ let sinkpad = element.static_pad("sink").unwrap();
+ let srcpad = gst::Pad::new(gst::PadDirection::Src);
+ srcpad.set_active(true).unwrap();
+ srcpad.link(&sinkpad).unwrap();
+
+ srcpad.push_event(gst::event::StreamStart::builder("foo").build());
+ srcpad
+ .push_event(gst::event::Caps::builder(&gst::Caps::builder("video/x-raw").build()).build());
+ srcpad.push_event(
+ gst::event::Segment::builder(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
+ );
+
+ (srcpad, element)
+}
+
+fn push_one(srcpad: &gst::Pad, pts: gst::ClockTime) {
+ let mut inbuf = gst::Buffer::with_size(1).unwrap();
+
+ {
+ let buf = inbuf.get_mut().unwrap();
+ buf.set_pts(pts);
+ }
+
+ srcpad.push(inbuf).unwrap();
+}
+
+#[test]
+#[serial]
+fn test_forward_one_buffer() {
+ init();
+
+ let mut hc = start_consumer("p1");
+ let (srcpad, element) = start_producer("p1");
+
+ push_one(&srcpad, gst::ClockTime::from_nseconds(1));
+
+ let outbuf = hc.pull().unwrap();
+
+ assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1)));
+
+ element.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+#[serial]
+fn test_change_name_of_producer() {
+ init();
+
+ let mut hc1 = start_consumer("p1");
+ let mut hc2 = start_consumer("p2");
+ let (srcpad, element) = start_producer("p1");
+
+ /* Once this returns, the buffer should have been dispatched only to hc1 */
+ push_one(&srcpad, gst::ClockTime::from_nseconds(1));
+ let outbuf = hc1.pull().unwrap();
+ assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1)));
+
+ element.set_property("producer-name", "p2");
+
+ /* This should only get dispatched to hc2, and it should be its first buffer */
+ push_one(&srcpad, gst::ClockTime::from_nseconds(2));
+ let outbuf = hc2.pull().unwrap();
+ assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2)));
+
+ element.set_property("producer-name", "p1");
+
+ /* Back to hc1, which should not see the buffer we pushed in the previous step */
+ push_one(&srcpad, gst::ClockTime::from_nseconds(3));
+ let outbuf = hc1.pull().unwrap();
+ assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(3)));
+
+ element.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+#[serial]
+fn test_change_producer_name() {
+ init();
+
+ let mut hc = start_consumer("p1");
+ let (srcpad1, element1) = start_producer("p1");
+ let (srcpad2, element2) = start_producer("p2");
+
+ /* This buffer should be dispatched to no consumer */
+ push_one(&srcpad2, gst::ClockTime::from_nseconds(1));
+
+ /* This one should be dispatched to hc, and it should be its first buffer */
+ push_one(&srcpad1, gst::ClockTime::from_nseconds(2));
+ let outbuf = hc.pull().unwrap();
+ assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2)));
+
+ hc.element().unwrap().set_property("producer-name", "p2");
+
+ /* This buffer should be dispatched to no consumer */
+ push_one(&srcpad1, gst::ClockTime::from_nseconds(3));
+
+ /* This one should be dispatched to hc, and it should be its next buffer */
+ push_one(&srcpad2, gst::ClockTime::from_nseconds(4));
+ let outbuf = hc.pull().unwrap();
+ assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(4)));
+
+ element1.set_state(gst::State::Null).unwrap();
+ element2.set_state(gst::State::Null).unwrap();
+}