Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2020-11-14 19:52:56 +0300
committerSebastian Dröge <sebastian@centricular.com>2020-11-15 19:25:42 +0300
commitf54f9f977ec2143028c6068ac43bff5da4bea2ee (patch)
tree86a035ad3a6a2457f5a1ee1c73288adf28136102 /audio/csound/src/filter/imp.rs
parentf43436056eeb612c573fc722e1f3e8c1d1f74666 (diff)
audio: Update for subclassing API changes
Diffstat (limited to 'audio/csound/src/filter/imp.rs')
-rw-r--r--audio/csound/src/filter/imp.rs675
1 files changed, 675 insertions, 0 deletions
diff --git a/audio/csound/src/filter/imp.rs b/audio/csound/src/filter/imp.rs
new file mode 100644
index 000000000..fa85d786d
--- /dev/null
+++ b/audio/csound/src/filter/imp.rs
@@ -0,0 +1,675 @@
+// Copyright (C) 2020 Natanael Mojica <neithanmo@gmail.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_object_subclass;
+use glib::subclass;
+use glib::subclass::prelude::*;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst::{
+ gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_loggable_error,
+ gst_warning,
+};
+use gst_base::subclass::base_transform::GenerateOutputSuccess;
+use gst_base::subclass::prelude::*;
+
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use std::sync::Mutex;
+use std::{f64, i32};
+
+use byte_slice_cast::*;
+
+use csound::{Csound, MessageType};
+
+use once_cell::sync::Lazy;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "csoundfilter",
+ gst::DebugColorFlags::empty(),
+ Some("Audio Filter based on Csound"),
+ )
+});
+
+const SCORE_OFFSET_DEFAULT: f64 = 0f64;
+const DEFAULT_LOOP: bool = false;
+
+#[derive(Debug, Clone)]
+struct Settings {
+ pub loop_: bool,
+ pub location: Option<String>,
+ pub csd_text: Option<String>,
+ pub offset: f64,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ loop_: DEFAULT_LOOP,
+ location: None,
+ csd_text: None,
+ offset: SCORE_OFFSET_DEFAULT,
+ }
+ }
+}
+
+struct State {
+ in_info: gst_audio::AudioInfo,
+ out_info: gst_audio::AudioInfo,
+ adapter: gst_base::UniqueAdapter,
+ ksmps: u32,
+}
+
+pub struct CsoundFilter {
+ settings: Mutex<Settings>,
+ state: Mutex<Option<State>>,
+ csound: Mutex<Csound>,
+ compiled: AtomicBool,
+}
+
+static PROPERTIES: [subclass::Property; 4] = [
+ subclass::Property("loop", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Loop",
+ "loop over the score (can be changed in PLAYING or PAUSED state)",
+ DEFAULT_LOOP,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("location", |name| {
+ glib::ParamSpec::string(
+ name,
+ "Location",
+ "Location of the csd file to be used by csound.
+ Use either location or CSD-text but not both at the same time, if so and error would be triggered",
+ None,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("csd-text", |name| {
+ glib::ParamSpec::string(
+ name,
+ "CSD-text",
+ "The content of a csd file passed as a String.
+ Use either location or csd-text but not both at the same time, if so and error would be triggered",
+ None,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("score_offset", |name| {
+ glib::ParamSpec::double(
+ name,
+ "Score Offset",
+ "Score offset in seconds to start the performance",
+ 0.0,
+ f64::MAX,
+ SCORE_OFFSET_DEFAULT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+];
+
+impl State {
+ // Considering an input of size: input_size and the user's ksmps,
+ // calculates the equivalent output_size
+ fn max_output_size(&self, input_size: usize) -> usize {
+ let in_samples = input_size / self.in_info.bpf() as usize;
+ let in_process_samples = in_samples - (in_samples % self.ksmps as usize);
+ in_process_samples * self.out_info.bpf() as usize
+ }
+
+ fn get_bytes_to_read(&mut self, output_size: usize) -> usize {
+ // The max amount of bytes at the input that We would need
+ // for filling an output buffer of size *output_size*
+ (output_size / self.out_info.bpf() as usize) * self.in_info.bpf() as usize
+ }
+
+ // returns the spin capacity in bytes
+ fn spin_capacity(&self) -> usize {
+ (self.ksmps * self.in_info.bpf()) as _
+ }
+
+ fn needs_more_data(&self) -> bool {
+ self.adapter.available() < self.spin_capacity()
+ }
+
+ fn samples_to_time(&self, samples: u64) -> gst::ClockTime {
+ gst::ClockTime(samples.mul_div_round(gst::SECOND_VAL, self.in_info.rate() as u64))
+ }
+
+ fn get_current_pts(&self) -> gst::ClockTime {
+ // get the last seen pts and the amount of bytes
+ // since then
+ let (prev_pts, distance) = self.adapter.prev_pts();
+
+ // Use the distance to get the amount of samples
+ // and with it calculate the time-offset which
+ // can be added to the prev_pts to get the
+ // pts at the beginning of the adapter.
+ let samples = distance / self.in_info.bpf() as u64;
+ prev_pts + self.samples_to_time(samples)
+ }
+
+ fn buffer_duration(&self, buffer_size: u64) -> gst::ClockTime {
+ let samples = buffer_size / self.out_info.bpf() as u64;
+ self.samples_to_time(samples)
+ }
+}
+
+impl CsoundFilter {
+ fn process(&self, csound: &mut Csound, idata: &[f64], odata: &mut [f64]) -> bool {
+ let spin = csound.get_spin().unwrap();
+ let spout = csound.get_spout().unwrap();
+
+ let in_chunks = idata.chunks_exact(spin.len());
+ let out_chuncks = odata.chunks_exact_mut(spout.len());
+ let mut end_score = false;
+ for (ichunk, ochunk) in in_chunks.zip(out_chuncks) {
+ spin.copy_from_slice(ichunk);
+ end_score = csound.perform_ksmps();
+ spout.copy_to_slice(ochunk);
+ }
+
+ end_score
+ }
+
+ fn compile_score(&self) -> std::result::Result<(), gst::ErrorMessage> {
+ let csound = self.csound.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+ if let Some(ref location) = settings.location {
+ csound
+ .compile_csd(location)
+ .map_err(|e| gst_error_msg!(gst::LibraryError::Failed, [e]))?;
+ } else if let Some(ref text) = settings.csd_text {
+ csound
+ .compile_csd_text(text)
+ .map_err(|e| gst_error_msg!(gst::LibraryError::Failed, [e]))?;
+ } else {
+ return Err(gst_error_msg!(
+ gst::LibraryError::Failed,
+ ["No Csound score specified to compile. Use either location or csd-text but not both"]
+ ));
+ }
+
+ self.compiled.store(true, Ordering::SeqCst);
+ Ok(())
+ }
+
+ fn message_callback(msg_type: MessageType, msg: &str) {
+ match msg_type {
+ MessageType::CSOUNDMSG_ERROR => gst_error!(CAT, "{}", msg),
+ MessageType::CSOUNDMSG_WARNING => gst_warning!(CAT, "{}", msg),
+ MessageType::CSOUNDMSG_ORCH => gst_info!(CAT, "{}", msg),
+ MessageType::CSOUNDMSG_REALTIME => gst_log!(CAT, "{}", msg),
+ MessageType::CSOUNDMSG_DEFAULT => gst_log!(CAT, "{}", msg),
+ MessageType::CSOUNDMSG_STDOUT => gst_log!(CAT, "{}", msg),
+ }
+ }
+
+ fn drain(&self, element: &super::CsoundFilter) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let csound = self.csound.lock().unwrap();
+ let mut state_lock = self.state.lock().unwrap();
+ let state = state_lock.as_mut().unwrap();
+
+ let avail = state.adapter.available();
+
+ // Complete processing blocks should have been processed in the transform call
+ assert!(avail < state.spin_capacity());
+
+ if avail == 0 {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ let mut spin = csound.get_spin().unwrap();
+ let spout = csound.get_spout().unwrap();
+
+ let out_bytes =
+ (avail / state.in_info.channels() as usize) * state.out_info.channels() as usize;
+
+ let mut buffer = gst::Buffer::with_size(out_bytes).map_err(|e| {
+ gst_error!(
+ CAT,
+ obj: element,
+ "Failed to allocate buffer at EOS {:?}",
+ e
+ );
+ gst::FlowError::Flushing
+ })?;
+
+ let buffer_mut = buffer.get_mut().ok_or(gst::FlowError::NotSupported)?;
+
+ let pts = state.get_current_pts();
+ let duration = state.buffer_duration(out_bytes as _);
+
+ buffer_mut.set_pts(pts);
+ buffer_mut.set_duration(duration);
+
+ let srcpad = element.get_static_pad("src").unwrap();
+
+ let adapter_map = state.adapter.map(avail).unwrap();
+ let data = adapter_map
+ .as_ref()
+ .as_slice_of::<f64>()
+ .map_err(|_| gst::FlowError::NotSupported)?;
+
+ let mut omap = buffer_mut
+ .map_writable()
+ .map_err(|_| gst::FlowError::NotSupported)?;
+ let odata = omap
+ .as_mut_slice_of::<f64>()
+ .map_err(|_| gst::FlowError::NotSupported)?;
+
+ spin.clear();
+ spin.copy_from_slice(data);
+ csound.perform_ksmps();
+ spout.copy_to_slice(odata);
+
+ drop(adapter_map);
+ drop(omap);
+
+ state.adapter.flush(avail);
+ // Drop the locks before pushing buffers into the srcpad
+ drop(state_lock);
+ drop(csound);
+
+ srcpad.push(buffer)
+ }
+
+ fn generate_output(
+ &self,
+ element: &super::CsoundFilter,
+ state: &mut State,
+ ) -> Result<GenerateOutputSuccess, gst::FlowError> {
+ let output_size = state.max_output_size(state.adapter.available());
+
+ let mut output = gst::Buffer::with_size(output_size).map_err(|_| gst::FlowError::Error)?;
+ let outbuf = output.get_mut().ok_or(gst::FlowError::Error)?;
+
+ let pts = state.get_current_pts();
+ let duration = state.buffer_duration(output_size as _);
+
+ outbuf.set_pts(pts);
+ outbuf.set_duration(duration);
+
+ gst_log!(
+ CAT,
+ obj: element,
+ "Generating output at: {} - duration: {}",
+ pts,
+ duration
+ );
+
+ // Get the required amount of bytes to be read from
+ // the adapter to fill an ouput buffer of size output_size
+ let bytes_to_read = state.get_bytes_to_read(output_size);
+
+ let indata = state
+ .adapter
+ .map(bytes_to_read)
+ .map_err(|_| gst::FlowError::Error)?;
+ let idata = indata
+ .as_ref()
+ .as_slice_of::<f64>()
+ .map_err(|_| gst::FlowError::Error)?;
+
+ let mut omap = outbuf.map_writable().map_err(|_| gst::FlowError::Error)?;
+ let odata = omap
+ .as_mut_slice_of::<f64>()
+ .map_err(|_| gst::FlowError::Error)?;
+
+ let mut csound = self.csound.lock().unwrap();
+ let end_score = self.process(&mut csound, idata, odata);
+
+ drop(indata);
+ drop(omap);
+ state.adapter.flush(bytes_to_read);
+
+ if end_score {
+ let settings = self.settings.lock().unwrap();
+ if settings.loop_ {
+ csound.set_score_offset_seconds(settings.offset);
+ csound.rewind_score();
+ } else {
+ // clear the adapter here because our eos event handler
+ // will try to flush it calling csound.perform()
+ // which does not make sense since
+ // the end of score has been reached.
+ state.adapter.clear();
+ return Err(gst::FlowError::Eos);
+ }
+ }
+
+ Ok(GenerateOutputSuccess::Buffer(output))
+ }
+}
+
+impl ObjectSubclass for CsoundFilter {
+ const NAME: &'static str = "CsoundFilter";
+ type Type = super::CsoundFilter;
+ type ParentType = gst_base::BaseTransform;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn new() -> Self {
+ let csound = Csound::new();
+ // create the csound instance and configure
+ csound.message_string_callback(Self::message_callback);
+ // Disable all default handling of sound I/O by csound internal library
+ // by giving to it a hardware buffer size of zero, and setting a state,
+ // higher than zero.
+ csound.set_host_implemented_audioIO(1, 0);
+ // We don't want csound to write samples to our HW
+ csound.set_option("--nosound").unwrap();
+ Self {
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(None),
+ csound: Mutex::new(csound),
+ compiled: AtomicBool::new(false),
+ }
+ }
+
+ fn class_init(klass: &mut Self::Class) {
+ klass.set_metadata(
+ "Audio filter",
+ "Filter/Effect/Audio",
+ "Implement an audio filter/effects using Csound",
+ "Natanael Mojica <neithanmo@gmail.com>",
+ );
+
+ let caps = gst::Caps::new_simple(
+ "audio/x-raw",
+ &[
+ ("format", &gst_audio::AUDIO_FORMAT_F64.to_str()),
+ ("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
+ ("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
+ ("layout", &"interleaved"),
+ ],
+ );
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+
+ klass.configure(
+ gst_base::subclass::BaseTransformMode::NeverInPlace,
+ false,
+ false,
+ );
+ }
+}
+
+impl ObjectImpl for CsoundFilter {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
+ match *prop {
+ subclass::Property("loop", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.loop_ = value.get_some().expect("type checked upstream");
+ }
+ subclass::Property("location", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ if self.state.lock().unwrap().is_none() {
+ settings.location = match value.get::<String>() {
+ Ok(location) => location,
+ _ => unreachable!("type checked upstream"),
+ };
+ }
+ }
+ subclass::Property("csd-text", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ if self.state.lock().unwrap().is_none() {
+ settings.csd_text = match value.get::<String>() {
+ Ok(text) => text,
+ _ => unreachable!("type checked upstream"),
+ };
+ }
+ }
+ subclass::Property("score_offset", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.offset = value.get_some().expect("type checked upstream");
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
+
+ match *prop {
+ subclass::Property("loop", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.loop_.to_value())
+ }
+ subclass::Property("location", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.location.to_value())
+ }
+ subclass::Property("csd-text", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.csd_text.to_value())
+ }
+ subclass::Property("score_offset", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.offset.to_value())
+ }
+ _ => unimplemented!(),
+ }
+ }
+}
+
+impl ElementImpl for CsoundFilter {}
+
+impl BaseTransformImpl for CsoundFilter {
+ fn start(&self, _element: &Self::Type) -> std::result::Result<(), gst::ErrorMessage> {
+ self.compile_score()?;
+
+ let csound = self.csound.lock().unwrap();
+ let settings = self.settings.lock().unwrap();
+ csound.set_score_offset_seconds(settings.offset);
+
+ if let Err(e) = csound.start() {
+ return Err(gst_error_msg!(gst::LibraryError::Failed, [e]));
+ }
+
+ Ok(())
+ }
+
+ fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ let csound = self.csound.lock().unwrap();
+ csound.stop();
+ csound.reset();
+ let _ = self.state.lock().unwrap().take();
+
+ gst_info!(CAT, obj: element, "Stopped");
+
+ Ok(())
+ }
+
+ fn sink_event(&self, element: &Self::Type, event: gst::Event) -> bool {
+ use gst::EventView;
+
+ if let EventView::Eos(_) = event.view() {
+ gst_log!(CAT, obj: element, "Handling Eos");
+ if self.drain(element).is_err() {
+ return false;
+ }
+ }
+ self.parent_sink_event(element, event)
+ }
+
+ fn transform_caps(
+ &self,
+ element: &Self::Type,
+ direction: gst::PadDirection,
+ caps: &gst::Caps,
+ filter: Option<&gst::Caps>,
+ ) -> Option<gst::Caps> {
+ let compiled = self.compiled.load(Ordering::SeqCst);
+
+ let mut other_caps = {
+ // Our caps proposal
+ let mut new_caps = caps.clone();
+ if compiled {
+ let csound = self.csound.lock().unwrap();
+ // Use the sample rate and channels configured in the csound score
+ let sr = csound.get_sample_rate() as i32;
+ let ichannels = csound.input_channels() as i32;
+ let ochannels = csound.output_channels() as i32;
+ for s in new_caps.make_mut().iter_mut() {
+ s.set("format", &gst_audio::AUDIO_FORMAT_F64.to_str());
+ s.set("rate", &sr);
+
+ // replace the channel property with our values,
+ // if they are not supported, the negotiation will fail.
+ if direction == gst::PadDirection::Src {
+ s.set("channels", &ichannels);
+ } else {
+ s.set("channels", &ochannels);
+ }
+ // Csound does not have a concept of channel-mask
+ s.remove_field("channel-mask");
+ }
+ }
+ new_caps
+ };
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Transformed caps from {} to {} in direction {:?}",
+ caps,
+ other_caps,
+ direction
+ );
+
+ if let Some(filter) = filter {
+ other_caps = filter.intersect_with_mode(&other_caps, gst::CapsIntersectMode::First);
+ }
+
+ Some(other_caps)
+ }
+
+ fn set_caps(
+ &self,
+ element: &Self::Type,
+ incaps: &gst::Caps,
+ outcaps: &gst::Caps,
+ ) -> Result<(), gst::LoggableError> {
+ // Flush previous state
+ if self.state.lock().unwrap().is_some() {
+ self.drain(element).map_err(|e| {
+ gst_loggable_error!(CAT, "Error flusing previous state data {:?}", e)
+ })?;
+ }
+
+ let in_info = gst_audio::AudioInfo::from_caps(incaps)
+ .map_err(|_| gst_loggable_error!(CAT, "Failed to parse input caps"))?;
+ let out_info = gst_audio::AudioInfo::from_caps(outcaps)
+ .map_err(|_| gst_loggable_error!(CAT, "Failed to parse output caps"))?;
+
+ let csound = self.csound.lock().unwrap();
+
+ let ichannels = in_info.channels();
+ let ochannels = out_info.channels();
+ let rate = in_info.rate();
+
+ // Check if the negotiated caps are the right ones
+ if rate != out_info.rate() || rate != csound.get_sample_rate() as _ {
+ return Err(gst_loggable_error!(
+ CAT,
+ "Failed to negotiate caps: invalid sample rate {}",
+ rate
+ ));
+ } else if ichannels != csound.input_channels() {
+ return Err(gst_loggable_error!(
+ CAT,
+ "Failed to negotiate caps: input channels {} not supported",
+ ichannels
+ ));
+ } else if ochannels != csound.output_channels() {
+ return Err(gst_loggable_error!(
+ CAT,
+ "Failed to negotiate caps: output channels {} not supported",
+ ochannels
+ ));
+ }
+
+ let ksmps = csound.get_ksmps();
+
+ let adapter = gst_base::UniqueAdapter::new();
+
+ let mut state_lock = self.state.lock().unwrap();
+ *state_lock = Some(State {
+ in_info,
+ out_info,
+ adapter,
+ ksmps,
+ });
+
+ Ok(())
+ }
+
+ fn generate_output(
+ &self,
+ element: &Self::Type,
+ ) -> Result<GenerateOutputSuccess, gst::FlowError> {
+ // Check if there are enough data in the queued buffer and adapter,
+ // if it is not the case, just notify the parent class to not generate
+ // an output
+ if let Some(buffer) = self.take_queued_buffer() {
+ if buffer.get_flags() == gst::BufferFlags::DISCONT {
+ self.drain(element)?;
+ }
+
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().ok_or_else(|| {
+ gst_element_error!(
+ element,
+ gst::CoreError::Negotiation,
+ ["Can not generate an output without State"]
+ );
+ gst::FlowError::NotNegotiated
+ })?;
+
+ state.adapter.push(buffer);
+ if !state.needs_more_data() {
+ return self.generate_output(element, state);
+ }
+ }
+ gst_log!(CAT, "No enough data to generate output");
+ Ok(GenerateOutputSuccess::NoOutput)
+ }
+}