Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-10 21:10:08 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-10 21:10:08 +0300
commit33e601d33e50f7867a33436f60372b388dc22caa (patch)
tree09b8e57dc8453b03eb1b4c11ecd3d935d57305ac /generic/threadshare/src/tcpclientsrc
parent8b54c3fed699fb8f47c30c531d58eec67f0079a9 (diff)
ts: migrate elements to try_next / handle_item
See previous commit for details. Also switched to panicking for some programming errors.
Diffstat (limited to 'generic/threadshare/src/tcpclientsrc')
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs65
1 files changed, 35 insertions, 30 deletions
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index 32ef55e5e..6ebe12535 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -278,6 +278,8 @@ impl TcpClientSrcTask {
}
impl TaskImpl for TcpClientSrcTask {
+ type Item = gst::Buffer;
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr);
@@ -331,41 +333,44 @@ impl TaskImpl for TcpClientSrcTask {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
- let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
- gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
- gst::FlowError::Flushing
- })?;
-
- let (buffer, _) = item.map_err(|err| {
- gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
- match err {
- SocketError::Gst(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["streaming stopped, reason {}", err]
- );
- }
- SocketError::Io(err) => {
- gst::element_error!(
- self.element,
- gst::StreamError::Failed,
- ("I/O error"),
- ["streaming stopped, I/O error {}", err]
- );
+ self.socket
+ .as_mut()
+ .unwrap()
+ .try_next()
+ .await
+ .map(|(buffer, _saddr)| buffer)
+ .map_err(|err| {
+ gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
+ match err {
+ SocketError::Gst(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["streaming stopped, reason {}", err]
+ );
+ }
+ SocketError::Io(err) => {
+ gst::element_error!(
+ self.element,
+ gst::StreamError::Failed,
+ ("I/O error"),
+ ["streaming stopped, I/O error {}", err]
+ );
+ }
}
- }
- gst::FlowError::Error
- })?;
-
- self.push_buffer(buffer).await.map(drop)
+ gst::FlowError::Error
+ })
}
.boxed()
}
+ fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ self.push_buffer(buffer).map_ok(drop).boxed()
+ }
+
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");
@@ -486,7 +491,7 @@ impl TcpClientSrc {
fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
- let _ = self.task.pause().check()?;
+ self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}