Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-10-15 21:13:32 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-11-09 10:55:04 +0300
commit9b96cfc4526090f153f9b6a5b539780c71f08499 (patch)
tree499ea4021c9a1abdd5740223564036a047f08fc6 /generic
parent4616f0a4a46abbe88a9d827f084c419dbb12bfd8 (diff)
ts/standalone: add new Sinks
Contrary to the existing Task Sink, the Async and Sync Mutex Sinks handle buffers in the `PadSinkHandler` directly. The Async Mutex Sink uses an async Mutex for the `PadSinkHandlerInner` while the Sync Mutex Sink uses... a sync Mutex. All Sinks share the same settings and stats manager. Use the `--sink` command line option to select the sink (default is `sync-mutex` since it allows evaluating the framework with as little overhead as possible. Also apply various fixes: - Only keep the segment start instead of the full `Segment`. This helps with cache locality (`Segment` is a plain struct with many fields) and avoids downcasting the generic `Segment` upon each buffer handling. - Box the `Stat`s. This should improve cache locality a bit. - Fix EOS handling which took ages for no benefits in this particular use case. - Use a macro to raise log level in the main element. - Move error handling during item processing in `handle_loop_error`. This function was precisely designed for this and it should reduce the `handle_item`'s Future size.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/examples/standalone/args/clap_args.rs66
-rw-r--r--generic/threadshare/examples/standalone/args/default_args.rs47
-rw-r--r--generic/threadshare/examples/standalone/args/mod.rs9
-rw-r--r--generic/threadshare/examples/standalone/macros.rs19
-rw-r--r--generic/threadshare/examples/standalone/main.rs144
-rw-r--r--generic/threadshare/examples/standalone/sink/async_mutex/imp.rs334
-rw-r--r--generic/threadshare/examples/standalone/sink/async_mutex/mod.rs17
-rw-r--r--generic/threadshare/examples/standalone/sink/mod.rs21
-rw-r--r--generic/threadshare/examples/standalone/sink/settings.rs115
-rw-r--r--generic/threadshare/examples/standalone/sink/stats.rs270
-rw-r--r--generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs327
-rw-r--r--generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs17
-rw-r--r--generic/threadshare/examples/standalone/sink/task/imp.rs736
-rw-r--r--generic/threadshare/examples/standalone/sink/task/mod.rs6
-rw-r--r--generic/threadshare/examples/standalone/src/imp.rs372
-rw-r--r--generic/threadshare/examples/standalone/src/mod.rs4
16 files changed, 1555 insertions, 949 deletions
diff --git a/generic/threadshare/examples/standalone/args/clap_args.rs b/generic/threadshare/examples/standalone/args/clap_args.rs
new file mode 100644
index 00000000..60291f27
--- /dev/null
+++ b/generic/threadshare/examples/standalone/args/clap_args.rs
@@ -0,0 +1,66 @@
+use super::super::CAT;
+use clap::Parser;
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
+pub enum Sink {
+ /// Item handling in PadHandler with async Mutex
+ AsyncMutex,
+ /// Item handling in PadHandler with sync Mutex
+ SyncMutex,
+ /// Item handling in runtime::Task
+ Task,
+}
+
+impl Sink {
+ pub fn element_name(self) -> &'static str {
+ use super::super::sink;
+ use Sink::*;
+ match self {
+ AsyncMutex => sink::ASYNC_MUTEX_ELEMENT_NAME,
+ SyncMutex => sink::SYNC_MUTEX_ELEMENT_NAME,
+ Task => sink::TASK_ELEMENT_NAME,
+ }
+ }
+}
+
+#[derive(Parser, Debug)]
+#[clap(version)]
+#[clap(
+ about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats"
+)]
+pub struct Args {
+ /// Parallel streams to process.
+ #[clap(short, long, default_value_t = 5000)]
+ pub streams: u32,
+
+ /// Threadshare groups.
+ #[clap(short, long, default_value_t = 2)]
+ pub groups: u32,
+
+ /// Threadshare Context wait in ms (max throttling duration).
+ #[clap(short, long, default_value_t = 20)]
+ pub wait: u32,
+
+ /// Buffer push period in ms.
+ #[clap(short, long, default_value_t = 20)]
+ pub push_period: u32,
+
+ /// Number of buffers per stream to output before sending EOS (-1 = unlimited).
+ #[clap(short, long, default_value_t = 5000)]
+ pub num_buffers: i32,
+
+ /// The Sink variant to use.
+ #[clap(long, value_enum, default_value_t = Sink::SyncMutex)]
+ pub sink: Sink,
+
+ /// Disables statistics logging.
+ #[clap(short, long)]
+ pub disable_stats_log: bool,
+}
+
+pub fn args() -> Args {
+ let args = Args::parse();
+ gst::info!(CAT, "{:?}", args);
+
+ args
+}
diff --git a/generic/threadshare/examples/standalone/args/default_args.rs b/generic/threadshare/examples/standalone/args/default_args.rs
new file mode 100644
index 00000000..9f84aaf4
--- /dev/null
+++ b/generic/threadshare/examples/standalone/args/default_args.rs
@@ -0,0 +1,47 @@
+use super::super::CAT;
+
+#[derive(Copy, Clone, Debug)]
+pub struct SyncMutexSink;
+
+impl SyncMutexSink {
+ pub fn element_name(self) -> &'static str {
+ super::super::sink::SYNC_MUTEX_ELEMENT_NAME
+ }
+}
+
+#[derive(Debug)]
+pub struct Args {
+ pub streams: u32,
+ pub groups: u32,
+ pub wait: u32,
+ pub push_period: u32,
+ pub num_buffers: i32,
+ pub sink: SyncMutexSink,
+ pub disable_stats_log: bool,
+}
+
+impl Default for Args {
+ fn default() -> Self {
+ Args {
+ streams: 5000,
+ groups: 2,
+ wait: 20,
+ push_period: 20,
+ num_buffers: 5000,
+ sink: SyncMutexSink,
+ disable_stats_log: false,
+ }
+ }
+}
+
+pub fn args() -> Args {
+ if std::env::args().len() > 1 {
+ gst::warning!(CAT, "Ignoring command line arguments");
+ gst::warning!(CAT, "Build with `--features=clap`");
+ }
+
+ let args = Args::default();
+ gst::warning!(CAT, "{:?}", args);
+
+ args
+}
diff --git a/generic/threadshare/examples/standalone/args/mod.rs b/generic/threadshare/examples/standalone/args/mod.rs
new file mode 100644
index 00000000..483b0a26
--- /dev/null
+++ b/generic/threadshare/examples/standalone/args/mod.rs
@@ -0,0 +1,9 @@
+#[cfg(not(feature = "clap"))]
+mod default_args;
+#[cfg(not(feature = "clap"))]
+pub use default_args::*;
+
+#[cfg(feature = "clap")]
+mod clap_args;
+#[cfg(feature = "clap")]
+pub use clap_args::*;
diff --git a/generic/threadshare/examples/standalone/macros.rs b/generic/threadshare/examples/standalone/macros.rs
new file mode 100644
index 00000000..dc6043ef
--- /dev/null
+++ b/generic/threadshare/examples/standalone/macros.rs
@@ -0,0 +1,19 @@
+macro_rules! debug_or_trace {
+ ($cat:expr, $raise_log_level:expr, $qual:ident: $obj:expr, $rest:tt $(,)?) => {
+ if $raise_log_level {
+ gst::debug!($cat, $qual: $obj, $rest);
+ } else {
+ gst::trace!($cat, $qual: $obj, $rest);
+ }
+ };
+}
+
+macro_rules! log_or_trace {
+ ($cat:expr, $raise_log_level:expr, $qual:ident: $obj:expr, $rest:tt $(,)?) => {
+ if $raise_log_level {
+ gst::log!($cat, $qual: $obj, $rest);
+ } else {
+ gst::trace!($cat, $qual: $obj, $rest);
+ }
+ };
+}
diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs
index 28064331..ff1edc41 100644
--- a/generic/threadshare/examples/standalone/main.rs
+++ b/generic/threadshare/examples/standalone/main.rs
@@ -1,12 +1,21 @@
use gst::glib;
use once_cell::sync::Lazy;
+mod args;
+use args::*;
+
+#[macro_use]
+mod macros;
+
mod sink;
mod src;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
+
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
- "ts-standalone-test-main",
+ "ts-standalone-main",
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone test main"),
)
@@ -14,6 +23,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
src::register(plugin)?;
+ sink::async_mutex::register(plugin)?;
+ sink::sync_mutex::register(plugin)?;
sink::task::register(plugin)?;
Ok(())
@@ -31,91 +42,6 @@ gst::plugin_define!(
env!("BUILD_REL_DATE")
);
-#[cfg(feature = "clap")]
-use clap::Parser;
-
-#[cfg(feature = "clap")]
-#[derive(Parser, Debug)]
-#[clap(version)]
-#[clap(
- about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats"
-)]
-struct Args {
- /// Parallel streams to process.
- #[clap(short, long, default_value_t = 5000)]
- streams: u32,
-
- /// Threadshare groups.
- #[clap(short, long, default_value_t = 2)]
- groups: u32,
-
- /// Threadshare Context wait in ms (max throttling duration).
- #[clap(short, long, default_value_t = 20)]
- wait: u32,
-
- /// Buffer push period in ms.
- #[clap(short, long, default_value_t = 20)]
- push_period: u32,
-
- /// Number of buffers per stream to output before sending EOS (-1 = unlimited).
- #[clap(short, long, default_value_t = 5000)]
- num_buffers: i32,
-
- /// Disables statistics logging.
- #[clap(short, long)]
- disable_stats_log: bool,
-}
-
-#[cfg(not(feature = "clap"))]
-#[derive(Debug)]
-struct Args {
- streams: u32,
- groups: u32,
- wait: u32,
- push_period: u32,
- num_buffers: i32,
- disable_stats_log: bool,
-}
-
-#[cfg(not(feature = "clap"))]
-impl Default for Args {
- fn default() -> Self {
- Args {
- streams: 5000,
- groups: 2,
- wait: 20,
- push_period: 20,
- num_buffers: 5000,
- disable_stats_log: false,
- }
- }
-}
-
-fn args() -> Args {
- #[cfg(feature = "clap")]
- let args = {
- let args = Args::parse();
- gst::info!(CAT, "{:?}", args);
-
- args
- };
-
- #[cfg(not(feature = "clap"))]
- let args = {
- if std::env::args().len() > 1 {
- gst::warning!(CAT, "Ignoring command line arguments");
- gst::warning!(CAT, "Build with `--features=clap`");
- }
-
- let args = Args::default();
- gst::warning!(CAT, "{:?}", args);
-
- args
- };
-
- args
-}
-
fn main() {
use gst::prelude::*;
use std::time::Instant;
@@ -133,8 +59,8 @@ fn main() {
for i in 0..args.streams {
let ctx_name = format!("standalone {}", i % args.groups);
- let src = gst::ElementFactory::make("ts-standalone-test-src")
- .name(format!("src-{}", i).as_str())
+ let src = gst::ElementFactory::make(src::ELEMENT_NAME)
+ .name(format!("src-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.property("push-period", args.push_period)
@@ -142,16 +68,16 @@ fn main() {
.build()
.unwrap();
- let sink = gst::ElementFactory::make("ts-standalone-test-sink")
- .name(format!("sink-{}", i).as_str())
+ let sink = gst::ElementFactory::make(args.sink.element_name())
+ .name(format!("sink-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.build()
.unwrap();
if i == 0 {
- src.set_property("raise-log-level", true);
- sink.set_property("raise-log-level", true);
+ src.set_property("main-elem", true);
+ sink.set_property("main-elem", true);
if !args.disable_stats_log {
// Don't use the last 5 secs in stats
@@ -179,30 +105,46 @@ fn main() {
let l = glib::MainLoop::new(None, false);
let bus = pipeline.bus().unwrap();
+ let terminated_count = Arc::new(AtomicU32::new(0));
let pipeline_clone = pipeline.clone();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
-
match msg.view() {
MessageView::Eos(_) => {
+ // Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l_clone.quit();
+
+ glib::Continue(false)
}
- MessageView::Error(err) => {
+ MessageView::Error(msg) => {
+ if let gst::MessageView::Error(msg) = msg.message().view() {
+ if msg.error().matches(gst::LibraryError::Shutdown) {
+ if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
+ gst::info!(CAT, "Received all shutdown requests");
+ l_clone.quit();
+
+ return glib::Continue(false);
+ } else {
+ return glib::Continue(true);
+ }
+ }
+ }
+
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
- err.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
+ msg.src().map(|s| s.path_string()),
+ msg.error(),
+ msg.debug()
);
l_clone.quit();
- }
- _ => (),
- };
- glib::Continue(true)
+ glib::Continue(false)
+ }
+ _ => glib::Continue(true),
+ }
})
.expect("Failed to add bus watch");
diff --git a/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs
new file mode 100644
index 00000000..b909ab48
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs
@@ -0,0 +1,334 @@
+// Copyright (C) 2022 François Laignel <fengalin@free.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst::EventView;
+
+use once_cell::sync::Lazy;
+
+use gstthreadshare::runtime::executor::block_on_or_add_sub_task;
+use gstthreadshare::runtime::{prelude::*, PadSink};
+
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use super::super::{Settings, Stats, CAT};
+
+#[derive(Debug, Default)]
+struct PadSinkHandlerInner {
+ is_flushing: bool,
+ is_main_elem: bool,
+ last_dts: Option<gst::ClockTime>,
+ segment_start: Option<gst::ClockTime>,
+ stats: Option<Box<Stats>>,
+}
+
+impl PadSinkHandlerInner {
+ fn handle_buffer(
+ &mut self,
+ elem: &super::AsyncMutexSink,
+ buffer: gst::Buffer,
+ ) -> Result<(), gst::FlowError> {
+ if self.is_flushing {
+ log_or_trace!(
+ CAT,
+ self.is_main_elem,
+ obj: elem,
+ "Discarding {buffer:?} (flushing)"
+ );
+
+ return Err(gst::FlowError::Flushing);
+ }
+
+ debug_or_trace!(CAT, self.is_main_elem, obj: elem, "Received {buffer:?}");
+
+ let dts = buffer
+ .dts()
+ .expect("Buffer without dts")
+ .checked_sub(self.segment_start.expect("Buffer without Time Segment"))
+ .expect("dts before Segment start");
+
+ if let Some(last_dts) = self.last_dts {
+ let cur_ts = elem.current_running_time().unwrap();
+ let latency: Duration = (cur_ts - dts).into();
+ let interval: Duration = (dts - last_dts).into();
+
+ if let Some(stats) = self.stats.as_mut() {
+ stats.add_buffer(latency, interval);
+ }
+
+ debug_or_trace!(CAT, self.is_main_elem, obj: elem, "o latency {latency:.2?}");
+ debug_or_trace!(
+ CAT,
+ self.is_main_elem,
+ obj: elem,
+ "o interval {interval:.2?}",
+ );
+ }
+
+ self.last_dts = Some(dts);
+
+ log_or_trace!(CAT, self.is_main_elem, obj: elem, "Buffer processed");
+
+ Ok(())
+ }
+}
+
+#[derive(Clone, Debug, Default)]
+struct AsyncPadSinkHandler(Arc<futures::lock::Mutex<PadSinkHandlerInner>>);
+
+impl PadSinkHandler for AsyncPadSinkHandler {
+ type ElementImpl = AsyncMutexSink;
+
+ fn sink_chain(
+ self,
+ _pad: gst::Pad,
+ elem: super::AsyncMutexSink,
+ buffer: gst::Buffer,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ async move {
+ if self.0.lock().await.handle_buffer(&elem, buffer).is_err() {
+ return Err(gst::FlowError::Flushing);
+ }
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+ .boxed()
+ }
+
+ fn sink_event_serialized(
+ self,
+ _pad: gst::Pad,
+ elem: super::AsyncMutexSink,
+ event: gst::Event,
+ ) -> BoxFuture<'static, bool> {
+ async move {
+ match event.view() {
+ EventView::Eos(_) => {
+ {
+ let mut inner = self.0.lock().await;
+ debug_or_trace!(CAT, inner.is_main_elem, obj: elem, "EOS");
+ inner.is_flushing = true;
+ }
+
+ // When each element sends its own EOS message,
+ // it takes ages for the pipeline to process all of them.
+ // Let's just post an error message and let main shuts down
+ // after all streams have posted this message.
+ let _ = elem
+ .post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
+ }
+ EventView::FlushStop(_) => {
+ self.0.lock().await.is_flushing = false;
+ }
+ EventView::Segment(evt) => {
+ if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
+ self.0.lock().await.segment_start = time_seg.start();
+ }
+ }
+ EventView::SinkMessage(evt) => {
+ let _ = elem.post_message(evt.message());
+ }
+ _ => (),
+ }
+
+ true
+ }
+ .boxed()
+ }
+
+ fn sink_event(self, _pad: &gst::Pad, _imp: &AsyncMutexSink, event: gst::Event) -> bool {
+ if let EventView::FlushStart(..) = event.view() {
+ block_on_or_add_sub_task(async move { self.0.lock().await.is_flushing = true });
+ }
+
+ true
+ }
+}
+
+impl AsyncPadSinkHandler {
+ fn prepare(&self, is_main_elem: bool, stats: Option<Stats>) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+ inner.is_main_elem = is_main_elem;
+ inner.stats = stats.map(Box::new);
+ });
+ }
+
+ fn start(&self) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+
+ inner.is_flushing = false;
+ inner.last_dts = None;
+
+ if let Some(stats) = inner.stats.as_mut() {
+ stats.start();
+ }
+ });
+ }
+
+ fn stop(&self) {
+ futures::executor::block_on(async move {
+ let mut inner = self.0.lock().await;
+ inner.is_flushing = true;
+ });
+ }
+}
+
+#[derive(Debug)]
+pub struct AsyncMutexSink {
+ sink_pad: PadSink,
+ sink_pad_handler: AsyncPadSinkHandler,
+ settings: Mutex<Settings>,
+}
+
+impl AsyncMutexSink {
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ let settings = self.settings.lock().unwrap();
+ debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing");
+ let stats = if settings.logs_stats {
+ Some(Stats::new(
+ settings.max_buffers,
+ settings.push_period + settings.context_wait / 2,
+ ))
+ } else {
+ None
+ };
+
+ self.sink_pad_handler.prepare(settings.is_main_elem, stats);
+ debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared");
+
+ Ok(())
+ }
+
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
+ self.sink_pad_handler.stop();
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
+
+ Ok(())
+ }
+
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
+ self.sink_pad_handler.start();
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
+
+ Ok(())
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for AsyncMutexSink {
+ const NAME: &'static str = "TsStandaloneAsyncMutexSink";
+ type Type = super::AsyncMutexSink;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let sink_pad_handler = AsyncPadSinkHandler::default();
+ Self {
+ sink_pad: PadSink::new(
+ gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
+ sink_pad_handler.clone(),
+ ),
+ sink_pad_handler,
+ settings: Default::default(),
+ }
+ }
+}
+
+impl ObjectImpl for AsyncMutexSink {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(Settings::properties);
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ self.settings.lock().unwrap().set_property(id, value, pspec);
+ }
+
+ fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ self.settings.lock().unwrap().property(id, pspec)
+ }
+
+ fn constructed(&self) {
+ self.parent_constructed();
+
+ let obj = self.obj();
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::SINK);
+ }
+}
+
+impl GstObjectImpl for AsyncMutexSink {}
+
+impl ElementImpl for AsyncMutexSink {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Thread-sharing standalone test async mutex sink",
+ "Sink/Test",
+ "Thread-sharing standalone test async mutex sink",
+ "François Laignel <fengalin@free.fr>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, imp: self, "Changing state {transition:?}");
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::ReadyToPaused => {
+ self.start().map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop().map_err(|_| gst::StateChangeError)?;
+ }
+ _ => (),
+ }
+
+ self.parent_change_state(transition)
+ }
+}
diff --git a/generic/threadshare/examples/standalone/sink/async_mutex/mod.rs b/generic/threadshare/examples/standalone/sink/async_mutex/mod.rs
new file mode 100644
index 00000000..8c711d33
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/async_mutex/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct AsyncMutexSink(ObjectSubclass<imp::AsyncMutexSink>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ super::ASYNC_MUTEX_ELEMENT_NAME,
+ gst::Rank::None,
+ AsyncMutexSink::static_type(),
+ )
+}
diff --git a/generic/threadshare/examples/standalone/sink/mod.rs b/generic/threadshare/examples/standalone/sink/mod.rs
index cdafe4ad..bf9b7aae 100644
--- a/generic/threadshare/examples/standalone/sink/mod.rs
+++ b/generic/threadshare/examples/standalone/sink/mod.rs
@@ -1 +1,22 @@
+pub mod async_mutex;
+pub mod sync_mutex;
pub mod task;
+
+mod settings;
+pub use settings::Settings;
+
+mod stats;
+pub use stats::Stats;
+
+pub const ASYNC_MUTEX_ELEMENT_NAME: &str = "ts-standalone-async-mutex-sink";
+pub const SYNC_MUTEX_ELEMENT_NAME: &str = "ts-standalone-sync-mutex-sink";
+pub const TASK_ELEMENT_NAME: &str = "ts-standalone-task-sink";
+
+use once_cell::sync::Lazy;
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "ts-standalone-sink",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing standalone test sink"),
+ )
+});
diff --git a/generic/threadshare/examples/standalone/sink/settings.rs b/generic/threadshare/examples/standalone/sink/settings.rs
new file mode 100644
index 00000000..cc04187d
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/settings.rs
@@ -0,0 +1,115 @@
+use gst::glib;
+use gst::prelude::*;
+
+use std::time::Duration;
+
+const DEFAULT_CONTEXT: &str = "";
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
+const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
+const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);
+
+#[derive(Debug, Clone)]
+pub struct Settings {
+ pub context: String,
+ pub context_wait: Duration,
+ pub is_main_elem: bool,
+ pub logs_stats: bool,
+ pub push_period: Duration,
+ pub max_buffers: Option<u32>,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ context: DEFAULT_CONTEXT.into(),
+ context_wait: DEFAULT_CONTEXT_WAIT,
+ is_main_elem: false,
+ logs_stats: false,
+ push_period: DEFAULT_PUSH_PERIOD,
+ max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
+ }
+ }
+}
+
+impl Settings {
+ pub fn properties() -> Vec<glib::ParamSpec> {
+ vec![
+ glib::ParamSpecString::builder("context")
+ .nick("Context")
+ .blurb("Context name to share threads with")
+ .default_value(Some(DEFAULT_CONTEXT))
+ .build(),
+ glib::ParamSpecUInt::builder("context-wait")
+ .nick("Context Wait")
+ .blurb("Throttle poll loop to run at most once every this many ms")
+ .maximum(1000)
+ .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
+ .build(),
+ glib::ParamSpecBoolean::builder("main-elem")
+ .nick("Main Element")
+ .blurb("Declare this element as the main one")
+ .write_only()
+ .build(),
+ glib::ParamSpecBoolean::builder("logs-stats")
+ .nick("Logs Stats")
+ .blurb("Whether statistics should be logged")
+ .write_only()
+ .build(),
+ glib::ParamSpecUInt::builder("push-period")
+ .nick("Src buffer Push Period")
+ .blurb("Push period used by `src` element (used for stats warnings)")
+ .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
+ .build(),
+ glib::ParamSpecInt::builder("max-buffers")
+ .nick("Max Buffers")
+ .blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
+ .minimum(-1i32)
+ .default_value(DEFAULT_MAX_BUFFERS)
+ .build(),
+ ]
+ }
+
+ pub fn set_property(&mut self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ match pspec.name() {
+ "context" => {
+ self.context = value
+ .get::<Option<String>>()
+ .unwrap()
+ .unwrap_or_else(|| DEFAULT_CONTEXT.into());
+ }
+ "context-wait" => {
+ self.context_wait = Duration::from_millis(value.get::<u32>().unwrap().into());
+ }
+ "main-elem" => {
+ self.is_main_elem = value.get::<bool>().unwrap();
+ }
+ "logs-stats" => {
+ let logs_stats = value.get().unwrap();
+ self.logs_stats = logs_stats;
+ }
+ "push-period" => {
+ self.push_period = Duration::from_millis(value.get::<u32>().unwrap().into());
+ }
+ "max-buffers" => {
+ let value = value.get::<i32>().unwrap();
+ self.max_buffers = if value > 0 { Some(value as u32) } else { None };
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ pub fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "context" => self.context.to_value(),
+ "context-wait" => (self.context_wait.as_millis() as u32).to_value(),
+ "main-elem" => self.is_main_elem.to_value(),
+ "push-period" => (self.push_period.as_millis() as u32).to_value(),
+ "max-buffers" => self
+ .max_buffers
+ .and_then(|val| val.try_into().ok())
+ .unwrap_or(-1i32)
+ .to_value(),
+ _ => unimplemented!(),
+ }
+ }
+}
diff --git a/generic/threadshare/examples/standalone/sink/stats.rs b/generic/threadshare/examples/standalone/sink/stats.rs
new file mode 100644
index 00000000..69370d9c
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/stats.rs
@@ -0,0 +1,270 @@
+use gst::prelude::*;
+use std::time::{Duration, Instant};
+
+#[cfg(feature = "tuning")]
+use gstthreadshare::runtime::Context;
+
+use super::CAT;
+
+const LOG_PERIOD: Duration = Duration::from_secs(20);
+
+#[derive(Debug, Default)]
+pub struct Stats {
+ ramp_up_instant: Option<Instant>,
+ log_start_instant: Option<Instant>,
+ last_delta_instant: Option<Instant>,
+ max_buffers: Option<f32>,
+ buffer_count: f32,
+ buffer_count_delta: f32,
+ latency_sum: f32,
+ latency_square_sum: f32,
+ latency_sum_delta: f32,
+ latency_square_sum_delta: f32,
+ latency_min: Duration,
+ latency_min_delta: Duration,
+ latency_max: Duration,
+ latency_max_delta: Duration,
+ interval_sum: f32,
+ interval_square_sum: f32,
+ interval_sum_delta: f32,
+ interval_square_sum_delta: f32,
+ interval_min: Duration,
+ interval_min_delta: Duration,
+ interval_max: Duration,
+ interval_max_delta: Duration,
+ interval_late_warn: Duration,
+ interval_late_count: f32,
+ interval_late_count_delta: f32,
+ #[cfg(feature = "tuning")]
+ parked_duration_init: Duration,
+}
+
+impl Stats {
+ pub fn new(max_buffers: Option<u32>, interval_late_warn: Duration) -> Self {
+ Stats {
+ max_buffers: max_buffers.map(|max_buffers| max_buffers as f32),
+ interval_late_warn,
+ ..Default::default()
+ }
+ }
+
+ pub fn start(&mut self) {
+ self.buffer_count = 0.0;
+ self.buffer_count_delta = 0.0;
+ self.latency_sum = 0.0;
+ self.latency_square_sum = 0.0;
+ self.latency_sum_delta = 0.0;
+ self.latency_square_sum_delta = 0.0;
+ self.latency_min = Duration::MAX;
+ self.latency_min_delta = Duration::MAX;
+ self.latency_max = Duration::ZERO;
+ self.latency_max_delta = Duration::ZERO;
+ self.interval_sum = 0.0;
+ self.interval_square_sum = 0.0;
+ self.interval_sum_delta = 0.0;
+ self.interval_square_sum_delta = 0.0;
+ self.interval_min = Duration::MAX;
+ self.interval_min_delta = Duration::MAX;
+ self.interval_max = Duration::ZERO;
+ self.interval_max_delta = Duration::ZERO;
+ self.interval_late_count = 0.0;
+ self.interval_late_count_delta = 0.0;
+ self.last_delta_instant = None;
+ self.log_start_instant = None;
+
+ self.ramp_up_instant = Some(Instant::now());
+ gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD);
+ }
+
+ pub fn is_active(&mut self) -> bool {
+ if let Some(ramp_up_instant) = self.ramp_up_instant {
+ if ramp_up_instant.elapsed() < LOG_PERIOD {
+ return false;
+ }
+
+ self.ramp_up_instant = None;
+ gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
+ self.log_start_instant = Some(Instant::now());
+ self.last_delta_instant = self.log_start_instant;
+
+ #[cfg(feature = "tuning")]
+ {
+ self.parked_duration_init = Context::current().unwrap().parked_duration();
+ }
+ }
+
+ use std::cmp::Ordering::*;
+ match self.max_buffers.opt_cmp(self.buffer_count) {
+ Some(Equal) => {
+ self.log_global();
+ self.buffer_count += 1.0;
+ false
+ }
+ Some(Less) => false,
+ _ => true,
+ }
+ }
+
+ pub fn add_buffer(&mut self, latency: Duration, interval: Duration) {
+ if !self.is_active() {
+ return;
+ }
+
+ self.buffer_count += 1.0;
+ self.buffer_count_delta += 1.0;
+
+ // Latency
+ let latency_f32 = latency.as_nanos() as f32;
+ let latency_square = latency_f32.powi(2);
+
+ self.latency_sum += latency_f32;
+ self.latency_square_sum += latency_square;
+ self.latency_min = self.latency_min.min(latency);
+ self.latency_max = self.latency_max.max(latency);
+
+ self.latency_sum_delta += latency_f32;
+ self.latency_square_sum_delta += latency_square;
+ self.latency_min_delta = self.latency_min_delta.min(latency);
+ self.latency_max_delta = self.latency_max_delta.max(latency);
+
+ // Interval
+ let interval_f32 = interval.as_nanos() as f32;
+ let interval_square = interval_f32.powi(2);
+
+ self.interval_sum += interval_f32;
+ self.interval_square_sum += interval_square;
+ self.interval_min = self.interval_min.min(interval);
+ self.interval_max = self.interval_max.max(interval);
+
+ self.interval_sum_delta += interval_f32;
+ self.interval_square_sum_delta += interval_square;
+ self.interval_min_delta = self.interval_min_delta.min(interval);
+ self.interval_max_delta = self.interval_max_delta.max(interval);
+
+ if interval > self.interval_late_warn {
+ self.interval_late_count += 1.0;
+ self.interval_late_count_delta += 1.0;
+ }
+
+ let delta_duration = match self.last_delta_instant {
+ Some(last_delta) => last_delta.elapsed(),
+ None => return,
+ };
+
+ if delta_duration < LOG_PERIOD {
+ return;
+ }
+
+ self.last_delta_instant = Some(Instant::now());
+
+ gst::info!(CAT, "Delta stats:");
+ let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
+ let interval_std_dev = f32::sqrt(
+ self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
+ );
+
+ gst::info!(
+ CAT,
+ "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(interval_mean as u64),
+ Duration::from_nanos(interval_std_dev as u64),
+ self.interval_min_delta,
+ self.interval_max_delta,
+ );
+
+ if self.interval_late_count_delta > f32::EPSILON {
+ gst::warning!(
+ CAT,
+ "o {:5.2}% late buffers",
+ 100f32 * self.interval_late_count_delta / self.buffer_count_delta
+ );
+ }
+
+ self.interval_sum_delta = 0.0;
+ self.interval_square_sum_delta = 0.0;
+ self.interval_min_delta = Duration::MAX;
+ self.interval_max_delta = Duration::ZERO;
+ self.interval_late_count_delta = 0.0;
+
+ let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
+ let latency_std_dev = f32::sqrt(
+ self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
+ );
+
+ gst::info!(
+ CAT,
+ "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(latency_mean as u64),
+ Duration::from_nanos(latency_std_dev as u64),
+ self.latency_min_delta,
+ self.latency_max_delta,
+ );
+
+ self.latency_sum_delta = 0.0;
+ self.latency_square_sum_delta = 0.0;
+ self.latency_min_delta = Duration::MAX;
+ self.latency_max_delta = Duration::ZERO;
+
+ self.buffer_count_delta = 0.0;
+ }
+
+ pub fn log_global(&mut self) {
+ if self.buffer_count < 1.0 {
+ return;
+ }
+
+ let _log_start = if let Some(log_start) = self.log_start_instant {
+ log_start
+ } else {
+ return;
+ };
+
+ gst::info!(CAT, "Global stats:");
+
+ #[cfg(feature = "tuning")]
+ {
+ let duration = _log_start.elapsed();
+ let parked_duration =
+ Context::current().unwrap().parked_duration() - self.parked_duration_init;
+ gst::info!(
+ CAT,
+ "o parked: {parked_duration:4.2?} ({:5.2?}%)",
+ (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
+ );
+ }
+
+ let interval_mean = self.interval_sum / self.buffer_count;
+ let interval_std_dev =
+ f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
+
+ gst::info!(
+ CAT,
+ "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(interval_mean as u64),
+ Duration::from_nanos(interval_std_dev as u64),
+ self.interval_min,
+ self.interval_max,
+ );
+
+ if self.interval_late_count > f32::EPSILON {
+ gst::warning!(
+ CAT,
+ "o {:5.2}% late buffers",
+ 100f32 * self.interval_late_count / self.buffer_count
+ );
+ }
+
+ let latency_mean = self.latency_sum / self.buffer_count;
+ let latency_std_dev =
+ f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
+
+ gst::info!(
+ CAT,
+ "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(latency_mean as u64),
+ Duration::from_nanos(latency_std_dev as u64),
+ self.latency_min,
+ self.latency_max,
+ );
+ }
+}
diff --git a/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs
new file mode 100644
index 00000000..bef18db1
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs
@@ -0,0 +1,327 @@
+// Copyright (C) 2022 François Laignel <fengalin@free.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst::EventView;
+
+use once_cell::sync::Lazy;
+
+use gstthreadshare::runtime::{prelude::*, PadSink};
+
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use super::super::{Settings, Stats, CAT};
+
+#[derive(Debug, Default)]
+struct PadSinkHandlerInner {
+ is_flushing: bool,
+ is_main_elem: bool,
+ last_dts: Option<gst::ClockTime>,
+ segment_start: Option<gst::ClockTime>,
+ stats: Option<Box<Stats>>,
+}
+
+impl PadSinkHandlerInner {
+ fn handle_buffer(
+ &mut self,
+ elem: &super::DirectSink,
+ buffer: gst::Buffer,
+ ) -> Result<(), gst::FlowError> {
+ if self.is_flushing {
+ log_or_trace!(
+ CAT,
+ self.is_main_elem,
+ obj: elem,
+ "Discarding {buffer:?} (flushing)"
+ );
+
+ return Err(gst::FlowError::Flushing);
+ }
+
+ debug_or_trace!(CAT, self.is_main_elem, obj: elem, "Received {buffer:?}");
+
+ let dts = buffer
+ .dts()
+ .expect("Buffer without dts")
+ .checked_sub(self.segment_start.expect("Buffer without Time Segment"))
+ .expect("dts before Segment start");
+
+ if let Some(last_dts) = self.last_dts {
+ let cur_ts = elem.current_running_time().unwrap();
+ let latency: Duration = (cur_ts - dts).into();
+ let interval: Duration = (dts - last_dts).into();
+
+ if let Some(stats) = self.stats.as_mut() {
+ stats.add_buffer(latency, interval);
+ }
+
+ debug_or_trace!(CAT, self.is_main_elem, obj: elem, "o latency {latency:.2?}");
+ debug_or_trace!(
+ CAT,
+ self.is_main_elem,
+ obj: elem,
+ "o interval {interval:.2?}",
+ );
+ }
+
+ self.last_dts = Some(dts);
+
+ log_or_trace!(CAT, self.is_main_elem, obj: elem, "Buffer processed");
+
+ Ok(())
+ }
+}
+
+#[derive(Clone, Debug, Default)]
+struct SyncPadSinkHandler(Arc<Mutex<PadSinkHandlerInner>>);
+
+impl PadSinkHandler for SyncPadSinkHandler {
+ type ElementImpl = DirectSink;
+
+ fn sink_chain(
+ self,
+ _pad: gst::Pad,
+ elem: super::DirectSink,
+ buffer: gst::Buffer,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ async move {
+ if self.0.lock().unwrap().handle_buffer(&elem, buffer).is_err() {
+ return Err(gst::FlowError::Flushing);
+ }
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+ .boxed()
+ }
+
+ fn sink_event_serialized(
+ self,
+ _pad: gst::Pad,
+ elem: super::DirectSink,
+ event: gst::Event,
+ ) -> BoxFuture<'static, bool> {
+ async move {
+ match event.view() {
+ EventView::Eos(_) => {
+ {
+ let mut inner = self.0.lock().unwrap();
+ debug_or_trace!(CAT, inner.is_main_elem, obj: elem, "EOS");
+ inner.is_flushing = true;
+ }
+
+ // When each element sends its own EOS message,
+ // it takes ages for the pipeline to process all of them.
+ // Let's just post an error message and let main shuts down
+ // after all streams have posted this message.
+ let _ = elem
+ .post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
+ }
+ EventView::FlushStop(_) => {
+ self.0.lock().unwrap().is_flushing = false;
+ }
+ EventView::Segment(evt) => {
+ if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
+ self.0.lock().unwrap().segment_start = time_seg.start();
+ }
+ }
+ EventView::SinkMessage(evt) => {
+ let _ = elem.post_message(evt.message());
+ }
+ _ => (),
+ }
+
+ true
+ }
+ .boxed()
+ }
+
+ fn sink_event(self, _pad: &gst::Pad, _imp: &DirectSink, event: gst::Event) -> bool {
+ if let EventView::FlushStart(..) = event.view() {
+ self.0.lock().unwrap().is_flushing = true;
+ }
+
+ true
+ }
+}
+
+impl SyncPadSinkHandler {
+ fn prepare(&self, is_main_elem: bool, stats: Option<Stats>) {
+ let mut inner = self.0.lock().unwrap();
+ inner.is_main_elem = is_main_elem;
+ inner.stats = stats.map(Box::new);
+ }
+
+ fn start(&self) {
+ let mut inner = self.0.lock().unwrap();
+
+ inner.is_flushing = false;
+ inner.last_dts = None;
+
+ if let Some(stats) = inner.stats.as_mut() {
+ stats.start();
+ }
+ }
+
+ fn stop(&self) {
+ let mut inner = self.0.lock().unwrap();
+ inner.is_flushing = true;
+ }
+}
+
+#[derive(Debug)]
+pub struct DirectSink {
+ sink_pad: PadSink,
+ sink_pad_handler: SyncPadSinkHandler,
+ settings: Mutex<Settings>,
+}
+
+impl DirectSink {
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ let settings = self.settings.lock().unwrap();
+ debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing");
+ let stats = if settings.logs_stats {
+ Some(Stats::new(
+ settings.max_buffers,
+ settings.push_period + settings.context_wait / 2,
+ ))
+ } else {
+ None
+ };
+
+ self.sink_pad_handler.prepare(settings.is_main_elem, stats);
+ debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared");
+
+ Ok(())
+ }
+
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
+ self.sink_pad_handler.stop();
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
+
+ Ok(())
+ }
+
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
+ self.sink_pad_handler.start();
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
+
+ Ok(())
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for DirectSink {
+ const NAME: &'static str = "TsStandaloneDirectSink";
+ type Type = super::DirectSink;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let sink_pad_handler = SyncPadSinkHandler::default();
+ Self {
+ sink_pad: PadSink::new(
+ gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
+ sink_pad_handler.clone(),
+ ),
+ sink_pad_handler,
+ settings: Default::default(),
+ }
+ }
+}
+
+impl ObjectImpl for DirectSink {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(Settings::properties);
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ self.settings.lock().unwrap().set_property(id, value, pspec);
+ }
+
+ fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ self.settings.lock().unwrap().property(id, pspec)
+ }
+
+ fn constructed(&self) {
+ self.parent_constructed();
+
+ let obj = self.obj();
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::SINK);
+ }
+}
+
+impl GstObjectImpl for DirectSink {}
+
+impl ElementImpl for DirectSink {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Thread-sharing standalone test direct sink",
+ "Sink/Test",
+ "Thread-sharing standalone test direct sink",
+ "François Laignel <fengalin@free.fr>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, imp: self, "Changing state {transition:?}");
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::ReadyToPaused => {
+ self.start().map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop().map_err(|_| gst::StateChangeError)?;
+ }
+ _ => (),
+ }
+
+ self.parent_change_state(transition)
+ }
+}
diff --git a/generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs b/generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs
new file mode 100644
index 00000000..c3bfb4a0
--- /dev/null
+++ b/generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct DirectSink(ObjectSubclass<imp::DirectSink>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ super::SYNC_MUTEX_ELEMENT_NAME,
+ gst::Rank::None,
+ DirectSink::static_type(),
+ )
+}
diff --git a/generic/threadshare/examples/standalone/sink/task/imp.rs b/generic/threadshare/examples/standalone/sink/task/imp.rs
index dae30931..3979a0e3 100644
--- a/generic/threadshare/examples/standalone/sink/task/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/task/imp.rs
@@ -8,7 +8,6 @@
use futures::future::BoxFuture;
use futures::prelude::*;
-use futures::stream::Peekable;
use gst::error_msg;
use gst::glib;
@@ -22,46 +21,9 @@ use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, Task};
use std::sync::Mutex;
-use std::task::Poll;
-use std::time::{Duration, Instant};
-
-static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
- gst::DebugCategory::new(
- "ts-standalone-test-sink",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing standalone test sink"),
- )
-});
-
-const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
-const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
-const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);
-
-const LOG_PERIOD: Duration = Duration::from_secs(20);
-
-#[derive(Debug, Clone)]
-struct Settings {
- context: String,
- context_wait: Duration,
- raise_log_level: bool,
- logs_stats: bool,
- push_period: Duration,
- max_buffers: Option<u32>,
-}
+use std::time::Duration;
-impl Default for Settings {
- fn default() -> Self {
- Settings {
- context: DEFAULT_CONTEXT.into(),
- context_wait: DEFAULT_CONTEXT_WAIT,
- raise_log_level: false,
- logs_stats: false,
- push_period: DEFAULT_PUSH_PERIOD,
- max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
- }
- }
-}
+use super::super::{Settings, Stats, CAT};
#[derive(Debug)]
enum StreamItem {
@@ -70,21 +32,20 @@ enum StreamItem {
}
#[derive(Clone, Debug)]
-struct TestSinkPadHandler;
+struct TaskPadSinkHandler;
-impl PadSinkHandler for TestSinkPadHandler {
- type ElementImpl = TestSink;
+impl PadSinkHandler for TaskPadSinkHandler {
+ type ElementImpl = TaskSink;
fn sink_chain(
self,
_pad: gst::Pad,
- elem: super::TestSink,
+ elem: super::TaskSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = elem.imp().clone_item_sender();
async move {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
@@ -93,39 +54,37 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
- fn sink_chain_list(
- self,
- _pad: gst::Pad,
- elem: super::TestSink,
- list: gst::BufferList,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let sender = elem.imp().clone_item_sender();
- async move {
- for buffer in list.iter_owned() {
- if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
- gst::debug!(CAT, obj: elem, "Flushing");
- return Err(gst::FlowError::Flushing);
- }
- }
-
- Ok(gst::FlowSuccess::Ok)
- }
- .boxed()
- }
-
fn sink_event_serialized(
self,
_pad: gst::Pad,
- elem: super::TestSink,
+ elem: super::TaskSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
let sender = elem.imp().clone_item_sender();
async move {
- if let EventView::FlushStop(_) = event.view() {
- let imp = elem.imp();
- return imp.task.flush_stop().await_maybe_on_context().is_ok();
- } else if sender.send_async(StreamItem::Event(event)).await.is_err() {
- gst::debug!(CAT, obj: elem, "Flushing");
+ match event.view() {
+ EventView::Segment(_) => {
+ let _ = sender.send_async(StreamItem::Event(event)).await;
+ }
+ EventView::Eos(_) => {
+ let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, obj: elem, "EOS");
+
+ // When each element sends its own EOS message,
+ // it takes ages for the pipeline to process all of them.
+ // Let's just post an error message and let main shuts down
+ // after all streams have posted this message.
+ let _ = elem
+ .post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
+ }
+ EventView::FlushStop(_) => {
+ let imp = elem.imp();
+ return imp.task.flush_stop().await_maybe_on_context().is_ok();
+ }
+ EventView::SinkMessage(evt) => {
+ let _ = elem.post_message(evt.message());
+ }
+ _ => (),
}
true
@@ -133,7 +92,7 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
- fn sink_event(self, _pad: &gst::Pad, imp: &TestSink, event: gst::Event) -> bool {
+ fn sink_event(self, _pad: &gst::Pad, imp: &TaskSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
return imp.task.flush_start().await_maybe_on_context().is_ok();
}
@@ -142,329 +101,54 @@ impl PadSinkHandler for TestSinkPadHandler {
}
}
-#[derive(Default)]
-struct Stats {
- must_log: bool,
- ramp_up_instant: Option<Instant>,
- log_start_instant: Option<Instant>,
- last_delta_instant: Option<Instant>,
- max_buffers: Option<f32>,
- buffer_count: f32,
- buffer_count_delta: f32,
- latency_sum: f32,
- latency_square_sum: f32,
- latency_sum_delta: f32,
- latency_square_sum_delta: f32,
- latency_min: Duration,
- latency_min_delta: Duration,
- latency_max: Duration,
- latency_max_delta: Duration,
- interval_sum: f32,
- interval_square_sum: f32,
- interval_sum_delta: f32,
- interval_square_sum_delta: f32,
- interval_min: Duration,
- interval_min_delta: Duration,
- interval_max: Duration,
- interval_max_delta: Duration,
- interval_late_warn: Duration,
- interval_late_count: f32,
- interval_late_count_delta: f32,
- #[cfg(feature = "tuning")]
- parked_duration_init: Duration,
-}
-
-impl Stats {
- fn start(&mut self) {
- if !self.must_log {
- return;
- }
-
- self.buffer_count = 0.0;
- self.buffer_count_delta = 0.0;
- self.latency_sum = 0.0;
- self.latency_square_sum = 0.0;
- self.latency_sum_delta = 0.0;
- self.latency_square_sum_delta = 0.0;
- self.latency_min = Duration::MAX;
- self.latency_min_delta = Duration::MAX;
- self.latency_max = Duration::ZERO;
- self.latency_max_delta = Duration::ZERO;
- self.interval_sum = 0.0;
- self.interval_square_sum = 0.0;
- self.interval_sum_delta = 0.0;
- self.interval_square_sum_delta = 0.0;
- self.interval_min = Duration::MAX;
- self.interval_min_delta = Duration::MAX;
- self.interval_max = Duration::ZERO;
- self.interval_max_delta = Duration::ZERO;
- self.interval_late_count = 0.0;
- self.interval_late_count_delta = 0.0;
- self.last_delta_instant = None;
- self.log_start_instant = None;
-
- self.ramp_up_instant = Some(Instant::now());
- gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD);
- }
-
- fn is_active(&mut self) -> bool {
- if !self.must_log {
- return false;
- }
-
- if let Some(ramp_up_instant) = self.ramp_up_instant {
- if ramp_up_instant.elapsed() < LOG_PERIOD {
- return false;
- }
-
- self.ramp_up_instant = None;
- gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
- self.log_start_instant = Some(Instant::now());
- self.last_delta_instant = self.log_start_instant;
-
- #[cfg(feature = "tuning")]
- {
- self.parked_duration_init = Context::current().unwrap().parked_duration();
- }
- }
-
- use std::cmp::Ordering::*;
- match self.max_buffers.opt_cmp(self.buffer_count) {
- Some(Equal) => {
- self.log_global();
- self.buffer_count += 1.0;
- false
- }
- Some(Less) => false,
- _ => true,
- }
- }
-
- fn add_buffer(&mut self, latency: Duration, interval: Duration) {
- if !self.is_active() {
- return;
- }
-
- self.buffer_count += 1.0;
- self.buffer_count_delta += 1.0;
-
- // Latency
- let latency_f32 = latency.as_nanos() as f32;
- let latency_square = latency_f32.powi(2);
-
- self.latency_sum += latency_f32;
- self.latency_square_sum += latency_square;
- self.latency_min = self.latency_min.min(latency);
- self.latency_max = self.latency_max.max(latency);
-
- self.latency_sum_delta += latency_f32;
- self.latency_square_sum_delta += latency_square;
- self.latency_min_delta = self.latency_min_delta.min(latency);
- self.latency_max_delta = self.latency_max_delta.max(latency);
-
- // Interval
- let interval_f32 = interval.as_nanos() as f32;
- let interval_square = interval_f32.powi(2);
-
- self.interval_sum += interval_f32;
- self.interval_square_sum += interval_square;
- self.interval_min = self.interval_min.min(interval);
- self.interval_max = self.interval_max.max(interval);
-
- self.interval_sum_delta += interval_f32;
- self.interval_square_sum_delta += interval_square;
- self.interval_min_delta = self.interval_min_delta.min(interval);
- self.interval_max_delta = self.interval_max_delta.max(interval);
-
- if interval > self.interval_late_warn {
- self.interval_late_count += 1.0;
- self.interval_late_count_delta += 1.0;
- }
-
- let delta_duration = match self.last_delta_instant {
- Some(last_delta) => last_delta.elapsed(),
- None => return,
- };
-
- if delta_duration < LOG_PERIOD {
- return;
- }
-
- self.last_delta_instant = Some(Instant::now());
-
- gst::info!(CAT, "Delta stats:");
- let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
- let interval_std_dev = f32::sqrt(
- self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
- );
-
- gst::info!(
- CAT,
- "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(interval_mean as u64),
- Duration::from_nanos(interval_std_dev as u64),
- self.interval_min_delta,
- self.interval_max_delta,
- );
-
- if self.interval_late_count_delta > f32::EPSILON {
- gst::warning!(
- CAT,
- "o {:5.2}% late buffers",
- 100f32 * self.interval_late_count_delta / self.buffer_count_delta
- );
- }
-
- self.interval_sum_delta = 0.0;
- self.interval_square_sum_delta = 0.0;
- self.interval_min_delta = Duration::MAX;
- self.interval_max_delta = Duration::ZERO;
- self.interval_late_count_delta = 0.0;
-
- let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
- let latency_std_dev = f32::sqrt(
- self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
- );
-
- gst::info!(
- CAT,
- "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(latency_mean as u64),
- Duration::from_nanos(latency_std_dev as u64),
- self.latency_min_delta,
- self.latency_max_delta,
- );
-
- self.latency_sum_delta = 0.0;
- self.latency_square_sum_delta = 0.0;
- self.latency_min_delta = Duration::MAX;
- self.latency_max_delta = Duration::ZERO;
-
- self.buffer_count_delta = 0.0;
- }
-
- fn log_global(&mut self) {
- if self.buffer_count < 1.0 {
- return;
- }
-
- let _log_start = if let Some(log_start) = self.log_start_instant {
- log_start
- } else {
- return;
- };
-
- gst::info!(CAT, "Global stats:");
-
- #[cfg(feature = "tuning")]
- {
- let duration = _log_start.elapsed();
- let parked_duration =
- Context::current().unwrap().parked_duration() - self.parked_duration_init;
- gst::info!(
- CAT,
- "o parked: {parked_duration:4.2?} ({:5.2?}%)",
- (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
- );
- }
-
- let interval_mean = self.interval_sum / self.buffer_count;
- let interval_std_dev =
- f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
-
- gst::info!(
- CAT,
- "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(interval_mean as u64),
- Duration::from_nanos(interval_std_dev as u64),
- self.interval_min,
- self.interval_max,
- );
-
- if self.interval_late_count > f32::EPSILON {
- gst::warning!(
- CAT,
- "o {:5.2}% late buffers",
- 100f32 * self.interval_late_count / self.buffer_count
- );
- }
-
- let latency_mean = self.latency_sum / self.buffer_count;
- let latency_std_dev =
- f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
-
- gst::info!(
- CAT,
- "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(latency_mean as u64),
- Duration::from_nanos(latency_std_dev as u64),
- self.latency_min,
- self.latency_max,
- );
- }
-}
-
-struct TestSinkTask {
- element: super::TestSink,
- raise_log_level: bool,
+struct TaskSinkTask {
+ elem: super::TaskSink,
+ item_receiver: flume::Receiver<StreamItem>,
+ is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
- item_receiver: Peekable<flume::r#async::RecvStream<'static, StreamItem>>,
- stats: Stats,
- segment: Option<gst::Segment>,
+ segment_start: Option<gst::ClockTime>,
+ stats: Option<Box<Stats>>,
}
-impl TestSinkTask {
- fn new(element: &super::TestSink, item_receiver: flume::Receiver<StreamItem>) -> Self {
- TestSinkTask {
- element: element.clone(),
- raise_log_level: false,
+impl TaskSinkTask {
+ fn new(
+ elem: &super::TaskSink,
+ item_receiver: flume::Receiver<StreamItem>,
+ is_main_elem: bool,
+ stats: Option<Box<Stats>>,
+ ) -> Self {
+ TaskSinkTask {
+ elem: elem.clone(),
+ item_receiver,
+ is_main_elem,
last_dts: None,
- item_receiver: item_receiver.into_stream().peekable(),
- stats: Stats::default(),
- segment: None,
+ stats,
+ segment_start: None,
}
}
- async fn flush(&mut self) {
+ fn flush(&mut self) {
// Purge the channel
- while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {}
+ while !self.item_receiver.is_empty() {}
}
}
-impl TaskImpl for TestSinkTask {
+impl TaskImpl for TaskSinkTask {
type Item = StreamItem;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- let sink = self.element.imp();
- let settings = sink.settings.lock().unwrap();
- self.raise_log_level = settings.raise_log_level;
-
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Preparing Task");
- } else {
- gst::trace!(CAT, obj: self.element, "Preparing Task");
- }
-
- self.stats.must_log = settings.logs_stats;
- self.stats.max_buffers = settings.max_buffers.map(|max_buffers| max_buffers as f32);
- self.stats.interval_late_warn = settings.push_period + settings.context_wait / 2;
-
- Ok(())
- }
- .boxed()
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Preparing Task");
+ future::ok(()).boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Starting Task");
- } else {
- gst::trace!(CAT, obj: self.element, "Starting Task");
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Starting Task");
+ self.last_dts = None;
+ if let Some(stats) = self.stats.as_mut() {
+ stats.start();
}
- self.last_dts = None;
- self.stats.start();
Ok(())
}
.boxed()
@@ -472,101 +156,66 @@ impl TaskImpl for TestSinkTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Stopping Task");
- } else {
- gst::trace!(CAT, obj: self.element, "Stopping Task");
- }
-
- self.flush().await;
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Stopping Task");
+ self.flush();
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
- async move {
- let item = self.item_receiver.next().await.unwrap();
-
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Popped item");
- } else {
- gst::trace!(CAT, obj: self.element, "Popped item");
- }
-
- Ok(item)
- }
- .boxed()
+ self.item_receiver
+ .recv_async()
+ .map(|opt_item| Ok(opt_item.unwrap()))
+ .boxed()
}
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "Received {:?}", item);
- } else {
- gst::trace!(CAT, obj: self.element, "Received {:?}", item);
- }
+ debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Received {item:?}");
match item {
StreamItem::Buffer(buffer) => {
- let dts = self
- .segment
- .as_ref()
- .and_then(|segment| {
- segment
- .downcast_ref::<gst::format::Time>()
- .and_then(|segment| segment.to_running_time(buffer.dts()))
- })
- .unwrap();
+ let dts = buffer
+ .dts()
+ .expect("Buffer without dts")
+ .checked_sub(self.segment_start.expect("Buffer without Time Segment"))
+ .expect("dts before Segment start");
if let Some(last_dts) = self.last_dts {
- let cur_ts = self.element.current_running_time().unwrap();
+ let cur_ts = self.elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
- self.stats.add_buffer(latency, interval);
-
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "o latency {:.2?}", latency);
- gst::debug!(CAT, obj: self.element, "o interval {:.2?}", interval);
- } else {
- gst::trace!(CAT, obj: self.element, "o latency {:.2?}", latency);
- gst::trace!(CAT, obj: self.element, "o interval {:.2?}", interval);
+ if let Some(stats) = self.stats.as_mut() {
+ stats.add_buffer(latency, interval);
}
+
+ debug_or_trace!(
+ CAT,
+ self.is_main_elem,
+ obj: self.elem,
+ "o latency {latency:.2?}",
+ );
+ debug_or_trace!(
+ CAT,
+ self.is_main_elem,
+ obj: self.elem,
+ "o interval {interval:.2?}",
+ );
}
self.last_dts = Some(dts);
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Buffer processed");
- } else {
- gst::trace!(CAT, obj: self.element, "Buffer processed");
- }
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Buffer processed");
}
- StreamItem::Event(event) => match event.view() {
- EventView::Eos(_) => {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "EOS");
- } else {
- gst::trace!(CAT, obj: self.element, "EOS");
+ StreamItem::Event(evt) => {
+ if let EventView::Segment(evt) = evt.view() {
+ if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
+ self.segment_start = time_seg.start();
}
-
- let elem = self.element.clone();
- self.element.call_async(move |_| {
- let _ =
- elem.post_message(gst::message::Eos::builder().src(&elem).build());
- });
-
- return Err(gst::FlowError::Eos);
- }
- EventView::Segment(e) => {
- self.segment = Some(e.segment().clone());
- }
- EventView::SinkMessage(e) => {
- let _ = self.element.post_message(e.message());
}
- _ => (),
- },
+ }
}
Ok(())
@@ -576,121 +225,88 @@ impl TaskImpl for TestSinkTask {
}
#[derive(Debug)]
-pub struct TestSink {
+pub struct TaskSink {
sink_pad: PadSink,
task: Task,
item_sender: Mutex<Option<flume::Sender<StreamItem>>>,
settings: Mutex<Settings>,
}
-impl TestSink {
+impl TaskSink {
#[track_caller]
fn clone_item_sender(&self) -> flume::Sender<StreamItem> {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Preparing");
+ let settings = self.settings.lock().unwrap();
+ let stats = if settings.logs_stats {
+ Some(Box::new(Stats::new(
+ settings.max_buffers,
+ settings.push_period + settings.context_wait / 2,
+ )))
} else {
- gst::trace!(CAT, imp: self, "Preparing");
- }
+ None
+ };
- let context = {
- let settings = self.settings.lock().unwrap();
+ debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing");
- Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
- error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to acquire Context: {}", err]
- )
- })?
- };
+ let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
// Enable backpressure for items
let (item_sender, item_receiver) = flume::bounded(0);
- let task_impl = TestSinkTask::new(&self.obj(), item_receiver);
- self.task.prepare(task_impl, context).block_on()?;
+ let task_impl = TaskSinkTask::new(&self.obj(), item_receiver, settings.is_main_elem, stats);
+ self.task.prepare(task_impl, ts_ctx).block_on()?;
*self.item_sender.lock().unwrap() = Some(item_sender);
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Prepared");
- } else {
- gst::trace!(CAT, imp: self, "Prepared");
- }
+ debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared");
Ok(())
}
fn unprepare(&self) {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Unpreparing");
- } else {
- gst::trace!(CAT, imp: self, "Unpreparing");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Unprepared");
- } else {
- gst::trace!(CAT, imp: self, "Unprepared");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Unprepared");
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Stopping");
- } else {
- gst::trace!(CAT, imp: self, "Stopping");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
self.task.stop().block_on()?;
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Stopped");
- } else {
- gst::trace!(CAT, imp: self, "Stopped");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Starting");
- } else {
- gst::trace!(CAT, imp: self, "Starting");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
self.task.start().block_on()?;
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Started");
- } else {
- gst::trace!(CAT, imp: self, "Started");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
Ok(())
}
}
#[glib::object_subclass]
-impl ObjectSubclass for TestSink {
- const NAME: &'static str = "StandaloneTestSink";
- type Type = super::TestSink;
+impl ObjectSubclass for TaskSink {
+ const NAME: &'static str = "TsStandaloneTaskSink";
+ type Type = super::TaskSink;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
- TestSinkPadHandler,
+ TaskPadSinkHandler,
),
task: Task::default(),
item_sender: Default::default(),
@@ -699,96 +315,18 @@ impl ObjectSubclass for TestSink {
}
}
-impl ObjectImpl for TestSink {
+impl ObjectImpl for TaskSink {
fn properties() -> &'static [glib::ParamSpec] {
- static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
- vec![
- glib::ParamSpecString::builder("context")
- .nick("Context")
- .blurb("Context name to share threads with")
- .default_value(Some(DEFAULT_CONTEXT))
- .build(),
- glib::ParamSpecUInt::builder("context-wait")
- .nick("Context Wait")
- .blurb("Throttle poll loop to run at most once every this many ms")
- .maximum(1000)
- .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
- .build(),
- glib::ParamSpecBoolean::builder("raise-log-level")
- .nick("Raise log level")
- .blurb("Raises the log level so that this element stands out")
- .write_only()
- .build(),
- glib::ParamSpecBoolean::builder("logs-stats")
- .nick("Logs Stats")
- .blurb("Whether statistics should be logged")
- .write_only()
- .build(),
- glib::ParamSpecUInt::builder("push-period")
- .nick("Src buffer Push Period")
- .blurb("Push period used by `src` element (used for stats warnings)")
- .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
- .build(),
- glib::ParamSpecInt::builder("max-buffers")
- .nick("Max Buffers")
- .blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
- .minimum(-1i32)
- .default_value(DEFAULT_MAX_BUFFERS)
- .build(),
- ]
- });
-
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(Settings::properties);
PROPERTIES.as_ref()
}
- fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
- let mut settings = self.settings.lock().unwrap();
- match pspec.name() {
- "context" => {
- settings.context = value
- .get::<Option<String>>()
- .expect("type checked upstream")
- .unwrap_or_else(|| DEFAULT_CONTEXT.into());
- }
- "context-wait" => {
- settings.context_wait = Duration::from_millis(
- value.get::<u32>().expect("type checked upstream").into(),
- );
- }
- "raise-log-level" => {
- settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
- }
- "logs-stats" => {
- let logs_stats = value.get().expect("type checked upstream");
- settings.logs_stats = logs_stats;
- }
- "push-period" => {
- settings.push_period = Duration::from_millis(
- value.get::<u32>().expect("type checked upstream").into(),
- );
- }
- "max-buffers" => {
- let value = value.get::<i32>().expect("type checked upstream");
- settings.max_buffers = if value > 0 { Some(value as u32) } else { None };
- }
- _ => unimplemented!(),
- }
+ fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ self.settings.lock().unwrap().set_property(id, value, pspec);
}
- fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
- let settings = self.settings.lock().unwrap();
- match pspec.name() {
- "context" => settings.context.to_value(),
- "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
- "raise-log-level" => settings.raise_log_level.to_value(),
- "push-period" => (settings.push_period.as_millis() as u32).to_value(),
- "max-buffers" => settings
- .max_buffers
- .and_then(|val| val.try_into().ok())
- .unwrap_or(-1i32)
- .to_value(),
- _ => unimplemented!(),
- }
+ fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ self.settings.lock().unwrap().property(id, pspec)
}
fn constructed(&self) {
@@ -800,15 +338,15 @@ impl ObjectImpl for TestSink {
}
}
-impl GstObjectImpl for TestSink {}
+impl GstObjectImpl for TaskSink {}
-impl ElementImpl for TestSink {
+impl ElementImpl for TaskSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
- "Thread-sharing standalone test sink",
+ "Thread-sharing standalone test task sink",
"Sink/Test",
- "Thread-sharing standalone test sink",
+ "Thread-sharing standalone test task sink",
"François Laignel <fengalin@free.fr>",
)
});
@@ -838,7 +376,7 @@ impl ElementImpl for TestSink {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {
diff --git a/generic/threadshare/examples/standalone/sink/task/mod.rs b/generic/threadshare/examples/standalone/sink/task/mod.rs
index 6b8ce952..1dea85a1 100644
--- a/generic/threadshare/examples/standalone/sink/task/mod.rs
+++ b/generic/threadshare/examples/standalone/sink/task/mod.rs
@@ -4,14 +4,14 @@ use gst::prelude::*;
mod imp;
glib::wrapper! {
- pub struct TestSink(ObjectSubclass<imp::TestSink>) @extends gst::Element, gst::Object;
+ pub struct TaskSink(ObjectSubclass<imp::TaskSink>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
- "ts-standalone-test-sink",
+ super::TASK_ELEMENT_NAME,
gst::Rank::None,
- TestSink::static_type(),
+ TaskSink::static_type(),
)
}
diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs
index 521d63c1..c6f9c82b 100644
--- a/generic/threadshare/examples/standalone/src/imp.rs
+++ b/generic/threadshare/examples/standalone/src/imp.rs
@@ -19,11 +19,11 @@ use std::sync::Mutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
-use gstthreadshare::runtime::{timer, Context, PadSrc, Task};
+use gstthreadshare::runtime::{task, timer, Context, PadSrc, Task};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
- "ts-standalone-test-src",
+ super::ELEMENT_NAME,
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone test src"),
)
@@ -39,7 +39,7 @@ struct Settings {
context: String,
context_wait: Duration,
push_period: gst::ClockTime,
- raise_log_level: bool,
+ is_main_elem: bool,
num_buffers: Option<u32>,
}
@@ -49,7 +49,7 @@ impl Default for Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
push_period: DEFAULT_PUSH_PERIOD,
- raise_log_level: false,
+ is_main_elem: false,
num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
}
}
@@ -63,19 +63,18 @@ impl PadSrcHandler for TestSrcPadHandler {
#[derive(Debug)]
struct SrcTask {
- element: super::TestSrc,
+ elem: super::TestSrc,
buffer_pool: gst::BufferPool,
timer: Option<timer::Interval>,
- raise_log_level: bool,
+ is_main_elem: bool,
push_period: gst::ClockTime,
need_initial_events: bool,
- need_segment: bool,
num_buffers: Option<u32>,
buffer_count: u32,
}
impl SrcTask {
- fn new(element: super::TestSrc) -> Self {
+ fn new(elem: super::TestSrc) -> Self {
let buffer_pool = gst::BufferPool::new();
let mut pool_config = buffer_pool.config();
pool_config
@@ -84,13 +83,12 @@ impl SrcTask {
buffer_pool.set_config(pool_config).unwrap();
SrcTask {
- element,
+ elem,
buffer_pool,
timer: None,
- raise_log_level: false,
+ is_main_elem: false,
push_period: gst::ClockTime::ZERO,
need_initial_events: true,
- need_segment: true,
num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
buffer_count: 0,
}
@@ -98,34 +96,48 @@ impl SrcTask {
}
impl TaskImpl for SrcTask {
- type Item = gst::Buffer;
+ type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- let src = self.element.imp();
- let settings = src.settings.lock().unwrap();
- self.raise_log_level = settings.raise_log_level;
-
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Preparing Task");
- } else {
- gst::trace!(CAT, obj: self.element, "Preparing Task");
- }
+ let imp = self.elem.imp();
+ let settings = imp.settings.lock().unwrap();
+ self.is_main_elem = settings.is_main_elem;
- self.push_period = settings.push_period;
- self.num_buffers = settings.num_buffers;
+ log_or_trace!(CAT, self.is_main_elem, imp: imp, "Preparing Task");
- Ok(())
- }
- .boxed()
+ self.push_period = settings.push_period;
+ self.num_buffers = settings.num_buffers;
+
+ future::ok(()).boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async {
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Starting Task");
- } else {
- gst::trace!(CAT, obj: self.element, "Starting Task");
+ async move {
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Starting Task");
+
+ if self.need_initial_events {
+ let imp = self.elem.imp();
+
+ debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Pushing initial events");
+
+ let stream_id =
+ format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
+ let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
+ .group_id(gst::GroupId::next())
+ .build();
+ imp.src_pad.push_event(stream_start_evt).await;
+
+ imp.src_pad
+ .push_event(gst::event::Caps::new(
+ &gst::Caps::builder("foo/bar").build(),
+ ))
+ .await;
+
+ let segment_evt =
+ gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
+ imp.src_pad.push_event(segment_evt).await;
+
+ self.need_initial_events = false;
}
self.timer = Some(
@@ -138,178 +150,100 @@ impl TaskImpl for SrcTask {
);
self.buffer_count = 0;
self.buffer_pool.set_active(true).unwrap();
+
Ok(())
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Stopping Task");
- } else {
- gst::trace!(CAT, obj: self.element, "Stopping Task");
- }
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Stopping Task");
+ self.buffer_pool.set_active(false).unwrap();
+ self.timer = None;
+ self.need_initial_events = true;
- self.buffer_pool.set_active(false).unwrap();
- self.timer = None;
- self.need_initial_events = true;
- self.need_segment = true;
+ future::ok(()).boxed()
+ }
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Awaiting timer");
+ self.timer.as_mut().unwrap().next().await;
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Timer ticked");
Ok(())
}
.boxed()
}
- fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
+ fn handle_item(&mut self, _: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Awaiting timer");
- } else {
- gst::trace!(CAT, obj: self.element, "Awaiting timer");
- }
-
- self.timer.as_mut().unwrap().next().await;
-
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Timer ticked");
- } else {
- gst::trace!(CAT, obj: self.element, "Timer ticked");
- }
-
- self.buffer_pool
+ let buffer = self
+ .buffer_pool
.acquire_buffer(None)
.map(|mut buffer| {
{
let buffer = buffer.get_mut().unwrap();
- let rtime = self.element.current_running_time().unwrap();
+ let rtime = self.elem.current_running_time().unwrap();
buffer.set_dts(rtime);
}
buffer
})
.map_err(|err| {
- gst::error!(CAT, obj: self.element, "Failed to acquire buffer {}", err);
+ gst::error!(CAT, obj: self.elem, "Failed to acquire buffer {err}");
err
- })
+ })?;
+
+ debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Forwarding buffer");
+ self.elem.imp().src_pad.push(buffer).await?;
+ log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Successfully pushed buffer");
+
+ self.buffer_count += 1;
+
+ if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
+ return Err(gst::FlowError::Eos);
+ }
+
+ Ok(())
}
.boxed()
}
- fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
- let res = self.push(buffer).await;
- match res {
- Ok(_) => {
- if self.raise_log_level {
- gst::log!(CAT, obj: self.element, "Successfully pushed buffer");
- } else {
- gst::trace!(CAT, obj: self.element, "Successfully pushed buffer");
- }
- }
- Err(gst::FlowError::Eos) => {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "EOS");
- } else {
- gst::trace!(CAT, obj: self.element, "EOS");
+ match err {
+ gst::FlowError::Eos => {
+ debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Pushing EOS");
+
+ let imp = self.elem.imp();
+ if !imp.src_pad.push_event(gst::event::Eos::new()).await {
+ gst::error!(CAT, imp: imp, "Error pushing EOS");
}
- let test_src = self.element.imp();
- test_src.src_pad.push_event(gst::event::Eos::new()).await;
- return Err(gst::FlowError::Eos);
+ task::Trigger::Stop
}
- Err(gst::FlowError::Flushing) => {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "Flushing");
- } else {
- gst::trace!(CAT, obj: self.element, "Flushing");
- }
+ gst::FlowError::Flushing => {
+ debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Flushing");
+
+ task::Trigger::FlushStart
}
- Err(err) => {
- gst::error!(CAT, obj: self.element, "Got error {}", err);
+ err => {
+ gst::error!(CAT, obj: self.elem, "Got error {err}");
gst::element_error!(
- &self.element,
+ &self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
+
+ task::Trigger::Error
}
}
-
- res.map(drop)
}
.boxed()
}
}
-impl SrcTask {
- async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "Pushing {:?}", buffer);
- } else {
- gst::trace!(CAT, obj: self.element, "Pushing {:?}", buffer);
- }
-
- let test_src = self.element.imp();
-
- if self.need_initial_events {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "Pushing initial events");
- } else {
- gst::trace!(CAT, obj: self.element, "Pushing initial events");
- }
-
- let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
- let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
- .group_id(gst::GroupId::next())
- .build();
- test_src.src_pad.push_event(stream_start_evt).await;
-
- test_src
- .src_pad
- .push_event(gst::event::Caps::new(
- &gst::Caps::builder("foo/bar").build(),
- ))
- .await;
-
- self.need_initial_events = false;
- }
-
- if self.need_segment {
- let segment_evt =
- gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
- test_src.src_pad.push_event(segment_evt).await;
-
- self.need_segment = false;
- }
-
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "Forwarding buffer");
- } else {
- gst::trace!(CAT, obj: self.element, "Forwarding buffer");
- }
-
- let ok = test_src.src_pad.push(buffer).await?;
-
- self.buffer_count += 1;
-
- if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
- if self.raise_log_level {
- gst::debug!(CAT, obj: self.element, "Pushing EOS");
- } else {
- gst::trace!(CAT, obj: self.element, "Pushing EOS");
- }
-
- let test_src = self.element.imp();
- if !test_src.src_pad.push_event(gst::event::Eos::new()).await {
- gst::error!(CAT, obj: self.element, "Error pushing EOS");
- }
- return Err(gst::FlowError::Eos);
- }
-
- Ok(ok)
- }
-}
-
#[derive(Debug)]
pub struct TestSrc {
src_pad: PadSrc,
@@ -319,106 +253,57 @@ pub struct TestSrc {
impl TestSrc {
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Preparing");
- } else {
- gst::trace!(CAT, imp: self, "Preparing");
- }
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Preparing");
let settings = self.settings.lock().unwrap();
- let context =
- Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to acquire Context: {}", err]
- )
- })?;
+ let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
drop(settings);
self.task
- .prepare(SrcTask::new(self.obj().clone()), context)
+ .prepare(SrcTask::new(self.instance().clone()), ts_ctx)
.block_on()?;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Prepared");
- } else {
- gst::trace!(CAT, imp: self, "Prepared");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Prepared");
Ok(())
}
fn unprepare(&self) {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Unpreparing");
- } else {
- gst::trace!(CAT, imp: self, "Unpreparing");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Unprepared");
- } else {
- gst::trace!(CAT, imp: self, "Unprepared");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Unprepared");
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Stopping");
- } else {
- gst::trace!(CAT, imp: self, "Stopping");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
self.task.stop().block_on()?;
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Stopped");
- } else {
- gst::trace!(CAT, imp: self, "Stopped");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Starting");
- } else {
- gst::trace!(CAT, imp: self, "Starting");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
self.task.start().block_on()?;
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Started");
- } else {
- gst::trace!(CAT, imp: self, "Started");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
- let raise_log_level = self.settings.lock().unwrap().raise_log_level;
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Pausing");
- } else {
- gst::trace!(CAT, imp: self, "Pausing");
- }
-
+ let is_main_elem = self.settings.lock().unwrap().is_main_elem;
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Pausing");
self.task.pause().block_on()?;
-
- if raise_log_level {
- gst::debug!(CAT, imp: self, "Paused");
- } else {
- gst::trace!(CAT, imp: self, "Paused");
- }
+ debug_or_trace!(CAT, is_main_elem, imp: self, "Paused");
Ok(())
}
@@ -462,9 +347,9 @@ impl ObjectImpl for TestSrc {
.blurb("Push a new buffer every this many ms")
.default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32)
.build(),
- glib::ParamSpecBoolean::builder("raise-log-level")
- .nick("Raise log level")
- .blurb("Raises the log level so that this element stands out")
+ glib::ParamSpecBoolean::builder("main-elem")
+ .nick("Main Element")
+ .blurb("Declare this element as the main one")
.write_only()
.build(),
glib::ParamSpecInt::builder("num-buffers")
@@ -485,24 +370,21 @@ impl ObjectImpl for TestSrc {
"context" => {
settings.context = value
.get::<Option<String>>()
- .expect("type checked upstream")
+ .unwrap()
.unwrap_or_else(|| DEFAULT_CONTEXT.into());
}
"context-wait" => {
- settings.context_wait = Duration::from_millis(
- value.get::<u32>().expect("type checked upstream").into(),
- );
+ settings.context_wait = Duration::from_millis(value.get::<u32>().unwrap().into());
}
"push-period" => {
- settings.push_period = gst::ClockTime::from_mseconds(
- value.get::<u32>().expect("type checked upstream").into(),
- );
+ let value: u64 = value.get::<u32>().unwrap().into();
+ settings.push_period = value.mseconds();
}
- "raise-log-level" => {
- settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
+ "main-elem" => {
+ settings.is_main_elem = value.get::<bool>().unwrap();
}
"num-buffers" => {
- let value = value.get::<i32>().expect("type checked upstream");
+ let value = value.get::<i32>().unwrap();
settings.num_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
@@ -515,7 +397,7 @@ impl ObjectImpl for TestSrc {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"push-period" => (settings.push_period.mseconds() as u32).to_value(),
- "raise-log-level" => settings.raise_log_level.to_value(),
+ "main-elem" => settings.is_main_elem.to_value(),
"num-buffers" => settings
.num_buffers
.and_then(|val| val.try_into().ok())
@@ -571,7 +453,7 @@ impl ElementImpl for TestSrc {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {
diff --git a/generic/threadshare/examples/standalone/src/mod.rs b/generic/threadshare/examples/standalone/src/mod.rs
index c0fa2777..0a2cbcf8 100644
--- a/generic/threadshare/examples/standalone/src/mod.rs
+++ b/generic/threadshare/examples/standalone/src/mod.rs
@@ -3,6 +3,8 @@ use gst::prelude::*;
mod imp;
+pub const ELEMENT_NAME: &str = "ts-standalone-src";
+
glib::wrapper! {
pub struct TestSrc(ObjectSubclass<imp::TestSrc>) @extends gst::Element, gst::Object;
}
@@ -10,7 +12,7 @@ glib::wrapper! {
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
- "ts-standalone-test-src",
+ "ts-standalone-src",
gst::Rank::None,
TestSrc::static_type(),
)