Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanchayan Maity <sanchayan@asymptotic.io>2022-10-28 11:32:23 +0300
committerSanchayan Maity <sanchayan@asymptotic.io>2022-12-05 08:34:45 +0300
commitb5daa92c9dda1e3e28e9c5000940d0256d5c8ece (patch)
tree1b0a14e21de5023bb1b64399e0f126597f204757 /net/webrtchttp
parentcc7419308bbc212e8da8d7be326561b43fe716c7 (diff)
webrtchttp: Implement timeout for waiting on futures
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/949>
Diffstat (limited to 'net/webrtchttp')
-rw-r--r--net/webrtchttp/src/utils.rs24
-rw-r--r--net/webrtchttp/src/whepsrc/imp.rs107
-rw-r--r--net/webrtchttp/src/whipsink/imp.rs149
3 files changed, 214 insertions, 66 deletions
diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs
index 5980fd320..99eec59d4 100644
--- a/net/webrtchttp/src/utils.rs
+++ b/net/webrtchttp/src/utils.rs
@@ -5,8 +5,8 @@ use once_cell::sync::Lazy;
use parse_link_header;
use reqwest::header::HeaderMap;
use reqwest::redirect::Policy;
-use std::fmt::Display;
use std::sync::Mutex;
+use std::time::Duration;
use tokio::runtime;
pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
@@ -18,14 +18,14 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap()
});
-pub fn wait<F, T, E>(
+pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,
+ timeout: u32
) -> Result<T, ErrorMessage>
where
- F: Send + Future<Output = Result<T, E>>,
+ F: Send + Future<Output = Result<T, ErrorMessage>>,
T: Send + 'static,
- E: Send + Display,
{
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
@@ -41,6 +41,22 @@ where
drop(canceller_guard);
let future = async {
+ if timeout == 0 {
+ future.await
+ } else {
+ let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
+
+ match res {
+ Ok(r) => r,
+ Err(e) => Err(gst::error_msg!(
+ gst::ResourceError::Read,
+ ["Request timeout, elapsed: {}", e.to_string()]
+ )),
+ }
+ }
+ };
+
+ let future = async {
match future::Abortable::new(future, abort_registration).await {
Ok(Ok(res)) => Ok(res),
diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs
index a9a038f7c..8a1acdcc5 100644
--- a/net/webrtchttp/src/whepsrc/imp.rs
+++ b/net/webrtchttp/src/whepsrc/imp.rs
@@ -30,6 +30,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy =
GstRsWebRTCICETransportPolicy::All;
const MAX_REDIRECTS: u8 = 10;
+const DEFAULT_TIMEOUT: u32 = 15;
#[derive(Debug, Clone)]
struct Settings {
@@ -41,6 +42,7 @@ struct Settings {
auth_token: Option<String>,
use_link_headers: bool,
ice_transport_policy: GstRsWebRTCICETransportPolicy,
+ timeout: u32,
}
#[allow(clippy::derivable_impls)]
@@ -67,6 +69,7 @@ impl Default for Settings {
auth_token: None,
use_link_headers: false,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
+ timeout: DEFAULT_TIMEOUT,
}
}
}
@@ -242,6 +245,13 @@ impl ObjectImpl for WhepSrc {
.nick("ICE transport policy")
.blurb("The policy to apply for ICE transport")
.build(),
+ glib::ParamSpecUInt::builder("timeout")
+ .nick("Timeout")
+ .blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
+ .maximum(3600)
+ .default_value(DEFAULT_TIMEOUT)
+ .readwrite()
+ .build(),
]
});
PROPERTIES.as_ref()
@@ -307,6 +317,10 @@ impl ObjectImpl for WhepSrc {
.set_property_from_str("ice-transport-policy", "all");
}
}
+ "timeout" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.timeout = value.get().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -345,6 +359,10 @@ impl ObjectImpl for WhepSrc {
let settings = self.settings.lock().unwrap();
settings.ice_transport_policy.to_value()
}
+ "timeout" => {
+ let settings = self.settings.lock().unwrap();
+ settings.timeout.to_value()
+ }
_ => unimplemented!(),
}
}
@@ -601,9 +619,12 @@ impl WhepSrc {
.settings
.lock()
.expect("Failed to acquire settings lock");
+ let timeout = settings.timeout;
+
if settings.use_link_headers {
set_ice_servers(&self.webrtcbin, resp.headers())?;
}
+
drop(settings);
/* See section 4.2 of the WHEP specification */
@@ -644,7 +665,16 @@ impl WhepSrc {
};
drop(state);
- let ans_bytes = wait(&self.canceller, resp.bytes())?;
+ let future = async {
+ resp.bytes().await.map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get response body: {:?}", err]
+ )
+ })
+ };
+
+ let ans_bytes = wait(&self.canceller, future, timeout)?;
self.sdp_message_parse(ans_bytes)
}
@@ -692,16 +722,30 @@ impl WhepSrc {
}
s => {
+ let future = async {
+ resp.bytes().await.map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get response body: {:?}", err]
+ )
+ })
+ };
+
+ let settings = self
+ .settings
+ .lock()
+ .expect("Failed to acquire settings lock");
+ let timeout = settings.timeout;
+ drop(settings);
+
+ let res = wait(&self.canceller, future, timeout)
+ .map(|x| x.escape_ascii().to_string())
+ .unwrap_or_else(|_| "(no further details)".to_string());
+
// FIXME: Check and handle 'Retry-After' header in case of server error
Err(gst::error_msg!(
gst::ResourceError::Failed,
- [
- "Unexpected response: {} - {}",
- s.as_str(),
- wait(&self.canceller, resp.bytes())
- .map(|x| x.escape_ascii().to_string())
- .unwrap_or_else(|_| "(no further details)".to_string())
- ]
+ ["Unexpected response: {} - {}", s.as_str(), res]
))
}
}
@@ -751,12 +795,7 @@ impl WhepSrc {
&[&offer_sdp, &None::<gst::Promise>],
);
} else {
- gst::error!(
- CAT,
- imp: self_,
- "Reply without an offer: {}",
- reply
- );
+ gst::error!(CAT, imp: self_, "Reply without an offer: {}", reply);
element_imp_error!(
self_,
gst::LibraryError::Failed,
@@ -879,6 +918,7 @@ impl WhepSrc {
endpoint: reqwest::Url,
) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
+ let timeout = settings.timeout;
let sdp = offer.sdp();
let body = sdp.as_text().unwrap();
@@ -909,14 +949,22 @@ impl WhepSrc {
endpoint.as_str()
);
- let future = self
- .client
- .request(reqwest::Method::POST, endpoint.clone())
- .headers(headermap)
- .body(body)
- .send();
+ let future = async {
+ self.client
+ .request(reqwest::Method::POST, endpoint.clone())
+ .headers(headermap)
+ .body(body)
+ .send()
+ .await
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["HTTP POST request failed {}: {:?}", endpoint.as_str(), err]
+ )
+ })
+ };
- let resp = wait(&self.canceller, future)?;
+ let resp = wait(&self.canceller, future, timeout)?;
self.parse_endpoint_response(endpoint, 0, resp)
}
@@ -924,6 +972,7 @@ impl WhepSrc {
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
+ let timeout = settings.timeout;
let resource_url = match *state {
State::Running {
@@ -957,9 +1006,21 @@ impl WhepSrc {
/* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */
let client = build_reqwest_client(reqwest::redirect::Policy::default());
- let future = client.delete(resource_url).headers(headermap).send();
+ let future = async {
+ client
+ .delete(resource_url.clone())
+ .headers(headermap)
+ .send()
+ .await
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["DELETE request failed {}: {:?}", resource_url, err]
+ )
+ })
+ };
- let res = wait(&self.canceller, future);
+ let res = wait(&self.canceller, future, timeout);
match res {
Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs
index ee712a3c6..5d0616e93 100644
--- a/net/webrtchttp/src/whipsink/imp.rs
+++ b/net/webrtchttp/src/whipsink/imp.rs
@@ -30,6 +30,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy =
GstRsWebRTCICETransportPolicy::All;
const MAX_REDIRECTS: u8 = 10;
+const DEFAULT_TIMEOUT: u32 = 15;
#[derive(Debug, Clone)]
struct Settings {
@@ -39,6 +40,7 @@ struct Settings {
turn_server: Option<String>,
stun_server: Option<String>,
ice_transport_policy: GstRsWebRTCICETransportPolicy,
+ timeout: u32,
}
#[allow(clippy::derivable_impls)]
@@ -51,6 +53,7 @@ impl Default for Settings {
stun_server: None,
turn_server: None,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
+ timeout: DEFAULT_TIMEOUT,
}
}
}
@@ -232,6 +235,13 @@ impl ObjectImpl for WhipSink {
.nick("ICE transport policy")
.blurb("The policy to apply for ICE transport")
.build(),
+
+ glib::ParamSpecUInt::builder("timeout")
+ .nick("Timeout")
+ .blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).")
+ .maximum(3600)
+ .default_value(DEFAULT_TIMEOUT)
+ .build(),
]
});
PROPERTIES.as_ref()
@@ -283,6 +293,10 @@ impl ObjectImpl for WhipSink {
.set_property_from_str("ice-transport-policy", "all");
}
}
+ "timeout" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.timeout = value.get().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -313,6 +327,10 @@ impl ObjectImpl for WhipSink {
let settings = self.settings.lock().unwrap();
settings.ice_transport_policy.to_value()
}
+ "timeout" => {
+ let settings = self.settings.lock().unwrap();
+ settings.timeout.to_value()
+ }
_ => unimplemented!(),
}
}
@@ -472,6 +490,7 @@ impl WhipSink {
fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
+ let timeout = settings.timeout;
let redirects = match *state {
State::Options { redirects } => redirects,
@@ -505,12 +524,21 @@ impl WhipSink {
);
}
- let future = client
- .request(reqwest::Method::OPTIONS, endpoint.as_ref())
- .headers(headermap)
- .send();
+ let future = async {
+ client
+ .request(reqwest::Method::OPTIONS, endpoint.as_ref())
+ .headers(headermap)
+ .send()
+ .await
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["OPTIONS request failed: {:?}", err]
+ )
+ })
+ };
- let resp = wait(&self.canceller, future)?;
+ let resp = wait(&self.canceller, future, timeout)?;
match resp.status() {
StatusCode::NO_CONTENT => {
@@ -544,14 +572,27 @@ impl WhipSink {
))
}
}
- status => Err(gst::error_msg!(
- gst::ResourceError::Failed,
- [
- "lookup_ice_servers - Unexpected response {} {:?}",
- status,
- wait(&self.canceller, resp.bytes()).unwrap()
- ]
- )),
+ status => {
+ let future = async {
+ resp.bytes().await.map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get response body: {:?}", err]
+ )
+ })
+ };
+
+ let res = wait(&self.canceller, future, timeout);
+
+ Err(gst::error_msg!(
+ gst::ResourceError::Failed,
+ [
+ "lookup_ice_servers - Unexpected response {} {:?}",
+ status,
+ res
+ ]
+ ))
+ }
}
}
@@ -621,6 +662,7 @@ impl WhipSink {
) -> Result<gst_webrtc::WebRTCSessionDescription, gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
+ let timeout = settings.timeout;
let redirects = match *state {
State::Post { redirects } => redirects,
@@ -658,13 +700,22 @@ impl WhipSink {
);
}
- let future = client
- .request(reqwest::Method::POST, endpoint.as_ref())
- .headers(headermap)
- .body(body)
- .send();
+ let future = async {
+ client
+ .request(reqwest::Method::POST, endpoint.as_ref())
+ .headers(headermap)
+ .body(body)
+ .send()
+ .await
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["POST request failed: {:?}", err]
+ )
+ })
+ };
- let resp = wait(&self.canceller, future)?;
+ let resp = wait(&self.canceller, future, timeout)?;
let res = match resp.status() {
StatusCode::OK | StatusCode::CREATED => {
@@ -716,7 +767,16 @@ impl WhipSink {
};
drop(state);
- let ans_bytes = wait(&self.canceller, resp.bytes())?;
+ let future = async {
+ resp.bytes().await.map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get response body: {:?}", err]
+ )
+ })
+ };
+
+ let ans_bytes = wait(&self.canceller, future, timeout)?;
match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => {
@@ -762,28 +822,26 @@ impl WhipSink {
}
}
- s if s.is_server_error() => {
+ s => {
+ let future = async {
+ resp.bytes().await.map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get response body: {:?}", err]
+ )
+ })
+ };
+
+ let resp = wait(&self.canceller, future, timeout)
+ .map(|x| x.escape_ascii().to_string())
+ .unwrap_or_else(|_| "(no further details)".to_string());
+
// FIXME: Check and handle 'Retry-After' header in case of server error
Err(gst::error_msg!(
gst::ResourceError::Failed,
- [
- "Server returned error: {} - {}",
- s.as_str(),
- wait(&self.canceller, resp.bytes())
- .map(|x| x.escape_ascii().to_string())
- .unwrap_or_else(|_| "(no further details)".to_string())
- ]
+ ["Server returned error: {} - {}", s.as_str(), resp]
))
}
-
- s => Err(gst::error_msg!(
- gst::ResourceError::Failed,
- [
- "Unexpected response {:?} {:?}",
- s,
- wait(&self.canceller, resp.bytes()).unwrap()
- ]
- )),
};
res
@@ -792,6 +850,7 @@ impl WhipSink {
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
+ let timeout = settings.timeout;
let resource_url = match *state {
State::Running {
ref whip_resource_url,
@@ -815,9 +874,21 @@ impl WhipSink {
gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url);
let client = build_reqwest_client(reqwest::redirect::Policy::default());
- let future = client.delete(resource_url).headers(headermap).send();
+ let future = async {
+ client
+ .delete(resource_url.clone())
+ .headers(headermap)
+ .send()
+ .await
+ .map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["DELETE request failed {}: {:?}", resource_url, err]
+ )
+ })
+ };
- let res = wait(&self.canceller, future);
+ let res = wait(&self.canceller, future, timeout);
match res {
Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());