Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'generic/threadshare/examples/standalone/src/imp.rs')
-rw-r--r--generic/threadshare/examples/standalone/src/imp.rs278
1 files changed, 204 insertions, 74 deletions
diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs
index 6b5f1d426..afcb2c418 100644
--- a/generic/threadshare/examples/standalone/src/imp.rs
+++ b/generic/threadshare/examples/standalone/src/imp.rs
@@ -16,7 +16,7 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::sync::Mutex;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSrc, Task, Timer};
@@ -29,17 +29,18 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
-const BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(20);
-
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
-const DEFAULT_NUM_BUFFERS: i32 = 50 * 60 * 2;
+const DEFAULT_PUSH_PERIOD: gst::ClockTime = gst::ClockTime::from_mseconds(20);
+const DEFAULT_NUM_BUFFERS: i32 = 50 * 100;
#[derive(Debug, Clone)]
struct Settings {
context: String,
context_wait: Duration,
- num_buffers: Option<i32>,
+ push_period: gst::ClockTime,
+ raise_log_level: bool,
+ num_buffers: Option<u32>,
}
impl Default for Settings {
@@ -47,7 +48,9 @@ impl Default for Settings {
Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
- num_buffers: Some(DEFAULT_NUM_BUFFERS),
+ push_period: DEFAULT_PUSH_PERIOD,
+ raise_log_level: false,
+ num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
}
}
}
@@ -62,13 +65,13 @@ impl PadSrcHandler for TestSrcPadHandler {
struct SrcTask {
element: super::TestSrc,
buffer_pool: gst::BufferPool,
- last_pts: gst::ClockTime,
- last_buf_instant: Option<Instant>,
- push_period: Duration,
+ timer: Option<Timer>,
+ raise_log_level: bool,
+ push_period: gst::ClockTime,
need_initial_events: bool,
need_segment: bool,
- num_buffers: Option<i32>,
- buffer_count: i32,
+ num_buffers: Option<u32>,
+ buffer_count: u32,
}
impl SrcTask {
@@ -83,12 +86,12 @@ impl SrcTask {
SrcTask {
element,
buffer_pool,
- last_pts: gst::ClockTime::ZERO,
- last_buf_instant: None,
- push_period: Duration::ZERO,
+ timer: None,
+ raise_log_level: false,
+ push_period: gst::ClockTime::ZERO,
need_initial_events: true,
need_segment: true,
- num_buffers: Some(DEFAULT_NUM_BUFFERS),
+ num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
buffer_count: 0,
}
}
@@ -99,11 +102,17 @@ impl TaskImpl for SrcTask {
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::log!(CAT, obj: &self.element, "Preparing Task");
-
let src = self.element.imp();
let settings = src.settings.lock().unwrap();
- self.push_period = settings.context_wait;
+ self.raise_log_level = settings.raise_log_level;
+
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Preparing Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Preparing Task");
+ }
+
+ self.push_period = settings.push_period;
self.num_buffers = settings.num_buffers;
Ok(())
@@ -113,9 +122,18 @@ impl TaskImpl for SrcTask {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
- gst::log!(CAT, obj: &self.element, "Starting Task");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Starting Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Starting Task");
+ }
+
+ self.timer = Some(Timer::interval_delayed_by(
+ // Delay first buffer push so as to let others start.
+ Duration::from_secs(2),
+ self.push_period.into(),
+ ));
self.buffer_count = 0;
- self.last_buf_instant = None;
self.buffer_pool.set_active(true).unwrap();
Ok(())
}
@@ -124,27 +142,17 @@ impl TaskImpl for SrcTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::log!(CAT, obj: &self.element, "Stopping task");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Stopping Task");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Stopping Task");
+ }
self.buffer_pool.set_active(false).unwrap();
- self.last_pts = gst::ClockTime::ZERO;
+ self.timer = None;
self.need_initial_events = true;
self.need_segment = true;
- gst::log!(CAT, obj: &self.element, "Task stopped");
- Ok(())
- }
- .boxed()
- }
-
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst::log!(CAT, obj: &self.element, "Starting task flush");
-
- self.buffer_pool.set_active(false).unwrap();
- self.need_segment = true;
-
- gst::log!(CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
@@ -152,30 +160,34 @@ impl TaskImpl for SrcTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
- if let Some(delay) = self
- .last_buf_instant
- .map(|last| last.elapsed())
- .opt_checked_sub(self.push_period)
- .ok()
- .flatten()
- {
- Timer::after(delay).await;
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Awaiting timer");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Awaiting timer");
}
- self.last_buf_instant = Some(Instant::now());
-
- let start = self.last_pts;
- self.last_pts = start + BUFFER_DURATION;
+ self.timer.as_mut().unwrap().next().await;
- self.buffer_pool.acquire_buffer(None).map(|mut buffer| {
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(start);
- buffer.set_duration(BUFFER_DURATION);
- }
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Timer ticked");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Timer ticked");
+ }
- buffer
- })
+ self.buffer_pool
+ .acquire_buffer(None)
+ .map(|mut buffer| {
+ {
+ let buffer = buffer.get_mut().unwrap();
+ let rtime = self.element.current_running_time().unwrap();
+ buffer.set_dts(rtime);
+ }
+ buffer
+ })
+ .map_err(|err| {
+ gst::error!(CAT, obj: &self.element, "Failed to acquire buffer {}", err);
+ err
+ })
}
.boxed()
}
@@ -185,15 +197,29 @@ impl TaskImpl for SrcTask {
let res = self.push(buffer).await;
match res {
Ok(_) => {
- gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
+ if self.raise_log_level {
+ gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Successfully pushed buffer");
+ }
}
Err(gst::FlowError::Eos) => {
- gst::debug!(CAT, obj: &self.element, "EOS");
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "EOS");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "EOS");
+ }
let test_src = self.element.imp();
test_src.src_pad.push_event(gst::event::Eos::new()).await;
+
+ return Err(gst::FlowError::Eos);
}
Err(gst::FlowError::Flushing) => {
- gst::debug!(CAT, obj: &self.element, "Flushing");
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Flushing");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Flushing");
+ }
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
@@ -214,11 +240,20 @@ impl TaskImpl for SrcTask {
impl SrcTask {
async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
- gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Pushing {:?}", buffer);
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Pushing {:?}", buffer);
+ }
+
let test_src = self.element.imp();
if self.need_initial_events {
- gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Pushing initial events");
+ }
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
@@ -244,12 +279,27 @@ impl SrcTask {
self.need_segment = false;
}
- gst::debug!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Forwarding buffer");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Forwarding buffer");
+ }
+
let ok = test_src.src_pad.push(buffer).await?;
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
+ if self.raise_log_level {
+ gst::debug!(CAT, obj: &self.element, "Pushing EOS");
+ } else {
+ gst::trace!(CAT, obj: &self.element, "Pushing EOS");
+ }
+
+ let test_src = self.element.imp();
+ if !test_src.src_pad.push_event(gst::event::Eos::new()).await {
+ gst::error!(CAT, obj: &self.element, "Error pushing EOS");
+ }
return Err(gst::FlowError::Eos);
}
@@ -266,7 +316,12 @@ pub struct TestSrc {
impl TestSrc {
fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Preparing");
+ } else {
+ gst::trace!(CAT, obj: element, "Preparing");
+ }
let settings = self.settings.lock().unwrap();
let context =
@@ -282,35 +337,86 @@ impl TestSrc {
.prepare(SrcTask::new(element.clone()), context)
.block_on()?;
- gst::debug!(CAT, obj: element, "Prepared");
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Prepared");
+ } else {
+ gst::trace!(CAT, obj: element, "Prepared");
+ }
Ok(())
}
fn unprepare(&self, element: &super::TestSrc) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Unpreparing");
+ } else {
+ gst::trace!(CAT, obj: element, "Unpreparing");
+ }
+
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Unprepared");
+ } else {
+ gst::trace!(CAT, obj: element, "Unprepared");
+ }
}
fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Stopping");
+ } else {
+ gst::trace!(CAT, obj: element, "Stopping");
+ }
+
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Stopped");
+ } else {
+ gst::trace!(CAT, obj: element, "Stopped");
+ }
+
Ok(())
}
fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Starting");
+ } else {
+ gst::trace!(CAT, obj: element, "Starting");
+ }
+
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Started");
+ } else {
+ gst::trace!(CAT, obj: element, "Started");
+ }
+
Ok(())
}
fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Pausing");
+ let raise_log_level = self.settings.lock().unwrap().raise_log_level;
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Pausing");
+ } else {
+ gst::trace!(CAT, obj: element, "Pausing");
+ }
+
self.task.pause().block_on()?;
- gst::debug!(CAT, obj: element, "Paused");
+
+ if raise_log_level {
+ gst::debug!(CAT, obj: element, "Paused");
+ } else {
+ gst::trace!(CAT, obj: element, "Paused");
+ }
+
Ok(())
}
}
@@ -348,6 +454,16 @@ impl ObjectImpl for TestSrc {
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
+ glib::ParamSpecUInt::builder("push-period")
+ .nick("Buffer Push Period")
+ .blurb("Push a new buffer every this many ms")
+ .default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32)
+ .build(),
+ glib::ParamSpecBoolean::builder("raise-log-level")
+ .nick("Raise log level")
+ .blurb("Raises the log level so that this element stands out")
+ .write_only()
+ .build(),
glib::ParamSpecInt::builder("num-buffers")
.nick("Num Buffers")
.blurb("Number of buffers to output before sending EOS (-1 = unlimited)")
@@ -380,9 +496,17 @@ impl ObjectImpl for TestSrc {
value.get::<u32>().expect("type checked upstream").into(),
);
}
+ "push-period" => {
+ settings.push_period = gst::ClockTime::from_mseconds(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
+ }
+ "raise-log-level" => {
+ settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
+ }
"num-buffers" => {
let value = value.get::<i32>().expect("type checked upstream");
- settings.num_buffers = if value > 0 { Some(value) } else { None };
+ settings.num_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
@@ -393,7 +517,13 @@ impl ObjectImpl for TestSrc {
match pspec.name() {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
- "num-buffers" => settings.num_buffers.unwrap_or(-1).to_value(),
+ "push-period" => (settings.push_period.mseconds() as u32).to_value(),
+ "raise-log-level" => settings.raise_log_level.to_value(),
+ "num-buffers" => settings
+ .num_buffers
+ .and_then(|val| val.try_into().ok())
+ .unwrap_or(-1i32)
+ .to_value(),
_ => unimplemented!(),
}
}