Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThibault Saunier <tsaunier@igalia.com>2022-07-13 00:13:38 +0300
committerThibault Saunier <tsaunier@igalia.com>2023-03-02 20:56:30 +0300
commit4b867d27fec6703b3d6d2ea571c902a19b01d6d9 (patch)
tree40278bde869efce54a5d1d5a4c6272dc14699e73
parentf2b03d37969dc4d082597a35004670bf2ccc0c74 (diff)
Add a webrtcsrc element
Updating the docker image to include: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3236 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1117>
-rw-r--r--Cargo.lock2
-rw-r--r--docs/meson.build8
-rw-r--r--docs/plugins/gst_plugins_cache.json241
-rw-r--r--net/webrtc/Cargo.toml2
-rw-r--r--net/webrtc/src/lib.rs3
-rw-r--r--net/webrtc/src/utils.rs87
-rw-r--r--net/webrtc/src/webrtcsrc/imp.rs1099
-rw-r--r--net/webrtc/src/webrtcsrc/mod.rs65
-rw-r--r--net/webrtc/src/webrtcsrc/pad.rs45
-rw-r--r--net/webrtc/src/webrtcsrc/signaller/iface.rs426
-rw-r--r--net/webrtc/src/webrtcsrc/signaller/imp.rs586
-rw-r--r--net/webrtc/src/webrtcsrc/signaller/mod.rs46
12 files changed, 2610 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9f71838f..251c87bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2707,6 +2707,7 @@ dependencies = [
"gst-plugin-webrtc-signalling-protocol",
"gstreamer",
"gstreamer-app",
+ "gstreamer-base",
"gstreamer-rtp",
"gstreamer-sdp",
"gstreamer-utils",
@@ -2720,6 +2721,7 @@ dependencies = [
"tracing",
"tracing-log",
"tracing-subscriber",
+ "url",
"uuid",
]
diff --git a/docs/meson.build b/docs/meson.build
index 69d5cbf7..a6697ee7 100644
--- a/docs/meson.build
+++ b/docs/meson.build
@@ -99,9 +99,17 @@ foreach plugin_name: list_plugin_res.stdout().split(':')
gst_index: 'plugins/index.md',
include_paths: join_paths(meson.current_source_dir(), '..'),
gst_smart_index: true,
+ gst_c_source_filters: [
+ '../target/*/*.rs',
+ '../target/*/*/*.rs',
+ '../target/*/*/*/*.rs',
+ '../target/*/*/*/*/*.rs',
+ '../target/*/*/*/*/*/*.rs',
+ ],
gst_c_sources: [
'../*/*/*/*.rs',
'../*/*/*/*/*.rs',
+ '../*/*/*/*/*/*.rs',
],
dependencies: [gst_dep],
gst_order_generated_subpages: true,
diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json
index e5c62498..a564bb6b 100644
--- a/docs/plugins/gst_plugins_cache.json
+++ b/docs/plugins/gst_plugins_cache.json
@@ -6148,11 +6148,240 @@
"when": "last"
}
}
+ },
+ "webrtcsrc": {
+ "author": "Thibault Saunier <tsaunier@igalia.com>",
+ "description": "WebRTC src",
+ "hierarchy": [
+ "GstWebRTCSrc",
+ "GstBin",
+ "GstElement",
+ "GstObject",
+ "GInitiallyUnowned",
+ "GObject"
+ ],
+ "interfaces": [
+ "GstChildProxy",
+ "GstURIHandler"
+ ],
+ "klass": "Source/Network/WebRTC",
+ "long-name": "WebRTCSrc",
+ "pad-templates": {
+ "audio_%%u": {
+ "caps": "audio/x-raw(ANY):\naudio/x-opus:\napplication/x-rtp:\n",
+ "direction": "src",
+ "presence": "sometimes",
+ "type": "GstWebRTCSrcPad"
+ },
+ "video_%%u": {
+ "caps": "video/x-raw(ANY):\napplication/x-rtp:\n",
+ "direction": "src",
+ "presence": "sometimes",
+ "type": "GstWebRTCSrcPad"
+ }
+ },
+ "properties": {
+ "audio-codecs": {
+ "blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "mutable": "ready",
+ "readable": true,
+ "type": "GstValueArray",
+ "writable": true
+ },
+ "meta": {
+ "blurb": "Free form metadata about the consumer",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "mutable": "ready",
+ "readable": true,
+ "type": "GstStructure",
+ "writable": true
+ },
+ "signaller": {
+ "blurb": "The Signallable object to use to handle WebRTC Signalling",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "mutable": "ready",
+ "readable": true,
+ "type": "GstRSWebRTCSignallableIface",
+ "writable": true
+ },
+ "stun-server": {
+ "blurb": "NULL",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "stun://stun.l.google.com:19302",
+ "mutable": "null",
+ "readable": true,
+ "type": "gchararray",
+ "writable": true
+ },
+ "video-codecs": {
+ "blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265]",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "mutable": "ready",
+ "readable": true,
+ "type": "GstValueArray",
+ "writable": true
+ }
+ },
+ "rank": "primary"
}
},
"filename": "gstrswebrtc",
"license": "MPL-2.0",
"other-types": {
+ "GstRSWebRTCSignallableIface": {
+ "hierarchy": [
+ "GstRSWebRTCSignallableIface",
+ "GInterface"
+ ],
+ "kind": "interface",
+ "signals": {
+ "error": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "handle-ice": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "guint"
+ },
+ {
+ "name": "arg2",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg3",
+ "type": "gchararray"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "producer-added": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "GstStructure"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "producer-removed": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "GstStructure"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "request-meta": {
+ "args": [],
+ "return-type": "GstStructure",
+ "when": "last"
+ },
+ "session-description": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "GstWebRTCSessionDescription"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "session-ended": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "session-requested": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "gchararray"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "session-started": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "gchararray"
+ }
+ ],
+ "return-type": "void",
+ "when": "last"
+ },
+ "start": {
+ "action": true,
+ "args": [],
+ "return-type": "void",
+ "when": "last"
+ },
+ "stop": {
+ "action": true,
+ "args": [],
+ "return-type": "void",
+ "when": "last"
+ }
+ }
+ },
"GstWebRTCSinkCongestionControl": {
"kind": "enum",
"values": [
@@ -6172,6 +6401,18 @@
"value": "2"
}
]
+ },
+ "GstWebRTCSrcPad": {
+ "hierarchy": [
+ "GstWebRTCSrcPad",
+ "GstGhostPad",
+ "GstProxyPad",
+ "GstPad",
+ "GstObject",
+ "GInitiallyUnowned",
+ "GObject"
+ ],
+ "kind": "object"
}
},
"package": "gst-plugin-webrtc",
diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml
index cb257ba5..ed763602 100644
--- a/net/webrtc/Cargo.toml
+++ b/net/webrtc/Cargo.toml
@@ -16,6 +16,7 @@ gst-webrtc = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", bran
gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", package = "gstreamer-sdp", features = ["v1_20"] }
gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", package = "gstreamer-rtp", features = ["v1_20"] }
gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", package = "gstreamer-utils" }
+gst-base = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", package = "gstreamer-base" }
once_cell = "1.0"
anyhow = "1"
thiserror = "1"
@@ -28,6 +29,7 @@ serde_json = "1"
fastrand = "1.0"
gst_plugin_webrtc_protocol = { version = "0.9", path="protocol", package = "gst-plugin-webrtc-signalling-protocol" }
human_bytes = "0.4"
+url = "2"
[dev-dependencies]
tracing = { version = "0.1", features = ["log"] }
diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs
index a0de1937..5b06e395 100644
--- a/net/webrtc/src/lib.rs
+++ b/net/webrtc/src/lib.rs
@@ -9,10 +9,13 @@
use gst::glib;
mod signaller;
+pub mod utils;
pub mod webrtcsink;
+pub mod webrtcsrc;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
webrtcsink::register(plugin)?;
+ webrtcsrc::register(Some(plugin))?;
Ok(())
}
diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs
new file mode 100644
index 00000000..b9026506
--- /dev/null
+++ b/net/webrtc/src/utils.rs
@@ -0,0 +1,87 @@
+use std::collections::HashMap;
+
+use gst::{glib, prelude::*};
+
+pub fn gvalue_to_json(val: &gst::glib::Value) -> Option<serde_json::Value> {
+ match val.type_() {
+ glib::Type::STRING => Some(val.get::<String>().unwrap().into()),
+ glib::Type::BOOL => Some(val.get::<bool>().unwrap().into()),
+ glib::Type::I32 => Some(val.get::<i32>().unwrap().into()),
+ glib::Type::U32 => Some(val.get::<u32>().unwrap().into()),
+ glib::Type::I_LONG | glib::Type::I64 => Some(val.get::<i64>().unwrap().into()),
+ glib::Type::U_LONG | glib::Type::U64 => Some(val.get::<u64>().unwrap().into()),
+ glib::Type::F32 => Some(val.get::<f32>().unwrap().into()),
+ glib::Type::F64 => Some(val.get::<f64>().unwrap().into()),
+ _ => {
+ if let Ok(s) = val.get::<gst::Structure>() {
+ serde_json::to_value(
+ s.iter()
+ .filter_map(|(name, value)| {
+ gvalue_to_json(value).map(|value| (name.to_string(), value))
+ })
+ .collect::<HashMap<String, serde_json::Value>>(),
+ )
+ .ok()
+ } else if let Ok(a) = val.get::<gst::Array>() {
+ serde_json::to_value(
+ a.iter()
+ .filter_map(|value| gvalue_to_json(value))
+ .collect::<Vec<serde_json::Value>>(),
+ )
+ .ok()
+ } else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) {
+ Some(
+ values
+ .iter()
+ .map(|value| value.nick())
+ .collect::<Vec<&str>>()
+ .join("+")
+ .into(),
+ )
+ } else if let Ok(value) = val.serialize() {
+ Some(value.as_str().into())
+ } else {
+ None
+ }
+ }
+ }
+}
+
+fn json_to_gststructure(val: &serde_json::Value) -> Option<glib::SendValue> {
+ match val {
+ serde_json::Value::Bool(v) => Some(v.to_send_value()),
+ serde_json::Value::Number(n) => {
+ if n.is_u64() {
+ Some(n.as_u64().unwrap().to_send_value())
+ } else if n.is_i64() {
+ Some(n.as_i64().unwrap().to_send_value())
+ } else if n.is_f64() {
+ Some(n.as_f64().unwrap().to_send_value())
+ } else {
+ todo!("Unhandled case {n:?}");
+ }
+ }
+ serde_json::Value::String(v) => Some(v.to_send_value()),
+ serde_json::Value::Array(v) => {
+ let array = v
+ .iter()
+ .filter_map(json_to_gststructure)
+ .collect::<Vec<glib::SendValue>>();
+ Some(gst::Array::from_values(array).to_send_value())
+ }
+ serde_json::Value::Object(v) => Some(serialize_json_object(v).to_send_value()),
+ _ => None,
+ }
+}
+
+pub fn serialize_json_object(val: &serde_json::Map<String, serde_json::Value>) -> gst::Structure {
+ let mut res = gst::Structure::new_empty("v");
+
+ val.iter().for_each(|(k, v)| {
+ if let Some(gvalue) = json_to_gststructure(v) {
+ res.set_value(k, gvalue);
+ }
+ });
+
+ res
+}
diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs
new file mode 100644
index 00000000..0330b57e
--- /dev/null
+++ b/net/webrtc/src/webrtcsrc/imp.rs
@@ -0,0 +1,1099 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::prelude::*;
+
+use crate::webrtcsrc::signaller::{prelude::*, Signallable, Signaller};
+use crate::webrtcsrc::WebRTCSrcPad;
+use anyhow::{Context, Error};
+use core::ops::Deref;
+use gst::glib;
+use gst::subclass::prelude::*;
+use once_cell::sync::Lazy;
+use std::str::FromStr;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::AtomicU16;
+use std::sync::atomic::Ordering;
+use std::sync::Mutex;
+use url::Url;
+
+const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "webrtcsrc",
+ gst::DebugColorFlags::empty(),
+ Some("WebRTC src"),
+ )
+});
+
+struct Codec {
+ name: String,
+ caps: gst::Caps,
+ has_decoder: AtomicBool,
+ stream_type: gst::StreamType,
+}
+
+impl Clone for Codec {
+ fn clone(&self) -> Self {
+ Self {
+ name: self.name.clone(),
+ caps: self.caps.clone(),
+ has_decoder: AtomicBool::new(self.has_decoder.load(Ordering::SeqCst)),
+ stream_type: self.stream_type,
+ }
+ }
+}
+
+impl Codec {
+ fn new(
+ name: &str,
+ stream_type: gst::StreamType,
+ caps: &gst::Caps,
+ decoders: &glib::List<gst::ElementFactory>,
+ ) -> Self {
+ let has_decoder = Self::has_decoder_for_caps(caps, decoders);
+
+ Self {
+ caps: caps.clone(),
+ stream_type,
+ name: name.into(),
+ has_decoder: AtomicBool::new(has_decoder),
+ }
+ }
+
+ fn has_decoder(&self) -> bool {
+ if self.has_decoder.load(Ordering::SeqCst) {
+ true
+ } else if Self::has_decoder_for_caps(
+ &self.caps,
+ // Replicating decodebin logic
+ &gst::ElementFactory::factories_with_type(
+ gst::ElementFactoryType::DECODER,
+ gst::Rank::Marginal,
+ ),
+ ) {
+ // Check if new decoders have been installed meanwhile
+ self.has_decoder.store(true, Ordering::SeqCst);
+ true
+ } else {
+ false
+ }
+ }
+
+ fn has_decoder_for_caps(caps: &gst::Caps, decoders: &glib::List<gst::ElementFactory>) -> bool {
+ decoders.iter().any(|factory| {
+ factory.static_pad_templates().iter().any(|template| {
+ let template_caps = template.caps();
+ template.direction() == gst::PadDirection::Sink
+ && !template_caps.is_any()
+ && caps.can_intersect(&template_caps)
+ })
+ })
+ }
+}
+
+static AUDIO_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("audio/x-raw"));
+static OPUS_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("audio/x-opus"));
+
+static VIDEO_CAPS: Lazy<gst::Caps> = Lazy::new(|| {
+ gst::Caps::builder_full_with_any_features()
+ .structure(gst::Structure::new_empty("video/x-raw"))
+ .build()
+});
+static VP8_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("video/x-vp8"));
+static VP9_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("video/x-vp9"));
+static H264_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("video/x-h264"));
+static H265_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("video/x-h265"));
+
+static RTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::new_empty_simple("application/x-rtp"));
+
+struct Codecs(Vec<Codec>);
+
+impl Deref for Codecs {
+ type Target = Vec<Codec>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+static CODECS: Lazy<Codecs> = Lazy::new(|| {
+ let decoders = gst::ElementFactory::factories_with_type(
+ gst::ElementFactoryType::DECODER,
+ gst::Rank::Marginal,
+ );
+
+ Codecs(vec![
+ Codec::new("OPUS", gst::StreamType::AUDIO, &OPUS_CAPS, &decoders),
+ Codec::new("VP8", gst::StreamType::VIDEO, &VP8_CAPS, &decoders),
+ Codec::new("H264", gst::StreamType::VIDEO, &H264_CAPS, &decoders),
+ Codec::new("VP9", gst::StreamType::VIDEO, &VP9_CAPS, &decoders),
+ Codec::new("H265", gst::StreamType::VIDEO, &H265_CAPS, &decoders),
+ ])
+});
+
+struct Settings {
+ stun_server: Option<String>,
+ signaller: Signallable,
+ meta: Option<gst::Structure>,
+ video_codecs: Vec<Codec>,
+ audio_codecs: Vec<Codec>,
+}
+
+#[derive(Default)]
+pub struct WebRTCSrc {
+ settings: Mutex<Settings>,
+ n_video_pads: AtomicU16,
+ n_audio_pads: AtomicU16,
+ state: Mutex<State>,
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for WebRTCSrc {
+ const NAME: &'static str = "GstWebRTCSrc";
+ type Type = super::WebRTCSrc;
+ type ParentType = gst::Bin;
+ type Interfaces = (gst::URIHandler, gst::ChildProxy);
+}
+
+impl ObjectImpl for WebRTCSrc {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::builder("stun-server")
+ .flags(glib::ParamFlags::READWRITE)
+ .default_value(DEFAULT_STUN_SERVER)
+ .build(),
+ glib::ParamSpecObject::builder::<Signallable>("signaller")
+ .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
+ .blurb("The Signallable object to use to handle WebRTC Signalling")
+ .build(),
+ glib::ParamSpecBoxed::builder::<gst::Structure>("meta")
+ .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
+ .blurb("Free form metadata about the consumer")
+ .build(),
+ gst::ParamSpecArray::builder("video-codecs")
+ .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
+ .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", CODECS.iter().filter_map(|c|
+ if matches!(c.stream_type, gst::StreamType::VIDEO) {
+ Some(c.name.to_owned())
+ } else {
+ None
+ }
+ ).collect::<Vec<String>>().join(", ")
+ ))
+ .element_spec(&glib::ParamSpecString::builder("video-codec-name").build())
+ .build(),
+ gst::ParamSpecArray::builder("audio-codecs")
+ .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
+ .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", CODECS.iter().filter_map(|c|
+ if matches!(c.stream_type, gst::StreamType::AUDIO) {
+ Some(c.name.to_owned())
+ } else {
+ None
+ }
+ ).collect::<Vec<String>>().join(", ")
+ ))
+ .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build())
+ .build(),
+ ]
+ });
+
+ PROPS.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ match pspec.name() {
+ "signaller" => {
+ self.settings.lock().unwrap().signaller =
+ value.get::<Signallable>().expect("type checked upstream");
+ }
+ "video-codecs" => {
+ self.settings.lock().unwrap().video_codecs = value
+ .get::<gst::ArrayRef>()
+ .expect("Type checked upstream")
+ .as_slice()
+ .iter()
+ .filter_map(|codec_name| {
+ CODECS
+ .iter()
+ .find(|codec| {
+ codec.stream_type == gst::StreamType::VIDEO
+ && codec.name
+ == codec_name.get::<&str>().expect("Type checked upstream")
+ })
+ .cloned()
+ })
+ .collect::<Vec<Codec>>()
+ }
+ "audio-codecs" => {
+ self.settings.lock().unwrap().audio_codecs = value
+ .get::<gst::ArrayRef>()
+ .expect("Type checked upstream")
+ .as_slice()
+ .iter()
+ .filter_map(|codec_name| {
+ CODECS
+ .iter()
+ .find(|codec| {
+ codec.stream_type == gst::StreamType::AUDIO
+ && codec.name
+ == codec_name.get::<&str>().expect("Type checked upstream")
+ })
+ .cloned()
+ })
+ .collect::<Vec<Codec>>()
+ }
+ "stun-server" => {
+ self.settings.lock().unwrap().stun_server = value
+ .get::<Option<String>>()
+ .expect("type checked upstream")
+ }
+ "meta" => {
+ self.settings.lock().unwrap().meta = value
+ .get::<Option<gst::Structure>>()
+ .expect("type checked upstream")
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "signaller" => self.settings.lock().unwrap().signaller.to_value(),
+ "video-codecs" => gst::Array::new(
+ self.settings
+ .lock()
+ .unwrap()
+ .video_codecs
+ .iter()
+ .map(|v| &v.name),
+ )
+ .to_value(),
+ "audio-codecs" => gst::Array::new(
+ self.settings
+ .lock()
+ .unwrap()
+ .audio_codecs
+ .iter()
+ .map(|v| &v.name),
+ )
+ .to_value(),
+ "stun-server" => self.settings.lock().unwrap().stun_server.to_value(),
+ "meta" => self.settings.lock().unwrap().meta.to_value(),
+ name => panic!("{} getter not implemented", name),
+ }
+ }
+
+ fn constructed(&self) {
+ self.parent_constructed();
+ let signaller = self.settings.lock().unwrap().signaller.clone();
+
+ self.connect_signaller(&signaller);
+
+ let obj = &*self.obj();
+
+ obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
+ }
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ let signaller = Signaller::default();
+
+ Self {
+ stun_server: DEFAULT_STUN_SERVER.map(|v| v.to_string()),
+ signaller: signaller.upcast(),
+ meta: Default::default(),
+ audio_codecs: CODECS
+ .iter()
+ .filter(|codec| {
+ matches!(codec.stream_type, gst::StreamType::AUDIO) && codec.has_decoder()
+ })
+ .cloned()
+ .collect(),
+ video_codecs: CODECS
+ .iter()
+ .filter(|codec| {
+ matches!(codec.stream_type, gst::StreamType::VIDEO) && codec.has_decoder()
+ })
+ .cloned()
+ .collect(),
+ }
+ }
+}
+
+#[allow(dead_code)]
+struct SignallerSignals {
+ error: glib::SignalHandlerId,
+ session_started: glib::SignalHandlerId,
+ session_ended: glib::SignalHandlerId,
+ request_meta: glib::SignalHandlerId,
+ session_description: glib::SignalHandlerId,
+ handle_ice: glib::SignalHandlerId,
+}
+
+impl WebRTCSrc {
+ fn webrtcbin(&self) -> gst::Bin {
+ let state = self.state.lock().unwrap();
+ let webrtcbin = state
+ .webrtcbin
+ .as_ref()
+ .expect("We should never call `.webrtcbin()` when state not > Ready")
+ .clone()
+ .downcast::<gst::Bin>()
+ .unwrap();
+
+ webrtcbin
+ }
+
+ fn signaller(&self) -> Signallable {
+ self.settings.lock().unwrap().signaller.clone()
+ }
+
+ // Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID.
+ fn get_src_pad_from_webrtcbin_pad(&self, webrtcbin_src: &gst::Pad) -> Option<WebRTCSrcPad> {
+ self.get_stream_id(
+ Some(webrtcbin_src.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver")),
+ None,
+ )
+ .and_then(|stream_id| {
+ self.obj().iterate_src_pads().into_iter().find_map(|s| {
+ let pad = s.ok()?.downcast::<WebRTCSrcPad>().unwrap();
+ if pad.imp().stream_id() == stream_id {
+ Some(pad)
+ } else {
+ None
+ }
+ })
+ })
+ }
+
+ fn handle_webrtc_src_pad(&self, bin: &gst::Bin, pad: &gst::Pad) {
+ let srcpad = self.get_src_pad_from_webrtcbin_pad(pad);
+ if let Some(ref srcpad) = srcpad {
+ let stream_id = srcpad.imp().stream_id();
+ let mut builder = gst::event::StreamStart::builder(&stream_id);
+ if let Some(stream_start) = pad.sticky_event::<gst::event::StreamStart>(0) {
+ builder = builder
+ .seqnum(stream_start.seqnum())
+ .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next));
+ }
+
+ gst::debug!(CAT, imp: self, "Storing id {stream_id} on {pad:?}");
+ pad.store_sticky_event(&builder.build()).ok();
+ }
+
+ let ghostpad = gst::GhostPad::builder(None, gst::PadDirection::Src)
+ .proxy_pad_chain_function(glib::clone!(@weak self as this => @default-panic, move
+ |pad, parent, buffer| {
+ let padret = gst::ProxyPad::chain_default(pad, parent, buffer);
+ let ret = this.state.lock().unwrap().flow_combiner.update_flow(padret);
+
+ ret
+ }
+ ))
+ .proxy_pad_event_function(glib::clone!(@weak self as this => @default-panic, move |pad, parent, event| {
+ let event = if let gst::EventView::StreamStart(stream_start) = event.view() {
+ let webrtcpad = pad.peer().unwrap();
+
+ this.get_src_pad_from_webrtcbin_pad(&webrtcpad)
+ .map(|srcpad| {
+ gst::event::StreamStart::builder(&srcpad.imp().stream_id())
+ .seqnum(stream_start.seqnum())
+ .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next))
+ .build()
+ }).unwrap_or(event)
+ } else {
+ event
+ };
+
+ gst::Pad::event_default(pad, parent, event)
+ }))
+ .build_with_target(pad)
+ .unwrap();
+
+ bin.add_pad(&ghostpad)
+ .expect("Adding ghostpad to the bin should always work");
+
+ if let Some(srcpad) = srcpad {
+ if srcpad.imp().needs_decoding() {
+ let decodebin = gst::ElementFactory::make("decodebin3")
+ .build()
+ .expect("decodebin3 needs to be present!");
+ self.obj().add(&decodebin).unwrap();
+ decodebin.sync_state_with_parent().unwrap();
+ decodebin.connect_pad_added(
+ glib::clone!(@weak self as this, @weak srcpad => move |_webrtcbin, pad| {
+ if pad.direction() == gst::PadDirection::Sink {
+ return;
+ }
+
+ srcpad.set_target(Some(pad)).unwrap();
+ }),
+ );
+
+ gst::debug!(CAT, imp: self, "Decoding for {}", srcpad.imp().stream_id());
+ let sinkpad = decodebin
+ .static_pad("sink")
+ .expect("decodebin has a sink pad");
+ ghostpad
+ .link(&sinkpad)
+ .expect("webrtcbin ! decodebin3 linking failed");
+ } else {
+ gst::debug!(
+ CAT,
+ imp: self,
+ "NO decoding for {}",
+ srcpad.imp().stream_id()
+ );
+ srcpad.set_target(Some(&ghostpad)).unwrap();
+ }
+ } else {
+ gst::debug!(CAT, imp: self, "Unused webrtcbin pad {pad:?}");
+ }
+ }
+
+ fn prepare(&self) -> Result<(), Error> {
+ let webrtcbin = gst::ElementFactory::make("webrtcbin")
+ .property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle)
+ .property(
+ "stun-server",
+ &self.settings.lock().unwrap().stun_server.to_value(),
+ )
+ .build()
+ .with_context(|| "Failed to make element webrtcbin".to_string())?;
+
+ let bin = gst::Bin::new(None);
+ bin.connect_pad_removed(glib::clone!(@weak self as this => move |_, pad|
+ this.state.lock().unwrap().flow_combiner.remove_pad(pad);
+ ));
+ bin.connect_pad_added(glib::clone!(@weak self as this => move |_, pad|
+ this.state.lock().unwrap().flow_combiner.add_pad(pad);
+ ));
+ webrtcbin.connect_pad_added(
+ glib::clone!(@weak self as this, @weak bin, => move |_webrtcbin, pad| {
+ if pad.direction() == gst::PadDirection::Sink {
+ return;
+ }
+
+ this.handle_webrtc_src_pad(&bin, pad);
+ }),
+ );
+
+ webrtcbin.connect_closure(
+ "on-ice-candidate",
+ false,
+ glib::closure!(@weak-allow-none self as this => move |
+ _webrtcbin: gst::Bin,
+ sdp_m_line_index: u32,
+ candidate: String| {
+ this.unwrap().on_ice_candidate(sdp_m_line_index, candidate);
+ }),
+ );
+
+ bin.add(&webrtcbin).unwrap();
+ self.obj().add(&bin).context("Could not add `webrtcbin`")?;
+
+ let mut state = self.state.lock().unwrap();
+ state.webrtcbin.replace(webrtcbin);
+
+ Ok(())
+ }
+
+ fn get_stream_id(
+ &self,
+ transceiver: Option<gst_webrtc::WebRTCRTPTransceiver>,
+ mline: Option<u32>,
+ ) -> Option<String> {
+ let mline = transceiver.map_or(mline, |t| Some(t.mlineindex()));
+
+ // Same logic as gst_pad_create_stream_id and friends, making a hash of
+ // the URI and adding `:<some-id>`, here the ID is the mline of the
+ // stream in the SDP.
+ mline.map(|mline| {
+ let mut cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap();
+ cs.update(
+ self.uri()
+ .expect("get_stream_id should never be called if no URI has been set")
+ .as_bytes(),
+ );
+
+ format!("{}:{mline}", cs.string().unwrap())
+ })
+ }
+
+ fn unprepare(&self) -> Result<(), Error> {
+ gst::info!(CAT, imp: self, "unpreparing");
+
+ let obj = self.obj();
+ self.maybe_stop_signaller();
+ self.state.lock().unwrap().session_id = None;
+ for pad in obj.src_pads() {
+ obj.remove_pad(&pad)
+ .map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?;
+ }
+
+ self.n_video_pads.store(0, Ordering::SeqCst);
+ self.n_audio_pads.store(0, Ordering::SeqCst);
+
+ Ok(())
+ }
+
+ fn connect_signaller(&self, signaller: &Signallable) {
+ let _ = self
+ .state
+ .lock()
+ .unwrap()
+ .signaller_signals
+ .insert(SignallerSignals {
+ error: signaller.connect_closure(
+ "error",
+ false,
+ glib::closure!(@to-owned self as this => move |
+ _signaller: glib::Object, error: String| {
+ gst::element_error!(
+ this.obj(),
+ gst::StreamError::Failed,
+ ["Signalling error: {}", error]
+ );
+ }),
+ ),
+
+ session_started: signaller.connect_closure(
+ "session-started",
+ false,
+ glib::closure!(@to-owned self as this => move |
+ _signaller: glib::Object,
+ session_id: &str,
+ _peer_id: &str| {
+ gst::info!(CAT, imp: this, "Session started: {session_id}");
+ this.state.lock().unwrap().session_id =
+ Some(session_id.to_string());
+ }),
+ ),
+
+ session_ended: signaller.connect_closure(
+ "session-ended",
+ false,
+ glib::closure!(@to-owned self as this => move |
+ _signaller: glib::Object, _peer_id: &str| {
+ gst::debug!(CAT, imp: this, "Session ended.");
+
+ this.state.lock().unwrap().session_id = None;
+ this.obj().iterate_src_pads().into_iter().for_each(|pad|
+ { if let Err(e) = pad.map(|pad| pad.push_event(gst::event::Eos::new())) {
+ gst::error!(CAT, "Could not send EOS: {e:?}");
+ }}
+ );
+ }),
+ ),
+
+ request_meta: signaller.connect_closure(
+ "request-meta",
+ false,
+ glib::closure!(@to-owned self as this => move |
+ _signaller: glib::Object| -> Option<gst::Structure> {
+ let meta = this.settings.lock().unwrap().meta.clone();
+
+ meta
+ }),
+ ),
+
+ session_description: signaller.connect_closure(
+ "session-description",
+ false,
+ glib::closure!(@to-owned self as this => move |
+ _signaller: glib::Object,
+ _peer_id: &str,
+ desc: &gst_webrtc::WebRTCSessionDescription| {
+ assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer);
+
+ this.handle_offer(desc);
+ }),
+ ),
+
+ // sdp_mid is exposed for future proofing, see
+ // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174,
+ // at the moment sdp_m_line_index must be Some
+ handle_ice: signaller.connect_closure(
+ "handle-ice",
+ false,
+ glib::closure!(@to-owned self as this => move |
+ _signaller: glib::Object,
+ peer_id: &str,
+ sdp_m_line_index: u32,
+ _sdp_mid: Option<String>,
+ candidate: &str| {
+ this.handle_ice(peer_id, Some(sdp_m_line_index), None, candidate);
+ }),
+ ),
+ });
+
+ // previous signals are disconnected when dropping the old structure
+ }
+
+ // Creates and adds our `WebRTCSrcPad` source pad, returning caps accepted
+ // downstream
+ fn create_and_probe_src_pad(&self, caps: &gst::Caps, stream_id: &str) -> Option<gst::Caps> {
+ gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}");
+
+ let obj = self.obj();
+ let media_type = caps
+ .structure(0)
+ .expect("Passing empty caps is invalid")
+ .get::<&str>("media")
+ .expect("Only caps with a `media` field are expected when creating the pad");
+
+ let (template, name, raw_caps) = if media_type == "video" {
+ (
+ obj.pad_template("video_%u").unwrap(),
+ format!("video_{}", self.n_video_pads.fetch_add(1, Ordering::SeqCst)),
+ VIDEO_CAPS.to_owned(),
+ )
+ } else if media_type == "audio" {
+ (
+ obj.pad_template("audio_%u").unwrap(),
+ format!("audio_{}", self.n_audio_pads.fetch_add(1, Ordering::SeqCst)),
+ AUDIO_CAPS.to_owned(),
+ )
+ } else {
+ gst::info!(CAT, imp: self, "Not an audio or video media {media_type:?}");
+
+ return None;
+ };
+
+ let caps_with_raw = [caps.clone(), raw_caps.clone()]
+ .into_iter()
+ .collect::<gst::Caps>();
+ let ghost = gst::GhostPad::builder_with_template(&template, Some(&name))
+ .build()
+ .downcast::<WebRTCSrcPad>()
+ .unwrap();
+ ghost.imp().set_stream_id(stream_id);
+ obj.add_pad(&ghost)
+ .expect("Adding ghost pad should never fail");
+
+ let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw));
+ if let Some(first_struct) = downstream_caps.structure(0) {
+ if first_struct.has_name(raw_caps.structure(0).unwrap().name()) {
+ ghost.imp().set_needs_decoding(true)
+ }
+ }
+
+ if ghost.imp().needs_decoding() {
+ Some(caps.clone())
+ } else {
+ Some(downstream_caps)
+ }
+ }
+
+ fn handle_offer(&self, offer: &gst_webrtc::WebRTCSessionDescription) {
+ gst::log!(CAT, imp: self, "Got offer {}", offer.sdp().to_string());
+
+ let sdp = offer.sdp();
+ let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly;
+ let webrtcbin = self.webrtcbin();
+ for (i, media) in sdp.medias().enumerate() {
+ let all_caps_for_media = media
+ .formats()
+ .filter_map(|format| {
+ format.parse::<i32>().ok().and_then(|pt| {
+ let mut tmpcaps = media.caps_from_media(pt)?;
+ {
+ let tmpcaps = tmpcaps.get_mut().unwrap();
+
+ tmpcaps
+ .structure_mut(0)
+ .unwrap()
+ .set_name("application/x-rtp");
+
+ if let Err(err) = media.attributes_to_caps(tmpcaps) {
+ gst::error!(CAT, "Couldn't copy media attributes to caps: {err:?}")
+ }
+ }
+
+ Some(tmpcaps)
+ })
+ })
+ .collect::<Vec<gst::Caps>>();
+
+ let mut caps = gst::Caps::new_empty();
+ let settings = self.settings.lock().unwrap();
+ for codec in settings
+ .video_codecs
+ .iter()
+ .chain(settings.audio_codecs.iter())
+ {
+ for media_caps in &all_caps_for_media {
+ let encoding_name = media_caps
+ .structure(0)
+ .unwrap()
+ .get::<&str>("encoding-name")
+ .unwrap();
+ if encoding_name == codec.name {
+ caps.get_mut().unwrap().append(media_caps.clone());
+ }
+ }
+ }
+ drop(settings);
+
+ if !caps.is_empty() {
+ let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap();
+ if let Some(caps) = self.create_and_probe_src_pad(&caps, &stream_id) {
+ gst::info!(
+ CAT,
+ imp: self,
+ "Adding transceiver for {stream_id} with caps: {caps:?}"
+ );
+ let transceiver = webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
+ "add-transceiver",
+ &[&direction, &caps],
+ );
+
+ transceiver.set_property("do_nack", true);
+ transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
+ }
+ } else {
+ gst::info!(
+ CAT,
+ "Not using media: {media:#?} as it doesn't match our codec restrictions"
+ );
+ }
+ }
+
+ webrtcbin.emit_by_name::<()>("set-remote-description", &[&offer, &None::<gst::Promise>]);
+
+ let obj = self.obj();
+ obj.no_more_pads();
+
+ let promise =
+ gst::Promise::with_change_func(glib::clone!(@weak self as this => move |reply| {
+ this.on_answer_created(reply);
+ }
+ ));
+
+ webrtcbin.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
+ }
+
+ fn on_answer_created(&self, reply: Result<Option<&gst::StructureRef>, gst::PromiseError>) {
+ let reply = match reply {
+ Ok(Some(reply)) => {
+ if !reply.has_field_with_type(
+ "answer",
+ gst_webrtc::WebRTCSessionDescription::static_type(),
+ ) {
+ gst::element_error!(
+ self.obj(),
+ gst::StreamError::Failed,
+ ["create-answer::Promise returned with no reply"]
+ );
+ return;
+ } else if reply.has_field_with_type("error", glib::Error::static_type()) {
+ gst::element_error!(
+ self.obj(),
+ gst::LibraryError::Failed,
+ ["create-offer::Promise returned with error: {:?}", reply]
+ );
+ return;
+ }
+
+ reply
+ }
+ Ok(None) => {
+ gst::element_error!(
+ self.obj(),
+ gst::StreamError::Failed,
+ ["create-answer::Promise returned with no reply"]
+ );
+
+ return;
+ }
+ Err(err) => {
+ gst::element_error!(
+ self.obj(),
+ gst::LibraryError::Failed,
+ ["create-answer::Promise returned with error {:?}", err]
+ );
+
+ return;
+ }
+ };
+
+ let answer = reply
+ .value("answer")
+ .unwrap()
+ .get::<gst_webrtc::WebRTCSessionDescription>()
+ .expect("Invalid argument");
+
+ self.webrtcbin()
+ .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
+
+ let session_id = {
+ let state = self.state.lock().unwrap();
+ match &state.session_id {
+ Some(id) => id.to_owned(),
+ _ => {
+ gst::element_error!(
+ self.obj(),
+ gst::StreamError::Failed,
+ ["Signalling error, no session started while requesting to send an SDP offer"]
+ );
+
+ return;
+ }
+ }
+ };
+
+ gst::log!(CAT, imp: self, "Sending SDP, {}", answer.sdp().to_string());
+ let signaller = self.signaller();
+ signaller.send_sdp(&session_id, &answer);
+ }
+
+ fn on_ice_candidate(&self, sdp_m_line_index: u32, candidate: String) {
+ let signaller = self.signaller();
+ let session_id = match self.state.lock().unwrap().session_id.as_ref() {
+ Some(id) => id.to_string(),
+ _ => {
+ gst::element_error!(
+ self.obj(),
+ gst::StreamError::Failed,
+ ["Signalling error, no session started while requesting to propose ice candidates"]
+ );
+
+ return;
+ }
+ };
+ signaller.add_ice(
+ &session_id,
+ &candidate,
+ Some(sdp_m_line_index),
+ None::<String>,
+ );
+ }
+
+ /// Called by the signaller with an ice candidate
+ fn handle_ice(
+ &self,
+ peer_id: &str,
+ sdp_m_line_index: Option<u32>,
+ _sdp_mid: Option<String>,
+ candidate: &str,
+ ) {
+ let sdp_m_line_index = match sdp_m_line_index {
+ Some(m_line) => m_line,
+ None => {
+ gst::error!(CAT, imp: self, "No mandatory mline");
+ return;
+ }
+ };
+ gst::log!(CAT, imp: self, "Got ice from {peer_id}: {candidate}");
+
+ self.webrtcbin()
+ .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
+ }
+
+ fn maybe_start_signaller(&self) {
+ let obj = self.obj();
+ let mut state = self.state.lock().unwrap();
+ if state.signaller_state == SignallerState::Stopped
+ && obj.current_state() >= gst::State::Paused
+ {
+ self.signaller().start();
+
+ gst::info!(CAT, imp: self, "Started signaller");
+ state.signaller_state = SignallerState::Started;
+ }
+ }
+
+ fn maybe_stop_signaller(&self) {
+ let mut state = self.state.lock().unwrap();
+ if state.signaller_state == SignallerState::Started {
+ self.signaller().stop();
+ state.signaller_state = SignallerState::Stopped;
+ gst::info!(CAT, imp: self, "Stopped signaller");
+ }
+ }
+}
+
+impl ElementImpl for WebRTCSrc {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "WebRTCSrc",
+ "Source/Network/WebRTC",
+ "WebRTC src",
+ "Thibault Saunier <tsaunier@igalia.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ vec![
+ gst::PadTemplate::with_gtype(
+ "video_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &gst::Caps::builder_full()
+ .structure_with_any_features(VIDEO_CAPS.structure(0).unwrap().to_owned())
+ .structure(RTP_CAPS.structure(0).unwrap().to_owned())
+ .build(),
+ WebRTCSrcPad::static_type(),
+ )
+ .unwrap(),
+ gst::PadTemplate::with_gtype(
+ "audio_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &gst::Caps::builder_full()
+ .structure_with_any_features(AUDIO_CAPS.structure(0).unwrap().to_owned())
+ .structure(OPUS_CAPS.structure(0).unwrap().to_owned())
+ .structure(RTP_CAPS.structure(0).unwrap().to_owned())
+ .build(),
+ WebRTCSrcPad::static_type(),
+ )
+ .unwrap(),
+ ]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ let obj = &*self.obj();
+ if let gst::StateChange::NullToReady = transition {
+ if let Err(err) = self.prepare() {
+ gst::element_error!(
+ obj,
+ gst::StreamError::Failed,
+ ["Failed to prepare: {}", err]
+ );
+ return Err(gst::StateChangeError);
+ }
+ }
+
+ let mut ret = self.parent_change_state(transition);
+
+ match transition {
+ gst::StateChange::PausedToReady => {
+ if let Err(err) = self.unprepare() {
+ gst::element_error!(
+ obj,
+ gst::StreamError::Failed,
+ ["Failed to unprepare: {}", err]
+ );
+ return Err(gst::StateChangeError);
+ }
+ }
+ gst::StateChange::ReadyToPaused => {
+ ret = Ok(gst::StateChangeSuccess::NoPreroll);
+ }
+ gst::StateChange::PlayingToPaused => {
+ ret = Ok(gst::StateChangeSuccess::NoPreroll);
+ }
+ gst::StateChange::PausedToPlaying => {
+ self.maybe_start_signaller();
+ }
+ _ => (),
+ }
+
+ ret
+ }
+}
+
+impl GstObjectImpl for WebRTCSrc {}
+
+impl BinImpl for WebRTCSrc {}
+
+impl ChildProxyImpl for WebRTCSrc {
+ fn child_by_index(&self, index: u32) -> Option<glib::Object> {
+ if index == 0 {
+ Some(self.signaller().upcast())
+ } else {
+ None
+ }
+ }
+
+ fn children_count(&self) -> u32 {
+ 1
+ }
+
+ fn child_by_name(&self, name: &str) -> Option<glib::Object> {
+ match name {
+ "signaller" => {
+ gst::info!(CAT, imp: self, "Getting signaller");
+ Some(self.signaller().upcast())
+ }
+ _ => None,
+ }
+ }
+}
+
+impl URIHandlerImpl for WebRTCSrc {
+ const URI_TYPE: gst::URIType = gst::URIType::Src;
+
+ fn protocols() -> &'static [&'static str] {
+ &["gstwebrtc", "gstwebrtcs"]
+ }
+
+ fn uri(&self) -> Option<String> {
+ self.signaller().property::<Option<String>>("uri")
+ }
+
+ fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
+ let uri = Url::from_str(uri)
+ .map_err(|err| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", err)))?;
+
+ let socket_scheme = match uri.scheme() {
+ "gstwebrtc" => Ok("ws"),
+ "gstwebrtcs" => Ok("wss"),
+ _ => Err(glib::Error::new(
+ gst::URIError::BadUri,
+ &format!("Invalid protocol: {}", uri.scheme()),
+ )),
+ }?;
+
+ let mut url_str = uri.to_string();
+
+ // Not using `set_scheme()` because it doesn't work with `http`
+ // See https://github.com/servo/rust-url/pull/768 for a PR implementing that
+ url_str.replace_range(0..uri.scheme().len(), socket_scheme);
+
+ self.signaller().set_property("uri", &url_str);
+
+ Ok(())
+ }
+}
+
+#[derive(PartialEq)]
+enum SignallerState {
+ Started,
+ Stopped,
+}
+
+struct State {
+ session_id: Option<String>,
+ signaller_state: SignallerState,
+ webrtcbin: Option<gst::Element>,
+ flow_combiner: gst_base::UniqueFlowCombiner,
+ signaller_signals: Option<SignallerSignals>,
+}
+
+impl Default for State {
+ fn default() -> Self {
+ Self {
+ signaller_state: SignallerState::Stopped,
+ session_id: None,
+ webrtcbin: None,
+ flow_combiner: Default::default(),
+ signaller_signals: Default::default(),
+ }
+ }
+}
diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs
new file mode 100644
index 00000000..c869ebcd
--- /dev/null
+++ b/net/webrtc/src/webrtcsrc/mod.rs
@@ -0,0 +1,65 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use crate::webrtcsrc::signaller::WebRTCSignallerRole;
+use gst::prelude::*;
+use gst::{glib, prelude::StaticType};
+
+/**
+ * element-webrtcsrc:
+ *
+ * `webrtcsrc` is the source counterpart of the #webrtcsink element and can be
+ * used to receive streams from it, it can also be used to easily playback WebRTC
+ * streams coming from a web browser.
+ *
+ * To try the element, you should run #webrtcsink as described in its documentation,
+ * finding its `peer-id` (in the signalling server logs for example) and then
+ * run:
+ *
+ * ``` bash
+ * gst-launch-1.0 webrtcsrc signaller::producer-peer-id=<webrtcsink-peer-id> ! videoconvert ! autovideosink
+ * ```
+ *
+ * or directly using `playbin`:
+ *
+ * ``` bash
+ * gst-launch-1.0 playbin3 uri="gstwebrtc://localhost:8443?peer-id=<webrtcsink-peer-id>"
+ * ```
+ *
+ * ## Decoding
+ *
+ * To be able to precisely negotiate the WebRTC SDP, `webrtcsrc` is able to decode streams.
+ * During SDP negotiation we expose our pads based on the peer offer and right after query caps
+ * to see what downstream supports.
+ * In practice in `uridecodebinX` or `playbinX`, decoding will happen
+ * in `decodebinX` but for the case where a `videoconvert` is placed after a `video_XX` pad,
+ * decoding will happen inside `webrtcsrc`.
+ *
+ * Since: 0.10
+ */
+mod imp;
+mod pad;
+pub mod signaller;
+
+pub use signaller::{SignallableImpl, SignallableImplExt};
+
+use self::signaller::Signallable;
+
+glib::wrapper! {
+ pub struct WebRTCSrc(ObjectSubclass<imp::WebRTCSrc>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
+}
+
+glib::wrapper! {
+ pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
+}
+
+pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
+ WebRTCSignallerRole::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+ WebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+ Signallable::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+ gst::Element::register(
+ plugin,
+ "webrtcsrc",
+ gst::Rank::Primary,
+ WebRTCSrc::static_type(),
+ )
+}
diff --git a/net/webrtc/src/webrtcsrc/pad.rs b/net/webrtc/src/webrtcsrc/pad.rs
new file mode 100644
index 00000000..a8b0769f
--- /dev/null
+++ b/net/webrtc/src/webrtcsrc/pad.rs
@@ -0,0 +1,45 @@
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::subclass::prelude::*;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Mutex;
+
+#[derive(Default)]
+pub struct WebRTCSrcPad {
+ needs_raw: AtomicBool,
+ stream_id: Mutex<Option<String>>,
+}
+
+impl WebRTCSrcPad {
+ pub fn set_needs_decoding(&self, raw_wanted: bool) {
+ self.needs_raw.store(raw_wanted, Ordering::SeqCst);
+ }
+
+ pub fn needs_decoding(&self) -> bool {
+ self.needs_raw.load(Ordering::SeqCst)
+ }
+
+ pub fn set_stream_id(&self, stream_id: &str) {
+ *self.stream_id.lock().unwrap() = Some(stream_id.to_string());
+ }
+
+ pub fn stream_id(&self) -> String {
+ let stream_id = self.stream_id.lock().unwrap();
+ stream_id.as_ref().unwrap().clone()
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for WebRTCSrcPad {
+ const NAME: &'static str = "GstWebRTCSrcPad";
+ type Type = super::WebRTCSrcPad;
+ type ParentType = gst::GhostPad;
+}
+
+impl ObjectImpl for WebRTCSrcPad {}
+impl GstObjectImpl for WebRTCSrcPad {}
+impl PadImpl for WebRTCSrcPad {}
+impl ProxyPadImpl for WebRTCSrcPad {}
+impl GhostPadImpl for WebRTCSrcPad {}
diff --git a/net/webrtc/src/webrtcsrc/signaller/iface.rs b/net/webrtc/src/webrtcsrc/signaller/iface.rs
new file mode 100644
index 00000000..5da1bed2
--- /dev/null
+++ b/net/webrtc/src/webrtcsrc/signaller/iface.rs
@@ -0,0 +1,426 @@
+use gst::glib;
+use gst::glib::subclass::*;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use once_cell::sync::Lazy;
+
+#[derive(Copy, Clone)]
+pub struct Signallable {
+ _parent: glib::gobject_ffi::GTypeInterface,
+ pub start: fn(&super::Signallable),
+ pub stop: fn(&super::Signallable),
+ pub send_sdp: fn(&super::Signallable, &str, &gst_webrtc::WebRTCSessionDescription),
+ pub add_ice: fn(&super::Signallable, &str, &str, Option<u32>, Option<String>),
+ pub end_session: fn(&super::Signallable, &str),
+}
+
+impl Signallable {
+ fn request_meta(_iface: &super::Signallable) -> Option<gst::Structure> {
+ None
+ }
+ fn start(_iface: &super::Signallable) {}
+ fn stop(_iface: &super::Signallable) {}
+ fn send_sdp(
+ _iface: &super::Signallable,
+ _session_id: &str,
+ _sdp: &gst_webrtc::WebRTCSessionDescription,
+ ) {
+ }
+ fn add_ice(
+ _iface: &super::Signallable,
+ _session_id: &str,
+ _candidate: &str,
+ _sdp_m_line_index: Option<u32>,
+ _sdp_mid: Option<String>,
+ ) {
+ }
+ fn end_session(_iface: &super::Signallable, _session_id: &str) {}
+}
+
+#[glib::object_interface]
+unsafe impl prelude::ObjectInterface for Signallable {
+ const NAME: &'static ::std::primitive::str = "GstRSWebRTCSignallableIface";
+ type Prerequisites = (glib::Object,);
+
+ fn interface_init(&mut self) {
+ self.start = Signallable::start;
+ self.stop = Signallable::stop;
+ self.send_sdp = Signallable::send_sdp;
+ self.add_ice = Signallable::add_ice;
+ self.end_session = Signallable::end_session;
+ }
+
+ fn signals() -> &'static [Signal] {
+ static SIGNALS: Lazy<Vec<Signal>> = Lazy::new(|| {
+ vec![
+ /**
+ * GstRSWebRTCSignallableIface::session-ended:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @session-id: The ID of the session that ended
+ *
+ * Some WebRTC Session was closed.
+ */
+ Signal::builder("session-ended")
+ .param_types([str::static_type()])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::producer-added:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @producer_id: The ID of the producer that was added
+ * @meta: The metadata structure of the producer
+ *
+ * Some new producing peer is ready to produce a WebRTC stream.
+ */
+ Signal::builder("producer-added")
+ .param_types([str::static_type(), <Option<gst::Structure>>::static_type()])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::producer-removed:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @producer_id: The ID of the producer that was removed
+ * @meta: The metadata structure of the producer
+ *
+ * A producing peer stopped producing its streams.
+ */
+ Signal::builder("producer-removed")
+ .param_types([str::static_type(), <Option<gst::Structure>>::static_type()])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::session-started:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ *
+ * A new session started.
+ */
+ Signal::builder("session-started")
+ .param_types([str::static_type(), str::static_type()])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::session-requested:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @session_id: The ID of the session that is being requested
+ * @peer_id: The ID of the consumer peer who wants to initiate a
+ * session
+ */
+ Signal::builder("session-requested")
+ .param_types([str::static_type(), str::static_type()])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::error:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @error: The error message as a string
+ */
+ Signal::builder("error")
+ .param_types([str::static_type()])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::request-meta:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ *
+ * The signaller requests metadata about the peer using it
+ *
+ * Return: The metadata about the peer represented by the signaller
+ */
+ Signal::builder("request-meta")
+ .return_type::<Option<gst::Structure>>()
+ .class_handler(|_token, args| {
+ let arg0 = args[0usize]
+ .get::<&super::Signallable>()
+ .unwrap_or_else(|e| {
+ panic!("Wrong type for argument {}: {:?}", 0usize, e)
+ });
+ Some(Signallable::request_meta(arg0).to_value())
+ })
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::handle-ice:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @session_id: Id of the session the ice information is about
+ * @sdp_m_line_index: The mlineindex of the ice candidate
+ * @sdp_mid: Media ID of the ice candidate
+ * @candidate: Information about the candidate
+ */
+ Signal::builder("handle-ice")
+ .param_types([
+ str::static_type(),
+ u32::static_type(),
+ <Option<String>>::static_type(),
+ str::static_type(),
+ ])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::session-description:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ * @session_id: Id of the session being described
+ * @description: The WebRTC session description
+ */
+ Signal::builder("session-description")
+ .param_types([
+ str::static_type(),
+ gst_webrtc::WebRTCSessionDescription::static_type(),
+ ])
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::start:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ *
+ * Starts the signaller, connecting it to the signalling server.
+ */
+ Signal::builder("start")
+ .flags(glib::SignalFlags::ACTION)
+ .class_handler(|_token, args| {
+ let arg0 = args[0usize]
+ .get::<&super::Signallable>()
+ .unwrap_or_else(|e| {
+ panic!("Wrong type for argument {}: {:?}", 0usize, e)
+ });
+ Signallable::start(arg0);
+
+ None
+ })
+ .build(),
+ /**
+ * GstRSWebRTCSignallableIface::stop:
+ * @self: The object implementing #GstRSWebRTCSignallableIface
+ *
+ * Stops the signaller, disconnecting it from the signalling server.
+ */
+ Signal::builder("stop")
+ .flags(glib::SignalFlags::ACTION)
+ .class_handler(|_tokens, args| {
+ let arg0 = args[0usize]
+ .get::<&super::Signallable>()
+ .unwrap_or_else(|e| {
+ panic!("Wrong type for argument {}: {:?}", 0usize, e)
+ });
+ Signallable::stop(arg0);
+
+ None
+ })
+ .build(),
+ ]
+ });
+ SIGNALS.as_ref()
+ }
+}
+
+unsafe impl<Obj: SignallableImpl> types::IsImplementable<Obj> for super::Signallable
+where
+ <Obj as types::ObjectSubclass>::Type: glib::IsA<glib::Object>,
+{
+ fn interface_init(iface: &mut glib::Interface<Self>) {
+ let iface = ::std::convert::AsMut::as_mut(iface);
+
+ fn vstart_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
+ obj: &super::Signallable,
+ ) {
+ let this = obj
+ .dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
+ .unwrap()
+ .imp();
+ SignallableImpl::start(this)
+ }
+ iface.start = vstart_trampoline::<Obj>;
+
+ fn vstop_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
+ this: &super::Signallable,
+ ) {
+ let this = this
+ .dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
+ .unwrap();
+ SignallableImpl::stop(this.imp())
+ }
+ iface.stop = vstop_trampoline::<Obj>;
+
+ fn send_sdp_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
+ this: &super::Signallable,
+ session_id: &str,
+ sdp: &gst_webrtc::WebRTCSessionDescription,
+ ) {
+ let this = this
+ .dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
+ .unwrap();
+ SignallableImpl::send_sdp(this.imp(), session_id, sdp)
+ }
+ iface.send_sdp = send_sdp_trampoline::<Obj>;
+
+ fn add_ice_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
+ this: &super::Signallable,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ sdp_mid: Option<String>,
+ ) {
+ let this = this
+ .dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
+ .unwrap();
+ SignallableImpl::add_ice(this.imp(), session_id, candidate, sdp_m_line_index, sdp_mid)
+ }
+ iface.add_ice = add_ice_trampoline::<Obj>;
+
+ fn end_session_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
+ this: &super::Signallable,
+ session_id: &str,
+ ) {
+ let this = this
+ .dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
+ .unwrap();
+ SignallableImpl::end_session(this.imp(), session_id)
+ }
+ iface.end_session = end_session_trampoline::<Obj>;
+ }
+}
+
+pub trait SignallableImpl: object::ObjectImpl + 'static {
+ fn start(&self) {
+ SignallableImplExt::parent_vstart(self)
+ }
+ fn stop(&self) {
+ SignallableImplExt::parent_vstop(self)
+ }
+ fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
+ SignallableImplExt::parent_send_sdp(self, session_id, sdp)
+ }
+ fn add_ice(
+ &self,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ sdp_mid: Option<String>,
+ ) {
+ SignallableImplExt::parent_add_ice(self, session_id, candidate, sdp_m_line_index, sdp_mid)
+ }
+ fn end_session(&self, session_id: &str) {
+ SignallableImplExt::parent_end_session(self, session_id)
+ }
+}
+
+pub trait SignallableImplExt: types::ObjectSubclass {
+ fn parent_vstart(&self);
+ fn parent_vstop(&self);
+ fn parent_send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription);
+ fn parent_add_ice(
+ &self,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ sdp_mid: Option<String>,
+ );
+ fn parent_end_session(&self, session_id: &str);
+}
+
+type ClassType = *mut <super::Signallable as glib::object::ObjectType>::GlibClassType;
+impl<Obj: SignallableImpl> SignallableImplExt for Obj {
+ fn parent_vstart(&self) {
+ let obj = self.obj();
+ let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
+ let vtable = unsafe {
+ &*(Self::type_data()
+ .as_ref()
+ .parent_interface::<super::Signallable>() as ClassType)
+ };
+ (vtable.start)(obj)
+ }
+ fn parent_vstop(&self) {
+ let obj = self.obj();
+ let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
+ let vtable = unsafe {
+ &*(Self::type_data()
+ .as_ref()
+ .parent_interface::<super::Signallable>() as ClassType)
+ };
+ (vtable.stop)(obj)
+ }
+ fn parent_send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
+ let obj = self.obj();
+ let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
+ let vtable = unsafe {
+ &*(Self::type_data()
+ .as_ref()
+ .parent_interface::<super::Signallable>() as ClassType)
+ };
+ (vtable.send_sdp)(obj, session_id, sdp)
+ }
+ fn parent_add_ice(
+ &self,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ sdp_mid: Option<String>,
+ ) {
+ let obj = self.obj();
+ let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
+ let vtable = unsafe {
+ &*(Self::type_data()
+ .as_ref()
+ .parent_interface::<super::Signallable>() as ClassType)
+ };
+ (vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid)
+ }
+ fn parent_end_session(&self, session_id: &str) {
+ let obj = self.obj();
+ let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
+ let vtable = unsafe {
+ &*(Self::type_data()
+ .as_ref()
+ .parent_interface::<super::Signallable>() as ClassType)
+ };
+ (vtable.end_session)(obj, session_id)
+ }
+}
+
+pub trait SignallableExt: 'static {
+ fn start(&self);
+ fn stop(&self);
+ fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription);
+ fn add_ice(
+ &self,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ sdp_mid: Option<String>,
+ );
+ fn end_session(&self, session_id: &str);
+}
+
+impl<Obj: glib::IsA<super::Signallable>> SignallableExt for Obj {
+ fn start(&self) {
+ let obj = self.upcast_ref::<super::Signallable>();
+ let vtable = obj.interface::<super::Signallable>().unwrap();
+ let vtable = vtable.as_ref();
+ (vtable.start)(obj)
+ }
+
+ fn stop(&self) {
+ let obj = self.upcast_ref::<super::Signallable>();
+ let vtable = obj.interface::<super::Signallable>().unwrap();
+ let vtable = vtable.as_ref();
+ (vtable.stop)(obj)
+ }
+
+ fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
+ let obj = self.upcast_ref::<super::Signallable>();
+ let vtable = obj.interface::<super::Signallable>().unwrap();
+ let vtable = vtable.as_ref();
+ (vtable.send_sdp)(obj, session_id, sdp)
+ }
+
+ fn add_ice(
+ &self,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ sdp_mid: Option<String>,
+ ) {
+ let obj = self.upcast_ref::<super::Signallable>();
+ let vtable = obj.interface::<super::Signallable>().unwrap();
+ let vtable = vtable.as_ref();
+ (vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid)
+ }
+
+ fn end_session(&self, session_id: &str) {
+ let obj = self.upcast_ref::<super::Signallable>();
+ let vtable = obj.interface::<super::Signallable>().unwrap();
+ let vtable = vtable.as_ref();
+ (vtable.end_session)(obj, session_id)
+ }
+}
diff --git a/net/webrtc/src/webrtcsrc/signaller/imp.rs b/net/webrtc/src/webrtcsrc/signaller/imp.rs
new file mode 100644
index 00000000..bab44357
--- /dev/null
+++ b/net/webrtc/src/webrtcsrc/signaller/imp.rs
@@ -0,0 +1,586 @@
+use crate::utils::{gvalue_to_json, serialize_json_object};
+use crate::webrtcsrc::signaller::{prelude::*, Signallable};
+use anyhow::{anyhow, Error};
+use async_std::{future::timeout, task};
+use async_tungstenite::tungstenite::Message as WsMessage;
+use futures::channel::mpsc;
+use futures::prelude::*;
+use gst::glib;
+use gst::glib::prelude::*;
+use gst::subclass::prelude::*;
+use gst_plugin_webrtc_protocol as p;
+use once_cell::sync::Lazy;
+use std::collections::HashSet;
+use std::ops::ControlFlow;
+use std::str::FromStr;
+use std::sync::Mutex;
+use std::time::Duration;
+use url::Url;
+
+use super::CAT;
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)]
+#[repr(u32)]
+#[enum_type(name = "GstRSWebRTCSignallerRole")]
+pub enum WebRTCSignallerRole {
+ #[default]
+ Consumer,
+ Producer,
+ Listener,
+}
+
+pub struct Settings {
+ uri: Url,
+ producer_peer_id: Option<String>,
+ cafile: Option<String>,
+ role: WebRTCSignallerRole,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Self {
+ uri: Url::from_str("ws://127.0.0.1:8443").unwrap(),
+ producer_peer_id: None,
+ cafile: Default::default(),
+ role: Default::default(),
+ }
+ }
+}
+
+#[derive(Default)]
+pub struct Signaller {
+ state: Mutex<State>,
+ settings: Mutex<Settings>,
+}
+
+#[derive(Default)]
+struct State {
+ /// Sender for the websocket messages
+ websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
+ send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
+ receive_task_handle: Option<task::JoinHandle<()>>,
+ producers: HashSet<String>,
+}
+
+impl Signaller {
+ fn uri(&self) -> Url {
+ self.settings.lock().unwrap().uri.clone()
+ }
+
+ fn set_uri(&self, uri: &str) -> Result<(), Error> {
+ let mut settings = self.settings.lock().unwrap();
+ let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?;
+
+ if let Some(peer_id) = uri
+ .query_pairs()
+ .find(|(k, _)| k == "peer-id")
+ .map(|v| v.1.to_string())
+ {
+ if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
+ gst::warning!(
+ CAT,
+ "Setting peer-id doesn't make sense for {:?}",
+ settings.role
+ );
+ } else {
+ settings.producer_peer_id = Some(peer_id);
+ }
+ }
+
+ if let Some(peer_id) = &settings.producer_peer_id {
+ uri.query_pairs_mut()
+ .clear()
+ .append_pair("peer-id", peer_id);
+ }
+
+ settings.uri = uri;
+
+ Ok(())
+ }
+
+ async fn connect(&self) -> Result<(), Error> {
+ let obj = self.obj();
+
+ let role = self.settings.lock().unwrap().role;
+ if let super::WebRTCSignallerRole::Consumer = role {
+ self.producer_peer_id()
+ .ok_or_else(|| anyhow!("No target producer peer id set"))?;
+ }
+
+ let connector = if let Some(path) = obj.property::<Option<String>>("cafile") {
+ let cert_content = async_std::fs::read_to_string(&path).await?;
+ let cert = async_native_tls::Certificate::from_pem(cert_content.as_bytes())?;
+ let connector = async_native_tls::TlsConnector::new();
+ Some(connector.add_root_certificate(cert))
+ } else {
+ None
+ };
+
+ let mut uri = self.uri();
+ uri.set_query(None);
+ let (ws, _) = timeout(
+ // FIXME: Make the timeout configurable
+ Duration::from_secs(20),
+ async_tungstenite::async_std::connect_async_with_tls_connector(
+ uri.to_string(),
+ connector,
+ ),
+ )
+ .await??;
+
+ gst::info!(CAT, imp: self, "connected");
+
+ // Channel for asynchronously sending out websocket message
+ let (mut ws_sink, mut ws_stream) = ws.split();
+
+ // 1000 is completely arbitrary, we simply don't want infinite piling
+ // up of messages as with unbounded
+ let (websocket_sender, mut websocket_receiver) = mpsc::channel::<p::IncomingMessage>(1000);
+ let send_task_handle =
+ task::spawn(glib::clone!(@weak-allow-none self as this => async move {
+ while let Some(msg) = websocket_receiver.next().await {
+ gst::log!(CAT, "Sending websocket message {:?}", msg);
+ ws_sink
+ .send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
+ .await?;
+ }
+
+ let msg = "Done sending";
+ this.map_or_else(|| gst::info!(CAT, "{msg}"),
+ |this| gst::info!(CAT, imp: this, "{msg}")
+ );
+
+ ws_sink.send(WsMessage::Close(None)).await?;
+ ws_sink.close().await?;
+
+ Ok::<(), Error>(())
+ }));
+
+ let obj = self.obj();
+ let meta =
+ if let Some(meta) = obj.emit_by_name::<Option<gst::Structure>>("request-meta", &[]) {
+ gvalue_to_json(&meta.to_value())
+ } else {
+ None
+ };
+
+ let receive_task_handle =
+ task::spawn(glib::clone!(@weak-allow-none self as this => async move {
+ while let Some(msg) = async_std::stream::StreamExt::next(&mut ws_stream).await {
+ if let Some(ref this) = this {
+ if let ControlFlow::Break(_) = this.handle_message(msg, &meta) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ let msg = "Stopped websocket receiving";
+ this.map_or_else(|| gst::info!(CAT, "{msg}"),
+ |this| gst::info!(CAT, imp: this, "{msg}")
+ );
+ }));
+
+ let mut state = self.state.lock().unwrap();
+ state.websocket_sender = Some(websocket_sender);
+ state.send_task_handle = Some(send_task_handle);
+ state.receive_task_handle = Some(receive_task_handle);
+
+ Ok(())
+ }
+
+ fn set_status(&self, meta: &Option<serde_json::Value>, peer_id: &str) {
+ let role = self.settings.lock().unwrap().role;
+ self.send(p::IncomingMessage::SetPeerStatus(match role {
+ super::WebRTCSignallerRole::Consumer => p::PeerStatus {
+ meta: meta.clone(),
+ peer_id: Some(peer_id.to_string()),
+ roles: vec![],
+ },
+ super::WebRTCSignallerRole::Producer => p::PeerStatus {
+ meta: meta.clone(),
+ peer_id: Some(peer_id.to_string()),
+ roles: vec![p::PeerRole::Producer],
+ },
+ super::WebRTCSignallerRole::Listener => p::PeerStatus {
+ meta: meta.clone(),
+ peer_id: Some(peer_id.to_string()),
+ roles: vec![p::PeerRole::Listener],
+ },
+ }));
+ }
+
+ fn producer_peer_id(&self) -> Option<String> {
+ let settings = self.settings.lock().unwrap();
+
+ settings.producer_peer_id.clone()
+ }
+
+ fn send(&self, msg: p::IncomingMessage) {
+ let state = self.state.lock().unwrap();
+ if let Some(mut sender) = state.websocket_sender.clone() {
+ task::spawn(glib::clone!(@weak self as this => async move {
+ if let Err(err) = sender.send(msg).await {
+ this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
+ }
+ }));
+ }
+ }
+
+ pub fn start_session(&self) {
+ let role = self.settings.lock().unwrap().role;
+ if matches!(role, super::WebRTCSignallerRole::Consumer) {
+ let target_producer = self.producer_peer_id().unwrap();
+
+ self.send(p::IncomingMessage::StartSession(p::StartSessionMessage {
+ peer_id: target_producer.clone(),
+ }));
+
+ gst::info!(
+ CAT,
+ imp: self,
+ "Started session with producer peer id {target_producer}",
+ );
+ }
+ }
+
+ fn handle_message(
+ &self,
+ msg: Result<WsMessage, async_tungstenite::tungstenite::Error>,
+ meta: &Option<serde_json::Value>,
+ ) -> ControlFlow<()> {
+ match msg {
+ Ok(WsMessage::Text(msg)) => {
+ gst::trace!(CAT, imp: self, "Received message {}", msg);
+
+ if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
+ match msg {
+ p::OutgoingMessage::Welcome { peer_id } => {
+ self.set_status(meta, &peer_id);
+ self.start_session();
+ }
+ p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
+ meta,
+ roles,
+ peer_id,
+ }) => {
+ let meta = meta.and_then(|m| match m {
+ serde_json::Value::Object(v) => Some(serialize_json_object(&v)),
+ _ => {
+ gst::error!(CAT, imp: self, "Invalid json value: {m:?}");
+ None
+ }
+ });
+
+ let peer_id =
+ peer_id.expect("Status changed should always contain a peer ID");
+ let mut state = self.state.lock().unwrap();
+ if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) {
+ if !state.producers.contains(&peer_id) {
+ state.producers.insert(peer_id.clone());
+ drop(state);
+
+ self.obj()
+ .emit_by_name::<()>("producer-added", &[&peer_id, &meta]);
+ }
+ } else if state.producers.remove(&peer_id) {
+ drop(state);
+
+ self.obj()
+ .emit_by_name::<()>("producer-removed", &[&peer_id, &meta]);
+ }
+ }
+ p::OutgoingMessage::SessionStarted {
+ peer_id,
+ session_id,
+ } => {
+ self.obj()
+ .emit_by_name::<()>("session-started", &[&session_id, &peer_id]);
+ }
+ p::OutgoingMessage::StartSession {
+ session_id,
+ peer_id,
+ } => {
+ assert!(matches!(
+ self.obj().property::<WebRTCSignallerRole>("role"),
+ super::WebRTCSignallerRole::Producer
+ ));
+
+ self.obj()
+ .emit_by_name::<()>("session-requested", &[&session_id, &peer_id]);
+ }
+ p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => {
+ gst::info!(CAT, imp: self, "Session {session_id} ended");
+
+ self.obj()
+ .emit_by_name::<()>("session-ended", &[&session_id]);
+ }
+ p::OutgoingMessage::Peer(p::PeerMessage {
+ session_id,
+ peer_message,
+ }) => match peer_message {
+ p::PeerMessageInner::Sdp(reply) => {
+ let (sdp, desc_type) = match reply {
+ p::SdpMessage::Answer { sdp } => {
+ (sdp, gst_webrtc::WebRTCSDPType::Answer)
+ }
+ p::SdpMessage::Offer { sdp } => {
+ (sdp, gst_webrtc::WebRTCSDPType::Offer)
+ }
+ };
+ let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
+ Ok(sdp) => sdp,
+ Err(err) => {
+ self.obj().emit_by_name::<()>(
+ "error",
+ &[&format!("Error parsing SDP: {sdp} {err:?}")],
+ );
+
+ return ControlFlow::Break(());
+ }
+ };
+
+ let desc =
+ gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp);
+ self.obj().emit_by_name::<()>(
+ "session-description",
+ &[&session_id, &desc],
+ );
+ }
+ p::PeerMessageInner::Ice {
+ candidate,
+ sdp_m_line_index,
+ } => {
+ let sdp_mid: Option<String> = None;
+ self.obj().emit_by_name::<()>(
+ "handle-ice",
+ &[&session_id, &sdp_m_line_index, &sdp_mid, &candidate],
+ );
+ }
+ },
+ p::OutgoingMessage::Error { details } => {
+ self.obj().emit_by_name::<()>(
+ "error",
+ &[&format!("Error message from server: {details}")],
+ );
+ }
+ _ => {
+ gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg);
+ }
+ }
+ } else {
+ gst::error!(CAT, imp: self, "Unknown message from server: {}", msg);
+
+ self.obj().emit_by_name::<()>(
+ "error",
+ &[&format!("Unknown message from server: {}", msg)],
+ );
+ }
+ }
+ Ok(WsMessage::Close(reason)) => {
+ gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason);
+ return ControlFlow::Break(());
+ }
+ Ok(_) => (),
+ Err(err) => {
+ self.obj()
+ .emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
+ return ControlFlow::Break(());
+ }
+ }
+ ControlFlow::Continue(())
+ }
+}
+
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
    // GType name under which this signaller is registered.
    const NAME: &'static str = "GstWebRTCSignaller";
    type Type = super::Signaller;
    type ParentType = glib::Object;
    // Implements the Signallable interface declared in this module.
    type Interfaces = (Signallable,);
}
+
impl ObjectImpl for Signaller {
    // GObject property declarations for the signaller.
    fn properties() -> &'static [glib::ParamSpec] {
        static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
            vec![
                // Signalling server URI; validated by set_uri().
                glib::ParamSpecString::builder("uri")
                    .flags(glib::ParamFlags::READWRITE)
                    .build(),
                // Peer ID of the producer to connect to (Consumer role only).
                glib::ParamSpecString::builder("producer-peer-id")
                    .flags(glib::ParamFlags::READWRITE)
                    .build(),
                // CA file path stored in settings — presumably used for the TLS
                // connection; confirm against the connect() implementation.
                glib::ParamSpecString::builder("cafile")
                    .flags(glib::ParamFlags::READWRITE)
                    .build(),
                // Role towards the signalling server; defaults to Consumer.
                glib::ParamSpecEnum::builder::<super::WebRTCSignallerRole>(
                    "role",
                    WebRTCSignallerRole::Consumer,
                )
                .flags(glib::ParamFlags::READWRITE)
                .build(),
            ]
        });

        PROPS.as_ref()
    }

    fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
        match pspec.name() {
            "uri" => {
                // GObject setters can't fail, so URI parse errors are logged
                // rather than propagated.
                if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) {
                    gst::error!(CAT, "Couldn't set URI: {e:?}");
                }
            }
            "producer-peer-id" => {
                let mut settings = self.settings.lock().unwrap();

                // Only meaningful for consumers; warn and ignore otherwise.
                if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
                    gst::warning!(
                        CAT,
                        "Setting `producer-peer-id` doesn't make sense for {:?}",
                        settings.role
                    );
                } else {
                    settings.producer_peer_id = value
                        .get::<Option<String>>()
                        .expect("type checked upstream");
                }
            }
            "cafile" => {
                self.settings.lock().unwrap().cafile = value
                    .get::<Option<String>>()
                    .expect("type checked upstream")
            }
            "role" => {
                self.settings.lock().unwrap().role = value
                    .get::<WebRTCSignallerRole>()
                    .expect("type checked upstream")
            }
            _ => unimplemented!(),
        }
    }

    fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
        let settings = self.settings.lock().unwrap();
        match pspec.name() {
            "uri" => settings.uri.to_string().to_value(),
            "producer-peer-id" => {
                // Reading in a non-consumer role still returns the stored
                // value, but is flagged as suspicious.
                if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
                    gst::warning!(
                        CAT,
                        "`producer-peer-id` doesn't make sense for {:?}",
                        settings.role
                    );
                }

                settings.producer_peer_id.to_value()
            }
            "cafile" => settings.cafile.to_value(),
            "role" => settings.role.to_value(),
            _ => unimplemented!(),
        }
    }
}
+
+impl SignallableImpl for Signaller {
+ fn start(&self) {
+ gst::info!(CAT, imp: self, "Starting");
+ task::spawn(glib::clone!(@weak self as this => async move {
+ if let Err(err) = this.connect().await {
+ this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
+ }
+ }));
+ }
+
+ fn stop(&self) {
+ gst::info!(CAT, imp: self, "Stopping now");
+
+ let mut state = self.state.lock().unwrap();
+ let send_task_handle = state.send_task_handle.take();
+ let receive_task_handle = state.receive_task_handle.take();
+ if let Some(mut sender) = state.websocket_sender.take() {
+ task::block_on(async move {
+ sender.close_channel();
+
+ if let Some(handle) = send_task_handle {
+ if let Err(err) = handle.await {
+ gst::warning!(CAT, imp: self, "Error while joining send task: {}", err);
+ }
+ }
+
+ if let Some(handle) = receive_task_handle {
+ handle.await;
+ }
+ });
+ }
+ }
+
+ fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
+ gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}");
+
+ let role = self.settings.lock().unwrap().role;
+ let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer);
+
+ let msg = p::IncomingMessage::Peer(p::PeerMessage {
+ session_id: session_id.to_owned(),
+ peer_message: p::PeerMessageInner::Sdp(if is_consumer {
+ p::SdpMessage::Answer {
+ sdp: sdp.sdp().as_text().unwrap(),
+ }
+ } else {
+ p::SdpMessage::Offer {
+ sdp: sdp.sdp().as_text().unwrap(),
+ }
+ }),
+ });
+
+ self.send(msg);
+ }
+
+ fn add_ice(
+ &self,
+ session_id: &str,
+ candidate: &str,
+ sdp_m_line_index: Option<u32>,
+ _sdp_mid: Option<String>,
+ ) {
+ gst::debug!(
+ CAT,
+ imp: self,
+ "Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}"
+ );
+
+ let msg = p::IncomingMessage::Peer(p::PeerMessage {
+ session_id: session_id.to_string(),
+ peer_message: p::PeerMessageInner::Ice {
+ candidate: candidate.to_string(),
+ sdp_m_line_index: sdp_m_line_index.unwrap(),
+ },
+ });
+
+ self.send(msg);
+ }
+
+ fn end_session(&self, session_id: &str) {
+ gst::debug!(CAT, imp: self, "Signalling session done {}", session_id);
+
+ let state = self.state.lock().unwrap();
+ let session_id = session_id.to_string();
+ if let Some(mut sender) = state.websocket_sender.clone() {
+ task::spawn(glib::clone!(@weak self as this => async move {
+ if let Err(err) = sender
+ .send(p::IncomingMessage::EndSession(p::EndSessionMessage {
+ session_id,
+ }))
+ .await
+ {
+ this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
+ }
+ }));
+ }
+ }
+}
+
+impl GstObjectImpl for Signaller {}
diff --git a/net/webrtc/src/webrtcsrc/signaller/mod.rs b/net/webrtc/src/webrtcsrc/signaller/mod.rs
new file mode 100644
index 00000000..c3f62214
--- /dev/null
+++ b/net/webrtc/src/webrtcsrc/signaller/mod.rs
@@ -0,0 +1,46 @@
+mod iface;
+mod imp;
+use gst::glib;
+
+use once_cell::sync::Lazy;
+// Expose traits and objects from the module itself so it exactly looks like
+// generated bindings
+pub use imp::WebRTCSignallerRole;
/// Convenience prelude re-exporting the signaller traits, mirroring the
/// layout of generated bindings.
pub mod prelude {
    pub use {super::SignallableExt, super::SignallableImpl};
}
+
/// Debug category ("webrtcsrc-signaller") shared by the signaller code.
pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "webrtcsrc-signaller",
        gst::DebugColorFlags::empty(),
        Some("WebRTC src signaller"),
    )
});
+
glib::wrapper! {
    /// Public wrapper for the `Signallable` GObject interface defined in `iface`.
    pub struct Signallable(ObjectInterface<iface::Signallable>);
}

glib::wrapper! {
    /// Public wrapper for the default signaller implementation in `imp`.
    pub struct Signaller(ObjectSubclass <imp::Signaller>) @implements Signallable;
}
+
impl Default for Signaller {
    fn default() -> Self {
        // Relies on the "role" property default (Consumer) declared in imp.
        glib::Object::new(&[])
    }
}
+
impl Signaller {
    /// Creates a signaller acting in the given role
    /// (consumer, producer or listener).
    pub fn new(mode: WebRTCSignallerRole) -> Self {
        glib::Object::new(&[("role", &mode)])
    }
}
+
+pub use iface::SignallableExt;
+pub use iface::SignallableImpl;
+pub use iface::SignallableImplExt;
+
// SAFETY: NOTE(review): these claim Signallable handles are usable from any
// thread. GObject wrappers are refcounted pointers, and the default Signaller
// keeps its mutable state behind Mutexes, but every other Signallable
// implementation must uphold the same guarantee — confirm before relying on it.
unsafe impl Send for Signallable {}
unsafe impl Sync for Signallable {}