Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-19 20:40:05 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-09-13 10:29:50 +0300
commit72acbebff0147043a93abd4190e28544afa41801 (patch)
tree5cccc903df20fee8e0ce8235a979dab6158cffc0 /generic
parent348a1d0207c73a2ac3ad0bbdcd7cc874bb216a64 (diff)
ts/standalone: multiple improvements
- Reworked buffer push. - Reworked stats. - Make first elements logs stand out. This make it possible to follow what's going on with pipelines containing 1000s of elements. - Actually handle EOS. - Use more significant defaults. - Allow building without `clap` feature.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/Cargo.toml3
-rw-r--r--generic/threadshare/examples/standalone/main.rs120
-rw-r--r--generic/threadshare/examples/standalone/sink/imp.rs609
-rw-r--r--generic/threadshare/examples/standalone/src/imp.rs278
-rw-r--r--generic/threadshare/src/runtime/executor/timer.rs10
5 files changed, 685 insertions, 335 deletions
diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml
index 8a34d3336..80ae0deab 100644
--- a/generic/threadshare/Cargo.toml
+++ b/generic/threadshare/Cargo.toml
@@ -54,9 +54,8 @@ name = "tcpclientsrc-benchmark-sender"
path = "examples/tcpclientsrc_benchmark_sender.rs"
[[example]]
-name = "standalone"
+name = "ts-standalone"
path = "examples/standalone/main.rs"
-required-features = ["clap"]
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs
index 1b14bc62e..8852d9f72 100644
--- a/generic/threadshare/examples/standalone/main.rs
+++ b/generic/threadshare/examples/standalone/main.rs
@@ -31,14 +31,18 @@ gst::plugin_define!(
env!("BUILD_REL_DATE")
);
+#[cfg(feature = "clap")]
use clap::Parser;
+#[cfg(feature = "clap")]
#[derive(Parser, Debug)]
#[clap(version)]
-#[clap(about = "Standalone pipeline threadshare runtime test")]
+#[clap(
+ about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats"
+)]
struct Args {
/// Parallel streams to process.
- #[clap(short, long, default_value_t = 100)]
+ #[clap(short, long, default_value_t = 5000)]
streams: u32,
/// Threadshare groups.
@@ -49,13 +53,67 @@ struct Args {
#[clap(short, long, default_value_t = 20)]
wait: u32,
+ /// Buffer push period in ms.
+ #[clap(short, long, default_value_t = 20)]
+ push_period: u32,
+
/// Number of buffers per stream to output before sending EOS (-1 = unlimited).
- #[clap(short, long, default_value_t = 6000)]
+ #[clap(short, long, default_value_t = 5000)]
num_buffers: i32,
- /// Enables statistics logging (use GST_DEBUG=ts-standalone*:4).
+ /// Disables statistics logging.
#[clap(short, long)]
- log_stats: bool,
+ disable_stats_log: bool,
+}
+
+#[cfg(not(feature = "clap"))]
+#[derive(Debug)]
+struct Args {
+ streams: u32,
+ groups: u32,
+ wait: u32,
+ push_period: u32,
+ num_buffers: i32,
+ disable_stats_log: bool,
+}
+
+#[cfg(not(feature = "clap"))]
+impl Default for Args {
+ fn default() -> Self {
+ Args {
+ streams: 5000,
+ groups: 2,
+ wait: 20,
+ push_period: 20,
+ num_buffers: 5000,
+ disable_stats_log: false,
+ }
+ }
+}
+
+fn args() -> Args {
+ #[cfg(feature = "clap")]
+ let args = {
+ let args = Args::parse();
+ gst::info!(CAT, "{:?}", args);
+
+ args
+ };
+
+ #[cfg(not(feature = "clap"))]
+ let args = {
+ if std::env::args().len() > 1 {
+ gst::warning!(CAT, "Ignoring command line arguments");
+ gst::warning!(CAT, "Build with `--features=clap`");
+ }
+
+ let args = Args::default();
+ gst::warning!(CAT, "{:?}", args);
+
+ args
+ };
+
+ args
}
fn main() {
@@ -68,7 +126,7 @@ fn main() {
#[cfg(debug_assertions)]
gst::warning!(CAT, "RUNNING DEBUG BUILD");
- let args = Args::parse();
+ let args = args();
let pipeline = gst::Pipeline::new(None);
@@ -82,6 +140,7 @@ fn main() {
.unwrap();
src.set_property("context", &ctx_name);
src.set_property("context-wait", args.wait);
+ src.set_property("push-period", args.push_period);
src.set_property("num-buffers", args.num_buffers);
let sink = gst::ElementFactory::make(
@@ -91,8 +150,27 @@ fn main() {
.unwrap();
sink.set_property("context", &ctx_name);
sink.set_property("context-wait", args.wait);
- if i == 0 && args.log_stats {
- sink.set_property("must-log-stats", true);
+
+ if i == 0 {
+ src.set_property("raise-log-level", true);
+ sink.set_property("raise-log-level", true);
+
+ if !args.disable_stats_log {
+ // Don't use the last 5 secs in stats
+ // otherwise we get outliers when reaching EOS.
+ // Note that stats don't start before the 20 first seconds
+ // and we get 50 buffers per sec.
+ const BUFFERS_BEFORE_LOGS: i32 = 20 * 50;
+ const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50;
+ if args.num_buffers > BUFFERS_TO_SKIP {
+ sink.set_property("push-period", args.push_period);
+ sink.set_property("logs-stats", true);
+ let max_buffers = args.num_buffers - BUFFERS_TO_SKIP;
+ sink.set_property("max-buffers", max_buffers);
+ } else {
+ gst::warning!(CAT, "Not enough buffers to log, disabling stats");
+ }
+ }
}
let elements = &[&src, &sink];
@@ -109,11 +187,8 @@ fn main() {
use gst::MessageView;
match msg.view() {
- MessageView::Eos(..) => {
- gst::info!(CAT, "Shuting down");
- let stop = Instant::now();
- pipeline_clone.set_state(gst::State::Null).unwrap();
- gst::info!(CAT, "Shuting down took {:.2?}", stop.elapsed());
+ MessageView::Eos(_) => {
+ gst::info!(CAT, "Received eos");
l_clone.quit();
}
MessageView::Error(err) => {
@@ -133,10 +208,25 @@ fn main() {
})
.expect("Failed to add bus watch");
- gst::info!(CAT, "Starting");
+ gst::info!(CAT, "Switching to Ready");
+ let start = Instant::now();
+ pipeline.set_state(gst::State::Ready).unwrap();
+ gst::info!(CAT, "Switching to Ready took {:.2?}", start.elapsed());
+
+ gst::info!(CAT, "Switching to Playing");
let start = Instant::now();
pipeline.set_state(gst::State::Playing).unwrap();
- gst::info!(CAT, "Starting took {:.2?}", start.elapsed());
+ gst::info!(CAT, "Switching to Playing took {:.2?}", start.elapsed());
l.run();
+
+ gst::info!(CAT, "Switching to Ready");
+ let stop = Instant::now();
+ pipeline_clone.set_state(gst::State::Ready).unwrap();
+ gst::info!(CAT, "Switching to Ready took {:.2?}", stop.elapsed());
+
+ gst::info!(CAT, "Shutting down");
+ let stop = Instant::now();
+ pipeline_clone.set_state(gst::State::Null).unwrap();
+ gst::info!(CAT, "Shutting down took {:.2?}", stop.elapsed());
}
diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
index d2083037b..67ab406d1 100644
--- a/generic/threadshare/examples/standalone/sink/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -10,18 +10,17 @@ use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::Peekable;
+use gst::error_msg;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
-use gst::{element_error, error_msg};
use once_cell::sync::Lazy;
use gstthreadshare::runtime::prelude::*;
-use gstthreadshare::runtime::{self, Context, PadSink, PadSinkRef, Task};
+use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, Task};
-use std::pin::Pin;
use std::sync::Mutex;
use std::task::Poll;
use std::time::{Duration, Instant};
@@ -36,32 +35,36 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
-const DEFAULT_SYNC: bool = true;
-const DEFAULT_MUST_LOG_STATS: bool = false;
+const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
+const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);
const LOG_PERIOD: Duration = Duration::from_secs(20);
#[derive(Debug, Clone)]
struct Settings {
- sync: bool,
context: String,
context_wait: Duration,
- must_log_stats: bool,
+ raise_log_level: bool,
+ logs_stats: bool,
+ push_period: Duration,
+ max_buffers: Option<u32>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
- sync: DEFAULT_SYNC,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
- must_log_stats: DEFAULT_MUST_LOG_STATS,
+ raise_log_level: false,
+ logs_stats: false,
+ push_period: DEFAULT_PUSH_PERIOD,
+ max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
}
}
}
#[derive(Debug)]
-enum TaskItem {
+enum StreamItem {
Buffer(gst::Buffer),
Event(gst::Event),
}
@@ -83,7 +86,7 @@ impl PadSinkHandler for TestSinkPadHandler {
let element = element.clone().downcast::<super::TestSink>().unwrap();
async move {
- if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
+ if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing");
return Err(gst::FlowError::Flushing);
}
@@ -105,7 +108,7 @@ impl PadSinkHandler for TestSinkPadHandler {
async move {
for buffer in list.iter_owned() {
- if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
+ if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing");
return Err(gst::FlowError::Flushing);
}
@@ -130,7 +133,7 @@ impl PadSinkHandler for TestSinkPadHandler {
if let EventView::FlushStop(_) = event.view() {
let test_sink = element.imp();
return test_sink.task.flush_stop().await_maybe_on_context().is_ok();
- } else if sender.send_async(TaskItem::Event(event)).await.is_err() {
+ } else if sender.send_async(StreamItem::Event(event)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing");
}
@@ -161,20 +164,31 @@ impl PadSinkHandler for TestSinkPadHandler {
#[derive(Default)]
struct Stats {
must_log: bool,
- sync: bool,
ramp_up_instant: Option<Instant>,
log_start_instant: Option<Instant>,
last_delta_instant: Option<Instant>,
- buffer_count: u32,
- buffer_count_delta: u32,
- buffer_headroom: Duration,
- buffer_headroom_delta: Duration,
- late_buffer_count: u32,
- lateness: Duration,
- max_lateness: Duration,
- late_buffer_count_delta: u32,
- lateness_delta: Duration,
- max_lateness_delta: Duration,
+ max_buffers: Option<f32>,
+ buffer_count: f32,
+ buffer_count_delta: f32,
+ latency_sum: f32,
+ latency_square_sum: f32,
+ latency_sum_delta: f32,
+ latency_square_sum_delta: f32,
+ latency_min: Duration,
+ latency_min_delta: Duration,
+ latency_max: Duration,
+ latency_max_delta: Duration,
+ interval_sum: f32,
+ interval_square_sum: f32,
+ interval_sum_delta: f32,
+ interval_square_sum_delta: f32,
+ interval_min: Duration,
+ interval_min_delta: Duration,
+ interval_max: Duration,
+ interval_max_delta: Duration,
+ interval_late_warn: Duration,
+ interval_late_count: f32,
+ interval_late_count_delta: f32,
}
impl Stats {
@@ -183,24 +197,34 @@ impl Stats {
return;
}
- self.buffer_count = 0;
- self.buffer_count_delta = 0;
- self.buffer_headroom = Duration::ZERO;
- self.buffer_headroom_delta = Duration::ZERO;
- self.late_buffer_count = 0;
- self.lateness = Duration::ZERO;
- self.max_lateness = Duration::ZERO;
- self.late_buffer_count_delta = 0;
- self.lateness_delta = Duration::ZERO;
- self.max_lateness_delta = Duration::ZERO;
+ self.buffer_count = 0.0;
+ self.buffer_count_delta = 0.0;
+ self.latency_sum = 0.0;
+ self.latency_square_sum = 0.0;
+ self.latency_sum_delta = 0.0;
+ self.latency_square_sum_delta = 0.0;
+ self.latency_min = Duration::MAX;
+ self.latency_min_delta = Duration::MAX;
+ self.latency_max = Duration::ZERO;
+ self.latency_max_delta = Duration::ZERO;
+ self.interval_sum = 0.0;
+ self.interval_square_sum = 0.0;
+ self.interval_sum_delta = 0.0;
+ self.interval_square_sum_delta = 0.0;
+ self.interval_min = Duration::MAX;
+ self.interval_min_delta = Duration::MAX;
+ self.interval_max = Duration::ZERO;
+ self.interval_max_delta = Duration::ZERO;
+ self.interval_late_count = 0.0;
+ self.interval_late_count_delta = 0.0;
self.last_delta_instant = None;
self.log_start_instant = None;
self.ramp_up_instant = Some(Instant::now());
- gst::info!(CAT, "First stats logs in {:.2?}", 2 * LOG_PERIOD);
+ gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD);
}
- fn can_count(&mut self) -> bool {
+ fn is_active(&mut self) -> bool {
if !self.must_log {
return false;
}
@@ -211,53 +235,64 @@ impl Stats {
}
self.ramp_up_instant = None;
- gst::info!(CAT, "Ramp up complete. Stats logs in {:.2?}", LOG_PERIOD);
+ gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
self.log_start_instant = Some(Instant::now());
self.last_delta_instant = self.log_start_instant;
}
- true
+ use std::cmp::Ordering::*;
+ match self.max_buffers.opt_cmp(self.buffer_count) {
+ Some(Equal) => {
+ self.log_global();
+ self.buffer_count += 1.0;
+ false
+ }
+ Some(Less) => false,
+ _ => true,
+ }
}
- fn notify_buffer(&mut self) {
- if !self.can_count() {
+ fn add_buffer(&mut self, latency: Duration, interval: Duration) {
+ if !self.is_active() {
return;
}
- self.buffer_count += 1;
- self.buffer_count_delta += 1;
- }
+ self.buffer_count += 1.0;
+ self.buffer_count_delta += 1.0;
- fn notify_buffer_headroom(&mut self, headroom: Duration) {
- if !self.can_count() {
- return;
- }
+ // Latency
+ let latency_f32 = latency.as_nanos() as f32;
+ let latency_square = latency_f32.powi(2);
- self.buffer_headroom += headroom;
- self.buffer_headroom_delta += headroom;
- }
+ self.latency_sum += latency_f32;
+ self.latency_square_sum += latency_square;
+ self.latency_min = self.latency_min.min(latency);
+ self.latency_max = self.latency_max.max(latency);
- fn notify_late_buffer(&mut self, now: Option<gst::ClockTime>, pts: gst::ClockTime) {
- if !self.can_count() {
- return;
- }
+ self.latency_sum_delta += latency_f32;
+ self.latency_square_sum_delta += latency_square;
+ self.latency_min_delta = self.latency_min_delta.min(latency);
+ self.latency_max_delta = self.latency_max_delta.max(latency);
- let lateness = now
- .opt_checked_sub(pts)
- .ok()
- .flatten()
- .map_or(Duration::ZERO, Duration::from);
+ // Interval
+ let interval_f32 = interval.as_nanos() as f32;
+ let interval_square = interval_f32.powi(2);
- self.late_buffer_count += 1;
- self.lateness += lateness;
- self.max_lateness = self.max_lateness.max(lateness);
+ self.interval_sum += interval_f32;
+ self.interval_square_sum += interval_square;
+ self.interval_min = self.interval_min.min(interval);
+ self.interval_max = self.interval_max.max(interval);
- self.late_buffer_count_delta += 1;
- self.lateness_delta += lateness;
- self.max_lateness_delta = self.max_lateness_delta.max(lateness);
- }
+ self.interval_sum_delta += interval_f32;
+ self.interval_square_sum_delta += interval_square;
+ self.interval_min_delta = self.interval_min_delta.min(interval);
+ self.interval_max_delta = self.interval_max_delta.max(interval);
+
+ if interval > self.interval_late_warn {
+ self.interval_late_count += 1.0;
+ self.interval_late_count_delta += 1.0;
+ }
- fn log_delta(&mut self) {
let delta_duration = match self.last_delta_instant {
Some(last_delta) => last_delta.elapsed(),
None => return,
@@ -269,94 +304,121 @@ impl Stats {
self.last_delta_instant = Some(Instant::now());
- gst::info!(CAT, "Delta stats:");
- gst::info!(
- CAT,
- "o {:>5.2} buffers / s",
- self.buffer_count_delta as f32 / delta_duration.as_millis() as f32 * 1_000f32,
- );
-
- if self.sync && self.buffer_count_delta > 0 {
- let early_buffers_count = self
- .buffer_count_delta
- .saturating_sub(self.late_buffer_count_delta);
- if early_buffers_count > 0 {
- gst::info!(
- CAT,
- "o {:>5.2?} headroom / early buffers",
- self.buffer_headroom_delta / early_buffers_count,
- );
+ if self.buffer_count_delta > 1.0 {
+ gst::info!(CAT, "Delta stats:");
- gst::info!(
+ let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
+ let interval_std_dev = f32::sqrt(
+ self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
+ );
+
+ gst::info!(
+ CAT,
+ "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(interval_mean as u64),
+ Duration::from_nanos(interval_std_dev as u64),
+ self.interval_min_delta,
+ self.interval_max_delta,
+ );
+
+ if self.interval_late_count_delta > 1.0 {
+ gst::warning!(
CAT,
- "o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}",
- self.late_buffer_count_delta as f32 / self.buffer_count_delta as f32 * 100f32,
- self.lateness_delta
- .checked_div(self.late_buffer_count_delta)
- .unwrap_or(Duration::ZERO),
- self.max_lateness_delta,
+ "o {:5.2}% late buffers",
+ 100f32 * self.interval_late_count_delta / self.buffer_count_delta
);
}
- self.buffer_headroom_delta = Duration::ZERO;
- self.late_buffer_count_delta = 0;
- self.lateness_delta = Duration::ZERO;
- self.max_lateness_delta = Duration::ZERO;
+ self.interval_sum_delta = 0.0;
+ self.interval_square_sum_delta = 0.0;
+ self.interval_min_delta = Duration::MAX;
+ self.interval_max_delta = Duration::ZERO;
+ self.interval_late_count_delta = 0.0;
+
+ let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
+ let latency_std_dev = f32::sqrt(
+ self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
+ );
+
+ gst::info!(
+ CAT,
+ "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(latency_mean as u64),
+ Duration::from_nanos(latency_std_dev as u64),
+ self.latency_min_delta,
+ self.latency_max_delta,
+ );
+
+ self.latency_sum_delta = 0.0;
+ self.latency_square_sum_delta = 0.0;
+ self.latency_min_delta = Duration::MAX;
+ self.latency_max_delta = Duration::ZERO;
}
- self.buffer_count_delta = 0;
+ self.buffer_count_delta = 0.0;
}
fn log_global(&mut self) {
- let log_duration = match self.log_start_instant {
- Some(start) => start.elapsed(),
- None => return,
- };
+ if self.log_start_instant.is_none() {
+ return;
+ }
- gst::info!(CAT, "Global stats:");
- gst::info!(
- CAT,
- "o {:>5.2} buffers / s",
- self.buffer_count as f32 / log_duration.as_millis() as f32 * 1_000f32,
- );
-
- if self.sync && self.buffer_count > 0 {
- let early_buffers_count = self.buffer_count.saturating_sub(self.late_buffer_count);
- if early_buffers_count > 0 {
- gst::info!(
- CAT,
- "o {:>5.2?} headroom / early buffers",
- self.buffer_headroom / early_buffers_count,
- );
+ if self.buffer_count > 1.0 {
+ gst::info!(CAT, "Global stats:");
- gst::info!(
+ let interval_mean = self.interval_sum / self.buffer_count;
+ let interval_std_dev =
+ f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
+
+ gst::info!(
+ CAT,
+ "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(interval_mean as u64),
+ Duration::from_nanos(interval_std_dev as u64),
+ self.interval_min,
+ self.interval_max,
+ );
+
+ if self.interval_late_count > f32::EPSILON {
+ gst::warning!(
CAT,
- "o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}",
- self.late_buffer_count as f32 / self.buffer_count as f32 * 100f32,
- self.lateness
- .checked_div(self.late_buffer_count)
- .unwrap_or(Duration::ZERO),
- self.max_lateness,
+ "o {:5.2}% late buffers",
+ 100f32 * self.interval_late_count / self.buffer_count
);
}
+
+ let latency_mean = self.latency_sum / self.buffer_count;
+ let latency_std_dev =
+ f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
+
+ gst::info!(
+ CAT,
+ "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+ Duration::from_nanos(latency_mean as u64),
+ Duration::from_nanos(latency_std_dev as u64),
+ self.latency_min,
+ self.latency_max,
+ );
}
}
}
struct TestSinkTask {
element: super::TestSink,
- item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>,
- sync: bool,
+ raise_log_level: bool,
+ last_dts: Option<gst::ClockTime>,
+ item_receiver: Peekable<flume::r#async::RecvStream<'static, StreamItem>>,
stats: Stats,
segment: Option<gst::Segment>,
}
impl TestSinkTask {
- fn new(element: &super::TestSink, item_receiver: flume::Receiver<TaskItem>) -> Self {
+ fn new(element: &super::TestSink, item_receiver: flume::Receiver<StreamItem>) -> Self {
TestSinkTask {
element: element.clone(),
+ raise_log_level: false,
+ last_dts: None,
item_receiver: item_receiver.into_stream().peekable(),
- sync: DEFAULT_SYNC,
stats: Stats::default(),
segment: None,
}
@@ -369,17 +431,23 @@ impl TestSinkTask {
}
impl TaskImpl for TestSinkTask {
- type Item = TaskItem;
+ type Item = StreamItem;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::log!(CAT, obj: &self.element, "Preparing Task");
-
let sink = self.element.imp();
let settings = sink.settings.lock().unwrap();
- self.sync = settings.sync;
- self.stats.sync = self.sync;
- self.stats.must_log = settings.must_log_stats;
+ self.raise_log_level = settings.raise_log_level;
+
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Preparing Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Preparing Task");
+ }
+
+ self.stats.must_log = settings.logs_stats;
+ self.stats.max_buffers = settings.max_buffers.map(|max_buffers| max_buffers as f32);
+ self.stats.interval_late_warn = settings.push_period + settings.context_wait / 2;
Ok(())
}
@@ -388,7 +456,13 @@ impl TaskImpl for TestSinkTask {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
- gst::log!(CAT, obj: &self.element, "Starting Task");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Starting Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Starting Task");
+ }
+
+ self.last_dts = None;
self.stats.start();
Ok(())
}
@@ -397,81 +471,92 @@ impl TaskImpl for TestSinkTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
- gst::log!(CAT, obj: &self.element, "Stopping Task");
- self.flush().await;
- Ok(())
- }
- .boxed()
- }
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Stopping Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Stopping Task");
+ }
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async {
- gst::log!(CAT, obj: &self.element, "Starting Task Flush");
self.flush().await;
Ok(())
}
.boxed()
}
- fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
async move {
- let item_opt = Pin::new(&mut self.item_receiver).peek().await;
+ let item = self.item_receiver.next().await.unwrap();
- // Check the peeked item in case we need to sync.
- // The item will still be available in the channel
- // in case this is cancelled by a state transition.
- match item_opt {
- Some(TaskItem::Buffer(buffer)) => {
- self.stats.notify_buffer();
-
- if self.sync {
- let rtime = self.segment.as_ref().and_then(|segment| {
- segment
- .downcast_ref::<gst::format::Time>()
- .and_then(|segment| segment.to_running_time(buffer.pts()))
- });
- if let Some(pts) = rtime {
- // This can be cancelled by a state transition.
- self.sync(pts).await;
- }
- }
- }
- Some(_) => (),
- None => {
- panic!("Internal channel sender dropped while Task is Started");
- }
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Popped item");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Popped item");
}
- // An item was peeked above, we can now pop it without losing it.
- Ok(self.item_receiver.next().await.unwrap())
+ Ok(item)
}
.boxed()
}
- fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst::debug!(CAT, obj: &self.element, "Handling {:?}", item);
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Received {:?}", item);
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Received {:?}", item);
+ }
match item {
- TaskItem::Buffer(buffer) => {
- self.render(buffer).await.map_err(|err| {
- element_error!(
- &self.element,
- gst::StreamError::Failed,
- ["Failed to render item, stopping task: {}", err]
- );
- gst::FlowError::Error
- })?;
-
- self.stats.log_delta();
+ StreamItem::Buffer(buffer) => {
+ let dts = self
+ .segment
+ .as_ref()
+ .and_then(|segment| {
+ segment
+ .downcast_ref::<gst::format::Time>()
+ .and_then(|segment| segment.to_running_time(buffer.dts()))
+ })
+ .unwrap();
+
+ if let Some(last_dts) = self.last_dts {
+ let cur_ts = self.element.current_running_time().unwrap();
+ let latency: Duration = (cur_ts - dts).into();
+ let interval: Duration = (dts - last_dts).into();
+
+ self.stats.add_buffer(latency, interval);
+
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "o latency {:.2?}", latency);
+ gst::debug!(CAT, obj: &self.element, "o interval {:.2?}", interval);
+ } else {
+ gst::trace!(CAT, obj: &self.element, "o latency {:.2?}", latency);
+ gst::trace!(CAT, obj: &self.element, "o interval {:.2?}", interval);
+ }
+ }
+
+ self.last_dts = Some(dts);
+
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Buffer processed");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Buffer processed");
+ }
}
- TaskItem::Event(event) => match event.view() {
+ StreamItem::Event(event) => match event.view() {
EventView::Eos(_) => {
- self.stats.log_global();
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "EOS");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "EOS");
+ }
- let _ = self
- .element
- .post_message(gst::message::Eos::builder().src(&self.element).build());
+ let elem = self.element.clone();
+ self.element.call_async(move |_| {
+ let _ =
+ elem.post_message(gst::message::Eos::builder().src(&elem).build());
+ });
+
+ return Err(gst::FlowError::Eos);
}
EventView::Segment(e) => {
self.segment = Some(e.segment().clone());
@@ -489,54 +574,27 @@ impl TaskImpl for TestSinkTask {
}
}
-impl TestSinkTask {
- async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
- let _data = buffer.map_readable().map_err(|_| {
- element_error!(
- self.element,
- gst::StreamError::Format,
- ["Failed to map buffer readable"]
- );
- gst::FlowError::Error
- })?;
-
- gst::log!(CAT, obj: &self.element, "buffer {:?} rendered", buffer);
-
- Ok(())
- }
-
- /// Waits until specified time.
- async fn sync(&mut self, pts: gst::ClockTime) {
- let now = self.element.current_running_time();
-
- if let Ok(Some(delay)) = pts.opt_checked_sub(now) {
- let delay = delay.into();
- gst::trace!(CAT, obj: &self.element, "sync: waiting {:?}", delay);
- runtime::time::delay_for(delay).await;
-
- self.stats.notify_buffer_headroom(delay);
- } else {
- self.stats.notify_late_buffer(now, pts);
- }
- }
-}
-
#[derive(Debug)]
pub struct TestSink {
sink_pad: PadSink,
task: Task,
- item_sender: Mutex<Option<flume::Sender<TaskItem>>>,
+ item_sender: Mutex<Option<flume::Sender<StreamItem>>>,
settings: Mutex<Settings>,
}
impl TestSink {
#[track_caller]
- fn clone_item_sender(&self) -> flume::Sender<TaskItem> {
+ fn clone_item_sender(&self) -> flume::Sender<StreamItem> {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
fn prepare(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Preparing");
+ } else {
+ gst::trace!(CAT, obj: element, "Preparing");
+ }
let context = {
let settings = self.settings.lock().unwrap();
@@ -556,28 +614,67 @@ impl TestSink {
*self.item_sender.lock().unwrap() = Some(item_sender);
- gst::debug!(CAT, obj: element, "Started preparation");
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Prepared");
+ } else {
+ gst::trace!(CAT, obj: element, "Prepared");
+ }
Ok(())
}
fn unprepare(&self, element: &super::TestSink) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Unpreparing");
+ } else {
+ gst::trace!(CAT, obj: element, "Unpreparing");
+ }
+
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Unprepared");
+ } else {
+ gst::trace!(CAT, obj: element, "Unprepared");
+ }
}
fn stop(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Stopping");
+ } else {
+ gst::trace!(CAT, obj: element, "Stopping");
+ }
+
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Stopped");
+ } else {
+ gst::trace!(CAT, obj: element, "Stopped");
+ }
+
Ok(())
}
fn start(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Starting");
+ } else {
+ gst::trace!(CAT, obj: element, "Starting");
+ }
+
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Started");
+ } else {
+ gst::trace!(CAT, obj: element, "Started");
+ }
+
Ok(())
}
}
@@ -616,17 +713,27 @@ impl ObjectImpl for TestSink {
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
- glib::ParamSpecBoolean::builder("sync")
- .nick("Sync")
- .blurb("Sync on the clock")
- .default_value(DEFAULT_SYNC)
+ glib::ParamSpecBoolean::builder("raise-log-level")
+ .nick("Raise log level")
+ .blurb("Raises the log level so that this element stands out")
+ .write_only()
.build(),
- glib::ParamSpecBoolean::builder("must-log-stats")
- .nick("Must Log Stats")
+ glib::ParamSpecBoolean::builder("logs-stats")
+ .nick("Logs Stats")
.blurb("Whether statistics should be logged")
- .default_value(DEFAULT_MUST_LOG_STATS)
.write_only()
.build(),
+ glib::ParamSpecUInt::builder("push-period")
+ .nick("Src buffer Push Period")
+ .blurb("Push period used by `src` element (used for stats warnings)")
+ .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
+ .build(),
+ glib::ParamSpecInt::builder("max-buffers")
+ .nick("Max Buffers")
+ .blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
+ .minimum(-1i32)
+ .default_value(DEFAULT_MAX_BUFFERS)
+ .build(),
]
});
@@ -642,10 +749,6 @@ impl ObjectImpl for TestSink {
) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
- "sync" => {
- let sync = value.get().expect("type checked upstream");
- settings.sync = sync;
- }
"context" => {
settings.context = value
.get::<Option<String>>()
@@ -657,9 +760,21 @@ impl ObjectImpl for TestSink {
value.get::<u32>().expect("type checked upstream").into(),
);
}
- "must-log-stats" => {
- let must_log_stats = value.get().expect("type checked upstream");
- settings.must_log_stats = must_log_stats;
+ "raise-log-level" => {
+ settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
+ }
+ "logs-stats" => {
+ let logs_stats = value.get().expect("type checked upstream");
+ settings.logs_stats = logs_stats;
+ }
+ "push-period" => {
+ settings.push_period = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
+ }
+ "max-buffers" => {
+ let value = value.get::<i32>().expect("type checked upstream");
+ settings.max_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
@@ -668,9 +783,15 @@ impl ObjectImpl for TestSink {
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
- "sync" => settings.sync.to_value(),
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
+ "raise-log-level" => settings.raise_log_level.to_value(),
+ "push-period" => (settings.push_period.as_millis() as u32).to_value(),
+ "max-buffers" => settings
+ .max_buffers
+ .and_then(|val| val.try_into().ok())
+ .unwrap_or(-1i32)
+ .to_value(),
_ => unimplemented!(),
}
}
diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs
index 6b5f1d426..afcb2c418 100644
--- a/generic/threadshare/examples/standalone/src/imp.rs
+++ b/generic/threadshare/examples/standalone/src/imp.rs
@@ -16,7 +16,7 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::sync::Mutex;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSrc, Task, Timer};
@@ -29,17 +29,18 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
-const BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(20);
-
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
-const DEFAULT_NUM_BUFFERS: i32 = 50 * 60 * 2;
+const DEFAULT_PUSH_PERIOD: gst::ClockTime = gst::ClockTime::from_mseconds(20);
+const DEFAULT_NUM_BUFFERS: i32 = 50 * 100;
#[derive(Debug, Clone)]
struct Settings {
context: String,
context_wait: Duration,
- num_buffers: Option<i32>,
+ push_period: gst::ClockTime,
+ raise_log_level: bool,
+ num_buffers: Option<u32>,
}
impl Default for Settings {
@@ -47,7 +48,9 @@ impl Default for Settings {
Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
- num_buffers: Some(DEFAULT_NUM_BUFFERS),
+ push_period: DEFAULT_PUSH_PERIOD,
+ raise_log_level: false,
+ num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
}
}
}
@@ -62,13 +65,13 @@ impl PadSrcHandler for TestSrcPadHandler {
struct SrcTask {
element: super::TestSrc,
buffer_pool: gst::BufferPool,
- last_pts: gst::ClockTime,
- last_buf_instant: Option<Instant>,
- push_period: Duration,
+ timer: Option<Timer>,
+ raise_log_level: bool,
+ push_period: gst::ClockTime,
need_initial_events: bool,
need_segment: bool,
- num_buffers: Option<i32>,
- buffer_count: i32,
+ num_buffers: Option<u32>,
+ buffer_count: u32,
}
impl SrcTask {
@@ -83,12 +86,12 @@ impl SrcTask {
SrcTask {
element,
buffer_pool,
- last_pts: gst::ClockTime::ZERO,
- last_buf_instant: None,
- push_period: Duration::ZERO,
+ timer: None,
+ raise_log_level: false,
+ push_period: gst::ClockTime::ZERO,
need_initial_events: true,
need_segment: true,
- num_buffers: Some(DEFAULT_NUM_BUFFERS),
+ num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
buffer_count: 0,
}
}
@@ -99,11 +102,17 @@ impl TaskImpl for SrcTask {
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::log!(CAT, obj: &self.element, "Preparing Task");
-
let src = self.element.imp();
let settings = src.settings.lock().unwrap();
- self.push_period = settings.context_wait;
+ self.raise_log_level = settings.raise_log_level;
+
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Preparing Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Preparing Task");
+ }
+
+ self.push_period = settings.push_period;
self.num_buffers = settings.num_buffers;
Ok(())
@@ -113,9 +122,18 @@ impl TaskImpl for SrcTask {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
- gst::log!(CAT, obj: &self.element, "Starting Task");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Starting Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Starting Task");
+ }
+
+ self.timer = Some(Timer::interval_delayed_by(
+ // Delay first buffer push so as to let others start.
+ Duration::from_secs(2),
+ self.push_period.into(),
+ ));
self.buffer_count = 0;
- self.last_buf_instant = None;
self.buffer_pool.set_active(true).unwrap();
Ok(())
}
@@ -124,27 +142,17 @@ impl TaskImpl for SrcTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::log!(CAT, obj: &self.element, "Stopping task");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Stopping Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Stopping Task");
+ }
self.buffer_pool.set_active(false).unwrap();
- self.last_pts = gst::ClockTime::ZERO;
+ self.timer = None;
self.need_initial_events = true;
self.need_segment = true;
- gst::log!(CAT, obj: &self.element, "Task stopped");
- Ok(())
- }
- .boxed()
- }
-
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst::log!(CAT, obj: &self.element, "Starting task flush");
-
- self.buffer_pool.set_active(false).unwrap();
- self.need_segment = true;
-
- gst::log!(CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
@@ -152,30 +160,34 @@ impl TaskImpl for SrcTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
- if let Some(delay) = self
- .last_buf_instant
- .map(|last| last.elapsed())
- .opt_checked_sub(self.push_period)
- .ok()
- .flatten()
- {
- Timer::after(delay).await;
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Awaiting timer");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Awaiting timer");
}
- self.last_buf_instant = Some(Instant::now());
-
- let start = self.last_pts;
- self.last_pts = start + BUFFER_DURATION;
+ self.timer.as_mut().unwrap().next().await;
- self.buffer_pool.acquire_buffer(None).map(|mut buffer| {
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(start);
- buffer.set_duration(BUFFER_DURATION);
- }
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Timer ticked");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Timer ticked");
+ }
- buffer
- })
+ self.buffer_pool
+ .acquire_buffer(None)
+ .map(|mut buffer| {
+ {
+ let buffer = buffer.get_mut().unwrap();
+ let rtime = self.element.current_running_time().unwrap();
+ buffer.set_dts(rtime);
+ }
+ buffer
+ })
+ .map_err(|err| {
+ gst::error!(CAT, obj: &self.element, "Failed to acquire buffer {}", err);
+ err
+ })
}
.boxed()
}
@@ -185,15 +197,29 @@ impl TaskImpl for SrcTask {
let res = self.push(buffer).await;
match res {
Ok(_) => {
- gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Successfully pushed buffer");
+ }
}
Err(gst::FlowError::Eos) => {
- gst::debug!(CAT, obj: &self.element, "EOS");
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "EOS");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "EOS");
+ }
let test_src = self.element.imp();
test_src.src_pad.push_event(gst::event::Eos::new()).await;
+
+ return Err(gst::FlowError::Eos);
}
Err(gst::FlowError::Flushing) => {
- gst::debug!(CAT, obj: &self.element, "Flushing");
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Flushing");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Flushing");
+ }
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
@@ -214,11 +240,20 @@ impl TaskImpl for SrcTask {
impl SrcTask {
async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
- gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Pushing {:?}", buffer);
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Pushing {:?}", buffer);
+ }
+
let test_src = self.element.imp();
if self.need_initial_events {
- gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Pushing initial events");
+ }
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
@@ -244,12 +279,27 @@ impl SrcTask {
self.need_segment = false;
}
- gst::debug!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Forwarding buffer");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Forwarding buffer");
+ }
+
let ok = test_src.src_pad.push(buffer).await?;
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Pushing EOS");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Pushing EOS");
+ }
+
+ let test_src = self.element.imp();
+ if !test_src.src_pad.push_event(gst::event::Eos::new()).await {
+ gst::error!(CAT, obj: &self.element, "Error pushing EOS");
+ }
return Err(gst::FlowError::Eos);
}
@@ -266,7 +316,12 @@ pub struct TestSrc {
impl TestSrc {
fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Preparing");
+ } else {
+ gst::trace!(CAT, obj: element, "Preparing");
+ }
let settings = self.settings.lock().unwrap();
let context =
@@ -282,35 +337,86 @@ impl TestSrc {
.prepare(SrcTask::new(element.clone()), context)
.block_on()?;
- gst::debug!(CAT, obj: element, "Prepared");
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Prepared");
+ } else {
+ gst::trace!(CAT, obj: element, "Prepared");
+ }
Ok(())
}
fn unprepare(&self, element: &super::TestSrc) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Unpreparing");
+ } else {
+ gst::trace!(CAT, obj: element, "Unpreparing");
+ }
+
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Unprepared");
+ } else {
+ gst::trace!(CAT, obj: element, "Unprepared");
+ }
}
fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Stopping");
+ } else {
+ gst::trace!(CAT, obj: element, "Stopping");
+ }
+
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Stopped");
+ } else {
+ gst::trace!(CAT, obj: element, "Stopped");
+ }
+
Ok(())
}
fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Starting");
+ } else {
+ gst::trace!(CAT, obj: element, "Starting");
+ }
+
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Started");
+ } else {
+ gst::trace!(CAT, obj: element, "Started");
+ }
+
Ok(())
}
fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Pausing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Pausing");
+ } else {
+ gst::trace!(CAT, obj: element, "Pausing");
+ }
+
self.task.pause().block_on()?;
- gst::debug!(CAT, obj: element, "Paused");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Paused");
+ } else {
+ gst::trace!(CAT, obj: element, "Paused");
+ }
+
Ok(())
}
}
@@ -348,6 +454,16 @@ impl ObjectImpl for TestSrc {
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
+ glib::ParamSpecUInt::builder("push-period")
+ .nick("Buffer Push Period")
+ .blurb("Push a new buffer every this many ms")
+ .default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32)
+ .build(),
+ glib::ParamSpecBoolean::builder("raise-log-level")
+ .nick("Raise log level")
+ .blurb("Raises the log level so that this element stands out")
+ .write_only()
+ .build(),
glib::ParamSpecInt::builder("num-buffers")
.nick("Num Buffers")
.blurb("Number of buffers to output before sending EOS (-1 = unlimited)")
@@ -380,9 +496,17 @@ impl ObjectImpl for TestSrc {
value.get::<u32>().expect("type checked upstream").into(),
);
}
+ "push-period" => {
+ settings.push_period = gst::ClockTime::from_mseconds(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
+ }
+ "raise-log-level" => {
+ settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
+ }
"num-buffers" => {
let value = value.get::<i32>().expect("type checked upstream");
- settings.num_buffers = if value > 0 { Some(value) } else { None };
+ settings.num_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
@@ -393,7 +517,13 @@ impl ObjectImpl for TestSrc {
match pspec.name() {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
- "num-buffers" => settings.num_buffers.unwrap_or(-1).to_value(),
+ "push-period" => (settings.push_period.mseconds() as u32).to_value(),
+ "raise-log-level" => settings.raise_log_level.to_value(),
+ "num-buffers" => settings
+ .num_buffers
+ .and_then(|val| val.try_into().ok())
+ .unwrap_or(-1i32)
+ .to_value(),
_ => unimplemented!(),
}
}
diff --git a/generic/threadshare/src/runtime/executor/timer.rs b/generic/threadshare/src/runtime/executor/timer.rs
index e08ebf39d..079c7a9ef 100644
--- a/generic/threadshare/src/runtime/executor/timer.rs
+++ b/generic/threadshare/src/runtime/executor/timer.rs
@@ -83,6 +83,16 @@ impl Timer {
Timer::interval_at(Instant::now() + period, period)
}
+ /// Creates a timer that emits events periodically, starting after `delay`.
+ ///
+ /// When throttling is activated (i.e. when using a non-`0` `wait`
+ /// duration in `Context::acquire`), timer entries are assigned to
+ /// the nearest time frame, meaning that the delay might elapse
+ /// `wait` / 2 ms earlier or later than the expected instant.
+ pub fn interval_delayed_by(delay: Duration, period: Duration) -> Timer {
+ Timer::interval_at(Instant::now() + delay, period)
+ }
+
/// Creates a timer that emits events periodically, starting at `start`.
///
/// When throttling is activated (i.e. when using a non-`0` `wait`