gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git

author     Arun Raghavan <arun@asymptotic.io>       2020-04-04 02:26:41 +0300
committer  Arun Raghavan <arun@arunraghavan.net>    2020-04-05 22:10:46 +0300
commit     205b6040fbb918c0fa736874b09f8e3f3f261e44
tree       37ed0d22ece764ec91dccefd3c2148dc26d5b96b /net
parent     5d992692f0c494c64a379de6201239efe614a15e
Reorganise plugins into directories by function
This should start making navigating the tree a little easier to start with, and we can then move to allowing building specific groups of plugins as well.

The plugins are moved into the following hierarchy:

  audio
  / gst-plugin-audiofx
  / gst-plugin-claxon
  / gst-plugin-csound
  / gst-plugin-lewton

  generic
  / gst-plugin-file
  / gst-plugin-sodium
  / gst-plugin-threadshare

  net
  / gst-plugin-reqwest
  / gst-plugin-rusoto

  utils
  / gst-plugin-fallbackswitch
  / gst-plugin-togglerecord

  video
  / gst-plugin-cdg
  / gst-plugin-closedcaption
  / gst-plugin-dav1d
  / gst-plugin-flv
  / gst-plugin-gif
  / gst-plugin-rav1e

  gst-plugin-tutorial
  gst-plugin-version-helper
Diffstat (limited to 'net')
-rw-r--r--  net/gst-plugin-reqwest/Cargo.toml                 31
-rw-r--r--  net/gst-plugin-reqwest/build.rs                    5
-rw-r--r--  net/gst-plugin-reqwest/src/lib.rs                 35
-rw-r--r--  net/gst-plugin-reqwest/src/reqwesthttpsrc.rs    1157
-rw-r--r--  net/gst-plugin-reqwest/tests/reqwesthttpsrc.rs  1186
-rw-r--r--  net/gst-plugin-rusoto/Cargo.toml                  29
-rw-r--r--  net/gst-plugin-rusoto/README.md                   49
-rw-r--r--  net/gst-plugin-rusoto/build.rs                     5
-rw-r--r--  net/gst-plugin-rusoto/src/lib.rs                  41
-rw-r--r--  net/gst-plugin-rusoto/src/s3sink.rs              566
-rw-r--r--  net/gst-plugin-rusoto/src/s3src.rs               462
-rw-r--r--  net/gst-plugin-rusoto/src/s3url.rs               178
-rw-r--r--  net/gst-plugin-rusoto/src/s3utils.rs              48
13 files changed, 3792 insertions(+), 0 deletions(-)
diff --git a/net/gst-plugin-reqwest/Cargo.toml b/net/gst-plugin-reqwest/Cargo.toml
new file mode 100644
index 000000000..7ef9c8976
--- /dev/null
+++ b/net/gst-plugin-reqwest/Cargo.toml
@@ -0,0 +1,31 @@
+[package]
+name = "gst-plugin-reqwest"
+version = "0.6.0"
+authors = ["Sebastian Dröge <sebastian@centricular.com>"]
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+license = "MIT/Apache-2.0"
+description = "Rust HTTP Plugin"
+edition = "2018"
+
+[dependencies]
+url = "2.1"
+glib = { git = "https://github.com/gtk-rs/glib" }
+reqwest = { version = "0.10", features = ["cookies", "gzip"] }
+futures = "0.3"
+hyperx = "1.0"
+mime = "0.3"
+gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] }
+gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+tokio = { version = "0.2", features = ["time", "rt-threaded"] }
+lazy_static = "1.0"
+
+[dev-dependencies]
+hyper = "0.13"
+
+[lib]
+name = "gstreqwest"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../gst-plugin-version-helper" }
diff --git a/net/gst-plugin-reqwest/build.rs b/net/gst-plugin-reqwest/build.rs
new file mode 100644
index 000000000..0d1ddb61d
--- /dev/null
+++ b/net/gst-plugin-reqwest/build.rs
@@ -0,0 +1,5 @@
+extern crate gst_plugin_version_helper;
+
+fn main() {
+ gst_plugin_version_helper::get_info()
+}
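
`get_info()` is what makes `COMMIT_ID` and `BUILD_REL_DATE` available to `lib.rs` at compile time (they are read there with `env!`). The usual mechanism for this is a build script emitting `cargo:rustc-env` directives; a hedged sketch of the idea follows, not the helper's actual implementation:

    // build.rs sketch: export values the crate can later read with env!().
    fn main() {
        // The real helper derives these from the git checkout and build date;
        // the placeholder values here are illustrative only.
        println!("cargo:rustc-env=COMMIT_ID=unknown");
        println!("cargo:rustc-env=BUILD_REL_DATE=1970-01-01");
    }
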
diff --git a/net/gst-plugin-reqwest/src/lib.rs b/net/gst-plugin-reqwest/src/lib.rs
new file mode 100644
index 000000000..7960329b9
--- /dev/null
+++ b/net/gst-plugin-reqwest/src/lib.rs
@@ -0,0 +1,35 @@
+// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![crate_type = "cdylib"]
+
+#[macro_use]
+extern crate glib;
+#[macro_use]
+extern crate gstreamer as gst;
+extern crate gstreamer_base as gst_base;
+#[macro_use]
+extern crate lazy_static;
+
+mod reqwesthttpsrc;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ reqwesthttpsrc::register(plugin)
+}
+
+gst_plugin_define!(
+ reqwest,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "MIT/X11",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
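
The `gst_plugin_define!` invocation above also generates a `plugin_register_static()` function, which the test suite further down uses to load the plugin without installing it. A minimal usage sketch, assuming a gstreamer-rs version contemporary with this commit; the URL, the `fakesink` pipeline, and the property values are illustrative only:

    use gstreamer as gst;
    use gst::prelude::*;

    fn main() {
        gst::init().unwrap();
        // Register the plugin statically, exactly as the tests below do.
        gstreqwest::plugin_register_static().unwrap();

        // Instantiate the element and point it at an (illustrative) URL.
        let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap();
        src.set_property("location", &"https://example.com/stream.mp3")
            .unwrap();
        src.set_property("timeout", &30u32).unwrap();

        // Feed the downloaded bytes into a dummy sink.
        let sink = gst::ElementFactory::make("fakesink", None).unwrap();
        let pipeline = gst::Pipeline::new(None);
        pipeline.add_many(&[&src, &sink]).unwrap();
        src.link(&sink).unwrap();

        pipeline.set_state(gst::State::Playing).unwrap();
        // ... wait for EOS or an error on the pipeline bus here ...
        pipeline.set_state(gst::State::Null).unwrap();
    }
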
diff --git a/net/gst-plugin-reqwest/src/reqwesthttpsrc.rs b/net/gst-plugin-reqwest/src/reqwesthttpsrc.rs
new file mode 100644
index 000000000..380f5df23
--- /dev/null
+++ b/net/gst-plugin-reqwest/src/reqwesthttpsrc.rs
@@ -0,0 +1,1157 @@
+// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use std::u64;
+
+use futures::future;
+use futures::prelude::*;
+use reqwest::{Client, Response, StatusCode};
+use tokio::runtime;
+use url::Url;
+
+use glib;
+use glib::subclass;
+use glib::subclass::prelude::*;
+use gst;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_base;
+use gst_base::prelude::*;
+use gst_base::subclass::prelude::*;
+
+const DEFAULT_LOCATION: Option<Url> = None;
+const DEFAULT_USER_AGENT: &str = concat!(
+ "GStreamer reqwesthttpsrc ",
+ env!("CARGO_PKG_VERSION"),
+ "-",
+ env!("COMMIT_ID")
+);
+const DEFAULT_IS_LIVE: bool = false;
+const DEFAULT_TIMEOUT: u32 = 15;
+const DEFAULT_COMPRESS: bool = false;
+const DEFAULT_IRADIO_MODE: bool = true;
+const DEFAULT_KEEP_ALIVE: bool = true;
+
+#[derive(Debug, Clone)]
+struct Settings {
+ location: Option<Url>,
+ user_agent: String,
+ user_id: Option<String>,
+ user_pw: Option<String>,
+ timeout: u32,
+ compress: bool,
+ extra_headers: Option<gst::Structure>,
+ cookies: Vec<String>,
+ iradio_mode: bool,
+ keep_alive: bool,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ location: DEFAULT_LOCATION,
+ user_agent: DEFAULT_USER_AGENT.into(),
+ user_id: None,
+ user_pw: None,
+ timeout: DEFAULT_TIMEOUT,
+ compress: DEFAULT_COMPRESS,
+ extra_headers: None,
+ cookies: Vec::new(),
+ iradio_mode: DEFAULT_IRADIO_MODE,
+ keep_alive: DEFAULT_KEEP_ALIVE,
+ }
+ }
+}
+
+static PROPERTIES: [subclass::Property; 11] = [
+ subclass::Property("location", |name| {
+ glib::ParamSpec::string(
+ name,
+ "Location",
+ "URL to read from",
+ None,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("user-agent", |name| {
+ glib::ParamSpec::string(
+ name,
+ "User-Agent",
+ "Value of the User-Agent HTTP request header field",
+ DEFAULT_USER_AGENT.into(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("is-live", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Is Live",
+ "Act like a live source",
+ DEFAULT_IS_LIVE,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("user-id", |name| {
+ glib::ParamSpec::string(
+ name,
+ "User-id",
+ "HTTP location URI user id for authentication",
+ None,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("user-pw", |name| {
+ glib::ParamSpec::string(
+ name,
+ "User-pw",
+ "HTTP location URI user password for authentication",
+ None,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("timeout", |name| {
+ glib::ParamSpec::uint(
+ name,
+ "Timeout",
+ "Value in seconds to timeout a blocking I/O (0 = No timeout).",
+ 0,
+ 3600,
+ DEFAULT_TIMEOUT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("compress", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Compress",
+ "Allow compressed content encodings",
+ DEFAULT_COMPRESS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("extra-headers", |name| {
+ glib::ParamSpec::boxed(
+ name,
+ "Extra Headers",
+ "Extra headers to append to the HTTP request",
+ gst::Structure::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("cookies", |name| {
+ glib::ParamSpec::boxed(
+ name,
+ "Cookies",
+ "HTTP request cookies",
+ Vec::<String>::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("iradio-mode", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "I-Radio Mode",
+            "Enable internet radio mode (ask server to send shoutcast/icecast metadata interleaved with the actual stream data)",
+ DEFAULT_IRADIO_MODE,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("keep-alive", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Keep Alive",
+ "Use HTTP persistent connections",
+ DEFAULT_KEEP_ALIVE,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+];
+
+const REQWEST_CLIENT_CONTEXT: &str = "gst.reqwest.client";
+
+#[derive(Clone, Debug, GBoxed)]
+#[gboxed(type_name = "ReqwestClientContext")]
+struct ClientContext(Arc<ClientContextInner>);
+
+#[derive(Debug)]
+struct ClientContextInner {
+ client: Client,
+}
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
+enum State {
+ Stopped,
+ Started {
+ uri: Url,
+ response: Option<Response>,
+ seekable: bool,
+ position: u64,
+ size: Option<u64>,
+ start: u64,
+ stop: Option<u64>,
+ caps: Option<gst::Caps>,
+ tags: Option<gst::TagList>,
+ },
+}
+
+impl Default for State {
+ fn default() -> Self {
+ State::Stopped
+ }
+}
+
+#[derive(Debug)]
+pub struct ReqwestHttpSrc {
+ client: Mutex<Option<ClientContext>>,
+ external_client: Mutex<Option<ClientContext>>,
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+ canceller: Mutex<Option<future::AbortHandle>>,
+}
+
+lazy_static! {
+ static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "reqwesthttpsrc",
+ gst::DebugColorFlags::empty(),
+ Some("Rust HTTP source"),
+ );
+ static ref RUNTIME: runtime::Runtime = runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .core_threads(1)
+ .build()
+ .unwrap();
+}
+
+impl ReqwestHttpSrc {
+ fn set_location(
+ &self,
+ _element: &gst_base::BaseSrc,
+ uri: Option<&str>,
+ ) -> Result<(), glib::Error> {
+ let state = self.state.lock().unwrap();
+ if let State::Started { .. } = *state {
+ return Err(glib::Error::new(
+ gst::URIError::BadState,
+ "Changing the `location` property on a started `reqwesthttpsrc` is not supported",
+ ));
+ }
+
+ let mut settings = self.settings.lock().unwrap();
+
+ if uri.is_none() {
+ settings.location = DEFAULT_LOCATION;
+ return Ok(());
+ }
+
+ let uri = uri.unwrap();
+ let uri = Url::parse(uri).map_err(|err| {
+ glib::Error::new(
+ gst::URIError::BadUri,
+ format!("Failed to parse URI '{}': {:?}", uri, err).as_str(),
+ )
+ })?;
+
+ if uri.scheme() != "http" && uri.scheme() != "https" {
+ return Err(glib::Error::new(
+ gst::URIError::UnsupportedProtocol,
+ format!("Unsupported URI scheme '{}'", uri.scheme()).as_str(),
+ ));
+ }
+
+ settings.location = Some(uri);
+
+ Ok(())
+ }
+
+ fn ensure_client(&self, src: &gst_base::BaseSrc) -> Result<ClientContext, gst::ErrorMessage> {
+ let mut client_guard = self.client.lock().unwrap();
+ if let Some(ref client) = *client_guard {
+ gst_debug!(CAT, obj: src, "Using already configured client");
+ return Ok(client.clone());
+ }
+
+ let srcpad = src.get_static_pad("src").unwrap();
+ let mut q = gst::Query::new_context(REQWEST_CLIENT_CONTEXT);
+ if srcpad.peer_query(&mut q) {
+ if let Some(context) = q.get_context_owned() {
+ src.set_context(&context);
+ }
+ } else {
+ let _ = src.post_message(
+ &gst::Message::new_need_context(REQWEST_CLIENT_CONTEXT)
+ .src(Some(src))
+ .build(),
+ );
+ }
+
+ if let Some(client) = {
+            // FIXME: Is there a simpler way to ensure the lock is not held
+            // after this block anymore?
+ let external_client = self.external_client.lock().unwrap();
+ let client = external_client.as_ref().cloned();
+ drop(external_client);
+ client
+ } {
+ gst_debug!(CAT, obj: src, "Using shared client");
+ *client_guard = Some(client.clone());
+
+ return Ok(client);
+ }
+
+ gst_debug!(CAT, obj: src, "Creating new client");
+ let client = ClientContext(Arc::new(ClientContextInner {
+ client: Client::builder()
+ .cookie_store(true)
+ .gzip(true)
+ .build()
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to create Client: {}", err]
+ )
+ })?,
+ }));
+
+ gst_debug!(CAT, obj: src, "Sharing new client with other elements");
+ let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
+ {
+ let context = context.get_mut().unwrap();
+ let s = context.get_mut_structure();
+ s.set("client", &client);
+ }
+ src.set_context(&context);
+ let _ = src.post_message(
+ &gst::Message::new_have_context(context)
+ .src(Some(src))
+ .build(),
+ );
+
+ *client_guard = Some(client.clone());
+
+ Ok(client)
+ }
+
+ fn do_request(
+ &self,
+ src: &gst_base::BaseSrc,
+ uri: Url,
+ start: u64,
+ stop: Option<u64>,
+ ) -> Result<State, Option<gst::ErrorMessage>> {
+ use hyperx::header::{
+ qitem, AcceptEncoding, AcceptRanges, ByteRangeSpec, Connection, ContentLength,
+ ContentRange, ContentRangeSpec, ContentType, Cookie, Encoding, Range, RangeUnit,
+ TypedHeaders, UserAgent,
+ };
+ use reqwest::header::HeaderMap;
+
+ gst_debug!(CAT, obj: src, "Creating new request for {}", uri);
+
+ let req = {
+ let client = self.ensure_client(src)?;
+ client.0.client.get(uri.clone())
+ };
+ let settings = self.settings.lock().unwrap().clone();
+
+ let mut headers = HeaderMap::new();
+
+ if settings.keep_alive {
+ headers.encode(&Connection::keep_alive());
+ } else {
+ headers.encode(&Connection::close());
+ }
+
+ match (start != 0, stop) {
+ (false, None) => (),
+ (true, None) => {
+ headers.encode(&Range::Bytes(vec![ByteRangeSpec::AllFrom(start)]));
+ }
+ (_, Some(stop)) => {
+ headers.encode(&Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop - 1)]));
+ }
+ }
+
+ headers.encode(&UserAgent::new(settings.user_agent));
+
+ if !settings.compress {
+ // Compression is the default
+ headers.encode(&AcceptEncoding(vec![qitem(Encoding::Identity)]));
+ };
+
+ if let Some(ref extra_headers) = settings.extra_headers {
+ use reqwest::header::{HeaderName, HeaderValue};
+ use std::convert::TryFrom;
+
+ for (field, value) in extra_headers.iter() {
+ let field = match HeaderName::try_from(field) {
+ Ok(field) => field,
+ Err(err) => {
+ gst_warning!(
+ CAT,
+ obj: src,
+ "Failed to transform extra-header field name '{}' to header name: {}",
+ field,
+ err,
+ );
+
+ continue;
+ }
+ };
+
+ let mut append_header = |field: &HeaderName, value: &glib::Value| {
+ let value = match value.transform::<String>() {
+ Some(value) => value,
+ None => {
+ gst_warning!(
+ CAT,
+ obj: src,
+ "Failed to transform extra-header '{}' value to string",
+ field
+ );
+ return;
+ }
+ };
+
+ let value = value.get::<&str>().unwrap().unwrap_or("");
+
+ let value = match HeaderValue::from_str(value) {
+ Ok(value) => value,
+ Err(_) => {
+ gst_warning!(
+ CAT,
+ obj: src,
+ "Failed to transform extra-header '{}' value to header value",
+ field
+ );
+ return;
+ }
+ };
+
+ headers.append(field.clone(), value);
+ };
+
+ if let Ok(Some(values)) = value.get::<gst::Array>() {
+ for value in values.as_slice() {
+ append_header(&field, value);
+ }
+ } else if let Ok(Some(values)) = value.get::<gst::List>() {
+ for value in values.as_slice() {
+ append_header(&field, value);
+ }
+ } else {
+ append_header(&field, value);
+ }
+ }
+ }
+
+ if !settings.cookies.is_empty() {
+ let mut cookies = Cookie::new();
+ for cookie in settings.cookies {
+ let mut split = cookie.splitn(2, '=');
+ let key = split.next();
+ let value = split.next();
+ if let (Some(key), Some(value)) = (key, value) {
+ cookies.append(String::from(key), String::from(value));
+ }
+ }
+ headers.encode(&cookies);
+ }
+
+ if settings.iradio_mode {
+ headers.append("icy-metadata", "1".parse().unwrap());
+ }
+
+ // Add all headers for the request here
+ let req = req.headers(headers);
+
+ let req = if let Some(ref user_id) = settings.user_id {
+ // HTTP auth available
+ req.basic_auth(user_id, settings.user_pw)
+ } else {
+ req
+ };
+
+ gst_debug!(CAT, obj: src, "Sending new request: {:?}", req);
+
+ let future = async {
+ req.send().await.map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to fetch {}: {:?}", uri, err]
+ )
+ })
+ };
+ let res = self.wait(future);
+
+ let res = match res {
+ Ok(res) => res,
+ Err(Some(err)) => {
+ gst_debug!(CAT, obj: src, "Error {:?}", err);
+ return Err(Some(err));
+ }
+ Err(None) => {
+ gst_debug!(CAT, obj: src, "Flushing");
+ return Err(None);
+ }
+ };
+
+ gst_debug!(CAT, obj: src, "Received response: {:?}", res);
+
+ if !res.status().is_success() {
+ match res.status() {
+ StatusCode::NOT_FOUND => {
+ gst_error!(CAT, obj: src, "Resource not found");
+ return Err(Some(gst_error_msg!(
+ gst::ResourceError::NotFound,
+ ["Resource '{}' not found", uri]
+ )));
+ }
+ StatusCode::UNAUTHORIZED
+ | StatusCode::PAYMENT_REQUIRED
+ | StatusCode::FORBIDDEN
+ | StatusCode::PROXY_AUTHENTICATION_REQUIRED => {
+ gst_error!(CAT, obj: src, "Not authorized: {}", res.status());
+ return Err(Some(gst_error_msg!(
+ gst::ResourceError::NotAuthorized,
+ ["Not Authorized for resource '{}': {}", uri, res.status()]
+ )));
+ }
+ _ => {
+ gst_error!(CAT, obj: src, "Request failed: {}", res.status());
+ return Err(Some(gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Request for '{}' failed: {}", uri, res.status()]
+ )));
+ }
+ }
+ }
+
+ let headers = res.headers();
+ let size = headers.decode().map(|ContentLength(cl)| cl + start).ok();
+
+ let accept_byte_ranges = if let Ok(AcceptRanges(ref ranges)) = headers.decode() {
+ ranges.iter().any(|u| *u == RangeUnit::Bytes)
+ } else {
+ false
+ };
+ let seekable = size.is_some() && accept_byte_ranges;
+
+ let position = if let Ok(ContentRange(ContentRangeSpec::Bytes {
+ range: Some((range_start, _)),
+ ..
+ })) = headers.decode()
+ {
+ range_start
+ } else {
+ 0
+ };
+
+ if position != start {
+ return Err(Some(gst_error_msg!(
+ gst::ResourceError::Seek,
+ ["Failed to seek to {}: Got {}", start, position]
+ )));
+ }
+
+ let mut caps = headers
+ .get("icy-metaint")
+ .and_then(|s| s.to_str().ok())
+ .and_then(|s| s.parse::<i32>().ok())
+ .map(|icy_metaint| {
+ gst::Caps::builder("application/x-icy")
+ .field("metadata-interval", &icy_metaint)
+ .build()
+ });
+
+ if let Ok(ContentType(ref content_type)) = headers.decode() {
+ gst_debug!(CAT, obj: src, "Got content type {}", content_type);
+ if let Some(ref mut caps) = caps {
+ let caps = caps.get_mut().unwrap();
+ let s = caps.get_mut_structure(0).unwrap();
+ s.set("content-type", &content_type.as_ref());
+ } else if content_type.type_() == "audio" && content_type.subtype() == "L16" {
+ let channels = content_type
+ .get_param("channels")
+ .and_then(|s| s.as_ref().parse::<i32>().ok())
+ .unwrap_or(2);
+ let rate = content_type
+ .get_param("rate")
+ .and_then(|s| s.as_ref().parse::<i32>().ok())
+ .unwrap_or(44_100);
+
+ caps = Some(
+ gst::Caps::builder("audio/x-unaligned-raw")
+ .field("format", &"S16BE")
+ .field("layout", &"interleaved")
+ .field("channels", &channels)
+ .field("rate", &rate)
+ .build(),
+ );
+ }
+ }
+
+ let mut tags = gst::TagList::new();
+ {
+ let tags = tags.get_mut().unwrap();
+
+ if let Some(ref icy_name) = headers.get("icy-name").and_then(|s| s.to_str().ok()) {
+ tags.add::<gst::tags::Organization>(icy_name, gst::TagMergeMode::Replace);
+ }
+
+ if let Some(ref icy_genre) = headers.get("icy-genre").and_then(|s| s.to_str().ok()) {
+ tags.add::<gst::tags::Genre>(icy_genre, gst::TagMergeMode::Replace);
+ }
+
+ if let Some(ref icy_url) = headers.get("icy-url").and_then(|s| s.to_str().ok()) {
+ tags.add::<gst::tags::Location>(icy_url, gst::TagMergeMode::Replace);
+ }
+ }
+
+ gst_debug!(CAT, obj: src, "Request successful");
+
+ Ok(State::Started {
+ uri,
+ response: Some(res),
+ seekable,
+ position,
+ size,
+ start,
+ stop,
+ caps,
+ tags: if tags.n_tags() > 0 { Some(tags) } else { None },
+ })
+ }
+
+ fn wait<F, T>(&self, future: F) -> Result<T, Option<gst::ErrorMessage>>
+ where
+ F: Send + Future<Output = Result<T, gst::ErrorMessage>>,
+ T: Send + 'static,
+ {
+ let timeout = self.settings.lock().unwrap().timeout;
+
+ let mut canceller = self.canceller.lock().unwrap();
+ let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
+ canceller.replace(abort_handle);
+ drop(canceller);
+
+ // Wrap in a timeout
+ let future = async {
+ if timeout == 0 {
+ future.await
+ } else {
+ let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
+
+ match res {
+ Ok(res) => res,
+ Err(_) => Err(gst_error_msg!(
+ gst::ResourceError::Read,
+ ["Request timeout"]
+ )),
+ }
+ }
+ };
+
+ // And make abortable
+ let future = async {
+ match future::Abortable::new(future, abort_registration).await {
+ Ok(res) => res.map_err(Some),
+ Err(_) => Err(None),
+ }
+ };
+
+ let res = RUNTIME.enter(|| futures::executor::block_on(future));
+
+ /* Clear out the canceller */
+ let _ = self.canceller.lock().unwrap().take();
+
+ res
+ }
+}
+
+impl ObjectImpl for ReqwestHttpSrc {
+ glib_object_impl!();
+
+ fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
+ match *prop {
+ subclass::Property("location", ..) => {
+ let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
+
+ let location = value.get::<&str>().expect("type checked upstream");
+ if let Err(err) = self.set_location(element, location) {
+ gst_error!(
+ CAT,
+ obj: element,
+ "Failed to set property `location`: {:?}",
+ err
+ );
+ }
+ }
+ subclass::Property("user-agent", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let user_agent = value
+ .get()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| DEFAULT_USER_AGENT.into());
+ settings.user_agent = user_agent;
+ }
+ subclass::Property("is-live", ..) => {
+ let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
+ let is_live = value.get_some().expect("type checked upstream");
+ element.set_live(is_live);
+ }
+ subclass::Property("user-id", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let user_id = value.get().expect("type checked upstream");
+ settings.user_id = user_id;
+ }
+ subclass::Property("user-pw", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let user_pw = value.get().expect("type checked upstream");
+ settings.user_pw = user_pw;
+ }
+ subclass::Property("timeout", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let timeout = value.get_some().expect("type checked upstream");
+ settings.timeout = timeout;
+ }
+ subclass::Property("compress", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let compress = value.get_some().expect("type checked upstream");
+ settings.compress = compress;
+ }
+ subclass::Property("extra-headers", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let extra_headers = value.get().expect("type checked upstream");
+ settings.extra_headers = extra_headers;
+ }
+ subclass::Property("cookies", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let cookies = value.get().expect("type checked upstream");
+ settings.cookies = cookies.unwrap_or_else(Vec::new);
+ }
+ subclass::Property("iradio-mode", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let iradio_mode = value.get_some().expect("type checked upstream");
+ settings.iradio_mode = iradio_mode;
+ }
+ subclass::Property("keep-alive", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ let keep_alive = value.get_some().expect("type checked upstream");
+ settings.keep_alive = keep_alive;
+ }
+ _ => unimplemented!(),
+ };
+ }
+
+ fn get_property(&self, obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
+ match *prop {
+ subclass::Property("location", ..) => {
+ let settings = self.settings.lock().unwrap();
+ let location = settings.location.as_ref().map(Url::to_string);
+
+ Ok(location.to_value())
+ }
+ subclass::Property("user-agent", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.user_agent.to_value())
+ }
+ subclass::Property("is-live", ..) => {
+ let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
+ Ok(element.is_live().to_value())
+ }
+ subclass::Property("user-id", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.user_id.to_value())
+ }
+ subclass::Property("user-pw", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.user_pw.to_value())
+ }
+ subclass::Property("timeout", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.timeout.to_value())
+ }
+ subclass::Property("compress", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.compress.to_value())
+ }
+ subclass::Property("extra-headers", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.extra_headers.to_value())
+ }
+ subclass::Property("cookies", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.cookies.to_value())
+ }
+ subclass::Property("iradio-mode", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.iradio_mode.to_value())
+ }
+ subclass::Property("keep-alive", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.keep_alive.to_value())
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+ let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
+ element.set_automatic_eos(false);
+ element.set_format(gst::Format::Bytes);
+ }
+}
+
+impl ElementImpl for ReqwestHttpSrc {
+ fn set_context(&self, element: &gst::Element, context: &gst::Context) {
+ if context.get_context_type() == REQWEST_CLIENT_CONTEXT {
+ let mut external_client = self.external_client.lock().unwrap();
+ let s = context.get_structure();
+ *external_client = s
+ .get_some::<&ClientContext>("client")
+ .map(|c| Some(c.clone()))
+ .unwrap_or(None);
+ }
+
+ self.parent_set_context(element, context);
+ }
+
+ fn change_state(
+ &self,
+ element: &gst::Element,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ if let gst::StateChange::ReadyToNull = transition {
+ *self.client.lock().unwrap() = None;
+ }
+
+ self.parent_change_state(element, transition)
+ }
+}
+
+impl BaseSrcImpl for ReqwestHttpSrc {
+ fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool {
+ match *self.state.lock().unwrap() {
+ State::Started { seekable, .. } => seekable,
+ _ => false,
+ }
+ }
+
+ fn get_size(&self, _src: &gst_base::BaseSrc) -> Option<u64> {
+ match *self.state.lock().unwrap() {
+ State::Started { size, .. } => size,
+ _ => None,
+ }
+ }
+
+ fn unlock(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ let canceller = self.canceller.lock().unwrap();
+ if let Some(ref canceller) = *canceller {
+ canceller.abort();
+ }
+ Ok(())
+ }
+
+ fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+
+ *state = State::Stopped;
+
+ let uri = self
+ .settings
+ .lock()
+ .unwrap()
+ .location
+ .as_ref()
+ .ok_or_else(|| {
+                gst_error_msg!(gst::CoreError::StateChange, ["Can't start without a URI"])
+ })
+ .map(|uri| uri.clone())?;
+
+ gst_debug!(CAT, obj: src, "Starting for URI {}", uri);
+
+ *state = self.do_request(src, uri, 0, None).map_err(|err| {
+ err.unwrap_or_else(|| {
+ gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
+ })
+ })?;
+
+ Ok(())
+ }
+
+ fn stop(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ gst_debug!(CAT, obj: src, "Stopping");
+ *self.state.lock().unwrap() = State::Stopped;
+
+ Ok(())
+ }
+
+ fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
+ use gst::QueryView;
+
+ match query.view_mut() {
+ QueryView::Scheduling(ref mut q) => {
+ q.set(
+ gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
+ 1,
+ -1,
+ 0,
+ );
+ q.add_scheduling_modes(&[gst::PadMode::Push]);
+ true
+ }
+ _ => BaseSrcImplExt::parent_query(self, element, query),
+ }
+ }
+
+ fn do_seek(&self, src: &gst_base::BaseSrc, segment: &mut gst::Segment) -> bool {
+ let segment = segment.downcast_mut::<gst::format::Bytes>().unwrap();
+
+ let mut state = self.state.lock().unwrap();
+
+ let (position, old_stop, uri) = match *state {
+ State::Started {
+ position,
+ stop,
+ ref uri,
+ ..
+ } => (position, stop, uri.clone()),
+ State::Stopped => {
+ gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
+
+ return false;
+ }
+ };
+
+ let start = segment.get_start().expect("No start position given");
+ let stop = segment.get_stop();
+
+ gst_debug!(CAT, obj: src, "Seeking to {}-{:?}", start, stop);
+
+ if position == start && old_stop == stop.0 {
+ gst_debug!(CAT, obj: src, "No change to current request");
+ return true;
+ }
+
+ *state = State::Stopped;
+ match self.do_request(src, uri, start, stop.0) {
+ Ok(s) => {
+ *state = s;
+ true
+ }
+ Err(Some(err)) => {
+ src.post_error_message(&err);
+ false
+ }
+ Err(None) => false,
+ }
+ }
+}
+
+impl PushSrcImpl for ReqwestHttpSrc {
+ fn create(&self, src: &gst_base::PushSrc) -> Result<gst::Buffer, gst::FlowError> {
+ let mut state = self.state.lock().unwrap();
+
+ let (response, position, caps, tags) = match *state {
+ State::Started {
+ ref mut response,
+ ref mut position,
+ ref mut tags,
+ ref mut caps,
+ ..
+ } => (response, position, caps, tags),
+ State::Stopped => {
+ gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
+
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ let offset = *position;
+
+ let mut current_response = match response.take() {
+ Some(response) => response,
+ None => {
+ gst_error!(CAT, obj: src, "Don't have a response");
+ gst_element_error!(src, gst::ResourceError::Read, ["Don't have a response"]);
+
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ let tags = tags.take();
+ let caps = caps.take();
+ drop(state);
+
+ if let Some(caps) = caps {
+ gst_debug!(CAT, obj: src, "Setting caps {:?}", caps);
+ src.set_caps(&caps)
+ .map_err(|_| gst::FlowError::NotNegotiated)?;
+ }
+
+ if let Some(tags) = tags {
+ gst_debug!(CAT, obj: src, "Sending iradio tags {:?}", tags);
+ let pad = src.get_static_pad("src").unwrap();
+ pad.push_event(gst::Event::new_tag(tags).build());
+ }
+
+ let future = async {
+ current_response.chunk().await.map_err(move |err| {
+ gst_error_msg!(
+ gst::ResourceError::Read,
+ ["Failed to read chunk at offset {}: {:?}", offset, err]
+ )
+ })
+ };
+ let res = self.wait(future);
+
+ let res = match res {
+ Ok(res) => res,
+ Err(Some(err)) => {
+ gst_debug!(CAT, obj: src, "Error {:?}", err);
+ src.post_error_message(&err);
+ return Err(gst::FlowError::Error);
+ }
+ Err(None) => {
+ gst_debug!(CAT, obj: src, "Flushing");
+ return Err(gst::FlowError::Flushing);
+ }
+ };
+
+ let mut state = self.state.lock().unwrap();
+ let (response, position) = match *state {
+ State::Started {
+ ref mut response,
+ ref mut position,
+ ..
+ } => (response, position),
+ State::Stopped => {
+ gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
+
+ return Err(gst::FlowError::Error);
+ }
+ };
+
+ match res {
+ Some(chunk) => {
+ /* do something with the chunk and store the body again in the state */
+
+ gst_trace!(
+ CAT,
+ obj: src,
+ "Chunk of {} bytes received at offset {}",
+ chunk.len(),
+ offset
+ );
+ let size = chunk.len();
+ assert_ne!(chunk.len(), 0);
+
+ *position += size as u64;
+
+ let mut buffer = gst::Buffer::from_slice(chunk);
+
+ *response = Some(current_response);
+
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_offset(offset);
+ buffer.set_offset_end(offset + size as u64);
+ }
+
+ Ok(buffer)
+ }
+ None => {
+ /* No further data, end of stream */
+ gst_debug!(CAT, obj: src, "End of stream");
+ *response = Some(current_response);
+ Err(gst::FlowError::Eos)
+ }
+ }
+ }
+}
+
+impl URIHandlerImpl for ReqwestHttpSrc {
+ fn get_uri(&self, _element: &gst::URIHandler) -> Option<String> {
+ let settings = self.settings.lock().unwrap();
+
+ settings.location.as_ref().map(Url::to_string)
+ }
+
+ fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
+ let element = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
+
+ self.set_location(&element, Some(uri))
+ }
+
+ fn get_uri_type() -> gst::URIType {
+ gst::URIType::Src
+ }
+
+ fn get_protocols() -> Vec<String> {
+ vec!["http".to_string(), "https".to_string()]
+ }
+}
+
+impl ObjectSubclass for ReqwestHttpSrc {
+ const NAME: &'static str = "ReqwestHttpSrc";
+ type ParentType = gst_base::PushSrc;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn new() -> Self {
+ Self {
+ client: Mutex::new(None),
+ external_client: Mutex::new(None),
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(Default::default()),
+ canceller: Mutex::new(None),
+ }
+ }
+
+ fn type_init(type_: &mut subclass::InitializingType<Self>) {
+ type_.add_interface::<gst::URIHandler>();
+ }
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "HTTP Source",
+ "Source/Network/HTTP",
+ "Read stream from an HTTP/HTTPS location",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+ }
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "reqwesthttpsrc",
+ gst::Rank::Marginal,
+ ReqwestHttpSrc::get_type(),
+ )
+}
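
For reference, the `wait()` helper above combines two pieces: the request future is wrapped in `tokio::time::timeout()` (unless `timeout` is 0) and then made abortable, with the `AbortHandle` stored in `canceller` so that `unlock()` can interrupt a blocked streaming thread. A self-contained sketch of the abort half of that pattern, using the same futures 0.3 API (illustrative only):

    use futures::future::{AbortHandle, Abortable, Aborted};

    fn main() {
        // Pair an abort handle with a registration, as `wait()` stores in `canceller`.
        let (handle, registration) = AbortHandle::new_pair();

        // In the element, `unlock()` calls `abort()` from another thread while the
        // streaming thread is blocked; here we abort up front to show the control flow.
        handle.abort();

        let request = Abortable::new(async { 42u32 }, registration);
        match futures::executor::block_on(request) {
            Ok(value) => println!("request finished with {}", value),
            // `wait()` maps this case to `Err(None)`, which callers treat as flushing,
            // not as an error.
            Err(Aborted) => println!("request was cancelled"),
        }
    }
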
diff --git a/net/gst-plugin-reqwest/tests/reqwesthttpsrc.rs b/net/gst-plugin-reqwest/tests/reqwesthttpsrc.rs
new file mode 100644
index 000000000..4daa14215
--- /dev/null
+++ b/net/gst-plugin-reqwest/tests/reqwesthttpsrc.rs
@@ -0,0 +1,1186 @@
+// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use gst::prelude::*;
+use gstreamer as gst;
+
+use std::sync::mpsc;
+
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gstreqwest::plugin_register_static().expect("reqwesthttpsrc tests");
+ });
+}
+
+/// Our custom test harness around the HTTP source
+#[derive(Debug)]
+struct Harness {
+ src: gst::Element,
+ pad: gst::Pad,
+ receiver: Option<mpsc::Receiver<Message>>,
+ rt: Option<tokio::runtime::Runtime>,
+}
+
+/// Messages sent from our test harness
+#[derive(Debug, Clone)]
+enum Message {
+ Buffer(gst::Buffer),
+ Event(gst::Event),
+ Message(gst::Message),
+ ServerError(String),
+}
+
+impl Harness {
+ /// Creates a new HTTP source and test harness around it
+ ///
+ /// `http_func`: Function to generate HTTP responses based on a request
+ /// `setup_func`: Setup function for the HTTP source, should only set properties and similar
+ fn new<
+ F: FnMut(hyper::Request<hyper::Body>) -> hyper::Response<hyper::Body> + Send + 'static,
+ G: FnOnce(&gst::Element),
+ >(
+ http_func: F,
+ setup_func: G,
+ ) -> Harness {
+ use hyper::service::{make_service_fn, service_fn};
+ use hyper::Server;
+ use std::sync::{Arc, Mutex};
+
+ // Create the HTTP source
+ let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap();
+
+ // Sender/receiver for the messages we generate from various places for the tests
+ //
+ // Sending to this sender will block until the corresponding item was received from the
+ // receiver, which allows us to handle everything as if it is running in a single thread
+ let (sender, receiver) = mpsc::sync_channel(0);
+
+ // Sink pad that receives everything the source is generating
+ let pad = gst::Pad::new(Some("sink"), gst::PadDirection::Sink);
+ let srcpad = src.get_static_pad("src").unwrap();
+ srcpad.link(&pad).unwrap();
+
+ // Collect all buffers, events and messages sent from the source
+ let sender_clone = sender.clone();
+ pad.set_chain_function(move |_pad, _parent, buffer| {
+ let _ = sender_clone.send(Message::Buffer(buffer));
+ Ok(gst::FlowSuccess::Ok)
+ });
+ let sender_clone = sender.clone();
+ pad.set_event_function(move |_pad, _parent, event| {
+ let _ = sender_clone.send(Message::Event(event));
+ true
+ });
+ let bus = gst::Bus::new();
+ bus.set_flushing(false);
+ src.set_bus(Some(&bus));
+ let sender_clone = sender.clone();
+ bus.set_sync_handler(move |_bus, msg| {
+ let _ = sender_clone.send(Message::Message(msg.clone()));
+ gst::BusSyncReply::Drop
+ });
+
+ // Activate the pad so that it can be used now
+ pad.set_active(true).unwrap();
+
+ // Create the tokio runtime used for the HTTP server in this test
+ let rt = tokio::runtime::Builder::new()
+ .core_threads(1)
+ .enable_all()
+ .threaded_scheduler()
+ .build()
+ .unwrap();
+
+        // Create an HTTP server that listens on localhost on some random, free port
+ let addr = ([127, 0, 0, 1], 0).into();
+
+        // Whenever a new client connects, a new service function is requested. For each
+        // client we use the same service function, which simply calls the function provided
+        // by the test
+ let http_func = Arc::new(Mutex::new(http_func));
+ let make_service = make_service_fn(move |_ctx| {
+ let http_func = http_func.clone();
+ async move {
+ let http_func = http_func.clone();
+ Ok::<_, hyper::Error>(service_fn(move |req| {
+ let http_func = http_func.clone();
+ async move { Ok::<_, hyper::Error>((&mut *http_func.lock().unwrap())(req)) }
+ }))
+ }
+ });
+
+ let (local_addr_sender, local_addr_receiver) = tokio::sync::oneshot::channel();
+
+ // Spawn the server in the background so that it can handle requests
+ rt.spawn(async move {
+ // Bind the server, retrieve the local port that was selected in the end and set this as
+ // the location property on the source
+ let server = Server::bind(&addr).serve(make_service);
+ let local_addr = server.local_addr();
+
+ local_addr_sender.send(local_addr).unwrap();
+
+ if let Err(e) = server.await {
+ let _ = sender.send(Message::ServerError(format!("{:?}", e)));
+ }
+ });
+
+ let local_addr = futures::executor::block_on(local_addr_receiver).unwrap();
+ src.set_property("location", &format!("http://{}/", local_addr))
+ .unwrap();
+
+ // Let the test setup anything needed on the HTTP source now
+ setup_func(&src);
+
+ Harness {
+ src,
+ pad,
+ receiver: Some(receiver),
+ rt: Some(rt),
+ }
+ }
+
+ fn wait_for_error(&mut self) -> glib::Error {
+ loop {
+ match self.receiver.as_mut().unwrap().recv().unwrap() {
+ Message::ServerError(err) => {
+ panic!("Got server error: {}", err);
+ }
+ Message::Event(ev) => {
+ use gst::EventView;
+
+ match ev.view() {
+ EventView::Eos(_) => {
+ panic!("Got EOS but expected error");
+ }
+ _ => (),
+ }
+ }
+ Message::Message(msg) => {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Error(err) => {
+ return err.get_error();
+ }
+ _ => (),
+ }
+ }
+ Message::Buffer(_buffer) => {
+ panic!("Got buffer but expected error");
+ }
+ }
+ }
+ }
+
+ fn wait_for_state_change(&mut self) -> gst::State {
+ loop {
+ match self.receiver.as_mut().unwrap().recv().unwrap() {
+ Message::ServerError(err) => {
+ panic!("Got server error: {}", err);
+ }
+ Message::Event(ev) => {
+ use gst::EventView;
+
+ match ev.view() {
+ EventView::Eos(_) => {
+ panic!("Got EOS but expected state change");
+ }
+ _ => (),
+ }
+ }
+ Message::Message(msg) => {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state) => {
+ return state.get_current();
+ }
+ MessageView::Error(err) => {
+ panic!(
+ "Got error: {} ({})",
+ err.get_error(),
+ err.get_debug().unwrap_or_else(|| String::from("None"))
+ );
+ }
+ _ => (),
+ }
+ }
+ Message::Buffer(_buffer) => {
+ panic!("Got buffer but expected state change");
+ }
+ }
+ }
+ }
+
+ fn wait_for_segment(
+ &mut self,
+ allow_buffer: bool,
+ ) -> gst::FormattedSegment<gst::format::Bytes> {
+ loop {
+ match self.receiver.as_mut().unwrap().recv().unwrap() {
+ Message::ServerError(err) => {
+ panic!("Got server error: {}", err);
+ }
+ Message::Event(ev) => {
+ use gst::EventView;
+
+ match ev.view() {
+ EventView::Segment(seg) => {
+ return seg
+ .get_segment()
+ .clone()
+ .downcast::<gst::format::Bytes>()
+ .unwrap();
+ }
+ _ => (),
+ }
+ }
+ Message::Message(msg) => {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Error(err) => {
+ panic!(
+ "Got error: {} ({})",
+ err.get_error(),
+ err.get_debug().unwrap_or_else(|| String::from("None"))
+ );
+ }
+ _ => (),
+ }
+ }
+ Message::Buffer(_buffer) => {
+ if !allow_buffer {
+ panic!("Got buffer but expected segment");
+ }
+ }
+ }
+ }
+ }
+
+ /// Wait until a buffer is available or EOS was reached
+ ///
+ /// This function will panic on errors.
+ fn wait_buffer_or_eos(&mut self) -> Option<gst::Buffer> {
+ loop {
+ match self.receiver.as_mut().unwrap().recv().unwrap() {
+ Message::ServerError(err) => {
+ panic!("Got server error: {}", err);
+ }
+ Message::Event(ev) => {
+ use gst::EventView;
+
+ match ev.view() {
+ EventView::Eos(_) => return None,
+ _ => (),
+ }
+ }
+ Message::Message(msg) => {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Error(err) => {
+ panic!(
+ "Got error: {} ({})",
+ err.get_error(),
+ err.get_debug().unwrap_or_else(|| String::from("None"))
+ );
+ }
+ _ => (),
+ }
+ }
+ Message::Buffer(buffer) => return Some(buffer),
+ }
+ }
+ }
+
+ /// Run some code asynchronously on another thread with the HTTP source
+ fn run<F: FnOnce(&gst::Element) + Send + 'static>(&self, func: F) {
+ self.src.call_async(move |src| func(src));
+ }
+}
+
+impl Drop for Harness {
+ fn drop(&mut self) {
+ // Shut down everything that was set up for this test harness
+ // and wait until the tokio runtime exited
+ let bus = self.src.get_bus().unwrap();
+ bus.set_flushing(true);
+
+ // Drop the receiver first before setting the state so that
+ // any threads that might still be blocked on the sender
+ // are immediately unblocked
+ self.receiver.take().unwrap();
+
+ self.pad.set_active(false).unwrap();
+ self.src.set_state(gst::State::Null).unwrap();
+
+ self.rt.take().unwrap();
+ }
+}
+
+#[test]
+fn test_basic_request() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and checks if the
+ // default headers are all sent
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ assert_eq!(headers.get("connection").unwrap(), "keep-alive");
+ assert_eq!(headers.get("accept-encoding").unwrap(), "identity");
+ assert_eq!(headers.get("icy-metadata").unwrap(), "1");
+
+ Response::new(Body::from("Hello World"))
+ },
+ |_src| {
+ // No additional setup needed here
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+}
+
+#[test]
+fn test_basic_request_inverted_defaults() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and override various
+ // default properties to check if the corresponding headers are set correctly
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ assert_eq!(headers.get("connection").unwrap(), "close");
+ assert_eq!(headers.get("accept-encoding").unwrap(), "gzip");
+ assert_eq!(headers.get("icy-metadata"), None);
+ assert_eq!(headers.get("user-agent").unwrap(), "test user-agent");
+
+ Response::new(Body::from("Hello World"))
+ },
+ |src| {
+ src.set_property("keep-alive", &false).unwrap();
+ src.set_property("compress", &true).unwrap();
+ src.set_property("iradio-mode", &false).unwrap();
+ src.set_property("user-agent", &"test user-agent").unwrap();
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+}
+
+#[test]
+fn test_extra_headers() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and check if the
+ // extra-headers property works correctly for setting additional headers
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ assert_eq!(headers.get("foo").unwrap(), "bar");
+ assert_eq!(headers.get("baz").unwrap(), "1");
+ assert_eq!(
+ headers
+ .get_all("list")
+ .iter()
+ .map(|v| v.to_str().unwrap())
+ .collect::<Vec<&str>>(),
+ vec!["1", "2"]
+ );
+ assert_eq!(
+ headers
+ .get_all("array")
+ .iter()
+ .map(|v| v.to_str().unwrap())
+ .collect::<Vec<&str>>(),
+ vec!["1", "2"]
+ );
+
+ Response::new(Body::from("Hello World"))
+ },
+ |src| {
+ src.set_property(
+ "extra-headers",
+ &gst::Structure::builder("headers")
+ .field("foo", &"bar")
+ .field("baz", &1i32)
+ .field("list", &gst::List::new(&[&1i32, &2i32]))
+ .field("array", &gst::Array::new(&[&1i32, &2i32]))
+ .build(),
+ )
+ .unwrap();
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+}
+
+#[test]
+fn test_cookies_property() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and check if the
+ // cookies property can be used to set cookies correctly
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ assert_eq!(headers.get("cookie").unwrap(), "foo=1; bar=2; baz=3");
+
+ Response::new(Body::from("Hello World"))
+ },
+ |src| {
+ src.set_property(
+ "cookies",
+ &vec![
+ String::from("foo=1"),
+ String::from("bar=2"),
+ String::from("baz=3"),
+ ],
+ )
+ .unwrap();
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+}
+
+#[test]
+fn test_iradio_mode() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and check if the
+ // iradio-mode property works correctly, and especially the icy- headers are parsed correctly
+ // and put into caps/tags
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ assert_eq!(headers.get("icy-metadata").unwrap(), "1");
+
+ Response::builder()
+ .header("icy-metaint", "8192")
+ .header("icy-name", "Name")
+ .header("icy-genre", "Genre")
+ .header("icy-url", "http://www.example.com")
+ .header("Content-Type", "audio/mpeg; rate=44100")
+ .body(Body::from("Hello World"))
+ .unwrap()
+ },
+ |_src| {
+ // No additional setup needed here
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+
+ let srcpad = h.src.get_static_pad("src").unwrap();
+ let caps = srcpad.get_current_caps().unwrap();
+ assert_eq!(
+ caps,
+ gst::Caps::builder("application/x-icy")
+ .field("metadata-interval", &8192i32)
+ .field("content-type", &"audio/mpeg; rate=44100")
+ .build()
+ );
+
+ {
+ use gst::EventView;
+ let tag_event = srcpad.get_sticky_event(gst::EventType::Tag, 0).unwrap();
+ if let EventView::Tag(tags) = tag_event.view() {
+ let tags = tags.get_tag();
+ assert_eq!(
+ tags.get::<gst::tags::Organization>().unwrap().get(),
+ Some("Name")
+ );
+ assert_eq!(tags.get::<gst::tags::Genre>().unwrap().get(), Some("Genre"));
+ assert_eq!(
+ tags.get::<gst::tags::Location>().unwrap().get(),
+ Some("http://www.example.com"),
+ );
+ } else {
+ unreachable!();
+ }
+ }
+}
+
+#[test]
+fn test_audio_l16() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and check if the
+ // audio/L16 content type is parsed correctly and put into the caps
+ let mut h = Harness::new(
+ |_req| {
+ use hyper::{Body, Response};
+
+ Response::builder()
+ .header("Content-Type", "audio/L16; rate=48000; channels=2")
+ .body(Body::from("Hello World"))
+ .unwrap()
+ },
+ |_src| {
+ // No additional setup needed here
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+
+ let srcpad = h.src.get_static_pad("src").unwrap();
+ let caps = srcpad.get_current_caps().unwrap();
+ assert_eq!(
+ caps,
+ gst::Caps::builder("audio/x-unaligned-raw")
+ .field("format", &"S16BE")
+ .field("layout", &"interleaved")
+ .field("channels", &2i32)
+ .field("rate", &48_000i32)
+ .build()
+ );
+}
+
+#[test]
+fn test_authorization() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request
+ // but requires authentication first
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+ use reqwest::StatusCode;
+
+ let headers = req.headers();
+
+ if let Some(authorization) = headers.get("authorization") {
+ assert_eq!(authorization, "Basic dXNlcjpwYXNzd29yZA==");
+ Response::new(Body::from("Hello World"))
+ } else {
+ Response::builder()
+ .status(StatusCode::UNAUTHORIZED.as_u16())
+ .header("WWW-Authenticate", "Basic realm=\"realm\"")
+ .body(Body::empty())
+ .unwrap()
+ }
+ },
+ |src| {
+ src.set_property("user-id", &"user").unwrap();
+ src.set_property("user-pw", &"password").unwrap();
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // And now check if the data we receive is exactly what we expect it to be
+ let expected_output = "Hello World";
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ // On the first buffer also check if the duration reported by the HTTP source is what we
+ // would expect it to be
+ if cursor.position() == 0 {
+ assert_eq!(
+ h.src.query_duration::<gst::format::Bytes>(),
+ Some(gst::format::Bytes::from(expected_output.len() as u64))
+ );
+ }
+
+ // Map the buffer readable and check if it contains exactly the data we would expect at
+ // this point after reading everything else we read in previous runs
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+
+ // Check if everything was read
+ assert_eq!(cursor.position(), 11);
+}
+
+#[test]
+fn test_404_error() {
+ use reqwest::StatusCode;
+ init();
+
+ // Harness that always returns 404 and we check if that is mapped to the correct error code
+ let mut h = Harness::new(
+ |_req| {
+ use hyper::{Body, Response};
+
+ Response::builder()
+ .status(StatusCode::NOT_FOUND.as_u16())
+ .body(Body::empty())
+ .unwrap()
+ },
+ |_src| {},
+ );
+
+ h.run(|src| {
+ let _ = src.set_state(gst::State::Playing);
+ });
+
+ let err_code = h.wait_for_error();
+ if let Some(err) = err_code.kind::<gst::ResourceError>() {
+ assert_eq!(err, gst::ResourceError::NotFound);
+ }
+}
+
+#[test]
+fn test_403_error() {
+ use reqwest::StatusCode;
+ init();
+
+ // Harness that always returns 403 and we check if that is mapped to the correct error code
+ let mut h = Harness::new(
+ |_req| {
+ use hyper::{Body, Response};
+
+ Response::builder()
+ .status(StatusCode::FORBIDDEN.as_u16())
+ .body(Body::empty())
+ .unwrap()
+ },
+ |_src| {},
+ );
+
+ h.run(|src| {
+ let _ = src.set_state(gst::State::Playing);
+ });
+
+ let err_code = h.wait_for_error();
+ if let Some(err) = err_code.kind::<gst::ResourceError>() {
+ assert_eq!(err, gst::ResourceError::NotAuthorized);
+ }
+}
+
+#[test]
+fn test_network_error() {
+ init();
+
+ // Harness that always fails with a network error
+ let mut h = Harness::new(
+ |_req| unreachable!(),
+ |src| {
+ src.set_property("location", &"http://0.0.0.0:0").unwrap();
+ },
+ );
+
+ h.run(|src| {
+ let _ = src.set_state(gst::State::Playing);
+ });
+
+ let err_code = h.wait_for_error();
+ if let Some(err) = err_code.kind::<gst::ResourceError>() {
+ assert_eq!(err, gst::ResourceError::OpenRead);
+ }
+}
+
+#[test]
+fn test_seek_after_ready() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Harness that checks if seeking in Ready state works correctly
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ if let Some(range) = headers.get("Range") {
+ if range == "bytes=123-" {
+ let mut data_seek = vec![0; 8192 - 123];
+ for (i, d) in data_seek.iter_mut().enumerate() {
+ *d = ((i + 123) % 256) as u8;
+ }
+
+ Response::builder()
+ .header("content-length", 8192 - 123)
+ .header("accept-ranges", "bytes")
+ .header("content-range", "bytes 123-8192/8192")
+ .body(Body::from(data_seek))
+ .unwrap()
+ } else {
+ panic!("Received an unexpected Range header")
+ }
+ } else {
+ // `panic!("Received no Range header")` should be called here, but due to a bug
+ // in `basesrc` we can't do that: if a seek is performed in READY state, basesrc
+ // first calls `start()` without the seek, and only once the seek is forwarded is
+ // the request with the Range header made. This still needs to be fixed.
+ // issue link: https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/413
+ let mut data_full = vec![0; 8192];
+ for (i, d) in data_full.iter_mut().enumerate() {
+ *d = (i % 256) as u8;
+ }
+
+ Response::builder()
+ .header("content-length", 8192)
+ .header("accept-ranges", "bytes")
+ .body(Body::from(data_full))
+ .unwrap()
+ }
+ },
+ |_src| {},
+ );
+
+ h.run(|src| {
+ src.set_state(gst::State::Ready).unwrap();
+ });
+
+ let current_state = h.wait_for_state_change();
+ assert_eq!(current_state, gst::State::Ready);
+
+ h.run(|src| {
+ src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes::from(123))
+ .unwrap();
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ let segment = h.wait_for_segment(false);
+ assert_eq!(segment.get_start(), gst::format::Bytes::from(123));
+
+ let mut expected_output = vec![0; 8192 - 123];
+ for (i, d) in expected_output.iter_mut().enumerate() {
+ *d = ((123 + i) % 256) as u8;
+ }
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ assert_eq!(buffer.get_offset(), 123 + cursor.position());
+
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+}
+
+#[test]
+fn test_seek_after_buffer_received() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Harness that checks if seeking in Playing state after having received a buffer works
+ // correctly
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ if let Some(range) = headers.get("Range") {
+ if range == "bytes=123-" {
+ let mut data_seek = vec![0; 8192 - 123];
+ for (i, d) in data_seek.iter_mut().enumerate() {
+ *d = ((i + 123) % 256) as u8;
+ }
+
+ Response::builder()
+ .header("content-length", 8192 - 123)
+ .header("accept-ranges", "bytes")
+ .header("content-range", "bytes 123-8192/8192")
+ .body(Body::from(data_seek))
+ .unwrap()
+ } else {
+ panic!("Received an unexpected Range header")
+ }
+ } else {
+ let mut data_full = vec![0; 8192];
+ for (i, d) in data_full.iter_mut().enumerate() {
+ *d = (i % 256) as u8;
+ }
+
+ Response::builder()
+ .header("content-length", 8192)
+ .header("accept-ranges", "bytes")
+ .body(Body::from(data_full))
+ .unwrap()
+ }
+ },
+ |_src| {},
+ );
+
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // Wait for a buffer
+ let buffer = h.wait_buffer_or_eos().unwrap();
+ assert_eq!(buffer.get_offset(), 0);
+
+ // Seek to a position after a buffer is received
+ h.run(|src| {
+ src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes::from(123))
+ .unwrap();
+ });
+
+ let segment = h.wait_for_segment(true);
+ assert_eq!(segment.get_start(), gst::format::Bytes::from(123));
+
+ let mut expected_output = vec![0; 8192 - 123];
+ for (i, d) in expected_output.iter_mut().enumerate() {
+ *d = ((123 + i) % 256) as u8;
+ }
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ assert_eq!(buffer.get_offset(), 123 + cursor.position());
+
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+}
+
+#[test]
+fn test_seek_with_stop_position() {
+ use std::io::{Cursor, Read};
+ init();
+
+ // Harness that checks if seeking in Playing state with a stop position works
+ // correctly after having received a buffer
+ let mut h = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ if let Some(range) = headers.get("Range") {
+ if range == "bytes=123-130" {
+ let mut data_seek = vec![0; 8];
+ for (i, d) in data_seek.iter_mut().enumerate() {
+ *d = ((i + 123) % 256) as u8;
+ }
+
+ Response::builder()
+ .header("content-length", 8)
+ .header("accept-ranges", "bytes")
+ .header("content-range", "bytes 123-130/8192")
+ .body(Body::from(data_seek))
+ .unwrap()
+ } else {
+ panic!("Received an unexpected Range header")
+ }
+ } else {
+ let mut data_full = vec![0; 8192];
+ for (i, d) in data_full.iter_mut().enumerate() {
+ *d = (i % 256) as u8;
+ }
+
+ Response::builder()
+ .header("content-length", 8192)
+ .header("accept-ranges", "bytes")
+ .body(Body::from(data_full))
+ .unwrap()
+ }
+ },
+ |_src| {},
+ );
+
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // Wait for a buffer
+ let buffer = h.wait_buffer_or_eos().unwrap();
+ assert_eq!(buffer.get_offset(), 0);
+
+ // Seek to a position after a buffer is received
+ h.run(|src| {
+ src.seek(
+ 1.0,
+ gst::SeekFlags::FLUSH,
+ gst::SeekType::Set,
+ gst::format::Bytes::from(123),
+ gst::SeekType::Set,
+ gst::format::Bytes::from(131),
+ )
+ .unwrap();
+ });
+
+ let segment = h.wait_for_segment(true);
+ assert_eq!(segment.get_start(), gst::format::Bytes::from(123));
+ assert_eq!(segment.get_stop(), gst::format::Bytes::from(131));
+
+ let mut expected_output = vec![0; 8];
+ for (i, d) in expected_output.iter_mut().enumerate() {
+ *d = ((123 + i) % 256) as u8;
+ }
+ let mut cursor = Cursor::new(expected_output);
+
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ assert_eq!(buffer.get_offset(), 123 + cursor.position());
+
+ let map = buffer.map_readable().unwrap();
+ let mut read_buf = vec![0; map.get_size()];
+
+ assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size());
+ assert_eq!(&*map, &*read_buf);
+ }
+}
+
+#[test]
+fn test_cookies() {
+ init();
+
+ // Set up a harness that returns "Hello World" for any HTTP request and sets a cookie in our
+ // client
+ let mut h = Harness::new(
+ |_req| {
+ use hyper::{Body, Response};
+
+ Response::builder()
+ .header("Set-Cookie", "foo=bar")
+ .body(Body::from("Hello World"))
+ .unwrap()
+ },
+ |_src| {
+ // No additional setup needed here
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ let mut num_bytes = 0;
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ num_bytes += buffer.get_size();
+ }
+ assert_eq!(num_bytes, 11);
+
+ // Set up a second harness that returns "Hello again!" for any HTTP request and checks that
+ // our client provides the cookie that was set by the previous request
+ let mut h2 = Harness::new(
+ |req| {
+ use hyper::{Body, Response};
+
+ let headers = req.headers();
+ let cookies = headers
+ .get("Cookie")
+ .expect("No cookies set")
+ .to_str()
+ .unwrap();
+ assert!(cookies.split(';').any(|c| c == "foo=bar"));
+ Response::builder()
+ .body(Body::from("Hello again!"))
+ .unwrap()
+ },
+ |_src| {
+ // No additional setup needed here
+ },
+ );
+
+ let context = h.src.get_context("gst.reqwest.client").expect("No context");
+ h2.src.set_context(&context);
+
+ // Set the HTTP source to Playing so that everything can start
+ h2.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ let mut num_bytes = 0;
+ while let Some(buffer) = h2.wait_buffer_or_eos() {
+ num_bytes += buffer.get_size();
+ }
+ assert_eq!(num_bytes, 12);
+}
diff --git a/net/gst-plugin-rusoto/Cargo.toml b/net/gst-plugin-rusoto/Cargo.toml
new file mode 100644
index 000000000..657772223
--- /dev/null
+++ b/net/gst-plugin-rusoto/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "gst-plugin-rusoto"
+version = "0.6.0"
+authors = ["Arun Raghavan <arun@arunraghavan.net>"]
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+license = "MIT/Apache-2.0"
+description = "Amazon S3 Plugin"
+edition = "2018"
+
+[dependencies]
+bytes = "0.4"
+futures = "0.1"
+glib = { git = "https://github.com/gtk-rs/glib" }
+gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] }
+gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] }
+rusoto_core = "0.42"
+rusoto_s3 = "0.42"
+url = "2"
+percent-encoding = "2"
+tokio = "0.1"
+lazy_static = "1.0"
+
+[lib]
+name = "gstrusoto"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../gst-plugin-version-helper" }
diff --git a/net/gst-plugin-rusoto/README.md b/net/gst-plugin-rusoto/README.md
new file mode 100644
index 000000000..9b86b7468
--- /dev/null
+++ b/net/gst-plugin-rusoto/README.md
@@ -0,0 +1,49 @@
+# gst-plugin-rusoto
+
+This is a [GStreamer](https://gstreamer.freedesktop.org/) plugin to interact
+with the [Amazon Simple Storage Service (S3)](https://aws.amazon.com/s3/).
+
+Currently, a source element (`s3src`) for reading objects from S3 and a sink
+element (`s3sink`) for writing objects out to S3 are provided.
+
+## AWS Credentials
+
+AWS credentials are picked up using the mechanism that
+[rusoto's ChainProvider](http://rusoto.github.io/rusoto/rusoto/struct.ChainProvider.html)
+uses. At the moment, that is:
+
+ 1. Environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
+ 2. AWS credentials file. Usually located at ~/.aws/credentials.
+ 3. IAM instance profile. Will only work if running on an EC2 instance with an instance profile/role.
+
+An example credentials file might look like:
+
+```ini
+[default]
+aws_access_key_id = ...
+aws_secret_access_key = ...
+```
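+
+Credentials can also be supplied via the environment when launching a pipeline,
+for example (placeholder values, reusing the `s3src` pipeline shown below):
+
+```
+$ AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=... \
+ gst-launch-1.0 \
+ s3src uri=s3://ap-south-1/my-bucket/my-object !
+ filesink location=my-object.out
+```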
+
+## s3src
+
+Reads from a given S3 (region, bucket, object, version?) tuple. The version may
+be omitted, in which case the default behaviour of fetching the latest version
+applies.
+
+```
+$ gst-launch-1.0 \
+ s3src uri=s3://ap-south-1/my-bucket/my-object-key/which-can-have-slashes?version=my-optional-version !
+ filesink location=my-object.out
+```
+
+## s3sink
+
+Writes data to a specified S3 bucket. The `region` parameter is optional; if it is not specified, the default region is used (from the `.aws/config` file).
+
+```
+$ gst-launch-1.0 \
+ videotestsrc ! \
+ theoraenc ! \
+ oggmux ! \
+ s3sink bucket=example-bucket key=my/file.ogv region=us-west-1
+```
diff --git a/net/gst-plugin-rusoto/build.rs b/net/gst-plugin-rusoto/build.rs
new file mode 100644
index 000000000..0d1ddb61d
--- /dev/null
+++ b/net/gst-plugin-rusoto/build.rs
@@ -0,0 +1,5 @@
+extern crate gst_plugin_version_helper;
+
+fn main() {
+ gst_plugin_version_helper::get_info()
+}
diff --git a/net/gst-plugin-rusoto/src/lib.rs b/net/gst-plugin-rusoto/src/lib.rs
new file mode 100644
index 000000000..245e9626d
--- /dev/null
+++ b/net/gst-plugin-rusoto/src/lib.rs
@@ -0,0 +1,41 @@
+// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![crate_type = "cdylib"]
+
+#[macro_use]
+extern crate glib;
+#[macro_use]
+extern crate gstreamer as gst;
+extern crate gstreamer_base as gst_base;
+#[macro_use]
+extern crate lazy_static;
+
+mod s3sink;
+mod s3src;
+mod s3url;
+mod s3utils;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ s3sink::register(plugin)?;
+ s3src::register(plugin)?;
+
+ Ok(())
+}
+
+gst_plugin_define!(
+ rusoto,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "MIT/X11",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/net/gst-plugin-rusoto/src/s3sink.rs b/net/gst-plugin-rusoto/src/s3sink.rs
new file mode 100644
index 000000000..3cd368223
--- /dev/null
+++ b/net/gst-plugin-rusoto/src/s3sink.rs
@@ -0,0 +1,566 @@
+// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
+
+use gst;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use gst_base;
+use gst_base::subclass::prelude::*;
+
+use futures::prelude::*;
+use futures::sync::oneshot;
+
+use rusoto_core::region::Region;
+
+use tokio::runtime;
+
+use rusoto_s3::{
+ CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
+ CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
+};
+
+use std::convert::From;
+use std::str::FromStr;
+use std::sync::Mutex;
+
+use crate::s3utils;
+
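+// State of an in-progress multipart upload: `buffer` accumulates data for the
+// current part, `upload_id` is the ID returned by S3, and `completed_parts`
+// collects the parts that must be listed when completing the upload.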
+struct Started {
+ buffer: Vec<u8>,
+ upload_id: String,
+ part_number: i64,
+ completed_parts: Vec<CompletedPart>,
+}
+
+impl Started {
+ pub fn new(buffer: Vec<u8>, upload_id: String) -> Started {
+ Started {
+ buffer,
+ upload_id,
+ part_number: 0,
+ completed_parts: Vec::new(),
+ }
+ }
+
+ pub fn increment_part_number(&mut self) -> Result<i64, gst::ErrorMessage> {
+ // https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+ const MAX_MULTIPART_NUMBER: i64 = 10000;
+
+ if self.part_number >= MAX_MULTIPART_NUMBER {
+ return Err(gst_error_msg!(
+ gst::ResourceError::Failed,
+ [
+ "Maximum number of parts ({}) reached.",
+ MAX_MULTIPART_NUMBER
+ ]
+ ));
+ }
+
+ self.part_number += 1;
+ Ok(self.part_number)
+ }
+}
+
+enum State {
+ Stopped,
+ Started(Started),
+}
+
+impl Default for State {
+ fn default() -> State {
+ State::Stopped
+ }
+}
+
+const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
+
+struct Settings {
+ region: Region,
+ bucket: Option<String>,
+ key: Option<String>,
+ content_type: Option<String>,
+ buffer_size: u64,
+}
+
+pub struct S3Sink {
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+ runtime: runtime::Runtime,
+ canceller: Mutex<Option<oneshot::Sender<()>>>,
+ client: Mutex<S3Client>,
+}
+
+lazy_static! {
+ static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "rusotos3sink",
+ gst::DebugColorFlags::empty(),
+ Some("Amazon S3 Sink"),
+ );
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ region: Region::default(),
+ bucket: None,
+ key: None,
+ content_type: None,
+ buffer_size: DEFAULT_BUFFER_SIZE,
+ }
+ }
+}
+
+static PROPERTIES: [subclass::Property; 4] = [
+ subclass::Property("bucket", |name| {
+ glib::ParamSpec::string(
+ name,
+ "S3 Bucket",
+ "The bucket of the file to write",
+ None,
+ glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
+ )
+ }),
+ subclass::Property("key", |name| {
+ glib::ParamSpec::string(
+ name,
+ "S3 Key",
+ "The key of the file to write",
+ None,
+ glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
+ )
+ }),
+ subclass::Property("region", |name| {
+ glib::ParamSpec::string(
+ name,
+ "AWS Region",
+ "An AWS region (e.g. eu-west-2).",
+ None,
+ glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
+ )
+ }),
+ subclass::Property("part-size", |name| {
+ glib::ParamSpec::uint64(
+ name,
+ "Part size",
+ "A size (in bytes) of an individual part used for multipart upload.",
+ 5 * 1024 * 1024, // 5 MB
+ 5 * 1024 * 1024 * 1024, // 5 GB
+ DEFAULT_BUFFER_SIZE,
+ glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
+ )
+ }),
+];
+
+impl S3Sink {
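+ // Upload the data buffered so far as the next part of the multipart upload and
+ // record it in the list of completed parts.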
+ fn flush_current_buffer(
+ &self,
+ element: &gst_base::BaseSink,
+ ) -> Result<(), Option<gst::ErrorMessage>> {
+ let upload_part_req = self.create_upload_part_request()?;
+ let part_number = upload_part_req.part_number;
+
+ let upload_part_req_future = self
+ .client
+ .lock()
+ .unwrap()
+ .upload_part(upload_part_req)
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to upload part: {}", err]
+ )
+ });
+
+ let output = s3utils::wait(&self.canceller, &self.runtime, upload_part_req_future)?;
+
+ let mut state = self.state.lock().unwrap();
+ let state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Stopped => {
+ unreachable!("Element should be started");
+ }
+ };
+ state.completed_parts.push(CompletedPart {
+ e_tag: output.e_tag,
+ part_number: Some(part_number),
+ });
+ gst_info!(CAT, obj: element, "Uploaded part {}", part_number);
+
+ Ok(())
+ }
+
+ fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
+ let settings = self.settings.lock().unwrap();
+ let mut state = self.state.lock().unwrap();
+ let state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Stopped => {
+ unreachable!("Element should be started");
+ }
+ };
+
+ let part_number = state.increment_part_number()?;
+ Ok(UploadPartRequest {
+ body: Some(rusoto_core::ByteStream::from(std::mem::replace(
+ &mut state.buffer,
+ Vec::with_capacity(settings.buffer_size as usize),
+ ))),
+ bucket: settings.bucket.as_ref().unwrap().to_owned(),
+ key: settings.key.as_ref().unwrap().to_owned(),
+ upload_id: state.upload_id.to_owned(),
+ part_number,
+ ..Default::default()
+ })
+ }
+
+ fn create_complete_multipart_upload_request(&self) -> CompleteMultipartUploadRequest {
+ let mut state = self.state.lock().unwrap();
+
+ let started_state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Stopped => unreachable!("Cannot stop before start"),
+ };
+
+ started_state
+ .completed_parts
+ .sort_by(|a, b| a.part_number.cmp(&b.part_number));
+
+ let completed_upload = CompletedMultipartUpload {
+ parts: Some(std::mem::replace(
+ &mut started_state.completed_parts,
+ Vec::new(),
+ )),
+ };
+
+ let settings = self.settings.lock().unwrap();
+ CompleteMultipartUploadRequest {
+ bucket: settings.bucket.as_ref().unwrap().to_owned(),
+ key: settings.key.as_ref().unwrap().to_owned(),
+ upload_id: started_state.upload_id.to_owned(),
+ multipart_upload: Some(completed_upload),
+ ..Default::default()
+ }
+ }
+
+ fn create_create_multipart_upload_request(
+ &self,
+ ) -> Result<CreateMultipartUploadRequest, gst::ErrorMessage> {
+ let settings = self.settings.lock().unwrap();
+ if settings.bucket.is_none() || settings.key.is_none() {
+ return Err(gst_error_msg!(
+ gst::ResourceError::Settings,
+ ["Bucket or key is not defined"]
+ ));
+ }
+
+ let bucket = settings.bucket.as_ref().unwrap();
+ let key = settings.key.as_ref().unwrap();
+
+ Ok(CreateMultipartUploadRequest {
+ bucket: bucket.clone(),
+ key: key.clone(),
+ content_type: settings.content_type.clone(),
+ ..Default::default()
+ })
+ }
+
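+ // Called on EOS: upload whatever is left in the buffer as a final part and then
+ // complete the multipart upload.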
+ fn finalize_upload(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ if self.flush_current_buffer(element).is_err() {
+ return Err(gst_error_msg!(
+ gst::ResourceError::Settings,
+ ["Failed to flush internal buffer."]
+ ));
+ }
+
+ let complete_req = self.create_complete_multipart_upload_request();
+ let complete_req_future = self
+ .client
+ .lock()
+ .unwrap()
+ .complete_multipart_upload(complete_req)
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::Write,
+ ["Failed to complete multipart upload: {}.", err.to_string()]
+ )
+ });
+
+ s3utils::wait(&self.canceller, &self.runtime, complete_req_future)
+ .map_err(|err| {
+ err.unwrap_or_else(|| {
+ gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
+ })
+ })
+ .map(|_| ())
+ }
+
+ fn start(&self) -> Result<Started, gst::ErrorMessage> {
+ let create_multipart_req = self.create_create_multipart_upload_request()?;
+ let create_multipart_req_future = self
+ .client
+ .lock()
+ .unwrap()
+ .create_multipart_upload(create_multipart_req)
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenWrite,
+ ["Failed to create multipart upload: {}", err]
+ )
+ });
+ let response = s3utils::wait(&self.canceller, &self.runtime, create_multipart_req_future)
+ .map_err(|err| {
+ err.unwrap_or_else(|| {
+ gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
+ })
+ })?;
+
+ let upload_id = response.upload_id.ok_or_else(|| {
+ gst_error_msg!(
+ gst::ResourceError::Failed,
+ ["Failed to get multipart upload ID"]
+ )
+ })?;
+
+ Ok(Started::new(
+ Vec::with_capacity(self.settings.lock().unwrap().buffer_size as usize),
+ upload_id,
+ ))
+ }
+
+ fn update_buffer(
+ &self,
+ src: &[u8],
+ element: &gst_base::BaseSink,
+ ) -> Result<(), Option<gst::ErrorMessage>> {
+ let mut state = self.state.lock().unwrap();
+ let started_state = match *state {
+ State::Started(ref mut started_state) => started_state,
+ State::Stopped => {
+ unreachable!("Element should be started already");
+ }
+ };
+
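+ // Copy as much as fits into the current part buffer; once it is full, upload it
+ // as a part and recurse on the remaining data.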
+ let to_copy = std::cmp::min(
+ started_state.buffer.capacity() - started_state.buffer.len(),
+ src.len(),
+ );
+
+ let (head, tail) = src.split_at(to_copy);
+ started_state.buffer.extend_from_slice(head);
+ let do_flush = started_state.buffer.capacity() == started_state.buffer.len();
+ drop(state);
+
+ if do_flush {
+ self.flush_current_buffer(element)?;
+ }
+
+ if to_copy < src.len() {
+ self.update_buffer(tail, element)?;
+ }
+
+ Ok(())
+ }
+
+ fn cancel(&self) {
+ let mut canceller = self.canceller.lock().unwrap();
+
+ if canceller.take().is_some() {
+ /* We don't do anything, the Sender will be dropped, and that will cause the
+ * Receiver to be cancelled */
+ }
+ }
+}
+
+impl ObjectSubclass for S3Sink {
+ const NAME: &'static str = "RusotoS3Sink";
+ type ParentType = gst_base::BaseSink;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn new() -> Self {
+ Self {
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(Default::default()),
+ canceller: Mutex::new(None),
+ runtime: runtime::Builder::new()
+ .core_threads(1)
+ .name_prefix("rusotos3sink-runtime")
+ .build()
+ .unwrap(),
+ client: Mutex::new(S3Client::new(Region::default())),
+ }
+ }
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Amazon S3 sink",
+ "Source/Network",
+ "Writes an object to Amazon S3",
+ "Marcin Kolny <mkolny@amazon.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+ }
+}
+
+impl ObjectImpl for S3Sink {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id as usize];
+ let mut settings = self.settings.lock().unwrap();
+
+ match *prop {
+ subclass::Property("bucket", ..) => {
+ settings.bucket = value.get::<String>().expect("type checked upstream");
+ }
+ subclass::Property("key", ..) => {
+ settings.key = value.get::<String>().expect("type checked upstream");
+ }
+ subclass::Property("region", ..) => {
+ let region = Region::from_str(
+ &value
+ .get::<String>()
+ .expect("type checked upstream")
+ .expect("set_property(\"region\"): no value provided"),
+ )
+ .unwrap();
+ if settings.region != region {
+ let mut client = self.client.lock().unwrap();
+ *client = S3Client::new(region.clone());
+ settings.region = region;
+ }
+ }
+ subclass::Property("part-size", ..) => {
+ settings.buffer_size = value.get_some::<u64>().expect("type checked upstream");
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id as usize];
+ let settings = self.settings.lock().unwrap();
+
+ match *prop {
+ subclass::Property("key", ..) => Ok(settings.key.to_value()),
+ subclass::Property("bucket", ..) => Ok(settings.bucket.to_value()),
+ subclass::Property("region", ..) => Ok(settings.region.name().to_value()),
+ subclass::Property("part-size", ..) => Ok(settings.buffer_size.to_value()),
+ _ => unimplemented!(),
+ }
+ }
+}
+
+impl ElementImpl for S3Sink {}
+
+impl BaseSinkImpl for S3Sink {
+ fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+ if let State::Started(_) = *state {
+ unreachable!("RusotoS3Sink already started");
+ }
+
+ *state = State::Started(self.start()?);
+
+ Ok(())
+ }
+
+ fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+ *state = State::Stopped;
+ gst_info!(CAT, obj: element, "Stopped");
+
+ Ok(())
+ }
+
+ fn render(
+ &self,
+ element: &gst_base::BaseSink,
+ buffer: &gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ if let State::Stopped = *self.state.lock().unwrap() {
+ gst_element_error!(element, gst::CoreError::Failed, ["Not started yet"]);
+ return Err(gst::FlowError::Error);
+ }
+
+ gst_trace!(CAT, obj: element, "Rendering {:?}", buffer);
+ let map = buffer.map_readable().map_err(|_| {
+ gst_element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
+ gst::FlowError::Error
+ })?;
+
+ match self.update_buffer(&map, element) {
+ Ok(_) => Ok(gst::FlowSuccess::Ok),
+ Err(err) => match err {
+ Some(error_message) => {
+ gst_error!(
+ CAT,
+ obj: element,
+ "Multipart upload failed: {}",
+ error_message
+ );
+ element.post_error_message(&error_message);
+ Err(gst::FlowError::Error)
+ }
+ _ => {
+ gst_info!(CAT, obj: element, "Upload interrupted. Flushing...");
+ Err(gst::FlowError::Flushing)
+ }
+ },
+ }
+ }
+
+ fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ self.cancel();
+
+ Ok(())
+ }
+
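+ // On EOS, flush any remaining buffered data and complete the multipart upload
+ // before chaining up to the parent event handler.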
+ fn event(&self, element: &gst_base::BaseSink, event: gst::Event) -> bool {
+ if let gst::EventView::Eos(_) = event.view() {
+ if let Err(error_message) = self.finalize_upload(element) {
+ gst_error!(
+ CAT,
+ obj: element,
+ "Failed to finalize the upload: {}",
+ error_message
+ );
+ return false;
+ }
+ }
+
+ BaseSinkImplExt::parent_event(self, element, event)
+ }
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rusotos3sink",
+ gst::Rank::Primary,
+ S3Sink::get_type(),
+ )
+}
diff --git a/net/gst-plugin-rusoto/src/s3src.rs b/net/gst-plugin-rusoto/src/s3src.rs
new file mode 100644
index 000000000..2984098f5
--- /dev/null
+++ b/net/gst-plugin-rusoto/src/s3src.rs
@@ -0,0 +1,462 @@
+// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::sync::Mutex;
+
+use bytes::Bytes;
+use futures::sync::oneshot;
+use futures::{Future, Stream};
+use rusoto_s3::*;
+use tokio::runtime;
+
+use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
+
+use gst;
+use gst::subclass::prelude::*;
+
+use gst_base;
+use gst_base::prelude::*;
+use gst_base::subclass::base_src::CreateSuccess;
+use gst_base::subclass::prelude::*;
+
+use crate::s3url::*;
+use crate::s3utils;
+
+#[allow(clippy::large_enum_variant)]
+enum StreamingState {
+ Stopped,
+ Started {
+ url: GstS3Url,
+ client: S3Client,
+ size: u64,
+ },
+}
+
+pub struct S3Src {
+ url: Mutex<Option<GstS3Url>>,
+ state: Mutex<StreamingState>,
+ runtime: runtime::Runtime,
+ canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
+}
+
+lazy_static! {
+ static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "rusotos3src",
+ gst::DebugColorFlags::empty(),
+ Some("Amazon S3 Source"),
+ );
+}
+
+static PROPERTIES: [subclass::Property; 1] = [subclass::Property("uri", |name| {
+ glib::ParamSpec::string(
+ name,
+ "URI",
+ "The S3 object URI",
+ None,
+ glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
+ )
+})];
+
+impl S3Src {
+ fn cancel(&self) {
+ let mut canceller = self.canceller.lock().unwrap();
+
+ if canceller.take().is_some() {
+ /* We don't do anything, the Sender will be dropped, and that will cause the
+ * Receiver to be cancelled */
+ }
+ }
+
+ fn connect(self: &S3Src, url: &GstS3Url) -> Result<S3Client, gst::ErrorMessage> {
+ Ok(S3Client::new(url.region.clone()))
+ }
+
+ fn set_uri(
+ self: &S3Src,
+ _: &gst_base::BaseSrc,
+ url_str: Option<&str>,
+ ) -> Result<(), glib::Error> {
+ let state = self.state.lock().unwrap();
+
+ if let StreamingState::Started { .. } = *state {
+ return Err(glib::Error::new(
+ gst::URIError::BadState,
+ "Cannot set URI on a started s3src",
+ ));
+ }
+
+ let mut url = self.url.lock().unwrap();
+
+ if url_str.is_none() {
+ *url = None;
+ return Ok(());
+ }
+
+ let url_str = url_str.unwrap();
+ match parse_s3_url(url_str) {
+ Ok(s3url) => {
+ *url = Some(s3url);
+ Ok(())
+ }
+ Err(_) => Err(glib::Error::new(
+ gst::URIError::BadUri,
+ "Could not parse URI",
+ )),
+ }
+ }
+
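+ // Issue a HEAD request for the object to determine its size; called from
+ // start() and used to answer get_size() queries.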
+ fn head(
+ self: &S3Src,
+ src: &gst_base::BaseSrc,
+ client: &S3Client,
+ url: &GstS3Url,
+ ) -> Result<u64, gst::ErrorMessage> {
+ let request = HeadObjectRequest {
+ bucket: url.bucket.clone(),
+ key: url.object.clone(),
+ version_id: url.version.clone(),
+ ..Default::default()
+ };
+
+ let response = client.head_object(request);
+
+ let output = s3utils::wait(
+ &self.canceller,
+ &self.runtime,
+ response.map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::NotFound,
+ ["Failed to HEAD object: {}", err]
+ )
+ }),
+ )
+ .map_err(|err| {
+ err.unwrap_or_else(|| {
+ gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
+ })
+ })?;
+
+ if let Some(size) = output.content_length {
+ gst_info!(CAT, obj: src, "HEAD success, content length = {}", size);
+ Ok(size as u64)
+ } else {
+ Err(gst_error_msg!(
+ gst::ResourceError::Read,
+ ["Failed to get content length"]
+ ))
+ }
+ }
+
+ /* Returns the bytes, Some(error) if one occurred, or a None error if interrupted */
+ fn get(
+ self: &S3Src,
+ src: &gst_base::BaseSrc,
+ offset: u64,
+ length: u64,
+ ) -> Result<Bytes, Option<gst::ErrorMessage>> {
+ let state = self.state.lock().unwrap();
+
+ let (url, client) = match *state {
+ StreamingState::Started {
+ ref url,
+ ref client,
+ ..
+ } => (url, client),
+ StreamingState::Stopped => {
+ return Err(Some(gst_error_msg!(
+ gst::LibraryError::Failed,
+ ["Cannot GET before start()"]
+ )));
+ }
+ };
+
+ let request = GetObjectRequest {
+ bucket: url.bucket.clone(),
+ key: url.object.clone(),
+ range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
+ version_id: url.version.clone(),
+ ..Default::default()
+ };
+
+ gst_debug!(
+ CAT,
+ obj: src,
+ "Requesting range: {}-{}",
+ offset,
+ offset + length - 1
+ );
+
+ let response = client.get_object(request);
+
+ /* Drop the state lock now that we're done with it and need the next part to be
+ * interruptible */
+ drop(state);
+
+ let output = s3utils::wait(
+ &self.canceller,
+ &self.runtime,
+ response.map_err(|err| {
+ gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
+ }),
+ )?;
+
+ gst_debug!(
+ CAT,
+ obj: src,
+ "Read {} bytes",
+ output.content_length.unwrap()
+ );
+
+ s3utils::wait(
+ &self.canceller,
+ &self.runtime,
+ output.body.unwrap().concat2().map_err(|err| {
+ gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
+ }),
+ )
+ }
+}
+
+impl ObjectSubclass for S3Src {
+ const NAME: &'static str = "RusotoS3Src";
+ type ParentType = gst_base::BaseSrc;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn new() -> Self {
+ Self {
+ url: Mutex::new(None),
+ state: Mutex::new(StreamingState::Stopped),
+ runtime: runtime::Builder::new()
+ .core_threads(1)
+ .name_prefix("rusotos3src-runtime")
+ .build()
+ .unwrap(),
+ canceller: Mutex::new(None),
+ }
+ }
+
+ fn type_init(typ: &mut subclass::InitializingType<Self>) {
+ typ.add_interface::<gst::URIHandler>();
+ }
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Amazon S3 source",
+ "Source/Network",
+ "Reads an object from Amazon S3",
+ "Arun Raghavan <arun@arunraghavan.net>",
+ );
+
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+ }
+}
+
+impl ObjectImpl for S3Src {
+ glib_object_impl!();
+
+ fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id as usize];
+ let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
+
+ match *prop {
+ subclass::Property("uri", ..) => {
+ let _ = self.set_uri(basesrc, value.get().expect("type checked upstream"));
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id as usize];
+
+ match *prop {
+ subclass::Property("uri", ..) => {
+ let url = match *self.url.lock().unwrap() {
+ Some(ref url) => url.to_string(),
+ None => "".to_string(),
+ };
+
+ Ok(url.to_value())
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
+ basesrc.set_format(gst::Format::Bytes);
+ /* Set a larger default blocksize to make reads more efficient */
+ basesrc.set_blocksize(256 * 1024);
+ }
+}
+
+impl ElementImpl for S3Src {
+ // No overrides
+}
+
+impl URIHandlerImpl for S3Src {
+ fn get_uri(&self, _: &gst::URIHandler) -> Option<String> {
+ self.url.lock().unwrap().as_ref().map(|s| s.to_string())
+ }
+
+ fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
+ let basesrc = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
+ self.set_uri(basesrc, Some(uri))
+ }
+
+ fn get_uri_type() -> gst::URIType {
+ gst::URIType::Src
+ }
+
+ fn get_protocols() -> Vec<String> {
+ vec!["s3".to_string()]
+ }
+}
+
+impl BaseSrcImpl for S3Src {
+ fn is_seekable(&self, _: &gst_base::BaseSrc) -> bool {
+ true
+ }
+
+ fn get_size(&self, _: &gst_base::BaseSrc) -> Option<u64> {
+ match *self.state.lock().unwrap() {
+ StreamingState::Stopped => None,
+ StreamingState::Started { size, .. } => Some(size),
+ }
+ }
+
+ fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ let state = self.state.lock().unwrap();
+
+ if let StreamingState::Started { .. } = *state {
+ unreachable!("RusotoS3Src is already started");
+ }
+
+ /* Drop the state lock; it has to be re-acquired after self.head() below */
+ drop(state);
+
+ let s3url = match *self.url.lock().unwrap() {
+ Some(ref url) => url.clone(),
+ None => {
+ return Err(gst_error_msg!(
+ gst::ResourceError::Settings,
+ ["Cannot start without a URL being set"]
+ ));
+ }
+ };
+
+ let s3client = self.connect(&s3url)?;
+
+ let size = self.head(src, &s3client, &s3url)?;
+
+ let mut state = self.state.lock().unwrap();
+
+ *state = StreamingState::Started {
+ url: s3url,
+ client: s3client,
+ size,
+ };
+
+ Ok(())
+ }
+
+ fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ let mut state = self.state.lock().unwrap();
+
+ if let StreamingState::Stopped = *state {
+ unreachable!("Cannot stop before start");
+ }
+
+ *state = StreamingState::Stopped;
+
+ Ok(())
+ }
+
+ fn query(&self, src: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
+ if let gst::QueryView::Scheduling(ref mut q) = query.view_mut() {
+ q.set(
+ gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
+ 1,
+ -1,
+ 0,
+ );
+ q.add_scheduling_modes(&[gst::PadMode::Push, gst::PadMode::Pull]);
+ return true;
+ }
+
+ BaseSrcImplExt::parent_query(self, src, query)
+ }
+
+ fn create(
+ &self,
+ src: &gst_base::BaseSrc,
+ offset: u64,
+ buffer: Option<&mut gst::BufferRef>,
+ length: u32,
+ ) -> Result<CreateSuccess, gst::FlowError> {
+ // FIXME: sanity check on offset and length
+ let data = self.get(src, offset, u64::from(length));
+
+ match data {
+ /* Got data */
+ Ok(bytes) => {
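+ // Fill the buffer provided by downstream if there is one (adjusting its
+ // size if fewer bytes were returned), otherwise wrap the received bytes
+ // in a new buffer.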
+ if let Some(buffer) = buffer {
+ if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) {
+ buffer.set_size(copied_bytes);
+ }
+ Ok(CreateSuccess::FilledBuffer)
+ } else {
+ Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes)))
+ }
+ }
+ /* Interrupted */
+ Err(None) => Err(gst::FlowError::Flushing),
+ /* Actual Error */
+ Err(Some(err)) => {
+ gst_error!(CAT, obj: src, "Could not GET: {}", err);
+ Err(gst::FlowError::Error)
+ }
+ }
+ }
+
+ /* FIXME: implement */
+ fn do_seek(&self, _: &gst_base::BaseSrc, _: &mut gst::Segment) -> bool {
+ true
+ }
+
+ fn unlock(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ self.cancel();
+ Ok(())
+ }
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rusotos3src",
+ gst::Rank::Primary,
+ S3Src::get_type(),
+ )
+}
diff --git a/net/gst-plugin-rusoto/src/s3url.rs b/net/gst-plugin-rusoto/src/s3url.rs
new file mode 100644
index 000000000..01a41a833
--- /dev/null
+++ b/net/gst-plugin-rusoto/src/s3url.rs
@@ -0,0 +1,178 @@
+// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::str::FromStr;
+
+use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};
+use rusoto_core::Region;
+use url::Url;
+
+#[derive(Clone)]
+pub struct GstS3Url {
+ pub region: Region,
+ pub bucket: String,
+ pub object: String,
+ pub version: Option<String>,
+}
+
+// FIXME: Copied from the url crate, see https://github.com/servo/rust-url/issues/529
+// https://url.spec.whatwg.org/#fragment-percent-encode-set
+const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b'<').add(b'>').add(b'`');
+// https://url.spec.whatwg.org/#path-percent-encode-set
+const PATH: &AsciiSet = &FRAGMENT.add(b'#').add(b'?').add(b'{').add(b'}');
+const PATH_SEGMENT: &AsciiSet = &PATH.add(b'/').add(b'%');
+
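+// Serialise back to an s3:// URI, percent-encoding the object key so that the
+// result round-trips through parse_s3_url() below.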
+impl ToString for GstS3Url {
+ fn to_string(&self) -> String {
+ format!(
+ "s3://{}/{}/{}{}",
+ self.region.name(),
+ self.bucket,
+ percent_encode(self.object.as_bytes(), PATH_SEGMENT),
+ if self.version.is_some() {
+ format!("?version={}", self.version.clone().unwrap())
+ } else {
+ "".to_string()
+ }
+ )
+ }
+}
+
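+// Parse URIs of the form s3://<region>/<bucket>/<object-key>[?version=<id>].
+// The object key may itself contain slashes and percent-encoded characters.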
+pub fn parse_s3_url(url_str: &str) -> Result<GstS3Url, String> {
+ let url = Url::parse(url_str).or_else(|err| Err(format!("Parse error: {}", err)))?;
+
+ if url.scheme() != "s3" {
+ return Err(format!("Unsupported URI '{}'", url.scheme()));
+ }
+
+ if !url.has_host() {
+ return Err(format!("Invalid host in uri '{}'", url));
+ }
+
+ let host = url.host_str().unwrap();
+ let region = Region::from_str(host).or_else(|_| Err(format!("Invalid region '{}'", host)))?;
+
+ let mut path = url
+ .path_segments()
+ .ok_or_else(|| format!("Invalid uri '{}'", url))?;
+
+ let bucket = path.next().unwrap().to_string();
+
+ let o = path
+ .next()
+ .ok_or_else(|| format!("Invalid empty object/bucket '{}'", url))?;
+
+ let mut object = percent_decode(o.as_bytes())
+ .decode_utf8()
+ .unwrap()
+ .to_string();
+ if o.is_empty() {
+ return Err(format!("Invalid empty object/bucket '{}'", url));
+ }
+
+ object = path.fold(object, |o, p| format!("{}/{}", o, p));
+
+ let mut q = url.query_pairs();
+ let v = q.next();
+ let version;
+
+ match v {
+ Some((ref k, ref v)) if k == "version" => version = Some((*v).to_string()),
+ None => version = None,
+ Some(_) => return Err("Bad query, only 'version' is supported".to_owned()),
+ }
+
+ if q.next() != None {
+ return Err("Extra query terms, only 'version' is supported".to_owned());
+ }
+
+ Ok(GstS3Url {
+ region,
+ bucket,
+ object,
+ version,
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn cannot_be_base() {
+ assert!(parse_s3_url("data:something").is_err());
+ }
+
+ #[test]
+ fn invalid_scheme() {
+ assert!(parse_s3_url("file:///dev/zero").is_err());
+ }
+
+ #[test]
+ fn bad_region() {
+ assert!(parse_s3_url("s3://atlantis-1/i-hope-we/dont-find-this").is_err());
+ }
+
+ #[test]
+ fn no_bucket() {
+ assert!(parse_s3_url("s3://ap-south-1").is_err());
+ assert!(parse_s3_url("s3://ap-south-1/").is_err());
+ }
+
+ #[test]
+ fn no_object() {
+ assert!(parse_s3_url("s3://ap-south-1/my-bucket").is_err());
+ assert!(parse_s3_url("s3://ap-south-1/my-bucket/").is_err());
+ }
+
+ #[test]
+ fn valid_simple() {
+ assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object").is_ok());
+ }
+
+ #[test]
+ fn extraneous_query() {
+ assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?foo=bar").is_err());
+ }
+
+ #[test]
+ fn valid_version() {
+ assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?version=one").is_ok());
+ }
+
+ #[test]
+ fn trailing_slash() {
+ // Slashes are valid at the end of the object key
+ assert_eq!(
+ parse_s3_url("s3://ap-south-1/my-bucket/my-object/")
+ .unwrap()
+ .object,
+ "my-object/"
+ );
+ }
+
+ #[test]
+ fn percent_encoding() {
+ assert_eq!(
+ parse_s3_url("s3://ap-south-1/my-bucket/my%20object")
+ .unwrap()
+ .object,
+ "my object"
+ );
+ }
+
+ #[test]
+ fn percent_decoding() {
+ assert_eq!(
+ parse_s3_url("s3://ap-south-1/my-bucket/my object")
+ .unwrap()
+ .to_string(),
+ "s3://ap-south-1/my-bucket/my%20object"
+ );
+ }
+}
diff --git a/net/gst-plugin-rusoto/src/s3utils.rs b/net/gst-plugin-rusoto/src/s3utils.rs
new file mode 100644
index 000000000..706207307
--- /dev/null
+++ b/net/gst-plugin-rusoto/src/s3utils.rs
@@ -0,0 +1,48 @@
+// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use futures::sync::oneshot;
+use futures::Future;
+use std::sync::Mutex;
+use tokio::runtime;
+
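+// Run the future on the runtime and block until it resolves, racing it against
+// the oneshot receiver whose sender is stored in `canceller`. Dropping that
+// sender (see cancel()/unlock()) interrupts the wait and is reported as
+// Err(None); an actual failure is reported as Err(Some(message)).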
+pub fn wait<F, T>(
+ canceller: &Mutex<Option<oneshot::Sender<T>>>,
+ runtime: &runtime::Runtime,
+ future: F,
+) -> Result<F::Item, Option<gst::ErrorMessage>>
+where
+ F: Send + Future<Error = gst::ErrorMessage> + 'static,
+ F::Item: Send,
+{
+ let mut canceller_guard = canceller.lock().unwrap();
+ let (sender, receiver) = oneshot::channel::<T>();
+
+ canceller_guard.replace(sender);
+ drop(canceller_guard);
+
+ let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
+
+ let res = oneshot::spawn(future, &runtime.executor())
+ .select(receiver.then(|_| Err(unlock_error.clone())))
+ .wait()
+ .map(|v| v.0)
+ .map_err(|err| {
+ if err.0 == unlock_error {
+ None
+ } else {
+ Some(err.0)
+ }
+ });
+
+ /* Clear out the canceller */
+ canceller_guard = canceller.lock().unwrap();
+ *canceller_guard = None;
+
+ res
+}