Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanchayan Maity <sanchayan@asymptotic.io>2022-12-01 08:59:40 +0300
committerSanchayan Maity <sanchayan@asymptotic.io>2022-12-05 09:57:07 +0300
commit40680a47abd4857ae6769db462d1ebd7aeb77946 (patch)
tree5b57a893f7d54a4fecb63bab9a8f802fe4990228
parentd18761892edf0f816722b9ac622d6a64e1567ab3 (diff)
webrtchttp: Use tokio runtime for spawning thread used for candidate offer
While at it, we had a bug in whepsrc where for redirect we were incorrectly calling initial_post_request instead of do_post. Fix that. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/949>
-rw-r--r--net/webrtchttp/Cargo.toml1
-rw-r--r--net/webrtchttp/src/utils.rs53
-rw-r--r--net/webrtchttp/src/whepsrc/imp.rs455
-rw-r--r--net/webrtchttp/src/whipsink/imp.rs449
4 files changed, 509 insertions, 449 deletions
diff --git a/net/webrtchttp/Cargo.toml b/net/webrtchttp/Cargo.toml
index e6eba698..2631ebc2 100644
--- a/net/webrtchttp/Cargo.toml
+++ b/net/webrtchttp/Cargo.toml
@@ -20,6 +20,7 @@ parse_link_header = {version = "0.3", features = ["url"]}
tokio = { version = "1.20.1", default-features = false, features = ["time", "rt-multi-thread"] }
futures = "0.3.23"
bytes = "1"
+async-recursion = "1.0.0"
[lib]
name = "gstwebrtchttp"
diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs
index e980a963..b2db1551 100644
--- a/net/webrtchttp/src/utils.rs
+++ b/net/webrtchttp/src/utils.rs
@@ -23,6 +23,59 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap()
});
+pub async fn wait_async<F, T>(
+ canceller: &Mutex<Option<future::AbortHandle>>,
+ future: F,
+ timeout: u32,
+) -> Result<T, WaitError>
+where
+ F: Send + Future<Output = T>,
+ T: Send + 'static,
+{
+ let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
+ {
+ let mut canceller_guard = canceller.lock().unwrap();
+ canceller_guard.replace(abort_handle);
+ drop(canceller_guard);
+ }
+
+ let future = async {
+ if timeout == 0 {
+ Ok(future.await)
+ } else {
+ let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
+
+ match res {
+ Ok(r) => Ok(r),
+ Err(e) => Err(WaitError::FutureError(gst::error_msg!(
+ gst::ResourceError::Read,
+ ["Request timeout, elapsed: {}", e]
+ ))),
+ }
+ }
+ };
+
+ let future = async {
+ match future::Abortable::new(future, abort_registration).await {
+ Ok(Ok(r)) => Ok(r),
+
+ Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Future resolved with an error {:?}", err]
+ ))),
+
+ Err(future::Aborted) => Err(WaitError::FutureAborted),
+ }
+ };
+
+ let res = future.await;
+
+ let mut canceller_guard = canceller.lock().unwrap();
+ *canceller_guard = None;
+
+ res
+}
+
pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,
diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs
index ec911fa7..5b38ef1f 100644
--- a/net/webrtchttp/src/whepsrc/imp.rs
+++ b/net/webrtchttp/src/whepsrc/imp.rs
@@ -8,19 +8,20 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
- build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError,
+ build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
+ RUNTIME,
};
use crate::IceTransportPolicy;
+use async_recursion::async_recursion;
use bytes::Bytes;
use futures::future;
-use gst::{glib, prelude::*, subclass::prelude::*, ErrorMessage};
+use gst::{glib, prelude::*, subclass::prelude::*};
use gst_sdp::*;
use gst_webrtc::*;
use once_cell::sync::Lazy;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::StatusCode;
use std::sync::Mutex;
-use std::thread::{spawn, JoinHandle};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@@ -79,14 +80,8 @@ impl Default for Settings {
#[derive(Debug)]
enum State {
Stopped,
- Post {
- redirects: u8,
- thread_handle: Option<JoinHandle<()>>,
- },
- Running {
- whep_resource: String,
- thread_handle: Option<JoinHandle<()>>,
- },
+ Post { redirects: u8 },
+ Running { whep_resource: String },
}
impl Default for State {
@@ -407,6 +402,22 @@ impl BinImpl for WhepSrc {
}
impl WhepSrc {
+ fn raise_error(&self, resource_error: gst::ResourceError, msg: String) {
+ gst::error_msg!(resource_error, [msg.as_str()]);
+ gst::element_imp_error!(self, resource_error, [msg.as_str()]);
+ }
+
+ fn handle_future_error(&self, err: WaitError) {
+ match err {
+ WaitError::FutureAborted => {
+ gst::warning!(CAT, imp: self, "Future aborted")
+ }
+ WaitError::FutureError(err) => {
+ self.raise_error(gst::ResourceError::Failed, err.to_string())
+ }
+ };
+ }
+
fn setup_webrtcbin(&self) {
// The specification requires all m= lines to be bundled (section 4.5)
self.webrtcbin
@@ -429,17 +440,18 @@ impl WhepSrc {
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, imp: self_, "ICE gathering completed");
- let mut state = self_.state.lock().unwrap();
let self_ref = self_.ref_counted();
- gst::debug!(CAT, imp: self_, "Spawning thread to send offer");
- let handle = spawn(move || self_ref.whep_offer());
-
- *state = State::Post {
- redirects: 0,
- thread_handle: Some(handle),
- };
- drop(state);
+ // With tokio's spawn one does not have to .await the
+ // returned JoinHandle to make the provided future start
+ // execution. It will start running in the background
+ // immediately when spawn is called. So silence the clippy
+ // warning.
+ #[allow(clippy::let_underscore_future)]
+ let _ = RUNTIME.spawn(async move {
+ /* Note that we check for a valid WHEP endpoint in change_state */
+ self_ref.whep_offer().await
+ });
}
_ => (),
}
@@ -560,30 +572,27 @@ impl WhepSrc {
drop(settings);
let mut state = self_.state.lock().unwrap();
- *state = State::Post {
- redirects: 0,
- thread_handle: None,
- };
+ *state = State::Post { redirects: 0 };
drop(state);
- if let Err(e) = self_.initial_post_request(endpoint.unwrap()) {
- gst::element_imp_error!(
- self_,
- gst::ResourceError::Failed,
- ["Error in initial post request - {}", e.to_string()]
- );
- return None;
- }
+ self_.initial_post_request(endpoint.unwrap());
None
}
});
}
- fn sdp_message_parse(&self, sdp_bytes: Bytes) -> Result<(), ErrorMessage> {
- let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).map_err(|_| {
- gst::error_msg!(gst::ResourceError::Failed, ["Could not parse answer SDP"])
- })?;
+ fn sdp_message_parse(&self, sdp_bytes: Bytes) {
+ let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) {
+ Ok(sdp) => sdp,
+ Err(_) => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Could not parse answer SDP".to_string(),
+ );
+ return;
+ }
+ };
let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp);
@@ -612,55 +621,63 @@ impl WhepSrc {
.emit_by_name::<()>("add-ice-candidate", &[&m_line_index, &c]);
}
}
-
- Ok(())
}
- fn parse_endpoint_response(
+ async fn parse_endpoint_response(
&self,
- endpoint: reqwest::Url,
- redirects: u8,
+ sess_desc: WebRTCSessionDescription,
resp: reqwest::Response,
- ) -> Result<(), ErrorMessage> {
+ redirects: u8,
+ ) {
+ let endpoint;
+ let timeout;
+ let use_link_headers;
+
+ {
+ let settings = self.settings.lock().unwrap();
+ endpoint =
+ reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
+ use_link_headers = settings.use_link_headers;
+ timeout = settings.timeout;
+ drop(settings);
+ }
+
match resp.status() {
StatusCode::OK | StatusCode::NO_CONTENT => {
gst::info!(CAT, imp: self, "SDP offer successfully send");
- Ok(())
}
StatusCode::CREATED => {
gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers());
- let settings = self
- .settings
- .lock()
- .expect("Failed to acquire settings lock");
- let timeout = settings.timeout;
-
- if settings.use_link_headers {
- set_ice_servers(&self.webrtcbin, resp.headers())?;
+ if use_link_headers {
+ if let Err(e) = set_ice_servers(&self.webrtcbin, resp.headers()) {
+ self.raise_error(gst::ResourceError::Failed, e.to_string());
+ return;
+ };
}
- drop(settings);
-
/* See section 4.2 of the WHEP specification */
let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location,
None => {
- return Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Location header field should be present for WHEP resource URL"]
- ));
+ "Location header field should be present for WHEP resource URL"
+ .to_string(),
+ );
+ return;
}
};
let location = match location.to_str() {
Ok(loc) => loc,
Err(e) => {
- return Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Failed to convert location to string {}", e]
- ));
+ format!("Failed to convert location to string: {e}"),
+ );
+ return;
}
};
@@ -668,123 +685,109 @@ impl WhepSrc {
gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location);
- let url = url.join(location).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Failed,
- ["URL join operation failed: {:?}", err]
- )
- })?;
-
- let mut state = self.state.lock().unwrap();
- *state = match *state {
- State::Post {
- redirects: _r,
- thread_handle: ref mut h,
- } => State::Running {
- whep_resource: url.to_string(),
- thread_handle: h.take(),
- },
- _ => {
- return Err(gst::error_msg!(
+ let url = match url.join(location) {
+ Ok(joined_url) => joined_url,
+ Err(err) => {
+ self.raise_error(
gst::ResourceError::Failed,
- ["Expected to be in POST state"]
- ));
+ format!("URL join operation failed: {err:?}"),
+ );
+ return;
}
};
- drop(state);
-
- let future = async {
- resp.bytes().await.map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Failed,
- ["Failed to get response body: {:?}", err]
- )
- })
- };
- match wait(&self.canceller, future, timeout) {
- Ok(ans_bytes) => self.sdp_message_parse(ans_bytes),
- Err(err) => match err {
- WaitError::FutureAborted => Ok(()),
- WaitError::FutureError(e) => Err(e),
+ match wait_async(&self.canceller, resp.bytes(), timeout).await {
+ Ok(res) => match res {
+ Ok(ans_bytes) => {
+ let mut state = self.state.lock().unwrap();
+ *state = match *state {
+ State::Post { redirects: _r } => State::Running {
+ whep_resource: url.to_string(),
+ },
+ _ => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Expected to be in POST state".to_string(),
+ );
+ return;
+ }
+ };
+ drop(state);
+
+ self.sdp_message_parse(ans_bytes)
+ }
+ Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
+ Err(err) => self.handle_future_error(err),
}
}
status if status.is_redirection() => {
if redirects < MAX_REDIRECTS {
- let mut state = self.state.lock().unwrap();
- *state = match *state {
- State::Post {
- redirects: _r,
- thread_handle: ref mut h,
- } => State::Post {
- redirects: redirects + 1,
- thread_handle: h.take(),
- },
- /*
- * As per section 4.6 of the specification, redirection is
- * not required to be supported for the PATCH and DELETE
- * requests to the final WHEP resource URL. Only the initial
- * POST request may support redirection.
- */
- State::Running { .. } => {
- return Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Unexpected redirection in RUNNING state"]
- ));
- }
- State::Stopped => unreachable!(),
- };
-
- drop(state);
-
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
+ {
+ let mut state = self.state.lock().unwrap();
+ *state = match *state {
+ State::Post { redirects: _r } => State::Post {
+ redirects: redirects + 1,
+ },
+ /*
+ * As per section 4.6 of the specification, redirection is
+ * not required to be supported for the PATCH and DELETE
+ * requests to the final WHEP resource URL. Only the initial
+ * POST request may support redirection.
+ */
+ State::Running { .. } => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Unexpected redirection in RUNNING state".to_string(),
+ );
+ return;
+ }
+ State::Stopped => unreachable!(),
+ };
+ drop(state);
+ }
+
gst::warning!(
CAT,
imp: self,
"Redirecting endpoint to {}",
redirect_url.as_str()
);
- self.initial_post_request(redirect_url)
+
+ if let Err(err) =
+ wait_async(&self.canceller, self.do_post(sess_desc), timeout).await
+ {
+ self.handle_future_error(err);
+ }
}
- Err(e) => Err(e),
+ Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()),
}
} else {
- Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Too many redirects. Unable to connect."]
- ))
+ "Too many redirects. Unable to connect.".to_string(),
+ );
}
}
s => {
- let future = async {
- resp.bytes().await.map_err(|err| {
- gst::error_msg!(
+ match wait_async(&self.canceller, resp.bytes(), timeout).await {
+ Ok(r) => {
+ let res = r
+ .map(|x| x.escape_ascii().to_string())
+ .unwrap_or_else(|_| "(no further details)".to_string());
+
+ // FIXME: Check and handle 'Retry-After' header in case of server error
+ self.raise_error(
gst::ResourceError::Failed,
- ["Failed to get response body: {:?}", err]
- )
- })
- };
-
- let settings = self
- .settings
- .lock()
- .expect("Failed to acquire settings lock");
- let timeout = settings.timeout;
- drop(settings);
-
- let res = wait(&self.canceller, future, timeout)
- .map(|x| x.escape_ascii().to_string())
- .unwrap_or_else(|_| "(no further details)".to_string());
-
- // FIXME: Check and handle 'Retry-After' header in case of server error
- Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Unexpected response: {} - {}", s.as_str(), res]
- ))
+ format!("Unexpected response: {} - {}", s.as_str(), res),
+ );
+ }
+ Err(err) => self.handle_future_error(err),
+ }
}
}
}
@@ -833,11 +836,16 @@ impl WhepSrc {
&[&offer_sdp, &None::<gst::Promise>],
);
} else {
- gst::error!(CAT, imp: self_, "Reply without an offer: {}", reply);
+ let error = reply
+ .value("error")
+ .expect("structure must have an error value")
+ .get::<glib::Error>()
+ .expect("value must be a GLib error");
+
gst::element_imp_error!(
self_,
gst::LibraryError::Failed,
- ["generate offer::Promise returned with no reply"]
+ ["generate offer::Promise returned with no reply: {}", error]
);
}
});
@@ -878,7 +886,7 @@ impl WhepSrc {
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
}
- fn initial_post_request(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> {
+ fn initial_post_request(&self, endpoint: reqwest::Url) {
let state = self.state.lock().unwrap();
gst::info!(CAT, imp: self, "WHEP endpoint url: {}", endpoint.as_str());
@@ -886,20 +894,19 @@ impl WhepSrc {
match *state {
State::Post { .. } => (),
_ => {
- return Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Trying to do POST in unexpected state"]
- ));
+ "Trying to do POST in unexpected state".to_string(),
+ );
+ return;
}
};
drop(state);
- self.generate_offer();
-
- Ok(())
+ self.generate_offer()
}
- fn whep_offer(&self) {
+ async fn whep_offer(&self) {
let local_desc = self
.webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description");
@@ -923,40 +930,34 @@ impl WhepSrc {
offer_sdp.sdp().as_text()
);
- if let Err(e) = self.send_sdp(offer_sdp.sdp()) {
- gst::element_imp_error!(
- self,
- gst::ResourceError::Failed,
- ["Error in sending answer - {}", e.to_string()]
- );
- }
- }
-
- fn send_sdp(&self, sdp: SDPMessage) -> Result<(), gst::ErrorMessage> {
- let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, sdp);
- let settings = self.settings.lock().unwrap();
-
- let endpoint = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str());
+ let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, offer_sdp.sdp());
- if let Err(e) = endpoint {
- return Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Could not parse endpoint URL: {}", e]
- ));
+ let timeout;
+ {
+ let settings = self.settings.lock().unwrap();
+ timeout = settings.timeout;
+ drop(settings);
}
- drop(settings);
-
- self.do_post(sess_desc, endpoint.unwrap())
+ if let Err(e) = wait_async(&self.canceller, self.do_post(sess_desc), timeout).await {
+ self.handle_future_error(e);
+ }
}
- fn do_post(
- &self,
- offer: WebRTCSessionDescription,
- endpoint: reqwest::Url,
- ) -> Result<(), gst::ErrorMessage> {
- let settings = self.settings.lock().unwrap();
- let timeout = settings.timeout;
+ #[async_recursion]
+ async fn do_post(&self, offer: WebRTCSessionDescription) {
+ let auth_token;
+ let endpoint;
+ let timeout;
+
+ {
+ let settings = self.settings.lock().unwrap();
+ endpoint =
+ reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
+ auth_token = settings.auth_token.clone();
+ timeout = settings.timeout;
+ drop(settings);
+ }
let sdp = offer.sdp();
let body = sdp.as_text().unwrap();
@@ -969,8 +970,8 @@ impl WhepSrc {
HeaderValue::from_static("application/sdp"),
);
- if let Some(token) = &settings.auth_token {
- let bearer_token = "Bearer ".to_owned() + token;
+ if let Some(token) = auth_token.as_ref() {
+ let bearer_token = "Bearer ".to_owned() + token.as_str();
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
@@ -978,8 +979,6 @@ impl WhepSrc {
);
}
- drop(settings);
-
gst::debug!(
CAT,
imp: self,
@@ -987,51 +986,63 @@ impl WhepSrc {
endpoint.as_str()
);
- let future = async {
+ let res = wait_async(
+ &self.canceller,
self.client
.request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
- .send()
- .await
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Failed,
- ["HTTP POST request failed {}: {:?}", endpoint.as_str(), err]
- )
- })
- };
+ .send(),
+ timeout,
+ )
+ .await;
- match wait(&self.canceller, future, timeout) {
- Ok(resp) => self.parse_endpoint_response(endpoint, 0, resp),
- Err(err) => match err {
- WaitError::FutureAborted => Ok(()),
- WaitError::FutureError(e) => Err(e),
+ match res {
+ Ok(resp) => match resp {
+ Ok(r) => {
+ #[allow(unused_mut)]
+ let mut redirects;
+
+ {
+ let state = self.state.lock().unwrap();
+ redirects = match *state {
+ State::Post { redirects } => redirects,
+ _ => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Trying to do POST in unexpected state".to_string(),
+ );
+ return;
+ }
+ };
+ drop(state);
+ }
+
+ if let Err(e) = wait_async(
+ &self.canceller,
+ self.parse_endpoint_response(offer, r, redirects),
+ timeout,
+ )
+ .await
+ {
+ self.handle_future_error(e);
+ }
+ }
+ Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
+ Err(err) => self.handle_future_error(err),
}
}
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
- let mut state = self.state.lock().unwrap();
+ let state = self.state.lock().unwrap();
let timeout = settings.timeout;
- let resource_url;
- (*state, resource_url) = match *state {
+ let resource_url = match *state {
State::Running {
whep_resource: ref whep_resource_url,
- thread_handle: ref mut h,
- } => {
- if let Some(th) = h.take() {
- match th.join() {
- Ok(_) => {
- gst::debug!(CAT, imp: self, "Send offer thread joined successfully");
- }
- Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e),
- }
- }
- (State::Stopped, whep_resource_url.clone())
- }
+ } => whep_resource_url.clone(),
_ => {
gst::element_imp_error!(
self,
diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs
index 35f28067..12ef0dee 100644
--- a/net/webrtchttp/src/whipsink/imp.rs
+++ b/net/webrtchttp/src/whipsink/imp.rs
@@ -8,14 +8,15 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
- build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError,
+ build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
+ RUNTIME,
};
use crate::IceTransportPolicy;
+use async_recursion::async_recursion;
use futures::future;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
-use gst::ErrorMessage;
use gst_sdp::*;
use gst_webrtc::*;
use once_cell::sync::Lazy;
@@ -23,14 +24,12 @@ use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
-use std::thread::{spawn, JoinHandle};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink"))
});
-const DEFAULT_ICE_TRANSPORT_POLICY: IceTransportPolicy =
- IceTransportPolicy::All;
+const DEFAULT_ICE_TRANSPORT_POLICY: IceTransportPolicy = IceTransportPolicy::All;
const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
@@ -63,14 +62,8 @@ impl Default for Settings {
#[derive(Debug)]
enum State {
Stopped,
- Post {
- redirects: u8,
- thread_handle: Option<JoinHandle<()>>,
- },
- Running {
- whip_resource_url: String,
- thread_handle: Option<JoinHandle<()>>,
- },
+ Post { redirects: u8 },
+ Running { whip_resource_url: String },
}
impl Default for State {
@@ -368,17 +361,18 @@ impl ObjectImpl for WhipSink {
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, imp: self_, "ICE gathering completed");
- let mut state = self_.state.lock().unwrap();
let self_ref = self_.ref_counted();
- gst::debug!(CAT, imp: self_, "Spawning thread to send offer");
- let handle = spawn(move || self_ref.send_offer());
-
- *state = State::Post {
- redirects: 0,
- thread_handle: Some(handle),
- };
- drop(state);
+ // With tokio's spawn one does not have to .await the
+ // returned JoinHandle to make the provided future start
+ // execution. It will start running in the background
+ // immediately when spawn is called. So silence the clippy
+ // warning.
+ #[allow(clippy::let_underscore_future)]
+ let _ = RUNTIME.spawn(async move {
+ /* Note that we check for a valid WHIP endpoint in change_state */
+ self_ref.send_offer().await
+ });
}
_ => (),
}
@@ -491,27 +485,28 @@ impl ObjectSubclass for WhipSink {
}
impl WhipSink {
- fn send_offer(&self) {
- let settings = self.settings.lock().unwrap();
-
- /* Note that we check for a valid WHIP endpoint in change_state */
- let endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str());
- if let Err(e) = endpoint {
- gst::element_imp_error!(
- self,
- gst::ResourceError::Failed,
- ["Could not parse endpoint URL: {}", e]
- );
- return;
- }
+ fn raise_error(&self, resource_error: gst::ResourceError, msg: String) {
+ gst::error_msg!(resource_error, [msg.as_str()]);
+ gst::element_imp_error!(self, resource_error, [msg.as_str()]);
+ }
- drop(settings);
- let mut state = self.state.lock().unwrap();
- *state = State::Post {
- redirects: 0,
- thread_handle: None,
+ fn handle_future_error(&self, err: WaitError) {
+ match err {
+ WaitError::FutureAborted => {
+ gst::warning!(CAT, imp: self, "Future aborted")
+ }
+ WaitError::FutureError(err) => {
+ self.raise_error(gst::ResourceError::Failed, err.to_string())
+ }
};
- drop(state);
+ }
+
+ async fn send_offer(&self) {
+ {
+ let mut state = self.state.lock().unwrap();
+ *state = State::Post { redirects: 0 };
+ drop(state);
+ }
let local_desc = self
.webrtcbin
@@ -536,40 +531,50 @@ impl WhipSink {
offer_sdp.sdp().as_text()
);
- match self.do_post(offer_sdp, endpoint.unwrap()) {
- Ok(_) => (),
- Err(e) => {
- gst::element_imp_error!(
- self,
- gst::ResourceError::Failed,
- ["Failed to send offer: {}", e]
- );
- }
+ let timeout;
+ {
+ let settings = self.settings.lock().unwrap();
+ timeout = settings.timeout;
+ drop(settings);
+ }
+
+ if let Err(e) = wait_async(&self.canceller, self.do_post(offer_sdp), timeout).await {
+ self.handle_future_error(e);
}
}
- fn do_post(
- &self,
- offer: gst_webrtc::WebRTCSessionDescription,
- endpoint: reqwest::Url,
- ) -> Result<(), gst::ErrorMessage> {
- let settings = self.settings.lock().unwrap();
- let state = self.state.lock().unwrap();
- let timeout = settings.timeout;
+ #[async_recursion]
+ async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription) {
+ let auth_token;
+ let endpoint;
+ let timeout;
- let redirects = match *state {
- State::Post {
- redirects,
- thread_handle: ref _h,
- } => redirects,
- _ => {
- return Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Trying to POST in unexpected state"]
- ));
- }
- };
- drop(state);
+ {
+ let settings = self.settings.lock().unwrap();
+ endpoint =
+ reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
+ auth_token = settings.auth_token.clone();
+ timeout = settings.timeout;
+ drop(settings);
+ }
+
+ #[allow(unused_mut)]
+ let mut redirects;
+
+ {
+ let state = self.state.lock().unwrap();
+ redirects = match *state {
+ State::Post { redirects } => redirects,
+ _ => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Trying to do POST in unexpected state".to_string(),
+ );
+ return;
+ }
+ };
+ drop(state);
+ }
// Default policy for redirect does not share the auth token to new location
// So disable inbuilt redirecting and do a recursive call upon 3xx response code
@@ -586,7 +591,7 @@ impl WhipSink {
HeaderValue::from_static("application/sdp"),
);
- if let Some(token) = &settings.auth_token {
+ if let Some(token) = auth_token.as_ref() {
let bearer_token = "Bearer ".to_owned() + token;
headermap.insert(
reqwest::header::AUTHORIZATION,
@@ -595,51 +600,63 @@ impl WhipSink {
);
}
- let future = async {
+ let res = wait_async(
+ &self.canceller,
client
- .request(reqwest::Method::POST, endpoint.as_ref())
+ .request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
- .send()
- .await
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Failed,
- ["POST request failed: {:?}", err]
- )
- })
- };
-
- drop(settings);
+ .send(),
+ timeout,
+ )
+ .await;
- match wait(&self.canceller, future, timeout) {
- Ok(resp) => self.parse_endpoint_response(offer, endpoint, redirects, resp),
- Err(err) => match err {
- WaitError::FutureAborted => Ok(()),
- WaitError::FutureError(e) => Err(e),
+ match res {
+ Ok(r) => match r {
+ Ok(resp) => {
+ if let Err(e) = wait_async(
+ &self.canceller,
+ self.parse_endpoint_response(offer, resp, redirects),
+ timeout,
+ )
+ .await
+ {
+ self.handle_future_error(e);
+ }
+ }
+ Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
+ Err(err) => self.handle_future_error(err),
}
}
- fn parse_endpoint_response(
+ async fn parse_endpoint_response(
&self,
offer: gst_webrtc::WebRTCSessionDescription,
- endpoint: reqwest::Url,
- redirects: u8,
resp: reqwest::Response,
- ) -> Result<(), ErrorMessage> {
+ redirects: u8,
+ ) {
+ let endpoint;
+ let timeout;
+ let use_link_headers;
+
+ {
+ let settings = self.settings.lock().unwrap();
+ endpoint =
+ reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
+ use_link_headers = settings.use_link_headers;
+ timeout = settings.timeout;
+ drop(settings);
+ }
+
match resp.status() {
StatusCode::OK | StatusCode::CREATED => {
- let settings = self
- .settings
- .lock()
- .expect("Failed to acquire settings lock");
- let timeout = settings.timeout;
-
- if settings.use_link_headers {
- set_ice_servers(&self.webrtcbin, resp.headers())?;
+ if use_link_headers {
+ if let Err(e) = set_ice_servers(&self.webrtcbin, resp.headers()) {
+ self.raise_error(gst::ResourceError::Failed, e.to_string());
+ return;
+ };
}
- drop(settings);
// Get the url of the resource from 'location' header.
// The resource created is expected be a relative path
@@ -650,20 +667,23 @@ impl WhipSink {
let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location,
None => {
- return Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Location header field should be present for WHIP resource URL"]
- ));
+ "Location header field should be present for WHIP resource URL"
+ .to_string(),
+ );
+ return;
}
};
let location = match location.to_str() {
Ok(loc) => loc,
Err(e) => {
- return Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Failed to convert location to string {}", e]
- ));
+ format!("Failed to convert location to string: {e}"),
+ );
+ return;
}
};
@@ -671,164 +691,139 @@ impl WhipSink {
gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location);
- let url = url.join(location).map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Failed,
- ["URL join operation failed: {:?}", err]
- )
- })?;
-
- let mut state = self.state.lock().unwrap();
- *state = match *state {
- State::Post {
- redirects: _r,
- thread_handle: ref mut h,
- } => State::Running {
- whip_resource_url: url.to_string(),
- thread_handle: h.take(),
- },
- _ => {
- return Err(gst::error_msg!(
+ let url = match url.join(location) {
+ Ok(joined_url) => joined_url,
+ Err(err) => {
+ self.raise_error(
gst::ResourceError::Failed,
- ["Expected to be in POST state"]
- ));
+ format!("URL join operation failed: {err:?}"),
+ );
+ return;
}
};
- drop(state);
- let future = async {
- resp.bytes().await.map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::Failed,
- ["Failed to get response body: {:?}", err]
- )
- })
- };
-
- match wait(&self.canceller, future, timeout) {
- Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
- Ok(ans_sdp) => {
- let answer = gst_webrtc::WebRTCSessionDescription::new(
- gst_webrtc::WebRTCSDPType::Answer,
- ans_sdp,
- );
- self.webrtcbin.emit_by_name::<()>(
- "set-remote-description",
- &[&answer, &None::<gst::Promise>],
+ {
+ let mut state = self.state.lock().unwrap();
+ *state = match *state {
+ State::Post { redirects: _r } => State::Running {
+ whip_resource_url: url.to_string(),
+ },
+ _ => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Expected to be in POST state".to_string(),
);
- Ok(())
+ return;
}
+ };
+ drop(state);
+ }
- Err(e) => Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Could not parse answer SDP: {}", e]
- )),
- },
- Err(err) => match err {
- WaitError::FutureAborted => Ok(()),
- WaitError::FutureError(e) => Err(e),
+ match wait_async(&self.canceller, resp.bytes(), timeout).await {
+ Ok(res) => match res {
+ Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
+ Ok(ans_sdp) => {
+ let answer = gst_webrtc::WebRTCSessionDescription::new(
+ gst_webrtc::WebRTCSDPType::Answer,
+ ans_sdp,
+ );
+ self.webrtcbin.emit_by_name::<()>(
+ "set-remote-description",
+ &[&answer, &None::<gst::Promise>],
+ );
+ }
+ Err(err) => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ format!("Could not parse answer SDP: {err}"),
+ );
+ }
+ },
+ Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
+ Err(err) => self.handle_future_error(err),
}
}
s if s.is_redirection() => {
if redirects < MAX_REDIRECTS {
- let mut state = self.state.lock().unwrap();
- *state = match *state {
- State::Post {
- redirects: _r,
- thread_handle: ref mut h,
- } => State::Post {
- redirects: redirects + 1,
- thread_handle: h.take(),
- },
- /*
- * As per section 4.6 of the specification, redirection is
- * not required to be supported for the PATCH and DELETE
- * requests to the final WHEP resource URL. Only the initial
- * POST request may support redirection.
- */
- State::Running { .. } => {
- return Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Unexpected redirection in RUNNING state"]
- ));
- }
- State::Stopped => unreachable!(),
- };
- drop(state);
-
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
+ {
+ let mut state = self.state.lock().unwrap();
+ *state = match *state {
+ State::Post { redirects: _r } => State::Post {
+ redirects: redirects + 1,
+ },
+ /*
+ * As per section 4.6 of the specification, redirection is
+ * not required to be supported for the PATCH and DELETE
+ * requests to the final WHEP resource URL. Only the initial
+ * POST request may support redirection.
+ */
+ State::Running { .. } => {
+ self.raise_error(
+ gst::ResourceError::Failed,
+ "Unexpected redirection in RUNNING state".to_string(),
+ );
+ return;
+ }
+ State::Stopped => unreachable!(),
+ };
+ drop(state);
+ }
+
gst::debug!(
CAT,
imp: self,
"Redirecting endpoint to {}",
redirect_url.as_str()
);
- self.do_post(offer, redirect_url)
+
+ if let Err(err) =
+ wait_async(&self.canceller, self.do_post(offer), timeout).await
+ {
+ self.handle_future_error(err);
+ }
}
- Err(e) => Err(e),
+ Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()),
}
} else {
- Err(gst::error_msg!(
+ self.raise_error(
gst::ResourceError::Failed,
- ["Too many redirects. Unable to connect to do POST"]
- ))
+ "Too many redirects. Unable to connect.".to_string(),
+ );
}
}
s => {
- let future = async {
- resp.bytes().await.map_err(|err| {
- gst::error_msg!(
+ match wait_async(&self.canceller, resp.bytes(), timeout).await {
+ Ok(r) => {
+ let res = r
+ .map(|x| x.escape_ascii().to_string())
+ .unwrap_or_else(|_| "(no further details)".to_string());
+
+ // FIXME: Check and handle 'Retry-After' header in case of server error
+ self.raise_error(
gst::ResourceError::Failed,
- ["Failed to get response body: {:?}", err]
- )
- })
- };
-
- let settings = self
- .settings
- .lock()
- .expect("Failed to acquire settings lock");
- let timeout = settings.timeout;
- drop(settings);
-
- let resp = wait(&self.canceller, future, timeout)
- .map(|x| x.escape_ascii().to_string())
- .unwrap_or_else(|_| "(no further details)".to_string());
-
- // FIXME: Check and handle 'Retry-After' header in case of server error
- Err(gst::error_msg!(
- gst::ResourceError::Failed,
- ["Server returned error: {} - {}", s.as_str(), resp]
- ))
+ format!("Unexpected response: {} - {}", s.as_str(), res),
+ );
+ }
+ Err(err) => self.handle_future_error(err),
+ }
}
}
}
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
- let mut state = self.state.lock().unwrap();
+ let state = self.state.lock().unwrap();
let timeout = settings.timeout;
- let resource_url;
- (*state, resource_url) = match *state {
+ let resource_url = match *state {
State::Running {
whip_resource_url: ref resource_url,
- thread_handle: ref mut h,
- } => {
- if let Some(th) = h.take() {
- match th.join() {
- Ok(_) => {
- gst::debug!(CAT, imp: self, "Send offer thread joined successfully");
- }
- Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e),
- }
- }
- (State::Stopped, resource_url.clone())
- }
+ } => resource_url.clone(),
_ => {
gst::element_imp_error!(
self,