github.com/sdroege/gst-plugin-rs.git
author    Sebastian Dröge <sebastian@centricular.com>  2022-06-29 12:50:31 +0300
committer Sebastian Dröge <sebastian@centricular.com>  2022-07-19 18:03:40 +0300
commit    2f987b09ee4aaf8d1fb80d30d2a304565f22a628 (patch)
tree      f95b63044c77b428dfb898913894abbcff20c3df /utils
parent    44b05b428525fb79ae54047242204efa8db9a0ab (diff)
tracers: Add queue levels tracer and Python script for plotting
Diffstat (limited to 'utils')
-rw-r--r--  utils/tracers/Cargo.toml              |   2
-rw-r--r--  utils/tracers/scripts/queue_levels.py | 153
-rw-r--r--  utils/tracers/src/lib.rs              |   2
-rw-r--r--  utils/tracers/src/queue_levels/imp.rs | 437
-rw-r--r--  utils/tracers/src/queue_levels/mod.rs |  20
5 files changed, 614 insertions(+), 0 deletions(-)
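
The new tracer and the plotting script are meant to be used together: enable the tracer via `GST_TRACERS` while the pipeline runs, then feed the resulting CSV to the script. A minimal end-to-end sketch (not part of this commit; the pipeline, paths and options are arbitrary examples):

```python
# Sketch: run a short pipeline with the queue-levels tracer enabled, then plot
# the collected CSV. Pipeline, paths and options are illustrative only.
import os
import subprocess

# Tracers are picked up at GStreamer init time, so the environment must be set
# before the pipeline process starts.
env = dict(os.environ)
env["GST_TRACERS"] = 'queue-levels(file="/tmp/queue_levels.log")'

subprocess.run(
    ["gst-launch-1.0", "audiotestsrc", "num-buffers=1000", "!", "queue", "!", "fakesink"],
    env=env,
    check=True,
)

# Plot the time levels of all queues whose name starts with "queue"
# (arguments as defined by the new script below).
subprocess.run(
    ["python3", "utils/tracers/scripts/queue_levels.py",
     "--include-filter", "^queue", "--time", "/tmp/queue_levels.log"],
    check=True,
)
```
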
diff --git a/utils/tracers/Cargo.toml b/utils/tracers/Cargo.toml
index daf52d62..fd1fdc56 100644
--- a/utils/tracers/Cargo.toml
+++ b/utils/tracers/Cargo.toml
@@ -11,6 +11,7 @@ description = "GStreamer tracers plugin"
 gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
 once_cell = "1.0"
 anyhow = "1"
+regex = "1"
 
 [target.'cfg(unix)'.dependencies]
 signal-hook = "0.3"
@@ -26,6 +27,7 @@ gst-plugin-version-helper = { path="../../version-helper" }
 [features]
 static = []
 capi = []
+v1_22 = ["gst/v1_22"]
 
 [package.metadata.capi]
 min_version = "0.8.0"
diff --git a/utils/tracers/scripts/queue_levels.py b/utils/tracers/scripts/queue_levels.py
new file mode 100644
index 00000000..90ae8df3
--- /dev/null
+++ b/utils/tracers/scripts/queue_levels.py
@@ -0,0 +1,153 @@
+import argparse
+import csv
+import re
+
+import matplotlib
+import matplotlib.pyplot as plt
+
+parser = argparse.ArgumentParser()
+parser.add_argument("file", help="Input file with queue levels")
+parser.add_argument("--include-filter", help="Regular expression for queue names that should be included")
+parser.add_argument("--exclude-filter", help="Regular expression for queue names that should be excluded")
+parser.add_argument("--bytes", help="include bytes levels", action="store_true")
+parser.add_argument("--time", help="include time levels (default if none of the others are enabled)", action="store_true")
+parser.add_argument("--buffers", help="include buffers levels", action="store_true")
+parser.add_argument("--no-max", help="do not include max levels (enabled by default)", action="store_true")
+args = parser.parse_args()
+
+include_filter = None
+if args.include_filter is not None:
+    include_filter = re.compile(args.include_filter)
+exclude_filter = None
+if args.exclude_filter is not None:
+    exclude_filter = re.compile(args.exclude_filter)
+
+queues = {}
+
+with open(args.file, mode='r', encoding='utf_8', newline='') as csvfile:
+    reader = csv.reader(csvfile, delimiter=',', quotechar='|')
+    for row in reader:
+        if len(row) != 9:
+            continue
+
+        if include_filter is not None and not include_filter.match(row[1]):
+            continue
+        if exclude_filter is not None and exclude_filter.match(row[1]):
+            continue
+
+        if not row[1] in queues:
+            queues[row[1]] = {
+                'cur-level-bytes': [],
+                'cur-level-time': [],
+                'cur-level-buffers': [],
+                'max-size-bytes': [],
+                'max-size-time': [],
+                'max-size-buffers': [],
+            }
+
+        wallclock = float(row[0]) / 1000000000.0
+        queues[row[1]]['cur-level-bytes'].append((wallclock, int(row[3])))
+        queues[row[1]]['cur-level-time'].append((wallclock, float(row[4]) / 1000000000.0))
+        queues[row[1]]['cur-level-buffers'].append((wallclock, int(row[5])))
+        queues[row[1]]['max-size-bytes'].append((wallclock, int(row[6])))
+        queues[row[1]]['max-size-time'].append((wallclock, float(row[7]) / 1000000000.0))
+        queues[row[1]]['max-size-buffers'].append((wallclock, int(row[8])))
+
+matplotlib.rcParams['figure.dpi'] = 200
+
+prop_cycle = plt.rcParams['axes.prop_cycle']
+colors = prop_cycle.by_key()['color']
+
+num_plots = 0
+axes_names = []
+if args.buffers:
+    num_plots += 1
+    axes_names.append("buffers")
+if args.time:
+    num_plots += 1
+    axes_names.append("time (s)")
+if args.bytes:
+    num_plots += 1
+    axes_names.append("bytes")
+
+if num_plots == 0:
+    num_plots += 1
+    axes_names.append("time (s)")
+
+fig, ax1 = plt.subplots()
+ax1.set_xlabel("wallclock (s)")
+ax1.set_ylabel(axes_names[0])
+ax1.tick_params(axis='y')
+axes = [ax1]
+
+if num_plots > 1:
+    ax2 = ax1.twinx()
+    ax2.set_ylabel(axes_names[1])
+    axes.append(ax2)
+if num_plots > 2:
+    ax3 = ax1.twinx()
+    ax3.set_ylabel(axes_names[2])
+    ax3.spines['right'].set_position(('outward', 60))
+    axes.append(ax3)
+
+for (i, (queue, values)) in enumerate(queues.items()):
+    axis = 0
+
+    if args.buffers:
+        axes[axis].plot(
+            [x[0] for x in values['cur-level-buffers']],
+            [x[1] for x in values['cur-level-buffers']],
+            '.', label = '{}: cur-level-buffers'.format(queue),
+            color = colors[i],
+        )
+
+        if not args.no_max:
+            axes[axis].plot(
+                [x[0] for x in values['max-size-buffers']],
+                [x[1] for x in values['max-size-buffers']],
+                '-', label = '{}: max-size-buffers'.format(queue),
+                color = colors[i],
+            )
+
+        axis += 1
+
+    if args.time:
+        axes[axis].plot(
+            [x[0] for x in values['cur-level-time']],
+            [x[1] for x in values['cur-level-time']],
+            'p', label = '{}: cur-level-time'.format(queue),
+            color = colors[i],
+        )
+
+        if not args.no_max:
+            axes[axis].plot(
+                [x[0] for x in values['max-size-time']],
+                [x[1] for x in values['max-size-time']],
+                '-.', label = '{}: max-size-time'.format(queue),
+                color = colors[i],
+            )
+
+        axis += 1
+
+    if args.bytes:
+        axes[axis].plot(
+            [x[0] for x in values['cur-level-bytes']],
+            [x[1] for x in values['cur-level-bytes']],
+            'x', label = '{}: cur-level-bytes'.format(queue),
+            color = colors[i],
+        )
+
+        if not args.no_max:
+            axes[axis].plot(
+                [x[0] for x in values['max-size-bytes']],
+                [x[1] for x in values['max-size-bytes']],
+                '--', label = '{}: max-size-bytes'.format(queue),
+                color = colors[i],
+            )
+
+        axis += 1
+
+fig.tight_layout()
+fig.legend()
+
+plt.show()
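
One subtlety worth noting: the script applies its filters with `re.match()`, which is anchored at the start of the queue name, whereas the tracer's own `include-filter`/`exclude-filter` (see `imp.rs` below) use `Regex::is_match()` and match anywhere in the name. A small illustration of the Python side, with hypothetical queue names:

```python
# Illustration of the script's include/exclude filter semantics.
# The queue names below are hypothetical examples.
import re

names = ["queue0", "queue1", "multiqueue0", "my-video-queue"]

include_filter = re.compile("queue")   # matched against the start of the name
exclude_filter = re.compile("multi")   # also matched against the start

selected = [
    n for n in names
    if include_filter.match(n) and not exclude_filter.match(n)
]
# "my-video-queue" is dropped too: re.match() only matches at the beginning,
# so a name containing "queue" in the middle needs a pattern like ".*queue".
print(selected)  # ['queue0', 'queue1']
```
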
diff --git a/utils/tracers/src/lib.rs b/utils/tracers/src/lib.rs
index 8250d27c..bd037e8d 100644
--- a/utils/tracers/src/lib.rs
+++ b/utils/tracers/src/lib.rs
@@ -10,9 +10,11 @@
 use gst::glib;
 
 mod pipeline_snapshot;
+mod queue_levels;
 
 fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
     pipeline_snapshot::register(plugin)?;
+    queue_levels::register(plugin)?;
     Ok(())
 }
diff --git a/utils/tracers/src/queue_levels/imp.rs b/utils/tracers/src/queue_levels/imp.rs
new file mode 100644
index 00000000..48256fa1
--- /dev/null
+++ b/utils/tracers/src/queue_levels/imp.rs
@@ -0,0 +1,437 @@
+// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+/// This tracer provides an easy way to collect the levels of all queues inside a
+/// pipeline over time.
+///
+/// Example:
+///
+/// ```console
+/// $ GST_TRACERS='queue-levels(file="/tmp/queue_levels.log")' gst-launch-1.0 audiotestsrc ! queue ! fakesink
+/// ```
+///
+/// The generated file is a CSV file of the format
+///
+/// ```csv
+/// timestamp,queue name,queue pointer,cur-level-bytes,cur-level-time,cur-level-buffers,max-size-bytes,max-size-time,max-size-buffers
+/// ```
+///
+/// ## Parameters
+///
+/// ### `file`
+///
+/// Specifies the path of the file to which the CSV log with the queue levels is written.
+///
+/// By default the file is written to `/tmp/queue_levels.log`.
+///
+/// ### `include-filter`
+///
+/// Specifies a regular expression for the queue object names that should be included.
+///
+/// By default this is not set.
+///
+/// ### `exclude-filter`
+///
+/// Specifies a regular expression for the queue object names that should **not** be included.
+///
+/// By default this is not set.
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::{Arc, Mutex};
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use once_cell::sync::Lazy;
+use regex::Regex;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+    gst::DebugCategory::new(
+        "queue-levels",
+        gst::DebugColorFlags::empty(),
+        Some("Tracer to collect queue levels"),
+    )
+});
+
+static QUEUE_TYPE: Lazy<glib::Type> = Lazy::new(|| {
+    if let Ok(queue) = gst::ElementFactory::make("queue", None) {
+        queue.type_()
+    } else {
+        gst::warning!(CAT, "Can't instantiate queue element");
+        glib::Type::INVALID
+    }
+});
+
+#[derive(Debug)]
+struct Settings {
+    file: PathBuf,
+    include_filter: Option<Regex>,
+    exclude_filter: Option<Regex>,
+}
+
+impl Default for Settings {
+    fn default() -> Self {
+        let mut file = glib::tmp_dir();
+        file.push("queue_levels.log");
+
+        Self {
+            file,
+            include_filter: None,
+            exclude_filter: None,
+        }
+    }
+}
+
+impl Settings {
+    fn update_from_params(&mut self, obj: &super::QueueLevels, params: String) {
+        let s = match gst::Structure::from_str(&format!("queue-levels,{}", params)) {
+            Ok(s) => s,
+            Err(err) => {
+                gst::warning!(CAT, obj: obj, "failed to parse tracer parameters: {}", err);
+                return;
+            }
+        };
+
+        if let Ok(file) = s.get::<&str>("file") {
+            gst::log!(CAT, obj: obj, "file= {}", file);
+            self.file = PathBuf::from(file);
+        }
+
+        if let Ok(filter) = s.get::<&str>("include-filter") {
+            gst::log!(CAT, obj: obj, "include filter= {}", filter);
+            let filter = match Regex::new(filter) {
+                Ok(filter) => Some(filter),
+                Err(err) => {
+                    gst::error!(
+                        CAT,
+                        obj: obj,
+                        "Failed to compile include-filter regex: {}",
+                        err
+                    );
+                    None
+                }
+            };
+            self.include_filter = filter;
+        }
+
+        if let Ok(filter) = s.get::<&str>("exclude-filter") {
+            gst::log!(CAT, obj: obj, "exclude filter= {}", filter);
+            let filter = match Regex::new(filter) {
+                Ok(filter) => Some(filter),
+                Err(err) => {
+                    gst::error!(
+                        CAT,
+                        obj: obj,
+                        "Failed to compile exclude-filter regex: {}",
+                        err
+                    );
+                    None
+                }
+            };
+            self.exclude_filter = filter;
+        }
+    }
+}
+
+#[derive(Default)]
+struct State {
+    queues: HashMap<usize, Arc<glib::GString>>,
+    log: Vec<LogLine>,
+    settings: Settings,
+}
+
+struct LogLine {
+    timestamp: u64,
+    name: Arc<glib::GString>,
+    ptr: usize,
+    cur_level_bytes: u32,
+    cur_level_time: u64,
+    cur_level_buffers: u32,
+    max_size_bytes: u32,
+    max_size_time: u64,
+    max_size_buffers: u32,
+}
+
+#[derive(Default)]
+pub struct QueueLevels {
+    state: Mutex<State>,
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for QueueLevels {
+    const NAME: &'static str = "GstQueueLevels";
+    type Type = super::QueueLevels;
+    type ParentType = gst::Tracer;
+}
+
+impl ObjectImpl for QueueLevels {
+    fn constructed(&self, obj: &Self::Type) {
+        self.parent_constructed(obj);
+
+        if let Some(params) = obj.property::<Option<String>>("params") {
+            let mut state = self.state.lock().unwrap();
+            state.settings.update_from_params(obj, params);
+        }
+
+        Lazy::force(&QUEUE_TYPE);
+
+        self.register_hook(TracerHook::ElementNew);
+        self.register_hook(TracerHook::ObjectDestroyed);
+        #[cfg(feature = "v1_22")]
+        {
+            self.register_hook(TracerHook::PadChainPost);
+            self.register_hook(TracerHook::PadChainListPost);
+        }
+        #[cfg(not(feature = "v1_22"))]
+        {
+            self.register_hook(TracerHook::PadPushPost);
+            self.register_hook(TracerHook::PadPushListPost);
+        }
+        self.register_hook(TracerHook::PadPushPre);
+        self.register_hook(TracerHook::PadPushListPre);
+        self.register_hook(TracerHook::ElementChangeStatePost);
+        self.register_hook(TracerHook::PadPushEventPre);
+    }
+
+    fn dispose(&self, obj: &Self::Type) {
+        use std::io::prelude::*;
+
+        let state = self.state.lock().unwrap();
+
+        let mut file = match std::fs::File::create(&state.settings.file) {
+            Ok(file) => file,
+            Err(err) => {
+                gst::error!(CAT, obj: obj, "Failed to create file: {err}");
+                return;
+            }
+        };
+
+        for LogLine {
+            timestamp,
+            name,
+            ptr,
+            cur_level_bytes,
+            cur_level_time,
+            cur_level_buffers,
+            max_size_bytes,
+            max_size_time,
+            max_size_buffers,
+        } in &state.log
+        {
+            if let Err(err) = writeln!(&mut file, "{timestamp},{name},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}") {
+                gst::error!(CAT, obj: obj, "Failed to write to file: {err}");
+                return;
+            }
+        }
+    }
+}
+
+impl GstObjectImpl for QueueLevels {}
+
+impl TracerImpl for QueueLevels {
+    fn element_new(&self, _ts: u64, element: &gst::Element) {
+        if element.type_() != *QUEUE_TYPE {
+            return;
+        }
+
+        let tracer = self.instance();
+        let ptr = element.as_ptr() as usize;
+        gst::debug!(
+            CAT,
+            obj: &tracer,
+            "new queue: {} 0x{:08x}",
+            element.name(),
+            ptr
+        );
+
+        let mut state = self.state.lock().unwrap();
+
+        let name = element.name();
+        if let Some(ref filter) = state.settings.include_filter {
+            if !filter.is_match(&name) {
+                return;
+            }
+        }
+        if let Some(ref filter) = state.settings.exclude_filter {
+            if filter.is_match(&name) {
+                return;
+            }
+        }
+
+        state.queues.entry(ptr).or_insert_with(|| Arc::new(name));
+    }
+
+    fn object_destroyed(&self, _ts: u64, object: std::ptr::NonNull<gst::ffi::GstObject>) {
+        let ptr = object.as_ptr() as usize;
+        let mut state = self.state.lock().unwrap();
+        state.queues.remove(&ptr);
+    }
+
+    fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, _buffer: &gst::Buffer) {
+        let element =
+            if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
+                if parent.type_() == *QUEUE_TYPE {
+                    parent
+                } else {
+                    return;
+                }
+            } else {
+                return;
+            };
+
+        self.log(&element, ts);
+    }
+
+    fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, _list: &gst::BufferList) {
+        let element =
+            if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
+                if parent.type_() == *QUEUE_TYPE {
+                    parent
+                } else {
+                    return;
+                }
+            } else {
+                return;
+            };
+
+        self.log(&element, ts);
+    }
+
+    #[cfg(not(feature = "v1_22"))]
+    fn pad_push_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
+        let element = if let Some(parent) = pad
+            .peer()
+            .and_then(|p| p.parent())
+            .and_then(|p| p.downcast::<gst::Element>().ok())
+        {
+            if parent.type_() == *QUEUE_TYPE {
+                parent
+            } else {
+                return;
+            }
+        } else {
+            return;
+        };
+
+        self.log(&element, ts);
+    }
+
+    #[cfg(not(feature = "v1_22"))]
+    fn pad_push_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
+        let element = if let Some(parent) = pad
+            .peer()
+            .and_then(|p| p.parent())
+            .and_then(|p| p.downcast::<gst::Element>().ok())
+        {
+            if parent.type_() == *QUEUE_TYPE {
+                parent
+            } else {
+                return;
+            }
+        } else {
+            return;
+        };
+
+        self.log(&element, ts);
+    }
+
+    #[cfg(feature = "v1_22")]
+    fn pad_chain_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
+        let element =
+            if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
+                if parent.type_() == *QUEUE_TYPE {
+                    parent
+                } else {
+                    return;
+                }
+            } else {
+                return;
+            };
+
+        self.log(&element, ts);
+    }
+
+    #[cfg(feature = "v1_22")]
+    fn pad_chain_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
+        let element =
+            if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
+                if parent.type_() == *QUEUE_TYPE {
+                    parent
+                } else {
+                    return;
+                }
+            } else {
+                return;
+            };
+
+        self.log(&element, ts);
+    }
+
+    fn element_change_state_post(
+        &self,
+        ts: u64,
+        element: &gst::Element,
+        change: gst::StateChange,
+        _result: gst::StateChangeReturn,
+    ) {
+        if change.next() != gst::State::Null {
+            return;
+        }
+
+        if element.type_() != *QUEUE_TYPE {
+            return;
+        }
+
+        self.log(element, ts);
+    }
+
+    fn pad_push_event_pre(&self, ts: u64, pad: &gst::Pad, ev: &gst::Event) {
+        if ev.type_() != gst::EventType::FlushStop {
+            return;
+        }
+
+        if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
+            if parent.type_() == *QUEUE_TYPE {
+                self.log(&parent, ts);
+            }
+        }
+    }
+}
+
+impl QueueLevels {
+    fn log(&self, element: &gst::Element, timestamp: u64) {
+        let ptr = element.as_ptr() as usize;
+
+        let mut state = self.state.lock().unwrap();
+        let name = match state.queues.get(&ptr) {
+            Some(name) => name.clone(),
+            None => return,
+        };
+
+        let cur_level_bytes = element.property::<u32>("current-level-bytes");
+        let cur_level_time = element.property::<u64>("current-level-time");
+        let cur_level_buffers = element.property::<u32>("current-level-buffers");
+        let max_size_bytes = element.property::<u32>("max-size-bytes");
+        let max_size_time = element.property::<u64>("max-size-time");
+        let max_size_buffers = element.property::<u32>("max-size-buffers");
+        state.log.push(LogLine {
+            timestamp,
+            name,
+            ptr,
+            cur_level_bytes,
+            cur_level_time,
+            cur_level_buffers,
+            max_size_bytes,
+            max_size_time,
+            max_size_buffers,
+        });
+    }
+}
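
Each sample ends up as one plain CSV line in the field order written by dispose() above. A quick way to sanity-check a single line without the plotting script (the sample values are invented for illustration):

```python
# Parse one log line as written by dispose() above:
# timestamp,queue name,queue pointer,cur-level-bytes,cur-level-time,
# cur-level-buffers,max-size-bytes,max-size-time,max-size-buffers
# The sample values below are made up.
sample = "1500000000,queue0,0x7f83c4012340,4096,20000000,3,10485760,1000000000,200"

fields = sample.split(",")
timestamp_s = int(fields[0]) / 1_000_000_000          # tracer timestamp, ns -> s
name, ptr = fields[1], fields[2]
cur_bytes, cur_time_ns, cur_buffers = int(fields[3]), int(fields[4]), int(fields[5])
max_bytes, max_time_ns, max_buffers = int(fields[6]), int(fields[7]), int(fields[8])

print(f"{name} ({ptr}) at {timestamp_s:.3f}s: "
      f"{cur_buffers}/{max_buffers} buffers, "
      f"{cur_bytes}/{max_bytes} bytes, "
      f"{cur_time_ns / 1_000_000_000:.3f}/{max_time_ns / 1_000_000_000:.3f}s")
```
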
diff --git a/utils/tracers/src/queue_levels/mod.rs b/utils/tracers/src/queue_levels/mod.rs
new file mode 100644
index 00000000..f88a2f9e
--- /dev/null
+++ b/utils/tracers/src/queue_levels/mod.rs
@@ -0,0 +1,20 @@
+// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+    pub struct QueueLevels(ObjectSubclass<imp::QueueLevels>) @extends gst::Tracer, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    gst::Tracer::register(Some(plugin), "queue-levels", QueueLevels::static_type())
+}