diff options
author | Seungha Yang <seungha@centricular.com> | 2023-09-09 18:17:18 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2023-09-20 19:24:38 +0300 |
commit | d134e165c5f69a5904a0176d62e0b76da592eab9 (patch) | |
tree | d002ebebf338587c0e316615f6cf1a081b9449a5 | |
parent | 938d3d73b901859dcece0cf2473aeef896f5555c (diff) |
webrtcsink: Propagate GstContext messages
Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT
messages from session/discovery can be forwarded to parent
pipeline and also GstContext can be shared.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1330>
-rw-r--r-- | net/webrtc/src/webrtcsink/imp.rs | 66 |
1 files changed, 64 insertions, 2 deletions
diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index f65a5ef0..aefb4468 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -99,6 +99,66 @@ impl Codec { } } +// Same gst::bus::BusStream but hooking context message from the thread +// where the message is posted, so that GstContext can be shared +#[derive(Debug)] +struct CustomBusStream { + bus: glib::WeakRef<gst::Bus>, + receiver: futures::channel::mpsc::UnboundedReceiver<gst::Message>, +} + +impl CustomBusStream { + fn new(bin: &super::WebRTCSink, bus: &gst::Bus) -> Self { + let (sender, receiver) = futures::channel::mpsc::unbounded(); + + let bin_weak = bin.downgrade(); + bus.set_sync_handler(move |_, msg| { + match msg.view() { + gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => { + if let Some(bin) = bin_weak.upgrade() { + let _ = bin.post_message(msg.to_owned()); + } + } + _ => { + let _ = sender.unbounded_send(msg.to_owned()); + } + } + + gst::BusSyncReply::Drop + }); + + Self { + bus: bus.downgrade(), + receiver, + } + } +} + +impl Drop for CustomBusStream { + fn drop(&mut self) { + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } + } +} + +impl futures::Stream for CustomBusStream { + type Item = gst::Message; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + context: &mut std::task::Context, + ) -> std::task::Poll<Option<Self::Item>> { + self.receiver.poll_next_unpin(context) + } +} + +impl futures::stream::FusedStream for CustomBusStream { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() + } +} + /// Wrapper around our sink pads #[derive(Debug, Clone)] struct InputStream { @@ -1795,7 +1855,8 @@ impl WebRTCSink { pipeline.set_start_time(gst::ClockTime::NONE); pipeline.set_base_time(element.base_time().unwrap()); - let mut bus_stream = pipeline.bus().unwrap().stream(); + let bus = pipeline.bus().unwrap(); + let mut bus_stream = CustomBusStream::new(element, &bus); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.to_owned(); @@ -2251,7 +2312,8 @@ impl WebRTCSink { src.set_property("num-buffers", 1); - let mut stream = pipe.0.bus().unwrap().stream(); + let bus = pipe.0.bus().unwrap(); + let mut stream = CustomBusStream::new(element, &bus); pipe.0 .set_state(gst::State::Playing) |