diff options
Diffstat (limited to 'net/aws/src/s3sink/putobjectsink.rs')
-rw-r--r-- | net/aws/src/s3sink/putobjectsink.rs | 93 |
1 files changed, 92 insertions, 1 deletions
diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs index ae044bd1..2275a5d6 100644 --- a/net/aws/src/s3sink/putobjectsink.rs +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -31,6 +31,9 @@ use crate::s3url::*; use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError}; const DEFAULT_RETRY_ATTEMPTS: u32 = 5; +const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1; +const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0; +const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0); // General setting for create / abort requests const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; @@ -38,11 +41,18 @@ const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; struct Started { client: Client, buffer: Vec<u8>, + start_pts: Option<gst::ClockTime>, + num_buffers: u64, } impl Started { pub fn new(client: Client, buffer: Vec<u8>) -> Started { - Started { client, buffer } + Started { + client, + buffer, + start_pts: gst::ClockTime::NONE, + num_buffers: 0, + } } } @@ -66,6 +76,9 @@ struct Settings { retry_attempts: u32, request_timeout: Duration, endpoint_uri: Option<String>, + flush_interval_buffers: u64, + flush_interval_bytes: u64, + flush_interval_time: Option<gst::ClockTime>, } impl Settings { @@ -118,6 +131,9 @@ impl Default for Settings { retry_attempts: DEFAULT_RETRY_ATTEMPTS, request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC), endpoint_uri: None, + flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS, + flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES, + flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME), } } } @@ -139,6 +155,40 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl S3PutObjectSink { + fn check_thresholds( + &self, + state: &Started, + pts: Option<gst::ClockTime>, + duration: Option<gst::ClockTime>, + ) -> bool { + let settings = self.settings.lock().unwrap(); + + #[allow(clippy::if_same_then_else)] + #[allow(clippy::needless_bool)] + // Verbose if/else form for readability + if settings.flush_interval_buffers > 0 + && (state.num_buffers % settings.flush_interval_buffers) == 0 + { + true + } else if settings.flush_interval_bytes > 0 + && (state.buffer.len() as u64 % settings.flush_interval_bytes) == 0 + { + true + } else if settings.flush_interval_time.is_some() + && settings.flush_interval_time.unwrap() != DEFAULT_FLUSH_INTERVAL_TIME + && state.start_pts.is_some() + && pts.is_some() + && duration.is_some() + && (pts.unwrap() - state.start_pts.unwrap() + duration.unwrap()) + % settings.flush_interval_time.unwrap() + == gst::ClockTime::from_nseconds(0) + { + true + } else { + false + } + } + fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> { let put_object_req = self.create_put_object_request(); @@ -370,6 +420,21 @@ impl ObjectImpl for S3PutObjectSink { .nick("content-disposition") .blurb("Content-Disposition header to set for uploaded object") .build(), + glib::ParamSpecUInt64::builder("flush-interval-buffers") + .nick("Flush interval in buffers") + .blurb("Number of buffers to accumulate before doing a write (0 => disable)") + .default_value(DEFAULT_FLUSH_INTERVAL_BUFFERS) + .build(), + glib::ParamSpecUInt64::builder("flush-interval-bytes") + .nick("Flush interval in bytes") + .blurb("Number of bytes to accumulate before doing a write (0 => disable)") + .default_value(DEFAULT_FLUSH_INTERVAL_BYTES) + .build(), + glib::ParamSpecUInt64::builder("flush-interval-time") + .nick("Flush interval in duration") + .blurb("Total duration of buffers to accumulate before doing a write (0 => disable)") + .default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds()) + .build(), ] }); @@ -451,6 +516,18 @@ impl ObjectImpl for S3PutObjectSink { .get::<Option<String>>() .expect("type checked upstream"); } + "flush-interval-buffers" => { + settings.flush_interval_buffers = + value.get::<u64>().expect("type checked upstream"); + } + "flush-interval-bytes" => { + settings.flush_interval_bytes = value.get::<u64>().expect("type checked upstream"); + } + "flush-interval-time" => { + settings.flush_interval_time = value + .get::<Option<gst::ClockTime>>() + .expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -480,6 +557,9 @@ impl ObjectImpl for S3PutObjectSink { "endpoint-uri" => settings.endpoint_uri.to_value(), "content-type" => settings.content_type.to_value(), "content-disposition" => settings.content_disposition.to_value(), + "flush-interval-buffers" => settings.flush_interval_buffers.to_value(), + "flush-interval-bytes" => settings.flush_interval_bytes.to_value(), + "flush-interval-time" => settings.flush_interval_time.to_value(), _ => unimplemented!(), } } @@ -544,6 +624,12 @@ impl BaseSinkImpl for S3PutObjectSink { } }; + if started_state.start_pts.is_none() { + started_state.start_pts = buffer.pts(); + } + + started_state.num_buffers += 1; + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); let map = buffer.map_readable().map_err(|_| { gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); @@ -551,6 +637,11 @@ impl BaseSinkImpl for S3PutObjectSink { })?; started_state.buffer.extend_from_slice(map.as_slice()); + + if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) { + return Ok(gst::FlowSuccess::Ok); + } + drop(state); match self.flush_buffer() { |