Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Rikte <rikte88@gmail.com>2021-04-15 00:58:37 +0300
committerKarl Rikte <karl.rikte@gmail.com>2021-05-03 20:13:33 +0300
commite1ea71fec758f70442e8b583599edf60ad2d8caa (patch)
tree721dc5683650c1fdea2450d5fea4c63fe47b61e9 /net/reqwest
parentbf5e231e5b8c8c978fb74cfc8f0910a46906f422 (diff)
Implemented proxy support
Implemented analogously to souphttpsrc for compatibility. Proxy prevents sharing the client between element instances. Change-Id: I50d676fd55f0e1d7051d8cd7d5922b7be4f0c6e8
Diffstat (limited to 'net/reqwest')
-rw-r--r--net/reqwest/src/reqwesthttpsrc/imp.rs279
-rw-r--r--net/reqwest/tests/reqwesthttpsrc.rs110
2 files changed, 335 insertions, 54 deletions
diff --git a/net/reqwest/src/reqwesthttpsrc/imp.rs b/net/reqwest/src/reqwesthttpsrc/imp.rs
index 699efa404..46e3aeeff 100644
--- a/net/reqwest/src/reqwesthttpsrc/imp.rs
+++ b/net/reqwest/src/reqwesthttpsrc/imp.rs
@@ -49,6 +49,15 @@ struct Settings {
cookies: Vec<String>,
iradio_mode: bool,
keep_alive: bool,
+ // Notes about souphttpsrc compatibility:
+ // Internal representation of no proxy is None,
+ // but externally Some("").
+ // Default is set from env var 'http_proxy'.
+ // Prepends http:// if not protocol specified.
+ proxy: Option<String>,
+ // Nullable fields that behave normally:
+ proxy_id: Option<String>,
+ proxy_pw: Option<String>,
}
impl Default for Settings {
@@ -64,6 +73,38 @@ impl Default for Settings {
cookies: Vec::new(),
iradio_mode: DEFAULT_IRADIO_MODE,
keep_alive: DEFAULT_KEEP_ALIVE,
+ proxy: match proxy_from_str(std::env::var("http_proxy").ok()) {
+ Ok(a) => a,
+ Err(_) => None,
+ },
+ proxy_id: None,
+ proxy_pw: None,
+ }
+ }
+}
+
+fn proxy_from_str(s: Option<String>) -> Result<Option<String>, glib::Error> {
+ match s {
+ None => Ok(None),
+ Some(s) if s.is_empty() => Ok(None),
+ Some(not_empty_str) => {
+ // If no protocol specified, prepend http for compatibility
+ // https://gstreamer.freedesktop.org/documentation/soup/souphttpsrc.html
+ let url_string = if !not_empty_str.contains("://") {
+ format!("http://{}", not_empty_str)
+ } else {
+ not_empty_str
+ };
+ match reqwest::Url::parse(&url_string) {
+ Ok(url) => {
+ // this may urlencode and add trailing /
+ Ok(Some(url.to_string()))
+ }
+ Err(err) => Err(glib::Error::new(
+ gst::URIError::BadUri,
+ format!("Failed to parse URI '{}': {:?}", url_string, err).as_str(),
+ )),
+ }
}
}
}
@@ -168,9 +209,51 @@ impl ReqwestHttpSrc {
Ok(())
}
+ /// Set a proxy-related property and perform necessary state checks and modifications to client.
+ fn set_proxy_prop<F>(
+ &self,
+ property_name: &str,
+ desired_value: Option<String>,
+ prop_memory_location: F,
+ ) -> Result<(), glib::Error>
+ where
+ F: Fn(&mut Settings) -> &mut Option<String>,
+ {
+ // Proxy props can only be changed when not started.
+ let state = self.state.lock().unwrap();
+ if let State::Started { .. } = *state {
+ return Err(glib::Error::new(
+ gst::URIError::BadState,
+ &format!(
+ "Changing the `{}` property on a started `reqwesthttpsrc` is not supported",
+ property_name
+ ),
+ ));
+ }
+
+ // Get memory address of specific variable to change.
+ let mut settings = self.settings.lock().unwrap();
+ let target_variable = prop_memory_location(&mut settings);
+ if &desired_value == target_variable {
+ return Ok(());
+ }
+
+ // If the Proxy is changed we need to throw away the old client since it isn't properly
+ // configured with a proxy anymore. Since element is not started, an existing client
+ // without proxy will be used, or a new one with/without proxy will be built on next call
+ // to ensure_client.
+ *self.client.lock().unwrap() = None;
+ *target_variable = desired_value;
+
+ Ok(())
+ }
+
fn ensure_client(
&self,
src: &super::ReqwestHttpSrc,
+ proxy: Option<String>,
+ proxy_id: Option<String>,
+ proxy_pw: Option<String>,
) -> Result<ClientContext, gst::ErrorMessage> {
let mut client_guard = self.client.lock().unwrap();
if let Some(ref client) = *client_guard {
@@ -178,57 +261,70 @@ impl ReqwestHttpSrc {
return Ok(client.clone());
}
- let srcpad = src.static_pad("src").unwrap();
- let mut q = gst::query::Context::new(REQWEST_CLIENT_CONTEXT);
- if srcpad.peer_query(&mut q) {
- if let Some(context) = q.context_owned() {
- src.set_context(&context);
+ // Attempt to acquire an existing client context from another element instance
+ // unless using proxy, because proxy is client specific.
+ if proxy.is_none() {
+ let srcpad = src.static_pad("src").unwrap();
+ let mut q = gst::query::Context::new(REQWEST_CLIENT_CONTEXT);
+ if srcpad.peer_query(&mut q) {
+ if let Some(context) = q.context_owned() {
+ src.set_context(&context);
+ }
+ } else {
+ let _ = src.post_message(
+ gst::message::NeedContext::builder(REQWEST_CLIENT_CONTEXT)
+ .src(src)
+ .build(),
+ );
+ }
+
+ // Hopefully now, self.set_context will have been synchronously called
+ if let Some(client) = self.external_client.lock().unwrap().clone() {
+ gst_debug!(CAT, obj: src, "Using shared client");
+ *client_guard = Some(client.clone());
+
+ return Ok(client);
}
- } else {
- let _ = src.post_message(
- gst::message::NeedContext::builder(REQWEST_CLIENT_CONTEXT)
- .src(src)
- .build(),
- );
}
- if let Some(client) = {
- // FIXME: Is there a simpler way to ensure the lock is not hold
- // after this block anymore?
- let external_client = self.external_client.lock().unwrap();
- let client = external_client.as_ref().cloned();
- drop(external_client);
- client
- } {
- gst_debug!(CAT, obj: src, "Using shared client");
- *client_guard = Some(client.clone());
-
- return Ok(client);
+ let mut builder = Client::builder().cookie_store(true).gzip(true);
+
+ if let Some(proxy) = &proxy {
+ // Proxy is url-checked on property set but perhaps this might still fail.
+ let mut p = reqwest::Proxy::all(proxy).map_err(|err| {
+ gst::error_msg!(gst::ResourceError::OpenRead, ["Bad proxy URI: {}", err])
+ })?;
+ if let Some(proxy_id) = &proxy_id {
+ let proxy_pw = proxy_pw.as_deref().unwrap_or("");
+ p = p.basic_auth(proxy_id, proxy_pw);
+ }
+ builder = builder.proxy(p);
}
gst_debug!(CAT, obj: src, "Creating new client");
let client = ClientContext(Arc::new(ClientContextInner {
- client: Client::builder()
- .cookie_store(true)
- .gzip(true)
- .build()
- .map_err(|err| {
- gst::error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to create Client: {}", err]
- )
- })?,
+ client: builder.build().map_err(|err| {
+ gst::error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to create Client: {}", err]
+ )
+ })?,
}));
- gst_debug!(CAT, obj: src, "Sharing new client with other elements");
- let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
- {
- let context = context.get_mut().unwrap();
- let s = context.structure_mut();
- s.set("client", &client);
+ // Share created client with other elements, unless using proxy. Shared client never uses proxy.
+ // The alternative would be different contexts for different proxy settings, or one context with a
+ // map from proxy settings to client, but then, how and when to discard those, retaining reuse benefits?
+ if proxy.is_none() {
+ gst_debug!(CAT, obj: src, "Sharing new client with other elements");
+ let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
+ {
+ let context = context.get_mut().unwrap();
+ let s = context.structure_mut();
+ s.set("client", &client);
+ }
+ src.set_context(&context);
+ let _ = src.post_message(gst::message::HaveContext::builder(context).src(src).build());
}
- src.set_context(&context);
- let _ = src.post_message(gst::message::HaveContext::builder(context).src(src).build());
*client_guard = Some(client.clone());
@@ -251,12 +347,14 @@ impl ReqwestHttpSrc {
gst_debug!(CAT, obj: src, "Creating new request for {}", uri);
- let req = {
- let client = self.ensure_client(src)?;
- client.0.client.get(uri.clone())
- };
let settings = self.settings.lock().unwrap().clone();
+ let req = self
+ .ensure_client(src, settings.proxy, settings.proxy_id, settings.proxy_pw)?
+ .0
+ .client
+ .get(uri.clone());
+
let mut headers = HeaderMap::new();
if settings.keep_alive {
@@ -658,6 +756,27 @@ impl ObjectImpl for ReqwestHttpSrc {
DEFAULT_KEEP_ALIVE,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
+ glib::ParamSpec::new_string(
+ "proxy",
+ "Proxy",
+ "HTTP proxy server URI",
+ Some(""),
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_string(
+ "proxy-id",
+ "Proxy-id",
+ "HTTP proxy URI user id for authentication",
+ Some(""),
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpec::new_string(
+ "proxy-pw",
+ "Proxy-pw",
+ "HTTP proxy URI user password for authentication",
+ Some(""),
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
]
});
@@ -671,17 +790,10 @@ impl ObjectImpl for ReqwestHttpSrc {
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
- match pspec.name() {
+ let res = match pspec.name() {
"location" => {
let location = value.get::<Option<&str>>().expect("type checked upstream");
- if let Err(err) = self.set_location(obj, location) {
- gst_error!(
- CAT,
- obj: obj,
- "Failed to set property `location`: {:?}",
- err
- );
- }
+ self.set_location(obj, location)
}
"user-agent" => {
let mut settings = self.settings.lock().unwrap();
@@ -690,52 +802,100 @@ impl ObjectImpl for ReqwestHttpSrc {
.expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_USER_AGENT.into());
settings.user_agent = user_agent;
+ Ok(())
}
"is-live" => {
let is_live = value.get().expect("type checked upstream");
obj.set_live(is_live);
+ Ok(())
}
"user-id" => {
let mut settings = self.settings.lock().unwrap();
let user_id = value.get().expect("type checked upstream");
settings.user_id = user_id;
+ Ok(())
}
"user-pw" => {
let mut settings = self.settings.lock().unwrap();
let user_pw = value.get().expect("type checked upstream");
settings.user_pw = user_pw;
+ Ok(())
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().expect("type checked upstream");
settings.timeout = timeout;
+ Ok(())
}
"compress" => {
let mut settings = self.settings.lock().unwrap();
let compress = value.get().expect("type checked upstream");
settings.compress = compress;
+ Ok(())
}
"extra-headers" => {
let mut settings = self.settings.lock().unwrap();
let extra_headers = value.get().expect("type checked upstream");
settings.extra_headers = extra_headers;
+ Ok(())
}
"cookies" => {
let mut settings = self.settings.lock().unwrap();
settings.cookies = value.get::<Vec<String>>().expect("type checked upstream");
+ Ok(())
}
"iradio-mode" => {
let mut settings = self.settings.lock().unwrap();
let iradio_mode = value.get().expect("type checked upstream");
settings.iradio_mode = iradio_mode;
+ Ok(())
}
"keep-alive" => {
let mut settings = self.settings.lock().unwrap();
let keep_alive = value.get().expect("type checked upstream");
settings.keep_alive = keep_alive;
+ Ok(())
+ }
+ "proxy" => {
+ let proxy = proxy_from_str(
+ value
+ .get::<Option<String>>()
+ .expect("type checked upstream"),
+ );
+ match proxy {
+ Ok(proxy) => self
+ .set_proxy_prop(pspec.name(), proxy, move |settings| &mut settings.proxy),
+ Err(e) => Err(e),
+ }
+ }
+ "proxy-id" => {
+ let proxy_id = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ self.set_proxy_prop(pspec.name(), proxy_id, move |settings| {
+ &mut settings.proxy_id
+ })
+ }
+ "proxy-pw" => {
+ let proxy_pw = value
+ .get::<Option<String>>()
+ .expect("type checked upstream");
+ self.set_proxy_prop(pspec.name(), proxy_pw, move |settings| {
+ &mut settings.proxy_pw
+ })
}
_ => unimplemented!(),
};
+
+ if let Err(err) = res {
+ gst_error!(
+ CAT,
+ obj: obj,
+ "Failed to set property `{}`: {:?}",
+ pspec.name(),
+ err
+ );
+ }
}
fn property(&self, obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
@@ -783,6 +943,17 @@ impl ObjectImpl for ReqwestHttpSrc {
let settings = self.settings.lock().unwrap();
settings.keep_alive.to_value()
}
+ // return None values as Some("") for compatibility with souphttpsrc
+ "proxy" => self
+ .settings
+ .lock()
+ .unwrap()
+ .proxy
+ .as_deref()
+ .unwrap_or("")
+ .to_value(),
+ "proxy-id" => self.settings.lock().unwrap().proxy_id.to_value(),
+ "proxy-pw" => self.settings.lock().unwrap().proxy_pw.to_value(),
_ => unimplemented!(),
}
}
diff --git a/net/reqwest/tests/reqwesthttpsrc.rs b/net/reqwest/tests/reqwesthttpsrc.rs
index a107ff5f4..6eb7c9e0b 100644
--- a/net/reqwest/tests/reqwesthttpsrc.rs
+++ b/net/reqwest/tests/reqwesthttpsrc.rs
@@ -15,6 +15,8 @@ fn init() {
static INIT: Once = Once::new();
INIT.call_once(|| {
+ // clear this environment because it affects the default settings
+ std::env::remove_var("http_proxy");
gst::init().unwrap();
gstreqwest::plugin_register_static().expect("reqwesthttpsrc tests");
});
@@ -1183,3 +1185,111 @@ fn test_cookies() {
}
assert_eq!(num_bytes, 12);
}
+
+#[test]
+fn test_proxy_prop_souphttpsrc_compatibility() {
+ init();
+
+ fn assert_proxy_set(set_to: Option<&str>, expected: Option<&str>) {
+ // The same assertions should hold for "souphttpsrc".
+ let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap();
+ src.set_property("proxy", set_to).unwrap();
+ assert_eq!(
+ src.property("proxy")
+ .unwrap()
+ .get::<Option<&str>>()
+ .unwrap(),
+ expected
+ );
+ }
+
+ // Test env var proxy.
+ assert_proxy_set(Some("http://mydomain/"), Some("http://mydomain/"));
+
+ // It should prepend http if no protocol specified and add /.
+ assert_proxy_set(Some("myotherdomain"), Some("http://myotherdomain/"));
+
+ // Empty env var should result in "" proxy (meaning None) for compatibility.
+ assert_proxy_set(Some(""), Some(""));
+
+ // It should allow setting this value for proxy for compatibility.
+ assert_proxy_set(Some("&$"), Some("http://&$/"));
+
+ // No env var should result in "" proxy (meaning None) for compatibility.
+ assert_proxy_set(None, Some(""));
+}
+
+#[test]
+fn test_proxy() {
+ init();
+
+ // Simplest possible implementation of naive oneshot proxy server?
+ // Listen on socket before spawning thread (we won't error out with connection refused).
+ let incoming = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let proxy_addr = incoming.local_addr().unwrap();
+ println!("listening on {}, starting proxy server", proxy_addr);
+ let proxy_server = std::thread::spawn(move || {
+ use std::io::*;
+ println!("awaiting connection to proxy server");
+ let (mut conn, _addr) = incoming.accept().unwrap();
+
+ println!("client connected, reading request line");
+ let mut reader = BufReader::new(conn.try_clone().unwrap());
+ let mut buf = String::new();
+ reader.read_line(&mut buf).unwrap();
+ let parts: Vec<&str> = buf.split(' ').collect();
+ let url = reqwest::Url::parse(parts[1]).unwrap();
+ let host = format!(
+ "{}:{}",
+ url.host_str().unwrap(),
+ url.port_or_known_default().unwrap()
+ );
+
+ println!("connecting to target server {}", host);
+ let mut server_connection = std::net::TcpStream::connect(host).unwrap();
+
+ println!("connected to target server, sending modified request line");
+ server_connection
+ .write_all(format!("{} {} {}\r\n", parts[0], url.path(), parts[2]).as_bytes())
+ .unwrap();
+
+ println!("sent modified request line, forwarding data in both directions");
+ let send_join_handle = {
+ let mut server_connection = server_connection.try_clone().unwrap();
+ std::thread::spawn(move || {
+ copy(&mut reader, &mut server_connection).unwrap();
+ })
+ };
+ copy(&mut server_connection, &mut conn).unwrap();
+ send_join_handle.join().unwrap();
+ println!("shutting down proxy server");
+ });
+
+ let mut h = Harness::new(
+ |_req| {
+ use hyper::{Body, Response};
+
+ Response::builder()
+ .body(Body::from("Hello Proxy World"))
+ .unwrap()
+ },
+ |src| {
+ src.set_property("proxy", proxy_addr.to_string()).unwrap();
+ },
+ );
+
+ // Set the HTTP source to Playing so that everything can start.
+ h.run(|src| {
+ src.set_state(gst::State::Playing).unwrap();
+ });
+
+ // Wait for a buffer.
+ let mut num_bytes = 0;
+ while let Some(buffer) = h.wait_buffer_or_eos() {
+ num_bytes += buffer.size();
+ }
+ assert_eq!(num_bytes, "Hello Proxy World".len());
+
+ // Don't leave threads hanging around.
+ proxy_server.join().unwrap();
+}