Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorArun Raghavan <arun@asymptotic.io>2022-03-16 10:10:28 +0300
committerArun Raghavan <arun@asymptotic.io>2022-03-21 11:20:07 +0300
commit1ad277a41002acb4784383460de0048db39e1dce (patch)
tree06283c5f38acadbf192beb2b2c266462eb27b42b /net
parent191b1644d6866cb0220b6668151845d9631bf25d (diff)
rusoto: s3src: Implement timeout and retries
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
Diffstat (limited to 'net')
-rw-r--r--net/rusoto/src/s3src/imp.rs103
1 files changed, 86 insertions, 17 deletions
diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs
index 9e2dd939..d75bd010 100644
--- a/net/rusoto/src/s3src/imp.rs
+++ b/net/rusoto/src/s3src/imp.rs
@@ -7,6 +7,7 @@
// SPDX-License-Identifier: MPL-2.0
use std::sync::Mutex;
+use std::time::Duration;
use bytes::Bytes;
use futures::future;
@@ -27,6 +28,9 @@ use gst_base::subclass::prelude::*;
use crate::s3url::*;
use crate::s3utils::{self, WaitError};
+const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
+const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
+
#[allow(clippy::large_enum_variant)]
enum StreamingState {
Stopped,
@@ -48,6 +52,8 @@ struct Settings {
url: Option<GstS3Url>,
access_key: Option<String>,
secret_access_key: Option<String>,
+ request_timeout: Option<Duration>,
+ retry_duration: Option<Duration>,
}
#[derive(Default)]
@@ -130,16 +136,24 @@ impl S3Src {
client: &S3Client,
url: &GstS3Url,
) -> Result<u64, gst::ErrorMessage> {
- let request = HeadObjectRequest {
- bucket: url.bucket.clone(),
- key: url.object.clone(),
- version_id: url.version.clone(),
- ..Default::default()
- };
+ let settings = self.settings.lock().unwrap();
- let response = client.head_object(request);
+ let head_object_future = || {
+ client.head_object(HeadObjectRequest {
+ bucket: url.bucket.clone(),
+ key: url.object.clone(),
+ version_id: url.version.clone(),
+ ..Default::default()
+ })
+ };
- let output = s3utils::wait(&self.canceller, response).map_err(|err| match err {
+ let output = s3utils::wait_retry(
+ &self.canceller,
+ settings.request_timeout,
+ settings.retry_duration,
+ head_object_future,
+ )
+ .map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::NotFound,
["Failed to HEAD object: {}", err]
@@ -183,13 +197,7 @@ impl S3Src {
}
};
- let request = GetObjectRequest {
- bucket: url.bucket.clone(),
- key: url.object.clone(),
- range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
- version_id: url.version.clone(),
- ..Default::default()
- };
+ let settings = self.settings.lock().unwrap();
gst_debug!(
CAT,
@@ -199,9 +207,23 @@ impl S3Src {
offset + length - 1
);
- let response = client.get_object(request);
+ let get_object_future = || {
+ client.get_object(GetObjectRequest {
+ bucket: url.bucket.clone(),
+ key: url.object.clone(),
+ range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
+ version_id: url.version.clone(),
+ ..Default::default()
+ })
+ };
- let output = s3utils::wait(&self.canceller, response).map_err(|err| match err {
+ let output = s3utils::wait_retry(
+ &self.canceller,
+ settings.request_timeout,
+ settings.retry_duration,
+ get_object_future,
+ )
+ .map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::Read,
["Could not read: {}", err]
@@ -259,6 +281,24 @@ impl ObjectImpl for S3Src {
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpecInt64::new(
+ "request-timeout",
+ "Request timeout",
+ "Timeout for each S3 request (in ms, set to -1 for infinity)",
+ -1,
+ std::i64::MAX,
+ DEFAULT_REQUEST_TIMEOUT_MSEC as i64,
+ glib::ParamFlags::READWRITE,
+ ),
+ glib::ParamSpecInt64::new(
+ "retry-duration",
+ "Retry duration",
+ "How long we should retry S3 requests before giving up (in ms, set to -1 for infinity)",
+ -1,
+ std::i64::MAX,
+ DEFAULT_RETRY_DURATION_MSEC as i64,
+ glib::ParamFlags::READWRITE,
+ ),
]
});
@@ -284,6 +324,21 @@ impl ObjectImpl for S3Src {
let mut settings = self.settings.lock().unwrap();
settings.secret_access_key = value.get().expect("type checked upstream");
}
+ "request-timeout" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.request_timeout = match value.get::<i64>().expect("type checked upstream")
+ {
+ -1 => None,
+ v => Some(Duration::from_millis(v as u64)),
+ }
+ }
+ "retry-duration" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.retry_duration = match value.get::<i64>().expect("type checked upstream") {
+ -1 => None,
+ v => Some(Duration::from_millis(v as u64)),
+ }
+ }
_ => unimplemented!(),
}
}
@@ -302,6 +357,20 @@ impl ObjectImpl for S3Src {
}
"access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(),
+ "request-timeout" => {
+ let timeout: i64 = match settings.request_timeout {
+ None => -1,
+ Some(v) => v.as_millis() as i64,
+ };
+ timeout.to_value()
+ }
+ "retry-duration" => {
+ let timeout: i64 = match settings.retry_duration {
+ None => -1,
+ Some(v) => v.as_millis() as i64,
+ };
+ timeout.to_value()
+ }
_ => unimplemented!(),
}
}