Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2021-09-27 16:49:12 +0300
committerSebastian Dröge <sebastian@centricular.com>2021-09-27 17:00:36 +0300
commit502b336361da8a4ddd5ab96e7009359f2c559394 (patch)
tree564a0c613bbcc9400b6f6fd54f50da716024a2b3 /net
parent3ce6a5f40313e637064344608efc7ca66d873748 (diff)
rusoto: Implement auth via explicit access-key/secret-access-key properties
This allows passing them explicitly as strings to the elements instead of relying on system/per-user configuration.
Diffstat (limited to 'net')
-rw-r--r--net/rusoto/src/aws_transcriber/imp.rs78
-rw-r--r--net/rusoto/src/s3sink/imp.rs45
-rw-r--r--net/rusoto/src/s3src/imp.rs98
3 files changed, 193 insertions, 28 deletions
diff --git a/net/rusoto/src/aws_transcriber/imp.rs b/net/rusoto/src/aws_transcriber/imp.rs
index ce3a53e8..07a3843e 100644
--- a/net/rusoto/src/aws_transcriber/imp.rs
+++ b/net/rusoto/src/aws_transcriber/imp.rs
@@ -26,7 +26,7 @@ use gst::{
use std::default::Default;
use rusoto_core::Region;
-use rusoto_credential::{ChainProvider, ProvideAwsCredentials};
+use rusoto_credential::{ChainProvider, ProvideAwsCredentials, StaticProvider};
use rusoto_signature::signature::SignedRequest;
@@ -127,6 +127,8 @@ struct Settings {
vocabulary: Option<String>,
session_id: Option<String>,
results_stability: AwsTranscriberResultStability,
+ access_key: Option<String>,
+ secret_access_key: Option<String>,
}
impl Default for Settings {
@@ -138,6 +140,8 @@ impl Default for Settings {
vocabulary: None,
session_id: None,
results_stability: DEFAULT_STABILITY,
+ access_key: None,
+ secret_access_key: None,
}
}
}
@@ -857,15 +861,39 @@ impl Transcriber {
gst_info!(CAT, obj: element, "Connecting ..");
- let creds = {
- let _enter = RUNTIME.enter();
- futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| {
- gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err);
- error_msg!(
- gst::CoreError::Failed,
- ["Failed to generate credentials: {}", err]
- )
- })?
+ let creds = match (
+ settings.access_key.as_ref(),
+ settings.secret_access_key.as_ref(),
+ ) {
+ (Some(access_key), Some(secret_access_key)) => {
+ let _enter = RUNTIME.enter();
+ futures::executor::block_on(
+ StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone())
+ .credentials()
+ .map_err(|err| {
+ gst_error!(
+ CAT,
+ obj: element,
+ "Failed to generate credentials: {}",
+ err
+ );
+ error_msg!(
+ gst::CoreError::Failed,
+ ["Failed to generate credentials: {}", err]
+ )
+ }),
+ )?
+ }
+ _ => {
+ let _enter = RUNTIME.enter();
+ futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| {
+ gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err);
+ error_msg!(
+ gst::CoreError::Failed,
+ ["Failed to generate credentials: {}", err]
+ )
+ })?
+ }
};
let language_code = settings
@@ -1113,6 +1141,20 @@ impl ObjectImpl for Transcriber {
DEFAULT_STABILITY as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpec::new_string(
+ "access-key",
+ "Access Key",
+ "AWS Access Key",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_string(
+ "secret-access-key",
+ "Secret Access Key",
+ "AWS Secret Access Key",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
]
});
@@ -1165,6 +1207,14 @@ impl ObjectImpl for Transcriber {
.get::<AwsTranscriberResultStability>()
.expect("type checked upstream");
}
+ "access-key" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.access_key = value.get().expect("type checked upstream");
+ }
+ "secret-access-key" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.secret_access_key = value.get().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -1195,6 +1245,14 @@ impl ObjectImpl for Transcriber {
let settings = self.settings.lock().unwrap();
settings.results_stability.to_value()
}
+ "access-key" => {
+ let settings = self.settings.lock().unwrap();
+ settings.access_key.to_value()
+ }
+ "secret-access-key" => {
+ let settings = self.settings.lock().unwrap();
+ settings.secret_access_key.to_value()
+ }
_ => unimplemented!(),
}
}
diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs
index 1428b38e..6f74e94e 100644
--- a/net/rusoto/src/s3sink/imp.rs
+++ b/net/rusoto/src/s3sink/imp.rs
@@ -14,7 +14,8 @@ use gst::{gst_debug, gst_error, gst_info, gst_trace};
use gst_base::subclass::prelude::*;
use futures::future;
-use rusoto_core::region::Region;
+use rusoto_core::{region::Region, request::HttpClient};
+use rusoto_credential::StaticProvider;
use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
@@ -86,6 +87,8 @@ struct Settings {
key: Option<String>,
content_type: Option<String>,
buffer_size: u64,
+ access_key: Option<String>,
+ secret_access_key: Option<String>,
}
impl Settings {
@@ -107,6 +110,8 @@ impl Default for Settings {
key: None,
content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE,
+ access_key: None,
+ secret_access_key: None,
}
}
}
@@ -273,7 +278,21 @@ impl S3Sink {
}
};
- let client = S3Client::new(s3url.region.clone());
+ let client = match (
+ settings.access_key.as_ref(),
+ settings.secret_access_key.as_ref(),
+ ) {
+ (Some(access_key), Some(secret_access_key)) => {
+ let creds =
+ StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone());
+ S3Client::new_with(
+ HttpClient::new().expect("failed to create request dispatcher"),
+ creds,
+ s3url.region.clone(),
+ )
+ }
+ _ => S3Client::new(s3url.region.clone()),
+ };
let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings);
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
@@ -434,6 +453,20 @@ impl ObjectImpl for S3Sink {
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpec::new_string(
+ "access-key",
+ "Access Key",
+ "AWS Access Key",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_string(
+ "secret-access-key",
+ "Secret Access Key",
+ "AWS Secret Access Key",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
]
});
@@ -480,6 +513,12 @@ impl ObjectImpl for S3Sink {
"uri" => {
let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
}
+ "access-key" => {
+ settings.access_key = value.get().expect("type checked upstream");
+ }
+ "secret-access-key" => {
+ settings.secret_access_key = value.get().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -500,6 +539,8 @@ impl ObjectImpl for S3Sink {
url.to_value()
}
+ "access-key" => settings.access_key.to_value(),
+ "secret-access-key" => settings.secret_access_key.to_value(),
_ => unimplemented!(),
}
}
diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs
index 53b2bc3b..5f1c6f51 100644
--- a/net/rusoto/src/s3src/imp.rs
+++ b/net/rusoto/src/s3src/imp.rs
@@ -11,7 +11,9 @@ use std::sync::Mutex;
use bytes::Bytes;
use futures::future;
use once_cell::sync::Lazy;
-use rusoto_s3::*;
+use rusoto_core::request::HttpClient;
+use rusoto_credential::StaticProvider;
+use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use gst::glib;
use gst::prelude::*;
@@ -41,9 +43,25 @@ impl Default for StreamingState {
}
}
+struct Settings {
+ url: Option<GstS3Url>,
+ access_key: Option<String>,
+ secret_access_key: Option<String>,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ url: None,
+ access_key: None,
+ secret_access_key: None,
+ }
+ }
+}
+
#[derive(Default)]
pub struct S3Src {
- url: Mutex<Option<GstS3Url>>,
+ settings: Mutex<Settings>,
state: Mutex<StreamingState>,
canceller: Mutex<Option<future::AbortHandle>>,
}
@@ -66,7 +84,23 @@ impl S3Src {
}
fn connect(self: &S3Src, url: &GstS3Url) -> S3Client {
- S3Client::new(url.region.clone())
+ let settings = self.settings.lock().unwrap();
+
+ match (
+ settings.access_key.as_ref(),
+ settings.secret_access_key.as_ref(),
+ ) {
+ (Some(access_key), Some(secret_access_key)) => {
+ let creds =
+ StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone());
+ S3Client::new_with(
+ HttpClient::new().expect("failed to create request dispatcher"),
+ creds,
+ url.region.clone(),
+ )
+ }
+ _ => S3Client::new(url.region.clone()),
+ }
}
fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> {
@@ -79,17 +113,17 @@ impl S3Src {
));
}
- let mut url = self.url.lock().unwrap();
+ let mut settings = self.settings.lock().unwrap();
if url_str.is_none() {
- *url = None;
+ settings.url = None;
return Ok(());
}
let url_str = url_str.unwrap();
match parse_s3_url(url_str) {
Ok(s3url) => {
- *url = Some(s3url);
+ settings.url = Some(s3url);
Ok(())
}
Err(_) => Err(glib::Error::new(
@@ -212,13 +246,29 @@ impl ObjectSubclass for S3Src {
impl ObjectImpl for S3Src {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
- vec![glib::ParamSpec::new_string(
- "uri",
- "URI",
- "The S3 object URI",
- None,
- glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
- )]
+ vec![
+ glib::ParamSpec::new_string(
+ "uri",
+ "URI",
+ "The S3 object URI",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_string(
+ "access-key",
+ "Access Key",
+ "AWS Access Key",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_string(
+ "secret-access-key",
+ "Secret Access Key",
+ "AWS Secret Access Key",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ ]
});
PROPERTIES.as_ref()
@@ -235,20 +285,32 @@ impl ObjectImpl for S3Src {
"uri" => {
let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
}
+ "access-key" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.access_key = value.get().expect("type checked upstream");
+ }
+ "secret-access-key" => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.secret_access_key = value.get().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
fn property(&self, _: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+
match pspec.name() {
"uri" => {
- let url = match *self.url.lock().unwrap() {
+ let url = match settings.url {
Some(ref url) => url.to_string(),
None => "".to_string(),
};
url.to_value()
}
+ "access-key" => settings.access_key.to_value(),
+ "secret-access-key" => settings.secret_access_key.to_value(),
_ => unimplemented!(),
}
}
@@ -302,7 +364,9 @@ impl URIHandlerImpl for S3Src {
}
fn uri(&self, _: &Self::Type) -> Option<String> {
- self.url.lock().unwrap().as_ref().map(|s| s.to_string())
+ let settings = self.settings.lock().unwrap();
+
+ settings.url.as_ref().map(|s| s.to_string())
}
fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
@@ -329,7 +393,8 @@ impl BaseSrcImpl for S3Src {
unreachable!("RusotoS3Src is already started");
}
- let s3url = match *self.url.lock().unwrap() {
+ let settings = self.settings.lock().unwrap();
+ let s3url = match settings.url {
Some(ref url) => url.clone(),
None => {
return Err(gst::error_msg!(
@@ -338,6 +403,7 @@ impl BaseSrcImpl for S3Src {
));
}
};
+ drop(settings);
let s3client = self.connect(&s3url);
let size = self.head(src, &s3client, &s3url)?;