diff options
author | Arun Raghavan <arun@asymptotic.io> | 2022-03-16 10:10:28 +0300 |
---|---|---|
committer | Arun Raghavan <arun@asymptotic.io> | 2022-03-21 11:20:07 +0300 |
commit | 1ad277a41002acb4784383460de0048db39e1dce (patch) | |
tree | 06283c5f38acadbf192beb2b2c266462eb27b42b /net | |
parent | 191b1644d6866cb0220b6668151845d9631bf25d (diff) |
rusoto: s3src: Implement timeout and retries
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
Diffstat (limited to 'net')
-rw-r--r-- | net/rusoto/src/s3src/imp.rs | 103 |
1 files changed, 86 insertions, 17 deletions
diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs index 9e2dd939..d75bd010 100644 --- a/net/rusoto/src/s3src/imp.rs +++ b/net/rusoto/src/s3src/imp.rs @@ -7,6 +7,7 @@ // SPDX-License-Identifier: MPL-2.0 use std::sync::Mutex; +use std::time::Duration; use bytes::Bytes; use futures::future; @@ -27,6 +28,9 @@ use gst_base::subclass::prelude::*; use crate::s3url::*; use crate::s3utils::{self, WaitError}; +const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; +const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; + #[allow(clippy::large_enum_variant)] enum StreamingState { Stopped, @@ -48,6 +52,8 @@ struct Settings { url: Option<GstS3Url>, access_key: Option<String>, secret_access_key: Option<String>, + request_timeout: Option<Duration>, + retry_duration: Option<Duration>, } #[derive(Default)] @@ -130,16 +136,24 @@ impl S3Src { client: &S3Client, url: &GstS3Url, ) -> Result<u64, gst::ErrorMessage> { - let request = HeadObjectRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - version_id: url.version.clone(), - ..Default::default() - }; + let settings = self.settings.lock().unwrap(); - let response = client.head_object(request); + let head_object_future = || { + client.head_object(HeadObjectRequest { + bucket: url.bucket.clone(), + key: url.object.clone(), + version_id: url.version.clone(), + ..Default::default() + }) + }; - let output = s3utils::wait(&self.canceller, response).map_err(|err| match err { + let output = s3utils::wait_retry( + &self.canceller, + settings.request_timeout, + settings.retry_duration, + head_object_future, + ) + .map_err(|err| match err { WaitError::FutureError(err) => gst::error_msg!( gst::ResourceError::NotFound, ["Failed to HEAD object: {}", err] @@ -183,13 +197,7 @@ impl S3Src { } }; - let request = GetObjectRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - range: Some(format!("bytes={}-{}", offset, offset + length - 1)), - version_id: url.version.clone(), - ..Default::default() - }; + let settings = self.settings.lock().unwrap(); gst_debug!( CAT, @@ -199,9 +207,23 @@ impl S3Src { offset + length - 1 ); - let response = client.get_object(request); + let get_object_future = || { + client.get_object(GetObjectRequest { + bucket: url.bucket.clone(), + key: url.object.clone(), + range: Some(format!("bytes={}-{}", offset, offset + length - 1)), + version_id: url.version.clone(), + ..Default::default() + }) + }; - let output = s3utils::wait(&self.canceller, response).map_err(|err| match err { + let output = s3utils::wait_retry( + &self.canceller, + settings.request_timeout, + settings.retry_duration, + get_object_future, + ) + .map_err(|err| match err { WaitError::FutureError(err) => Some(gst::error_msg!( gst::ResourceError::Read, ["Could not read: {}", err] @@ -259,6 +281,24 @@ impl ObjectImpl for S3Src { None, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpecInt64::new( + "request-timeout", + "Request timeout", + "Timeout for each S3 request (in ms, set to -1 for infinity)", + -1, + std::i64::MAX, + DEFAULT_REQUEST_TIMEOUT_MSEC as i64, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpecInt64::new( + "retry-duration", + "Retry duration", + "How long we should retry S3 requests before giving up (in ms, set to -1 for infinity)", + -1, + std::i64::MAX, + DEFAULT_RETRY_DURATION_MSEC as i64, + glib::ParamFlags::READWRITE, + ), ] }); @@ -284,6 +324,21 @@ impl ObjectImpl for S3Src { let mut settings = self.settings.lock().unwrap(); settings.secret_access_key = value.get().expect("type checked upstream"); } + "request-timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.request_timeout = match value.get::<i64>().expect("type checked upstream") + { + -1 => None, + v => Some(Duration::from_millis(v as u64)), + } + } + "retry-duration" => { + let mut settings = self.settings.lock().unwrap(); + settings.retry_duration = match value.get::<i64>().expect("type checked upstream") { + -1 => None, + v => Some(Duration::from_millis(v as u64)), + } + } _ => unimplemented!(), } } @@ -302,6 +357,20 @@ impl ObjectImpl for S3Src { } "access-key" => settings.access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(), + "request-timeout" => { + let timeout: i64 = match settings.request_timeout { + None => -1, + Some(v) => v.as_millis() as i64, + }; + timeout.to_value() + } + "retry-duration" => { + let timeout: i64 = match settings.retry_duration { + None => -1, + Some(v) => v.as_millis() as i64, + }; + timeout.to_value() + } _ => unimplemented!(), } } |