diff options
Diffstat (limited to 'net/aws/tests/s3.rs')
-rw-r--r-- | net/aws/tests/s3.rs | 83 |
1 files changed, 68 insertions, 15 deletions
diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs index c9d141e62..4fff59d62 100644 --- a/net/aws/tests/s3.rs +++ b/net/aws/tests/s3.rs @@ -28,6 +28,12 @@ mod tests { }); } + fn make_buffer(content: &[u8]) -> gst::Buffer { + let mut buf = gst::Buffer::from_slice(content.to_owned()); + buf.make_mut().set_pts(gst::ClockTime::from_mseconds(200)); + buf + } + async fn delete_object(region: String, bucket: &str, key: &str) { let region_provider = aws_config::meta::region::RegionProviderChain::first_try( aws_sdk_s3::config::Region::new(region), @@ -70,11 +76,11 @@ mod tests { h1.set_src_caps(gst::Caps::builder("text/plain").build()); h1.play(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); h1.push_event(gst::event::Eos::new()); let mut h2 = gst_check::Harness::new("awss3src"); @@ -91,7 +97,12 @@ mod tests { } // Common helper - async fn do_s3_putobject_test(key_prefix: &str) { + async fn do_s3_putobject_test( + key_prefix: &str, + buffers: Option<u64>, + bytes: Option<u64>, + time: Option<gst::ClockTime>, + ) { init(); let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); @@ -103,22 +114,40 @@ mod tests { // Manually add the element so we can configure it before it goes to PLAYING let mut h1 = gst_check::Harness::new_empty(); + // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and // adding an element manually h1.add_parse( - format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\"") + format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\" name=\"sink\"") .as_str(), ); + let h1el = h1 + .element() + .unwrap() + .dynamic_cast::<gst::Bin>() + .unwrap() + .by_name("sink") + .unwrap(); + if let Some(b) = buffers { + h1el.set_property("flush-interval-buffers", b) + }; + if let Some(b) = bytes { + h1el.set_property("flush-interval-bytes", b) + }; + if time.is_some() { + h1el.set_property("flush-interval-time", time) + }; + h1.set_src_caps(gst::Caps::builder("text/plain").build()); h1.play(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); h1.push_event(gst::event::Eos::new()); let mut h2 = gst_check::Harness::new("awss3src"); @@ -151,16 +180,40 @@ mod tests { #[tokio::test] async fn test_s3_put_object_simple() { - do_s3_putobject_test("s3-put-object-test").await; + do_s3_putobject_test("s3-put-object-test", None, None, None).await; } #[tokio::test] async fn test_s3_put_object_whitespace() { - do_s3_putobject_test("s3 put object test").await; + do_s3_putobject_test("s3 put object test", None, None, None).await; } #[tokio::test] async fn test_s3_put_object_unicode() { - do_s3_putobject_test("s3 put object 🧪 😱").await; + do_s3_putobject_test("s3 put object 🧪 😱", None, None, None).await; + } + + #[tokio::test] + async fn test_s3_put_object_flush_buffers() { + // Awkward threshold as we push 5 buffers + do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None).await; + } + + #[tokio::test] + async fn test_s3_put_object_flush_bytes() { + // Awkward threshold as we push 14 bytes per buffer + do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None).await; + } + + #[tokio::test] + async fn test_s3_put_object_flush_time() { + do_s3_putobject_test( + "s3-put-object-test ftime", + None, + None, + // Awkward threshold as we push each buffer with 200ms + Some(gst::ClockTime::from_mseconds(300)), + ) + .await; } } |