
gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
author     François Laignel <fengalin@free.fr>    2022-09-08 00:10:29 +0300
committer  Sebastian Dröge <slomo@coaxion.net>    2022-09-13 10:29:50 +0300
commit     235ded35fd71082ac9ec78502ba462adb238d38f (patch)
tree       dc39075c68974f78f97390ab60924378a6c4aaf5 /generic
parent     72acbebff0147043a93abd4190e28544afa41801 (diff)
ts: add feature to add counters for performance evaluation
Add a `tuning` feature which enables counters that help with performance evaluation. The only counter added so far accumulates the duration a Scheduler has been parked, which is a fairly accurate indication of the Scheduler's CPU usage.
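The intuition behind the counter: time the Scheduler spends parked is time its thread is idle, so the busy share over an observation window follows directly from two successive reads of the `parked_duration()` accessor added below. A minimal sketch of that arithmetic, assuming a caller samples the accessor around a window of wall-clock time (the function and its names are illustrative, not part of the patch):

use std::time::Duration;

// Estimated busy share of the Scheduler over one observation window:
// whatever was not spent parked was spent doing work.
fn busy_percent(parked_start: Duration, parked_end: Duration, window: Duration) -> f32 {
    let parked = (parked_end - parked_start).as_nanos() as f32;
    100.0 * (1.0 - parked / window.as_nanos() as f32)
}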
Diffstat (limited to 'generic')
-rw-r--r--  generic/threadshare/Cargo.toml                         |   2
-rw-r--r--  generic/threadshare/examples/benchmark.rs              |  84
-rw-r--r--  generic/threadshare/examples/standalone/sink/imp.rs    | 162
-rw-r--r--  generic/threadshare/src/runtime/executor/context.rs    |   8
-rw-r--r--  generic/threadshare/src/runtime/executor/scheduler.rs  |  19
5 files changed, 163 insertions, 112 deletions
diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml
index 80ae0deab..cd14971a3 100644
--- a/generic/threadshare/Cargo.toml
+++ b/generic/threadshare/Cargo.toml
@@ -65,6 +65,8 @@ pkg-config = "0.3.15"
[features]
static = []
capi = []
+# Adds performance counters used by benchmarking tools.
+tuning = []
doc = ["gst/v1_18"]
[package.metadata.capi]
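The feature is empty and purely additive: it only toggles conditional compilation, so builds without it pay no cost. A hedged sketch of the gating pattern the rest of the patch relies on (the helper below is illustrative, not part of the crate):

// Only compiled when the crate is built with the `tuning` feature,
// e.g. `cargo build --features tuning`; otherwise the item does not exist.
#[cfg(feature = "tuning")]
fn report_parked(ctx: &gstthreadshare::runtime::Context) {
    println!("parked so far: {:?}", ctx.parked_duration());
}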
diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs
index 6dc904ba9..32b00ed28 100644
--- a/generic/threadshare/examples/benchmark.rs
+++ b/generic/threadshare/examples/benchmark.rs
@@ -39,29 +39,14 @@ pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
fn main() {
gst::init().unwrap();
-
- #[cfg(debug_assertions)]
- {
- use std::path::Path;
-
- let mut path = Path::new("target/debug");
- if !path.exists() {
- path = Path::new("../../target/debug");
- }
-
- gst::Registry::get().scan_path(path);
- }
- #[cfg(not(debug_assertions))]
- {
- use std::path::Path;
-
- let mut path = Path::new("target/release");
- if !path.exists() {
- path = Path::new("../../target/release");
- }
-
- gst::Registry::get().scan_path(path);
- }
+ // Register the plugins statically:
+ // - The executable can be run from anywhere.
+ // - No risk of running against a previous version.
+ // - `main` can use features that rely on `static`s or `thread_local`s,
+ // such as `Context::acquire`, which otherwise wouldn't point to
+ // the same `static` or `thread_local`, probably because
+ // the shared object uses its own copies and the executable uses others.
+ gstthreadshare::plugin_register_static().unwrap();
let args = env::args().collect::<Vec<_>>();
assert!(args.len() > 4);
@@ -71,7 +56,7 @@ fn main() {
let wait: u32 = args[4].parse().unwrap();
// Nb buffers to await before stopping.
- let max_buffers: Option<u64> = if args.len() > 5 {
+ let max_buffers: Option<f32> = if args.len() > 5 {
args[5].parse().ok()
} else {
None
@@ -243,15 +228,24 @@ fn main() {
let l_clone = l.clone();
thread::spawn(move || {
- let throughput_factor = 1_000f32 / (n_streams as f32);
- let mut prev_reset_instant: Option<Instant> = None;
- let mut count;
- let mut reset_instant;
+ let n_streams_f32 = n_streams as f32;
+
+ let mut total_count = 0.0;
+ let mut ramp_up_complete_instant: Option<Instant> = None;
+
+ #[cfg(feature = "tuning")]
+ let ctx_0 = gstthreadshare::runtime::Context::acquire(
+ "context-0",
+ Duration::from_millis(wait as u64),
+ )
+ .unwrap();
+ #[cfg(feature = "tuning")]
+ let mut parked_init = Duration::ZERO;
loop {
- count = counter.fetch_and(0, Ordering::SeqCst);
+ total_count += counter.fetch_and(0, Ordering::SeqCst) as f32 / n_streams_f32;
if let Some(max_buffers) = max_buffers {
- if count > max_buffers {
+ if total_count > max_buffers {
gst::info!(CAT, "Stopping");
let stopping_instant = Instant::now();
pipeline.set_state(gst::State::Ready).unwrap();
@@ -263,22 +257,34 @@ fn main() {
}
}
- reset_instant = Instant::now();
-
- if let Some(prev_reset_instant) = prev_reset_instant {
+ if let Some(init) = ramp_up_complete_instant {
+ let elapsed = init.elapsed();
gst::info!(
CAT,
"{:>6.2} / s / stream",
- (count as f32) * throughput_factor
- / ((reset_instant - prev_reset_instant).as_millis() as f32)
+ total_count * 1_000.0 / elapsed.as_millis() as f32
);
- }
- if let Some(sleep_duration) = THROUGHPUT_PERIOD.checked_sub(reset_instant.elapsed()) {
- thread::sleep(sleep_duration);
+ #[cfg(feature = "tuning")]
+ gst::info!(
+ CAT,
+ "{:>6.2}% parked",
+ (ctx_0.parked_duration() - parked_init).as_nanos() as f32 * 100.0
+ / elapsed.as_nanos() as f32
+ );
+ } else {
+ // Ramp up 30s worth of buffers before tracking throughput and parked duration
+ if total_count > 50.0 * 30.0 {
+ total_count = 0.0;
+ ramp_up_complete_instant = Some(Instant::now());
+ #[cfg(feature = "tuning")]
+ {
+ parked_init = ctx_0.parked_duration();
+ }
+ }
}
- prev_reset_instant = Some(reset_instant);
+ thread::sleep(THROUGHPUT_PERIOD);
}
});
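For readability, here is the reporting arithmetic from the loop above pulled out into plain functions. The names are illustrative; the per-stream normalization and the nanosecond-based parked ratio mirror the benchmark code:

use std::time::Duration;

// Buffers per second per stream; `total_count` is already divided by the
// number of streams each time the shared counter is drained.
fn throughput_per_stream(total_count: f32, elapsed: Duration) -> f32 {
    total_count * 1_000.0 / elapsed.as_millis() as f32
}

// Share of the observation window the scheduler spent parked, in percent
// (only meaningful when built with the `tuning` feature).
fn parked_percent(parked_since_ramp_up: Duration, elapsed: Duration) -> f32 {
    parked_since_ramp_up.as_nanos() as f32 * 100.0 / elapsed.as_nanos() as f32
}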
diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
index 67ab406d1..de0406f58 100644
--- a/generic/threadshare/examples/standalone/sink/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -189,6 +189,8 @@ struct Stats {
interval_late_warn: Duration,
interval_late_count: f32,
interval_late_count_delta: f32,
+ #[cfg(feature = "tuning")]
+ parked_duration_init: Duration,
}
impl Stats {
@@ -238,6 +240,11 @@ impl Stats {
gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
self.log_start_instant = Some(Instant::now());
self.last_delta_instant = self.log_start_instant;
+
+ #[cfg(feature = "tuning")]
+ {
+ self.parked_duration_init = Context::current().unwrap().parked_duration();
+ }
}
use std::cmp::Ordering::*;
@@ -304,102 +311,115 @@ impl Stats {
self.last_delta_instant = Some(Instant::now());
- if self.buffer_count_delta > 1.0 {
- gst::info!(CAT, "Delta stats:");
-
- let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
- let interval_std_dev = f32::sqrt(
- self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
- );
-
- gst::info!(
+ gst::info!(CAT, "Delta stats:");
+ let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
+ let interval_std_dev = f32::sqrt(
+ self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
+ );
+
+ gst::info!(
+ CAT,
+ "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(interval_mean as u64),
+ Duration::from_nanos(interval_std_dev as u64),
+ self.interval_min_delta,
+ self.interval_max_delta,
+ );
+
+ if self.interval_late_count_delta > f32::EPSILON {
+ gst::warning!(
CAT,
- "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(interval_mean as u64),
- Duration::from_nanos(interval_std_dev as u64),
- self.interval_min_delta,
- self.interval_max_delta,
+ "o {:5.2}% late buffers",
+ 100f32 * self.interval_late_count_delta / self.buffer_count_delta
);
+ }
- if self.interval_late_count_delta > 1.0 {
- gst::warning!(
- CAT,
- "o {:5.2}% late buffers",
- 100f32 * self.interval_late_count_delta / self.buffer_count_delta
- );
- }
+ self.interval_sum_delta = 0.0;
+ self.interval_square_sum_delta = 0.0;
+ self.interval_min_delta = Duration::MAX;
+ self.interval_max_delta = Duration::ZERO;
+ self.interval_late_count_delta = 0.0;
- self.interval_sum_delta = 0.0;
- self.interval_square_sum_delta = 0.0;
- self.interval_min_delta = Duration::MAX;
- self.interval_max_delta = Duration::ZERO;
- self.interval_late_count_delta = 0.0;
+ let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
+ let latency_std_dev = f32::sqrt(
+ self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
+ );
- let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
- let latency_std_dev = f32::sqrt(
- self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
- );
+ gst::info!(
+ CAT,
+ "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(latency_mean as u64),
+ Duration::from_nanos(latency_std_dev as u64),
+ self.latency_min_delta,
+ self.latency_max_delta,
+ );
- gst::info!(
- CAT,
- "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(latency_mean as u64),
- Duration::from_nanos(latency_std_dev as u64),
- self.latency_min_delta,
- self.latency_max_delta,
- );
-
- self.latency_sum_delta = 0.0;
- self.latency_square_sum_delta = 0.0;
- self.latency_min_delta = Duration::MAX;
- self.latency_max_delta = Duration::ZERO;
- }
+ self.latency_sum_delta = 0.0;
+ self.latency_square_sum_delta = 0.0;
+ self.latency_min_delta = Duration::MAX;
+ self.latency_max_delta = Duration::ZERO;
self.buffer_count_delta = 0.0;
}
fn log_global(&mut self) {
- if self.log_start_instant.is_none() {
+ if self.buffer_count < 1.0 {
return;
}
- if self.buffer_count > 1.0 {
- gst::info!(CAT, "Global stats:");
+ let _log_start = if let Some(log_start) = self.log_start_instant {
+ log_start
+ } else {
+ return;
+ };
- let interval_mean = self.interval_sum / self.buffer_count;
- let interval_std_dev =
- f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
+ gst::info!(CAT, "Global stats:");
+ #[cfg(feature = "tuning")]
+ {
+ let duration = _log_start.elapsed();
+ let parked_duration =
+ Context::current().unwrap().parked_duration() - self.parked_duration_init;
gst::info!(
CAT,
- "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(interval_mean as u64),
- Duration::from_nanos(interval_std_dev as u64),
- self.interval_min,
- self.interval_max,
+ "o parked: {parked_duration:4.2?} ({:5.2?}%)",
+ (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
);
+ }
- if self.interval_late_count > f32::EPSILON {
- gst::warning!(
- CAT,
- "o {:5.2}% late buffers",
- 100f32 * self.interval_late_count / self.buffer_count
- );
- }
+ let interval_mean = self.interval_sum / self.buffer_count;
+ let interval_std_dev =
+ f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
- let latency_mean = self.latency_sum / self.buffer_count;
- let latency_std_dev =
- f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
+ gst::info!(
+ CAT,
+ "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(interval_mean as u64),
+ Duration::from_nanos(interval_std_dev as u64),
+ self.interval_min,
+ self.interval_max,
+ );
- gst::info!(
+ if self.interval_late_count > f32::EPSILON {
+ gst::warning!(
CAT,
- "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
- Duration::from_nanos(latency_mean as u64),
- Duration::from_nanos(latency_std_dev as u64),
- self.latency_min,
- self.latency_max,
+ "o {:5.2}% late buffers",
+ 100f32 * self.interval_late_count / self.buffer_count
);
}
+
+ let latency_mean = self.latency_sum / self.buffer_count;
+ let latency_std_dev =
+ f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
+
+ gst::info!(
+ CAT,
+ "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(latency_mean as u64),
+ Duration::from_nanos(latency_std_dev as u64),
+ self.latency_min,
+ self.latency_max,
+ );
}
}
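Both the delta and global logs derive mean and standard deviation from running sums, using σ = sqrt(E[x²] − E[x]²). A self-contained sketch of that accumulator, assuming the same nanoseconds-as-f32 representation the element uses (struct and field names are illustrative):

use std::time::Duration;

#[derive(Default)]
struct RunningStats {
    sum: f32,        // Σ x, in nanoseconds
    square_sum: f32, // Σ x², in nanoseconds²
    count: f32,
}

impl RunningStats {
    fn push(&mut self, sample: Duration) {
        let x = sample.as_nanos() as f32;
        self.sum += x;
        self.square_sum += x * x;
        self.count += 1.0;
    }

    // (mean, standard deviation), matching the values the element logs.
    fn mean_std_dev(&self) -> (Duration, Duration) {
        let mean = self.sum / self.count;
        let std_dev = f32::sqrt(self.square_sum / self.count - mean.powi(2));
        (
            Duration::from_nanos(mean as u64),
            Duration::from_nanos(std_dev as u64),
        )
    }
}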
diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs
index f1555627a..325e20190 100644
--- a/generic/threadshare/src/runtime/executor/context.rs
+++ b/generic/threadshare/src/runtime/executor/context.rs
@@ -187,6 +187,14 @@ impl Context {
self.0.max_throttling()
}
+ /// Total duration the scheduler spent parked.
+ ///
+ /// This is only useful for performance evaluation.
+ #[cfg(feature = "tuning")]
+ pub fn parked_duration(&self) -> Duration {
+ self.0.parked_duration()
+ }
+
/// Returns `true` if a `Context` is running on current thread.
pub fn is_context_thread() -> bool {
Scheduler::is_scheduler_thread()
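The patch exercises this accessor in two ways: from outside a context thread through a handle obtained with `Context::acquire`, and from inside one through `Context::current()`. A hedged sketch of both call patterns; the context name and the 20 ms throttling value are assumptions mirroring the benchmark:

use std::time::Duration;

use gstthreadshare::runtime::Context;

#[cfg(feature = "tuning")]
fn parked_from_outside() -> Duration {
    // As benchmark.rs does with "context-0": acquiring by name returns a
    // handle onto the same shared scheduler.
    let ctx = Context::acquire("context-0", Duration::from_millis(20)).unwrap();
    ctx.parked_duration()
}

#[cfg(feature = "tuning")]
fn parked_from_inside() -> Duration {
    // Only valid on a context thread, which is where the standalone sink's
    // stats run.
    Context::current().unwrap().parked_duration()
}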
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index 53a7fab7f..958d88b3f 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -11,6 +11,8 @@ use gio::glib::clone::Downgrade;
use std::cell::RefCell;
use std::future::Future;
use std::panic;
+#[cfg(feature = "tuning")]
+use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc as sync_mpsc;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::task::Poll;
@@ -34,6 +36,8 @@ pub(super) struct Scheduler {
tasks: TaskQueue,
must_unpark: Mutex<bool>,
must_unpark_cvar: Condvar,
+ #[cfg(feature = "tuning")]
+ parked_duration: AtomicU64,
}
impl Scheduler {
@@ -104,6 +108,8 @@ impl Scheduler {
tasks: TaskQueue::new(context_name),
must_unpark: Mutex::new(false),
must_unpark_cvar: Condvar::new(),
+ #[cfg(feature = "tuning")]
+ parked_duration: AtomicU64::new(0),
}));
*cur_scheduler = Some(handle.downgrade());
@@ -201,12 +207,16 @@ impl Scheduler {
break;
}
- if let Some(wait_duration) = self.max_throttling.checked_sub(last.elapsed()) {
+ if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) {
let result = self
.must_unpark_cvar
- .wait_timeout(must_unpark, wait_duration)
+ .wait_timeout(must_unpark, parking_duration)
.unwrap();
+ #[cfg(feature = "tuning")]
+ self.parked_duration
+ .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed);
+
must_unpark = result.0;
} else {
*must_unpark = false;
@@ -360,6 +370,11 @@ impl Handle {
self.0.scheduler.max_throttling
}
+ #[cfg(feature = "tuning")]
+ pub fn parked_duration(&self) -> Duration {
+ Duration::from_nanos(self.0.scheduler.parked_duration.load(Ordering::Relaxed))
+ }
+
/// Executes the provided function relative to this [`Scheduler`]'s [`Reactor`].
///
/// Useful to initialize i/o sources and timers from outside
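For completeness, the counter itself is just an `AtomicU64` of nanoseconds bumped around the condvar wait; adding `subsec_nanos()` is sufficient as long as `max_throttling` stays below one second, since the parking duration can never exceed it. A standalone sketch of the same pattern (type and method names are illustrative):

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

struct ParkedCounter {
    parked_duration: AtomicU64, // accumulated nanoseconds
}

impl ParkedCounter {
    // Called each time the scheduler is about to park for `parking_duration`.
    fn add(&self, parking_duration: Duration) {
        self.parked_duration
            .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed);
    }

    // What `Handle::parked_duration()` exposes to `Context::parked_duration()`.
    fn total(&self) -> Duration {
        Duration::from_nanos(self.parked_duration.load(Ordering::Relaxed))
    }
}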