Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-11-03 14:33:39 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-11-09 10:55:04 +0300
commit29a490f6dc7b792df7ab45f6a79cbfbee694d332 (patch)
tree48fb1616206c4191d626ea665fbd75afd3036010 /generic
parent9b96cfc4526090f153f9b6a5b539780c71f08499 (diff)
ts: introduce ts-audiotestsrc
This makes it easy to generate "listenable" signals and to evaluate discontinuities. When the `tuning` feature is activated and the `main-elem` property is set, the element can log the parked duration in %, which is an image of the CPU usage for the ts-context. This commit adds a test mode to `udpsrc-benchmark-sender` which generates default audio buffers from `ts-audiotestsrc`. The `rtp` mode is modified so that it uses `ts-audiotestsrc`.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/Cargo.toml1
-rw-r--r--generic/threadshare/examples/benchmark.rs27
-rw-r--r--generic/threadshare/examples/udpsrc_benchmark_sender.rs143
-rw-r--r--generic/threadshare/src/audiotestsrc/imp.rs735
-rw-r--r--generic/threadshare/src/audiotestsrc/mod.rs17
-rw-r--r--generic/threadshare/src/lib.rs23
6 files changed, 882 insertions, 64 deletions
diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml
index a1affdfb..0f73e6b5 100644
--- a/generic/threadshare/Cargo.toml
+++ b/generic/threadshare/Cargo.toml
@@ -16,6 +16,7 @@ futures = "0.3.21"
libc = "0.2"
gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
once_cell = "1"
diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs
index 2080b777..c7e98af7 100644
--- a/generic/threadshare/examples/benchmark.rs
+++ b/generic/threadshare/examples/benchmark.rs
@@ -63,7 +63,7 @@ fn main() {
};
let is_rtp = args.len() > 6 && (args[6] == "rtp");
- let rtp_caps = gst::Caps::builder("audio/x-rtp")
+ let rtp_caps = gst::Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", 8i32)
.field("clock-rate", 8000)
@@ -97,7 +97,7 @@ fn main() {
"udpsrc" => {
let source = gst::ElementFactory::make("udpsrc")
.name(format!("source-{}", i).as_str())
- .property("port", 40000i32 + i as i32)
+ .property("port", 5004i32 + i as i32)
.property("retrieve-sender-address", false)
.build()
.unwrap();
@@ -108,7 +108,7 @@ fn main() {
let context = build_context();
let source = gst::ElementFactory::make("ts-udpsrc")
.name(format!("source-{}", i).as_str())
- .property("port", 40000i32 + i as i32)
+ .property("port", 5004i32 + i as i32)
.property("context", &context)
.property("context-wait", wait)
.build()
@@ -184,22 +184,7 @@ fn main() {
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
} else {
- let queue = if let Some(context) = context {
- let queue = gst::ElementFactory::make("ts-queue")
- .name(format!("queue-{}", i).as_str())
- .property("context", &context)
- .property("context-wait", wait)
- .build()
- .unwrap();
- queue
- } else {
- gst::ElementFactory::make("queue")
- .name(format!("queue-{}", i).as_str())
- .build()
- .unwrap()
- };
-
- let elements = &[&source, &queue, &sink];
+ let elements = &[&source, &sink];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
}
@@ -268,14 +253,14 @@ fn main() {
let elapsed = init.elapsed();
gst::info!(
CAT,
- "{:>6.2} / s / stream",
+ "Thrpt: {:>6.2}",
total_count * 1_000.0 / elapsed.as_millis() as f32
);
#[cfg(feature = "tuning")]
gst::info!(
CAT,
- "{:>6.2}% parked",
+ "Parked: {:>6.2}%",
(ctx_0.parked_duration() - parked_init).as_nanos() as f32 * 100.0
/ elapsed.as_nanos() as f32
);
diff --git a/generic/threadshare/examples/udpsrc_benchmark_sender.rs b/generic/threadshare/examples/udpsrc_benchmark_sender.rs
index 196662ac..2ba9e24f 100644
--- a/generic/threadshare/examples/udpsrc_benchmark_sender.rs
+++ b/generic/threadshare/examples/udpsrc_benchmark_sender.rs
@@ -17,19 +17,45 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
+use gst::glib;
+use gst::prelude::*;
+
+use once_cell::sync::Lazy;
+
use std::net;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::{env, thread, time};
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "ts-udpsrc-benchmark-sender",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing UDP src benchmark sender"),
+ )
+});
+
fn main() {
+ gst::init().unwrap();
+ gstthreadshare::plugin_register_static().unwrap();
+
let args = env::args().collect::<Vec<_>>();
assert!(args.len() > 1);
let n_streams: u16 = args[1].parse().unwrap();
- if args.len() > 2 && args[2] == "rtp" {
- send_rtp_buffers(n_streams);
+ let num_buffers: Option<i32> = if args.len() > 3 {
+ args[3].parse().ok()
} else {
- send_raw_buffers(n_streams);
+ None
+ };
+
+ if args.len() > 2 {
+ match args[2].as_str() {
+ "raw" => send_raw_buffers(n_streams),
+ "rtp" => send_rtp_buffers(n_streams, num_buffers),
+ _ => send_test_buffers(n_streams, num_buffers),
+ }
+ } else {
+ send_test_buffers(n_streams, num_buffers);
}
}
@@ -38,7 +64,7 @@ fn send_raw_buffers(n_streams: u16) {
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
- let destinations = (40000..(40000 + n_streams))
+ let destinations = (5004..(5004 + n_streams))
.map(|port| SocketAddr::new(ipaddr, port))
.collect::<Vec<_>>();
@@ -60,43 +86,60 @@ fn send_raw_buffers(n_streams: u16) {
}
}
-fn send_rtp_buffers(n_streams: u16) {
- use gst::glib;
- use gst::prelude::*;
-
- gst::init().unwrap();
-
- #[cfg(debug_assertions)]
- {
- use std::path::Path;
+fn send_test_buffers(n_streams: u16, num_buffers: Option<i32>) {
+ let pipeline = gst::Pipeline::default();
+ for i in 0..n_streams {
+ let src = gst::ElementFactory::make("ts-audiotestsrc")
+ .name(format!("ts-audiotestsrc-{}", i).as_str())
+ .property("context-wait", 20u32)
+ .property("is-live", true)
+ .property("do-timestamp", true)
+ .build()
+ .unwrap();
- let mut path = Path::new("target/debug");
- if !path.exists() {
- path = Path::new("../../target/debug");
+ if let Some(num_buffers) = num_buffers {
+ src.set_property("num-buffers", num_buffers);
}
- gst::Registry::get().scan_path(path);
- }
- #[cfg(not(debug_assertions))]
- {
- use std::path::Path;
-
- let mut path = Path::new("target/release");
- if !path.exists() {
- path = Path::new("../../target/release");
+ #[cfg(feature = "tuning")]
+ if i == 0 {
+ src.set_property("main-elem", true);
}
- gst::Registry::get().scan_path(path);
+ let sink = gst::ElementFactory::make("ts-udpsink")
+ .name(format!("udpsink-{}", i).as_str())
+ .property("clients", format!("127.0.0.1:{}", i + 5004))
+ .property("context-wait", 20u32)
+ .build()
+ .unwrap();
+
+ let elements = &[&src, &sink];
+ pipeline.add_many(elements).unwrap();
+ gst::Element::link_many(elements).unwrap();
}
- let l = glib::MainLoop::new(None, false);
+ run(pipeline);
+}
+
+fn send_rtp_buffers(n_streams: u16, num_buffers: Option<i32>) {
let pipeline = gst::Pipeline::default();
for i in 0..n_streams {
- let src = gst::ElementFactory::make("audiotestsrc")
- .name(format!("audiotestsrc-{}", i).as_str())
+ let src = gst::ElementFactory::make("ts-audiotestsrc")
+ .name(format!("ts-audiotestsrc-{}", i).as_str())
+ .property("context-wait", 20u32)
+ .property("is-live", true)
+ .property("do-timestamp", true)
.build()
.unwrap();
- src.set_property("is-live", true);
+
+ if let Some(num_buffers) = num_buffers {
+ src.set_property("num-buffers", num_buffers);
+ }
+
+ #[cfg(feature = "tuning")]
+ if i == 0 {
+ src.set_property("main-elem", true);
+ }
let enc = gst::ElementFactory::make("alawenc")
.name(format!("alawenc-{}", i).as_str())
@@ -106,11 +149,11 @@ fn send_rtp_buffers(n_streams: u16) {
.name(format!("rtppcmapay-{}", i).as_str())
.build()
.unwrap();
+
let sink = gst::ElementFactory::make("ts-udpsink")
.name(format!("udpsink-{}", i).as_str())
- .property("clients", format!("127.0.0.1:{}", i + 40000))
- .property("context", "context-udpsink")
.property("context-wait", 20u32)
+ .property("clients", format!("127.0.0.1:{}", i + 5004))
.build()
.unwrap();
@@ -119,6 +162,42 @@ fn send_rtp_buffers(n_streams: u16) {
gst::Element::link_many(elements).unwrap();
}
+ run(pipeline);
+}
+
+fn run(pipeline: gst::Pipeline) {
+ let l = glib::MainLoop::new(None, false);
+
+ let bus = pipeline.bus().unwrap();
+ let l_clone = l.clone();
+ bus.add_watch(move |_, msg| {
+ use gst::MessageView;
+ match msg.view() {
+ MessageView::Eos(_) => {
+ gst::info!(CAT, "Received eos");
+ l_clone.quit();
+
+ glib::Continue(false)
+ }
+ MessageView::Error(msg) => {
+ gst::error!(
+ CAT,
+ "Error from {:?}: {} ({:?})",
+ msg.src().map(|s| s.path_string()),
+ msg.error(),
+ msg.debug()
+ );
+ l_clone.quit();
+
+ glib::Continue(false)
+ }
+ _ => glib::Continue(true),
+ }
+ })
+ .expect("Failed to add bus watch");
+
pipeline.set_state(gst::State::Playing).unwrap();
l.run();
+
+ pipeline.set_state(gst::State::Null).unwrap();
}
diff --git a/generic/threadshare/src/audiotestsrc/imp.rs b/generic/threadshare/src/audiotestsrc/imp.rs
new file mode 100644
index 00000000..9ae51d0b
--- /dev/null
+++ b/generic/threadshare/src/audiotestsrc/imp.rs
@@ -0,0 +1,735 @@
+// Copyright (C) 2022 François Laignel <fengalin@free.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use once_cell::sync::Lazy;
+
+use std::mem::size_of;
+use std::sync::Mutex;
+use std::time::Duration;
+#[cfg(feature = "tuning")]
+use std::time::Instant;
+
+use crate::runtime::prelude::*;
+use crate::runtime::{self, task, timer, PadSrc, Task};
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "ts-audiotestsrc",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing audio test src"),
+ )
+});
+
+const DEFAULT_CONTEXT: &str = "";
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
+const DEFAULT_BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(10);
+const DEFAULT_DO_TIMESTAMP: bool = false;
+const DEFAULT_IS_LIVE: bool = false;
+const DEFAULT_NUM_BUFFERS: i32 = -1;
+
+const DEFAULT_CHANNELS: usize = 1;
+const DEFAULT_FREQ: f32 = 440.0;
+const DEFAULT_VOLUME: f32 = 0.8;
+const DEFAULT_RATE: u32 = 44_100;
+
+#[cfg(feature = "tuning")]
+const RAMPUP_BUFFER_COUNT: u32 = 500;
+#[cfg(feature = "tuning")]
+const LOG_BUFFER_INTERVAL: u32 = 2000;
+
+static DEFAULT_CAPS: Lazy<gst::Caps> = Lazy::new(|| {
+ gst_audio::AudioCapsBuilder::new_interleaved()
+ .format(gst_audio::AUDIO_FORMAT_S16)
+ .rate_range(8_000..i32::MAX)
+ .channels_range(1..i32::MAX)
+ .build()
+});
+
+#[derive(Debug, Clone)]
+struct Settings {
+ context: String,
+ context_wait: Duration,
+ do_timestamp: bool,
+ is_live: bool,
+ buffer_duration: gst::ClockTime,
+ num_buffers: Option<u32>,
+ #[cfg(feature = "tuning")]
+ is_main_elem: bool,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ context: DEFAULT_CONTEXT.into(),
+ context_wait: DEFAULT_CONTEXT_WAIT,
+ do_timestamp: DEFAULT_DO_TIMESTAMP,
+ is_live: DEFAULT_IS_LIVE,
+ buffer_duration: DEFAULT_BUFFER_DURATION,
+ num_buffers: None,
+ #[cfg(feature = "tuning")]
+ is_main_elem: false,
+ }
+ }
+}
+
+#[derive(Clone, Debug)]
+struct AudioTestSrcPadHandler;
+impl PadSrcHandler for AudioTestSrcPadHandler {
+ type ElementImpl = AudioTestSrc;
+
+ fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool {
+ gst::debug!(CAT, obj: pad, "Received {query:?}");
+
+ if let gst::QueryViewMut::Latency(q) = query.view_mut() {
+ let settings = imp.settings.lock().unwrap();
+ let min_latency = if settings.is_live {
+ settings.buffer_duration
+ } else {
+ gst::ClockTime::ZERO
+ };
+
+ q.set(
+ settings.is_live,
+ min_latency,
+ min_latency
+ + runtime::Context::current().map_or(gst::ClockTime::ZERO, |ctx| {
+ gst::ClockTime::try_from(ctx.wait_duration()).unwrap()
+ }),
+ );
+
+ return true;
+ }
+
+ gst::Pad::query_default(pad, Some(&*imp.obj()), query)
+ }
+}
+
+#[derive(Debug, Copy, Clone)]
+enum Negotiation {
+ Unchanged,
+ Changed,
+}
+
+impl Negotiation {
+ fn has_changed(self) -> bool {
+ matches!(self, Negotiation::Changed)
+ }
+}
+
+#[derive(Debug)]
+struct AudioTestSrcTask {
+ elem: super::AudioTestSrc,
+ buffer_pool: gst::BufferPool,
+ rate: u32,
+ channels: usize,
+ do_timestamp: bool,
+ is_live: bool,
+ buffer_duration: gst::ClockTime,
+ need_initial_events: bool,
+ step: f32,
+ accumulator: f32,
+ last_buffer_end: Option<gst::ClockTime>,
+ caps: gst::Caps,
+ buffer_count: u32,
+ num_buffers: Option<u32>,
+ #[cfg(feature = "tuning")]
+ is_main_elem: bool,
+ #[cfg(feature = "tuning")]
+ parked_duration_init: Option<Duration>,
+ #[cfg(feature = "tuning")]
+ log_start: Instant,
+}
+
+impl AudioTestSrcTask {
+ fn new(elem: super::AudioTestSrc) -> Self {
+ AudioTestSrcTask {
+ elem,
+ buffer_pool: gst::BufferPool::new(),
+ rate: DEFAULT_RATE,
+ channels: DEFAULT_CHANNELS,
+ do_timestamp: DEFAULT_DO_TIMESTAMP,
+ is_live: DEFAULT_IS_LIVE,
+ buffer_duration: DEFAULT_BUFFER_DURATION,
+ need_initial_events: true,
+ step: 0.0,
+ accumulator: 0.0,
+ last_buffer_end: None,
+ caps: gst::Caps::new_empty(),
+ buffer_count: 0,
+ num_buffers: None,
+ #[cfg(feature = "tuning")]
+ is_main_elem: false,
+ #[cfg(feature = "tuning")]
+ parked_duration_init: None,
+ #[cfg(feature = "tuning")]
+ log_start: Instant::now(),
+ }
+ }
+
+ async fn negotiate(&mut self) -> Result<Negotiation, gst::ErrorMessage> {
+ let imp = self.elem.imp();
+ let pad = imp.src_pad.gst_pad();
+
+ if !pad.check_reconfigure() {
+ return Ok(Negotiation::Unchanged);
+ }
+
+ let mut caps = pad.peer_query_caps(Some(&DEFAULT_CAPS));
+ gst::debug!(CAT, imp: imp, "Peer returned {caps:?}");
+
+ if caps.is_empty() {
+ pad.mark_reconfigure();
+ let err = gst::error_msg!(gst::CoreError::Pad, ["No common Caps"]);
+ gst::error!(CAT, imp: imp, "{err}");
+ return Err(err);
+ }
+
+ if caps.is_any() {
+ gst::debug!(CAT, imp: imp, "Using our own Caps");
+ caps = DEFAULT_CAPS.clone();
+ }
+
+ {
+ let caps = caps.make_mut();
+ let s = caps.structure_mut(0).ok_or_else(|| {
+ let err = gst::error_msg!(gst::CoreError::Pad, ["Invalid peer Caps structure"]);
+ gst::error!(CAT, imp: imp, "{err}");
+ err
+ })?;
+
+ s.fixate_field_nearest_int("rate", DEFAULT_RATE as i32);
+ self.rate = s.get::<i32>("rate").unwrap() as u32;
+ self.step = 2.0 * std::f32::consts::PI * DEFAULT_FREQ / (self.rate as f32);
+
+ s.fixate_field_nearest_int("channels", DEFAULT_CHANNELS as i32);
+ self.channels = s.get::<i32>("channels").unwrap() as usize;
+
+ if self.channels > 2 {
+ s.set::<gst::Bitmask>(
+ "channel-mask",
+ gst_audio::AudioChannelPosition::fallback_mask(self.channels as u32).into(),
+ );
+ }
+ }
+
+ caps.fixate();
+ gst::debug!(CAT, imp: imp, "fixated to {caps:?}");
+
+ imp.src_pad.push_event(gst::event::Caps::new(&caps)).await;
+
+ self.caps = caps;
+
+ Ok(Negotiation::Changed)
+ }
+}
+
+impl TaskImpl for AudioTestSrcTask {
+ type Item = gst::Buffer;
+
+ fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ gst::log!(CAT, obj: self.elem, "Preparing Task");
+
+ let imp = self.elem.imp();
+ let settings = imp.settings.lock().unwrap();
+ self.do_timestamp = settings.do_timestamp;
+ self.is_live = settings.is_live;
+ self.buffer_duration = settings.buffer_duration;
+ self.num_buffers = settings.num_buffers;
+
+ #[cfg(feature = "tuning")]
+ {
+ self.is_main_elem = settings.is_main_elem;
+ }
+
+ future::ok(()).boxed()
+ }
+
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst::log!(CAT, obj: self.elem, "Starting Task");
+
+ if self.need_initial_events {
+ gst::debug!(CAT, obj: self.elem, "Pushing initial events");
+
+ let stream_id =
+ format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
+ let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
+ .group_id(gst::GroupId::next())
+ .build();
+ self.elem.imp().src_pad.push_event(stream_start_evt).await;
+ }
+
+ if self.negotiate().await?.has_changed() {
+ let bytes_per_buffer = (self.rate as u64)
+ * self.buffer_duration.mseconds()
+ * self.channels as u64
+ * size_of::<i16>() as u64
+ / 1_000;
+
+ let mut pool_config = self.buffer_pool.config();
+ pool_config
+ .as_mut()
+ .set_params(Some(&self.caps), bytes_per_buffer as u32, 2, 6);
+ self.buffer_pool.set_config(pool_config).unwrap();
+ }
+
+ assert!(!self.caps.is_empty());
+ self.buffer_pool.set_active(true).unwrap();
+
+ if self.need_initial_events {
+ let segment_evt =
+ gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
+ self.elem.imp().src_pad.push_event(segment_evt).await;
+
+ self.need_initial_events = false;
+ }
+
+ self.buffer_count = 0;
+
+ #[cfg(feature = "tuning")]
+ if self.is_main_elem {
+ self.parked_duration_init = None;
+ }
+
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ gst::log!(CAT, obj: self.elem, "Pausing Task");
+ self.buffer_pool.set_active(false).unwrap();
+
+ future::ok(()).boxed()
+ }
+
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ gst::log!(CAT, obj: self.elem, "Stopping Task");
+
+ self.need_initial_events = true;
+ self.accumulator = 0.0;
+ self.last_buffer_end = None;
+
+ future::ok(()).boxed()
+ }
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
+ let mut buffer = match self.buffer_pool.acquire_buffer(None) {
+ Ok(buffer) => buffer,
+ Err(err) => {
+ gst::error!(CAT, obj: self.elem, "Failed to acquire buffer {}", err);
+ return future::err(err).boxed();
+ }
+ };
+
+ let buffer_mut = buffer.get_mut().unwrap();
+
+ let start = if self.is_live | self.do_timestamp {
+ self.last_buffer_end
+ .or_else(|| self.elem.current_running_time())
+ } else {
+ None
+ };
+
+ {
+ use std::io::Write;
+
+ let mut mapped = buffer_mut.map_writable().unwrap();
+ let slice = mapped.as_mut_slice();
+ slice
+ .chunks_mut(self.channels * size_of::<i16>())
+ .for_each(|frame| {
+ let sample = ((self.accumulator.sin() * DEFAULT_VOLUME * (i16::MAX as f32))
+ as i16)
+ .to_ne_bytes();
+
+ frame.chunks_mut(size_of::<i16>()).for_each(|mut channel| {
+ let _ = channel.write(&sample).unwrap();
+ });
+
+ self.accumulator += self.step;
+ if self.accumulator >= 2.0 * std::f32::consts::PI {
+ self.accumulator = -2.0 * std::f32::consts::PI;
+ }
+ });
+ }
+
+ if self.do_timestamp {
+ buffer_mut.set_dts(start);
+ buffer_mut.set_duration(self.buffer_duration);
+ }
+
+ self.last_buffer_end = start.opt_add(self.buffer_duration);
+
+ async move {
+ if self.is_live {
+ if let Some(delay) = self
+ .last_buffer_end
+ .unwrap()
+ .checked_sub(self.elem.current_running_time().unwrap())
+ {
+ // Wait for all samples to fit in last time slice
+ timer::delay_for_at_least(delay.into()).await;
+ }
+ } else {
+ // Let the scheduler share time with other tasks
+ runtime::executor::yield_now().await;
+ }
+
+ Ok(buffer)
+ }
+ .boxed()
+ }
+
+ fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ let imp = self.elem.imp();
+
+ gst::debug!(CAT, imp: imp, "Pushing {buffer:?}");
+ imp.src_pad.push(buffer).await?;
+ gst::log!(CAT, imp: imp, "Successfully pushed buffer");
+
+ self.buffer_count += 1;
+
+ #[cfg(feature = "tuning")]
+ if self.is_main_elem {
+ if let Some(parked_duration_init) = self.parked_duration_init {
+ if self.buffer_count % LOG_BUFFER_INTERVAL == 0 {
+ let parked_duration =
+ runtime::Context::current().unwrap().parked_duration()
+ - parked_duration_init;
+
+ gst::info!(
+ CAT,
+ "Parked: {:5.2?}%",
+ parked_duration.as_nanos() as f32 * 100.0
+ / self.log_start.elapsed().as_nanos() as f32,
+ );
+ }
+ } else if self.buffer_count == RAMPUP_BUFFER_COUNT {
+ self.parked_duration_init =
+ Some(runtime::Context::current().unwrap().parked_duration());
+ self.log_start = Instant::now();
+
+ gst::info!(CAT, "Ramp up complete");
+ }
+ }
+
+ if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
+ return Err(gst::FlowError::Eos);
+ }
+
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
+ async move {
+ match err {
+ gst::FlowError::Flushing => {
+ gst::debug!(CAT, obj: self.elem, "Flushing");
+
+ task::Trigger::FlushStart
+ }
+ gst::FlowError::Eos => {
+ gst::debug!(CAT, obj: self.elem, "EOS");
+ self.elem
+ .imp()
+ .src_pad
+ .push_event(gst::event::Eos::new())
+ .await;
+
+ task::Trigger::Stop
+ }
+ err => {
+ gst::error!(CAT, obj: self.elem, "Got error {err}");
+ gst::element_error!(
+ &self.elem,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+
+ task::Trigger::Error
+ }
+ }
+ }
+ .boxed()
+ }
+}
+
+#[derive(Debug)]
+pub struct AudioTestSrc {
+ src_pad: PadSrc,
+ task: Task,
+ settings: Mutex<Settings>,
+}
+
+impl AudioTestSrc {
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
+
+ let settings = self.settings.lock().unwrap();
+ let context =
+ runtime::Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
+ drop(settings);
+
+ self.task
+ .prepare(AudioTestSrcTask::new(self.obj().clone()), context)
+ .block_on()?;
+
+ gst::debug!(CAT, imp: self, "Prepared");
+
+ Ok(())
+ }
+
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
+ self.task.unprepare().block_on().unwrap();
+ gst::debug!(CAT, imp: self, "Unprepared");
+ }
+
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
+ self.task.stop().block_on()?;
+ gst::debug!(CAT, imp: self, "Stopped");
+
+ Ok(())
+ }
+
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
+ self.task.start().block_on()?;
+ gst::debug!(CAT, imp: self, "Started");
+
+ Ok(())
+ }
+
+ fn pause(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Pausing");
+ self.task.pause().block_on()?;
+ gst::debug!(CAT, imp: self, "Paused");
+
+ Ok(())
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for AudioTestSrc {
+ const NAME: &'static str = "TsAudioTestSrc";
+ type Type = super::AudioTestSrc;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ Self {
+ src_pad: PadSrc::new(
+ gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")),
+ AudioTestSrcPadHandler,
+ ),
+ task: Task::default(),
+ settings: Default::default(),
+ }
+ }
+}
+
+impl ObjectImpl for AudioTestSrc {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::builder("context")
+ .nick("Context")
+ .blurb("Context name to share threads with")
+ .default_value(Some(DEFAULT_CONTEXT))
+ .build(),
+ glib::ParamSpecUInt::builder("context-wait")
+ .nick("Context Wait")
+ .blurb("Throttle poll loop to run at most once every this many ms")
+ .maximum(1000)
+ .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
+ .build(),
+ glib::ParamSpecBoolean::builder("do-timestamp")
+ .nick("Do timestamp")
+ .blurb("Apply current stream time to buffers")
+ .build(),
+ glib::ParamSpecBoolean::builder("is-live")
+ .nick("Is live")
+ .blurb("Whether to act as a live source")
+ .build(),
+ glib::ParamSpecUInt::builder("buffer-duration")
+ .nick("Buffer duration")
+ .blurb("Buffer duration in ms")
+ .default_value(DEFAULT_BUFFER_DURATION.mseconds() as u32)
+ .build(),
+ glib::ParamSpecInt::builder("num-buffers")
+ .nick("Num Buffers")
+ .blurb("Number of buffers to output before sending EOS (-1 = unlimited)")
+ .minimum(-1i32)
+ .default_value(DEFAULT_NUM_BUFFERS)
+ .build(),
+ #[cfg(feature = "tuning")]
+ glib::ParamSpecBoolean::builder("main-elem")
+ .nick("Main Element")
+ .blurb("Declare this element as the main one")
+ .write_only()
+ .build(),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ let mut settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "context" => {
+ settings.context = value
+ .get::<Option<String>>()
+ .unwrap()
+ .unwrap_or_else(|| DEFAULT_CONTEXT.into());
+ }
+ "context-wait" => {
+ settings.context_wait = Duration::from_millis(value.get::<u32>().unwrap().into());
+ }
+ "do-timestamp" => {
+ settings.do_timestamp = value.get::<bool>().unwrap();
+ }
+ "is-live" => {
+ settings.is_live = value.get::<bool>().unwrap();
+ }
+ "buffer-duration" => {
+ settings.buffer_duration = (value.get::<u32>().unwrap() as u64).mseconds();
+ }
+ "num-buffers" => {
+ let value = value.get::<i32>().unwrap();
+ settings.num_buffers = if value > 0 { Some(value as u32) } else { None };
+ }
+ #[cfg(feature = "tuning")]
+ "main-elem" => {
+ settings.is_main_elem = value.get::<bool>().unwrap();
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "context" => settings.context.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
+ "do-timestamp" => settings.do_timestamp.to_value(),
+ "is-live" => settings.is_live.to_value(),
+ "buffer-duration" => (settings.buffer_duration.mseconds() as u32).to_value(),
+ "num-buffers" => settings
+ .num_buffers
+ .and_then(|val| val.try_into().ok())
+ .unwrap_or(-1i32)
+ .to_value(),
+ #[cfg(feature = "tuning")]
+ "main-elem" => settings.is_main_elem.to_value(),
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self) {
+ self.parent_constructed();
+
+ let obj = self.obj();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
+ }
+}
+
+impl GstObjectImpl for AudioTestSrc {}
+
+impl ElementImpl for AudioTestSrc {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Thread-sharing audio test source",
+ "Source/Test",
+ "Thread-sharing audio test source",
+ "François Laignel <fengalin@free.fr>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &DEFAULT_CAPS,
+ )
+ .unwrap();
+
+ vec![src_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, imp: self, "Changing state {transition:?}");
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::PlayingToPaused => {
+ self.pause().map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::ReadyToNull => {
+ self.unprepare();
+ }
+ _ => (),
+ }
+
+ let mut success = self.parent_change_state(transition)?;
+
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ self.pause().map_err(|_| gst::StateChangeError)?;
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ gst::StateChange::PausedToPlaying => {
+ self.start().map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::PlayingToPaused => {
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop().map_err(|_| gst::StateChangeError)?;
+ }
+ _ => (),
+ }
+
+ Ok(success)
+ }
+}
diff --git a/generic/threadshare/src/audiotestsrc/mod.rs b/generic/threadshare/src/audiotestsrc/mod.rs
new file mode 100644
index 00000000..fbd16f4f
--- /dev/null
+++ b/generic/threadshare/src/audiotestsrc/mod.rs
@@ -0,0 +1,17 @@
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct AudioTestSrc(ObjectSubclass<imp::AudioTestSrc>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-audiotestsrc",
+ gst::Rank::None,
+ AudioTestSrc::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs
index 94671379..35f2e5a8 100644
--- a/generic/threadshare/src/lib.rs
+++ b/generic/threadshare/src/lib.rs
@@ -16,29 +16,30 @@
#[macro_use]
pub mod runtime;
-pub mod socket;
-mod tcpclientsrc;
-mod udpsink;
-mod udpsrc;
-
mod appsrc;
+mod audiotestsrc;
pub mod dataqueue;
mod inputselector;
mod jitterbuffer;
mod proxy;
mod queue;
+pub mod socket;
+mod tcpclientsrc;
+mod udpsink;
+mod udpsrc;
use gst::glib;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- udpsrc::register(plugin)?;
- udpsink::register(plugin)?;
- tcpclientsrc::register(plugin)?;
- queue::register(plugin)?;
- proxy::register(plugin)?;
appsrc::register(plugin)?;
- jitterbuffer::register(plugin)?;
+ audiotestsrc::register(plugin)?;
inputselector::register(plugin)?;
+ jitterbuffer::register(plugin)?;
+ proxy::register(plugin)?;
+ queue::register(plugin)?;
+ tcpclientsrc::register(plugin)?;
+ udpsink::register(plugin)?;
+ udpsrc::register(plugin)?;
Ok(())
}