Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-06-29 16:53:10 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-07-19 18:03:40 +0300
commit320cb735276f9e0944f6d11350271649eeefdefc (patch)
treee84cd2533ee13d23435974bc051c3d5417cc05b2 /utils/tracers
parent2f987b09ee4aaf8d1fb80d30d2a304565f22a628 (diff)
tracers: Add new tracer for logging the lateness of each buffer when it leaves a pad
Diffstat (limited to 'utils/tracers')
-rw-r--r--utils/tracers/scripts/buffer_lateness.py79
-rw-r--r--utils/tracers/src/buffer_lateness/imp.rs377
-rw-r--r--utils/tracers/src/buffer_lateness/mod.rs24
-rw-r--r--utils/tracers/src/lib.rs2
4 files changed, 482 insertions, 0 deletions
diff --git a/utils/tracers/scripts/buffer_lateness.py b/utils/tracers/scripts/buffer_lateness.py
new file mode 100644
index 000000000..f5e17f5a2
--- /dev/null
+++ b/utils/tracers/scripts/buffer_lateness.py
@@ -0,0 +1,79 @@
+import argparse
+import csv
+import re
+
+import matplotlib
+import matplotlib.pyplot as plt
+
+parser = argparse.ArgumentParser()
+parser.add_argument("file", help="Input file with queue levels")
+parser.add_argument("--include-filter", help="Regular expression for element:pad names that should be included")
+parser.add_argument("--exclude-filter", help="Regular expression for element:pad names that should be excluded")
+parser.add_argument("--no-latency", help="do not include latency (enabled by default)", action="store_true")
+args = parser.parse_args()
+
+include_filter = None
+if args.include_filter is not None:
+ include_filter = re.compile(args.include_filter)
+exclude_filter = None
+if args.exclude_filter is not None:
+ exclude_filter = re.compile(args.exclude_filter)
+
+pads = {}
+
+with open(args.file, mode='r', encoding='utf_8', newline='') as csvfile:
+ reader = csv.reader(csvfile, delimiter=',', quotechar='|')
+ for row in reader:
+ if len(row) != 7:
+ continue
+
+ if include_filter is not None and not include_filter.match(row[1]):
+ continue
+ if exclude_filter is not None and exclude_filter.match(row[1]):
+ continue
+
+ if not row[1] in pads:
+ pads[row[1]] = {
+ 'buffer-clock-time': [],
+ 'pipeline-clock-time': [],
+ 'lateness': [],
+ 'latency': [],
+ }
+
+ wallclock = float(row[0]) / 1000000000.0
+ pads[row[1]]['buffer-clock-time'].append((wallclock, float(row[3]) / 1000000000.0))
+ pads[row[1]]['pipeline-clock-time'].append((wallclock, float(row[4]) / 1000000000.0))
+ pads[row[1]]['lateness'].append((wallclock, float(row[5]) / 1000000000.0))
+ pads[row[1]]['latency'].append((wallclock, float(row[6]) / 1000000000.0))
+
+matplotlib.rcParams['figure.dpi'] = 200
+
+prop_cycle = plt.rcParams['axes.prop_cycle']
+colors = prop_cycle.by_key()['color']
+
+fig, ax1 = plt.subplots()
+
+ax1.set_xlabel("wallclock (s)")
+ax1.set_ylabel("time (s)")
+ax1.tick_params(axis='y')
+
+for (i, (pad, values)) in enumerate(pads.items()):
+ ax1.plot(
+ [x[0] for x in values['lateness']],
+ [x[1] for x in values['lateness']],
+ '.', label = '{}: lateness'.format(pad),
+ color = colors[i],
+ )
+
+ if not args.no_latency:
+ ax1.plot(
+ [x[0] for x in values['latency']],
+ [x[1] for x in values['latency']],
+ '-', label = '{}: latency'.format(pad),
+ color = colors[i],
+ )
+
+fig.tight_layout()
+plt.legend(loc='best')
+
+plt.show()
diff --git a/utils/tracers/src/buffer_lateness/imp.rs b/utils/tracers/src/buffer_lateness/imp.rs
new file mode 100644
index 000000000..80f461c04
--- /dev/null
+++ b/utils/tracers/src/buffer_lateness/imp.rs
@@ -0,0 +1,377 @@
+// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+/// This tracer provides an easy way to collect lateness of each buffer when it is pushed out of a
+/// pad in live pipelines.
+///
+/// Example:
+///
+/// ```console
+/// $ GST_TRACERS='buffer-lateness(file="/tmp/buffer_lateness.log")' gst-launch-1.0 audiotestsrc is-live=true ! queue ! fakesink
+/// ```
+///
+/// The generated file is a CSV file of the format
+///
+/// ```csv
+/// timestamp,element:pad name,pad pointer,buffer clock time,pipeline clock time,lateness,min latency
+/// ```
+///
+/// ## Parameters
+///
+/// ### `file`
+///
+/// Specifies the path to the file that will collect the CSV file with the buffer lateness.
+///
+/// By default the file is written to `/tmp/buffer_lateness.log`.
+///
+/// ### `include-filter`
+///
+/// Specifies a regular expression for the `element:pad` names that should be included.
+///
+/// By default this is not set.
+///
+/// ### `exclude-filter`
+///
+/// Specifies a regular expression for the `element:pad` names that should **not** be included.
+///
+/// By default this is not set.
+///
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::{Arc, Mutex};
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use once_cell::sync::Lazy;
+use regex::Regex;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "buffer-lateness",
+ gst::DebugColorFlags::empty(),
+ Some("Tracer to collect buffer lateness"),
+ )
+});
+
+#[derive(Debug)]
+struct Settings {
+ file: PathBuf,
+ include_filter: Option<Regex>,
+ exclude_filter: Option<Regex>,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ let mut file = glib::tmp_dir();
+ file.push("buffer_lateness.log");
+
+ Self {
+ file,
+ include_filter: None,
+ exclude_filter: None,
+ }
+ }
+}
+
+impl Settings {
+ fn update_from_params(&mut self, obj: &super::BufferLateness, params: String) {
+ let s = match gst::Structure::from_str(&format!("buffer-lateness,{}", params)) {
+ Ok(s) => s,
+ Err(err) => {
+ gst::warning!(CAT, obj: obj, "failed to parse tracer parameters: {}", err);
+ return;
+ }
+ };
+
+ if let Ok(file) = s.get::<&str>("file") {
+ gst::log!(CAT, obj: obj, "file= {}", file);
+ self.file = PathBuf::from(file);
+ }
+
+ if let Ok(filter) = s.get::<&str>("include-filter") {
+ gst::log!(CAT, obj: obj, "include filter= {}", filter);
+ let filter = match Regex::new(filter) {
+ Ok(filter) => Some(filter),
+ Err(err) => {
+ gst::error!(
+ CAT,
+ obj: obj,
+ "Failed to compile include-filter regex: {}",
+ err
+ );
+ None
+ }
+ };
+ self.include_filter = filter;
+ }
+
+ if let Ok(filter) = s.get::<&str>("exclude-filter") {
+ gst::log!(CAT, obj: obj, "exclude filter= {}", filter);
+ let filter = match Regex::new(filter) {
+ Ok(filter) => Some(filter),
+ Err(err) => {
+ gst::error!(
+ CAT,
+ obj: obj,
+ "Failed to compile exclude-filter regex: {}",
+ err
+ );
+ None
+ }
+ };
+ self.exclude_filter = filter;
+ }
+ }
+}
+
+#[derive(Default)]
+struct State {
+ pads: HashMap<usize, Pad>,
+ log: Vec<LogLine>,
+ settings: Settings,
+}
+
+struct Pad {
+ element_name: Option<Arc<glib::GString>>,
+ pad_name: Arc<glib::GString>,
+ latency: u64,
+}
+
+struct LogLine {
+ timestamp: u64,
+ element_name: Arc<glib::GString>,
+ pad_name: Arc<glib::GString>,
+ ptr: usize,
+ buffer_clock_time: u64,
+ pipeline_clock_time: u64,
+ lateness: i64,
+ min_latency: u64,
+}
+
+#[derive(Default)]
+pub struct BufferLateness {
+ state: Mutex<State>,
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for BufferLateness {
+ const NAME: &'static str = "GstBufferLateness";
+ type Type = super::BufferLateness;
+ type ParentType = gst::Tracer;
+}
+
+impl ObjectImpl for BufferLateness {
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ if let Some(params) = obj.property::<Option<String>>("params") {
+ let mut state = self.state.lock().unwrap();
+ state.settings.update_from_params(obj, params);
+ }
+
+ self.register_hook(TracerHook::ElementAddPad);
+ self.register_hook(TracerHook::ElementRemovePad);
+ self.register_hook(TracerHook::PadPushPre);
+ self.register_hook(TracerHook::PadPushListPre);
+ self.register_hook(TracerHook::PadQueryPost);
+ }
+
+ fn dispose(&self, obj: &Self::Type) {
+ use std::io::prelude::*;
+
+ let state = self.state.lock().unwrap();
+
+ let mut file = match std::fs::File::create(&state.settings.file) {
+ Ok(file) => file,
+ Err(err) => {
+ gst::error!(CAT, obj: obj, "Failed to create file: {err}");
+ return;
+ }
+ };
+
+ for LogLine {
+ timestamp,
+ element_name,
+ pad_name,
+ ptr,
+ buffer_clock_time,
+ pipeline_clock_time,
+ lateness,
+ min_latency,
+ } in &state.log
+ {
+ if let Err(err) = writeln!(&mut file, "{timestamp},{element_name}:{pad_name},0x{ptr:08x},{buffer_clock_time},{pipeline_clock_time},{lateness},{min_latency}") {
+ gst::error!(CAT, obj: obj, "Failed to write to file: {err}");
+ return;
+ }
+ }
+ }
+}
+
+impl GstObjectImpl for BufferLateness {}
+
+impl TracerImpl for BufferLateness {
+ fn element_add_pad(&self, _ts: u64, _element: &gst::Element, pad: &gst::Pad) {
+ if pad.direction() != gst::PadDirection::Src {
+ return;
+ }
+
+ let tracer = self.instance();
+ let ptr = pad.as_ptr() as usize;
+ gst::debug!(
+ CAT,
+ obj: &tracer,
+ "new source pad: {} 0x{:08x}",
+ pad.name(),
+ ptr
+ );
+
+ let mut state = self.state.lock().unwrap();
+ // FIXME: Element name might not be set yet here if the pad is added in instance_init
+ // already.
+ state.pads.entry(ptr).or_insert_with(|| Pad {
+ element_name: None,
+ pad_name: Arc::new(pad.name()),
+ latency: 0,
+ });
+ }
+
+ fn element_remove_pad(&self, _ts: u64, _element: &gst::Element, pad: &gst::Pad) {
+ let ptr = pad.as_ptr() as usize;
+ let mut state = self.state.lock().unwrap();
+ state.pads.remove(&ptr);
+ }
+
+ fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, buffer: &gst::Buffer) {
+ let timestamp = match buffer.dts_or_pts() {
+ Some(timestamp) => timestamp,
+ None => return,
+ };
+
+ let element = match pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
+ Some(element) => element,
+ None => return,
+ };
+
+ let clock = match element.clock() {
+ Some(clock) => clock,
+ None => return,
+ };
+
+ let base_time = match element.base_time() {
+ // FIXME: Workaround for base time being set to 0 initially instead of None
+ Some(base_time)
+ if base_time == gst::ClockTime::ZERO && element.start_time().is_some() =>
+ {
+ return
+ }
+ Some(base_time) => base_time,
+ None => return,
+ };
+
+ let segment = match pad
+ .sticky_event::<gst::event::Segment>(0)
+ .map(|s| s.segment().clone())
+ .and_then(|s| s.downcast::<gst::ClockTime>().ok())
+ {
+ Some(segment) => segment,
+ None => return,
+ };
+
+ let ptr = pad.as_ptr() as usize;
+
+ let mut state = self.state.lock().unwrap();
+ let State {
+ ref mut pads,
+ ref mut log,
+ ref settings,
+ ..
+ } = &mut *state;
+ if let Some(pad) = pads.get_mut(&ptr) {
+ if pad.element_name.is_none() {
+ pad.element_name = Some(Arc::new(element.name()));
+
+ let name = format!("{}:{}", pad.element_name.as_ref().unwrap(), pad.pad_name);
+ if let Some(ref filter) = settings.include_filter {
+ if !filter.is_match(&name) {
+ pads.remove(&ptr);
+ return;
+ }
+ }
+ if let Some(ref filter) = settings.exclude_filter {
+ if filter.is_match(&name) {
+ pads.remove(&ptr);
+ return;
+ }
+ }
+ }
+
+ let element_name = pad.element_name.as_ref().unwrap();
+
+ let running_time = match segment.to_running_time(timestamp) {
+ Some(running_time) => running_time,
+ None => return,
+ };
+
+ let buffer_clock_time = running_time + base_time;
+ let pipeline_clock_time = match clock.time() {
+ Some(time) => time,
+ None => return,
+ };
+
+ log.push(LogLine {
+ timestamp: ts,
+ element_name: element_name.clone(),
+ pad_name: pad.pad_name.clone(),
+ ptr,
+ buffer_clock_time: buffer_clock_time.nseconds(),
+ pipeline_clock_time: pipeline_clock_time.nseconds(),
+ lateness: if buffer_clock_time > pipeline_clock_time {
+ -((buffer_clock_time.nseconds() - pipeline_clock_time.nseconds()) as i64)
+ } else {
+ (pipeline_clock_time.nseconds() - buffer_clock_time.nseconds()) as i64
+ },
+ min_latency: pad.latency,
+ });
+ }
+ }
+
+ fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, buffer_list: &gst::BufferList) {
+ for buffer in buffer_list.iter_owned() {
+ self.pad_push_pre(ts, pad, &buffer);
+ }
+ }
+
+ #[allow(clippy::single_match)]
+ fn pad_query_post(&self, _ts: u64, pad: &gst::Pad, query: &gst::Query, res: bool) {
+ if !res {
+ return;
+ }
+
+ if pad.direction() != gst::PadDirection::Src {
+ return;
+ }
+
+ match query.view() {
+ gst::QueryView::Latency(l) => {
+ let mut state = self.state.lock().unwrap();
+ if let Some(pad) = state.pads.get_mut(&(pad.as_ptr() as usize)) {
+ let (live, min, _max) = l.result();
+ if live {
+ pad.latency = min.nseconds();
+ } else {
+ pad.latency = 0;
+ }
+ }
+ }
+ _ => (),
+ }
+ }
+}
diff --git a/utils/tracers/src/buffer_lateness/mod.rs b/utils/tracers/src/buffer_lateness/mod.rs
new file mode 100644
index 000000000..c9783eeea
--- /dev/null
+++ b/utils/tracers/src/buffer_lateness/mod.rs
@@ -0,0 +1,24 @@
+// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct BufferLateness(ObjectSubclass<imp::BufferLateness>) @extends gst::Tracer, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Tracer::register(
+ Some(plugin),
+ "buffer-lateness",
+ BufferLateness::static_type(),
+ )
+}
diff --git a/utils/tracers/src/lib.rs b/utils/tracers/src/lib.rs
index bd037e8d8..c72d4b85b 100644
--- a/utils/tracers/src/lib.rs
+++ b/utils/tracers/src/lib.rs
@@ -9,12 +9,14 @@
use gst::glib;
+mod buffer_lateness;
mod pipeline_snapshot;
mod queue_levels;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
pipeline_snapshot::register(plugin)?;
queue_levels::register(plugin)?;
+ buffer_lateness::register(plugin)?;
Ok(())
}