Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Raghavan <arun@asymptotic.io>2023-09-27 20:08:10 +0300
committerArun Raghavan <arun@asymptotic.io>2023-12-18 18:39:23 +0300
commita54b2dd39e9dcfef058763665d1a74342b65abe1 (patch)
tree47e0ec9f9cf4130dda8e77032dd4a7c40b8d4b62
parent0a27b9e6d9c305d69ab38e7aeb09737372f4f906 (diff)
aws: s3: Add a new awss3putobjectsink
When streaming small amounts of data, using awss3sink might not be a good idea, as we need to accumulate at least 5 MB of data for a multipart upload (or we flush on EOS). The alternative, while inefficient, is to do a complete PutObject of _all_ the data periodically so as to not lose data in case of a pipeline failure. This element makes a start on this idea by doing a PutObject for every buffer. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1337>
-rw-r--r--net/aws/src/s3sink/mod.rs18
-rw-r--r--net/aws/src/s3sink/multipartsink.rs (renamed from net/aws/src/s3sink/imp.rs)0
-rw-r--r--net/aws/src/s3sink/putobjectsink.rs593
-rw-r--r--net/aws/tests/s3.rs118
4 files changed, 701 insertions, 28 deletions
diff --git a/net/aws/src/s3sink/mod.rs b/net/aws/src/s3sink/mod.rs
index 9198f025..78308966 100644
--- a/net/aws/src/s3sink/mod.rs
+++ b/net/aws/src/s3sink/mod.rs
@@ -1,4 +1,6 @@
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
+// Copyright (C) 2023 Asymptotic Inc
+// Author: Arun Raghavan <arun@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
@@ -9,7 +11,8 @@
use gst::glib;
use gst::prelude::*;
-mod imp;
+mod multipartsink;
+mod putobjectsink;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
@@ -27,7 +30,11 @@ pub(crate) enum OnError {
}
glib::wrapper! {
- pub struct S3Sink(ObjectSubclass<imp::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
+ pub struct S3Sink(ObjectSubclass<multipartsink::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
+}
+
+glib::wrapper! {
+ pub struct S3PutObjectSink(ObjectSubclass<putobjectsink::S3PutObjectSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
@@ -43,5 +50,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
"awss3sink",
gst::Rank::PRIMARY,
S3Sink::static_type(),
+ )?;
+ gst::Element::register(
+ Some(plugin),
+ "awss3putobjectsink",
+ // This element should not be autoplugged as it is only useful for specific use cases
+ gst::Rank::NONE,
+ S3PutObjectSink::static_type(),
)
}
diff --git a/net/aws/src/s3sink/imp.rs b/net/aws/src/s3sink/multipartsink.rs
index 603823a5..603823a5 100644
--- a/net/aws/src/s3sink/imp.rs
+++ b/net/aws/src/s3sink/multipartsink.rs
diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs
new file mode 100644
index 00000000..ae044bd1
--- /dev/null
+++ b/net/aws/src/s3sink/putobjectsink.rs
@@ -0,0 +1,593 @@
+// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
+// Copyright (C) 2023 Asymptotic Inc
+// Author: Arun Raghavan <arun@asymptotic.io>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_base::subclass::prelude::*;
+
+use aws_sdk_s3::{
+ config::{self, retry::RetryConfig, Credentials, Region},
+ operation::put_object::builders::PutObjectFluentBuilder,
+ primitives::ByteStream,
+ Client,
+};
+
+use futures::future;
+use gst::glib::once_cell::sync::Lazy;
+use std::collections::HashMap;
+use std::convert::From;
+use std::sync::Mutex;
+use std::time::Duration;
+
+use crate::s3url::*;
+use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
+
+const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
+
+// General setting for create / abort requests
+const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
+
+struct Started {
+ client: Client,
+ buffer: Vec<u8>,
+}
+
+impl Started {
+ pub fn new(client: Client, buffer: Vec<u8>) -> Started {
+ Started { client, buffer }
+ }
+}
+
+#[derive(Default)]
+enum State {
+ #[default]
+ Stopped,
+ Started(Started),
+}
+
+struct Settings {
+ region: Region,
+ bucket: Option<String>,
+ key: Option<String>,
+ content_type: Option<String>,
+ content_disposition: Option<String>,
+ access_key: Option<String>,
+ secret_access_key: Option<String>,
+ session_token: Option<String>,
+ metadata: Option<gst::Structure>,
+ retry_attempts: u32,
+ request_timeout: Duration,
+ endpoint_uri: Option<String>,
+}
+
+impl Settings {
+ fn to_uri(&self) -> String {
+ GstS3Url {
+ region: self.region.clone(),
+ bucket: self.bucket.clone().unwrap(),
+ object: self.key.clone().unwrap(),
+ version: None,
+ }
+ .to_string()
+ }
+
+ fn to_metadata(&self, imp: &S3PutObjectSink) -> Option<HashMap<String, String>> {
+ self.metadata.as_ref().map(|structure| {
+ let mut hash = HashMap::new();
+
+ for (key, value) in structure.iter() {
+ if let Ok(Ok(value_str)) = value.transform::<String>().map(|v| v.get()) {
+ gst::log!(CAT, imp: imp, "metadata '{}' -> '{}'", key, value_str);
+ hash.insert(key.to_string(), value_str);
+ } else {
+ gst::warning!(
+ CAT,
+ imp: imp,
+ "Failed to convert metadata '{}' to string ('{:?}')",
+ key,
+ value
+ );
+ }
+ }
+
+ hash
+ })
+ }
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ region: Region::new("us-west-2"),
+ bucket: None,
+ key: None,
+ content_type: None,
+ content_disposition: None,
+ access_key: None,
+ secret_access_key: None,
+ session_token: None,
+ metadata: None,
+ retry_attempts: DEFAULT_RETRY_ATTEMPTS,
+ request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
+ endpoint_uri: None,
+ }
+ }
+}
+
+#[derive(Default)]
+pub struct S3PutObjectSink {
+ url: Mutex<Option<GstS3Url>>,
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+ canceller: Mutex<Option<future::AbortHandle>>,
+}
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "awss3putobjectsink",
+ gst::DebugColorFlags::empty(),
+ Some("Amazon S3 PutObject Sink"),
+ )
+});
+
+impl S3PutObjectSink {
+ fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
+ let put_object_req = self.create_put_object_request();
+
+ let put_object_req_future = put_object_req.send();
+ let _output =
+ s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match err {
+ WaitError::FutureError(err) => Some(gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to upload part: {}", err]
+ )),
+ WaitError::Cancelled => None,
+ })?;
+
+ gst::debug!(CAT, imp: self, "Uploaded complete");
+
+ Ok(())
+ }
+
+ fn create_put_object_request(&self) -> PutObjectFluentBuilder {
+ let url = self.url.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+ let state = self.state.lock().unwrap();
+ let state = match *state {
+ State::Started(ref started_state) => started_state,
+ State::Stopped => {
+ unreachable!("Element should be started");
+ }
+ };
+
+ let body = Some(ByteStream::from(state.buffer.clone()));
+
+ let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
+ let key = Some(url.as_ref().unwrap().object.to_owned());
+ let metadata = settings.to_metadata(self);
+
+ let client = &state.client;
+
+ client
+ .put_object()
+ .set_body(body)
+ .set_bucket(bucket)
+ .set_key(key)
+ .set_metadata(metadata)
+ }
+
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+
+ if let State::Started { .. } = *state {
+ unreachable!("Element should be started");
+ }
+
+ let s3url = {
+ let url = self.url.lock().unwrap();
+ match *url {
+ Some(ref url) => url.clone(),
+ None => {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["Cannot start without a URL being set"]
+ ));
+ }
+ }
+ };
+
+ let timeout_config = s3utils::timeout_config(settings.request_timeout);
+
+ let cred = match (
+ settings.access_key.as_ref(),
+ settings.secret_access_key.as_ref(),
+ ) {
+ (Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
+ access_key.clone(),
+ secret_access_key.clone(),
+ settings.session_token.clone(),
+ None,
+ "aws-s3-putobject-sink",
+ )),
+ _ => None,
+ };
+
+ let sdk_config =
+ s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred)
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to create SDK config: {}", err]
+ ),
+ WaitError::Cancelled => {
+ gst::error_msg!(
+ gst::LibraryError::Failed,
+ ["SDK config request interrupted during start"]
+ )
+ }
+ })?;
+
+ let config_builder = config::Builder::from(&sdk_config)
+ .retry_config(RetryConfig::standard().with_max_attempts(settings.retry_attempts));
+
+ let config = if let Some(ref uri) = settings.endpoint_uri {
+ config_builder.endpoint_url(uri).build()
+ } else {
+ config_builder.build()
+ };
+
+ let client = Client::from_conf(config);
+
+ *state = State::Started(Started::new(client, Vec::new()));
+
+ Ok(())
+ }
+
+ fn cancel(&self) {
+ let mut canceller = self.canceller.lock().unwrap();
+
+ if let Some(c) = canceller.take() {
+ c.abort()
+ };
+ }
+
+ fn set_uri(self: &S3PutObjectSink, url_str: Option<&str>) -> Result<(), glib::Error> {
+ let state = self.state.lock().unwrap();
+
+ if let State::Started { .. } = *state {
+ return Err(glib::Error::new(
+ gst::URIError::BadState,
+ "Cannot set URI on a started s3sink",
+ ));
+ }
+
+ let mut url = self.url.lock().unwrap();
+
+ if url_str.is_none() {
+ *url = None;
+ return Ok(());
+ }
+
+ gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str);
+
+ let url_str = url_str.unwrap();
+ match parse_s3_url(url_str) {
+ Ok(s3url) => {
+ *url = Some(s3url);
+ Ok(())
+ }
+ Err(_) => Err(glib::Error::new(
+ gst::URIError::BadUri,
+ "Could not parse URI",
+ )),
+ }
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for S3PutObjectSink {
+ const NAME: &'static str = "GstAwsS3PutObjectSink";
+ type Type = super::S3PutObjectSink;
+ type ParentType = gst_base::BaseSink;
+}
+
+impl ObjectImpl for S3PutObjectSink {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::builder("bucket")
+ .nick("S3 Bucket")
+ .blurb("The bucket of the file to write")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("key")
+ .nick("S3 Key")
+ .blurb("The key of the file to write")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("region")
+ .nick("AWS Region")
+ .blurb("An AWS region (e.g. eu-west-2).")
+ .default_value(Some("us-west-2"))
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("uri")
+ .nick("URI")
+ .blurb("The S3 object URI")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("access-key")
+ .nick("Access Key")
+ .blurb("AWS Access Key")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("secret-access-key")
+ .nick("Secret Access Key")
+ .blurb("AWS Secret Access Key")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("session-token")
+ .nick("Session Token")
+ .blurb("AWS temporary Session Token from STS")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecBoxed::builder::<gst::Structure>("metadata")
+ .nick("Metadata")
+ .blurb("A map of metadata to store with the object in S3; field values need to be convertible to strings.")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt::builder("retry-attempts")
+ .nick("Retry attempts")
+ .blurb("Number of times AWS SDK attempts a request before abandoning the request")
+ .minimum(1)
+ .maximum(10)
+ .default_value(DEFAULT_RETRY_ATTEMPTS)
+ .build(),
+ glib::ParamSpecInt64::builder("request-timeout")
+ .nick("Request timeout")
+ .blurb("Timeout for general S3 requests (in ms, set to -1 for infinity)")
+ .minimum(-1)
+ .default_value(DEFAULT_REQUEST_TIMEOUT_MSEC as i64)
+ .build(),
+ glib::ParamSpecString::builder("endpoint-uri")
+ .nick("S3 endpoint URI")
+ .blurb("The S3 endpoint URI to use")
+ .build(),
+ glib::ParamSpecString::builder("content-type")
+ .nick("content-type")
+ .blurb("Content-Type header to set for uploaded object")
+ .build(),
+ glib::ParamSpecString::builder("content-disposition")
+ .nick("content-disposition")
+ .blurb("Content-Disposition header to set for uploaded object")
+ .build(),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ let mut settings = self.settings.lock().unwrap();
+
+ gst::debug!(
+ CAT,
+ imp: self,
+ "Setting property '{}' to '{:?}'",
+ pspec.name(),
+ value
+ );
+
+ match pspec.name() {
+ "bucket" => {
+ settings.bucket = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ if settings.key.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "key" => {
+ settings.key = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ if settings.bucket.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "region" => {
+ let region = value.get::<String>().expect("type checked upstream");
+ settings.region = Region::new(region);
+ if settings.key.is_some() && settings.bucket.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "uri" => {
+ let _ = self.set_uri(value.get().expect("type checked upstream"));
+ }
+ "access-key" => {
+ settings.access_key = value.get().expect("type checked upstream");
+ }
+ "secret-access-key" => {
+ settings.secret_access_key = value.get().expect("type checked upstream");
+ }
+ "session-token" => {
+ settings.session_token = value.get().expect("type checked upstream");
+ }
+ "metadata" => {
+ settings.metadata = value.get().expect("type checked upstream");
+ }
+ "retry-attempts" => {
+ settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
+ }
+ "request-timeout" => {
+ settings.request_timeout =
+ duration_from_millis(value.get::<i64>().expect("type checked upstream"));
+ }
+ "endpoint-uri" => {
+ settings.endpoint_uri = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ if settings.key.is_some() && settings.bucket.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "content-type" => {
+ settings.content_type = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ }
+ "content-disposition" => {
+ settings.content_disposition = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+
+ match pspec.name() {
+ "key" => settings.key.to_value(),
+ "bucket" => settings.bucket.to_value(),
+ "region" => settings.region.to_string().to_value(),
+ "uri" => {
+ let url = self.url.lock().unwrap();
+ let url = match *url {
+ Some(ref url) => url.to_string(),
+ None => "".to_string(),
+ };
+
+ url.to_value()
+ }
+ "access-key" => settings.access_key.to_value(),
+ "secret-access-key" => settings.secret_access_key.to_value(),
+ "session-token" => settings.session_token.to_value(),
+ "metadata" => settings.metadata.to_value(),
+ "retry-attempts" => settings.retry_attempts.to_value(),
+ "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
+ "endpoint-uri" => settings.endpoint_uri.to_value(),
+ "content-type" => settings.content_type.to_value(),
+ "content-disposition" => settings.content_disposition.to_value(),
+ _ => unimplemented!(),
+ }
+ }
+}
+
+impl GstObjectImpl for S3PutObjectSink {}
+
+impl ElementImpl for S3PutObjectSink {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Amazon S3 PutObject sink",
+ "Source/Network",
+ "Writes an object to Amazon S3 using PutObject (mostly useful for small files)",
+ "Arun Raghavan <arun@asymptotic.io>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+}
+
+impl BaseSinkImpl for S3PutObjectSink {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ self.start()
+ }
+
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+
+ *state = State::Stopped;
+ gst::info!(CAT, imp: self, "Stopped");
+
+ Ok(())
+ }
+
+ fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let mut state = self.state.lock().unwrap();
+
+ let started_state = match *state {
+ State::Started(ref mut s) => s,
+ State::Stopped => {
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
+ let map = buffer.map_readable().map_err(|_| {
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
+ gst::FlowError::Error
+ })?;
+
+ started_state.buffer.extend_from_slice(map.as_slice());
+ drop(state);
+
+ match self.flush_buffer() {
+ Ok(_) => Ok(gst::FlowSuccess::Ok),
+ Err(err) => match err {
+ Some(error_message) => {
+ gst::error!(CAT, imp: self, "Upload failed: {}", error_message);
+ self.post_error_message(error_message);
+ Err(gst::FlowError::Error)
+ }
+ _ => {
+ gst::info!(CAT, imp: self, "Upload interrupted. Flushing...");
+ Err(gst::FlowError::Flushing)
+ }
+ },
+ }
+ }
+
+ fn unlock(&self) -> Result<(), gst::ErrorMessage> {
+ self.cancel();
+
+ Ok(())
+ }
+
+ fn event(&self, event: gst::Event) -> bool {
+ if let gst::EventView::Eos(_) = event.view() {
+ if let Err(error_message) = self.flush_buffer() {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed to finalize the upload: {:?}",
+ error_message
+ );
+ return false;
+ }
+ }
+
+ BaseSinkImplExt::parent_event(self, event)
+ }
+}
diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs
index f4ca8a96..c9d141e6 100644
--- a/net/aws/tests/s3.rs
+++ b/net/aws/tests/s3.rs
@@ -10,8 +10,6 @@
// The test times out on Windows for some reason, skip until we figure out why
#[cfg(not(target_os = "windows"))]
-#[test_with::env(AWS_ACCESS_KEY_ID)]
-#[test_with::env(AWS_SECRET_ACCESS_KEY)]
#[cfg(test)]
mod tests {
use gst::prelude::*;
@@ -30,8 +28,29 @@ mod tests {
});
}
+ async fn delete_object(region: String, bucket: &str, key: &str) {
+ let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
+ aws_sdk_s3::config::Region::new(region),
+ )
+ .or_default_provider();
+
+ let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
+ .region(region_provider)
+ .load()
+ .await;
+ let client = aws_sdk_s3::Client::new(&config);
+
+ client
+ .delete_object()
+ .bucket(bucket)
+ .key(key)
+ .send()
+ .await
+ .unwrap();
+ }
+
// Common helper
- async fn do_s3_test(key_prefix: &str) {
+ async fn do_s3_multipart_test(key_prefix: &str) {
init();
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
@@ -45,12 +64,17 @@ mod tests {
let mut h1 = gst_check::Harness::new_empty();
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
// adding an element manually
+
h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str());
h1.set_src_caps(gst::Caps::builder("text/plain").build());
h1.play();
h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
h1.push_event(gst::event::Eos::new());
let mut h2 = gst_check::Harness::new("awss3src");
@@ -59,42 +83,84 @@ mod tests {
let buf = h2.pull_until_eos().unwrap().unwrap();
assert_eq!(
- content,
+ content.repeat(5),
buf.into_mapped_buffer_readable().unwrap().as_slice()
);
- let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
- aws_sdk_s3::config::Region::new(region.clone()),
- )
- .or_default_provider();
+ delete_object(region.clone(), &bucket, &key).await;
+ }
- let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
- .region(region_provider)
- .load()
- .await;
- let client = aws_sdk_s3::Client::new(&config);
+ // Common helper
+ async fn do_s3_putobject_test(key_prefix: &str) {
+ init();
- client
- .delete_object()
- .bucket(bucket)
- .key(key)
- .send()
- .await
- .unwrap();
+ let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
+ let bucket =
+ std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
+ let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
+ let uri = format!("s3://{region}/{bucket}/{key}");
+ let content = "Hello, world!\n".as_bytes();
+
+ // Manually add the element so we can configure it before it goes to PLAYING
+ let mut h1 = gst_check::Harness::new_empty();
+ // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
+ // adding an element manually
+
+ h1.add_parse(
+ format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\"")
+ .as_str(),
+ );
+
+ h1.set_src_caps(gst::Caps::builder("text/plain").build());
+ h1.play();
+
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push(gst::Buffer::from_slice(content)).unwrap();
+ h1.push_event(gst::event::Eos::new());
+
+ let mut h2 = gst_check::Harness::new("awss3src");
+ h2.element().unwrap().set_property("uri", uri.clone());
+ h2.play();
+
+ let buf = h2.pull_until_eos().unwrap().unwrap();
+ assert_eq!(
+ content.repeat(5),
+ buf.into_mapped_buffer_readable().unwrap().as_slice()
+ );
+
+ delete_object(region.clone(), &bucket, &key).await;
+ }
+
+ #[tokio::test]
+ async fn test_s3_multipart_simple() {
+ do_s3_multipart_test("s3-test").await;
+ }
+
+ #[tokio::test]
+ async fn test_s3_multipart_whitespace() {
+ do_s3_multipart_test("s3 test").await;
+ }
+
+ #[tokio::test]
+ async fn test_s3_multipart_unicode() {
+ do_s3_multipart_test("s3 🧪 😱").await;
}
#[tokio::test]
- async fn test_s3_simple() {
- do_s3_test("s3-test").await;
+ async fn test_s3_put_object_simple() {
+ do_s3_putobject_test("s3-put-object-test").await;
}
#[tokio::test]
- async fn test_s3_whitespace() {
- do_s3_test("s3 test").await;
+ async fn test_s3_put_object_whitespace() {
+ do_s3_putobject_test("s3 put object test").await;
}
#[tokio::test]
- async fn test_s3_unicode() {
- do_s3_test("s3 🧪 😱").await;
+ async fn test_s3_put_object_unicode() {
+ do_s3_putobject_test("s3 put object 🧪 😱").await;
}
}