Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-16 15:26:24 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-18 19:42:18 +0300
commit57da8e649dddf984929d0bd1ceaae29c522e03c1 (patch)
tree63c0abc8f59e02abe0112645ac86784c8302595f /generic
parent374bb8323ff187222c472457ac8b7921f48c8854 (diff)
ts/examples: introduce a standalone pipeline test
Implement a test that initializes pipelines with minimalistic theadshare src and sink. This can help with the evaluation of changes to the threadshare runtime or with element implementation details. It makes it easy to run flamegraph or callgrind and to focus on the threadshare runtime overhead.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/Cargo.toml8
-rw-r--r--generic/threadshare/examples/standalone/main.rs142
-rw-r--r--generic/threadshare/examples/standalone/sink/imp.rs757
-rw-r--r--generic/threadshare/examples/standalone/sink/mod.rs17
-rw-r--r--generic/threadshare/examples/standalone/src/imp.rs494
-rw-r--r--generic/threadshare/examples/standalone/src/mod.rs17
6 files changed, 1435 insertions, 0 deletions
diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml
index 774773fff..bf23f862a 100644
--- a/generic/threadshare/Cargo.toml
+++ b/generic/threadshare/Cargo.toml
@@ -26,6 +26,9 @@ slab = "0.4.7"
socket2 = {features = ["all"], version = "0.4"}
waker-fn = "1.1"
+# Used by examples
+clap = { version = "3.2", features = ["derive"], optional = true }
+
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.9", features = ["winsock2", "processthreadsapi"] }
@@ -50,6 +53,11 @@ path = "examples/udpsrc_benchmark_sender.rs"
name = "tcpclientsrc-benchmark-sender"
path = "examples/tcpclientsrc_benchmark_sender.rs"
+[[example]]
+name = "standalone"
+path = "examples/standalone/main.rs"
+required-features = ["clap"]
+
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
cc = "1.0.38"
diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs
new file mode 100644
index 000000000..b8ecfd3f5
--- /dev/null
+++ b/generic/threadshare/examples/standalone/main.rs
@@ -0,0 +1,142 @@
+use gst::glib;
+use once_cell::sync::Lazy;
+
+mod sink;
+mod src;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "ts-standalone-test-main",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing standalone test main"),
+ )
+});
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ src::register(plugin)?;
+ sink::register(plugin)?;
+
+ Ok(())
+}
+
+gst::plugin_define!(
+ threadshare_standalone_test,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "LGPL",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
+
+use clap::Parser;
+
+#[derive(Parser, Debug)]
+#[clap(version)]
+#[clap(about = "Standalone pipeline threadshare runtime test")]
+struct Args {
+ /// Parallel streams to process.
+ #[clap(short, long, default_value_t = 100)]
+ streams: u32,
+
+ /// Threadshare groups.
+ #[clap(short, long, default_value_t = 2)]
+ groups: u32,
+
+ /// Threadshare Context wait in ms (max throttling duration).
+ #[clap(short, long, default_value_t = 20)]
+ wait: u32,
+
+ /// Number of buffers per stream to output before sending EOS (-1 = unlimited).
+ #[clap(short, long, default_value_t = 6000)]
+ num_buffers: i32,
+
+ /// Enable statistics logging (use GST_DEBUG=ts-standalone*:4).
+ #[clap(short, long)]
+ log_stats: bool,
+}
+
+fn main() {
+ use gst::prelude::*;
+ use std::time::Instant;
+
+ gst::init().unwrap();
+ self::plugin_register_static().unwrap();
+
+ #[cfg(debug_assertions)]
+ gst::warning!(CAT, "RUNNING DEBUG BUILD");
+
+ let args = Args::parse();
+
+ let pipeline = gst::Pipeline::new(None);
+
+ for i in 0..args.streams {
+ let ctx_name = format!("standalone {}", i % args.groups);
+
+ let src = gst::ElementFactory::make(
+ "ts-standalone-test-src",
+ Some(format!("src-{}", i).as_str()),
+ )
+ .unwrap();
+ src.set_property("context", &ctx_name);
+ src.set_property("context-wait", args.wait);
+ src.set_property("num-buffers", args.num_buffers);
+
+ let sink = gst::ElementFactory::make(
+ "ts-standalone-test-sink",
+ Some(format!("sink-{}", i).as_str()),
+ )
+ .unwrap();
+ sink.set_property("context", &ctx_name);
+ sink.set_property("context-wait", args.wait);
+ if i == 0 && args.log_stats {
+ sink.set_property("must-log-stats", true);
+ }
+
+ let elements = &[&src, &sink];
+ pipeline.add_many(elements).unwrap();
+ gst::Element::link_many(elements).unwrap();
+ }
+
+ let l = glib::MainLoop::new(None, false);
+
+ let bus = pipeline.bus().unwrap();
+ let pipeline_clone = pipeline.clone();
+ let l_clone = l.clone();
+ bus.add_watch(move |_, msg| {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Eos(..) => {
+ gst::info!(CAT, "Shuting down");
+ let stop = Instant::now();
+ pipeline_clone.set_state(gst::State::Null).unwrap();
+ gst::info!(CAT, "Shuting down took {:.2?}", stop.elapsed());
+ l_clone.quit();
+ }
+ MessageView::Error(err) => {
+ gst::error!(
+ CAT,
+ "Error from {:?}: {} ({:?})",
+ err.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ l_clone.quit();
+ }
+ _ => (),
+ };
+
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
+
+ gst::info!(CAT, "Starting");
+ let start = Instant::now();
+ pipeline.set_state(gst::State::Playing).unwrap();
+ gst::info!(CAT, "Starting took {:.2?}", start.elapsed());
+
+ l.run();
+}
diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
new file mode 100644
index 000000000..3b7e06f74
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -0,0 +1,757 @@
+// Copyright (C) 2022 François Laignel <fengalin@free.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+use futures::stream::Peekable;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst::EventView;
+use gst::{element_error, error_msg};
+
+use once_cell::sync::Lazy;
+
+use gstthreadshare::runtime::prelude::*;
+use gstthreadshare::runtime::{self, Context, PadSink, PadSinkRef, Task};
+
+use std::pin::Pin;
+use std::sync::Mutex;
+use std::task::Poll;
+use std::time::{Duration, Instant};
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "ts-standalone-test-sink",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing standalone test sink"),
+ )
+});
+
+const DEFAULT_CONTEXT: &str = "";
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
+const DEFAULT_SYNC: bool = true;
+const DEFAULT_MUST_LOG_STATS: bool = false;
+
+const LOG_PERIOD: Duration = Duration::from_secs(20);
+
+#[derive(Debug, Clone)]
+struct Settings {
+ sync: bool,
+ context: String,
+ context_wait: Duration,
+ must_log_stats: bool,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ sync: DEFAULT_SYNC,
+ context: DEFAULT_CONTEXT.into(),
+ context_wait: DEFAULT_CONTEXT_WAIT,
+ must_log_stats: DEFAULT_MUST_LOG_STATS,
+ }
+ }
+}
+
+#[derive(Debug)]
+enum TaskItem {
+ Buffer(gst::Buffer),
+ Event(gst::Event),
+}
+
+#[derive(Clone, Debug)]
+struct TestSinkPadHandler;
+
+impl PadSinkHandler for TestSinkPadHandler {
+ type ElementImpl = TestSink;
+
+ fn sink_chain(
+ &self,
+ _pad: &PadSinkRef,
+ test_sink: &TestSink,
+ element: &gst::Element,
+ buffer: gst::Buffer,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let sender = test_sink.clone_item_sender();
+ let element = element.clone().downcast::<super::TestSink>().unwrap();
+
+ async move {
+ if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
+ gst::debug!(CAT, obj: &element, "Flushing");
+ return Err(gst::FlowError::Flushing);
+ }
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+ .boxed()
+ }
+
+ fn sink_chain_list(
+ &self,
+ _pad: &PadSinkRef,
+ test_sink: &TestSink,
+ element: &gst::Element,
+ list: gst::BufferList,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let sender = test_sink.clone_item_sender();
+ let element = element.clone().downcast::<super::TestSink>().unwrap();
+
+ async move {
+ for buffer in list.iter_owned() {
+ if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
+ gst::debug!(CAT, obj: &element, "Flushing");
+ return Err(gst::FlowError::Flushing);
+ }
+ }
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+ .boxed()
+ }
+
+ fn sink_event_serialized(
+ &self,
+ _pad: &PadSinkRef,
+ test_sink: &TestSink,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> BoxFuture<'static, bool> {
+ let sender = test_sink.clone_item_sender();
+ let element = element.clone().downcast::<super::TestSink>().unwrap();
+
+ async move {
+ if let EventView::FlushStop(_) = event.view() {
+ let test_sink = element.imp();
+ return test_sink.task.flush_stop().await_maybe_on_context().is_ok();
+ } else if sender.send_async(TaskItem::Event(event)).await.is_err() {
+ gst::debug!(CAT, obj: &element, "Flushing");
+ }
+
+ true
+ }
+ .boxed()
+ }
+
+ fn sink_event(
+ &self,
+ _pad: &PadSinkRef,
+ test_sink: &TestSink,
+ _element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ if let EventView::FlushStart(..) = event.view() {
+ return test_sink
+ .task
+ .flush_start()
+ .await_maybe_on_context()
+ .is_ok();
+ }
+
+ true
+ }
+}
+
+#[derive(Default)]
+struct Stats {
+ must_log: bool,
+ sync: bool,
+ ramp_up_instant: Option<Instant>,
+ log_start_instant: Option<Instant>,
+ last_delta_instant: Option<Instant>,
+ buffer_count: u32,
+ buffer_count_delta: u32,
+ buffer_headroom: Duration,
+ buffer_headroom_delta: Duration,
+ late_buffer_count: u32,
+ lateness: Duration,
+ max_lateness: Duration,
+ late_buffer_count_delta: u32,
+ lateness_delta: Duration,
+ max_lateness_delta: Duration,
+}
+
+impl Stats {
+ fn start(&mut self) {
+ if !self.must_log {
+ return;
+ }
+
+ self.buffer_count = 0;
+ self.buffer_count_delta = 0;
+ self.buffer_headroom = Duration::ZERO;
+ self.buffer_headroom_delta = Duration::ZERO;
+ self.late_buffer_count = 0;
+ self.lateness = Duration::ZERO;
+ self.max_lateness = Duration::ZERO;
+ self.late_buffer_count_delta = 0;
+ self.lateness_delta = Duration::ZERO;
+ self.max_lateness_delta = Duration::ZERO;
+ self.last_delta_instant = None;
+ self.log_start_instant = None;
+
+ self.ramp_up_instant = Some(Instant::now());
+ gst::info!(CAT, "First stats logs in {:.2?}", 2 * LOG_PERIOD);
+ }
+
+ fn can_count(&mut self) -> bool {
+ if !self.must_log {
+ return false;
+ }
+
+ if let Some(ramp_up_instant) = self.ramp_up_instant {
+ if ramp_up_instant.elapsed() < LOG_PERIOD {
+ return false;
+ }
+
+ self.ramp_up_instant = None;
+ gst::info!(CAT, "Ramp up complete. Stats logs in {:.2?}", LOG_PERIOD);
+ self.log_start_instant = Some(Instant::now());
+ self.last_delta_instant = self.log_start_instant;
+ }
+
+ true
+ }
+
+ fn notify_buffer(&mut self) {
+ if !self.can_count() {
+ return;
+ }
+
+ self.buffer_count += 1;
+ self.buffer_count_delta += 1;
+ }
+
+ fn notify_buffer_headroom(&mut self, headroom: Duration) {
+ if !self.can_count() {
+ return;
+ }
+
+ self.buffer_headroom += headroom;
+ self.buffer_headroom_delta += headroom;
+ }
+
+ fn notify_late_buffer(&mut self, now: Option<gst::ClockTime>, pts: gst::ClockTime) {
+ if !self.can_count() {
+ return;
+ }
+
+ let lateness = now
+ .opt_checked_sub(pts)
+ .ok()
+ .flatten()
+ .map_or(Duration::ZERO, Duration::from);
+
+ self.late_buffer_count += 1;
+ self.lateness += lateness;
+ self.max_lateness = self.max_lateness.max(lateness);
+
+ self.late_buffer_count_delta += 1;
+ self.lateness_delta += lateness;
+ self.max_lateness_delta = self.max_lateness_delta.max(lateness);
+ }
+
+ fn log_delta(&mut self) {
+ let delta_duration = match self.last_delta_instant {
+ Some(last_delta) => last_delta.elapsed(),
+ None => return,
+ };
+
+ if delta_duration < LOG_PERIOD {
+ return;
+ }
+
+ self.last_delta_instant = Some(Instant::now());
+
+ gst::info!(CAT, "Delta stats:");
+ gst::info!(
+ CAT,
+ "o {:>5.2} buffers / s",
+ self.buffer_count_delta as f32 / delta_duration.as_millis() as f32 * 1_000f32,
+ );
+
+ if self.sync && self.buffer_count_delta > 0 {
+ let early_buffers_count = self
+ .buffer_count_delta
+ .saturating_sub(self.late_buffer_count_delta);
+ if early_buffers_count > 0 {
+ gst::info!(
+ CAT,
+ "o {:>5.2?} headroom / early buffers",
+ self.buffer_headroom_delta / early_buffers_count,
+ );
+
+ gst::info!(
+ CAT,
+ "o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}",
+ self.late_buffer_count_delta as f32 / self.buffer_count_delta as f32 * 100f32,
+ self.lateness_delta
+ .checked_div(self.late_buffer_count_delta)
+ .unwrap_or(Duration::ZERO),
+ self.max_lateness_delta,
+ );
+ }
+
+ self.buffer_headroom_delta = Duration::ZERO;
+ self.late_buffer_count_delta = 0;
+ self.lateness_delta = Duration::ZERO;
+ self.max_lateness_delta = Duration::ZERO;
+ }
+
+ self.buffer_count_delta = 0;
+ }
+
+ fn log_global(&mut self) {
+ let log_duration = match self.log_start_instant {
+ Some(start) => start.elapsed(),
+ None => return,
+ };
+
+ gst::info!(CAT, "Global stats:");
+ gst::info!(
+ CAT,
+ "o {:>5.2} buffers / s",
+ self.buffer_count as f32 / log_duration.as_millis() as f32 * 1_000f32,
+ );
+
+ if self.sync && self.buffer_count > 0 {
+ let early_buffers_count = self.buffer_count.saturating_sub(self.late_buffer_count);
+ if early_buffers_count > 0 {
+ gst::info!(
+ CAT,
+ "o {:>5.2?} headroom / early buffers",
+ self.buffer_headroom / early_buffers_count,
+ );
+
+ gst::info!(
+ CAT,
+ "o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}",
+ self.late_buffer_count as f32 / self.buffer_count as f32 * 100f32,
+ self.lateness
+ .checked_div(self.late_buffer_count)
+ .unwrap_or(Duration::ZERO),
+ self.max_lateness,
+ );
+ }
+ }
+ }
+}
+
+struct TestSinkTask {
+ element: super::TestSink,
+ item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>,
+ sync: bool,
+ stats: Stats,
+ segment: Option<gst::Segment>,
+}
+
+impl TestSinkTask {
+ fn new(element: &super::TestSink, item_receiver: flume::Receiver<TaskItem>) -> Self {
+ TestSinkTask {
+ element: element.clone(),
+ item_receiver: item_receiver.into_stream().peekable(),
+ sync: DEFAULT_SYNC,
+ stats: Stats::default(),
+ segment: None,
+ }
+ }
+
+ async fn flush(&mut self) {
+ // Purge the channel
+ while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {}
+ }
+}
+
+impl TaskImpl for TestSinkTask {
+ type Item = TaskItem;
+
+ fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst::log!(CAT, obj: &self.element, "Preparing Task");
+
+ let sink = self.element.imp();
+ let settings = sink.settings.lock().unwrap();
+ self.sync = settings.sync;
+ self.stats.sync = self.sync;
+ self.stats.must_log = settings.must_log_stats;
+
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async {
+ gst::log!(CAT, obj: &self.element, "Starting Task");
+ self.stats.start();
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async {
+ gst::log!(CAT, obj: &self.element, "Stopping Task");
+ self.flush().await;
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async {
+ gst::log!(CAT, obj: &self.element, "Starting Task Flush");
+ self.flush().await;
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
+ async move {
+ let item_opt = Pin::new(&mut self.item_receiver).peek().await;
+
+ // Check the peeked item in case we need to sync.
+ // The item will still be available in the channel
+ // in case this is cancelled by a state transition.
+ match item_opt {
+ Some(TaskItem::Buffer(buffer)) => {
+ self.stats.notify_buffer();
+
+ if self.sync {
+ let rtime = self.segment.as_ref().and_then(|segment| {
+ segment
+ .downcast_ref::<gst::format::Time>()
+ .and_then(|segment| segment.to_running_time(buffer.pts()))
+ });
+ if let Some(pts) = rtime {
+ // This can be cancelled by a state transition.
+ self.sync(pts).await;
+ }
+ }
+ }
+ Some(_) => (),
+ None => {
+ panic!("Internal channel sender dropped while Task is Started");
+ }
+ }
+
+ // An item was peeked above, we can now pop it without losing it.
+ Ok(self.item_receiver.next().await.unwrap())
+ }
+ .boxed()
+ }
+
+ fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ gst::debug!(CAT, obj: &self.element, "Handling {:?}", item);
+
+ match item {
+ TaskItem::Buffer(buffer) => {
+ self.render(buffer).await.map_err(|err| {
+ element_error!(
+ &self.element,
+ gst::StreamError::Failed,
+ ["Failed to render item, stopping task: {}", err]
+ );
+ gst::FlowError::Error
+ })?;
+
+ self.stats.log_delta();
+ }
+ TaskItem::Event(event) => match event.view() {
+ EventView::Eos(_) => {
+ self.stats.log_global();
+
+ let _ = self
+ .element
+ .post_message(gst::message::Eos::builder().src(&self.element).build());
+ }
+ EventView::Segment(e) => {
+ self.segment = Some(e.segment().clone());
+ }
+ EventView::SinkMessage(e) => {
+ let _ = self.element.post_message(e.message());
+ }
+ _ => (),
+ },
+ }
+
+ Ok(())
+ }
+ .boxed()
+ }
+}
+
+impl TestSinkTask {
+ async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
+ let _data = buffer.map_readable().map_err(|_| {
+ element_error!(
+ self.element,
+ gst::StreamError::Format,
+ ["Failed to map buffer readable"]
+ );
+ gst::FlowError::Error
+ })?;
+
+ gst::log!(CAT, obj: &self.element, "buffer {:?} rendered", buffer);
+
+ Ok(())
+ }
+
+ /// Waits until specified time.
+ async fn sync(&mut self, pts: gst::ClockTime) {
+ let now = self.element.current_running_time();
+
+ if let Ok(Some(delay)) = pts.opt_checked_sub(now) {
+ let delay = delay.into();
+ gst::trace!(CAT, obj: &self.element, "sync: waiting {:?}", delay);
+ runtime::time::delay_for(delay).await;
+
+ self.stats.notify_buffer_headroom(delay);
+ } else {
+ self.stats.notify_late_buffer(now, pts);
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct TestSink {
+ sink_pad: PadSink,
+ task: Task,
+ item_sender: Mutex<Option<flume::Sender<TaskItem>>>,
+ settings: Mutex<Settings>,
+}
+
+impl TestSink {
+ #[track_caller]
+ fn clone_item_sender(&self) -> flume::Sender<TaskItem> {
+ self.item_sender.lock().unwrap().as_ref().unwrap().clone()
+ }
+
+ fn prepare(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Preparing");
+
+ let context = {
+ let settings = self.settings.lock().unwrap();
+
+ Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?
+ };
+
+ // Enable backpressure for items
+ let (item_sender, item_receiver) = flume::bounded(0);
+ let task_impl = TestSinkTask::new(element, item_receiver);
+ self.task.prepare(task_impl, context).block_on()?;
+
+ *self.item_sender.lock().unwrap() = Some(item_sender);
+
+ gst::debug!(CAT, obj: element, "Started preparation");
+
+ Ok(())
+ }
+
+ fn unprepare(&self, element: &super::TestSink) {
+ gst::debug!(CAT, obj: element, "Unpreparing");
+ self.task.unprepare().block_on().unwrap();
+ gst::debug!(CAT, obj: element, "Unprepared");
+ }
+
+ fn stop(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Stopping");
+ self.task.stop().block_on()?;
+ gst::debug!(CAT, obj: element, "Stopped");
+ Ok(())
+ }
+
+ fn start(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Starting");
+ self.task.start().block_on()?;
+ gst::debug!(CAT, obj: element, "Started");
+ Ok(())
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for TestSink {
+ const NAME: &'static str = "StandaloneTestSink";
+ type Type = super::TestSink;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ Self {
+ sink_pad: PadSink::new(
+ gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
+ TestSinkPadHandler,
+ ),
+ task: Task::default(),
+ item_sender: Default::default(),
+ settings: Default::default(),
+ }
+ }
+}
+
+impl ObjectImpl for TestSink {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::new(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecUInt::new(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecBoolean::new(
+ "sync",
+ "Sync",
+ "Sync on the clock",
+ DEFAULT_SYNC,
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecBoolean::new(
+ "must-log-stats",
+ "Must Log Stats",
+ "Whether statistics should be logged",
+ DEFAULT_MUST_LOG_STATS,
+ glib::ParamFlags::WRITABLE,
+ ),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(
+ &self,
+ _obj: &Self::Type,
+ _id: usize,
+ value: &glib::Value,
+ pspec: &glib::ParamSpec,
+ ) {
+ let mut settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "sync" => {
+ let sync = value.get().expect("type checked upstream");
+ settings.sync = sync;
+ }
+ "context" => {
+ settings.context = value
+ .get::<Option<String>>()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| DEFAULT_CONTEXT.into());
+ }
+ "context-wait" => {
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
+ }
+ "must-log-stats" => {
+ let must_log_stats = value.get().expect("type checked upstream");
+ settings.must_log_stats = must_log_stats;
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "sync" => settings.sync.to_value(),
+ "context" => settings.context.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+
+ gstthreadshare::set_element_flags(obj, gst::ElementFlags::SINK);
+ }
+}
+
+impl GstObjectImpl for TestSink {}
+
+impl ElementImpl for TestSink {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Thread-sharing standalone test sink",
+ "Sink/Test",
+ "Thread-sharing standalone test sink",
+ "François Laignel <fengalin@free.fr>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare(element).map_err(|err| {
+ element.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::ReadyToPaused => {
+ self.start(element).map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::ReadyToNull => {
+ self.unprepare(element);
+ }
+ _ => (),
+ }
+
+ self.parent_change_state(element, transition)
+ }
+}
diff --git a/generic/threadshare/examples/standalone/sink/mod.rs b/generic/threadshare/examples/standalone/sink/mod.rs
new file mode 100644
index 000000000..6b8ce9527
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct TestSink(ObjectSubclass<imp::TestSink>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-standalone-test-sink",
+ gst::Rank::None,
+ TestSink::static_type(),
+ )
+}
diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs
new file mode 100644
index 000000000..6cb6ebba2
--- /dev/null
+++ b/generic/threadshare/examples/standalone/src/imp.rs
@@ -0,0 +1,494 @@
+// Copyright (C) 2022 François Laignel <fengalin@free.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use once_cell::sync::Lazy;
+
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use gstthreadshare::runtime::prelude::*;
+use gstthreadshare::runtime::{Context, PadSrc, Task, Timer};
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "ts-standalone-test-src",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing standalone test src"),
+ )
+});
+
+const BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(20);
+
+const DEFAULT_CONTEXT: &str = "";
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
+const DEFAULT_NUM_BUFFERS: i32 = 50 * 60 * 2;
+
+#[derive(Debug, Clone)]
+struct Settings {
+ context: String,
+ context_wait: Duration,
+ num_buffers: Option<i32>,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ context: DEFAULT_CONTEXT.into(),
+ context_wait: DEFAULT_CONTEXT_WAIT,
+ num_buffers: Some(DEFAULT_NUM_BUFFERS),
+ }
+ }
+}
+
+#[derive(Clone, Debug)]
+struct TestSrcPadHandler;
+impl PadSrcHandler for TestSrcPadHandler {
+ type ElementImpl = TestSrc;
+}
+
+#[derive(Debug)]
+struct SrcTask {
+ element: super::TestSrc,
+ buffer_pool: gst::BufferPool,
+ last_pts: gst::ClockTime,
+ last_buf_instant: Option<Instant>,
+ push_period: Duration,
+ need_initial_events: bool,
+ need_segment: bool,
+ num_buffers: Option<i32>,
+ buffer_count: i32,
+}
+
+impl SrcTask {
+ fn new(element: super::TestSrc) -> Self {
+ let buffer_pool = gst::BufferPool::new();
+ let mut pool_config = buffer_pool.config();
+ pool_config
+ .as_mut()
+ .set_params(Some(&gst::Caps::builder("foo/bar").build()), 10, 10, 10);
+ buffer_pool.set_config(pool_config).unwrap();
+
+ SrcTask {
+ element,
+ buffer_pool,
+ last_pts: gst::ClockTime::ZERO,
+ last_buf_instant: None,
+ push_period: Duration::ZERO,
+ need_initial_events: true,
+ need_segment: true,
+ num_buffers: Some(DEFAULT_NUM_BUFFERS),
+ buffer_count: 0,
+ }
+ }
+}
+
+impl TaskImpl for SrcTask {
+ type Item = gst::Buffer;
+
+ fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst::log!(CAT, obj: &self.element, "Preparing Task");
+
+ let src = self.element.imp();
+ let settings = src.settings.lock().unwrap();
+ self.push_period = settings.context_wait;
+ self.num_buffers = settings.num_buffers;
+
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async {
+ gst::log!(CAT, obj: &self.element, "Starting Task");
+ self.buffer_count = 0;
+ self.last_buf_instant = None;
+ self.buffer_pool.set_active(true).unwrap();
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst::log!(CAT, obj: &self.element, "Stopping task");
+
+ self.buffer_pool.set_active(false).unwrap();
+ self.last_pts = gst::ClockTime::ZERO;
+ self.need_initial_events = true;
+ self.need_segment = true;
+
+ gst::log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst::log!(CAT, obj: &self.element, "Starting task flush");
+
+ self.buffer_pool.set_active(false).unwrap();
+ self.need_segment = true;
+
+ gst::log!(CAT, obj: &self.element, "Task flush started");
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
+ async move {
+ if let Some(delay) = self
+ .last_buf_instant
+ .map(|last| last.elapsed())
+ .opt_checked_sub(self.push_period)
+ .ok()
+ .flatten()
+ {
+ Timer::after(delay).await;
+ }
+
+ self.last_buf_instant = Some(Instant::now());
+
+ let start = self.last_pts;
+ self.last_pts = start + BUFFER_DURATION;
+
+ self.buffer_pool.acquire_buffer(None).map(|mut buffer| {
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(start);
+ buffer.set_duration(BUFFER_DURATION);
+ }
+
+ buffer
+ })
+ }
+ .boxed()
+ }
+
+ fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ let res = self.push(buffer).await;
+ match res {
+ Ok(_) => {
+ gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
+ }
+ Err(gst::FlowError::Eos) => {
+ gst::debug!(CAT, obj: &self.element, "EOS");
+ let test_src = self.element.imp();
+ test_src.src_pad.push_event(gst::event::Eos::new()).await;
+ }
+ Err(gst::FlowError::Flushing) => {
+ gst::debug!(CAT, obj: &self.element, "Flushing");
+ }
+ Err(err) => {
+ gst::error!(CAT, obj: &self.element, "Got error {}", err);
+ gst::element_error!(
+ &self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+ }
+ }
+
+ res.map(drop)
+ }
+ .boxed()
+ }
+}
+
+impl SrcTask {
+ async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
+ let test_src = self.element.imp();
+
+ if self.need_initial_events {
+ gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+
+ let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
+ let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
+ .group_id(gst::GroupId::next())
+ .build();
+ test_src.src_pad.push_event(stream_start_evt).await;
+
+ test_src
+ .src_pad
+ .push_event(gst::event::Caps::new(
+ &gst::Caps::builder("foo/bar").build(),
+ ))
+ .await;
+
+ self.need_initial_events = false;
+ }
+
+ if self.need_segment {
+ let segment_evt =
+ gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
+ test_src.src_pad.push_event(segment_evt).await;
+
+ self.need_segment = false;
+ }
+
+ gst::debug!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
+ let ok = test_src.src_pad.push(buffer).await?;
+
+ self.buffer_count += 1;
+
+ if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
+ return Err(gst::FlowError::Eos);
+ }
+
+ Ok(ok)
+ }
+}
+
+#[derive(Debug)]
+pub struct TestSrc {
+ src_pad: PadSrc,
+ task: Task,
+ settings: Mutex<Settings>,
+}
+
+impl TestSrc {
+ fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Preparing");
+
+ let settings = self.settings.lock().unwrap();
+ let context =
+ Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
+ drop(settings);
+
+ self.task
+ .prepare(SrcTask::new(element.clone()), context)
+ .block_on()?;
+
+ gst::debug!(CAT, obj: element, "Prepared");
+
+ Ok(())
+ }
+
+ fn unprepare(&self, element: &super::TestSrc) {
+ gst::debug!(CAT, obj: element, "Unpreparing");
+ self.task.unprepare().block_on().unwrap();
+ gst::debug!(CAT, obj: element, "Unprepared");
+ }
+
+ fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Stopping");
+ self.task.stop().block_on()?;
+ gst::debug!(CAT, obj: element, "Stopped");
+ Ok(())
+ }
+
+ fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Starting");
+ self.task.start().block_on()?;
+ gst::debug!(CAT, obj: element, "Started");
+ Ok(())
+ }
+
+ fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, obj: element, "Pausing");
+ self.task.pause().block_on()?;
+ gst::debug!(CAT, obj: element, "Paused");
+ Ok(())
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for TestSrc {
+ const NAME: &'static str = "StandaloneTestSrc";
+ type Type = super::TestSrc;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ Self {
+ src_pad: PadSrc::new(
+ gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")),
+ TestSrcPadHandler,
+ ),
+ task: Task::default(),
+ settings: Default::default(),
+ }
+ }
+}
+
+impl ObjectImpl for TestSrc {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::new(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecUInt::new(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecInt::new(
+ "num-buffers",
+ "Num Buffers",
+ "Number of buffers to output before sending EOS (-1 = unlimited)",
+ -1i32,
+ i32::MAX,
+ DEFAULT_NUM_BUFFERS,
+ glib::ParamFlags::READWRITE,
+ ),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(
+ &self,
+ _obj: &Self::Type,
+ _id: usize,
+ value: &glib::Value,
+ pspec: &glib::ParamSpec,
+ ) {
+ let mut settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "context" => {
+ settings.context = value
+ .get::<Option<String>>()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| DEFAULT_CONTEXT.into());
+ }
+ "context-wait" => {
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
+ }
+ "num-buffers" => {
+ let value = value.get::<i32>().expect("type checked upstream");
+ settings.num_buffers = if value > 0 { Some(value) } else { None };
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "context" => settings.context.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
+ "num-buffers" => settings.num_buffers.unwrap_or(-1).to_value(),
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
+
+ gstthreadshare::set_element_flags(obj, gst::ElementFlags::SOURCE);
+ }
+}
+
+impl GstObjectImpl for TestSrc {}
+
+impl ElementImpl for TestSrc {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Thread-sharing standalone test source",
+ "Source/Test",
+ "Thread-sharing standalone test source",
+ "François Laignel <fengalin@free.fr>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![src_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare(element).map_err(|err| {
+ element.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::PlayingToPaused => {
+ self.pause(element).map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::ReadyToNull => {
+ self.unprepare(element);
+ }
+ _ => (),
+ }
+
+ let mut success = self.parent_change_state(element, transition)?;
+
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ gst::StateChange::PausedToPlaying => {
+ self.start(element).map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::PlayingToPaused => {
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
+ }
+ _ => (),
+ }
+
+ Ok(success)
+ }
+}
diff --git a/generic/threadshare/examples/standalone/src/mod.rs b/generic/threadshare/examples/standalone/src/mod.rs
new file mode 100644
index 000000000..c0fa27775
--- /dev/null
+++ b/generic/threadshare/examples/standalone/src/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct TestSrc(ObjectSubclass<imp::TestSrc>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-standalone-test-src",
+ gst::Rank::None,
+ TestSrc::static_type(),
+ )
+}