Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorTaruntej Kanakamalla <taruntej@asymptotic.io>2023-07-19 08:05:33 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2023-11-17 21:08:44 +0300
commit43ee6bfc1c991b8fd8117d7f57df2c39f0f9377c (patch)
treec36ec158c082b8da54ed19bed60af891a506e5ca /net
parented3aa740bedf2cf308d91d89b017071af34260a2 (diff)
net/webrtc: add whipserversrc
Implement new signaller WhipServerSignaller - an http server using 'warp' - handlers for the POST, OPTIONS, PATCH and DELETE - fixed path `/whip/endpoint` as the URI - fixed value 'whip-client' as the producer peer id - fixed resource url `/whip/resource/whip-client` Derive whipserversrc element from BaseWebRTCSrc - implement constructed method for ObjectImpl to set non-default signaller, i.e., WhipServerSignaller - bind the properties stun-server and turn-servers to those on the Signaller Connect to 'webrtcbin-ready' signal in the constructor of WhipServerSignaller - it will be emitted by the webrtcsrc when the webrtcbin element is ready - the closure for this signal will in turn connect to webrtcbin's ice-gathering-state and perform send with the answer sdp via the channel - the WhipServer will hold its HTTP response in POST handler until this signal is received or timeout which happens early Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1284>
Diffstat (limited to 'net')
-rw-r--r--net/webrtc/Cargo.toml3
-rw-r--r--net/webrtc/README.md53
-rw-r--r--net/webrtc/src/utils.rs37
-rw-r--r--net/webrtc/src/webrtcsrc/imp.rs68
-rw-r--r--net/webrtc/src/webrtcsrc/mod.rs15
-rw-r--r--net/webrtc/src/whip_signaller/imp.rs626
-rw-r--r--net/webrtc/src/whip_signaller/mod.rs17
7 files changed, 814 insertions, 5 deletions
diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml
index 02f8a81f9..6a8b2e3f8 100644
--- a/net/webrtc/Cargo.toml
+++ b/net/webrtc/Cargo.toml
@@ -54,6 +54,9 @@ async-recursion = "1.0.0"
livekit-protocol = { version = "0.2" }
livekit-api = { version = "0.2", default-features = false, features = ["signal-client", "access-token", "native-tls"] }
+warp = "0.3"
+crossbeam-channel = "0.5"
+
[dev-dependencies]
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
diff --git a/net/webrtc/README.md b/net/webrtc/README.md
index 97c5e5975..7dbc9ebae 100644
--- a/net/webrtc/README.md
+++ b/net/webrtc/README.md
@@ -245,7 +245,11 @@ AWS_ACCESS_KEY_ID="XXX" AWS_SECRET_ACCESS_KEY="XXX" gst-launch-1.0 videotestsrc
## Using the WHIP Signaller
-Testing the whip signaller can be done by setting up janus and
+### WHIP Client
+
+WHIP Client Signaller uses BaseWebRTCSink
+
+Testing the whip client as the signaller can be done by setting up janus and
<https://github.com/meetecho/simple-whip-server/>.
* Set up a [janus] instance with the videoroom plugin configured
@@ -269,6 +273,53 @@ gst-launch-1.0 -e uridecodebin uri=file:///home/meh/path/to/video/file ! \
You should see a second video displayed in the videoroomtest web page.
+### WHIP Server
+
+WHIP Server Signaller uses BaseWebRTCSrc
+
+The WHIP Server as the signaller can be tested in two ways.
+
+Note: The initial version of `whipserversrc` does not check any auth or encryption.
+Host application using `whipserversrc` behind an HTTP(s) proxy to enforce the auth and encryption between the WHIP client and server
+
+#### 1. Using the Gstreamer element `whipwebrtcsink`
+
+a. In one tab of the terminal start the WHIP server using the below command
+
+``` shell
+RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 whipserversrc signaller::host-addr=http://127.0.0.1:8190 stun-server="stun://stun.l.google.com:19302" turn-servers="\<\"turns://user1:pass1@turn.serverone.com:7806\", \"turn://user2:pass2@turn.servertwo.com:7809\"\>" ! videoconvert ! autovideosink
+```
+
+b. In the second tab start the WHIP Client by sending a test video as shown in the below command
+
+``` shell
+RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 videotestsrc ! videoconvert ! video/x-raw ! queue ! \
+ whipwebrtcsink name=ws signaller::whip-endpoint="http://127.0.0.1:8190/whip/endpoint"
+```
+
+#### 2. Using Meetecho's `simple-whip-client`
+
+Set up the simple whip client using using the instructions present in https://github.com/meetecho/simple-whip-client#readme
+
+a. In one tab of the terminal start the WHIP server using the below command
+
+``` shell
+RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 whipserversrc signaller::host-addr=http://127.0.0.1:8190 stun-server="stun://stun.l.google.com:19302" turn-servers="\<\"turns://user1:pass1@turn.serverone.com:7806\", \"turn://user2:pass2@turn.servertwo.com:7809\"\>" name=ws ! videoconvert ! autovideosink ws. ! audioconvert ! autoaudiosink
+```
+
+b. In the second tab start the `simple-whip-client` as shown in the below command
+
+``` shell
+./whip-client --url http://127.0.0.1:8190/whip/endpoint \
+ -A "audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay pt=100 ssrc=1 ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=100" \
+ -V "videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay pt=96 ssrc=2 ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=96" \
+ -S stun://stun.l.google.com:19302 \
+ -l 7 \
+ -n true
+```
+
+Terminating the client will close the session and the client should receive 200 (OK) as the response to the DELETE request
+
## Using the LiveKit Signaller
Testing the LiveKit signaller can be done by setting up [LiveKit] and creating a room.
diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs
index 6b28ee96e..808bc09a9 100644
--- a/net/webrtc/src/utils.rs
+++ b/net/webrtc/src/utils.rs
@@ -357,6 +357,43 @@ pub fn set_ice_servers(
Ok(())
}
+pub fn build_link_header(url_str: &str) -> Result<String, url::ParseError> {
+ let url = url::Url::parse(url_str)?;
+
+ let mut link_str: String = "<".to_owned() + url.scheme();
+ if let Some(host) = url.host_str() {
+ link_str = link_str + ":" + host;
+ }
+
+ if let Some(port) = url.port() {
+ link_str = link_str + ":" + port.to_string().as_str();
+ }
+
+ link_str += url.path();
+
+ if let Some(query) = url.query() {
+ link_str = link_str + "?" + query;
+ }
+
+ link_str += ">";
+
+ if let Some(password) = url.password() {
+ link_str = link_str
+ + "; "
+ + "rel=\"ice-server\""
+ + "; "
+ + "username=\""
+ + url.username()
+ + "\"; "
+ + "credential:\""
+ + password
+ + "\"; "
+ + "credential-type:\"password\";";
+ }
+
+ Ok(link_str)
+}
+
/// Wrapper around `gst::ElementFactory::make` with a better error
/// message
pub fn make_element(element: &str, name: Option<&str>) -> Result<gst::Element, Error> {
diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs
index 42a05108c..5dbc53eb1 100644
--- a/net/webrtc/src/webrtcsrc/imp.rs
+++ b/net/webrtc/src/webrtcsrc/imp.rs
@@ -5,6 +5,7 @@ use gst::prelude::*;
use crate::signaller::{prelude::*, Signallable, Signaller};
use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS};
use crate::webrtcsrc::WebRTCSrcPad;
+use crate::whip_signaller::WhipServerSignaller;
use anyhow::{Context, Error};
use gst::glib;
use gst::glib::once_cell::sync::Lazy;
@@ -552,6 +553,9 @@ impl BaseWebRTCSrc {
}),
);
+ self.signaller()
+ .emit_by_name::<()>("webrtcbin-ready", &[&"none", &webrtcbin]);
+
bin.add(&webrtcbin).unwrap();
self.obj().add(&bin).context("Could not add `webrtcbin`")?;
@@ -995,6 +999,16 @@ impl BaseWebRTCSrc {
gst::info!(CAT, imp: self, "Stopped signaller");
}
}
+
+ pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> {
+ let sigobj = signaller.clone();
+ let mut settings = self.settings.lock().unwrap();
+
+ self.connect_signaller(&sigobj);
+ settings.signaller = signaller;
+
+ Ok(())
+ }
}
impl ElementImpl for BaseWebRTCSrc {
@@ -1225,3 +1239,57 @@ impl ObjectSubclass for WebRTCSrc {
type ParentType = super::BaseWebRTCSrc;
type Interfaces = (gst::URIHandler,);
}
+
+#[derive(Default)]
+pub struct WhipServerSrc {}
+
+impl ObjectImpl for WhipServerSrc {
+ fn constructed(&self) {
+ self.parent_constructed();
+ let element = self.obj();
+ let ws = element.upcast_ref::<super::BaseWebRTCSrc>().imp();
+
+ let _ = ws.set_signaller(WhipServerSignaller::default().upcast());
+
+ let obj = &*self.obj();
+
+ obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
+
+ let settings = ws.settings.lock().unwrap();
+ element
+ .bind_property("stun-server", &settings.signaller, "stun-server")
+ .build();
+ element
+ .bind_property("turn-servers", &settings.signaller, "turn-servers")
+ .build();
+ }
+}
+
+impl GstObjectImpl for WhipServerSrc {}
+
+impl BinImpl for WhipServerSrc {}
+
+impl ElementImpl for WhipServerSrc {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "WhipServerSrc",
+ "Source/Network/WebRTC",
+ "WebRTC source element using WHIP Server as the signaller",
+ "Taruntej Kanakamalla <taruntej@asymptotic.io>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+}
+
+impl BaseWebRTCSrcImpl for WhipServerSrc {}
+
+#[glib::object_subclass]
+impl ObjectSubclass for WhipServerSrc {
+ const NAME: &'static str = "GstWhipServerSrc";
+ type Type = super::WhipServerSrc;
+ type ParentType = super::BaseWebRTCSrc;
+}
diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs
index b85612bcc..45bef4bac 100644
--- a/net/webrtc/src/webrtcsrc/mod.rs
+++ b/net/webrtc/src/webrtcsrc/mod.rs
@@ -50,6 +50,10 @@ glib::wrapper! {
}
glib::wrapper! {
+ pub struct WhipServerSrc(ObjectSubclass<imp::WhipServerSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
+}
+
+glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
@@ -63,5 +67,14 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
"webrtcsrc",
gst::Rank::PRIMARY,
WebRTCSrc::static_type(),
- )
+ )?;
+
+ gst::Element::register(
+ plugin,
+ "whipserversrc",
+ gst::Rank::PRIMARY,
+ WhipServerSrc::static_type(),
+ )?;
+
+ Ok(())
}
diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs
index 741a50c6a..50ff91f62 100644
--- a/net/webrtc/src/whip_signaller/imp.rs
+++ b/net/webrtc/src/whip_signaller/imp.rs
@@ -2,20 +2,35 @@
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{
- build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
+ build_link_header, build_reqwest_client, parse_redirect_location, set_ice_servers, wait,
+ wait_async, WaitError,
};
use crate::RUNTIME;
use async_recursion::async_recursion;
-use gst::glib;
use gst::glib::once_cell::sync::Lazy;
+use gst::glib::{self, RustClosure};
use gst::prelude::*;
use gst::subclass::prelude::*;
+use gst_sdp::SDPMessage;
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
+use core::time::Duration;
+use crossbeam_channel::unbounded;
+use std::net::SocketAddr;
+use url::Url;
+use warp::{
+ http,
+ hyper::{
+ header::{CONTENT_TYPE, LINK},
+ Body,
+ },
+ Filter, Reply,
+};
+
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"webrtc-whip-signaller",
@@ -27,6 +42,15 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
+const ROOT: &str = "whip";
+const ENDPOINT_PATH: &str = "endpoint";
+const RESOURCE_PATH: &str = "resource";
+const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080";
+const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
+const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client");
+const CONTENT_SDP: &str = "application/sdp";
+const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
+
#[derive(Debug)]
enum WhipClientState {
Stopped,
@@ -588,3 +612,601 @@ impl ObjectImpl for WhipClient {
}
}
}
+
+// WHIP server implementation
+
+#[derive(Debug)]
+enum WhipServerState {
+ Idle,
+ Negotiating,
+ Ready,
+}
+
+impl Default for WhipServerState {
+ fn default() -> Self {
+ Self::Idle
+ }
+}
+
+struct WhipServerSettings {
+ stun_server: Option<String>,
+ turn_servers: gst::Array,
+ host_addr: Url,
+ producer_peer_id: Option<String>,
+ timeout: u32,
+ shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
+ server_handle: Option<tokio::task::JoinHandle<()>>,
+ sdp_answer: Option<crossbeam_channel::Sender<Option<SDPMessage>>>,
+}
+
+impl Default for WhipServerSettings {
+ fn default() -> Self {
+ Self {
+ host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
+ stun_server: DEFAULT_STUN_SERVER.map(String::from),
+ turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
+ producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from),
+ timeout: DEFAULT_TIMEOUT,
+ shutdown_signal: None,
+ server_handle: None,
+ sdp_answer: None,
+ }
+ }
+}
+
+pub struct WhipServer {
+ state: Mutex<WhipServerState>,
+ settings: Mutex<WhipServerSettings>,
+}
+
+impl Default for WhipServer {
+ fn default() -> Self {
+ Self {
+ settings: Mutex::new(WhipServerSettings::default()),
+ state: Mutex::new(WhipServerState::default()),
+ }
+ }
+}
+
+#[derive(Debug)]
+struct InternalError;
+
+impl warp::reject::Reject for InternalError {}
+
+impl WhipServer {
+ pub fn on_webrtcbin_ready(&self) -> RustClosure {
+ glib::closure!(|signaller: &super::WhipServerSignaller,
+ _producer_identifier: &str,
+ webrtcbin: &gst::Element| {
+ let obj_weak = signaller.downgrade();
+ webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
+ let obj = match obj_weak.upgrade() {
+ Some(obj) => obj,
+ None => return,
+ };
+
+ let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
+
+ match state {
+ WebRTCICEGatheringState::Gathering => {
+ gst::info!(CAT, obj: obj, "ICE gathering started");
+ }
+ WebRTCICEGatheringState::Complete => {
+ gst::info!(CAT, obj: obj, "ICE gathering complete");
+ let ans: Option<gst_sdp::SDPMessage>;
+ let settings = obj.imp().settings.lock().unwrap();
+ if let Some(answer_sdp) = webrtcbin
+ .property::<Option<WebRTCSessionDescription>>("local-description")
+ {
+ ans = Some(answer_sdp.sdp());
+ } else {
+ ans = None;
+ }
+ if let Some(tx) = &settings.sdp_answer {
+ tx.send(ans).unwrap()
+ }
+ }
+ _ => (),
+ }
+ });
+ })
+ }
+
+ async fn patch_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
+ // FIXME: implement ICE Trickle and ICE restart
+ // emit signal `handle-ice` to for ICE trickle
+ let reply = warp::reply::reply();
+ let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED);
+ Ok(res.into_response())
+
+ //FIXME: add state checking once ICE trickle is implemented
+ }
+
+ async fn delete_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
+ let mut state = self.state.lock().unwrap();
+ match *state {
+ WhipServerState::Ready => {
+ // FIXME: session-ended will make webrtcsrc send EOS
+ // and producer-removed is not handled
+ // Need to address the usecase where when the client terminates
+ // the webrtcsrc should be running without sending EOS and reset
+ // for next client connection like a usual server
+
+ self.obj().emit_by_name::<bool>("session-ended", &[&ROOT]);
+
+ gst::info!(CAT, imp:self, "Ending session");
+ *state = WhipServerState::Idle;
+ Ok(warp::reply::reply().into_response())
+ }
+ _ => {
+ gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed");
+ let res = http::Response::builder()
+ .status(http::StatusCode::CONFLICT)
+ .body(Body::from(String::from("Session not Ready")))
+ .unwrap();
+ Ok(res)
+ }
+ }
+ }
+
+ async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
+ let settings = self.settings.lock().unwrap();
+ let peer_id = settings.producer_peer_id.clone().unwrap();
+ drop(settings);
+
+ let mut state = self.state.lock().unwrap();
+ match *state {
+ WhipServerState::Idle => {
+ self.obj()
+ .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
+ *state = WhipServerState::Negotiating
+ }
+ WhipServerState::Ready => {
+ gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed");
+ let res = http::Response::builder()
+ .status(http::StatusCode::CONFLICT)
+ .body(Body::from(String::from("Session active already")))
+ .unwrap();
+ return Ok(res);
+ }
+ _ => {}
+ };
+ drop(state);
+
+ let mut links = HeaderMap::new();
+ let settings = self.settings.lock().unwrap();
+ match &settings.stun_server {
+ Some(stun) => match build_link_header(stun.as_str()) {
+ Ok(stun_link) => {
+ links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
+ }
+ Err(e) => {
+ gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
+ }
+ },
+ None => {}
+ }
+
+ if !settings.turn_servers.is_empty() {
+ for turn_server in settings.turn_servers.iter() {
+ if let Ok(turn) = turn_server.get::<String>() {
+ gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
+ match build_link_header(turn.as_str()) {
+ Ok(turn_link) => {
+ links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
+ }
+ Err(e) => {
+ gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
+ }
+ }
+ } else {
+ gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
+ }
+ }
+ }
+
+ let mut res = http::Response::builder()
+ .header("Access-Post", "application/sdp")
+ .body(Body::empty())
+ .unwrap();
+
+ let headers = res.headers_mut();
+ headers.extend(links);
+
+ Ok(res)
+ }
+
+ async fn post_handler(
+ &self,
+ body: warp::hyper::body::Bytes,
+ ) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
+ let mut settings = self.settings.lock().unwrap();
+ let peer_id = settings.producer_peer_id.clone().unwrap();
+ let wait_timeout = settings.timeout;
+ let (tx, rx) = unbounded::<Option<SDPMessage>>();
+ settings.sdp_answer = Some(tx);
+ drop(settings);
+
+ let mut state = self.state.lock().unwrap();
+ match *state {
+ WhipServerState::Idle => {
+ self.obj()
+ .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
+ *state = WhipServerState::Negotiating
+ }
+ WhipServerState::Ready => {
+ gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed");
+ let res = http::Response::builder()
+ .status(http::StatusCode::CONFLICT)
+ .body(Body::from(String::from("Session active already")))
+ .unwrap();
+ return Ok(res);
+ }
+ _ => {}
+ };
+ drop(state);
+
+ match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
+ Ok(offer_sdp) => {
+ let offer = gst_webrtc::WebRTCSessionDescription::new(
+ gst_webrtc::WebRTCSDPType::Offer,
+ offer_sdp,
+ );
+
+ self.obj()
+ .emit_by_name::<()>("session-description", &[&"unique", &offer]);
+ }
+ Err(err) => {
+ gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
+ let reply = warp::reply::reply();
+ let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE);
+ return Ok(res.into_response());
+ }
+ }
+
+ // We don't want to wait infinitely for the ice gathering to complete.
+ let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) {
+ Ok(a) => a,
+ Err(e) => {
+ let reply = warp::reply::reply();
+ let res;
+ if e.is_timeout() {
+ res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT);
+ gst::error!(CAT, imp: self, "Timedout waiting for SDP answer");
+ } else {
+ res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR);
+ gst::error!(CAT, imp: self, "Channel got disconnected");
+ }
+ return Ok(res.into_response());
+ }
+ };
+
+ let settings = self.settings.lock().unwrap();
+ let mut links = HeaderMap::new();
+
+ match &settings.stun_server {
+ Some(stun) => match build_link_header(stun.as_str()) {
+ Ok(stun_link) => {
+ links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
+ }
+ Err(e) => {
+ gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
+ }
+ },
+ None => {}
+ }
+
+ if !settings.turn_servers.is_empty() {
+ for turn_server in settings.turn_servers.iter() {
+ if let Ok(turn) = turn_server.get::<String>() {
+ gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
+ match build_link_header(turn.as_str()) {
+ Ok(turn_link) => {
+ links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
+ }
+ Err(e) => {
+ gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
+ }
+ }
+ } else {
+ gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
+ }
+ }
+ }
+
+ // Note: including the ETag in the original "201 Created" response is only REQUIRED
+ // if the WHIP resource supports ICE restarts and OPTIONAL otherwise.
+
+ let ans_text: Result<String, String>;
+ if let Some(sdp) = answer {
+ match sdp.as_text() {
+ Ok(text) => {
+ ans_text = Ok(text);
+ gst::debug!(CAT, imp: self, "{ans_text:?}");
+ }
+ Err(e) => {
+ ans_text = Err(format!("Failed to get SDP answer: {e:?}"));
+ gst::error!(CAT, imp: self, "{e:?}");
+ }
+ }
+ } else {
+ let e = "SDP Answer is empty!".to_string();
+ gst::error!(CAT, imp: self, "{e:?}");
+ ans_text = Err(e);
+ }
+
+ // If ans_text is an error. Send error code and error string in the response
+ if let Err(e) = ans_text {
+ let res = http::Response::builder()
+ .status(http::StatusCode::INTERNAL_SERVER_ERROR)
+ .body(Body::from(e))
+ .unwrap();
+ return Ok(res);
+ }
+
+ drop(settings);
+
+ // Got SDP answer, send answer in the response
+ let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id;
+ let mut res = http::Response::builder()
+ .status(StatusCode::CREATED)
+ .header(CONTENT_TYPE, "application/sdp")
+ .header("location", resource_url)
+ .body(Body::from(ans_text.unwrap()))
+ .unwrap();
+
+ let headers = res.headers_mut();
+ headers.extend(links);
+
+ let mut state = self.state.lock().unwrap();
+ *state = WhipServerState::Ready;
+ drop(state);
+
+ Ok(res)
+ }
+
+ fn serve(&self) -> Option<tokio::task::JoinHandle<()>> {
+ let mut settings = self.settings.lock().unwrap();
+ let addr: SocketAddr;
+ match settings.host_addr.socket_addrs(|| None) {
+ Ok(v) => {
+ // pick the first vector item
+ addr = v[0];
+ gst::info!(CAT, imp:self, "using {addr:?} as address");
+ }
+ Err(e) => {
+ gst::error!(CAT, imp:self, "error getting addr from uri {e:?}");
+ self.obj()
+ .emit_by_name::<()>("error", &[&format!("Unable to start WHIP Server: {e:?}")]);
+ return None;
+ }
+ }
+
+ let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+ settings.shutdown_signal = Some(tx);
+ drop(settings);
+
+ let prefix = warp::path(ROOT);
+
+ let self_weak = self.downgrade();
+
+ // POST /endpoint
+ let post_filter = warp::post()
+ .and(warp::path(ENDPOINT_PATH))
+ .and(warp::path::end())
+ .and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP))
+ .and(warp::body::bytes())
+ .and_then(move |body| {
+ let s = self_weak.upgrade();
+ async {
+ let self_ = s.expect("Need to have the ObjectRef");
+ self_.post_handler(body).await
+ }
+ });
+
+ let self_weak = self.downgrade();
+
+ // OPTIONS /endpoint
+ let options_filter = warp::options()
+ .and(warp::path(ENDPOINT_PATH))
+ .and(warp::path::end())
+ .and_then(move || {
+ let s = self_weak.upgrade();
+ async {
+ let self_ = s.expect("Need to have the ObjectRef");
+ self_.options_handler().await
+ }
+ });
+
+ let self_weak = self.downgrade();
+
+ // PATCH /resource/:id
+ let patch_filter = warp::patch()
+ .and(warp::path(RESOURCE_PATH))
+ .and(warp::path::param::<String>())
+ .and(warp::path::end())
+ .and(warp::header::exact(
+ CONTENT_TYPE.as_str(),
+ CONTENT_TRICKLE_ICE,
+ ))
+ .and_then(move |id| {
+ let s = self_weak.upgrade();
+ async {
+ let self_ = s.expect("Need to have the ObjectRef");
+ self_.patch_handler(id).await
+ }
+ });
+
+ let self_weak = self.downgrade();
+
+ // DELETE /resource/:id
+ let delete_filter = warp::delete()
+ .and(warp::path(RESOURCE_PATH))
+ .and(warp::path::param::<String>())
+ .and(warp::path::end())
+ .and_then(move |id| {
+ let s = self_weak.upgrade();
+ async {
+ let self_ = s.expect("Need to have the ObjectRef");
+ self_.delete_handler(id).await
+ }
+ });
+
+ let api = prefix
+ .and(post_filter)
+ .or(prefix.and(options_filter))
+ .or(prefix.and(patch_filter))
+ .or(prefix.and(delete_filter));
+
+ let s = warp::serve(api);
+ let jh = RUNTIME.spawn(async move {
+ let (_, server) = s.bind_with_graceful_shutdown(addr, async move {
+ match rx.await {
+ Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
+ Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
+ }
+ });
+
+ server.await;
+ gst::debug!(CAT, "Stopped the server task...");
+ });
+
+ gst::debug!(CAT, imp: self, "Started the server...");
+ Some(jh)
+ }
+
+ fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> {
+ let mut settings = self.settings.lock().unwrap();
+ settings.host_addr = Url::parse(host_addr)?;
+ Ok(())
+ }
+}
+
+impl SignallableImpl for WhipServer {
+ fn start(&self) {
+ gst::info!(CAT, imp: self, "starting the WHIP server");
+ let jh = self.serve();
+ let mut settings = self.settings.lock().unwrap();
+ settings.server_handle = jh;
+ }
+
+ fn stop(&self) {
+ let mut settings = self.settings.lock().unwrap();
+
+ let handle = settings
+ .server_handle
+ .take()
+ .expect("Server handle should be set");
+
+ let tx = settings
+ .shutdown_signal
+ .take()
+ .expect("Shutdown signal Sender needs to be valid");
+
+ if tx.send(()).is_err() {
+ gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped");
+ }
+
+ gst::debug!(CAT, imp: self, "Await server handle to join");
+ RUNTIME.block_on(async {
+ if let Err(e) = handle.await {
+ gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}");
+ };
+ });
+
+ gst::info!(CAT, imp: self, "stopped the WHIP server");
+ }
+
+ fn end_session(&self, _session_id: &str) {
+ //FIXME: send any events to the client
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for WhipServer {
+ const NAME: &'static str = "GstWhipServerSignaller";
+ type Type = super::WhipServerSignaller;
+ type ParentType = glib::Object;
+ type Interfaces = (Signallable,);
+}
+
+impl ObjectImpl for WhipServer {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::builder("host-addr")
+ .nick("Host address")
+ .blurb("The the host address of the WHIP endpoint e.g., http://127.0.0.1:8080")
+ .default_value(DEFAULT_HOST_ADDR)
+ .flags(glib::ParamFlags::READWRITE)
+ .build(),
+ // needed by webrtcsrc in handle_webrtc_src_pad
+ glib::ParamSpecString::builder("producer-peer-id")
+ .default_value(DEFAULT_PRODUCER_PEER_ID)
+ .flags(glib::ParamFlags::READABLE)
+ .build(),
+ glib::ParamSpecString::builder("stun-server")
+ .nick("STUN Server")
+ .blurb("The STUN server of the form stun://hostname:port")
+ .default_value(DEFAULT_STUN_SERVER)
+ .build(),
+ gst::ParamSpecArray::builder("turn-servers")
+ .nick("List of TURN Servers to user")
+ .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
+ .element_spec(&glib::ParamSpecString::builder("turn-server")
+ .nick("TURN Server")
+ .blurb("The TURN server of the form turn(s)://username:password@host:port.")
+ .build()
+ )
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt::builder("timeout")
+ .nick("Timeout")
+ .blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).")
+ .maximum(3600)
+ .default_value(DEFAULT_TIMEOUT)
+ .build(),
+ ]
+ });
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ match pspec.name() {
+ "host-addr" => {
+ if let Err(e) =
+ self.set_host_addr(value.get::<&str>().expect("type checked upstream"))
+ {
+ gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}");
+ }
+ }
+ "stun-server" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.stun_server = value
+ .get::<Option<String>>()
+ .expect("type checked upstream")
+ }
+ "turn-servers" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream")
+ }
+ "timeout" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.timeout = value.get().unwrap();
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+ match pspec.name() {
+ "host-addr" => settings.host_addr.to_string().to_value(),
+ "stun-server" => settings.stun_server.to_value(),
+ "turn-servers" => settings.turn_servers.to_value(),
+ "producer-peer-id" => settings.producer_peer_id.to_value(),
+ "timeout" => settings.timeout.to_value(),
+ _ => unimplemented!(),
+ }
+ }
+}
diff --git a/net/webrtc/src/whip_signaller/mod.rs b/net/webrtc/src/whip_signaller/mod.rs
index 16b20d0a6..d699b015c 100644
--- a/net/webrtc/src/whip_signaller/mod.rs
+++ b/net/webrtc/src/whip_signaller/mod.rs
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
-use gst::glib;
+use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
mod imp;
@@ -9,6 +9,10 @@ glib::wrapper! {
pub struct WhipClientSignaller(ObjectSubclass<imp::WhipClient>) @implements Signallable;
}
+glib::wrapper! {
+ pub struct WhipServerSignaller(ObjectSubclass<imp::WhipServer>) @implements Signallable;
+}
+
unsafe impl Send for WhipClientSignaller {}
unsafe impl Sync for WhipClientSignaller {}
@@ -17,3 +21,14 @@ impl Default for WhipClientSignaller {
glib::Object::new()
}
}
+
+unsafe impl Send for WhipServerSignaller {}
+unsafe impl Sync for WhipServerSignaller {}
+
+impl Default for WhipServerSignaller {
+ fn default() -> Self {
+ let sig: WhipServerSignaller = glib::Object::new();
+ sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
+ sig
+ }
+}