diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-10-09 16:06:59 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-10-10 15:03:25 +0300 |
commit | 7ee4afacf413b2e3c386bb1070994ed4325994e6 (patch) | |
tree | eddcc0e047ab4704e5a459dd551a55196e8a1848 /generic | |
parent | 7818ac658b02417fda071ce025b6d6a7fdb54a76 (diff) |
Change *Impl trait methods to only take &self and not Self::Type in addition
Diffstat (limited to 'generic')
-rw-r--r-- | generic/file/src/filesink/imp.rs | 60 | ||||
-rw-r--r-- | generic/file/src/filesrc/imp.rs | 67 | ||||
-rw-r--r-- | generic/fmp4/src/fmp4mux/imp.rs | 203 | ||||
-rw-r--r-- | generic/sodium/src/decrypter/imp.rs | 86 | ||||
-rw-r--r-- | generic/sodium/src/encrypter/imp.rs | 56 | ||||
-rw-r--r-- | generic/threadshare/examples/standalone/sink/imp.rs | 75 | ||||
-rw-r--r-- | generic/threadshare/examples/standalone/src/imp.rs | 87 | ||||
-rw-r--r-- | generic/threadshare/src/appsrc/imp.rs | 87 | ||||
-rw-r--r-- | generic/threadshare/src/inputselector/imp.rs | 55 | ||||
-rw-r--r-- | generic/threadshare/src/jitterbuffer/imp.rs | 74 | ||||
-rw-r--r-- | generic/threadshare/src/lib.rs | 32 | ||||
-rw-r--r-- | generic/threadshare/src/proxy/imp.rs | 173 | ||||
-rw-r--r-- | generic/threadshare/src/queue/imp.rs | 96 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/pad.rs | 30 | ||||
-rw-r--r-- | generic/threadshare/src/tcpclientsrc/imp.rs | 67 | ||||
-rw-r--r-- | generic/threadshare/src/udpsink/imp.rs | 81 | ||||
-rw-r--r-- | generic/threadshare/src/udpsrc/imp.rs | 67 | ||||
-rw-r--r-- | generic/threadshare/tests/pad.rs | 145 |
18 files changed, 664 insertions, 877 deletions
diff --git a/generic/file/src/filesink/imp.rs b/generic/file/src/filesink/imp.rs index 334bc1394..437b1370b 100644 --- a/generic/file/src/filesink/imp.rs +++ b/generic/file/src/filesink/imp.rs @@ -65,11 +65,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl FileSink { - fn set_location( - &self, - element: &super::FileSink, - location: Option<FileLocation>, - ) -> Result<(), glib::Error> { + fn set_location(&self, location: Option<FileLocation>) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); if let State::Started { .. } = *state { return Err(glib::Error::new( @@ -85,20 +81,20 @@ impl FileSink { Some(ref location_cur) => { gst::info!( CAT, - obj: element, + imp: self, "Changing `location` from {:?} to {}", location_cur, location, ); } None => { - gst::info!(CAT, obj: element, "Setting `location` to {}", location,); + gst::info!(CAT, imp: self, "Setting `location` to {}", location,); } } Some(location) } None => { - gst::info!(CAT, obj: element, "Resetting `location` to None",); + gst::info!(CAT, imp: self, "Resetting `location` to None",); None } }; @@ -127,31 +123,25 @@ impl ObjectImpl for FileSink { PROPERTIES.as_ref() } - fn set_property( - &self, - obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "location" => { let res = match value.get::<Option<String>>() { Ok(Some(location)) => FileLocation::try_from_path_str(location) - .and_then(|file_location| self.set_location(obj, Some(file_location))), - Ok(None) => self.set_location(obj, None), + .and_then(|file_location| self.set_location(Some(file_location))), + Ok(None) => self.set_location(None), Err(_) => unreachable!("type checked upstream"), }; if let Err(err) = res { - gst::error!(CAT, obj: obj, "Failed to set property `location`: {}", err); + gst::error!(CAT, imp: self, "Failed to set property `location`: {}", err); } } _ => unimplemented!(), }; } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "location" => { let settings = self.settings.lock().unwrap(); @@ -202,7 +192,7 @@ impl ElementImpl for FileSink { } impl BaseSinkImpl for FileSink { - fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { + fn start(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); if let State::Started { .. } = *state { unreachable!("FileSink already started"); @@ -226,15 +216,15 @@ impl BaseSinkImpl for FileSink { ] ) })?; - gst::debug!(CAT, obj: element, "Opened file {:?}", file); + gst::debug!(CAT, imp: self, "Opened file {:?}", file); *state = State::Started { file, position: 0 }; - gst::info!(CAT, obj: element, "Started"); + gst::info!(CAT, imp: self, "Started"); Ok(()) } - fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { + fn stop(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); if let State::Stopped = *state { return Err(gst::error_msg!( @@ -244,18 +234,14 @@ impl BaseSinkImpl for FileSink { } *state = State::Stopped; - gst::info!(CAT, obj: element, "Stopped"); + gst::info!(CAT, imp: self, "Stopped"); Ok(()) } // TODO: implement seek in BYTES format - fn render( - &self, - element: &Self::Type, - buffer: &gst::Buffer, - ) -> Result<gst::FlowSuccess, gst::FlowError> { + fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> { let mut state = self.state.lock().unwrap(); let (file, position) = match *state { State::Started { @@ -263,20 +249,20 @@ impl BaseSinkImpl for FileSink { ref mut position, } => (file, position), State::Stopped => { - gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]); + gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); return Err(gst::FlowError::Error); } }; - gst::trace!(CAT, obj: element, "Rendering {:?}", buffer); + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); let map = buffer.map_readable().map_err(|_| { - gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); gst::FlowError::Error })?; file.write_all(map.as_ref()).map_err(|err| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::ResourceError::Write, ["Failed to write buffer: {}", err] ); @@ -296,7 +282,7 @@ impl URIHandlerImpl for FileSink { &["file"] } - fn uri(&self, _element: &Self::Type) -> Option<String> { + fn uri(&self) -> Option<String> { let settings = self.settings.lock().unwrap(); // Conversion to Url already checked while building the `FileLocation` @@ -307,13 +293,13 @@ impl URIHandlerImpl for FileSink { }) } - fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { // Special case for "file://" as this is used by some applications to test // with `gst_element_make_from_uri` if there's an element that supports the URI protocol if uri != "file://" { let file_location = FileLocation::try_from_uri_str(uri)?; - self.set_location(element, Some(file_location)) + self.set_location(Some(file_location)) } else { Ok(()) } diff --git a/generic/file/src/filesrc/imp.rs b/generic/file/src/filesrc/imp.rs index 886db2e33..be08200db 100644 --- a/generic/file/src/filesrc/imp.rs +++ b/generic/file/src/filesrc/imp.rs @@ -65,11 +65,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl FileSrc { - fn set_location( - &self, - element: &super::FileSrc, - location: Option<FileLocation>, - ) -> Result<(), glib::Error> { + fn set_location(&self, location: Option<FileLocation>) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); if let State::Started { .. } = *state { return Err(glib::Error::new( @@ -99,20 +95,20 @@ impl FileSrc { Some(ref location_cur) => { gst::info!( CAT, - obj: element, + imp: self, "Changing `location` from {:?} to {}", location_cur, location, ); } None => { - gst::info!(CAT, obj: element, "Setting `location to {}", location,); + gst::info!(CAT, imp: self, "Setting `location to {}", location,); } } Some(location) } None => { - gst::info!(CAT, obj: element, "Resetting `location` to None",); + gst::info!(CAT, imp: self, "Resetting `location` to None",); None } }; @@ -142,31 +138,25 @@ impl ObjectImpl for FileSrc { PROPERTIES.as_ref() } - fn set_property( - &self, - obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "location" => { let res = match value.get::<Option<String>>() { Ok(Some(location)) => FileLocation::try_from_path_str(location) - .and_then(|file_location| self.set_location(obj, Some(file_location))), - Ok(None) => self.set_location(obj, None), + .and_then(|file_location| self.set_location(Some(file_location))), + Ok(None) => self.set_location(None), Err(_) => unreachable!("type checked upstream"), }; if let Err(err) = res { - gst::error!(CAT, obj: obj, "Failed to set property `location`: {}", err); + gst::error!(CAT, imp: self, "Failed to set property `location`: {}", err); } } _ => unimplemented!(), }; } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "location" => { let settings = self.settings.lock().unwrap(); @@ -181,10 +171,10 @@ impl ObjectImpl for FileSrc { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); - obj.set_format(gst::Format::Bytes); + self.instance().set_format(gst::Format::Bytes); } } @@ -223,11 +213,11 @@ impl ElementImpl for FileSrc { } impl BaseSrcImpl for FileSrc { - fn is_seekable(&self, _src: &Self::Type) -> bool { + fn is_seekable(&self) -> bool { true } - fn size(&self, _src: &Self::Type) -> Option<u64> { + fn size(&self) -> Option<u64> { let state = self.state.lock().unwrap(); if let State::Started { ref file, .. } = *state { file.metadata().ok().map(|m| m.len()) @@ -236,7 +226,7 @@ impl BaseSrcImpl for FileSrc { } } - fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { + fn start(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); if let State::Started { .. } = *state { unreachable!("FileSrc already started"); @@ -261,16 +251,16 @@ impl BaseSrcImpl for FileSrc { ) })?; - gst::debug!(CAT, obj: element, "Opened file {:?}", file); + gst::debug!(CAT, imp: self, "Opened file {:?}", file); *state = State::Started { file, position: 0 }; - gst::info!(CAT, obj: element, "Started"); + gst::info!(CAT, imp: self, "Started"); Ok(()) } - fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { + fn stop(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); if let State::Stopped = *state { return Err(gst::error_msg!( @@ -281,14 +271,13 @@ impl BaseSrcImpl for FileSrc { *state = State::Stopped; - gst::info!(CAT, obj: element, "Stopped"); + gst::info!(CAT, imp: self, "Stopped"); Ok(()) } fn fill( &self, - element: &Self::Type, offset: u64, _length: u32, buffer: &mut gst::BufferRef, @@ -301,15 +290,15 @@ impl BaseSrcImpl for FileSrc { ref mut position, } => (file, position), State::Stopped => { - gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]); + gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); return Err(gst::FlowError::Error); } }; if *position != offset { file.seek(SeekFrom::Start(offset)).map_err(|err| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::LibraryError::Failed, ["Failed to seek to {}: {}", offset, err.to_string()] ); @@ -321,13 +310,13 @@ impl BaseSrcImpl for FileSrc { let size = { let mut map = buffer.map_writable().map_err(|_| { - gst::element_error!(element, gst::LibraryError::Failed, ["Failed to map buffer"]); + gst::element_imp_error!(self, gst::LibraryError::Failed, ["Failed to map buffer"]); gst::FlowError::Error })?; file.read(map.as_mut()).map_err(|err| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::LibraryError::Failed, ["Failed to read at {}: {}", offset, err.to_string()] ); @@ -350,7 +339,7 @@ impl URIHandlerImpl for FileSrc { &["file"] } - fn uri(&self, _element: &Self::Type) -> Option<String> { + fn uri(&self) -> Option<String> { let settings = self.settings.lock().unwrap(); // Conversion to Url already checked while building the `FileLocation` @@ -361,13 +350,13 @@ impl URIHandlerImpl for FileSrc { }) } - fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { // Special case for "file://" as this is used by some applications to test // with `gst_element_make_from_uri` if there's an element that supports the URI protocol if uri != "file://" { let file_location = FileLocation::try_from_uri_str(uri)?; - self.set_location(element, Some(file_location)) + self.set_location(Some(file_location)) } else { Ok(()) } diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index a2d232e19..7433a1dd0 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -183,7 +183,6 @@ pub(crate) struct FMP4Mux { impl FMP4Mux { fn find_earliest_stream<'a>( &self, - element: &super::FMP4Mux, state: &'a mut State, timeout: bool, ) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> { @@ -255,21 +254,21 @@ impl FMP4Mux { if !timeout && !all_have_data_or_eos { gst::trace!( CAT, - obj: element, + imp: self, "No timeout and not all streams have a buffer or are EOS" ); Ok(None) } else if let Some((idx, stream, earliest_running_time)) = earliest_stream { gst::trace!( CAT, - obj: element, + imp: self, "Stream {} is earliest stream with running time {}", stream.sinkpad.name(), earliest_running_time ); Ok(Some((idx, stream))) } else { - gst::trace!(CAT, obj: element, "No streams have data queued currently"); + gst::trace!(CAT, imp: self, "No streams have data queued currently"); Ok(None) } } @@ -277,7 +276,6 @@ impl FMP4Mux { // Queue incoming buffers as individual GOPs. fn queue_gops( &self, - element: &super::FMP4Mux, _idx: usize, stream: &mut Stream, segment: &gst::FormattedSegment<gst::ClockTime>, @@ -439,7 +437,8 @@ impl FMP4Mux { // If this is a multi-stream element then we need to update the PTS/DTS positions according // to the output segment, specifically to re-timestamp them with the running time and // adjust for the segment shift to compensate for negative DTS. - let class = element.class(); + let aggregator = self.instance(); + let class = aggregator.class(); let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() { (pts_position, dts_position) } else { @@ -609,7 +608,6 @@ impl FMP4Mux { #[allow(clippy::type_complexity)] fn drain_buffers( &self, - element: &super::FMP4Mux, state: &mut State, settings: &Settings, timeout: bool, @@ -652,7 +650,7 @@ impl FMP4Mux { let fragment_start_pts = state.fragment_start_pts.unwrap(); gst::info!( CAT, - obj: element, + imp: self, "Starting to drain at {}", fragment_start_pts ); @@ -898,7 +896,6 @@ impl FMP4Mux { fn preprocess_drained_streams_onvif( &self, - element: &super::FMP4Mux, state: &mut State, drained_streams: &mut [( gst::Caps, @@ -906,7 +903,8 @@ impl FMP4Mux { VecDeque<Buffer>, )], ) -> Result<Option<gst::ClockTime>, gst::FlowError> { - if element.class().as_ref().variant != super::Variant::ONVIF { + let aggregator = self.instance(); + if aggregator.class().as_ref().variant != super::Variant::ONVIF { return Ok(None); } @@ -1007,7 +1005,7 @@ impl FMP4Mux { gst::debug!( CAT, - obj: element, + imp: self, "Configuring start UTC time {}", start_utc_time.unwrap() ); @@ -1150,7 +1148,6 @@ impl FMP4Mux { #[allow(clippy::type_complexity)] fn interleave_buffers( &self, - _element: &super::FMP4Mux, settings: &Settings, mut drained_streams: Vec<( gst::Caps, @@ -1224,7 +1221,6 @@ impl FMP4Mux { fn drain( &self, - element: &super::FMP4Mux, state: &mut State, settings: &Settings, timeout: bool, @@ -1232,9 +1228,9 @@ impl FMP4Mux { upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>, ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> { if at_eos { - gst::info!(CAT, obj: element, "Draining at EOS"); + gst::info!(CAT, imp: self, "Draining at EOS"); } else if timeout { - gst::info!(CAT, obj: element, "Draining at timeout"); + gst::info!(CAT, imp: self, "Draining at timeout"); } else { for stream in &state.streams { if !stream.fragment_filled && !stream.sinkpad.is_eos() { @@ -1250,7 +1246,7 @@ impl FMP4Mux { min_earliest_pts, min_start_dts_position, fragment_end_pts, - ) = self.drain_buffers(element, state, settings, timeout, at_eos)?; + ) = self.drain_buffers(state, settings, timeout, at_eos)?; // Remove all GAP buffers before processing them further for (_, _, buffers) in &mut drained_streams { @@ -1263,20 +1259,18 @@ impl FMP4Mux { // For ONVIF, replace all timestamps with timestamps based on UTC times. let max_end_utc_time = - self.preprocess_drained_streams_onvif(element, state, &mut drained_streams)?; + self.preprocess_drained_streams_onvif(state, &mut drained_streams)?; // Create header now if it was not created before and return the caps let mut caps = None; if state.stream_header.is_none() { - let (_, new_caps) = self - .update_header(element, state, settings, false)? - .unwrap(); + let (_, new_caps) = self.update_header(state, settings, false)?.unwrap(); caps = Some(new_caps); } // Interleave buffers according to the settings into a single vec let (mut interleaved_buffers, streams) = - self.interleave_buffers(element, settings, drained_streams)?; + self.interleave_buffers(settings, drained_streams)?; let mut buffer_list = None; if interleaved_buffers.is_empty() { @@ -1316,7 +1310,7 @@ impl FMP4Mux { state.sequence_number += 1; let (mut fmp4_fragment_header, moof_offset) = boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { - variant: element.class().as_ref().variant, + variant: self.instance().class().as_ref().variant, sequence_number, streams: streams.as_slice(), buffers: interleaved_buffers.as_slice(), @@ -1324,7 +1318,7 @@ impl FMP4Mux { .map_err(|err| { gst::error!( CAT, - obj: element, + imp: self, "Failed to create FMP4 fragment header: {}", err ); @@ -1394,7 +1388,7 @@ impl FMP4Mux { // Update for the start PTS of the next fragment gst::info!( CAT, - obj: element, + imp: self, "Starting new fragment at {}", fragment_end_pts, ); @@ -1402,7 +1396,7 @@ impl FMP4Mux { gst::debug!( CAT, - obj: element, + imp: self, "Sending force-keyunit events for running time {}", fragment_end_pts + settings.fragment_duration, ); @@ -1435,7 +1429,7 @@ impl FMP4Mux { buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); } Err(err) => { - gst::error!(CAT, obj: element, "Failed to create mfra box: {}", err); + gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); } } } @@ -1446,12 +1440,9 @@ impl FMP4Mux { Ok((caps, buffer_list)) } - fn create_streams( - &self, - element: &super::FMP4Mux, - state: &mut State, - ) -> Result<(), gst::FlowError> { - for pad in element + fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> { + for pad in self + .instance() .sink_pads() .into_iter() .map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap()) @@ -1511,7 +1502,7 @@ impl FMP4Mux { } if state.streams.is_empty() { - gst::error!(CAT, obj: element, "No streams available"); + gst::error!(CAT, imp: self, "No streams available"); return Err(gst::FlowError::Error); } @@ -1546,12 +1537,12 @@ impl FMP4Mux { fn update_header( &self, - element: &super::FMP4Mux, state: &mut State, settings: &Settings, at_eos: bool, ) -> Result<Option<(gst::BufferList, gst::Caps)>, gst::FlowError> { - let class = element.class(); + let aggregator = self.instance(); + let class = aggregator.class(); let variant = class.as_ref().variant; if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos { @@ -1591,7 +1582,7 @@ impl FMP4Mux { .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000), }) .map_err(|err| { - gst::error!(CAT, obj: element, "Failed to create FMP4 header: {}", err); + gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err); gst::FlowError::Error })?; @@ -1680,13 +1671,7 @@ impl ObjectImpl for FMP4Mux { &*PROPERTIES } - fn set_property( - &self, - obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "fragment-duration" => { let mut settings = self.settings.lock().unwrap(); @@ -1694,7 +1679,7 @@ impl ObjectImpl for FMP4Mux { if settings.fragment_duration != fragment_duration { settings.fragment_duration = fragment_duration; drop(settings); - obj.set_latency(fragment_duration, None); + self.instance().set_latency(fragment_duration, None); } } @@ -1733,7 +1718,7 @@ impl ObjectImpl for FMP4Mux { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "fragment-duration" => { let settings = self.settings.lock().unwrap(); @@ -1769,9 +1754,10 @@ impl ObjectImpl for FMP4Mux { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); let class = obj.class(); for templ in class.pad_template_list().filter(|templ| { templ.presence() == gst::PadPresence::Always @@ -1794,7 +1780,6 @@ impl GstObjectImpl for FMP4Mux {} impl ElementImpl for FMP4Mux { fn request_new_pad( &self, - element: &Self::Type, templ: &gst::PadTemplate, name: Option<String>, caps: Option<&gst::Caps>, @@ -1803,25 +1788,24 @@ impl ElementImpl for FMP4Mux { if state.stream_header.is_some() { gst::error!( CAT, - obj: element, + imp: self, "Can't request new pads after header was generated" ); return None; } - self.parent_request_new_pad(element, templ, name, caps) + self.parent_request_new_pad(templ, name, caps) } } impl AggregatorImpl for FMP4Mux { - fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> { + fn next_time(&self) -> Option<gst::ClockTime> { let state = self.state.lock().unwrap(); state.fragment_start_pts.opt_add(state.timeout_delay) } fn sink_query( &self, - aggregator: &Self::Type, aggregator_pad: &gst_base::AggregatorPad, query: &mut gst::QueryRef, ) -> bool { @@ -1845,13 +1829,12 @@ impl AggregatorImpl for FMP4Mux { true } - _ => self.parent_sink_query(aggregator, aggregator_pad, query), + _ => self.parent_sink_query(aggregator_pad, query), } } fn sink_event_pre_queue( &self, - aggregator: &Self::Type, aggregator_pad: &gst_base::AggregatorPad, mut event: gst::Event, ) -> Result<gst::FlowSuccess, gst::FlowError> { @@ -1872,18 +1855,13 @@ impl AggregatorImpl for FMP4Mux { .seqnum(event.seqnum()) .build(); } - self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event) + self.parent_sink_event_pre_queue(aggregator_pad, event) } - _ => self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event), + _ => self.parent_sink_event_pre_queue(aggregator_pad, event), } } - fn sink_event( - &self, - aggregator: &Self::Type, - aggregator_pad: &gst_base::AggregatorPad, - event: gst::Event, - ) -> bool { + fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { use gst::EventView; gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); @@ -1901,26 +1879,27 @@ impl AggregatorImpl for FMP4Mux { // Only forward the segment event verbatim if this is a single stream variant. // Otherwise we have to produce a default segment and re-timestamp all buffers // with their running time. + let aggregator = self.instance(); let class = aggregator.class(); if class.as_ref().variant.is_single_stream() { aggregator.update_segment(&segment); } - self.parent_sink_event(aggregator, aggregator_pad, event) + self.parent_sink_event(aggregator_pad, event) } EventView::Tag(_ev) => { // TODO: Maybe store for putting into the headers of the next fragment? - self.parent_sink_event(aggregator, aggregator_pad, event) + self.parent_sink_event(aggregator_pad, event) } - _ => self.parent_sink_event(aggregator, aggregator_pad, event), + _ => self.parent_sink_event(aggregator_pad, event), } } - fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool { + fn src_query(&self, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; - gst::trace!(CAT, obj: aggregator, "Handling query {:?}", query); + gst::trace!(CAT, imp: self, "Handling query {:?}", query); match query.view_mut() { QueryViewMut::Seeking(q) => { @@ -1928,23 +1907,23 @@ impl AggregatorImpl for FMP4Mux { q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } - _ => self.parent_src_query(aggregator, query), + _ => self.parent_src_query(query), } } - fn src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool { + fn src_event(&self, event: gst::Event) -> bool { use gst::EventView; - gst::trace!(CAT, obj: aggregator, "Handling event {:?}", event); + gst::trace!(CAT, imp: self, "Handling event {:?}", event); match event.view() { EventView::Seek(_ev) => false, - _ => self.parent_src_event(aggregator, event), + _ => self.parent_src_event(event), } } - fn flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> { - self.parent_flush(aggregator)?; + fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> { + self.parent_flush()?; let mut state = self.state.lock().unwrap(); @@ -1962,23 +1941,24 @@ impl AggregatorImpl for FMP4Mux { Ok(gst::FlowSuccess::Ok) } - fn stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { - gst::trace!(CAT, obj: aggregator, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, imp: self, "Stopping"); - let _ = self.parent_stop(aggregator); + let _ = self.parent_stop(); *self.state.lock().unwrap() = State::default(); Ok(()) } - fn start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { - gst::trace!(CAT, obj: aggregator, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, imp: self, "Starting"); - self.parent_start(aggregator)?; + self.parent_start()?; // For non-single-stream variants configure a default segment that allows for negative // DTS so that we can correctly re-timestamp buffers with their running times. + let aggregator = self.instance(); let class = aggregator.class(); if !class.as_ref().variant.is_single_stream() { let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); @@ -1992,15 +1972,11 @@ impl AggregatorImpl for FMP4Mux { Ok(()) } - fn negotiate(&self, _aggregator: &Self::Type) -> bool { + fn negotiate(&self) -> bool { true } - fn aggregate( - &self, - aggregator: &Self::Type, - timeout: bool, - ) -> Result<gst::FlowSuccess, gst::FlowError> { + fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> { let settings = self.settings.lock().unwrap().clone(); let mut upstream_events = vec![]; @@ -2011,7 +1987,7 @@ impl AggregatorImpl for FMP4Mux { // Create streams if state.streams.is_empty() { - self.create_streams(aggregator, &mut state)?; + self.create_streams(&mut state)?; } // Queue buffers from all streams that are not filled for the current fragment yet @@ -2020,9 +1996,7 @@ impl AggregatorImpl for FMP4Mux { // fill-level at all sinkpads in sync. let fragment_start_pts = state.fragment_start_pts; - while let Some((idx, stream)) = - self.find_earliest_stream(aggregator, &mut state, timeout)? - { + while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? { // Can only happen if the stream was flushed in the meantime let buffer = match stream.sinkpad.pop_buffer() { Some(buffer) => buffer, @@ -2045,7 +2019,7 @@ impl AggregatorImpl for FMP4Mux { }; // Queue up the buffer and update GOP tracking state - self.queue_gops(aggregator, idx, stream, &segment, buffer)?; + self.queue_gops(idx, stream, &segment, buffer)?; // Check if this stream is filled enough now. if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( @@ -2091,13 +2065,13 @@ impl AggregatorImpl for FMP4Mux { } if let Some(earliest_pts) = earliest_pts { - gst::info!(CAT, obj: aggregator, "Got earliest PTS {}", earliest_pts); + gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts); state.earliest_pts = Some(earliest_pts); state.fragment_start_pts = Some(earliest_pts); gst::debug!( CAT, - obj: aggregator, + imp: self, "Sending first force-keyunit event for running time {}", earliest_pts + settings.fragment_duration, ); @@ -2130,12 +2104,11 @@ impl AggregatorImpl for FMP4Mux { all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); if all_eos { - gst::debug!(CAT, obj: aggregator, "All streams are EOS now"); + gst::debug!(CAT, imp: self, "All streams are EOS now"); } // If enough GOPs were queued, drain and create the output fragment match self.drain( - aggregator, &mut state, &settings, timeout, @@ -2144,8 +2117,8 @@ impl AggregatorImpl for FMP4Mux { ) { Ok(res) => res, Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { - gst::element_warning!( - aggregator, + gst::element_imp_warning!( + self, gst::StreamError::Format, ["Longer GOPs than fragment duration"] ); @@ -2166,38 +2139,30 @@ impl AggregatorImpl for FMP4Mux { } if let Some(caps) = caps { - gst::debug!( - CAT, - obj: aggregator, - "Setting caps on source pad: {:?}", - caps - ); - aggregator.set_src_caps(&caps); + gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); + self.instance().set_src_caps(&caps); } if let Some(buffers) = buffers { - gst::trace!(CAT, obj: aggregator, "Pushing buffer list {:?}", buffers); - aggregator.finish_buffer_list(buffers)?; + gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers); + self.instance().finish_buffer_list(buffers)?; } if all_eos { - gst::debug!(CAT, obj: aggregator, "Doing EOS handling"); + gst::debug!(CAT, imp: self, "Doing EOS handling"); if settings.header_update_mode != super::HeaderUpdateMode::None { - let updated_header = self.update_header( - aggregator, - &mut self.state.lock().unwrap(), - &settings, - true, - ); + let updated_header = + self.update_header(&mut self.state.lock().unwrap(), &settings, true); match updated_header { Ok(Some((buffer_list, caps))) => { match settings.header_update_mode { super::HeaderUpdateMode::None => unreachable!(), super::HeaderUpdateMode::Rewrite => { - let src_pad = aggregator.src_pad(); let mut q = gst::query::Seeking::new(gst::Format::Bytes); - if src_pad.peer_query(&mut q) && q.result().0 { + if self.instance().src_pad().peer_query(&mut q) && q.result().0 { + let aggregator = self.instance(); + aggregator.set_src_caps(&caps); // Seek to the beginning with a default bytes segment @@ -2209,7 +2174,7 @@ impl AggregatorImpl for FMP4Mux { if let Err(err) = aggregator.finish_buffer_list(buffer_list) { gst::error!( CAT, - obj: aggregator, + imp: self, "Failed pushing updated header buffer downstream: {:?}", err, ); @@ -2217,17 +2182,19 @@ impl AggregatorImpl for FMP4Mux { } else { gst::error!( CAT, - obj: aggregator, + imp: self, "Can't rewrite header because downstream is not seekable" ); } } super::HeaderUpdateMode::Update => { + let aggregator = self.instance(); + aggregator.set_src_caps(&caps); if let Err(err) = aggregator.finish_buffer_list(buffer_list) { gst::error!( CAT, - obj: aggregator, + imp: self, "Failed pushing updated header buffer downstream: {:?}", err, ); @@ -2239,7 +2206,7 @@ impl AggregatorImpl for FMP4Mux { Err(err) => { gst::error!( CAT, - obj: aggregator, + imp: self, "Failed to generate updated header: {:?}", err ); diff --git a/generic/sodium/src/decrypter/imp.rs b/generic/sodium/src/decrypter/imp.rs index 7f35582cc..546b25033 100644 --- a/generic/sodium/src/decrypter/imp.rs +++ b/generic/sodium/src/decrypter/imp.rs @@ -99,14 +99,14 @@ impl State { // retrieval fn decrypt_into_adapter( &mut self, - element: &super::Decrypter, + imp: &Decrypter, pad: &gst::Pad, buffer: &gst::Buffer, chunk_index: u64, ) -> Result<gst::FlowSuccess, gst::FlowError> { let map = buffer.map_readable().map_err(|_| { - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Format, ["Failed to map buffer readable"] ); @@ -122,8 +122,8 @@ impl State { for subbuffer in map.chunks(block_size) { let plain = box_::open_precomputed(subbuffer, &nonce, &self.precomputed_key).map_err(|_| { - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Format, ["Failed to decrypt buffer"] ); @@ -255,7 +255,6 @@ impl Decrypter { fn src_activatemode_function( &self, _pad: &gst::Pad, - element: &super::Decrypter, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { @@ -267,7 +266,7 @@ impl Decrypter { // Set the nonce and block size from the headers // right after we activate the pad - self.check_headers(element) + self.check_headers() } gst::PadMode::Push => Err(gst::loggable_error!(CAT, "Push mode not supported")), _ => Err(gst::loggable_error!( @@ -278,12 +277,7 @@ impl Decrypter { } } - fn src_query( - &self, - pad: &gst::Pad, - element: &super::Decrypter, - query: &mut gst::QueryRef, - ) -> bool { + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; gst::log!(CAT, obj: pad, "Handling query {:?}", query); @@ -306,7 +300,7 @@ impl Decrypter { } QueryViewMut::Duration(q) => { if q.format() != gst::Format::Bytes { - return pad.query_default(Some(element), query); + return pad.query_default(Some(&*self.instance()), query); } /* First let's query the bytes duration upstream */ @@ -348,11 +342,11 @@ impl Decrypter { true } - _ => pad.query_default(Some(element), query), + _ => pad.query_default(Some(&*self.instance()), query), } } - fn check_headers(&self, element: &super::Decrypter) -> Result<(), gst::LoggableError> { + fn check_headers(&self) -> Result<(), gst::LoggableError> { let is_none = { let mutex_state = self.state.lock().unwrap(); let state = mutex_state.as_ref().unwrap(); @@ -372,26 +366,22 @@ impl Decrypter { "Failed to pull nonce from the stream, reason: {:?}", err ); - err.log_with_object(element); err })?; if buffer.size() != crate::HEADERS_SIZE { let err = gst::loggable_error!(CAT, "Headers buffer has wrong size"); - err.log_with_object(element); return Err(err); } let map = buffer.map_readable().map_err(|_| { let err = gst::loggable_error!(CAT, "Failed to map buffer readable"); - err.log_with_object(element); err })?; let sodium_header_slice = &map[..crate::TYPEFIND_HEADER_SIZE]; if sodium_header_slice != crate::TYPEFIND_HEADER { let err = gst::loggable_error!(CAT, "Buffer has wrong typefind header"); - err.log_with_object(element); return Err(err); } @@ -400,7 +390,6 @@ impl Decrypter { assert_eq!(nonce_slice.len(), box_::NONCEBYTES); let nonce = box_::Nonce::from_slice(nonce_slice).ok_or_else(|| { let err = gst::loggable_error!(CAT, "Failed to create nonce from buffer"); - err.log_with_object(&self.srcpad); err })?; @@ -416,9 +405,9 @@ impl Decrypter { let state = state.as_mut().unwrap(); state.initial_nonce = Some(nonce); - gst::debug!(CAT, obj: element, "Setting nonce to: {:?}", nonce.0); + gst::debug!(CAT, imp: self, "Setting nonce to: {:?}", nonce.0); state.block_size = Some(block_size); - gst::debug!(CAT, obj: element, "Setting block size to: {}", block_size); + gst::debug!(CAT, imp: self, "Setting block size to: {}", block_size); Ok(()) } @@ -426,7 +415,6 @@ impl Decrypter { fn pull_requested_buffer( &self, pad: &gst::Pad, - element: &super::Decrypter, requested_size: u32, block_size: u32, chunk_index: u64, @@ -441,8 +429,8 @@ impl Decrypter { // calculate how many chunks are needed, if we need something like 3.2 // round the number to 4 and cut the buffer afterwards. let checked = requested_size.checked_add(block_size).ok_or_else(|| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::LibraryError::Failed, [ "Addition overflow when adding requested pull size and block size: {} + {}", @@ -459,8 +447,8 @@ impl Decrypter { // Pull a buffer of all the chunks we will need let checked_size = total_chunks.checked_mul(block_size).ok_or_else(|| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::LibraryError::Failed, [ "Overflowed trying to calculate the buffer size to pull: {} * {}", @@ -494,7 +482,6 @@ impl Decrypter { fn range( &self, pad: &gst::Pad, - element: &super::Decrypter, offset: u64, buffer: Option<&mut gst::BufferRef>, requested_size: u32, @@ -519,20 +506,15 @@ impl Decrypter { assert!(pull_offset <= std::u32::MAX as u64); let pull_offset = pull_offset as u32; - let pulled_buffer = self.pull_requested_buffer( - pad, - element, - requested_size + pull_offset, - block_size, - chunk_index, - )?; + let pulled_buffer = + self.pull_requested_buffer(pad, requested_size + pull_offset, block_size, chunk_index)?; let mut state = self.state.lock().unwrap(); // This will only be run after READY state, // and will be guaranted to be initialized let state = state.as_mut().unwrap(); - state.decrypt_into_adapter(element, &self.srcpad, &pulled_buffer, chunk_index)?; + state.decrypt_into_adapter(self, &self.srcpad, &pulled_buffer, chunk_index)?; let adapter_offset = pull_offset as usize; state.requested_buffer(&self.srcpad, buffer, requested_size, adapter_offset) @@ -555,7 +537,7 @@ impl ObjectSubclass for Decrypter { Decrypter::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |decrypter, element| decrypter.range(pad, element, offset, buffer, size), + |decrypter| decrypter.range(pad, offset, buffer, size), ) }) .activatemode_function(|pad, parent, mode, active| { @@ -567,16 +549,14 @@ impl ObjectSubclass for Decrypter { "Panic activating srcpad with mode" )) }, - |decrypter, element| { - decrypter.src_activatemode_function(pad, element, mode, active) - }, + |decrypter| decrypter.src_activatemode_function(pad, mode, active), ) }) .query_function(|pad, parent, query| { Decrypter::catch_panic_pad_function( parent, || false, - |decrypter, element| decrypter.src_query(pad, element, query), + |decrypter| decrypter.src_query(pad, query), ) }) .build(); @@ -612,20 +592,15 @@ impl ObjectImpl for Decrypter { PROPERTIES.as_ref() } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(&self.sinkpad).unwrap(); obj.add_pad(&self.srcpad).unwrap(); } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "sender-key" => { let mut props = self.props.lock().unwrap(); @@ -641,7 +616,7 @@ impl ObjectImpl for Decrypter { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "receiver-key" => { let props = self.props.lock().unwrap(); @@ -696,10 +671,9 @@ impl ElementImpl for Decrypter { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::debug!(CAT, obj: element, "Changing state {:?}", transition); + gst::debug!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { @@ -708,7 +682,7 @@ impl ElementImpl for Decrypter { // Create an internal state struct from the provided properties or // refuse to change state let state_ = State::from_props(&props).map_err(|err| { - element.post_error_message(err); + self.post_error_message(err); gst::StateChangeError })?; @@ -721,7 +695,7 @@ impl ElementImpl for Decrypter { _ => (), } - let success = self.parent_change_state(element, transition)?; + let success = self.parent_change_state(transition)?; if transition == gst::StateChange::ReadyToNull { let _ = self.state.lock().unwrap().take(); diff --git a/generic/sodium/src/encrypter/imp.rs b/generic/sodium/src/encrypter/imp.rs index b56080c23..b0cc8a4c2 100644 --- a/generic/sodium/src/encrypter/imp.rs +++ b/generic/sodium/src/encrypter/imp.rs @@ -166,7 +166,6 @@ impl Encrypter { fn sink_chain( &self, pad: &gst::Pad, - element: &super::Encrypter, buffer: gst::Buffer, ) -> Result<gst::FlowSuccess, gst::FlowError> { gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer); @@ -196,7 +195,7 @@ impl Encrypter { for buffer in buffers { self.srcpad.push(buffer).map_err(|err| { - gst::error!(CAT, obj: element, "Failed to push buffer {:?}", err); + gst::error!(CAT, imp: self, "Failed to push buffer {:?}", err); err })?; } @@ -204,7 +203,7 @@ impl Encrypter { Ok(gst::FlowSuccess::Ok) } - fn sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool { + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { use gst::EventView; gst::log!(CAT, obj: pad, "Handling event {:?}", event); @@ -239,34 +238,29 @@ impl Encrypter { for buffer in buffers { if let Err(err) = self.srcpad.push(buffer) { - gst::error!(CAT, obj: element, "Failed to push buffer at EOS {:?}", err); + gst::error!(CAT, imp: self, "Failed to push buffer at EOS {:?}", err); return false; } } - pad.event_default(Some(element), event) + pad.event_default(Some(&*self.instance()), event) } - _ => pad.event_default(Some(element), event), + _ => pad.event_default(Some(&*self.instance()), event), } } - fn src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool { + fn src_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { use gst::EventView; gst::log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { EventView::Seek(_) => false, - _ => pad.event_default(Some(element), event), + _ => pad.event_default(Some(&*self.instance()), event), } } - fn src_query( - &self, - pad: &gst::Pad, - element: &super::Encrypter, - query: &mut gst::QueryRef, - ) -> bool { + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; gst::log!(CAT, obj: pad, "Handling query {:?}", query); @@ -284,7 +278,7 @@ impl Encrypter { } QueryViewMut::Duration(q) => { if q.format() != gst::Format::Bytes { - return pad.query_default(Some(element), query); + return pad.query_default(Some(&*self.instance()), query); } /* First let's query the bytes duration upstream */ @@ -325,7 +319,7 @@ impl Encrypter { true } - _ => pad.query_default(Some(element), query), + _ => pad.query_default(Some(&*self.instance()), query), } } } @@ -343,14 +337,14 @@ impl ObjectSubclass for Encrypter { Encrypter::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |encrypter, element| encrypter.sink_chain(pad, element, buffer), + |encrypter| encrypter.sink_chain(pad, buffer), ) }) .event_function(|pad, parent, event| { Encrypter::catch_panic_pad_function( parent, || false, - |encrypter, element| encrypter.sink_event(pad, element, event), + |encrypter| encrypter.sink_event(pad, event), ) }) .build(); @@ -361,14 +355,14 @@ impl ObjectSubclass for Encrypter { Encrypter::catch_panic_pad_function( parent, || false, - |encrypter, element| encrypter.src_query(pad, element, query), + |encrypter| encrypter.src_query(pad, query), ) }) .event_function(|pad, parent, event| { Encrypter::catch_panic_pad_function( parent, || false, - |encrypter, element| encrypter.src_event(pad, element, event), + |encrypter| encrypter.src_event(pad, event), ) }) .build(); @@ -410,20 +404,15 @@ impl ObjectImpl for Encrypter { PROPERTIES.as_ref() } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(&self.sinkpad).unwrap(); obj.add_pad(&self.srcpad).unwrap(); } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "sender-key" => { let mut props = self.props.lock().unwrap(); @@ -444,7 +433,7 @@ impl ObjectImpl for Encrypter { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "receiver-key" => { let props = self.props.lock().unwrap(); @@ -504,10 +493,9 @@ impl ElementImpl for Encrypter { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::debug!(CAT, obj: element, "Changing state {:?}", transition); + gst::debug!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { @@ -516,7 +504,7 @@ impl ElementImpl for Encrypter { // Create an internal state struct from the provided properties or // refuse to change state let state_ = State::from_props(&props).map_err(|err| { - element.post_error_message(err); + self.post_error_message(err); gst::StateChangeError })?; @@ -529,7 +517,7 @@ impl ElementImpl for Encrypter { _ => (), } - let success = self.parent_change_state(element, transition)?; + let success = self.parent_change_state(transition)?; if transition == gst::StateChange::ReadyToNull { let _ = self.state.lock().unwrap().take(); diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs index de0406f58..62ea73b60 100644 --- a/generic/threadshare/examples/standalone/sink/imp.rs +++ b/generic/threadshare/examples/standalone/sink/imp.rs @@ -608,12 +608,12 @@ impl TestSink { self.item_sender.lock().unwrap().as_ref().unwrap().clone() } - fn prepare(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> { + fn prepare(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Preparing"); + gst::debug!(CAT, imp: self, "Preparing"); } else { - gst::trace!(CAT, obj: element, "Preparing"); + gst::trace!(CAT, imp: self, "Preparing"); } let context = { @@ -629,70 +629,70 @@ impl TestSink { // Enable backpressure for items let (item_sender, item_receiver) = flume::bounded(0); - let task_impl = TestSinkTask::new(element, item_receiver); + let task_impl = TestSinkTask::new(&*self.instance(), item_receiver); self.task.prepare(task_impl, context).block_on()?; *self.item_sender.lock().unwrap() = Some(item_sender); if raise_log_level { - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, imp: self, "Prepared"); } else { - gst::trace!(CAT, obj: element, "Prepared"); + gst::trace!(CAT, imp: self, "Prepared"); } Ok(()) } - fn unprepare(&self, element: &super::TestSink) { + fn unprepare(&self) { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Unpreparing"); + gst::debug!(CAT, imp: self, "Unpreparing"); } else { - gst::trace!(CAT, obj: element, "Unpreparing"); + gst::trace!(CAT, imp: self, "Unpreparing"); } self.task.unprepare().block_on().unwrap(); if raise_log_level { - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } else { - gst::trace!(CAT, obj: element, "Unprepared"); + gst::trace!(CAT, imp: self, "Unprepared"); } } - fn stop(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> { + fn stop(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Stopping"); + gst::debug!(CAT, imp: self, "Stopping"); } else { - gst::trace!(CAT, obj: element, "Stopping"); + gst::trace!(CAT, imp: self, "Stopping"); } self.task.stop().block_on()?; if raise_log_level { - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); } else { - gst::trace!(CAT, obj: element, "Stopped"); + gst::trace!(CAT, imp: self, "Stopped"); } Ok(()) } - fn start(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> { + fn start(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Starting"); + gst::debug!(CAT, imp: self, "Starting"); } else { - gst::trace!(CAT, obj: element, "Starting"); + gst::trace!(CAT, imp: self, "Starting"); } self.task.start().block_on()?; if raise_log_level { - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); } else { - gst::trace!(CAT, obj: element, "Started"); + gst::trace!(CAT, imp: self, "Started"); } Ok(()) @@ -760,13 +760,7 @@ impl ObjectImpl for TestSink { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "context" => { @@ -800,7 +794,7 @@ impl ObjectImpl for TestSink { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "context" => settings.context.to_value(), @@ -816,12 +810,12 @@ impl ObjectImpl for TestSink { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); - - gstthreadshare::set_element_flags(obj, gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SINK); } } @@ -861,30 +855,29 @@ impl ElementImpl for TestSink { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::ReadyToPaused => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - self.parent_change_state(element, transition) + self.parent_change_state(transition) } } diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs index 1680f75a7..1efa12751 100644 --- a/generic/threadshare/examples/standalone/src/imp.rs +++ b/generic/threadshare/examples/standalone/src/imp.rs @@ -318,12 +318,12 @@ pub struct TestSrc { } impl TestSrc { - fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { + fn prepare(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Preparing"); + gst::debug!(CAT, imp: self, "Preparing"); } else { - gst::trace!(CAT, obj: element, "Preparing"); + gst::trace!(CAT, imp: self, "Preparing"); } let settings = self.settings.lock().unwrap(); @@ -337,87 +337,87 @@ impl TestSrc { drop(settings); self.task - .prepare(SrcTask::new(element.clone()), context) + .prepare(SrcTask::new(self.instance().clone()), context) .block_on()?; if raise_log_level { - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, imp: self, "Prepared"); } else { - gst::trace!(CAT, obj: element, "Prepared"); + gst::trace!(CAT, imp: self, "Prepared"); } Ok(()) } - fn unprepare(&self, element: &super::TestSrc) { + fn unprepare(&self) { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Unpreparing"); + gst::debug!(CAT, imp: self, "Unpreparing"); } else { - gst::trace!(CAT, obj: element, "Unpreparing"); + gst::trace!(CAT, imp: self, "Unpreparing"); } self.task.unprepare().block_on().unwrap(); if raise_log_level { - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } else { - gst::trace!(CAT, obj: element, "Unprepared"); + gst::trace!(CAT, imp: self, "Unprepared"); } } - fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { + fn stop(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Stopping"); + gst::debug!(CAT, imp: self, "Stopping"); } else { - gst::trace!(CAT, obj: element, "Stopping"); + gst::trace!(CAT, imp: self, "Stopping"); } self.task.stop().block_on()?; if raise_log_level { - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); } else { - gst::trace!(CAT, obj: element, "Stopped"); + gst::trace!(CAT, imp: self, "Stopped"); } Ok(()) } - fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { + fn start(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Starting"); + gst::debug!(CAT, imp: self, "Starting"); } else { - gst::trace!(CAT, obj: element, "Starting"); + gst::trace!(CAT, imp: self, "Starting"); } self.task.start().block_on()?; if raise_log_level { - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); } else { - gst::trace!(CAT, obj: element, "Started"); + gst::trace!(CAT, imp: self, "Started"); } Ok(()) } - fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { + fn pause(&self) -> Result<(), gst::ErrorMessage> { let raise_log_level = self.settings.lock().unwrap().raise_log_level; if raise_log_level { - gst::debug!(CAT, obj: element, "Pausing"); + gst::debug!(CAT, imp: self, "Pausing"); } else { - gst::trace!(CAT, obj: element, "Pausing"); + gst::trace!(CAT, imp: self, "Pausing"); } self.task.pause().block_on()?; if raise_log_level { - gst::debug!(CAT, obj: element, "Paused"); + gst::debug!(CAT, imp: self, "Paused"); } else { - gst::trace!(CAT, obj: element, "Paused"); + gst::trace!(CAT, imp: self, "Paused"); } Ok(()) @@ -479,13 +479,7 @@ impl ObjectImpl for TestSrc { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "context" => { @@ -515,7 +509,7 @@ impl ObjectImpl for TestSrc { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "context" => settings.context.to_value(), @@ -531,12 +525,12 @@ impl ObjectImpl for TestSrc { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); - - gstthreadshare::set_element_flags(obj, gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); } } @@ -575,41 +569,40 @@ impl ElementImpl for TestSrc { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - self.pause(element).map_err(|_| gst::StateChangeError)?; + self.pause().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index a88e83523..8eee681ae 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -319,24 +319,24 @@ pub struct AppSrc { } impl AppSrc { - fn push_buffer(&self, element: &super::AppSrc, mut buffer: gst::Buffer) -> bool { + fn push_buffer(&self, mut buffer: gst::Buffer) -> bool { let state = self.task.lock_state(); if *state != TaskState::Started && *state != TaskState::Paused { - gst::debug!(CAT, obj: element, "Rejecting buffer due to element state"); + gst::debug!(CAT, imp: self, "Rejecting buffer due to element state"); return false; } let do_timestamp = self.settings.lock().unwrap().do_timestamp; if do_timestamp { - if let Some(clock) = element.clock() { - let base_time = element.base_time(); + if let Some(clock) = self.instance().clock() { + let base_time = self.instance().base_time(); let now = clock.time(); let buffer = buffer.make_mut(); buffer.set_dts(now.opt_checked_sub(base_time).ok().flatten()); buffer.set_pts(None); } else { - gst::error!(CAT, obj: element, "Don't have a clock yet"); + gst::error!(CAT, imp: self, "Don't have a clock yet"); return false; } } @@ -349,13 +349,13 @@ impl AppSrc { { Ok(_) => true, Err(err) => { - gst::error!(CAT, obj: element, "Failed to queue buffer: {}", err); + gst::error!(CAT, imp: self, "Failed to queue buffer: {}", err); false } } } - fn end_of_stream(&self, element: &super::AppSrc) -> bool { + fn end_of_stream(&self) -> bool { let mut sender = self.sender.lock().unwrap(); let sender = match sender.as_mut() { Some(sender) => sender, @@ -365,14 +365,14 @@ impl AppSrc { match sender.try_send(StreamItem::Event(gst::event::Eos::new())) { Ok(_) => true, Err(err) => { - gst::error!(CAT, obj: element, "Failed to queue EOS: {}", err); + gst::error!(CAT, imp: self, "Failed to queue EOS: {}", err); false } } } - fn prepare(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); let settings = self.settings.lock().unwrap(); let context = @@ -396,41 +396,41 @@ impl AppSrc { *self.sender.lock().unwrap() = Some(sender); self.task - .prepare(AppSrcTask::new(element.clone(), receiver), context) + .prepare(AppSrcTask::new(self.instance().clone(), receiver), context) .block_on()?; - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::AppSrc) { - gst::debug!(CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); *self.sender.lock().unwrap() = None; self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } - fn start(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } - fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Pausing"); + fn pause(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Pausing"); self.task.pause().block_on()?; - gst::debug!(CAT, obj: element, "Paused"); + gst::debug!(CAT, imp: self, "Paused"); Ok(()) } } @@ -503,7 +503,7 @@ impl ObjectImpl for AppSrc { let buffer = args[1].get::<gst::Buffer>().expect("signal arg"); let appsrc = element.imp(); - Some(appsrc.push_buffer(&element, buffer).to_value()) + Some(appsrc.push_buffer(buffer).to_value()) }) .build(), /** @@ -519,7 +519,7 @@ impl ObjectImpl for AppSrc { let element = args[0].get::<super::AppSrc>().expect("signal arg"); let appsrc = element.imp(); - Some(appsrc.end_of_stream(&element).to_value()) + Some(appsrc.end_of_stream().to_value()) }) .build(), ] @@ -528,13 +528,7 @@ impl ObjectImpl for AppSrc { SIGNALS.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "context" => { @@ -561,7 +555,7 @@ impl ObjectImpl for AppSrc { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "context" => settings.context.to_value(), @@ -573,12 +567,12 @@ impl ObjectImpl for AppSrc { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); - - crate::set_element_flags(obj, gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); } } @@ -617,41 +611,40 @@ impl ElementImpl for AppSrc { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - self.pause(element).map_err(|_| gst::StateChangeError)?; + self.pause().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs index 41d962de4..4175f4cff 100644 --- a/generic/threadshare/src/inputselector/imp.rs +++ b/generic/threadshare/src/inputselector/imp.rs @@ -92,8 +92,8 @@ impl InputSelectorPadSinkHandler { async fn handle_item( &self, - pad: &PadSinkRef<'_>, element: &super::InputSelector, + pad: &PadSinkRef<'_>, mut buffer: gst::Buffer, ) -> Result<gst::FlowSuccess, gst::FlowError> { let inputselector = element.imp(); @@ -173,7 +173,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { let pad_weak = pad.downgrade(); async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - this.handle_item(&pad, &element, buffer).await + this.handle_item(&element, &pad, buffer).await } .boxed() } @@ -193,7 +193,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); // TODO: Ideally we would keep the list intact and forward it in one go for buffer in list.iter_owned() { - this.handle_item(&pad, &element, buffer).await?; + this.handle_item(&element, &pad, buffer).await?; } Ok(gst::FlowSuccess::Ok) @@ -377,11 +377,11 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl InputSelector { - fn unprepare(&self, element: &super::InputSelector) { + fn unprepare(&self) { let mut state = self.state.lock().unwrap(); - gst::debug!(CAT, obj: element, "Unpreparing"); + gst::debug!(CAT, imp: self, "Unpreparing"); *state = State::default(); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } } @@ -433,13 +433,7 @@ impl ObjectImpl for InputSelector { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "context" => { let mut settings = self.settings.lock().unwrap(); @@ -488,7 +482,7 @@ impl ObjectImpl for InputSelector { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "context" => { let settings = self.settings.lock().unwrap(); @@ -507,9 +501,10 @@ impl ObjectImpl for InputSelector { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); } @@ -558,16 +553,15 @@ impl ElementImpl for InputSelector { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); if let gst::StateChange::ReadyToNull = transition { - self.unprepare(element); + self.unprepare(); } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { @@ -584,7 +578,6 @@ impl ElementImpl for InputSelector { fn request_new_pad( &self, - element: &Self::Type, templ: &gst::PadTemplate, _name: Option<String>, _caps: Option<&gst::Caps>, @@ -595,7 +588,7 @@ impl ElementImpl for InputSelector { gst::Pad::from_template(templ, Some(format!("sink_{}", pads.pad_serial).as_str())); pads.pad_serial += 1; sink_pad.set_active(true).unwrap(); - element.add_pad(&sink_pad).unwrap(); + self.instance().add_pad(&sink_pad).unwrap(); let sink_pad = PadSink::new(sink_pad, InputSelectorPadSinkHandler::default()); let ret = sink_pad.gst_pad().clone(); @@ -608,22 +601,30 @@ impl ElementImpl for InputSelector { drop(pads); drop(state); - let _ = element.post_message(gst::message::Latency::builder().src(element).build()); + let _ = self.instance().post_message( + gst::message::Latency::builder() + .src(&*self.instance()) + .build(), + ); Some(ret) } - fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) { + fn release_pad(&self, pad: &gst::Pad) { let mut pads = self.pads.lock().unwrap(); let sink_pad = pads.sink_pads.remove(pad).unwrap(); drop(sink_pad); - element.remove_pad(pad).unwrap(); + self.instance().remove_pad(pad).unwrap(); drop(pads); - let _ = element.post_message(gst::message::Latency::builder().src(element).build()); + let _ = self.instance().post_message( + gst::message::Latency::builder() + .src(&*self.instance()) + .build(), + ); } - fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> { + fn provide_clock(&self) -> Option<gst::Clock> { Some(gst::SystemClock::obtain()) } } diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 516c71c4f..f2ec87fa4 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -1279,16 +1279,16 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl JitterBuffer { - fn clear_pt_map(&self, element: &super::JitterBuffer) { - gst::debug!(CAT, obj: element, "Clearing PT map"); + fn clear_pt_map(&self) { + gst::debug!(CAT, imp: self, "Clearing PT map"); let mut state = self.state.lock().unwrap(); state.clock_rate = None; state.jbuf.reset_skew(); } - fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); let context = { let settings = self.settings.lock().unwrap(); @@ -1297,33 +1297,37 @@ impl JitterBuffer { self.task .prepare( - JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler), + JitterBufferTask::new( + &*self.instance(), + &self.src_pad_handler, + &self.sink_pad_handler, + ), context, ) .block_on()?; - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::JitterBuffer) { - gst::debug!(CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } - fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } - fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } } @@ -1410,7 +1414,7 @@ impl ObjectImpl for JitterBuffer { .class_handler(|_, args| { let element = args[0].get::<super::JitterBuffer>().expect("signal arg"); let jb = element.imp(); - jb.clear_pt_map(&element); + jb.clear_pt_map(); None }) .build(), @@ -1424,13 +1428,7 @@ impl ObjectImpl for JitterBuffer { SIGNALS.as_ref() } - fn set_property( - &self, - obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "latency" => { let latency = { @@ -1444,7 +1442,11 @@ impl ObjectImpl for JitterBuffer { let state = self.state.lock().unwrap(); state.jbuf.set_delay(latency); - let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); + let _ = self.instance().post_message( + gst::message::Latency::builder() + .src(&*self.instance()) + .build(), + ); } "do-lost" => { let mut settings = self.settings.lock().unwrap(); @@ -1475,7 +1477,7 @@ impl ObjectImpl for JitterBuffer { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "latency" => { let settings = self.settings.lock().unwrap(); @@ -1514,9 +1516,10 @@ impl ObjectImpl for JitterBuffer { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); @@ -1567,32 +1570,31 @@ impl ElementImpl for JitterBuffer { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PlayingToPaused => { @@ -1604,7 +1606,7 @@ impl ElementImpl for JitterBuffer { Ok(success) } - fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> { + fn provide_clock(&self) -> Option<gst::Clock> { Some(gst::SystemClock::obtain()) } } diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs index 91debce1e..94671379b 100644 --- a/generic/threadshare/src/lib.rs +++ b/generic/threadshare/src/lib.rs @@ -28,7 +28,6 @@ mod jitterbuffer; mod proxy; mod queue; -use glib::translate::*; use gst::glib; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -55,34 +54,3 @@ gst::plugin_define!( env!("CARGO_PKG_REPOSITORY"), env!("BUILD_REL_DATE") ); - -pub fn set_element_flags<T: glib::IsA<gst::Object> + glib::IsA<gst::Element>>( - element: &T, - flags: gst::ElementFlags, -) { - unsafe { - let ptr: *mut gst::ffi::GstObject = element.as_ptr() as *mut _; - let _guard = MutexGuard::lock(&(*ptr).lock); - (*ptr).flags |= flags.into_glib(); - } -} - -#[must_use = "if unused the Mutex will immediately unlock"] -struct MutexGuard<'a>(&'a glib::ffi::GMutex); - -impl<'a> MutexGuard<'a> { - pub fn lock(mutex: &'a glib::ffi::GMutex) -> Self { - unsafe { - glib::ffi::g_mutex_lock(mut_override(mutex)); - } - MutexGuard(mutex) - } -} - -impl<'a> Drop for MutexGuard<'a> { - fn drop(&mut self) { - unsafe { - glib::ffi::g_mutex_unlock(mut_override(self.0)); - } - } -} diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index f706db702..e7ce6b303 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -227,9 +227,7 @@ impl PadSinkHandler for ProxySinkPadHandler { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); let proxysink = element.imp(); - proxysink - .enqueue_item(&element, DataQueueItem::Buffer(buffer)) - .await + proxysink.enqueue_item(DataQueueItem::Buffer(buffer)).await } .boxed() } @@ -248,7 +246,7 @@ impl PadSinkHandler for ProxySinkPadHandler { gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list); let proxysink = element.imp(); proxysink - .enqueue_item(&element, DataQueueItem::BufferList(list)) + .enqueue_item(DataQueueItem::BufferList(list)) .await } .boxed() @@ -258,7 +256,7 @@ impl PadSinkHandler for ProxySinkPadHandler { &self, pad: &PadSinkRef, proxysink: &ProxySink, - element: &gst::Element, + _element: &gst::Element, event: gst::Event, ) -> bool { use gst::EventView; @@ -277,7 +275,7 @@ impl PadSinkHandler for ProxySinkPadHandler { }; if let EventView::FlushStart(..) = event.view() { - proxysink.stop(element.downcast_ref::<super::ProxySink>().unwrap()); + proxysink.stop(); } if let Some(src_pad) = src_pad { @@ -311,13 +309,13 @@ impl PadSinkHandler for ProxySinkPadHandler { let _ = element.post_message(gst::message::Eos::builder().src(&element).build()); } - EventView::FlushStop(..) => proxysink.start(&element), + EventView::FlushStop(..) => proxysink.start(), _ => (), } gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); proxysink - .enqueue_item(&element, DataQueueItem::Event(event)) + .enqueue_item(DataQueueItem::Event(event)) .await .is_ok() } @@ -341,13 +339,13 @@ static SINK_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl ProxySink { - async fn schedule_pending_queue(&self, element: &super::ProxySink) { + async fn schedule_pending_queue(&self) { loop { let more_queue_space_receiver = { let proxy_ctx = self.proxy_ctx.lock().unwrap(); let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); - gst::log!(SINK_CAT, obj: element, "Trying to empty pending queue"); + gst::log!(SINK_CAT, imp: self, "Trying to empty pending queue"); let ProxyContextInner { pending_queue: ref mut pq, @@ -372,7 +370,7 @@ impl ProxySink { receiver } else { - gst::log!(SINK_CAT, obj: element, "Pending queue is empty now"); + gst::log!(SINK_CAT, imp: self, "Pending queue is empty now"); *pq = None; return; } @@ -383,22 +381,18 @@ impl ProxySink { receiver } } else { - gst::log!(SINK_CAT, obj: element, "Flushing, dropping pending queue"); + gst::log!(SINK_CAT, imp: self, "Flushing, dropping pending queue"); *pq = None; return; } }; - gst::log!(SINK_CAT, obj: element, "Waiting for more queue space"); + gst::log!(SINK_CAT, imp: self, "Waiting for more queue space"); let _ = more_queue_space_receiver.await; } } - async fn enqueue_item( - &self, - element: &super::ProxySink, - item: DataQueueItem, - ) -> Result<gst::FlowSuccess, gst::FlowError> { + async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> { let wait_fut = { let proxy_ctx = self.proxy_ctx.lock().unwrap(); let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); @@ -463,18 +457,18 @@ impl ProxySink { gst::log!( SINK_CAT, - obj: element, + imp: self, "Proxy is full - Pushing first item on pending queue" ); if schedule_now { - gst::log!(SINK_CAT, obj: element, "Scheduling pending queue now"); + gst::log!(SINK_CAT, imp: self, "Scheduling pending queue now"); pending_queue.scheduled = true; - let wait_fut = self.schedule_pending_queue(element); + let wait_fut = self.schedule_pending_queue(); Some(wait_fut) } else { - gst::log!(SINK_CAT, obj: element, "Scheduling pending queue later"); + gst::log!(SINK_CAT, imp: self, "Scheduling pending queue later"); None } @@ -494,11 +488,7 @@ impl ProxySink { }; if let Some(wait_fut) = wait_fut { - gst::log!( - SINK_CAT, - obj: element, - "Blocking until queue has space again" - ); + gst::log!(SINK_CAT, imp: self, "Blocking until queue has space again"); wait_fut.await; } @@ -507,8 +497,8 @@ impl ProxySink { shared_ctx.last_res } - fn prepare(&self, element: &super::ProxySink) -> Result<(), gst::ErrorMessage> { - gst::debug!(SINK_CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(SINK_CAT, imp: self, "Preparing"); let proxy_context = self.settings.lock().unwrap().proxy_context.to_string(); @@ -527,22 +517,22 @@ impl ProxySink { *self.proxy_ctx.lock().unwrap() = Some(proxy_ctx); - gst::debug!(SINK_CAT, obj: element, "Prepared"); + gst::debug!(SINK_CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::ProxySink) { - gst::debug!(SINK_CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(SINK_CAT, imp: self, "Unpreparing"); *self.proxy_ctx.lock().unwrap() = None; - gst::debug!(SINK_CAT, obj: element, "Unprepared"); + gst::debug!(SINK_CAT, imp: self, "Unprepared"); } - fn start(&self, element: &super::ProxySink) { + fn start(&self) { let proxy_ctx = self.proxy_ctx.lock().unwrap(); let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); - gst::debug!(SINK_CAT, obj: element, "Starting"); + gst::debug!(SINK_CAT, imp: self, "Starting"); { let settings = self.settings.lock().unwrap(); @@ -552,19 +542,19 @@ impl ProxySink { shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); - gst::debug!(SINK_CAT, obj: element, "Started"); + gst::debug!(SINK_CAT, imp: self, "Started"); } - fn stop(&self, element: &super::ProxySink) { + fn stop(&self) { let proxy_ctx = self.proxy_ctx.lock().unwrap(); let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); - gst::debug!(SINK_CAT, obj: element, "Stopping"); + gst::debug!(SINK_CAT, imp: self, "Stopping"); let _ = shared_ctx.pending_queue.take(); shared_ctx.last_res = Err(gst::FlowError::Flushing); - gst::debug!(SINK_CAT, obj: element, "Stopped"); + gst::debug!(SINK_CAT, imp: self, "Stopped"); } } @@ -599,13 +589,7 @@ impl ObjectImpl for ProxySink { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "proxy-context" => { @@ -618,7 +602,7 @@ impl ObjectImpl for ProxySink { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "proxy-context" => settings.proxy_context.to_value(), @@ -626,12 +610,12 @@ impl ObjectImpl for ProxySink { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); - - crate::set_element_flags(obj, gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SINK); } } @@ -671,31 +655,30 @@ impl ElementImpl for ProxySink { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(SINK_CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(SINK_CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - self.stop(element); + self.stop(); } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let success = self.parent_change_state(element, transition)?; + let success = self.parent_change_state(transition)?; if transition == gst::StateChange::ReadyToPaused { - self.start(element); + self.start(); } Ok(success) @@ -992,8 +975,8 @@ static SRC_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl ProxySrc { - fn prepare(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(SRC_CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(SRC_CAT, imp: self, "Preparing"); let settings = self.settings.lock().unwrap().clone(); @@ -1012,7 +995,7 @@ impl ProxySrc { })?; let dataqueue = DataQueue::new( - &element.clone().upcast(), + &self.instance().clone().upcast(), self.src_pad.gst_pad(), if settings.max_size_buffers == 0 { None @@ -1044,16 +1027,19 @@ impl ProxySrc { *self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); self.task - .prepare(ProxySrcTask::new(element.clone(), dataqueue), ts_ctx) + .prepare( + ProxySrcTask::new(self.instance().clone(), dataqueue), + ts_ctx, + ) .block_on()?; - gst::debug!(SRC_CAT, obj: element, "Prepared"); + gst::debug!(SRC_CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::ProxySrc) { - gst::debug!(SRC_CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(SRC_CAT, imp: self, "Unpreparing"); { let settings = self.settings.lock().unwrap(); @@ -1066,27 +1052,27 @@ impl ProxySrc { *self.dataqueue.lock().unwrap() = None; *self.proxy_ctx.lock().unwrap() = None; - gst::debug!(SRC_CAT, obj: element, "Unprepared"); + gst::debug!(SRC_CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(SRC_CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(SRC_CAT, imp: self, "Stopping"); self.task.stop().await_maybe_on_context()?; - gst::debug!(SRC_CAT, obj: element, "Stopped"); + gst::debug!(SRC_CAT, imp: self, "Stopped"); Ok(()) } - fn start(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(SRC_CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(SRC_CAT, imp: self, "Starting"); self.task.start().await_maybe_on_context()?; - gst::debug!(SRC_CAT, obj: element, "Started"); + gst::debug!(SRC_CAT, imp: self, "Started"); Ok(()) } - fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(SRC_CAT, obj: element, "Pausing"); + fn pause(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(SRC_CAT, imp: self, "Pausing"); self.task.pause().block_on()?; - gst::debug!(SRC_CAT, obj: element, "Paused"); + gst::debug!(SRC_CAT, imp: self, "Paused"); Ok(()) } } @@ -1153,13 +1139,7 @@ impl ObjectImpl for ProxySrc { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "max-size-buffers" => { @@ -1193,7 +1173,7 @@ impl ObjectImpl for ProxySrc { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "max-size-buffers" => settings.max_size_buffers.to_value(), @@ -1206,12 +1186,12 @@ impl ObjectImpl for ProxySrc { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); - - crate::set_element_flags(obj, gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); } } @@ -1251,41 +1231,40 @@ impl ElementImpl for ProxySrc { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(SRC_CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(SRC_CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - self.pause(element).map_err(|_| gst::StateChangeError)?; + self.pause().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index a3c3b6a1c..b3d1df622 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -96,9 +96,7 @@ impl PadSinkHandler for QueuePadSinkHandler { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); let queue = element.imp(); - queue - .enqueue_item(&element, DataQueueItem::Buffer(buffer)) - .await + queue.enqueue_item(DataQueueItem::Buffer(buffer)).await } .boxed() } @@ -116,9 +114,7 @@ impl PadSinkHandler for QueuePadSinkHandler { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); let queue = element.imp(); - queue - .enqueue_item(&element, DataQueueItem::BufferList(list)) - .await + queue.enqueue_item(DataQueueItem::BufferList(list)).await } .boxed() } @@ -183,7 +179,7 @@ impl PadSinkHandler for QueuePadSinkHandler { gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); queue - .enqueue_item(&element, DataQueueItem::Event(event)) + .enqueue_item(DataQueueItem::Event(event)) .await .is_ok() } @@ -495,7 +491,7 @@ impl Queue { /* Schedules emptying of the pending queue. If there is an upstream * TaskContext, the new task is spawned, it is otherwise * returned, for the caller to block on */ - async fn schedule_pending_queue(&self, element: &super::Queue) { + async fn schedule_pending_queue(&self) { loop { let more_queue_space_receiver = { let dataqueue = self.dataqueue.lock().unwrap(); @@ -504,7 +500,7 @@ impl Queue { } let mut pending_queue_grd = self.pending_queue.lock().unwrap(); - gst::log!(CAT, obj: element, "Trying to empty pending queue"); + gst::log!(CAT, imp: self, "Trying to empty pending queue"); if let Some(pending_queue) = pending_queue_grd.as_mut() { let mut failed_item = None; @@ -521,30 +517,26 @@ impl Queue { receiver } else { - gst::log!(CAT, obj: element, "Pending queue is empty now"); + gst::log!(CAT, imp: self, "Pending queue is empty now"); *pending_queue_grd = None; return; } } else { - gst::log!(CAT, obj: element, "Flushing, dropping pending queue"); + gst::log!(CAT, imp: self, "Flushing, dropping pending queue"); return; } }; - gst::log!(CAT, obj: element, "Waiting for more queue space"); + gst::log!(CAT, imp: self, "Waiting for more queue space"); let _ = more_queue_space_receiver.await; } } - async fn enqueue_item( - &self, - element: &super::Queue, - item: DataQueueItem, - ) -> Result<gst::FlowSuccess, gst::FlowError> { + async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> { let wait_fut = { let dataqueue = self.dataqueue.lock().unwrap(); let dataqueue = dataqueue.as_ref().ok_or_else(|| { - gst::error!(CAT, obj: element, "No DataQueue"); + gst::error!(CAT, imp: self, "No DataQueue"); gst::FlowError::Error })?; @@ -573,18 +565,18 @@ impl Queue { gst::log!( CAT, - obj: element, + imp: self, "Queue is full - Pushing first item on pending queue" ); if schedule_now { - gst::log!(CAT, obj: element, "Scheduling pending queue now"); + gst::log!(CAT, imp: self, "Scheduling pending queue now"); pending_queue.as_mut().unwrap().scheduled = true; - let wait_fut = self.schedule_pending_queue(element); + let wait_fut = self.schedule_pending_queue(); Some(wait_fut) } else { - gst::log!(CAT, obj: element, "Scheduling pending queue later"); + gst::log!(CAT, imp: self, "Scheduling pending queue later"); None } } else { @@ -597,20 +589,20 @@ impl Queue { }; if let Some(wait_fut) = wait_fut { - gst::log!(CAT, obj: element, "Blocking until queue has space again"); + gst::log!(CAT, imp: self, "Blocking until queue has space again"); wait_fut.await; } *self.last_res.lock().unwrap() } - fn prepare(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); let settings = self.settings.lock().unwrap().clone(); let dataqueue = DataQueue::new( - &element.clone().upcast(), + &self.instance().clone().upcast(), self.src_pad.gst_pad(), if settings.max_size_buffers == 0 { None @@ -640,16 +632,16 @@ impl Queue { })?; self.task - .prepare(QueueTask::new(element.clone(), dataqueue), context) + .prepare(QueueTask::new(self.instance().clone(), dataqueue), context) .block_on()?; - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::Queue) { - gst::debug!(CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); @@ -658,20 +650,20 @@ impl Queue { *self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); self.task.stop().await_maybe_on_context()?; - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } - fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); self.task.start().await_maybe_on_context()?; - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } } @@ -738,13 +730,7 @@ impl ObjectImpl for Queue { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "max-size-buffers" => { @@ -772,7 +758,7 @@ impl ObjectImpl for Queue { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "max-size-buffers" => settings.max_size_buffers.to_value(), @@ -784,9 +770,10 @@ impl ObjectImpl for Queue { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); } @@ -836,31 +823,30 @@ impl ElementImpl for Queue { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let success = self.parent_change_state(element, transition)?; + let success = self.parent_change_state(transition)?; if transition == gst::StateChange::ReadyToPaused { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } Ok(success) diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs index 89e144776..7b1b1496e 100644 --- a/generic/threadshare/src/runtime/pad.rs +++ b/generic/threadshare/src/runtime/pad.rs @@ -398,8 +398,9 @@ impl PadSrc { "Panic in PadSrc activate" )) }, - move |imp, element| { + move |imp| { let this_ref = PadSrcRef::new(inner_arc); + let element = imp.instance(); handler.src_activate( &this_ref, imp, @@ -424,8 +425,9 @@ impl PadSrc { "Panic in PadSrc activatemode" )) }, - move |imp, element| { + move |imp| { let this_ref = PadSrcRef::new(inner_arc); + let element = imp.instance(); this_ref.activate_mode_hook(mode, active)?; handler.src_activatemode( &this_ref, @@ -449,8 +451,9 @@ impl PadSrc { H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), - move |imp, element| { + move |imp| { let this_ref = PadSrcRef::new(inner_arc); + let element = imp.instance(); handler.src_event_full( &this_ref, imp, @@ -469,8 +472,9 @@ impl PadSrc { H::ElementImpl::catch_panic_pad_function( parent, || false, - move |imp, element| { + move |imp| { let this_ref = PadSrcRef::new(inner_arc); + let element = imp.instance(); if !query.is_serialized() { handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query) } else { @@ -807,8 +811,9 @@ impl PadSink { "Panic in PadSink activate" )) }, - move |imp, element| { + move |imp| { let this_ref = PadSinkRef::new(inner_arc); + let element = imp.instance(); handler.sink_activate( &this_ref, imp, @@ -833,8 +838,9 @@ impl PadSink { "Panic in PadSink activatemode" )) }, - move |imp, element| { + move |imp| { let this_ref = PadSinkRef::new(inner_arc); + let element = imp.instance(); this_ref.activate_mode_hook(mode, active)?; handler.sink_activatemode( @@ -857,7 +863,8 @@ impl PadSink { H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), - move |imp, element| { + move |imp| { + let element = imp.instance(); if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); @@ -898,7 +905,8 @@ impl PadSink { H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), - move |imp, element| { + move |imp| { + let element = imp.instance(); if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); @@ -943,7 +951,8 @@ impl PadSink { H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), - move |imp, element| { + move |imp| { + let element = imp.instance(); if event.is_serialized() { if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); @@ -1001,8 +1010,9 @@ impl PadSink { H::ElementImpl::catch_panic_pad_function( parent, || false, - move |imp, element| { + move |imp| { let this_ref = PadSinkRef::new(inner_arc); + let element = imp.instance(); if !query.is_serialized() { handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query) } else { diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index bd272c693..41cdaced3 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -408,8 +408,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl TcpClientSrc { - fn prepare(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); let settings = self.settings.lock().unwrap().clone(); let context = @@ -459,40 +459,40 @@ impl TcpClientSrc { let _ = self .task .prepare( - TcpClientSrcTask::new(element.clone(), saddr, buffer_pool), + TcpClientSrcTask::new(self.instance().clone(), saddr, buffer_pool), context, ) .check()?; - gst::debug!(CAT, obj: element, "Preparing asynchronously"); + gst::debug!(CAT, imp: self, "Preparing asynchronously"); Ok(()) } - fn unprepare(&self, element: &super::TcpClientSrc) { - gst::debug!(CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } - fn start(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } - fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Pausing"); + fn pause(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Pausing"); self.task.pause().block_on()?; - gst::debug!(CAT, obj: element, "Paused"); + gst::debug!(CAT, imp: self, "Paused"); Ok(()) } } @@ -558,13 +558,7 @@ impl ObjectImpl for TcpClientSrc { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "host" => { @@ -594,7 +588,7 @@ impl ObjectImpl for TcpClientSrc { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "host" => settings.host.to_value(), @@ -607,12 +601,12 @@ impl ObjectImpl for TcpClientSrc { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); - - crate::set_element_flags(obj, gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); } } @@ -651,41 +645,40 @@ impl ElementImpl for TcpClientSrc { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - self.pause(element).map_err(|_| gst::StateChangeError)?; + self.pause().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index bc9f6c208..b99e432c3 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -838,8 +838,8 @@ impl UdpSink { self.item_sender.lock().unwrap().as_ref().unwrap().clone() } - fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); let context = { let settings = self.settings.lock().unwrap(); @@ -855,39 +855,37 @@ impl UdpSink { // Enable backpressure for items let (item_sender, item_receiver) = flume::bounded(0); let (cmd_sender, cmd_receiver) = flume::unbounded(); - let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver); + let task_impl = UdpSinkTask::new(&*self.instance(), item_receiver, cmd_receiver); self.task.prepare(task_impl, context).block_on()?; *self.item_sender.lock().unwrap() = Some(item_sender); *self.cmd_sender.lock().unwrap() = Some(cmd_sender); - gst::debug!(CAT, obj: element, "Started preparation"); + gst::debug!(CAT, imp: self, "Started preparation"); Ok(()) } - fn unprepare(&self, element: &super::UdpSink) { - gst::debug!(CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } - fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } -} -impl UdpSink { fn add_client(&self, settings: &mut Settings, client: SocketAddr) { settings.clients.insert(client); if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { @@ -919,10 +917,10 @@ impl UdpSink { } } -fn try_into_socket_addr(element: &super::UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> { +fn try_into_socket_addr(imp: &UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> { let addr: IpAddr = match host.parse() { Err(err) => { - gst::error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); + gst::error!(CAT, imp: imp, "Failed to parse host {}: {}", host, err); return Err(()); } Ok(addr) => addr, @@ -930,7 +928,7 @@ fn try_into_socket_addr(element: &super::UdpSink, host: &str, port: i32) -> Resu let port: u16 = match port.try_into() { Err(err) => { - gst::error!(CAT, obj: element, "Invalid port {}: {}", port, err); + gst::error!(CAT, imp: imp, "Invalid port {}: {}", port, err); return Err(()); } Ok(port) => port, @@ -1071,9 +1069,9 @@ impl ObjectImpl for UdpSink { let element = args[0].get::<super::UdpSink>().expect("signal arg"); let host = args[1].get::<String>().expect("signal arg"); let port = args[2].get::<i32>().expect("signal arg"); + let udpsink = element.imp(); - if let Ok(addr) = try_into_socket_addr(&element, &host, port) { - let udpsink = element.imp(); + if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) { let mut settings = udpsink.settings.lock().unwrap(); udpsink.add_client(&mut settings, addr); } @@ -1088,9 +1086,9 @@ impl ObjectImpl for UdpSink { let element = args[0].get::<super::UdpSink>().expect("signal arg"); let host = args[1].get::<String>().expect("signal arg"); let port = args[2].get::<i32>().expect("signal arg"); + let udpsink = element.imp(); - if let Ok(addr) = try_into_socket_addr(&element, &host, port) { - let udpsink = element.imp(); + if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) { let mut settings = udpsink.settings.lock().unwrap(); udpsink.remove_client(&mut settings, addr); } @@ -1116,13 +1114,7 @@ impl ObjectImpl for UdpSink { SIGNALS.as_ref() } - fn set_property( - &self, - obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "sync" => { @@ -1196,9 +1188,9 @@ impl ObjectImpl for UdpSink { rsplit[0] .parse::<i32>() .map_err(|err| { - gst::error!(CAT, obj: obj, "Invalid port {}: {}", rsplit[0], err); + gst::error!(CAT, imp: self, "Invalid port {}: {}", rsplit[0], err); }) - .and_then(|port| try_into_socket_addr(obj, rsplit[1], port)) + .and_then(|port| try_into_socket_addr(self, rsplit[1], port)) .ok() } else { None @@ -1222,7 +1214,7 @@ impl ObjectImpl for UdpSink { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "sync" => settings.sync.to_value(), @@ -1268,12 +1260,12 @@ impl ObjectImpl for UdpSink { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); - - crate::set_element_flags(obj, gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SINK); } } @@ -1313,34 +1305,33 @@ impl ElementImpl for UdpSink { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::ReadyToPaused => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - self.parent_change_state(element, transition) + self.parent_change_state(transition) } - fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { + fn send_event(&self, event: gst::Event) -> bool { match event.view() { EventView::Latency(ev) => { let latency = Some(ev.latency()); diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index 78c9acd26..5c7b79bf6 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -574,8 +574,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { }); impl UdpSrc { - fn prepare(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); let settings = self.settings.lock().unwrap(); let context = @@ -589,38 +589,38 @@ impl UdpSrc { *self.configured_caps.lock().unwrap() = None; self.task - .prepare(UdpSrcTask::new(element.clone()), context) + .prepare(UdpSrcTask::new(self.instance().clone()), context) .block_on()?; - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::UdpSrc) { - gst::debug!(CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + gst::debug!(CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } - fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } - fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Pausing"); + fn pause(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Pausing"); self.task.pause().block_on()?; - gst::debug!(CAT, obj: element, "Paused"); + gst::debug!(CAT, imp: self, "Paused"); Ok(()) } } @@ -716,13 +716,7 @@ impl ObjectImpl for UdpSrc { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "address" => { @@ -767,7 +761,7 @@ impl ObjectImpl for UdpSrc { } } - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "address" => settings.address.to_value(), @@ -792,12 +786,12 @@ impl ObjectImpl for UdpSrc { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); - - crate::set_element_flags(obj, gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); } } @@ -836,41 +830,40 @@ impl ElementImpl for UdpSrc { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - self.pause(element).map_err(|_| gst::StateChangeError)?; + self.pause().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - self.start(element).map_err(|_| gst::StateChangeError)?; + self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { - self.stop(element).map_err(|_| gst::StateChangeError)?; + self.stop().map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 84b3d4642..312d28208 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -230,8 +230,8 @@ mod imp_src { } } - fn prepare(&self, element: &super::ElementSrcTest) -> Result<(), gst::ErrorMessage> { - gst::debug!(SRC_CAT, obj: element, "Preparing"); + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(SRC_CAT, imp: self, "Preparing"); let settings = self.settings.lock().unwrap().clone(); let context = @@ -246,39 +246,42 @@ mod imp_src { *self.sender.lock().unwrap() = Some(sender); self.task - .prepare(ElementSrcTestTask::new(element.clone(), receiver), context) + .prepare( + ElementSrcTestTask::new(self.instance().clone(), receiver), + context, + ) .block_on()?; - gst::debug!(SRC_CAT, obj: element, "Prepared"); + gst::debug!(SRC_CAT, imp: self, "Prepared"); Ok(()) } - fn unprepare(&self, element: &super::ElementSrcTest) { - gst::debug!(SRC_CAT, obj: element, "Unpreparing"); + fn unprepare(&self) { + gst::debug!(SRC_CAT, imp: self, "Unpreparing"); *self.sender.lock().unwrap() = None; self.task.unprepare().block_on().unwrap(); - gst::debug!(SRC_CAT, obj: element, "Unprepared"); + gst::debug!(SRC_CAT, imp: self, "Unprepared"); } - fn stop(&self, element: &super::ElementSrcTest) { - gst::debug!(SRC_CAT, obj: element, "Stopping"); + fn stop(&self) { + gst::debug!(SRC_CAT, imp: self, "Stopping"); self.task.stop().await_maybe_on_context().unwrap(); - gst::debug!(SRC_CAT, obj: element, "Stopped"); + gst::debug!(SRC_CAT, imp: self, "Stopped"); } - fn start(&self, element: &super::ElementSrcTest) { - gst::debug!(SRC_CAT, obj: element, "Starting"); + fn start(&self) { + gst::debug!(SRC_CAT, imp: self, "Starting"); self.task.start().await_maybe_on_context().unwrap(); - gst::debug!(SRC_CAT, obj: element, "Started"); + gst::debug!(SRC_CAT, imp: self, "Started"); } - fn pause(&self, element: &super::ElementSrcTest) { - gst::debug!(SRC_CAT, obj: element, "Pausing"); + fn pause(&self) { + gst::debug!(SRC_CAT, imp: self, "Pausing"); self.task.pause().block_on().unwrap(); - gst::debug!(SRC_CAT, obj: element, "Paused"); + gst::debug!(SRC_CAT, imp: self, "Paused"); } } @@ -315,13 +318,7 @@ mod imp_src { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "context" => { let context = value @@ -335,9 +332,10 @@ mod imp_src { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); } } @@ -377,35 +375,34 @@ mod imp_src { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::log!(SRC_CAT, obj: element, "Changing state {:?}", transition); + gst::log!(SRC_CAT, imp: self, "Changing state {:?}", transition); match transition { gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); + self.prepare().map_err(|err| { + self.post_error_message(err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - self.pause(element); + self.pause(); } gst::StateChange::ReadyToNull => { - self.unprepare(element); + self.unprepare(); } _ => (), } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::PausedToReady => { - self.stop(element); + self.stop(); } gst::StateChange::PausedToPlaying => { - self.start(element); + self.start(); } gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; @@ -416,7 +413,7 @@ mod imp_src { Ok(success) } - fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { + fn send_event(&self, event: gst::Event) -> bool { match event.view() { EventView::FlushStart(..) => { self.task.flush_start().await_maybe_on_context().unwrap(); @@ -464,9 +461,7 @@ mod imp_sink { .unwrap(); async move { let elem_sink_test = element.imp(); - elem_sink_test - .forward_item(&element, Item::Buffer(buffer)) - .await + elem_sink_test.forward_item(Item::Buffer(buffer)).await } .boxed() } @@ -484,9 +479,7 @@ mod imp_sink { .unwrap(); async move { let elem_sink_test = element.imp(); - elem_sink_test - .forward_item(&element, Item::BufferList(list)) - .await + elem_sink_test.forward_item(Item::BufferList(list)).await } .boxed() } @@ -495,14 +488,14 @@ mod imp_sink { &self, pad: &PadSinkRef, elem_sink_test: &ElementSinkTest, - element: &gst::Element, + _element: &gst::Element, event: gst::Event, ) -> bool { gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); match event.view() { EventView::FlushStart(..) => { - elem_sink_test.stop(element.downcast_ref::<super::ElementSinkTest>().unwrap()); + elem_sink_test.stop(); true } _ => false, @@ -526,11 +519,11 @@ mod imp_sink { let elem_sink_test = element.imp(); if let EventView::FlushStop(..) = event.view() { - elem_sink_test.start(&element); + elem_sink_test.start(); } elem_sink_test - .forward_item(&element, Item::Event(event)) + .forward_item(Item::Event(event)) .await .is_ok() } @@ -546,13 +539,9 @@ mod imp_sink { } impl ElementSinkTest { - async fn forward_item( - &self, - element: &super::ElementSinkTest, - item: Item, - ) -> Result<gst::FlowSuccess, gst::FlowError> { + async fn forward_item(&self, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> { if !self.flushing.load(Ordering::SeqCst) { - gst::debug!(SINK_CAT, obj: element, "Fowarding {:?}", item); + gst::debug!(SINK_CAT, imp: self, "Fowarding {:?}", item); let mut sender = self .sender .lock() @@ -568,7 +557,7 @@ mod imp_sink { } else { gst::debug!( SINK_CAT, - obj: element, + imp: self, "Not fowarding {:?} due to flushing", item ); @@ -576,34 +565,32 @@ mod imp_sink { } } - fn start(&self, element: &super::ElementSinkTest) { - gst::debug!(SINK_CAT, obj: element, "Starting"); + fn start(&self) { + gst::debug!(SINK_CAT, imp: self, "Starting"); self.flushing.store(false, Ordering::SeqCst); - gst::debug!(SINK_CAT, obj: element, "Started"); + gst::debug!(SINK_CAT, imp: self, "Started"); } - fn stop(&self, element: &super::ElementSinkTest) { - gst::debug!(SINK_CAT, obj: element, "Stopping"); + fn stop(&self) { + gst::debug!(SINK_CAT, imp: self, "Stopping"); self.flushing.store(true, Ordering::SeqCst); - gst::debug!(SINK_CAT, obj: element, "Stopped"); + gst::debug!(SINK_CAT, imp: self, "Stopped"); } - } - impl ElementSinkTest { - pub fn push_flush_start(&self, element: &super::ElementSinkTest) { - gst::debug!(SINK_CAT, obj: element, "Pushing FlushStart"); + pub fn push_flush_start(&self) { + gst::debug!(SINK_CAT, imp: self, "Pushing FlushStart"); self.sink_pad .gst_pad() .push_event(gst::event::FlushStart::new()); - gst::debug!(SINK_CAT, obj: element, "FlushStart pushed"); + gst::debug!(SINK_CAT, imp: self, "FlushStart pushed"); } - pub fn push_flush_stop(&self, element: &super::ElementSinkTest) { - gst::debug!(SINK_CAT, obj: element, "Pushing FlushStop"); + pub fn push_flush_stop(&self) { + gst::debug!(SINK_CAT, imp: self, "Pushing FlushStop"); self.sink_pad .gst_pad() .push_event(gst::event::FlushStop::new(true)); - gst::debug!(SINK_CAT, obj: element, "FlushStop pushed"); + gst::debug!(SINK_CAT, imp: self, "FlushStop pushed"); } } @@ -647,13 +634,7 @@ mod imp_sink { PROPERTIES.as_ref() } - fn set_property( - &self, - _obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "sender" => { let ItemSender { sender } = value @@ -666,9 +647,10 @@ mod imp_sink { } } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); + fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); } } @@ -708,19 +690,18 @@ mod imp_sink { fn change_state( &self, - element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst::log!(SINK_CAT, obj: element, "Changing state {:?}", transition); + gst::log!(SINK_CAT, imp: self, "Changing state {:?}", transition); if let gst::StateChange::PausedToReady = transition { - self.stop(element); + self.stop(); } - let success = self.parent_change_state(element, transition)?; + let success = self.parent_change_state(transition)?; if let gst::StateChange::ReadyToPaused = transition { - self.start(element); + self.start(); } Ok(success) @@ -1225,13 +1206,13 @@ fn start_flush() { let elem_sink_test = sink_element.imp(); - elem_sink_test.push_flush_start(&sink_element); + elem_sink_test.push_flush_start(); elem_src_test .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) .unwrap_err(); - elem_sink_test.push_flush_stop(&sink_element); + elem_sink_test.push_flush_stop(); elem_src_test .try_push(Item::Event(gst::event::Segment::new( |