Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'net/aws/src/s3sink/putobjectsink.rs')
-rw-r--r--net/aws/src/s3sink/putobjectsink.rs93
1 files changed, 92 insertions, 1 deletions
diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs
index ae044bd1..2275a5d6 100644
--- a/net/aws/src/s3sink/putobjectsink.rs
+++ b/net/aws/src/s3sink/putobjectsink.rs
@@ -31,6 +31,9 @@ use crate::s3url::*;
use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
+const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1;
+const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
+const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
@@ -38,11 +41,18 @@ const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
struct Started {
client: Client,
buffer: Vec<u8>,
+ start_pts: Option<gst::ClockTime>,
+ num_buffers: u64,
}
impl Started {
pub fn new(client: Client, buffer: Vec<u8>) -> Started {
- Started { client, buffer }
+ Started {
+ client,
+ buffer,
+ start_pts: gst::ClockTime::NONE,
+ num_buffers: 0,
+ }
}
}
@@ -66,6 +76,9 @@ struct Settings {
retry_attempts: u32,
request_timeout: Duration,
endpoint_uri: Option<String>,
+ flush_interval_buffers: u64,
+ flush_interval_bytes: u64,
+ flush_interval_time: Option<gst::ClockTime>,
}
impl Settings {
@@ -118,6 +131,9 @@ impl Default for Settings {
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
endpoint_uri: None,
+ flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS,
+ flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
+ flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
}
}
}
@@ -139,6 +155,40 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl S3PutObjectSink {
+ fn check_thresholds(
+ &self,
+ state: &Started,
+ pts: Option<gst::ClockTime>,
+ duration: Option<gst::ClockTime>,
+ ) -> bool {
+ let settings = self.settings.lock().unwrap();
+
+ #[allow(clippy::if_same_then_else)]
+ #[allow(clippy::needless_bool)]
+ // Verbose if/else form for readability
+ if settings.flush_interval_buffers > 0
+ && (state.num_buffers % settings.flush_interval_buffers) == 0
+ {
+ true
+ } else if settings.flush_interval_bytes > 0
+ && (state.buffer.len() as u64 % settings.flush_interval_bytes) == 0
+ {
+ true
+ } else if settings.flush_interval_time.is_some()
+ && settings.flush_interval_time.unwrap() != DEFAULT_FLUSH_INTERVAL_TIME
+ && state.start_pts.is_some()
+ && pts.is_some()
+ && duration.is_some()
+ && (pts.unwrap() - state.start_pts.unwrap() + duration.unwrap())
+ % settings.flush_interval_time.unwrap()
+ == gst::ClockTime::from_nseconds(0)
+ {
+ true
+ } else {
+ false
+ }
+ }
+
fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
let put_object_req = self.create_put_object_request();
@@ -370,6 +420,21 @@ impl ObjectImpl for S3PutObjectSink {
.nick("content-disposition")
.blurb("Content-Disposition header to set for uploaded object")
.build(),
+ glib::ParamSpecUInt64::builder("flush-interval-buffers")
+ .nick("Flush interval in buffers")
+ .blurb("Number of buffers to accumulate before doing a write (0 => disable)")
+ .default_value(DEFAULT_FLUSH_INTERVAL_BUFFERS)
+ .build(),
+ glib::ParamSpecUInt64::builder("flush-interval-bytes")
+ .nick("Flush interval in bytes")
+ .blurb("Number of bytes to accumulate before doing a write (0 => disable)")
+ .default_value(DEFAULT_FLUSH_INTERVAL_BYTES)
+ .build(),
+ glib::ParamSpecUInt64::builder("flush-interval-time")
+ .nick("Flush interval in duration")
+ .blurb("Total duration of buffers to accumulate before doing a write (0 => disable)")
+ .default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds())
+ .build(),
]
});
@@ -451,6 +516,18 @@ impl ObjectImpl for S3PutObjectSink {
.get::<Option<String>>()
.expect("type checked upstream");
}
+ "flush-interval-buffers" => {
+ settings.flush_interval_buffers =
+ value.get::<u64>().expect("type checked upstream");
+ }
+ "flush-interval-bytes" => {
+ settings.flush_interval_bytes = value.get::<u64>().expect("type checked upstream");
+ }
+ "flush-interval-time" => {
+ settings.flush_interval_time = value
+ .get::<Option<gst::ClockTime>>()
+ .expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -480,6 +557,9 @@ impl ObjectImpl for S3PutObjectSink {
"endpoint-uri" => settings.endpoint_uri.to_value(),
"content-type" => settings.content_type.to_value(),
"content-disposition" => settings.content_disposition.to_value(),
+ "flush-interval-buffers" => settings.flush_interval_buffers.to_value(),
+ "flush-interval-bytes" => settings.flush_interval_bytes.to_value(),
+ "flush-interval-time" => settings.flush_interval_time.to_value(),
_ => unimplemented!(),
}
}
@@ -544,6 +624,12 @@ impl BaseSinkImpl for S3PutObjectSink {
}
};
+ if started_state.start_pts.is_none() {
+ started_state.start_pts = buffer.pts();
+ }
+
+ started_state.num_buffers += 1;
+
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
@@ -551,6 +637,11 @@ impl BaseSinkImpl for S3PutObjectSink {
})?;
started_state.buffer.extend_from_slice(map.as_slice());
+
+ if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
drop(state);
match self.flush_buffer() {