Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorSanchayan Maity <sanchayan@asymptotic.io>2021-04-30 16:05:35 +0300
committerSanchayan Maity <sanchayan@asymptotic.io>2021-05-01 11:25:49 +0300
commitbf5e231e5b8c8c978fb74cfc8f0910a46906f422 (patch)
treef72142852ac75fb64540a58e31994fc9e5530b2a /net
parent8bda233d02e24711d94cba73ad704f8e3606fe2a (diff)
rusoto: s3sink: Implement support for GstUriHandler interface
With the URI handler interface implemented, we can drop the old method of specifying bucket, key and region. This also brings it in line with how it is for s3src.
Diffstat (limited to 'net')
-rw-r--r--net/rusoto/README.md6
-rw-r--r--net/rusoto/src/s3sink/imp.rs166
2 files changed, 96 insertions, 76 deletions
diff --git a/net/rusoto/README.md b/net/rusoto/README.md
index 19460fbb2..18c49900c 100644
--- a/net/rusoto/README.md
+++ b/net/rusoto/README.md
@@ -38,15 +38,15 @@ $ gst-launch-1.0 \
## s3sink
-Writes data to a specified S3 bucket. The `region` parameter is optional, and
-if not specified, the default parameter will be used (from `.aws/config` file).
+Writes data to a specified S3 (region, bucket, object, version?) tuple. The
+version may be omitted.
```
$ gst-launch-1.0 \
videotestsrc ! \
theoraenc ! \
oggmux ! \
- s3sink bucket=example-bucket key=my/file.ogv region=us-west-1
+ s3sink uri=s3://us-west-1/example-bucket/my/file.ogv?version=my-optional-version
```
## awstranscriber
diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs
index 55de4cef2..c02e91f5a 100644
--- a/net/rusoto/src/s3sink/imp.rs
+++ b/net/rusoto/src/s3sink/imp.rs
@@ -16,7 +16,6 @@ use gst::{gst_error, gst_info, gst_trace};
use gst_base::subclass::prelude::*;
use futures::future;
-use rusoto_core::region::Region;
use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
@@ -25,9 +24,9 @@ use rusoto_s3::{
use once_cell::sync::Lazy;
use std::convert::From;
-use std::str::FromStr;
use std::sync::Mutex;
+use crate::s3url::*;
use crate::s3utils::{self, WaitError};
struct Started {
@@ -82,15 +81,13 @@ impl Default for State {
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
struct Settings {
- region: Region,
- bucket: Option<String>,
- key: Option<String>,
content_type: Option<String>,
buffer_size: u64,
}
#[derive(Default)]
pub struct S3Sink {
+ url: Mutex<Option<GstS3Url>>,
settings: Mutex<Settings>,
state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>,
@@ -107,9 +104,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
impl Default for Settings {
fn default() -> Self {
Settings {
- region: Region::default(),
- bucket: None,
- key: None,
content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE,
}
@@ -153,6 +147,7 @@ impl S3Sink {
}
fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
+ let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let state = match *state {
@@ -168,8 +163,8 @@ impl S3Sink {
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
))),
- bucket: settings.bucket.as_ref().unwrap().to_owned(),
- key: settings.key.as_ref().unwrap().to_owned(),
+ bucket: url.as_ref().unwrap().bucket.to_owned(),
+ key: url.as_ref().unwrap().object.to_owned(),
upload_id: state.upload_id.to_owned(),
part_number,
..Default::default()
@@ -179,7 +174,6 @@ impl S3Sink {
fn create_complete_multipart_upload_request(
&self,
started_state: &mut Started,
- settings: &Settings,
) -> CompleteMultipartUploadRequest {
started_state
.completed_parts
@@ -192,9 +186,10 @@ impl S3Sink {
)),
};
+ let url = self.url.lock().unwrap();
CompleteMultipartUploadRequest {
- bucket: settings.bucket.as_ref().unwrap().to_owned(),
- key: settings.key.as_ref().unwrap().to_owned(),
+ bucket: url.as_ref().unwrap().bucket.to_owned(),
+ key: url.as_ref().unwrap().object.to_owned(),
upload_id: started_state.upload_id.to_owned(),
multipart_upload: Some(completed_upload),
..Default::default()
@@ -203,24 +198,15 @@ impl S3Sink {
fn create_create_multipart_upload_request(
&self,
+ url: &GstS3Url,
settings: &Settings,
- ) -> Result<CreateMultipartUploadRequest, gst::ErrorMessage> {
- if settings.bucket.is_none() || settings.key.is_none() {
- return Err(gst::error_msg!(
- gst::ResourceError::Settings,
- ["Bucket or key is not defined"]
- ));
- }
-
- let bucket = settings.bucket.as_ref().unwrap();
- let key = settings.key.as_ref().unwrap();
-
- Ok(CreateMultipartUploadRequest {
- bucket: bucket.clone(),
- key: key.clone(),
+ ) -> CreateMultipartUploadRequest {
+ CreateMultipartUploadRequest {
+ bucket: url.bucket.clone(),
+ key: url.object.clone(),
content_type: settings.content_type.clone(),
..Default::default()
- })
+ }
}
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
@@ -239,9 +225,7 @@ impl S3Sink {
}
};
- let settings = self.settings.lock().unwrap();
-
- let complete_req = self.create_complete_multipart_upload_request(started_state, &settings);
+ let complete_req = self.create_complete_multipart_upload_request(started_state);
let complete_req_future = started_state.client.complete_multipart_upload(complete_req);
s3utils::wait(&self.canceller, complete_req_future)
@@ -265,9 +249,19 @@ impl S3Sink {
unreachable!("Element should be started");
}
- let client = S3Client::new(settings.region.clone());
+ let s3url = match *self.url.lock().unwrap() {
+ Some(ref url) => url.clone(),
+ None => {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["Cannot start without a URL being set"]
+ ));
+ }
+ };
+
+ let client = S3Client::new(s3url.region.clone());
- let create_multipart_req = self.create_create_multipart_upload_request(&settings)?;
+ let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings);
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
@@ -339,6 +333,36 @@ impl S3Sink {
c.abort()
};
}
+
+ fn set_uri(self: &S3Sink, _: &super::S3Sink, url_str: Option<&str>) -> Result<(), glib::Error> {
+ let state = self.state.lock().unwrap();
+
+ if let State::Started { .. } = *state {
+ return Err(glib::Error::new(
+ gst::URIError::BadState,
+ "Cannot set URI on a started s3sink",
+ ));
+ }
+
+ let mut url = self.url.lock().unwrap();
+
+ if url_str.is_none() {
+ *url = None;
+ return Ok(());
+ }
+
+ let url_str = url_str.unwrap();
+ match parse_s3_url(url_str) {
+ Ok(s3url) => {
+ *url = Some(s3url);
+ Ok(())
+ }
+ Err(_) => Err(glib::Error::new(
+ gst::URIError::BadUri,
+ "Could not parse URI",
+ )),
+ }
+ }
}
#[glib::object_subclass]
@@ -346,33 +370,13 @@ impl ObjectSubclass for S3Sink {
const NAME: &'static str = "RusotoS3Sink";
type Type = super::S3Sink;
type ParentType = gst_base::BaseSink;
+ type Interfaces = (gst::URIHandler,);
}
impl ObjectImpl for S3Sink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
- glib::ParamSpec::new_string(
- "bucket",
- "S3 Bucket",
- "The bucket of the file to write",
- None,
- glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
- ),
- glib::ParamSpec::new_string(
- "key",
- "S3 Key",
- "The key of the file to write",
- None,
- glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
- ),
- glib::ParamSpec::new_string(
- "region",
- "AWS Region",
- "An AWS region (e.g. eu-west-2).",
- None,
- glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
- ),
glib::ParamSpec::new_uint64(
"part-size",
"Part size",
@@ -382,6 +386,13 @@ impl ObjectImpl for S3Sink {
DEFAULT_BUFFER_SIZE,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpec::new_string(
+ "uri",
+ "URI",
+ "The S3 object URI",
+ None,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
]
});
@@ -390,7 +401,7 @@ impl ObjectImpl for S3Sink {
fn set_property(
&self,
- _obj: &Self::Type,
+ obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
@@ -398,24 +409,12 @@ impl ObjectImpl for S3Sink {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
- "bucket" => {
- settings.bucket = value
- .get::<Option<String>>()
- .expect("type checked upstream");
- }
- "key" => {
- settings.key = value
- .get::<Option<String>>()
- .expect("type checked upstream");
- }
- "region" => {
- settings.region =
- Region::from_str(&value.get::<String>().expect("type checked upstream"))
- .unwrap();
- }
"part-size" => {
settings.buffer_size = value.get::<u64>().expect("type checked upstream");
}
+ "uri" => {
+ let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
+ }
_ => unimplemented!(),
}
}
@@ -424,10 +423,15 @@ impl ObjectImpl for S3Sink {
let settings = self.settings.lock().unwrap();
match pspec.name() {
- "key" => settings.key.to_value(),
- "bucket" => settings.bucket.to_value(),
- "region" => settings.region.name().to_value(),
"part-size" => settings.buffer_size.to_value(),
+ "uri" => {
+ let url = match *self.url.lock().unwrap() {
+ Some(ref url) => url.to_string(),
+ None => "".to_string(),
+ };
+
+ url.to_value()
+ }
_ => unimplemented!(),
}
}
@@ -465,6 +469,22 @@ impl ElementImpl for S3Sink {
}
}
+impl URIHandlerImpl for S3Sink {
+ const URI_TYPE: gst::URIType = gst::URIType::Sink;
+
+ fn protocols() -> &'static [&'static str] {
+ &["s3"]
+ }
+
+ fn uri(&self, _: &Self::Type) -> Option<String> {
+ self.url.lock().unwrap().as_ref().map(|s| s.to_string())
+ }
+
+ fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
+ self.set_uri(element, Some(uri))
+ }
+}
+
impl BaseSinkImpl for S3Sink {
fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.start()