github.com/sdroege/gst-plugin-rs.git
Diffstat (limited to 'net/aws/src/s3sink/multipartsink.rs')
-rw-r--r--  net/aws/src/s3sink/multipartsink.rs | 1070
1 file changed, 1070 insertions(+), 0 deletions(-)
diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs
new file mode 100644
index 00000000..603823a5
--- /dev/null
+++ b/net/aws/src/s3sink/multipartsink.rs
@@ -0,0 +1,1070 @@
+// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
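+//
+// Multipart upload sink for Amazon S3: incoming buffers are accumulated into
+// parts of `part-size` bytes, each part is uploaded with UploadPart, and the
+// upload is completed on EOS. A sketch of a pipeline using this element
+// (assuming it is registered as `awss3sink`, with a hypothetical bucket and
+// key):
+//
+//   gst-launch-1.0 videotestsrc num-buffers=300 ! x264enc ! mpegtsmux \
+//       ! awss3sink bucket=my-bucket key=out.ts region=us-west-2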
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_base::prelude::*;
+use gst_base::subclass::prelude::*;
+
+use aws_sdk_s3::{
+ config::{self, retry::RetryConfig, Credentials, Region},
+ operation::{
+ abort_multipart_upload::builders::AbortMultipartUploadFluentBuilder,
+ complete_multipart_upload::builders::CompleteMultipartUploadFluentBuilder,
+ create_multipart_upload::builders::CreateMultipartUploadFluentBuilder,
+ upload_part::builders::UploadPartFluentBuilder,
+ },
+ primitives::ByteStream,
+ types::{CompletedMultipartUpload, CompletedPart},
+ Client,
+};
+
+use futures::future;
+use gst::glib::once_cell::sync::Lazy;
+use std::collections::HashMap;
+use std::convert::From;
+use std::sync::Mutex;
+use std::time::Duration;
+
+use crate::s3url::*;
+use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
+
+use super::OnError;
+
+const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
+const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
+const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
+
+// General setting for create / abort requests
+const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
+const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
+// This needs to be independently configurable, as the part size can be up to 5 GB
+const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
+const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
+// CompleteMultipartUpload can take minutes to complete, so we need a longer value here
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
+const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes
+
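+// State held while an upload is in progress: the S3 client, the buffer
+// accumulating the current part, the upload id returned by
+// CreateMultipartUpload, and the parts completed so far.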
+struct Started {
+ client: Client,
+ buffer: Vec<u8>,
+ upload_id: String,
+ part_number: i64,
+ completed_parts: Vec<CompletedPart>,
+}
+
+impl Started {
+ pub fn new(client: Client, buffer: Vec<u8>, upload_id: String) -> Started {
+ Started {
+ client,
+ buffer,
+ upload_id,
+ part_number: 0,
+ completed_parts: Vec::new(),
+ }
+ }
+
+ pub fn increment_part_number(&mut self) -> Result<i64, gst::ErrorMessage> {
+ // https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+ const MAX_MULTIPART_NUMBER: i64 = 10000;
+
+ if self.part_number >= MAX_MULTIPART_NUMBER {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Failed,
+ [
+ "Maximum number of parts ({}) reached.",
+ MAX_MULTIPART_NUMBER
+ ]
+ ));
+ }
+
+ self.part_number += 1;
+ Ok(self.part_number)
+ }
+}
+
+#[derive(Default)]
+enum State {
+ #[default]
+ Stopped,
+ Completed,
+ Started(Started),
+}
+
+struct Settings {
+ region: Region,
+ bucket: Option<String>,
+ key: Option<String>,
+ content_type: Option<String>,
+ content_disposition: Option<String>,
+ buffer_size: u64,
+ access_key: Option<String>,
+ secret_access_key: Option<String>,
+ session_token: Option<String>,
+ metadata: Option<gst::Structure>,
+ retry_attempts: u32,
+ multipart_upload_on_error: OnError,
+ request_timeout: Duration,
+ endpoint_uri: Option<String>,
+}
+
+impl Settings {
+ fn to_uri(&self) -> String {
+ GstS3Url {
+ region: self.region.clone(),
+ bucket: self.bucket.clone().unwrap(),
+ object: self.key.clone().unwrap(),
+ version: None,
+ }
+ .to_string()
+ }
+
+ fn to_metadata(&self, imp: &S3Sink) -> Option<HashMap<String, String>> {
+ self.metadata.as_ref().map(|structure| {
+ let mut hash = HashMap::new();
+
+ for (key, value) in structure.iter() {
+ if let Ok(Ok(value_str)) = value.transform::<String>().map(|v| v.get()) {
+ gst::log!(CAT, imp: imp, "metadata '{}' -> '{}'", key, value_str);
+ hash.insert(key.to_string(), value_str);
+ } else {
+ gst::warning!(
+ CAT,
+ imp: imp,
+ "Failed to convert metadata '{}' to string ('{:?}')",
+ key,
+ value
+ );
+ }
+ }
+
+ hash
+ })
+ }
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ region: Region::new("us-west-2"),
+ bucket: None,
+ key: None,
+ content_type: None,
+ content_disposition: None,
+ access_key: None,
+ secret_access_key: None,
+ session_token: None,
+ metadata: None,
+ buffer_size: DEFAULT_BUFFER_SIZE,
+ retry_attempts: DEFAULT_RETRY_ATTEMPTS,
+ multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
+ request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
+ endpoint_uri: None,
+ }
+ }
+}
+
+#[derive(Default)]
+pub struct S3Sink {
+ url: Mutex<Option<GstS3Url>>,
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+ canceller: Mutex<Option<future::AbortHandle>>,
+ abort_multipart_canceller: Mutex<Option<future::AbortHandle>>,
+}
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "aws3sink",
+ gst::DebugColorFlags::empty(),
+ Some("Amazon S3 Sink"),
+ )
+});
+
+impl S3Sink {
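+ // Honours the `on-error` property when an upload cannot proceed normally:
+ // abort the multipart upload, complete it with the parts uploaded so far,
+ // or leave it as-is for out-of-band handling.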
+ fn flush_multipart_upload(&self, state: &mut Started) {
+ let settings = self.settings.lock().unwrap();
+ match settings.multipart_upload_on_error {
+ OnError::Abort => {
+ gst::log!(
+ CAT,
+ imp: self,
+ "Aborting multipart upload request with id: {}",
+ state.upload_id
+ );
+ match self.abort_multipart_upload_request(state) {
+ Ok(()) => {
+ gst::log!(
+ CAT,
+ imp: self,
+ "Aborting multipart upload request succeeded."
+ );
+ }
+ Err(err) => gst::error!(
+ CAT,
+ imp: self,
+ "Aborting multipart upload failed: {}",
+ err.to_string()
+ ),
+ }
+ }
+ OnError::Complete => {
+ gst::log!(
+ CAT,
+ imp: self,
+ "Completing multipart upload request with id: {}",
+ state.upload_id
+ );
+ match self.complete_multipart_upload_request(state) {
+ Ok(()) => {
+ gst::log!(
+ CAT,
+ imp: self,
+ "Complete multipart upload request succeeded."
+ );
+ }
+ Err(err) => gst::error!(
+ CAT,
+ imp: self,
+ "Completing multipart upload failed: {}",
+ err.to_string()
+ ),
+ }
+ }
+ OnError::DoNothing => (),
+ }
+ }
+
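+ // Uploads the currently accumulated buffer as the next part and records
+ // the returned ETag so the part can be listed in CompleteMultipartUpload.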
+ fn flush_current_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
+ let upload_part_req = self.create_upload_part_request()?;
+
+ let mut state = self.state.lock().unwrap();
+ let state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Completed => {
+ unreachable!("Upload should not be completed yet");
+ }
+ State::Stopped => {
+ unreachable!("Element should be started");
+ }
+ };
+
+ let part_number = state.part_number;
+
+ let upload_part_req_future = upload_part_req.send();
+ let output =
+ s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
+ WaitError::FutureError(err) => {
+ self.flush_multipart_upload(state);
+ Some(gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to upload part: {}", err]
+ ))
+ }
+ WaitError::Cancelled => None,
+ })?;
+
+ let completed_part = CompletedPart::builder()
+ .set_e_tag(output.e_tag)
+ .set_part_number(Some(part_number as i32))
+ .build();
+ state.completed_parts.push(completed_part);
+
+ gst::info!(CAT, imp: self, "Uploaded part {}", part_number);
+
+ Ok(())
+ }
+
+ fn create_upload_part_request(&self) -> Result<UploadPartFluentBuilder, gst::ErrorMessage> {
+ let url = self.url.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+ let mut state = self.state.lock().unwrap();
+ let state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Completed => {
+ unreachable!("Upload should not be completed yet");
+ }
+ State::Stopped => {
+ unreachable!("Element should be started");
+ }
+ };
+
+ let part_number = state.increment_part_number()?;
+ let body = Some(ByteStream::from(std::mem::replace(
+ &mut state.buffer,
+ Vec::with_capacity(settings.buffer_size as usize),
+ )));
+
+ let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
+ let key = Some(url.as_ref().unwrap().object.to_owned());
+ let upload_id = Some(state.upload_id.to_owned());
+
+ let client = &state.client;
+ let upload_part = client
+ .upload_part()
+ .set_body(body)
+ .set_bucket(bucket)
+ .set_key(key)
+ .set_upload_id(upload_id)
+ .set_part_number(Some(part_number as i32));
+
+ Ok(upload_part)
+ }
+
+ fn create_complete_multipart_upload_request(
+ &self,
+ started_state: &mut Started,
+ ) -> CompleteMultipartUploadFluentBuilder {
+ started_state
+ .completed_parts
+ .sort_by(|a, b| a.part_number.cmp(&b.part_number));
+
+ let parts = Some(std::mem::take(&mut started_state.completed_parts));
+
+ let completed_upload = CompletedMultipartUpload::builder().set_parts(parts).build();
+
+ let url = self.url.lock().unwrap();
+ let client = &started_state.client;
+
+ let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
+ let key = Some(url.as_ref().unwrap().object.to_owned());
+ let upload_id = Some(started_state.upload_id.to_owned());
+ let multipart_upload = Some(completed_upload);
+
+ client
+ .complete_multipart_upload()
+ .set_bucket(bucket)
+ .set_key(key)
+ .set_upload_id(upload_id)
+ .set_multipart_upload(multipart_upload)
+ }
+
+ fn create_create_multipart_upload_request(
+ &self,
+ client: &Client,
+ url: &GstS3Url,
+ settings: &Settings,
+ ) -> CreateMultipartUploadFluentBuilder {
+ let bucket = Some(url.bucket.clone());
+ let key = Some(url.object.clone());
+ let content_type = settings.content_type.clone();
+ let content_disposition = settings.content_disposition.clone();
+ let metadata = settings.to_metadata(self);
+
+ client
+ .create_multipart_upload()
+ .set_bucket(bucket)
+ .set_key(key)
+ .set_content_type(content_type)
+ .set_content_disposition(content_disposition)
+ .set_metadata(metadata)
+ }
+
+ fn create_abort_multipart_upload_request(
+ &self,
+ client: &Client,
+ url: &GstS3Url,
+ started_state: &Started,
+ ) -> AbortMultipartUploadFluentBuilder {
+ let bucket = Some(url.bucket.clone());
+ let key = Some(url.object.clone());
+
+ client
+ .abort_multipart_upload()
+ .set_bucket(bucket)
+ .set_expected_bucket_owner(None)
+ .set_key(key)
+ .set_request_payer(None)
+ .set_upload_id(Some(started_state.upload_id.to_owned()))
+ }
+
+ fn abort_multipart_upload_request(
+ &self,
+ started_state: &Started,
+ ) -> Result<(), gst::ErrorMessage> {
+ let s3url = {
+ let url = self.url.lock().unwrap();
+ match *url {
+ Some(ref url) => url.clone(),
+ None => unreachable!("Element should be started"),
+ }
+ };
+
+ let client = &started_state.client;
+ let abort_req = self.create_abort_multipart_upload_request(client, &s3url, started_state);
+ let abort_req_future = abort_req.send();
+
+ s3utils::wait(&self.abort_multipart_canceller, abort_req_future)
+ .map(|_| ())
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => {
+ gst::error_msg!(
+ gst::ResourceError::Write,
+ ["Failed to abort multipart upload: {}.", err.to_string()]
+ )
+ }
+ WaitError::Cancelled => {
+ gst::error_msg!(
+ gst::ResourceError::Write,
+ ["Abort multipart upload request interrupted."]
+ )
+ }
+ })
+ }
+
+ fn complete_multipart_upload_request(
+ &self,
+ started_state: &mut Started,
+ ) -> Result<(), gst::ErrorMessage> {
+ let complete_req = self.create_complete_multipart_upload_request(started_state);
+ let complete_req_future = complete_req.send();
+
+ s3utils::wait(&self.canceller, complete_req_future)
+ .map(|_| ())
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => gst::error_msg!(
+ gst::ResourceError::Write,
+ ["Failed to complete multipart upload: {}.", err.to_string()]
+ ),
+ WaitError::Cancelled => {
+ gst::error_msg!(
+ gst::LibraryError::Failed,
+ ["Complete multipart upload request interrupted"]
+ )
+ }
+ })
+ }
+
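+ // Flushes any remaining buffered data as a final part, then issues
+ // CompleteMultipartUpload; on success the state moves to Completed.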
+ fn finalize_upload(&self) -> Result<(), gst::ErrorMessage> {
+ if self.flush_current_buffer().is_err() {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["Failed to flush internal buffer."]
+ ));
+ }
+
+ let mut state = self.state.lock().unwrap();
+ let started_state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Completed => {
+ unreachable!("Upload should not be completed yet");
+ }
+ State::Stopped => {
+ unreachable!("Element should be started");
+ }
+ };
+
+ let res = self.complete_multipart_upload_request(started_state);
+
+ if res.is_ok() {
+ *state = State::Completed;
+ }
+
+ res
+ }
+
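+ // Builds the SDK configuration (optionally with static credentials and a
+ // custom endpoint), then issues CreateMultipartUpload and stores the
+ // returned upload id in the Started state.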
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+
+ if let State::Started { .. } = *state {
+ unreachable!("Element should be started");
+ }
+
+ let s3url = {
+ let url = self.url.lock().unwrap();
+ match *url {
+ Some(ref url) => url.clone(),
+ None => {
+ return Err(gst::error_msg!(
+ gst::ResourceError::Settings,
+ ["Cannot start without a URL being set"]
+ ));
+ }
+ }
+ };
+
+ let timeout_config = s3utils::timeout_config(settings.request_timeout);
+
+ let cred = match (
+ settings.access_key.as_ref(),
+ settings.secret_access_key.as_ref(),
+ ) {
+ (Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
+ access_key.clone(),
+ secret_access_key.clone(),
+ settings.session_token.clone(),
+ None,
+ "aws-s3-sink",
+ )),
+ _ => None,
+ };
+
+ let sdk_config =
+ s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred)
+ .map_err(|err| match err {
+ WaitError::FutureError(err) => gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to create SDK config: {}", err]
+ ),
+ WaitError::Cancelled => {
+ gst::error_msg!(
+ gst::LibraryError::Failed,
+ ["SDK config request interrupted during start"]
+ )
+ }
+ })?;
+
+ let config_builder = config::Builder::from(&sdk_config)
+ .retry_config(RetryConfig::standard().with_max_attempts(settings.retry_attempts));
+
+ let config = if let Some(ref uri) = settings.endpoint_uri {
+ config_builder.endpoint_url(uri).build()
+ } else {
+ config_builder.build()
+ };
+
+ let client = Client::from_conf(config);
+
+ let create_multipart_req =
+ self.create_create_multipart_upload_request(&client, &s3url, &settings);
+ let create_multipart_req_future = create_multipart_req.send();
+
+ let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
+ |err| match err {
+ WaitError::FutureError(err) => gst::error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to create multipart upload: {}", err]
+ ),
+ WaitError::Cancelled => {
+ gst::error_msg!(
+ gst::LibraryError::Failed,
+ ["Create multipart request interrupted during start"]
+ )
+ }
+ },
+ )?;
+
+ let upload_id = response.upload_id.ok_or_else(|| {
+ gst::error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get multipart upload ID"]
+ )
+ })?;
+
+ *state = State::Started(Started::new(
+ client,
+ Vec::with_capacity(settings.buffer_size as usize),
+ upload_id,
+ ));
+
+ Ok(())
+ }
+
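+ // Appends `src` to the in-flight part buffer, flushing a part whenever the
+ // buffer reaches capacity and recursing on the remainder, so a single input
+ // buffer larger than `part-size` is split across multiple parts.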
+ fn update_buffer(&self, src: &[u8]) -> Result<(), Option<gst::ErrorMessage>> {
+ let mut state = self.state.lock().unwrap();
+ let started_state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Completed => {
+ unreachable!("Upload should not be completed yet");
+ }
+ State::Stopped => {
+ unreachable!("Element should be started already");
+ }
+ };
+
+ let to_copy = std::cmp::min(
+ started_state.buffer.capacity() - started_state.buffer.len(),
+ src.len(),
+ );
+
+ let (head, tail) = src.split_at(to_copy);
+ started_state.buffer.extend_from_slice(head);
+ let do_flush = started_state.buffer.capacity() == started_state.buffer.len();
+ drop(state);
+
+ if do_flush {
+ self.flush_current_buffer()?;
+ }
+
+ if to_copy < src.len() {
+ self.update_buffer(tail)?;
+ }
+
+ Ok(())
+ }
+
+ fn cancel(&self) {
+ let mut canceller = self.canceller.lock().unwrap();
+ let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap();
+
+ if let Some(c) = abort_canceller.take() {
+ c.abort()
+ };
+
+ if let Some(c) = canceller.take() {
+ c.abort()
+ };
+ }
+
+ fn set_uri(self: &S3Sink, url_str: Option<&str>) -> Result<(), glib::Error> {
+ let state = self.state.lock().unwrap();
+
+ if let State::Started { .. } = *state {
+ return Err(glib::Error::new(
+ gst::URIError::BadState,
+ "Cannot set URI on a started s3sink",
+ ));
+ }
+
+ let mut url = self.url.lock().unwrap();
+
+ if url_str.is_none() {
+ *url = None;
+ return Ok(());
+ }
+
+ gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str);
+
+ let url_str = url_str.unwrap();
+ match parse_s3_url(url_str) {
+ Ok(s3url) => {
+ *url = Some(s3url);
+ Ok(())
+ }
+ Err(_) => Err(glib::Error::new(
+ gst::URIError::BadUri,
+ "Could not parse URI",
+ )),
+ }
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for S3Sink {
+ const NAME: &'static str = "GstAwsS3Sink";
+ type Type = super::S3Sink;
+ type ParentType = gst_base::BaseSink;
+ type Interfaces = (gst::URIHandler,);
+}
+
+impl ObjectImpl for S3Sink {
+ fn constructed(&self) {
+ self.parent_constructed();
+
+ self.obj().set_sync(false);
+ }
+
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecString::builder("bucket")
+ .nick("S3 Bucket")
+ .blurb("The bucket of the file to write")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("key")
+ .nick("S3 Key")
+ .blurb("The key of the file to write")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("region")
+ .nick("AWS Region")
+ .blurb("An AWS region (e.g. eu-west-2).")
+ .default_value(Some("us-west-2"))
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt64::builder("part-size")
+ .nick("Part size")
+ .blurb("A size (in bytes) of an individual part used for multipart upload.")
+ .minimum(5 * 1024 * 1024) // 5 MB
+ .maximum(5 * 1024 * 1024 * 1024) // 5 GB
+ .default_value(DEFAULT_BUFFER_SIZE)
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("uri")
+ .nick("URI")
+ .blurb("The S3 object URI")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("access-key")
+ .nick("Access Key")
+ .blurb("AWS Access Key")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("secret-access-key")
+ .nick("Secret Access Key")
+ .blurb("AWS Secret Access Key")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecString::builder("session-token")
+ .nick("Session Token")
+ .blurb("AWS temporary Session Token from STS")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecBoxed::builder::<gst::Structure>("metadata")
+ .nick("Metadata")
+ .blurb("A map of metadata to store with the object in S3; field values need to be convertible to strings.")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecEnum::builder_with_default("on-error", DEFAULT_MULTIPART_UPLOAD_ON_ERROR)
+ .nick("Whether to upload or complete the multipart upload on error")
+ .blurb("Do nothing, abort or complete a multipart upload request on error")
+ .mutable_ready()
+ .build(),
+ glib::ParamSpecUInt::builder("retry-attempts")
+ .nick("Retry attempts")
+ .blurb("Number of times AWS SDK attempts a request before abandoning the request")
+ .minimum(1)
+ .maximum(10)
+ .default_value(DEFAULT_RETRY_ATTEMPTS)
+ .build(),
+ glib::ParamSpecInt64::builder("request-timeout")
+ .nick("Request timeout")
+ .blurb("Timeout for general S3 requests (in ms, set to -1 for infinity)")
+ .minimum(-1)
+ .default_value(DEFAULT_REQUEST_TIMEOUT_MSEC as i64)
+ .build(),
+ glib::ParamSpecInt64::builder("upload-part-request-timeout")
+ .nick("Upload part request timeout")
+ .blurb("Timeout for a single upload part request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)")
+ .minimum(-1)
+ .default_value(DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64)
+ .build(),
+ glib::ParamSpecInt64::builder("complete-upload-request-timeout")
+ .nick("Complete upload request timeout")
+ .blurb("Timeout for the complete multipart upload request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)")
+ .minimum(-1)
+ .default_value(DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64)
+ .build(),
+ glib::ParamSpecInt64::builder("retry-duration")
+ .nick("Retry duration")
+ .blurb("How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)")
+ .minimum(-1)
+ .default_value(DEFAULT_RETRY_DURATION_MSEC as i64)
+ .build(),
+ glib::ParamSpecInt64::builder("upload-part-retry-duration")
+ .nick("Upload part retry duration")
+ .blurb("How long we should retry upload part requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)")
+ .minimum(-1)
+ .default_value(DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64)
+ .build(),
+ glib::ParamSpecInt64::builder("complete-upload-retry-duration")
+ .nick("Complete upload retry duration")
+ .blurb("How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)")
+ .minimum(-1)
+ .default_value(DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64)
+ .build(),
+ glib::ParamSpecString::builder("endpoint-uri")
+ .nick("S3 endpoint URI")
+ .blurb("The S3 endpoint URI to use")
+ .build(),
+ glib::ParamSpecString::builder("content-type")
+ .nick("content-type")
+ .blurb("Content-Type header to set for uploaded object")
+ .build(),
+ glib::ParamSpecString::builder("content-disposition")
+ .nick("content-disposition")
+ .blurb("Content-Disposition header to set for uploaded object")
+ .build(),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
+ let mut settings = self.settings.lock().unwrap();
+
+ gst::debug!(
+ CAT,
+ imp: self,
+ "Setting property '{}' to '{:?}'",
+ pspec.name(),
+ value
+ );
+
+ match pspec.name() {
+ "bucket" => {
+ settings.bucket = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ if settings.key.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "key" => {
+ settings.key = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ if settings.bucket.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "region" => {
+ let region = value.get::<String>().expect("type checked upstream");
+ settings.region = Region::new(region);
+ if settings.key.is_some() && settings.bucket.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "part-size" => {
+ settings.buffer_size = value.get::<u64>().expect("type checked upstream");
+ }
+ "uri" => {
+ let _ = self.set_uri(value.get().expect("type checked upstream"));
+ }
+ "access-key" => {
+ settings.access_key = value.get().expect("type checked upstream");
+ }
+ "secret-access-key" => {
+ settings.secret_access_key = value.get().expect("type checked upstream");
+ }
+ "session-token" => {
+ settings.session_token = value.get().expect("type checked upstream");
+ }
+ "metadata" => {
+ settings.metadata = value.get().expect("type checked upstream");
+ }
+ "on-error" => {
+ settings.multipart_upload_on_error =
+ value.get::<OnError>().expect("type checked upstream");
+ }
+ "retry-attempts" => {
+ settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
+ }
+ "request-timeout" => {
+ settings.request_timeout =
+ duration_from_millis(value.get::<i64>().expect("type checked upstream"));
+ }
+ "upload-part-request-timeout" => {
+ settings.request_timeout =
+ duration_from_millis(value.get::<i64>().expect("type checked upstream"));
+ }
+ "complete-upload-request-timeout" => {
+ settings.request_timeout =
+ duration_from_millis(value.get::<i64>().expect("type checked upstream"));
+ }
+ "retry-duration" => {
+ /*
+ * To maintain backwards compatibility, calculate retry attempts
+ * by dividing the provided duration by the request timeout.
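+ * For example, retry-duration=60000 with the default request-timeout
+ * of 15000 ms yields 60000 / 15000 = 4 attempts.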
+ */
+ let value = value.get::<i64>().expect("type checked upstream");
+ let request_timeout = duration_to_millis(Some(settings.request_timeout));
+ let retry_attempts = if value > request_timeout {
+ value / request_timeout
+ } else {
+ 1
+ };
+ settings.retry_attempts = retry_attempts as u32;
+ }
+ "upload-part-retry-duration" | "complete-upload-retry-duration" => {
+ gst::warning!(CAT, "retry-duration, upload-part-retry-duration and complete-upload-retry-duration are deprecated; use retry-attempts instead.");
+ }
+ "endpoint-uri" => {
+ settings.endpoint_uri = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ if settings.key.is_some() && settings.bucket.is_some() {
+ let _ = self.set_uri(Some(&settings.to_uri()));
+ }
+ }
+ "content-type" => {
+ settings.content_type = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ }
+ "content-disposition" => {
+ settings.content_disposition = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ let settings = self.settings.lock().unwrap();
+
+ match pspec.name() {
+ "key" => settings.key.to_value(),
+ "bucket" => settings.bucket.to_value(),
+ "region" => settings.region.to_string().to_value(),
+ "part-size" => settings.buffer_size.to_value(),
+ "uri" => {
+ let url = self.url.lock().unwrap();
+ let url = match *url {
+ Some(ref url) => url.to_string(),
+ None => "".to_string(),
+ };
+
+ url.to_value()
+ }
+ "access-key" => settings.access_key.to_value(),
+ "secret-access-key" => settings.secret_access_key.to_value(),
+ "session-token" => settings.session_token.to_value(),
+ "metadata" => settings.metadata.to_value(),
+ "on-error" => settings.multipart_upload_on_error.to_value(),
+ "retry-attempts" => settings.retry_attempts.to_value(),
+ "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
+ "upload-part-request-timeout" => {
+ duration_to_millis(Some(settings.request_timeout)).to_value()
+ }
+ "complete-upload-request-timeout" => {
+ duration_to_millis(Some(settings.request_timeout)).to_value()
+ }
+ "retry-duration" | "upload-part-retry-duration" | "complete-upload-retry-duration" => {
+ let request_timeout = duration_to_millis(Some(settings.request_timeout));
+ (settings.retry_attempts as i64 * request_timeout).to_value()
+ }
+ "endpoint-uri" => settings.endpoint_uri.to_value(),
+ "content-type" => settings.content_type.to_value(),
+ "content-disposition" => settings.content_disposition.to_value(),
+ _ => unimplemented!(),
+ }
+ }
+}
+
+impl GstObjectImpl for S3Sink {}
+
+impl ElementImpl for S3Sink {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ #[cfg(feature = "doc")]
+ OnError::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+ gst::subclass::ElementMetadata::new(
+ "Amazon S3 sink",
+ "Source/Network",
+ "Writes an object to Amazon S3",
+ "Marcin Kolny <mkolny@amazon.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::new_any();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![sink_pad_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+}
+
+impl URIHandlerImpl for S3Sink {
+ const URI_TYPE: gst::URIType = gst::URIType::Sink;
+
+ fn protocols() -> &'static [&'static str] {
+ &["s3"]
+ }
+
+ fn uri(&self) -> Option<String> {
+ self.url.lock().unwrap().as_ref().map(|s| s.to_string())
+ }
+
+ fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
+ self.set_uri(Some(uri))
+ }
+}
+
+impl BaseSinkImpl for S3Sink {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ self.start()
+ }
+
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+
+ if let State::Started(ref mut state) = *state {
+ gst::warning!(CAT, imp: self, "Stopped without EOS");
+
+ // We're stopping without an EOS -- treat this as an error and deal with the open
+ // multipart upload accordingly _if_ we managed to upload any parts
+ if !state.completed_parts.is_empty() {
+ self.flush_multipart_upload(state);
+ }
+ }
+
+ *state = State::Stopped;
+ gst::info!(CAT, imp: self, "Stopped");
+
+ Ok(())
+ }
+
+ fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
+ if let State::Stopped = *self.state.lock().unwrap() {
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
+ return Err(gst::FlowError::Error);
+ }
+
+ if let State::Completed = *self.state.lock().unwrap() {
+ gst::element_imp_error!(
+ self,
+ gst::CoreError::Failed,
+ ["Trying to render after upload complete"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
+ let map = buffer.map_readable().map_err(|_| {
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
+ gst::FlowError::Error
+ })?;
+
+ match self.update_buffer(&map) {
+ Ok(_) => Ok(gst::FlowSuccess::Ok),
+ Err(err) => match err {
+ Some(error_message) => {
+ gst::error!(CAT, imp: self, "Multipart upload failed: {}", error_message);
+ self.post_error_message(error_message);
+ Err(gst::FlowError::Error)
+ }
+ _ => {
+ gst::info!(CAT, imp: self, "Upload interrupted. Flushing...");
+ Err(gst::FlowError::Flushing)
+ }
+ },
+ }
+ }
+
+ fn unlock(&self) -> Result<(), gst::ErrorMessage> {
+ self.cancel();
+
+ Ok(())
+ }
+
+ fn event(&self, event: gst::Event) -> bool {
+ if let gst::EventView::Eos(_) = event.view() {
+ if let Err(error_message) = self.finalize_upload() {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed to finalize the upload: {}",
+ error_message
+ );
+ return false;
+ }
+ }
+
+ BaseSinkImplExt::parent_event(self, event)
+ }
+}