Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Raghavan <arun@asymptotic.io>2023-09-28 19:38:19 +0300
committerArun Raghavan <arun@asymptotic.io>2023-12-19 00:13:48 +0300
commit410d104ad6146421655ec29fe00a9f3134445664 (patch)
tree74415ac533846085623a29a92236cc6e3ae0ebb8
parent12dbf50ddc0a8b38e16928d839c047e07aec5995 (diff)
aws: s3putobjectsink: Add a flush-on-error property
Makes sure we can send out data even if the pipeline shutdown in error. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1337>
-rw-r--r--net/aws/src/s3sink/putobjectsink.rs43
-rw-r--r--net/aws/tests/s3.rs49
2 files changed, 86 insertions, 6 deletions
diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs
index 2275a5d6..5b6d4934 100644
--- a/net/aws/src/s3sink/putobjectsink.rs
+++ b/net/aws/src/s3sink/putobjectsink.rs
@@ -34,6 +34,7 @@ const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1;
const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
+const DEFAULT_FLUSH_ON_ERROR: bool = false;
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
@@ -43,6 +44,7 @@ struct Started {
buffer: Vec<u8>,
start_pts: Option<gst::ClockTime>,
num_buffers: u64,
+ need_flush: bool,
}
impl Started {
@@ -52,6 +54,7 @@ impl Started {
buffer,
start_pts: gst::ClockTime::NONE,
num_buffers: 0,
+ need_flush: false,
}
}
}
@@ -79,6 +82,7 @@ struct Settings {
flush_interval_buffers: u64,
flush_interval_bytes: u64,
flush_interval_time: Option<gst::ClockTime>,
+ flush_on_error: bool,
}
impl Settings {
@@ -134,6 +138,7 @@ impl Default for Settings {
flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS,
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
+ flush_on_error: DEFAULT_FLUSH_ON_ERROR,
}
}
}
@@ -435,6 +440,11 @@ impl ObjectImpl for S3PutObjectSink {
.blurb("Total duration of buffers to accumulate before doing a write (0 => disable)")
.default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds())
.build(),
+ glib::ParamSpecBoolean::builder("flush-on-error")
+ .nick("Flush on error")
+ .blurb("Whether to write out the data on error (like stopping without an EOS)")
+ .default_value(DEFAULT_FLUSH_ON_ERROR)
+ .build(),
]
});
@@ -528,6 +538,9 @@ impl ObjectImpl for S3PutObjectSink {
.get::<Option<gst::ClockTime>>()
.expect("type checked upstream");
}
+ "flush-on-error" => {
+ settings.flush_on_error = value.get::<bool>().expect("type checked upstream");
+ }
_ => unimplemented!(),
}
}
@@ -560,6 +573,7 @@ impl ObjectImpl for S3PutObjectSink {
"flush-interval-buffers" => settings.flush_interval_buffers.to_value(),
"flush-interval-bytes" => settings.flush_interval_bytes.to_value(),
"flush-interval-time" => settings.flush_interval_time.to_value(),
+ "flush-on-error" => settings.flush_on_error.to_value(),
_ => unimplemented!(),
}
}
@@ -606,6 +620,26 @@ impl BaseSinkImpl for S3PutObjectSink {
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+
+ if let State::Started(ref started_state) = *state {
+ if settings.flush_on_error && started_state.need_flush {
+ drop(settings);
+ drop(state);
+
+ gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing");
+ if let Err(error_message) = self.flush_buffer() {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Failed to finalize the upload: {:?}",
+ error_message
+ );
+ }
+
+ state = self.state.lock().unwrap();
+ }
+ }
*state = State::Stopped;
gst::info!(CAT, imp: self, "Stopped");
@@ -629,6 +663,7 @@ impl BaseSinkImpl for S3PutObjectSink {
}
started_state.num_buffers += 1;
+ started_state.need_flush = true;
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
@@ -668,6 +703,14 @@ impl BaseSinkImpl for S3PutObjectSink {
fn event(&self, event: gst::Event) -> bool {
if let gst::EventView::Eos(_) = event.view() {
+ let mut state = self.state.lock().unwrap();
+
+ if let State::Started(ref mut started_state) = *state {
+ started_state.need_flush = false;
+ }
+
+ drop(state);
+
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs
index 4fff59d6..dfe27376 100644
--- a/net/aws/tests/s3.rs
+++ b/net/aws/tests/s3.rs
@@ -102,6 +102,7 @@ mod tests {
buffers: Option<u64>,
bytes: Option<u64>,
time: Option<gst::ClockTime>,
+ do_eos: bool,
) {
init();
@@ -139,6 +140,9 @@ mod tests {
if time.is_some() {
h1el.set_property("flush-interval-time", time)
};
+ if !do_eos {
+ h1el.set_property("flush-on-error", true)
+ }
h1.set_src_caps(gst::Caps::builder("text/plain").build());
h1.play();
@@ -148,7 +152,13 @@ mod tests {
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
- h1.push_event(gst::event::Eos::new());
+
+ if do_eos {
+ h1.push_event(gst::event::Eos::new());
+ } else {
+ // teardown to trigger end
+ drop(h1);
+ }
let mut h2 = gst_check::Harness::new("awss3src");
h2.element().unwrap().set_property("uri", uri.clone());
@@ -180,29 +190,29 @@ mod tests {
#[tokio::test]
async fn test_s3_put_object_simple() {
- do_s3_putobject_test("s3-put-object-test", None, None, None).await;
+ do_s3_putobject_test("s3-put-object-test", None, None, None, true).await;
}
#[tokio::test]
async fn test_s3_put_object_whitespace() {
- do_s3_putobject_test("s3 put object test", None, None, None).await;
+ do_s3_putobject_test("s3 put object test", None, None, None, true).await;
}
#[tokio::test]
async fn test_s3_put_object_unicode() {
- do_s3_putobject_test("s3 put object 🧪 😱", None, None, None).await;
+ do_s3_putobject_test("s3 put object 🧪 😱", None, None, None, true).await;
}
#[tokio::test]
async fn test_s3_put_object_flush_buffers() {
// Awkward threshold as we push 5 buffers
- do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None).await;
+ do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None, true).await;
}
#[tokio::test]
async fn test_s3_put_object_flush_bytes() {
// Awkward threshold as we push 14 bytes per buffer
- do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None).await;
+ do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None, true).await;
}
#[tokio::test]
@@ -213,6 +223,33 @@ mod tests {
None,
// Awkward threshold as we push each buffer with 200ms
Some(gst::ClockTime::from_mseconds(300)),
+ true,
+ )
+ .await;
+ }
+
+ #[tokio::test]
+ async fn test_s3_put_object_on_eos() {
+ // Disable all flush thresholds, so only EOS causes a flush
+ do_s3_putobject_test(
+ "s3-put-object-test eos",
+ Some(0),
+ Some(0),
+ Some(gst::ClockTime::from_nseconds(0)),
+ true,
+ )
+ .await;
+ }
+
+ #[tokio::test]
+ async fn test_s3_put_object_without_eos() {
+ // Disable all flush thresholds, skip EOS, and cause a flush on error
+ do_s3_putobject_test(
+ "s3-put-object-test !eos",
+ Some(0),
+ Some(0),
+ Some(gst::ClockTime::from_nseconds(0)),
+ false,
)
.await;
}