Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-13 13:48:05 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-13 14:03:43 +0300
commit21da753607186f466b0a47a6fb731f053c9ab342 (patch)
tree24e25763c24fe8aa2ceeedba5a75ced53f7f3473 /generic/threadshare/src/udpsink
parent837126be76918de37c2be3ed1d142d060178dede (diff)
ts/udpsink: move sync on buffer to try_next
By moving sync on buffer ts to `try_next`, the resulting delay can be cancelled when a state transition occurs. To prevent item loss, this requires first peeking the incoming item from the channel without popping it. After the delay has elasped, we can pop the item as the last await point in `try_next`: either it will be cancelled before popping or the popped item will be passed on to `handle_item`. Also add `flush` which was missing from `stop` and `flush_start` transition actions.
Diffstat (limited to 'generic/threadshare/src/udpsink')
-rw-r--r--generic/threadshare/src/udpsink/imp.rs83
1 files changed, 62 insertions, 21 deletions
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index fa07ffe90..a9efaca36 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -19,6 +19,7 @@
use futures::future::BoxFuture;
use futures::prelude::*;
+use futures::stream::Peekable;
use gst::glib;
use gst::prelude::*;
@@ -34,7 +35,9 @@ use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
+use std::pin::Pin;
use std::sync::Mutex;
+use std::task::Poll;
use std::time::Duration;
use std::u16;
use std::u8;
@@ -220,10 +223,9 @@ enum Command {
SetSync(bool),
}
-#[derive(Debug)]
struct UdpSinkTask {
element: super::UdpSink,
- item_receiver: flume::Receiver<TaskItem>,
+ item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>,
cmd_receiver: flume::Receiver<Command>,
clients: BTreeSet<SocketAddr>,
socket: Option<Async<UdpSocket>>,
@@ -241,7 +243,7 @@ impl UdpSinkTask {
) -> Self {
UdpSinkTask {
element: element.clone(),
- item_receiver,
+ item_receiver: item_receiver.into_stream().peekable(),
cmd_receiver,
clients: Default::default(),
socket: None,
@@ -252,6 +254,11 @@ impl UdpSinkTask {
}
}
+ async fn flush(&mut self) {
+ // Purge the channel
+ while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {}
+ }
+
fn process_command(&mut self, cmd: Command) {
use Command::*;
match cmd {
@@ -619,17 +626,6 @@ impl UdpSinkTask {
/// Buffer handling.
impl UdpSinkTask {
async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
- if self.sync {
- let rtime = self.segment.as_ref().and_then(|segment| {
- segment
- .downcast_ref::<gst::format::Time>()
- .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency))
- });
- if let Some(rtime) = rtime {
- self.sync(rtime).await;
- }
- }
-
let data = buffer.map_readable().map_err(|_| {
element_error!(
self.element,
@@ -681,11 +677,9 @@ impl UdpSinkTask {
async fn sync(&self, running_time: gst::ClockTime) {
let now = self.element.current_running_time();
- match running_time.opt_checked_sub(now) {
- Ok(Some(delay)) => {
- runtime::time::delay_for(delay.into()).await;
- }
- _ => runtime::executor::yield_now().await,
+ if let Ok(Some(delay)) = running_time.opt_checked_sub(now) {
+ gst::trace!(CAT, obj: &self.element, "sync: waiting {}", delay);
+ runtime::time::delay_for(delay.into()).await;
}
}
}
@@ -730,12 +724,39 @@ impl TaskImpl for UdpSinkTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
async move {
loop {
+ gst::info!(CAT, obj: &self.element, "Awaiting next item or command");
futures::select_biased! {
cmd = self.cmd_receiver.recv_async() => {
self.process_command(cmd.unwrap());
}
- item = self.item_receiver.recv_async() => {
- break item.map_err(|_| panic!("Internal channel sender dropped while Task is Started"))
+ item_opt = Pin::new(&mut self.item_receiver).peek() => {
+ // Check the peeked item in case we need to sync.
+ // The item will still be available in the channel
+ // in case this is cancelled by a state transition.
+ match item_opt {
+ Some(TaskItem::Buffer(buffer)) => {
+ if self.sync {
+ let rtime = self.segment.as_ref().and_then(|segment| {
+ segment
+ .downcast_ref::<gst::format::Time>()
+ .and_then(|segment| {
+ segment.to_running_time(buffer.pts()).opt_add(self.latency)
+ })
+ });
+ if let Some(rtime) = rtime {
+ // This can be cancelled by a state transition.
+ self.sync(rtime).await;
+ }
+ }
+ }
+ Some(_) => (),
+ None => {
+ panic!("Internal channel sender dropped while Task is Started");
+ }
+ }
+
+ // An item was peeked above, we can now pop it without losing it.
+ return Ok(self.item_receiver.next().await.unwrap());
}
}
}
@@ -745,6 +766,8 @@ impl TaskImpl for UdpSinkTask {
fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
+ gst::info!(CAT, obj: &self.element, "Handling {:?}", item);
+
match item {
TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| {
element_error!(
@@ -774,6 +797,24 @@ impl TaskImpl for UdpSinkTask {
}
.boxed()
}
+
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async {
+ gst::info!(CAT, obj: &self.element, "Stopping Task");
+ self.flush().await;
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async {
+ gst::info!(CAT, obj: &self.element, "Starting Task Flush");
+ self.flush().await;
+ Ok(())
+ }
+ .boxed()
+ }
}
#[derive(Debug)]