Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorArun Raghavan <arun@asymptotic.io>2022-03-18 12:44:17 +0300
committerArun Raghavan <arun@asymptotic.io>2022-03-21 11:20:07 +0300
commit724c6d6e32200d5ff17bd43a04d9980156332881 (patch)
tree9c579591c238427ec1d1f8f81acf12f79f83d90a /net
parent7b8d3acf10d1ce41753ceb332336a94f99624fa0 (diff)
rusoto: s3sink: Make remaining requests bounded in time
This implements a default timeout and retry duration for the remaining S3 requests that were still able to be blocked indefinitely. There are 3 classes of operations: multipart upload creation/abort (should not take too long), uploads (duration depends on part size), multipart upload completion (can take several minutes according to documentation). We currently only expose the part upload times as configurable, and hard code the rest. If it seems sensible, we can expose the other two sets of parameters as well. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
Diffstat (limited to 'net')
-rw-r--r--net/rusoto/src/s3sink/imp.rs148
-rw-r--r--net/rusoto/src/s3utils.rs41
2 files changed, 94 insertions, 95 deletions
diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs
index ed8d45fa..67e311ba 100644
--- a/net/rusoto/src/s3sink/imp.rs
+++ b/net/rusoto/src/s3sink/imp.rs
@@ -34,8 +34,17 @@ use crate::s3utils::{self, RetriableError, WaitError};
use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
+// This needs to be independently configurable, as the part size can be upto 5GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
+// The rest of these aren't exposed as properties yet
+// General setting for create / abort requests
+const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
+const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
+// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
+const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes
struct Started {
client: S3Client,
@@ -317,16 +326,9 @@ impl S3Sink {
fn create_complete_multipart_upload_request(
&self,
- started_state: &mut Started,
+ started_state: &Started,
+ completed_upload: CompletedMultipartUpload,
) -> CompleteMultipartUploadRequest {
- started_state
- .completed_parts
- .sort_by(|a, b| a.part_number.cmp(&b.part_number));
-
- let completed_upload = CompletedMultipartUpload {
- parts: Some(std::mem::take(&mut started_state.completed_parts)),
- };
-
let url = self.url.lock().unwrap();
CompleteMultipartUploadRequest {
bucket: url.as_ref().unwrap().bucket.to_owned(),
@@ -373,45 +375,74 @@ impl S3Sink {
Some(ref url) => url.clone(),
None => unreachable!("Element should be started"),
};
- let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
- let abort_req_future = started_state.client.abort_multipart_upload(abort_req);
-
- s3utils::wait(&self.abort_multipart_canceller, abort_req_future)
- .map(|_| ())
- .map_err(|err| match err {
- WaitError::FutureError(err) => {
- gst::error_msg!(
- gst::ResourceError::Write,
- ["Failed to abort multipart upload: {}.", err.to_string()]
- )
- }
- WaitError::Cancelled => {
- gst::error_msg!(
- gst::ResourceError::Write,
- ["Abort multipart upload request interrupted."]
- )
- }
- })
+ let abort_req_future = || {
+ let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
+ started_state
+ .client
+ .abort_multipart_upload(abort_req)
+ .map_err(RetriableError::Rusoto)
+ };
+
+ s3utils::wait_retry(
+ &self.abort_multipart_canceller,
+ Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
+ Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
+ abort_req_future,
+ )
+ .map(|_| ())
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => {
+ gst::error_msg!(
+ gst::ResourceError::Write,
+ ["Failed to abort multipart upload: {:?}.", err]
+ )
+ }
+ WaitError::Cancelled => {
+ gst::error_msg!(
+ gst::ResourceError::Write,
+ ["Abort multipart upload request interrupted."]
+ )
+ }
+ })
}
fn complete_multipart_upload_request(
&self,
started_state: &mut Started,
) -> Result<(), gst::ErrorMessage> {
- let complete_req = self.create_complete_multipart_upload_request(started_state);
- let complete_req_future = started_state.client.complete_multipart_upload(complete_req);
+ started_state
+ .completed_parts
+ .sort_by(|a, b| a.part_number.cmp(&b.part_number));
- s3utils::wait(&self.canceller, complete_req_future)
- .map(|_| ())
- .map_err(|err| match err {
- WaitError::FutureError(err) => gst::error_msg!(
- gst::ResourceError::Write,
- ["Failed to complete multipart upload: {}.", err.to_string()]
- ),
- WaitError::Cancelled => {
- gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
- }
- })
+ let completed_upload = CompletedMultipartUpload {
+ parts: Some(std::mem::take(&mut started_state.completed_parts)),
+ };
+
+ let complete_req_future = || {
+ let complete_req = self
+ .create_complete_multipart_upload_request(started_state, completed_upload.clone());
+ started_state
+ .client
+ .complete_multipart_upload(complete_req)
+ .map_err(RetriableError::Rusoto)
+ };
+
+ s3utils::wait_retry(
+ &self.canceller,
+ Some(Duration::from_millis(DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC)),
+ Some(Duration::from_millis(DEFAULT_COMPLETE_RETRY_DURATION_MSEC)),
+ complete_req_future,
+ )
+ .map(|_| ())
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => gst::error_msg!(
+ gst::ResourceError::Write,
+ ["Failed to complete multipart upload: {:?}.", err]
+ ),
+ WaitError::Cancelled => {
+ gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
+ }
+ })
}
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
@@ -467,20 +498,29 @@ impl S3Sink {
_ => S3Client::new(s3url.region.clone()),
};
- let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings);
- let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
+ let create_multipart_req_future = || {
+ let create_multipart_req =
+ self.create_create_multipart_upload_request(&s3url, &settings);
+ client
+ .create_multipart_upload(create_multipart_req)
+ .map_err(RetriableError::Rusoto)
+ };
- let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
- |err| match err {
- WaitError::FutureError(err) => gst::error_msg!(
- gst::ResourceError::OpenWrite,
- ["Failed to create multipart upload: {}", err]
- ),
- WaitError::Cancelled => {
- gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
- }
- },
- )?;
+ let response = s3utils::wait_retry(
+ &self.canceller,
+ Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
+ Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
+ create_multipart_req_future,
+ )
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to create multipart upload: {:?}", err]
+ ),
+ WaitError::Cancelled => {
+ gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
+ }
+ })?;
let upload_id = response.upload_id.ok_or_else(|| {
gst::error_msg!(
diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs
index 94b7ba8a..d11e0ddd 100644
--- a/net/rusoto/src/s3utils.rs
+++ b/net/rusoto/src/s3utils.rs
@@ -165,44 +165,3 @@ where
res
}
-
-pub fn wait<F, T, E>(
- canceller: &Mutex<Option<future::AbortHandle>>,
- future: F,
-) -> Result<T, WaitError<E>>
-where
- F: Send + Future<Output = Result<T, E>>,
- F::Output: Send,
- T: Send,
- E: Send,
-{
- let mut canceller_guard = canceller.lock().unwrap();
- let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
-
- canceller_guard.replace(abort_handle);
- drop(canceller_guard);
-
- let abortable_future = future::Abortable::new(future, abort_registration);
-
- // FIXME: add a timeout as well
-
- let res = {
- let _enter = RUNTIME.enter();
- futures::executor::block_on(async {
- match abortable_future.await {
- // Future resolved successfully
- Ok(Ok(res)) => Ok(res),
- // Future resolved with an error
- Ok(Err(err)) => Err(WaitError::FutureError(err)),
- // Canceller called before future resolved
- Err(future::Aborted) => Err(WaitError::Cancelled),
- }
- })
- };
-
- /* Clear out the canceller */
- canceller_guard = canceller.lock().unwrap();
- *canceller_guard = None;
-
- res
-}