diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-06-29 16:53:10 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-07-19 18:03:40 +0300 |
commit | 320cb735276f9e0944f6d11350271649eeefdefc (patch) | |
tree | e84cd2533ee13d23435974bc051c3d5417cc05b2 /utils/tracers | |
parent | 2f987b09ee4aaf8d1fb80d30d2a304565f22a628 (diff) |
tracers: Add new tracer for logging the lateness of each buffer when it leaves a pad
Diffstat (limited to 'utils/tracers')
-rw-r--r-- | utils/tracers/scripts/buffer_lateness.py | 79 | ||||
-rw-r--r-- | utils/tracers/src/buffer_lateness/imp.rs | 377 | ||||
-rw-r--r-- | utils/tracers/src/buffer_lateness/mod.rs | 24 | ||||
-rw-r--r-- | utils/tracers/src/lib.rs | 2 |
4 files changed, 482 insertions, 0 deletions
diff --git a/utils/tracers/scripts/buffer_lateness.py b/utils/tracers/scripts/buffer_lateness.py new file mode 100644 index 000000000..f5e17f5a2 --- /dev/null +++ b/utils/tracers/scripts/buffer_lateness.py @@ -0,0 +1,79 @@ +import argparse +import csv +import re + +import matplotlib +import matplotlib.pyplot as plt + +parser = argparse.ArgumentParser() +parser.add_argument("file", help="Input file with queue levels") +parser.add_argument("--include-filter", help="Regular expression for element:pad names that should be included") +parser.add_argument("--exclude-filter", help="Regular expression for element:pad names that should be excluded") +parser.add_argument("--no-latency", help="do not include latency (enabled by default)", action="store_true") +args = parser.parse_args() + +include_filter = None +if args.include_filter is not None: + include_filter = re.compile(args.include_filter) +exclude_filter = None +if args.exclude_filter is not None: + exclude_filter = re.compile(args.exclude_filter) + +pads = {} + +with open(args.file, mode='r', encoding='utf_8', newline='') as csvfile: + reader = csv.reader(csvfile, delimiter=',', quotechar='|') + for row in reader: + if len(row) != 7: + continue + + if include_filter is not None and not include_filter.match(row[1]): + continue + if exclude_filter is not None and exclude_filter.match(row[1]): + continue + + if not row[1] in pads: + pads[row[1]] = { + 'buffer-clock-time': [], + 'pipeline-clock-time': [], + 'lateness': [], + 'latency': [], + } + + wallclock = float(row[0]) / 1000000000.0 + pads[row[1]]['buffer-clock-time'].append((wallclock, float(row[3]) / 1000000000.0)) + pads[row[1]]['pipeline-clock-time'].append((wallclock, float(row[4]) / 1000000000.0)) + pads[row[1]]['lateness'].append((wallclock, float(row[5]) / 1000000000.0)) + pads[row[1]]['latency'].append((wallclock, float(row[6]) / 1000000000.0)) + +matplotlib.rcParams['figure.dpi'] = 200 + +prop_cycle = plt.rcParams['axes.prop_cycle'] +colors = prop_cycle.by_key()['color'] + +fig, ax1 = plt.subplots() + +ax1.set_xlabel("wallclock (s)") +ax1.set_ylabel("time (s)") +ax1.tick_params(axis='y') + +for (i, (pad, values)) in enumerate(pads.items()): + ax1.plot( + [x[0] for x in values['lateness']], + [x[1] for x in values['lateness']], + '.', label = '{}: lateness'.format(pad), + color = colors[i], + ) + + if not args.no_latency: + ax1.plot( + [x[0] for x in values['latency']], + [x[1] for x in values['latency']], + '-', label = '{}: latency'.format(pad), + color = colors[i], + ) + +fig.tight_layout() +plt.legend(loc='best') + +plt.show() diff --git a/utils/tracers/src/buffer_lateness/imp.rs b/utils/tracers/src/buffer_lateness/imp.rs new file mode 100644 index 000000000..80f461c04 --- /dev/null +++ b/utils/tracers/src/buffer_lateness/imp.rs @@ -0,0 +1,377 @@ +// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +/// This tracer provides an easy way to collect lateness of each buffer when it is pushed out of a +/// pad in live pipelines. +/// +/// Example: +/// +/// ```console +/// $ GST_TRACERS='buffer-lateness(file="/tmp/buffer_lateness.log")' gst-launch-1.0 audiotestsrc is-live=true ! queue ! fakesink +/// ``` +/// +/// The generated file is a CSV file of the format +/// +/// ```csv +/// timestamp,element:pad name,pad pointer,buffer clock time,pipeline clock time,lateness,min latency +/// ``` +/// +/// ## Parameters +/// +/// ### `file` +/// +/// Specifies the path to the file that will collect the CSV file with the buffer lateness. +/// +/// By default the file is written to `/tmp/buffer_lateness.log`. +/// +/// ### `include-filter` +/// +/// Specifies a regular expression for the `element:pad` names that should be included. +/// +/// By default this is not set. +/// +/// ### `exclude-filter` +/// +/// Specifies a regular expression for the `element:pad` names that should **not** be included. +/// +/// By default this is not set. +/// +use std::collections::HashMap; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use once_cell::sync::Lazy; +use regex::Regex; + +static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { + gst::DebugCategory::new( + "buffer-lateness", + gst::DebugColorFlags::empty(), + Some("Tracer to collect buffer lateness"), + ) +}); + +#[derive(Debug)] +struct Settings { + file: PathBuf, + include_filter: Option<Regex>, + exclude_filter: Option<Regex>, +} + +impl Default for Settings { + fn default() -> Self { + let mut file = glib::tmp_dir(); + file.push("buffer_lateness.log"); + + Self { + file, + include_filter: None, + exclude_filter: None, + } + } +} + +impl Settings { + fn update_from_params(&mut self, obj: &super::BufferLateness, params: String) { + let s = match gst::Structure::from_str(&format!("buffer-lateness,{}", params)) { + Ok(s) => s, + Err(err) => { + gst::warning!(CAT, obj: obj, "failed to parse tracer parameters: {}", err); + return; + } + }; + + if let Ok(file) = s.get::<&str>("file") { + gst::log!(CAT, obj: obj, "file= {}", file); + self.file = PathBuf::from(file); + } + + if let Ok(filter) = s.get::<&str>("include-filter") { + gst::log!(CAT, obj: obj, "include filter= {}", filter); + let filter = match Regex::new(filter) { + Ok(filter) => Some(filter), + Err(err) => { + gst::error!( + CAT, + obj: obj, + "Failed to compile include-filter regex: {}", + err + ); + None + } + }; + self.include_filter = filter; + } + + if let Ok(filter) = s.get::<&str>("exclude-filter") { + gst::log!(CAT, obj: obj, "exclude filter= {}", filter); + let filter = match Regex::new(filter) { + Ok(filter) => Some(filter), + Err(err) => { + gst::error!( + CAT, + obj: obj, + "Failed to compile exclude-filter regex: {}", + err + ); + None + } + }; + self.exclude_filter = filter; + } + } +} + +#[derive(Default)] +struct State { + pads: HashMap<usize, Pad>, + log: Vec<LogLine>, + settings: Settings, +} + +struct Pad { + element_name: Option<Arc<glib::GString>>, + pad_name: Arc<glib::GString>, + latency: u64, +} + +struct LogLine { + timestamp: u64, + element_name: Arc<glib::GString>, + pad_name: Arc<glib::GString>, + ptr: usize, + buffer_clock_time: u64, + pipeline_clock_time: u64, + lateness: i64, + min_latency: u64, +} + +#[derive(Default)] +pub struct BufferLateness { + state: Mutex<State>, +} + +#[glib::object_subclass] +impl ObjectSubclass for BufferLateness { + const NAME: &'static str = "GstBufferLateness"; + type Type = super::BufferLateness; + type ParentType = gst::Tracer; +} + +impl ObjectImpl for BufferLateness { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + if let Some(params) = obj.property::<Option<String>>("params") { + let mut state = self.state.lock().unwrap(); + state.settings.update_from_params(obj, params); + } + + self.register_hook(TracerHook::ElementAddPad); + self.register_hook(TracerHook::ElementRemovePad); + self.register_hook(TracerHook::PadPushPre); + self.register_hook(TracerHook::PadPushListPre); + self.register_hook(TracerHook::PadQueryPost); + } + + fn dispose(&self, obj: &Self::Type) { + use std::io::prelude::*; + + let state = self.state.lock().unwrap(); + + let mut file = match std::fs::File::create(&state.settings.file) { + Ok(file) => file, + Err(err) => { + gst::error!(CAT, obj: obj, "Failed to create file: {err}"); + return; + } + }; + + for LogLine { + timestamp, + element_name, + pad_name, + ptr, + buffer_clock_time, + pipeline_clock_time, + lateness, + min_latency, + } in &state.log + { + if let Err(err) = writeln!(&mut file, "{timestamp},{element_name}:{pad_name},0x{ptr:08x},{buffer_clock_time},{pipeline_clock_time},{lateness},{min_latency}") { + gst::error!(CAT, obj: obj, "Failed to write to file: {err}"); + return; + } + } + } +} + +impl GstObjectImpl for BufferLateness {} + +impl TracerImpl for BufferLateness { + fn element_add_pad(&self, _ts: u64, _element: &gst::Element, pad: &gst::Pad) { + if pad.direction() != gst::PadDirection::Src { + return; + } + + let tracer = self.instance(); + let ptr = pad.as_ptr() as usize; + gst::debug!( + CAT, + obj: &tracer, + "new source pad: {} 0x{:08x}", + pad.name(), + ptr + ); + + let mut state = self.state.lock().unwrap(); + // FIXME: Element name might not be set yet here if the pad is added in instance_init + // already. + state.pads.entry(ptr).or_insert_with(|| Pad { + element_name: None, + pad_name: Arc::new(pad.name()), + latency: 0, + }); + } + + fn element_remove_pad(&self, _ts: u64, _element: &gst::Element, pad: &gst::Pad) { + let ptr = pad.as_ptr() as usize; + let mut state = self.state.lock().unwrap(); + state.pads.remove(&ptr); + } + + fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, buffer: &gst::Buffer) { + let timestamp = match buffer.dts_or_pts() { + Some(timestamp) => timestamp, + None => return, + }; + + let element = match pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) { + Some(element) => element, + None => return, + }; + + let clock = match element.clock() { + Some(clock) => clock, + None => return, + }; + + let base_time = match element.base_time() { + // FIXME: Workaround for base time being set to 0 initially instead of None + Some(base_time) + if base_time == gst::ClockTime::ZERO && element.start_time().is_some() => + { + return + } + Some(base_time) => base_time, + None => return, + }; + + let segment = match pad + .sticky_event::<gst::event::Segment>(0) + .map(|s| s.segment().clone()) + .and_then(|s| s.downcast::<gst::ClockTime>().ok()) + { + Some(segment) => segment, + None => return, + }; + + let ptr = pad.as_ptr() as usize; + + let mut state = self.state.lock().unwrap(); + let State { + ref mut pads, + ref mut log, + ref settings, + .. + } = &mut *state; + if let Some(pad) = pads.get_mut(&ptr) { + if pad.element_name.is_none() { + pad.element_name = Some(Arc::new(element.name())); + + let name = format!("{}:{}", pad.element_name.as_ref().unwrap(), pad.pad_name); + if let Some(ref filter) = settings.include_filter { + if !filter.is_match(&name) { + pads.remove(&ptr); + return; + } + } + if let Some(ref filter) = settings.exclude_filter { + if filter.is_match(&name) { + pads.remove(&ptr); + return; + } + } + } + + let element_name = pad.element_name.as_ref().unwrap(); + + let running_time = match segment.to_running_time(timestamp) { + Some(running_time) => running_time, + None => return, + }; + + let buffer_clock_time = running_time + base_time; + let pipeline_clock_time = match clock.time() { + Some(time) => time, + None => return, + }; + + log.push(LogLine { + timestamp: ts, + element_name: element_name.clone(), + pad_name: pad.pad_name.clone(), + ptr, + buffer_clock_time: buffer_clock_time.nseconds(), + pipeline_clock_time: pipeline_clock_time.nseconds(), + lateness: if buffer_clock_time > pipeline_clock_time { + -((buffer_clock_time.nseconds() - pipeline_clock_time.nseconds()) as i64) + } else { + (pipeline_clock_time.nseconds() - buffer_clock_time.nseconds()) as i64 + }, + min_latency: pad.latency, + }); + } + } + + fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, buffer_list: &gst::BufferList) { + for buffer in buffer_list.iter_owned() { + self.pad_push_pre(ts, pad, &buffer); + } + } + + #[allow(clippy::single_match)] + fn pad_query_post(&self, _ts: u64, pad: &gst::Pad, query: &gst::Query, res: bool) { + if !res { + return; + } + + if pad.direction() != gst::PadDirection::Src { + return; + } + + match query.view() { + gst::QueryView::Latency(l) => { + let mut state = self.state.lock().unwrap(); + if let Some(pad) = state.pads.get_mut(&(pad.as_ptr() as usize)) { + let (live, min, _max) = l.result(); + if live { + pad.latency = min.nseconds(); + } else { + pad.latency = 0; + } + } + } + _ => (), + } + } +} diff --git a/utils/tracers/src/buffer_lateness/mod.rs b/utils/tracers/src/buffer_lateness/mod.rs new file mode 100644 index 000000000..c9783eeea --- /dev/null +++ b/utils/tracers/src/buffer_lateness/mod.rs @@ -0,0 +1,24 @@ +// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com> +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct BufferLateness(ObjectSubclass<imp::BufferLateness>) @extends gst::Tracer, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Tracer::register( + Some(plugin), + "buffer-lateness", + BufferLateness::static_type(), + ) +} diff --git a/utils/tracers/src/lib.rs b/utils/tracers/src/lib.rs index bd037e8d8..c72d4b85b 100644 --- a/utils/tracers/src/lib.rs +++ b/utils/tracers/src/lib.rs @@ -9,12 +9,14 @@ use gst::glib; +mod buffer_lateness; mod pipeline_snapshot; mod queue_levels; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pipeline_snapshot::register(plugin)?; queue_levels::register(plugin)?; + buffer_lateness::register(plugin)?; Ok(()) } |