Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeungha Yang <seungha@centricular.com>2023-09-09 18:17:18 +0300
committerSeungha Yang <seungha@centricular.com>2023-09-14 18:26:08 +0300
commit225482f7ede7ec08d4b33ced473e4dcd68e41d42 (patch)
treea6736e7eda72987970592fa25f8e7ef3e700cac8 /net/webrtc
parent1de7754616709f624a51c32a71c3ccf0a02450e5 (diff)
webrtcsink: Propagate GstContext messages
Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT messages from session/discovery can be forwarded to parent pipeline and also GstContext can be shared. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1322>
Diffstat (limited to 'net/webrtc')
-rw-r--r--net/webrtc/src/webrtcsink/imp.rs66
1 files changed, 64 insertions, 2 deletions
diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs
index 45c68d927..f6b83ae6c 100644
--- a/net/webrtc/src/webrtcsink/imp.rs
+++ b/net/webrtc/src/webrtcsink/imp.rs
@@ -118,6 +118,66 @@ impl DiscoveryInfo {
}
}
+// Same gst::bus::BusStream but hooking context message from the thread
+// where the message is posted, so that GstContext can be shared
+#[derive(Debug)]
+struct CustomBusStream {
+ bus: glib::WeakRef<gst::Bus>,
+ receiver: futures::channel::mpsc::UnboundedReceiver<gst::Message>,
+}
+
+impl CustomBusStream {
+ fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self {
+ let (sender, receiver) = futures::channel::mpsc::unbounded();
+
+ let bin_weak = bin.downgrade();
+ bus.set_sync_handler(move |_, msg| {
+ match msg.view() {
+ gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => {
+ if let Some(bin) = bin_weak.upgrade() {
+ let _ = bin.post_message(msg.to_owned());
+ }
+ }
+ _ => {
+ let _ = sender.unbounded_send(msg.to_owned());
+ }
+ }
+
+ gst::BusSyncReply::Drop
+ });
+
+ Self {
+ bus: bus.downgrade(),
+ receiver,
+ }
+ }
+}
+
+impl Drop for CustomBusStream {
+ fn drop(&mut self) {
+ if let Some(bus) = self.bus.upgrade() {
+ bus.unset_sync_handler();
+ }
+ }
+}
+
+impl futures::Stream for CustomBusStream {
+ type Item = gst::Message;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ context: &mut std::task::Context,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.receiver.poll_next_unpin(context)
+ }
+}
+
+impl futures::stream::FusedStream for CustomBusStream {
+ fn is_terminated(&self) -> bool {
+ self.receiver.is_terminated()
+ }
+}
+
/// Wrapper around our sink pads
#[derive(Debug, Clone)]
struct InputStream {
@@ -2241,7 +2301,8 @@ impl BaseWebRTCSink {
pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(element.base_time().unwrap());
- let mut bus_stream = pipeline.bus().unwrap().stream();
+ let bus = pipeline.bus().unwrap();
+ let mut bus_stream = CustomBusStream::new(&element, &bus);
let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.clone();
@@ -2851,7 +2912,8 @@ impl BaseWebRTCSink {
.link(&sink)
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
- let mut stream = pipe.0.bus().unwrap().stream();
+ let bus = pipe.0.bus().unwrap();
+ let mut stream = CustomBusStream::new(element, &bus);
pipe.0
.set_state(gst::State::Playing)