diff options
author | François Laignel <fengalin@free.fr> | 2022-08-10 21:10:08 +0300 |
---|---|---|
committer | François Laignel <fengalin@free.fr> | 2022-08-10 21:10:08 +0300 |
commit | 33e601d33e50f7867a33436f60372b388dc22caa (patch) | |
tree | 09b8e57dc8453b03eb1b4c11ecd3d935d57305ac /generic/threadshare/src/tcpclientsrc | |
parent | 8b54c3fed699fb8f47c30c531d58eec67f0079a9 (diff) |
ts: migrate elements to try_next / handle_item
See previous commit for details.
Also switched to panicking for some programming errors.
Diffstat (limited to 'generic/threadshare/src/tcpclientsrc')
-rw-r--r-- | generic/threadshare/src/tcpclientsrc/imp.rs | 65 |
1 files changed, 35 insertions, 30 deletions
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 32ef55e5e..6ebe12535 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -278,6 +278,8 @@ impl TcpClientSrcTask { } impl TaskImpl for TcpClientSrcTask { + type Item = gst::Buffer; + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr); @@ -331,41 +333,44 @@ impl TaskImpl for TcpClientSrcTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> { async move { - let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| { - gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); - gst::FlowError::Flushing - })?; - - let (buffer, _) = item.map_err(|err| { - gst::error!(CAT, obj: &self.element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - SocketError::Io(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); + self.socket + .as_mut() + .unwrap() + .try_next() + .await + .map(|(buffer, _saddr)| buffer) + .map_err(|err| { + gst::error!(CAT, obj: &self.element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + } } - } - gst::FlowError::Error - })?; - - self.push_buffer(buffer).await.map(drop) + gst::FlowError::Error + }) } .boxed() } + fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> { + self.push_buffer(buffer).map_ok(drop).boxed() + } + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task"); @@ -486,7 +491,7 @@ impl TcpClientSrc { fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - let _ = self.task.pause().check()?; + self.task.pause().block_on()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } |