Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeungha Yang <seungha@centricular.com>2023-09-09 18:17:18 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-09-20 19:24:38 +0300
commitd134e165c5f69a5904a0176d62e0b76da592eab9 (patch)
treed002ebebf338587c0e316615f6cf1a081b9449a5
parent938d3d73b901859dcece0cf2473aeef896f5555c (diff)
webrtcsink: Propagate GstContext messages
Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT messages from session/discovery can be forwarded to parent pipeline and also GstContext can be shared. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1330>
-rw-r--r--net/webrtc/src/webrtcsink/imp.rs66
1 files changed, 64 insertions, 2 deletions
diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs
index f65a5ef0..aefb4468 100644
--- a/net/webrtc/src/webrtcsink/imp.rs
+++ b/net/webrtc/src/webrtcsink/imp.rs
@@ -99,6 +99,66 @@ impl Codec {
}
}
+// Same gst::bus::BusStream but hooking context message from the thread
+// where the message is posted, so that GstContext can be shared
+#[derive(Debug)]
+struct CustomBusStream {
+ bus: glib::WeakRef<gst::Bus>,
+ receiver: futures::channel::mpsc::UnboundedReceiver<gst::Message>,
+}
+
+impl CustomBusStream {
+ fn new(bin: &super::WebRTCSink, bus: &gst::Bus) -> Self {
+ let (sender, receiver) = futures::channel::mpsc::unbounded();
+
+ let bin_weak = bin.downgrade();
+ bus.set_sync_handler(move |_, msg| {
+ match msg.view() {
+ gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => {
+ if let Some(bin) = bin_weak.upgrade() {
+ let _ = bin.post_message(msg.to_owned());
+ }
+ }
+ _ => {
+ let _ = sender.unbounded_send(msg.to_owned());
+ }
+ }
+
+ gst::BusSyncReply::Drop
+ });
+
+ Self {
+ bus: bus.downgrade(),
+ receiver,
+ }
+ }
+}
+
+impl Drop for CustomBusStream {
+ fn drop(&mut self) {
+ if let Some(bus) = self.bus.upgrade() {
+ bus.unset_sync_handler();
+ }
+ }
+}
+
+impl futures::Stream for CustomBusStream {
+ type Item = gst::Message;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ context: &mut std::task::Context,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.receiver.poll_next_unpin(context)
+ }
+}
+
+impl futures::stream::FusedStream for CustomBusStream {
+ fn is_terminated(&self) -> bool {
+ self.receiver.is_terminated()
+ }
+}
+
/// Wrapper around our sink pads
#[derive(Debug, Clone)]
struct InputStream {
@@ -1795,7 +1855,8 @@ impl WebRTCSink {
pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(element.base_time().unwrap());
- let mut bus_stream = pipeline.bus().unwrap().stream();
+ let bus = pipeline.bus().unwrap();
+ let mut bus_stream = CustomBusStream::new(element, &bus);
let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.to_owned();
@@ -2251,7 +2312,8 @@ impl WebRTCSink {
src.set_property("num-buffers", 1);
- let mut stream = pipe.0.bus().unwrap().stream();
+ let bus = pipe.0.bus().unwrap();
+ let mut stream = CustomBusStream::new(element, &bus);
pipe.0
.set_state(gst::State::Playing)