github.com/sdroege/gst-plugin-rs.git

author     Arun Raghavan <arun@asymptotic.io>          2022-02-23 21:10:30 +0300
committer  Sebastian Dröge <sebastian@centricular.com> 2022-03-08 20:23:26 +0300
commit     249b0ac4c1e26d3ad772ab10830361efbcf89307 (patch)
tree       4b99ddef1aed8b47359816a7a3bc22d07ed2a074 /net
parent     930bfce629ded7f6eebad3ce4905d13f9b3c6abd (diff)
rusoto: s3sink: Implement timeout/retry for part uploads
Rusoto does not implement timeouts or retries for any of its HTTP requests. This is particularly problematic for the part upload stage of multipart uploads, as a blip in the network could cause part uploads to freeze for a long duration and eventually bail.

To avoid this, for part uploads, we add (a) configurable timeouts for each request, and (b) retries with exponential backoff, up to a configurable duration.

It is not clear if/how we want to do this for other types of requests. The creation of a multipart upload should be relatively quick, but the completion of an upload might take several minutes, so there is not necessarily a one-size-fits-all configuration. It would likely make more sense to implement some sensible hard-coded defaults for those other kinds of requests.
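To illustrate the composition described above, here is a minimal, self-contained sketch of a per-attempt timeout wrapped in exponential-backoff retries, using the same `backoff` and `tokio` crates the patch pulls in. `do_request`, its error type, and the constants are stand-ins for this example only, not the actual Rusoto calls or plugin code.

```rust
// Illustrative sketch only: a per-attempt timeout composed with exponential-backoff
// retries, mirroring the approach taken in s3utils.rs below.
// Assumed dependencies: backoff = { version = "0.4", features = ["futures", "tokio"] },
// tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }.
use std::time::Duration;

const PER_REQUEST_TIMEOUT: Duration = Duration::from_millis(10_000); // per-attempt bound
const RETRY_BUDGET: Duration = Duration::from_millis(60_000); // total time spent retrying

async fn do_request() -> Result<String, std::io::Error> {
    // Stand-in for a single UploadPart HTTP request.
    Ok("etag".to_owned())
}

async fn upload_with_retry() -> Result<String, std::io::Error> {
    backoff::future::retry(
        backoff::ExponentialBackoffBuilder::new()
            .with_initial_interval(Duration::from_millis(500))
            .with_multiplier(1.5)
            .with_max_elapsed_time(Some(RETRY_BUDGET))
            .build(),
        || async {
            // Each attempt is bounded by the per-request timeout; a timeout is
            // classified as transient so the backoff layer retries it, while any
            // other error is treated as permanent and aborts the retry loop.
            match tokio::time::timeout(PER_REQUEST_TIMEOUT, do_request()).await {
                Ok(Ok(v)) => Ok(v),
                Ok(Err(e)) => Err(backoff::Error::permanent(e)),
                Err(_elapsed) => Err(backoff::Error::transient(std::io::Error::new(
                    std::io::ErrorKind::TimedOut,
                    "request timed out",
                ))),
            }
        },
    )
    .await
}

#[tokio::main]
async fn main() {
    match upload_with_retry().await {
        Ok(etag) => println!("part uploaded, etag: {etag}"),
        Err(e) => eprintln!("upload failed after retries: {e}"),
    }
}
```

Note that the timeout future must be created inside a Tokio runtime, which is why the actual patch enters the plugin's runtime before building the future stack.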
Diffstat (limited to 'net')
-rw-r--r--  net/rusoto/Cargo.toml          3
-rw-r--r--  net/rusoto/src/s3sink/imp.rs   201
-rw-r--r--  net/rusoto/src/s3utils.rs      123
3 files changed, 253 insertions(+), 74 deletions(-)
diff --git a/net/rusoto/Cargo.toml b/net/rusoto/Cargo.toml
index d4a2db39..551b8e6d 100644
--- a/net/rusoto/Cargo.toml
+++ b/net/rusoto/Cargo.toml
@@ -21,7 +21,7 @@ rusoto_credential = "0.47"
rusoto_signature = "0.47"
url = "2"
percent-encoding = "2"
-tokio = { version = "1.0", features = [ "rt-multi-thread" ] }
+tokio = { version = "1.0", features = [ "rt-multi-thread", "time" ] }
async-tungstenite = { version = "0.16", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
nom = "7"
crc = "2"
@@ -32,6 +32,7 @@ serde_derive = "1"
serde_json = "1"
atomic_refcell = "0.1"
base32 = "0.4"
+backoff = { version = "0.4", features = [ "futures", "tokio" ] }
[lib]
name = "gstrusoto"
diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs
index 850dcbe2..a0d6e054 100644
--- a/net/rusoto/src/s3sink/imp.rs
+++ b/net/rusoto/src/s3sink/imp.rs
@@ -25,6 +25,7 @@ use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
+use std::time::Duration;
use crate::s3url::*;
use crate::s3utils::{self, WaitError};
@@ -32,6 +33,8 @@ use crate::s3utils::{self, WaitError};
use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
+const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
+const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
struct Started {
client: S3Client,
@@ -94,6 +97,8 @@ struct Settings {
secret_access_key: Option<String>,
metadata: Option<gst::Structure>,
multipart_upload_on_error: OnError,
+ upload_part_request_timeout: Option<Duration>,
+ upload_part_retry_duration: Option<Duration>,
}
impl Settings {
@@ -162,6 +167,12 @@ impl Default for Settings {
secret_access_key: None,
metadata: None,
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
+ upload_part_request_timeout: Some(Duration::from_millis(
+ DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
+ )),
+ upload_part_retry_duration: Some(Duration::from_millis(
+ DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
+ )),
}
}
}
@@ -188,10 +199,8 @@ impl S3Sink {
&self,
element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> {
- let upload_part_req = self.create_upload_part_request()?;
- let part_number = upload_part_req.part_number;
-
let mut state = self.state.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
let state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
@@ -199,68 +208,81 @@ impl S3Sink {
}
};
- let upload_part_req_future = state.client.upload_part(upload_part_req);
+ let part_number = state.increment_part_number()?;
+ let body = std::mem::replace(
+ &mut state.buffer,
+ Vec::with_capacity(settings.buffer_size as usize),
+ );
+ let upload_id = &state.upload_id;
+ let client = &state.client;
- let output =
- s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
- WaitError::FutureError(err) => {
- let settings = self.settings.lock().unwrap();
- match settings.multipart_upload_on_error {
- OnError::Abort => {
- gst_log!(
- CAT,
- obj: element,
- "Aborting multipart upload request with id: {}",
- state.upload_id
- );
- match self.abort_multipart_upload_request(state) {
- Ok(()) => {
- gst_log!(
- CAT,
- obj: element,
- "Aborting multipart upload request succeeded."
- );
- }
- Err(err) => gst_error!(
+ let upload_part_req_future =
+ || client.upload_part(self.create_upload_part_request(&body, part_number, upload_id));
+
+ let output = s3utils::wait_retry(
+ &self.canceller,
+ settings.upload_part_request_timeout,
+ settings.upload_part_retry_duration,
+ upload_part_req_future,
+ )
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => {
+ match settings.multipart_upload_on_error {
+ OnError::Abort => {
+ gst_log!(
+ CAT,
+ obj: element,
+ "Aborting multipart upload request with id: {}",
+ state.upload_id
+ );
+ match self.abort_multipart_upload_request(state) {
+ Ok(()) => {
+ gst_log!(
CAT,
obj: element,
- "Aborting multipart upload failed: {}",
- err.to_string()
- ),
+ "Aborting multipart upload request succeeded."
+ );
}
- }
- OnError::Complete => {
- gst_log!(
+ Err(err) => gst_error!(
CAT,
obj: element,
- "Completing multipart upload request with id: {}",
- state.upload_id
- );
- match self.complete_multipart_upload_request(state) {
- Ok(()) => {
- gst_log!(
- CAT,
- obj: element,
- "Complete multipart upload request succeeded."
- );
- }
- Err(err) => gst_error!(
+ "Aborting multipart upload failed: {}",
+ err.to_string()
+ ),
+ }
+ }
+ OnError::Complete => {
+ gst_log!(
+ CAT,
+ obj: element,
+ "Completing multipart upload request with id: {}",
+ state.upload_id
+ );
+ match self.complete_multipart_upload_request(state) {
+ Ok(()) => {
+ gst_log!(
CAT,
obj: element,
- "Completing multipart upload failed: {}",
- err.to_string()
- ),
+ "Complete multipart upload request succeeded."
+ );
}
+ Err(err) => gst_error!(
+ CAT,
+ obj: element,
+ "Completing multipart upload failed: {}",
+ err.to_string()
+ ),
}
- OnError::DoNothing => (),
}
- Some(gst::error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to upload part: {}", err]
- ))
+ OnError::DoNothing => (),
}
- WaitError::Cancelled => None,
- })?;
+ Some(gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to upload part: {}", err]
+ ))
+ }
+ WaitError::Cancelled => None,
+ })?;
state.completed_parts.push(CompletedPart {
e_tag: output.e_tag,
@@ -271,29 +293,22 @@ impl S3Sink {
Ok(())
}
- fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
+ fn create_upload_part_request(
+ &self,
+ body: &[u8],
+ part_number: i64,
+ upload_id: &str,
+ ) -> UploadPartRequest {
let url = self.url.lock().unwrap();
- let settings = self.settings.lock().unwrap();
- let mut state = self.state.lock().unwrap();
- let state = match *state {
- State::Started(ref mut started_state) => started_state,
- State::Stopped => {
- unreachable!("Element should be started");
- }
- };
- let part_number = state.increment_part_number()?;
- Ok(UploadPartRequest {
- body: Some(rusoto_core::ByteStream::from(std::mem::replace(
- &mut state.buffer,
- Vec::with_capacity(settings.buffer_size as usize),
- ))),
+ UploadPartRequest {
+ body: Some(rusoto_core::ByteStream::from(body.to_owned())),
bucket: url.as_ref().unwrap().bucket.to_owned(),
key: url.as_ref().unwrap().object.to_owned(),
- upload_id: state.upload_id.to_owned(),
+ upload_id: upload_id.to_owned(),
part_number,
..Default::default()
- })
+ }
}
fn create_complete_multipart_upload_request(
@@ -641,6 +656,24 @@ impl ObjectImpl for S3Sink {
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpecInt64::new(
+ "upload-part-request-timeout",
+ "Upload part request timeout",
+ "Timeout for a single upload part request (in ms, set to -1 for infinity)",
+ -1,
+ std::i64::MAX,
+ DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64,
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecInt64::new(
+ "upload-part-retry-duration",
+ "Upload part retry duration",
+ "How long we should retry upload part requests before giving up (in ms, set to -1 for infinity)",
+ -1,
+ std::i64::MAX,
+ DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
+ glib::ParamFlags::READWRITE,
+ ),
]
});
@@ -717,6 +750,20 @@ impl ObjectImpl for S3Sink {
settings.multipart_upload_on_error =
value.get::<OnError>().expect("type checked upstream");
}
+ "upload-part-request-timeout" => {
+ settings.upload_part_request_timeout =
+ match value.get::<i64>().expect("type checked upstream") {
+ -1 => None,
+ v => Some(Duration::from_millis(v as u64)),
+ }
+ }
+ "upload-part-retry-duration" => {
+ settings.upload_part_retry_duration =
+ match value.get::<i64>().expect("type checked upstream") {
+ -1 => None,
+ v => Some(Duration::from_millis(v as u64)),
+ }
+ }
_ => unimplemented!(),
}
}
@@ -741,6 +788,20 @@ impl ObjectImpl for S3Sink {
"secret-access-key" => settings.secret_access_key.to_value(),
"metadata" => settings.metadata.to_value(),
"on-error" => settings.multipart_upload_on_error.to_value(),
+ "upload-part-request-timeout" => {
+ let timeout: i64 = match settings.upload_part_request_timeout {
+ None => -1,
+ Some(v) => v.as_millis() as i64,
+ };
+ timeout.to_value()
+ }
+ "upload-part-retry-duration" => {
+ let timeout: i64 = match settings.upload_part_retry_duration {
+ None => -1,
+ Some(v) => v.as_millis() as i64,
+ };
+ timeout.to_value()
+ }
_ => unimplemented!(),
}
}
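The two properties added above are plain int64 values in milliseconds, with -1 meaning no limit. As a hypothetical usage sketch (not part of this patch), an application might tune them as below; the factory name "rusotos3sink" and the gstreamer-rs API flavour (ElementFactory::make taking an optional name, set_property panicking on error, as in the 0.18 series) are assumptions for illustration.

```rust
// Hypothetical configuration sketch: tune the new timeout/retry properties from
// application code. Assumes the element factory name is "rusotos3sink".
use gst::prelude::*;

fn configure_s3_sink() -> Result<gst::Element, gst::glib::BoolError> {
    gst::init()?;

    let sink = gst::ElementFactory::make("rusotos3sink", None)?;

    // Bound each UploadPart HTTP request to 30 s (default is 10 s).
    sink.set_property("upload-part-request-timeout", 30_000i64);
    // Keep retrying failed part uploads for up to 2 minutes (default is 60 s);
    // -1 would retry indefinitely.
    sink.set_property("upload-part-retry-duration", 120_000i64);

    Ok(sink)
}
```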
diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs
index 9775ebd4..2b344dc3 100644
--- a/net/rusoto/src/s3utils.rs
+++ b/net/rusoto/src/s3utils.rs
@@ -7,13 +7,24 @@
// SPDX-License-Identifier: MPL-2.0
use bytes::{buf::BufMut, Bytes, BytesMut};
-use futures::stream::TryStreamExt;
-use futures::{future, Future};
+use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt};
use once_cell::sync::Lazy;
-use rusoto_core::ByteStream;
+use rusoto_core::RusotoError::HttpDispatch;
+use rusoto_core::{ByteStream, HttpDispatchError, RusotoError};
use std::sync::Mutex;
+use std::time::Duration;
use tokio::runtime;
+use gst::gst_warning;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "rusotos3utils",
+ gst::DebugColorFlags::empty(),
+ Some("Amazon S3 utilities"),
+ )
+});
+
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
@@ -28,6 +39,112 @@ pub enum WaitError<E> {
FutureError(E),
}
+fn make_timeout<F, T, E>(
+ timeout: Duration,
+ future: F,
+) -> impl Future<Output = Result<T, RusotoError<E>>>
+where
+ E: std::fmt::Debug,
+ F: Future<Output = Result<T, RusotoError<E>>>,
+{
+ tokio::time::timeout(timeout, future).map(|v| match v {
+ // Future resolved successfully
+ Ok(Ok(v)) => Ok(v),
+ // Future resolved with an error
+ Ok(Err(e)) => Err(e),
+ // Timeout elapsed
+ // Use an HttpDispatch error so the caller doesn't have to deal with this separately from
+ // other HTTP dispatch errors
+ _ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))),
+ })
+}
+
+fn make_retry<F, T, E, Fut>(
+ timeout: Option<Duration>,
+ mut future: F,
+) -> impl Future<Output = Result<T, RusotoError<E>>>
+where
+ E: std::fmt::Debug,
+ F: FnMut() -> Fut,
+ Fut: Future<Output = Result<T, RusotoError<E>>>,
+{
+ backoff::future::retry(
+ backoff::ExponentialBackoffBuilder::new()
+ .with_initial_interval(Duration::from_millis(500))
+ .with_multiplier(1.5)
+ .with_max_elapsed_time(timeout)
+ .build(),
+ move || {
+ future().map_err(|err| match err {
+ HttpDispatch(_) => {
+ gst_warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
+ backoff::Error::transient(err)
+ }
+ _ => backoff::Error::permanent(err),
+ })
+ },
+ )
+}
+
+pub fn wait_retry<F, T, E, Fut>(
+ canceller: &Mutex<Option<future::AbortHandle>>,
+ req_timeout: Option<Duration>,
+ retry_timeout: Option<Duration>,
+ mut future: F,
+) -> Result<T, WaitError<RusotoError<E>>>
+where
+ E: std::fmt::Debug,
+ F: FnMut() -> Fut,
+ Fut: Send + Future<Output = Result<T, RusotoError<E>>>,
+ Fut::Output: Send,
+ T: Send,
+ E: Send,
+{
+ let mut canceller_guard = canceller.lock().unwrap();
+ let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
+
+ canceller_guard.replace(abort_handle);
+ drop(canceller_guard);
+
+ let res = {
+ let _enter = RUNTIME.enter();
+
+ futures::executor::block_on(async {
+ // The order of this future stack matters: the innermost future is the supplied future
+ // generator closure. We wrap that in a timeout to bound how long we wait. This, in
+ // turn, is wrapped in a retrying future which will make multiple attempts until it
+ // either succeeds or the retry budget is exhausted.
+ // The timeout must be created within the tokio executor
+ let res = match req_timeout {
+ None => {
+ let retry_future = make_retry(retry_timeout, future);
+ future::Abortable::new(retry_future, abort_registration).await
+ }
+ Some(t) => {
+ let timeout_future = || make_timeout(t, future());
+ let retry_future = make_retry(retry_timeout, timeout_future);
+ future::Abortable::new(retry_future, abort_registration).await
+ }
+ };
+
+ match res {
+ // Future resolved successfully
+ Ok(Ok(res)) => Ok(res),
+ // Future resolved with an error
+ Ok(Err(err)) => Err(WaitError::FutureError(err)),
+ // Canceller called before future resolved
+ Err(future::Aborted) => Err(WaitError::Cancelled),
+ }
+ })
+ };
+
+ /* Clear out the canceller */
+ canceller_guard = canceller.lock().unwrap();
+ *canceller_guard = None;
+
+ res
+}
+
pub fn wait<F, T, E>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,