diff options
author | Guillaume Desmottes <guillaume.desmottes@onestream.live> | 2022-11-12 17:57:25 +0300 |
---|---|---|
committer | Guillaume Desmottes <guillaume.desmottes@onestream.live> | 2022-11-16 15:20:38 +0300 |
commit | 2642410702ef05fa3981a9e5a9b9817f0fbfa6ea (patch) | |
tree | d7d1e6f848a028e7e17951aafa40223fb8549549 /audio | |
parent | 3abd13e57bbcd7318af8cca1d3ddadfaf7d8a02a (diff) |
spotify: fix "start a runtime from within a runtime" with static link
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/976>
Diffstat (limited to 'audio')
-rw-r--r-- | audio/spotify/src/spotifyaudiosrc/imp.rs | 220 |
1 files changed, 154 insertions, 66 deletions
diff --git a/audio/spotify/src/spotifyaudiosrc/imp.rs b/audio/spotify/src/spotifyaudiosrc/imp.rs index c168fc3d..54e92a4e 100644 --- a/audio/spotify/src/spotifyaudiosrc/imp.rs +++ b/audio/spotify/src/spotifyaudiosrc/imp.rs @@ -6,9 +6,10 @@ // // SPDX-License-Identifier: MPL-2.0 -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex, MutexGuard}; use anyhow::bail; +use futures::future::{AbortHandle, Abortable, Aborted}; use once_cell::sync::Lazy; use tokio::{runtime, task::JoinHandle}; @@ -74,10 +75,16 @@ struct Settings { #[derive(Default)] pub struct SpotifyAudioSrc { + setup_thread: Mutex<Option<SetupThread>>, state: Arc<Mutex<Option<State>>>, settings: Mutex<Settings>, } +struct SetupThread { + thread_handle: std::thread::JoinHandle<Result<anyhow::Result<()>, Aborted>>, + abort_handle: AbortHandle, +} + #[glib::object_subclass] impl ObjectSubclass for SpotifyAudioSrc { const NAME: &'static str = "GstSpotifyAudioSrc"; @@ -237,17 +244,22 @@ impl BaseSrcImpl for SpotifyAudioSrc { } } - if let Err(err) = RUNTIME.block_on(async move { self.setup().await }) { - let details = format!("{:?}", err); - gst::error!(CAT, imp: self, "failed to start: {}", details); - gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]); - return Err(gst::error_msg!(gst::ResourceError::Settings, [&details])); + { + let setup_thread = self.setup_thread.lock().unwrap(); + if setup_thread.is_some() { + // already starting + return Ok(()); + } + self.start_setup(setup_thread); } Ok(()) } fn stop(&self) -> Result<(), gst::ErrorMessage> { + // stop the setup if it's not completed yet + self.cancel_setup(); + if let Some(state) = self.state.lock().unwrap().take() { gst::debug!(CAT, imp: self, "stopping"); state.player.stop(); @@ -258,6 +270,12 @@ impl BaseSrcImpl for SpotifyAudioSrc { Ok(()) } + + fn unlock(&self) -> Result<(), gst::ErrorMessage> { + self.cancel_setup(); + + self.parent_unlock() + } } impl PushSrcImpl for SpotifyAudioSrc { @@ -265,6 +283,41 @@ impl PushSrcImpl for SpotifyAudioSrc { &self, _buffer: Option<&mut gst::BufferRef>, ) -> Result<CreateSuccess, gst::FlowError> { + let state_set = { + let state = self.state.lock().unwrap(); + state.is_some() + }; + + if !state_set { + let setup_thread = self.setup_thread.lock().unwrap(); + if setup_thread.is_none() { + // unlock() could potentially cancel the setup, and create() can be called after unlock() without going through start() again. + self.start_setup(setup_thread); + } + } + + { + // wait for the setup to be completed + let mut setup_thread = self.setup_thread.lock().unwrap(); + if let Some(setup) = setup_thread.take() { + let res = setup.thread_handle.join().unwrap(); + + match res { + Err(_aborted) => { + gst::debug!(CAT, imp: self, "setup has been cancelled"); + return Err(gst::FlowError::Flushing); + } + Ok(Err(err)) => { + let details = format!("{:?}", err); + gst::error!(CAT, imp: self, "failed to start: {}", details); + gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]); + return Err(gst::FlowError::Error); + } + Ok(Ok(_)) => {} + } + } + } + let state = self.state.lock().unwrap(); let state = state.as_ref().unwrap(); @@ -290,8 +343,97 @@ impl PushSrcImpl for SpotifyAudioSrc { } } +struct BufferSink { + sender: mpsc::SyncSender<Message>, +} + +impl Sink for BufferSink { + fn write(&mut self, packet: AudioPacket, _converter: &mut Converter) -> SinkResult<()> { + let oggdata = match packet { + AudioPacket::OggData(data) => data, + AudioPacket::Samples(_) => unimplemented!(), + }; + let buffer = gst::Buffer::from_slice(oggdata); + + // ignore if sending fails as that means the source element is being shutdown + let _ = self.sender.send(Message::Buffer(buffer)); + + Ok(()) + } +} + +impl URIHandlerImpl for SpotifyAudioSrc { + const URI_TYPE: gst::URIType = gst::URIType::Src; + + fn protocols() -> &'static [&'static str] { + &["spotify"] + } + + fn uri(&self) -> Option<String> { + let settings = self.settings.lock().unwrap(); + + if settings.track.is_empty() { + None + } else { + Some(settings.track.clone()) + } + } + + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { + gst::debug!(CAT, imp: self, "set URI: {}", uri); + + let url = url::Url::parse(uri) + .map_err(|e| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", e)))?; + + // allow to configure auth and cache settings from the URI + for (key, value) in url.query_pairs() { + match key.as_ref() { + "username" | "password" | "cache-credentials" | "cache-files" => { + self.obj().set_property(&key, value.as_ref()); + } + _ => { + gst::warning!(CAT, imp: self, "unsupported query: {}={}", key, value); + } + } + } + + self.obj() + .set_property("track", format!("{}:{}", url.scheme(), url.path())); + + Ok(()) + } +} + impl SpotifyAudioSrc { + fn start_setup(&self, mut setup_thread: MutexGuard<Option<SetupThread>>) { + let self_ = self.to_owned(); + + // run the runtime from another thread to prevent the "start a runtime from within a runtime" panic + // when the plugin is statically linked. + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let thread_handle = std::thread::spawn(move || { + RUNTIME.block_on(async move { + let future = Abortable::new(self_.setup(), abort_registration); + future.await + }) + }); + + setup_thread.replace(SetupThread { + thread_handle, + abort_handle, + }); + } + async fn setup(&self) -> anyhow::Result<()> { + { + let state = self.state.lock().unwrap(); + + if state.is_some() { + // already setup + return Ok(()); + } + } + let (credentials, cache, track) = { let settings = self.settings.lock().unwrap(); @@ -343,8 +485,6 @@ impl SpotifyAudioSrc { (credentials, cache, settings.track.clone()) }; - let state = self.state.clone(); - let (session, _credentials) = Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?; @@ -385,7 +525,8 @@ impl SpotifyAudioSrc { } }); - let mut state = state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + state.replace(State { player, receiver, @@ -394,65 +535,12 @@ impl SpotifyAudioSrc { Ok(()) } -} - -struct BufferSink { - sender: mpsc::SyncSender<Message>, -} -impl Sink for BufferSink { - fn write(&mut self, packet: AudioPacket, _converter: &mut Converter) -> SinkResult<()> { - let oggdata = match packet { - AudioPacket::OggData(data) => data, - AudioPacket::Samples(_) => unimplemented!(), - }; - let buffer = gst::Buffer::from_slice(oggdata); - - // ignore if sending fails as that means the source element is being shutdown - let _ = self.sender.send(Message::Buffer(buffer)); - - Ok(()) - } -} + fn cancel_setup(&self) { + let mut setup_thread = self.setup_thread.lock().unwrap(); -impl URIHandlerImpl for SpotifyAudioSrc { - const URI_TYPE: gst::URIType = gst::URIType::Src; - - fn protocols() -> &'static [&'static str] { - &["spotify"] - } - - fn uri(&self) -> Option<String> { - let settings = self.settings.lock().unwrap(); - - if settings.track.is_empty() { - None - } else { - Some(settings.track.clone()) - } - } - - fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { - gst::debug!(CAT, imp: self, "set URI: {}", uri); - - let url = url::Url::parse(uri) - .map_err(|e| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", e)))?; - - // allow to configure auth and cache settings from the URI - for (key, value) in url.query_pairs() { - match key.as_ref() { - "username" | "password" | "cache-credentials" | "cache-files" => { - self.obj().set_property(&key, value.as_ref()); - } - _ => { - gst::warning!(CAT, imp: self, "unsupported query: {}={}", key, value); - } - } + if let Some(setup) = setup_thread.take() { + setup.abort_handle.abort(); } - - self.obj() - .set_property("track", format!("{}:{}", url.scheme(), url.path())); - - Ok(()) } } |