Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-10 21:10:08 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-10 21:10:08 +0300
commit33e601d33e50f7867a33436f60372b388dc22caa (patch)
tree09b8e57dc8453b03eb1b4c11ecd3d935d57305ac /generic
parent8b54c3fed699fb8f47c30c531d58eec67f0079a9 (diff)
ts: migrate elements to try_next / handle_item
See previous commit for details. Also switched to panicking for some programming errors.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/appsrc/imp.rs25
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs18
-rw-r--r--generic/threadshare/src/proxy/imp.rs24
-rw-r--r--generic/threadshare/src/queue/imp.rs22
-rw-r--r--generic/threadshare/src/socket.rs12
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs65
-rw-r--r--generic/threadshare/src/udpsink/imp.rs26
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs183
-rw-r--r--generic/threadshare/tests/pad.rs28
9 files changed, 222 insertions, 181 deletions
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs
index 0fa4c2826..7f5a6360e 100644
--- a/generic/threadshare/src/appsrc/imp.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -238,19 +238,20 @@ impl AppSrcTask {
}
impl TaskImpl for AppSrcTask {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = StreamItem;
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
async move {
- let item = self.receiver.next().await.ok_or_else(|| {
- gst::error!(CAT, obj: &self.element, "SrcPad channel aborted");
- gst::element_error!(
- &self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason: channel aborted"]
- );
- gst::FlowError::Flushing
- })?;
+ self.receiver
+ .next()
+ .await
+ .ok_or_else(|| panic!("Internal channel sender dropped while Task is Started"))
+ }
+ .boxed()
+ }
+ fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
let res = self.push_item(item).await;
match res {
Ok(_) => {
@@ -428,7 +429,7 @@ impl AppSrc {
fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
- let _ = self.task.pause().check()?;
+ self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
index 76f589ea8..cb96c963b 100644
--- a/generic/threadshare/src/jitterbuffer/imp.rs
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -1066,6 +1066,8 @@ impl JitterBufferTask {
}
impl TaskImpl for JitterBufferTask {
+ type Item = ();
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Starting task");
@@ -1087,7 +1089,17 @@ impl TaskImpl for JitterBufferTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ // FIXME this function was migrated to the try_next / handle_item model
+ // but hasn't been touched as there are pending changes to jitterbuffer
+ // in https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/756.
+ // It should be possible to remove the loop below as try_next / handle_item
+ // are executed in a loop by the Task state machine.
+ // It should also be possible to store latency and context_wait as
+ // fields of JitterBufferTask so as to avoid locking the settings.
+ // If latency can change during processing, a command based mechanism
+ // could be implemented. See the command implemention for ts-udpsink as
+ // an example.
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let jb = self.element.imp();
let (latency, context_wait) = {
@@ -1223,6 +1235,10 @@ impl TaskImpl for JitterBufferTask {
.boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ future::ok(()).boxed()
+ }
+
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");
diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs
index 3d47e3665..06e9e7d61 100644
--- a/generic/threadshare/src/proxy/imp.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -858,6 +858,8 @@ impl ProxySrcTask {
}
impl TaskImpl for ProxySrcTask {
+ type Item = DataQueueItem;
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj: &self.element, "Starting task");
@@ -880,18 +882,18 @@ impl TaskImpl for ProxySrcTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<DataQueueItem, gst::FlowError>> {
async move {
- let item = self.dataqueue.next().await;
-
- let item = match item {
- Some(item) => item,
- None => {
- gst::log!(SRC_CAT, obj: &self.element, "DataQueue Stopped");
- return Err(gst::FlowError::Flushing);
- }
- };
+ self.dataqueue
+ .next()
+ .await
+ .ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
+ }
+ .boxed()
+ }
+ fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
let res = self.push_item(item).await;
let proxysrc = self.element.imp();
match res {
@@ -1085,7 +1087,7 @@ impl ProxySrc {
fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(SRC_CAT, obj: element, "Pausing");
- let _ = self.task.pause().check()?;
+ self.task.pause().block_on()?;
gst::debug!(SRC_CAT, obj: element, "Paused");
Ok(())
}
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs
index 9ed640b14..5ee8fc1d6 100644
--- a/generic/threadshare/src/queue/imp.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -328,6 +328,8 @@ impl QueueTask {
}
impl TaskImpl for QueueTask {
+ type Item = DataQueueItem;
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Starting task");
@@ -345,18 +347,18 @@ impl TaskImpl for QueueTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<DataQueueItem, gst::FlowError>> {
async move {
- let item = self.dataqueue.next().await;
-
- let item = match item {
- Some(item) => item,
- None => {
- gst::log!(CAT, obj: &self.element, "DataQueue Stopped");
- return Err(gst::FlowError::Flushing);
- }
- };
+ self.dataqueue
+ .next()
+ .await
+ .ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
+ }
+ .boxed()
+ }
+ fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async {
let res = self.push_item(item).await;
let queue = self.element.imp();
match res {
diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs
index 790170b9b..aa53d610e 100644
--- a/generic/threadshare/src/socket.rs
+++ b/generic/threadshare/src/socket.rs
@@ -117,13 +117,13 @@ impl fmt::Display for SocketError {
}
}
-pub type SocketStreamItem = Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError>;
-
impl<T: SocketRead> Socket<T> {
// Can't implement this as a Stream trait because we end up using things like
// tokio::net::UdpSocket which don't implement pollable functions.
#[allow(clippy::should_implement_trait)]
- pub async fn next(&mut self) -> Option<SocketStreamItem> {
+ pub async fn try_next(
+ &mut self,
+ ) -> Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError> {
gst::log!(SOCKET_CAT, obj: &self.element, "Trying to read data");
if self.mapped_buffer.is_none() {
@@ -133,7 +133,7 @@ impl<T: SocketRead> Socket<T> {
}
Err(err) => {
gst::debug!(SOCKET_CAT, obj: &self.element, "Failed to acquire buffer {:?}", err);
- return Some(Err(SocketError::Gst(err)));
+ return Err(SocketError::Gst(err));
}
}
}
@@ -172,12 +172,12 @@ impl<T: SocketRead> Socket<T> {
buffer.set_dts(dts);
}
- Some(Ok((buffer, saddr)))
+ Ok((buffer, saddr))
}
Err(err) => {
gst::debug!(SOCKET_CAT, obj: &self.element, "Read error {:?}", err);
- Some(Err(SocketError::Io(err)))
+ Err(SocketError::Io(err))
}
}
}
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index 32ef55e5e..6ebe12535 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -278,6 +278,8 @@ impl TcpClientSrcTask {
}
impl TaskImpl for TcpClientSrcTask {
+ type Item = gst::Buffer;
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr);
@@ -331,41 +333,44 @@ impl TaskImpl for TcpClientSrcTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
- let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
- gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
- gst::FlowError::Flushing
- })?;
-
- let (buffer, _) = item.map_err(|err| {
- gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
- match err {
- SocketError::Gst(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason {}", err]
- );
- }
- SocketError::Io(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("I/O error"),
- ["streaming stopped, I/O error {}", err]
- );
+ self.socket
+ .as_mut()
+ .unwrap()
+ .try_next()
+ .await
+ .map(|(buffer, _saddr)| buffer)
+ .map_err(|err| {
+ gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
+ match err {
+ SocketError::Gst(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+ }
+ SocketError::Io(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("I/O error"),
+ ["streaming stopped, I/O error {}", err]
+ );
+ }
}
- }
- gst::FlowError::Error
- })?;
-
- self.push_buffer(buffer).await.map(drop)
+ gst::FlowError::Error
+ })
}
.boxed()
}
+ fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ self.push_buffer(buffer).map_ok(drop).boxed()
+ }
+
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");
@@ -486,7 +491,7 @@ impl TcpClientSrc {
fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
- let _ = self.task.pause().check()?;
+ self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index 1a909c91b..fa07ffe90 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -691,6 +691,8 @@ impl UdpSinkTask {
}
impl TaskImpl for UdpSinkTask {
+ type Item = TaskItem;
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::info!(CAT, obj: &self.element, "Preparing Task");
@@ -725,17 +727,25 @@ impl TaskImpl for UdpSinkTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
async move {
- let item = futures::select_biased! {
- cmd = self.cmd_receiver.recv_async() => {
- self.process_command(cmd.unwrap());
- return Ok(());
+ loop {
+ futures::select_biased! {
+ cmd = self.cmd_receiver.recv_async() => {
+ self.process_command(cmd.unwrap());
+ }
+ item = self.item_receiver.recv_async() => {
+ break item.map_err(|_| panic!("Internal channel sender dropped while Task is Started"))
+ }
}
- item = self.item_receiver.recv_async() => item,
- };
+ }
+ }
+ .boxed()
+ }
- match item.map_err(|_| gst::FlowError::Flushing)? {
+ fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ match item {
TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| {
element_error!(
&self.element,
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index 5c66f0ac4..1f6ded99d 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -208,67 +208,11 @@ impl UdpSrcTask {
need_segment: true,
}
}
-
- async fn push_buffer(
- &mut self,
- buffer: gst::Buffer,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
- let udpsrc = self.element.imp();
-
- if self.need_initial_events {
- gst::debug!(CAT, obj: &self.element, "Pushing initial events");
-
- let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
- let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
- .group_id(gst::GroupId::next())
- .build();
- udpsrc.src_pad.push_event(stream_start_evt).await;
-
- let caps = udpsrc.settings.lock().unwrap().caps.clone();
- if let Some(caps) = caps {
- udpsrc
- .src_pad
- .push_event(gst::event::Caps::new(&caps))
- .await;
- *udpsrc.configured_caps.lock().unwrap() = Some(caps);
- }
-
- self.need_initial_events = false;
- }
-
- if self.need_segment {
- let segment_evt =
- gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
- udpsrc.src_pad.push_event(segment_evt).await;
-
- self.need_segment = false;
- }
-
- let res = udpsrc.src_pad.push(buffer).await;
- match res {
- Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
- Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
- Err(gst::FlowError::Eos) => {
- gst::debug!(CAT, obj: &self.element, "EOS");
- udpsrc.src_pad.push_event(gst::event::Eos::new()).await;
- }
- Err(err) => {
- gst::error!(CAT, obj: &self.element, "Got error {}", err);
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason {}", err]
- );
- }
- }
-
- res
- }
}
impl TaskImpl for UdpSrcTask {
+ type Item = gst::Buffer;
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
let udpsrc = self.element.imp();
@@ -489,46 +433,105 @@ impl TaskImpl for UdpSrcTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
- let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
- gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
- gst::FlowError::Flushing
- })?;
-
- let (mut buffer, saddr) = item.map_err(|err| {
- gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
- match err {
- SocketError::Gst(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason {}", err]
- );
+ self.socket
+ .as_mut()
+ .unwrap()
+ .try_next()
+ .await
+ .map(|(mut buffer, saddr)| {
+ if let Some(saddr) = saddr {
+ if self.retrieve_sender_address {
+ NetAddressMeta::add(
+ buffer.get_mut().unwrap(),
+ &gio::InetSocketAddress::from(saddr),
+ );
+ }
}
- SocketError::Io(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("I/O error"),
- ["streaming stopped, I/O error {}", err]
- );
+ buffer
+ })
+ .map_err(|err| {
+ gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
+ match err {
+ SocketError::Gst(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+ }
+ SocketError::Io(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("I/O error"),
+ ["streaming stopped, I/O error {}", err]
+ );
+ }
}
+ gst::FlowError::Error
+ })
+ }
+ .boxed()
+ }
+
+ fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async {
+ gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
+ let udpsrc = self.element.imp();
+
+ if self.need_initial_events {
+ gst::debug!(CAT, obj: &self.element, "Pushing initial events");
+
+ let stream_id =
+ format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
+ let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
+ .group_id(gst::GroupId::next())
+ .build();
+ udpsrc.src_pad.push_event(stream_start_evt).await;
+
+ let caps = udpsrc.settings.lock().unwrap().caps.clone();
+ if let Some(caps) = caps {
+ udpsrc
+ .src_pad
+ .push_event(gst::event::Caps::new(&caps))
+ .await;
+ *udpsrc.configured_caps.lock().unwrap() = Some(caps);
}
- gst::FlowError::Error
- })?;
- if let Some(saddr) = saddr {
- if self.retrieve_sender_address {
- NetAddressMeta::add(
- buffer.get_mut().unwrap(),
- &gio::InetSocketAddress::from(saddr),
+ self.need_initial_events = false;
+ }
+
+ if self.need_segment {
+ let segment_evt =
+ gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
+ udpsrc.src_pad.push_event(segment_evt).await;
+
+ self.need_segment = false;
+ }
+
+ let res = udpsrc.src_pad.push(buffer).await.map(drop);
+ match res {
+ Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
+ Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
+ Err(gst::FlowError::Eos) => {
+ gst::debug!(CAT, obj: &self.element, "EOS");
+ udpsrc.src_pad.push_event(gst::event::Eos::new()).await;
+ }
+ Err(err) => {
+ gst::error!(CAT, obj: &self.element, "Got error {}", err);
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
);
}
}
- self.push_buffer(buffer).await.map(drop)
+ res
}
.boxed()
}
@@ -616,7 +619,7 @@ impl UdpSrc {
fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
- let _ = self.task.pause().check()?;
+ self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 89917b52f..4920e2c78 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -155,19 +155,21 @@ mod imp_src {
}
impl TaskImpl for ElementSrcTestTask {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- let item = self.receiver.next().await;
+ type Item = Item;
- let item = match item {
- Some(item) => item,
- None => {
- gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
- return Err(gst::FlowError::Eos);
- }
- };
+ fn try_next(&mut self) -> BoxFuture<'_, Result<Item, gst::FlowError>> {
+ async move {
+ self.receiver.next().await.ok_or_else(|| {
+ gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
+ gst::FlowError::Eos
+ })
+ }
+ .boxed()
+ }
- let res = self.push_item(item).await;
+ fn handle_item(&mut self, item: Item) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ let res = self.push_item(item).await.map(drop);
match res {
Ok(_) => gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
Err(gst::FlowError::Flushing) => {
@@ -176,7 +178,7 @@ mod imp_src {
Err(err) => panic!("Got error {}", err),
}
- res.map(drop)
+ res
}
.boxed()
}
@@ -275,7 +277,7 @@ mod imp_src {
fn pause(&self, element: &super::ElementSrcTest) {
gst::debug!(SRC_CAT, obj: element, "Pausing");
- let _ = self.task.pause().check().unwrap();
+ self.task.pause().block_on().unwrap();
gst::debug!(SRC_CAT, obj: element, "Paused");
}
}