Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/audio
diff options
context:
space:
mode:
authorGuillaume Desmottes <guillaume.desmottes@onestream.live>2022-11-12 17:57:25 +0300
committerGuillaume Desmottes <guillaume.desmottes@onestream.live>2022-11-16 15:20:38 +0300
commit2642410702ef05fa3981a9e5a9b9817f0fbfa6ea (patch)
treed7d1e6f848a028e7e17951aafa40223fb8549549 /audio
parent3abd13e57bbcd7318af8cca1d3ddadfaf7d8a02a (diff)
spotify: fix "start a runtime from within a runtime" with static link
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/976>
Diffstat (limited to 'audio')
-rw-r--r--audio/spotify/src/spotifyaudiosrc/imp.rs220
1 files changed, 154 insertions, 66 deletions
diff --git a/audio/spotify/src/spotifyaudiosrc/imp.rs b/audio/spotify/src/spotifyaudiosrc/imp.rs
index c168fc3db..54e92a4e5 100644
--- a/audio/spotify/src/spotifyaudiosrc/imp.rs
+++ b/audio/spotify/src/spotifyaudiosrc/imp.rs
@@ -6,9 +6,10 @@
//
// SPDX-License-Identifier: MPL-2.0
-use std::sync::{mpsc, Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use anyhow::bail;
+use futures::future::{AbortHandle, Abortable, Aborted};
use once_cell::sync::Lazy;
use tokio::{runtime, task::JoinHandle};
@@ -74,10 +75,16 @@ struct Settings {
#[derive(Default)]
pub struct SpotifyAudioSrc {
+ setup_thread: Mutex<Option<SetupThread>>,
state: Arc<Mutex<Option<State>>>,
settings: Mutex<Settings>,
}
+struct SetupThread {
+ thread_handle: std::thread::JoinHandle<Result<anyhow::Result<()>, Aborted>>,
+ abort_handle: AbortHandle,
+}
+
#[glib::object_subclass]
impl ObjectSubclass for SpotifyAudioSrc {
const NAME: &'static str = "GstSpotifyAudioSrc";
@@ -237,17 +244,22 @@ impl BaseSrcImpl for SpotifyAudioSrc {
}
}
- if let Err(err) = RUNTIME.block_on(async move { self.setup().await }) {
- let details = format!("{:?}", err);
- gst::error!(CAT, imp: self, "failed to start: {}", details);
- gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]);
- return Err(gst::error_msg!(gst::ResourceError::Settings, [&details]));
+ {
+ let setup_thread = self.setup_thread.lock().unwrap();
+ if setup_thread.is_some() {
+ // already starting
+ return Ok(());
+ }
+ self.start_setup(setup_thread);
}
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ // stop the setup if it's not completed yet
+ self.cancel_setup();
+
if let Some(state) = self.state.lock().unwrap().take() {
gst::debug!(CAT, imp: self, "stopping");
state.player.stop();
@@ -258,6 +270,12 @@ impl BaseSrcImpl for SpotifyAudioSrc {
Ok(())
}
+
+ fn unlock(&self) -> Result<(), gst::ErrorMessage> {
+ self.cancel_setup();
+
+ self.parent_unlock()
+ }
}
impl PushSrcImpl for SpotifyAudioSrc {
@@ -265,6 +283,41 @@ impl PushSrcImpl for SpotifyAudioSrc {
&self,
_buffer: Option<&mut gst::BufferRef>,
) -> Result<CreateSuccess, gst::FlowError> {
+ let state_set = {
+ let state = self.state.lock().unwrap();
+ state.is_some()
+ };
+
+ if !state_set {
+ let setup_thread = self.setup_thread.lock().unwrap();
+ if setup_thread.is_none() {
+ // unlock() could potentially cancel the setup, and create() can be called after unlock() without going through start() again.
+ self.start_setup(setup_thread);
+ }
+ }
+
+ {
+ // wait for the setup to be completed
+ let mut setup_thread = self.setup_thread.lock().unwrap();
+ if let Some(setup) = setup_thread.take() {
+ let res = setup.thread_handle.join().unwrap();
+
+ match res {
+ Err(_aborted) => {
+ gst::debug!(CAT, imp: self, "setup has been cancelled");
+ return Err(gst::FlowError::Flushing);
+ }
+ Ok(Err(err)) => {
+ let details = format!("{:?}", err);
+ gst::error!(CAT, imp: self, "failed to start: {}", details);
+ gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]);
+ return Err(gst::FlowError::Error);
+ }
+ Ok(Ok(_)) => {}
+ }
+ }
+ }
+
let state = self.state.lock().unwrap();
let state = state.as_ref().unwrap();
@@ -290,8 +343,97 @@ impl PushSrcImpl for SpotifyAudioSrc {
}
}
+struct BufferSink {
+ sender: mpsc::SyncSender<Message>,
+}
+
+impl Sink for BufferSink {
+ fn write(&mut self, packet: AudioPacket, _converter: &mut Converter) -> SinkResult<()> {
+ let oggdata = match packet {
+ AudioPacket::OggData(data) => data,
+ AudioPacket::Samples(_) => unimplemented!(),
+ };
+ let buffer = gst::Buffer::from_slice(oggdata);
+
+ // ignore if sending fails as that means the source element is being shutdown
+ let _ = self.sender.send(Message::Buffer(buffer));
+
+ Ok(())
+ }
+}
+
+impl URIHandlerImpl for SpotifyAudioSrc {
+ const URI_TYPE: gst::URIType = gst::URIType::Src;
+
+ fn protocols() -> &'static [&'static str] {
+ &["spotify"]
+ }
+
+ fn uri(&self) -> Option<String> {
+ let settings = self.settings.lock().unwrap();
+
+ if settings.track.is_empty() {
+ None
+ } else {
+ Some(settings.track.clone())
+ }
+ }
+
+ fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
+ gst::debug!(CAT, imp: self, "set URI: {}", uri);
+
+ let url = url::Url::parse(uri)
+ .map_err(|e| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", e)))?;
+
+ // allow to configure auth and cache settings from the URI
+ for (key, value) in url.query_pairs() {
+ match key.as_ref() {
+ "username" | "password" | "cache-credentials" | "cache-files" => {
+ self.obj().set_property(&key, value.as_ref());
+ }
+ _ => {
+ gst::warning!(CAT, imp: self, "unsupported query: {}={}", key, value);
+ }
+ }
+ }
+
+ self.obj()
+ .set_property("track", format!("{}:{}", url.scheme(), url.path()));
+
+ Ok(())
+ }
+}
+
impl SpotifyAudioSrc {
+ fn start_setup(&self, mut setup_thread: MutexGuard<Option<SetupThread>>) {
+ let self_ = self.to_owned();
+
+ // run the runtime from another thread to prevent the "start a runtime from within a runtime" panic
+ // when the plugin is statically linked.
+ let (abort_handle, abort_registration) = AbortHandle::new_pair();
+ let thread_handle = std::thread::spawn(move || {
+ RUNTIME.block_on(async move {
+ let future = Abortable::new(self_.setup(), abort_registration);
+ future.await
+ })
+ });
+
+ setup_thread.replace(SetupThread {
+ thread_handle,
+ abort_handle,
+ });
+ }
+
async fn setup(&self) -> anyhow::Result<()> {
+ {
+ let state = self.state.lock().unwrap();
+
+ if state.is_some() {
+ // already setup
+ return Ok(());
+ }
+ }
+
let (credentials, cache, track) = {
let settings = self.settings.lock().unwrap();
@@ -343,8 +485,6 @@ impl SpotifyAudioSrc {
(credentials, cache, settings.track.clone())
};
- let state = self.state.clone();
-
let (session, _credentials) =
Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?;
@@ -385,7 +525,8 @@ impl SpotifyAudioSrc {
}
});
- let mut state = state.lock().unwrap();
+ let mut state = self.state.lock().unwrap();
+
state.replace(State {
player,
receiver,
@@ -394,65 +535,12 @@ impl SpotifyAudioSrc {
Ok(())
}
-}
-
-struct BufferSink {
- sender: mpsc::SyncSender<Message>,
-}
-impl Sink for BufferSink {
- fn write(&mut self, packet: AudioPacket, _converter: &mut Converter) -> SinkResult<()> {
- let oggdata = match packet {
- AudioPacket::OggData(data) => data,
- AudioPacket::Samples(_) => unimplemented!(),
- };
- let buffer = gst::Buffer::from_slice(oggdata);
-
- // ignore if sending fails as that means the source element is being shutdown
- let _ = self.sender.send(Message::Buffer(buffer));
-
- Ok(())
- }
-}
+ fn cancel_setup(&self) {
+ let mut setup_thread = self.setup_thread.lock().unwrap();
-impl URIHandlerImpl for SpotifyAudioSrc {
- const URI_TYPE: gst::URIType = gst::URIType::Src;
-
- fn protocols() -> &'static [&'static str] {
- &["spotify"]
- }
-
- fn uri(&self) -> Option<String> {
- let settings = self.settings.lock().unwrap();
-
- if settings.track.is_empty() {
- None
- } else {
- Some(settings.track.clone())
- }
- }
-
- fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
- gst::debug!(CAT, imp: self, "set URI: {}", uri);
-
- let url = url::Url::parse(uri)
- .map_err(|e| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", e)))?;
-
- // allow to configure auth and cache settings from the URI
- for (key, value) in url.query_pairs() {
- match key.as_ref() {
- "username" | "password" | "cache-credentials" | "cache-files" => {
- self.obj().set_property(&key, value.as_ref());
- }
- _ => {
- gst::warning!(CAT, imp: self, "unsupported query: {}={}", key, value);
- }
- }
+ if let Some(setup) = setup_thread.take() {
+ setup.abort_handle.abort();
}
-
- self.obj()
- .set_property("track", format!("{}:{}", url.scheme(), url.path()));
-
- Ok(())
}
}