diff options
author | Seungha Yang <seungha@centricular.com> | 2023-09-09 18:17:18 +0300 |
---|---|---|
committer | Seungha Yang <seungha@centricular.com> | 2023-09-14 18:26:08 +0300 |
commit | 225482f7ede7ec08d4b33ced473e4dcd68e41d42 (patch) | |
tree | a6736e7eda72987970592fa25f8e7ef3e700cac8 /net/webrtc | |
parent | 1de7754616709f624a51c32a71c3ccf0a02450e5 (diff) |
webrtcsink: Propagate GstContext messages
Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT
messages from session/discovery can be forwarded to parent
pipeline and also GstContext can be shared.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1322>
Diffstat (limited to 'net/webrtc')
-rw-r--r-- | net/webrtc/src/webrtcsink/imp.rs | 66 |
1 files changed, 64 insertions, 2 deletions
diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 45c68d927..f6b83ae6c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -118,6 +118,66 @@ impl DiscoveryInfo { } } +// Same gst::bus::BusStream but hooking context message from the thread +// where the message is posted, so that GstContext can be shared +#[derive(Debug)] +struct CustomBusStream { + bus: glib::WeakRef<gst::Bus>, + receiver: futures::channel::mpsc::UnboundedReceiver<gst::Message>, +} + +impl CustomBusStream { + fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self { + let (sender, receiver) = futures::channel::mpsc::unbounded(); + + let bin_weak = bin.downgrade(); + bus.set_sync_handler(move |_, msg| { + match msg.view() { + gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => { + if let Some(bin) = bin_weak.upgrade() { + let _ = bin.post_message(msg.to_owned()); + } + } + _ => { + let _ = sender.unbounded_send(msg.to_owned()); + } + } + + gst::BusSyncReply::Drop + }); + + Self { + bus: bus.downgrade(), + receiver, + } + } +} + +impl Drop for CustomBusStream { + fn drop(&mut self) { + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } + } +} + +impl futures::Stream for CustomBusStream { + type Item = gst::Message; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + context: &mut std::task::Context, + ) -> std::task::Poll<Option<Self::Item>> { + self.receiver.poll_next_unpin(context) + } +} + +impl futures::stream::FusedStream for CustomBusStream { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() + } +} + /// Wrapper around our sink pads #[derive(Debug, Clone)] struct InputStream { @@ -2241,7 +2301,8 @@ impl BaseWebRTCSink { pipeline.set_start_time(gst::ClockTime::NONE); pipeline.set_base_time(element.base_time().unwrap()); - let mut bus_stream = pipeline.bus().unwrap().stream(); + let bus = pipeline.bus().unwrap(); + let mut bus_stream = CustomBusStream::new(&element, &bus); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.clone(); @@ -2851,7 +2912,8 @@ impl BaseWebRTCSink { .link(&sink) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - let mut stream = pipe.0.bus().unwrap().stream(); + let bus = pipe.0.bus().unwrap(); + let mut stream = CustomBusStream::new(element, &bus); pipe.0 .set_state(gst::State::Playing) |