Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-10-09 16:06:59 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-10-10 15:03:25 +0300
commit7ee4afacf413b2e3c386bb1070994ed4325994e6 (patch)
treeeddcc0e047ab4704e5a459dd551a55196e8a1848 /generic
parent7818ac658b02417fda071ce025b6d6a7fdb54a76 (diff)
Change *Impl trait methods to only take &self and not Self::Type in addition
Diffstat (limited to 'generic')
-rw-r--r--generic/file/src/filesink/imp.rs60
-rw-r--r--generic/file/src/filesrc/imp.rs67
-rw-r--r--generic/fmp4/src/fmp4mux/imp.rs203
-rw-r--r--generic/sodium/src/decrypter/imp.rs86
-rw-r--r--generic/sodium/src/encrypter/imp.rs56
-rw-r--r--generic/threadshare/examples/standalone/sink/imp.rs75
-rw-r--r--generic/threadshare/examples/standalone/src/imp.rs87
-rw-r--r--generic/threadshare/src/appsrc/imp.rs87
-rw-r--r--generic/threadshare/src/inputselector/imp.rs55
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs74
-rw-r--r--generic/threadshare/src/lib.rs32
-rw-r--r--generic/threadshare/src/proxy/imp.rs173
-rw-r--r--generic/threadshare/src/queue/imp.rs96
-rw-r--r--generic/threadshare/src/runtime/pad.rs30
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs67
-rw-r--r--generic/threadshare/src/udpsink/imp.rs81
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs67
-rw-r--r--generic/threadshare/tests/pad.rs145
18 files changed, 664 insertions, 877 deletions
diff --git a/generic/file/src/filesink/imp.rs b/generic/file/src/filesink/imp.rs
index 334bc1394..437b1370b 100644
--- a/generic/file/src/filesink/imp.rs
+++ b/generic/file/src/filesink/imp.rs
@@ -65,11 +65,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl FileSink {
- fn set_location(
- &self,
- element: &super::FileSink,
- location: Option<FileLocation>,
- ) -> Result<(), glib::Error> {
+ fn set_location(&self, location: Option<FileLocation>) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
return Err(glib::Error::new(
@@ -85,20 +81,20 @@ impl FileSink {
Some(ref location_cur) => {
gst::info!(
CAT,
- obj: element,
+ imp: self,
"Changing `location` from {:?} to {}",
location_cur,
location,
);
}
None => {
- gst::info!(CAT, obj: element, "Setting `location` to {}", location,);
+ gst::info!(CAT, imp: self, "Setting `location` to {}", location,);
}
}
Some(location)
}
None => {
- gst::info!(CAT, obj: element, "Resetting `location` to None",);
+ gst::info!(CAT, imp: self, "Resetting `location` to None",);
None
}
};
@@ -127,31 +123,25 @@ impl ObjectImpl for FileSink {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"location" => {
let res = match value.get::<Option<String>>() {
Ok(Some(location)) => FileLocation::try_from_path_str(location)
- .and_then(|file_location| self.set_location(obj, Some(file_location))),
- Ok(None) => self.set_location(obj, None),
+ .and_then(|file_location| self.set_location(Some(file_location))),
+ Ok(None) => self.set_location(None),
Err(_) => unreachable!("type checked upstream"),
};
if let Err(err) = res {
- gst::error!(CAT, obj: obj, "Failed to set property `location`: {}", err);
+ gst::error!(CAT, imp: self, "Failed to set property `location`: {}", err);
}
}
_ => unimplemented!(),
};
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"location" => {
let settings = self.settings.lock().unwrap();
@@ -202,7 +192,7 @@ impl ElementImpl for FileSink {
}
impl BaseSinkImpl for FileSink {
- fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
unreachable!("FileSink already started");
@@ -226,15 +216,15 @@ impl BaseSinkImpl for FileSink {
]
)
})?;
- gst::debug!(CAT, obj: element, "Opened file {:?}", file);
+ gst::debug!(CAT, imp: self, "Opened file {:?}", file);
*state = State::Started { file, position: 0 };
- gst::info!(CAT, obj: element, "Started");
+ gst::info!(CAT, imp: self, "Started");
Ok(())
}
- fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
return Err(gst::error_msg!(
@@ -244,18 +234,14 @@ impl BaseSinkImpl for FileSink {
}
*state = State::Stopped;
- gst::info!(CAT, obj: element, "Stopped");
+ gst::info!(CAT, imp: self, "Stopped");
Ok(())
}
// TODO: implement seek in BYTES format
- fn render(
- &self,
- element: &Self::Type,
- buffer: &gst::Buffer,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let (file, position) = match *state {
State::Started {
@@ -263,20 +249,20 @@ impl BaseSinkImpl for FileSink {
ref mut position,
} => (file, position),
State::Stopped => {
- gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]);
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
}
};
- gst::trace!(CAT, obj: element, "Rendering {:?}", buffer);
+ gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
- gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
file.write_all(map.as_ref()).map_err(|err| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ self,
gst::ResourceError::Write,
["Failed to write buffer: {}", err]
);
@@ -296,7 +282,7 @@ impl URIHandlerImpl for FileSink {
&["file"]
}
- fn uri(&self, _element: &Self::Type) -> Option<String> {
+ fn uri(&self) -> Option<String> {
let settings = self.settings.lock().unwrap();
// Conversion to Url already checked while building the `FileLocation`
@@ -307,13 +293,13 @@ impl URIHandlerImpl for FileSink {
})
}
- fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
+ fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
// Special case for "file://" as this is used by some applications to test
// with `gst_element_make_from_uri` if there's an element that supports the URI protocol
if uri != "file://" {
let file_location = FileLocation::try_from_uri_str(uri)?;
- self.set_location(element, Some(file_location))
+ self.set_location(Some(file_location))
} else {
Ok(())
}
diff --git a/generic/file/src/filesrc/imp.rs b/generic/file/src/filesrc/imp.rs
index 886db2e33..be08200db 100644
--- a/generic/file/src/filesrc/imp.rs
+++ b/generic/file/src/filesrc/imp.rs
@@ -65,11 +65,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl FileSrc {
- fn set_location(
- &self,
- element: &super::FileSrc,
- location: Option<FileLocation>,
- ) -> Result<(), glib::Error> {
+ fn set_location(&self, location: Option<FileLocation>) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
return Err(glib::Error::new(
@@ -99,20 +95,20 @@ impl FileSrc {
Some(ref location_cur) => {
gst::info!(
CAT,
- obj: element,
+ imp: self,
"Changing `location` from {:?} to {}",
location_cur,
location,
);
}
None => {
- gst::info!(CAT, obj: element, "Setting `location to {}", location,);
+ gst::info!(CAT, imp: self, "Setting `location to {}", location,);
}
}
Some(location)
}
None => {
- gst::info!(CAT, obj: element, "Resetting `location` to None",);
+ gst::info!(CAT, imp: self, "Resetting `location` to None",);
None
}
};
@@ -142,31 +138,25 @@ impl ObjectImpl for FileSrc {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"location" => {
let res = match value.get::<Option<String>>() {
Ok(Some(location)) => FileLocation::try_from_path_str(location)
- .and_then(|file_location| self.set_location(obj, Some(file_location))),
- Ok(None) => self.set_location(obj, None),
+ .and_then(|file_location| self.set_location(Some(file_location))),
+ Ok(None) => self.set_location(None),
Err(_) => unreachable!("type checked upstream"),
};
if let Err(err) = res {
- gst::error!(CAT, obj: obj, "Failed to set property `location`: {}", err);
+ gst::error!(CAT, imp: self, "Failed to set property `location`: {}", err);
}
}
_ => unimplemented!(),
};
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"location" => {
let settings = self.settings.lock().unwrap();
@@ -181,10 +171,10 @@ impl ObjectImpl for FileSrc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
- obj.set_format(gst::Format::Bytes);
+ self.instance().set_format(gst::Format::Bytes);
}
}
@@ -223,11 +213,11 @@ impl ElementImpl for FileSrc {
}
impl BaseSrcImpl for FileSrc {
- fn is_seekable(&self, _src: &Self::Type) -> bool {
+ fn is_seekable(&self) -> bool {
true
}
- fn size(&self, _src: &Self::Type) -> Option<u64> {
+ fn size(&self) -> Option<u64> {
let state = self.state.lock().unwrap();
if let State::Started { ref file, .. } = *state {
file.metadata().ok().map(|m| m.len())
@@ -236,7 +226,7 @@ impl BaseSrcImpl for FileSrc {
}
}
- fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
unreachable!("FileSrc already started");
@@ -261,16 +251,16 @@ impl BaseSrcImpl for FileSrc {
)
})?;
- gst::debug!(CAT, obj: element, "Opened file {:?}", file);
+ gst::debug!(CAT, imp: self, "Opened file {:?}", file);
*state = State::Started { file, position: 0 };
- gst::info!(CAT, obj: element, "Started");
+ gst::info!(CAT, imp: self, "Started");
Ok(())
}
- fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
return Err(gst::error_msg!(
@@ -281,14 +271,13 @@ impl BaseSrcImpl for FileSrc {
*state = State::Stopped;
- gst::info!(CAT, obj: element, "Stopped");
+ gst::info!(CAT, imp: self, "Stopped");
Ok(())
}
fn fill(
&self,
- element: &Self::Type,
offset: u64,
_length: u32,
buffer: &mut gst::BufferRef,
@@ -301,15 +290,15 @@ impl BaseSrcImpl for FileSrc {
ref mut position,
} => (file, position),
State::Stopped => {
- gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]);
+ gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
}
};
if *position != offset {
file.seek(SeekFrom::Start(offset)).map_err(|err| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ self,
gst::LibraryError::Failed,
["Failed to seek to {}: {}", offset, err.to_string()]
);
@@ -321,13 +310,13 @@ impl BaseSrcImpl for FileSrc {
let size = {
let mut map = buffer.map_writable().map_err(|_| {
- gst::element_error!(element, gst::LibraryError::Failed, ["Failed to map buffer"]);
+ gst::element_imp_error!(self, gst::LibraryError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
file.read(map.as_mut()).map_err(|err| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ self,
gst::LibraryError::Failed,
["Failed to read at {}: {}", offset, err.to_string()]
);
@@ -350,7 +339,7 @@ impl URIHandlerImpl for FileSrc {
&["file"]
}
- fn uri(&self, _element: &Self::Type) -> Option<String> {
+ fn uri(&self) -> Option<String> {
let settings = self.settings.lock().unwrap();
// Conversion to Url already checked while building the `FileLocation`
@@ -361,13 +350,13 @@ impl URIHandlerImpl for FileSrc {
})
}
- fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
+ fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
// Special case for "file://" as this is used by some applications to test
// with `gst_element_make_from_uri` if there's an element that supports the URI protocol
if uri != "file://" {
let file_location = FileLocation::try_from_uri_str(uri)?;
- self.set_location(element, Some(file_location))
+ self.set_location(Some(file_location))
} else {
Ok(())
}
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index a2d232e19..7433a1dd0 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -183,7 +183,6 @@ pub(crate) struct FMP4Mux {
impl FMP4Mux {
fn find_earliest_stream<'a>(
&self,
- element: &super::FMP4Mux,
state: &'a mut State,
timeout: bool,
) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
@@ -255,21 +254,21 @@ impl FMP4Mux {
if !timeout && !all_have_data_or_eos {
gst::trace!(
CAT,
- obj: element,
+ imp: self,
"No timeout and not all streams have a buffer or are EOS"
);
Ok(None)
} else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
gst::trace!(
CAT,
- obj: element,
+ imp: self,
"Stream {} is earliest stream with running time {}",
stream.sinkpad.name(),
earliest_running_time
);
Ok(Some((idx, stream)))
} else {
- gst::trace!(CAT, obj: element, "No streams have data queued currently");
+ gst::trace!(CAT, imp: self, "No streams have data queued currently");
Ok(None)
}
}
@@ -277,7 +276,6 @@ impl FMP4Mux {
// Queue incoming buffers as individual GOPs.
fn queue_gops(
&self,
- element: &super::FMP4Mux,
_idx: usize,
stream: &mut Stream,
segment: &gst::FormattedSegment<gst::ClockTime>,
@@ -439,7 +437,8 @@ impl FMP4Mux {
// If this is a multi-stream element then we need to update the PTS/DTS positions according
// to the output segment, specifically to re-timestamp them with the running time and
// adjust for the segment shift to compensate for negative DTS.
- let class = element.class();
+ let aggregator = self.instance();
+ let class = aggregator.class();
let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() {
(pts_position, dts_position)
} else {
@@ -609,7 +608,6 @@ impl FMP4Mux {
#[allow(clippy::type_complexity)]
fn drain_buffers(
&self,
- element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
timeout: bool,
@@ -652,7 +650,7 @@ impl FMP4Mux {
let fragment_start_pts = state.fragment_start_pts.unwrap();
gst::info!(
CAT,
- obj: element,
+ imp: self,
"Starting to drain at {}",
fragment_start_pts
);
@@ -898,7 +896,6 @@ impl FMP4Mux {
fn preprocess_drained_streams_onvif(
&self,
- element: &super::FMP4Mux,
state: &mut State,
drained_streams: &mut [(
gst::Caps,
@@ -906,7 +903,8 @@ impl FMP4Mux {
VecDeque<Buffer>,
)],
) -> Result<Option<gst::ClockTime>, gst::FlowError> {
- if element.class().as_ref().variant != super::Variant::ONVIF {
+ let aggregator = self.instance();
+ if aggregator.class().as_ref().variant != super::Variant::ONVIF {
return Ok(None);
}
@@ -1007,7 +1005,7 @@ impl FMP4Mux {
gst::debug!(
CAT,
- obj: element,
+ imp: self,
"Configuring start UTC time {}",
start_utc_time.unwrap()
);
@@ -1150,7 +1148,6 @@ impl FMP4Mux {
#[allow(clippy::type_complexity)]
fn interleave_buffers(
&self,
- _element: &super::FMP4Mux,
settings: &Settings,
mut drained_streams: Vec<(
gst::Caps,
@@ -1224,7 +1221,6 @@ impl FMP4Mux {
fn drain(
&self,
- element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
timeout: bool,
@@ -1232,9 +1228,9 @@ impl FMP4Mux {
upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>,
) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
if at_eos {
- gst::info!(CAT, obj: element, "Draining at EOS");
+ gst::info!(CAT, imp: self, "Draining at EOS");
} else if timeout {
- gst::info!(CAT, obj: element, "Draining at timeout");
+ gst::info!(CAT, imp: self, "Draining at timeout");
} else {
for stream in &state.streams {
if !stream.fragment_filled && !stream.sinkpad.is_eos() {
@@ -1250,7 +1246,7 @@ impl FMP4Mux {
min_earliest_pts,
min_start_dts_position,
fragment_end_pts,
- ) = self.drain_buffers(element, state, settings, timeout, at_eos)?;
+ ) = self.drain_buffers(state, settings, timeout, at_eos)?;
// Remove all GAP buffers before processing them further
for (_, _, buffers) in &mut drained_streams {
@@ -1263,20 +1259,18 @@ impl FMP4Mux {
// For ONVIF, replace all timestamps with timestamps based on UTC times.
let max_end_utc_time =
- self.preprocess_drained_streams_onvif(element, state, &mut drained_streams)?;
+ self.preprocess_drained_streams_onvif(state, &mut drained_streams)?;
// Create header now if it was not created before and return the caps
let mut caps = None;
if state.stream_header.is_none() {
- let (_, new_caps) = self
- .update_header(element, state, settings, false)?
- .unwrap();
+ let (_, new_caps) = self.update_header(state, settings, false)?.unwrap();
caps = Some(new_caps);
}
// Interleave buffers according to the settings into a single vec
let (mut interleaved_buffers, streams) =
- self.interleave_buffers(element, settings, drained_streams)?;
+ self.interleave_buffers(settings, drained_streams)?;
let mut buffer_list = None;
if interleaved_buffers.is_empty() {
@@ -1316,7 +1310,7 @@ impl FMP4Mux {
state.sequence_number += 1;
let (mut fmp4_fragment_header, moof_offset) =
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
- variant: element.class().as_ref().variant,
+ variant: self.instance().class().as_ref().variant,
sequence_number,
streams: streams.as_slice(),
buffers: interleaved_buffers.as_slice(),
@@ -1324,7 +1318,7 @@ impl FMP4Mux {
.map_err(|err| {
gst::error!(
CAT,
- obj: element,
+ imp: self,
"Failed to create FMP4 fragment header: {}",
err
);
@@ -1394,7 +1388,7 @@ impl FMP4Mux {
// Update for the start PTS of the next fragment
gst::info!(
CAT,
- obj: element,
+ imp: self,
"Starting new fragment at {}",
fragment_end_pts,
);
@@ -1402,7 +1396,7 @@ impl FMP4Mux {
gst::debug!(
CAT,
- obj: element,
+ imp: self,
"Sending force-keyunit events for running time {}",
fragment_end_pts + settings.fragment_duration,
);
@@ -1435,7 +1429,7 @@ impl FMP4Mux {
buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra);
}
Err(err) => {
- gst::error!(CAT, obj: element, "Failed to create mfra box: {}", err);
+ gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err);
}
}
}
@@ -1446,12 +1440,9 @@ impl FMP4Mux {
Ok((caps, buffer_list))
}
- fn create_streams(
- &self,
- element: &super::FMP4Mux,
- state: &mut State,
- ) -> Result<(), gst::FlowError> {
- for pad in element
+ fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> {
+ for pad in self
+ .instance()
.sink_pads()
.into_iter()
.map(|pad| pad.downcast::<gst_base::AggregatorPad>().unwrap())
@@ -1511,7 +1502,7 @@ impl FMP4Mux {
}
if state.streams.is_empty() {
- gst::error!(CAT, obj: element, "No streams available");
+ gst::error!(CAT, imp: self, "No streams available");
return Err(gst::FlowError::Error);
}
@@ -1546,12 +1537,12 @@ impl FMP4Mux {
fn update_header(
&self,
- element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
at_eos: bool,
) -> Result<Option<(gst::BufferList, gst::Caps)>, gst::FlowError> {
- let class = element.class();
+ let aggregator = self.instance();
+ let class = aggregator.class();
let variant = class.as_ref().variant;
if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos {
@@ -1591,7 +1582,7 @@ impl FMP4Mux {
.map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000),
})
.map_err(|err| {
- gst::error!(CAT, obj: element, "Failed to create FMP4 header: {}", err);
+ gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err);
gst::FlowError::Error
})?;
@@ -1680,13 +1671,7 @@ impl ObjectImpl for FMP4Mux {
&*PROPERTIES
}
- fn set_property(
- &self,
- obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"fragment-duration" => {
let mut settings = self.settings.lock().unwrap();
@@ -1694,7 +1679,7 @@ impl ObjectImpl for FMP4Mux {
if settings.fragment_duration != fragment_duration {
settings.fragment_duration = fragment_duration;
drop(settings);
- obj.set_latency(fragment_duration, None);
+ self.instance().set_latency(fragment_duration, None);
}
}
@@ -1733,7 +1718,7 @@ impl ObjectImpl for FMP4Mux {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"fragment-duration" => {
let settings = self.settings.lock().unwrap();
@@ -1769,9 +1754,10 @@ impl ObjectImpl for FMP4Mux {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
let class = obj.class();
for templ in class.pad_template_list().filter(|templ| {
templ.presence() == gst::PadPresence::Always
@@ -1794,7 +1780,6 @@ impl GstObjectImpl for FMP4Mux {}
impl ElementImpl for FMP4Mux {
fn request_new_pad(
&self,
- element: &Self::Type,
templ: &gst::PadTemplate,
name: Option<String>,
caps: Option<&gst::Caps>,
@@ -1803,25 +1788,24 @@ impl ElementImpl for FMP4Mux {
if state.stream_header.is_some() {
gst::error!(
CAT,
- obj: element,
+ imp: self,
"Can't request new pads after header was generated"
);
return None;
}
- self.parent_request_new_pad(element, templ, name, caps)
+ self.parent_request_new_pad(templ, name, caps)
}
}
impl AggregatorImpl for FMP4Mux {
- fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> {
+ fn next_time(&self) -> Option<gst::ClockTime> {
let state = self.state.lock().unwrap();
state.fragment_start_pts.opt_add(state.timeout_delay)
}
fn sink_query(
&self,
- aggregator: &Self::Type,
aggregator_pad: &gst_base::AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
@@ -1845,13 +1829,12 @@ impl AggregatorImpl for FMP4Mux {
true
}
- _ => self.parent_sink_query(aggregator, aggregator_pad, query),
+ _ => self.parent_sink_query(aggregator_pad, query),
}
}
fn sink_event_pre_queue(
&self,
- aggregator: &Self::Type,
aggregator_pad: &gst_base::AggregatorPad,
mut event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@@ -1872,18 +1855,13 @@ impl AggregatorImpl for FMP4Mux {
.seqnum(event.seqnum())
.build();
}
- self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event)
+ self.parent_sink_event_pre_queue(aggregator_pad, event)
}
- _ => self.parent_sink_event_pre_queue(aggregator, aggregator_pad, event),
+ _ => self.parent_sink_event_pre_queue(aggregator_pad, event),
}
}
- fn sink_event(
- &self,
- aggregator: &Self::Type,
- aggregator_pad: &gst_base::AggregatorPad,
- event: gst::Event,
- ) -> bool {
+ fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
use gst::EventView;
gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event);
@@ -1901,26 +1879,27 @@ impl AggregatorImpl for FMP4Mux {
// Only forward the segment event verbatim if this is a single stream variant.
// Otherwise we have to produce a default segment and re-timestamp all buffers
// with their running time.
+ let aggregator = self.instance();
let class = aggregator.class();
if class.as_ref().variant.is_single_stream() {
aggregator.update_segment(&segment);
}
- self.parent_sink_event(aggregator, aggregator_pad, event)
+ self.parent_sink_event(aggregator_pad, event)
}
EventView::Tag(_ev) => {
// TODO: Maybe store for putting into the headers of the next fragment?
- self.parent_sink_event(aggregator, aggregator_pad, event)
+ self.parent_sink_event(aggregator_pad, event)
}
- _ => self.parent_sink_event(aggregator, aggregator_pad, event),
+ _ => self.parent_sink_event(aggregator_pad, event),
}
}
- fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool {
+ fn src_query(&self, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
- gst::trace!(CAT, obj: aggregator, "Handling query {:?}", query);
+ gst::trace!(CAT, imp: self, "Handling query {:?}", query);
match query.view_mut() {
QueryViewMut::Seeking(q) => {
@@ -1928,23 +1907,23 @@ impl AggregatorImpl for FMP4Mux {
q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true
}
- _ => self.parent_src_query(aggregator, query),
+ _ => self.parent_src_query(query),
}
}
- fn src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool {
+ fn src_event(&self, event: gst::Event) -> bool {
use gst::EventView;
- gst::trace!(CAT, obj: aggregator, "Handling event {:?}", event);
+ gst::trace!(CAT, imp: self, "Handling event {:?}", event);
match event.view() {
EventView::Seek(_ev) => false,
- _ => self.parent_src_event(aggregator, event),
+ _ => self.parent_src_event(event),
}
}
- fn flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> {
- self.parent_flush(aggregator)?;
+ fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
+ self.parent_flush()?;
let mut state = self.state.lock().unwrap();
@@ -1962,23 +1941,24 @@ impl AggregatorImpl for FMP4Mux {
Ok(gst::FlowSuccess::Ok)
}
- fn stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
- gst::trace!(CAT, obj: aggregator, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::trace!(CAT, imp: self, "Stopping");
- let _ = self.parent_stop(aggregator);
+ let _ = self.parent_stop();
*self.state.lock().unwrap() = State::default();
Ok(())
}
- fn start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
- gst::trace!(CAT, obj: aggregator, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::trace!(CAT, imp: self, "Starting");
- self.parent_start(aggregator)?;
+ self.parent_start()?;
// For non-single-stream variants configure a default segment that allows for negative
// DTS so that we can correctly re-timestamp buffers with their running times.
+ let aggregator = self.instance();
let class = aggregator.class();
if !class.as_ref().variant.is_single_stream() {
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
@@ -1992,15 +1972,11 @@ impl AggregatorImpl for FMP4Mux {
Ok(())
}
- fn negotiate(&self, _aggregator: &Self::Type) -> bool {
+ fn negotiate(&self) -> bool {
true
}
- fn aggregate(
- &self,
- aggregator: &Self::Type,
- timeout: bool,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
let mut upstream_events = vec![];
@@ -2011,7 +1987,7 @@ impl AggregatorImpl for FMP4Mux {
// Create streams
if state.streams.is_empty() {
- self.create_streams(aggregator, &mut state)?;
+ self.create_streams(&mut state)?;
}
// Queue buffers from all streams that are not filled for the current fragment yet
@@ -2020,9 +1996,7 @@ impl AggregatorImpl for FMP4Mux {
// fill-level at all sinkpads in sync.
let fragment_start_pts = state.fragment_start_pts;
- while let Some((idx, stream)) =
- self.find_earliest_stream(aggregator, &mut state, timeout)?
- {
+ while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? {
// Can only happen if the stream was flushed in the meantime
let buffer = match stream.sinkpad.pop_buffer() {
Some(buffer) => buffer,
@@ -2045,7 +2019,7 @@ impl AggregatorImpl for FMP4Mux {
};
// Queue up the buffer and update GOP tracking state
- self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
+ self.queue_gops(idx, stream, &segment, buffer)?;
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
@@ -2091,13 +2065,13 @@ impl AggregatorImpl for FMP4Mux {
}
if let Some(earliest_pts) = earliest_pts {
- gst::info!(CAT, obj: aggregator, "Got earliest PTS {}", earliest_pts);
+ gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts);
state.earliest_pts = Some(earliest_pts);
state.fragment_start_pts = Some(earliest_pts);
gst::debug!(
CAT,
- obj: aggregator,
+ imp: self,
"Sending first force-keyunit event for running time {}",
earliest_pts + settings.fragment_duration,
);
@@ -2130,12 +2104,11 @@ impl AggregatorImpl for FMP4Mux {
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
if all_eos {
- gst::debug!(CAT, obj: aggregator, "All streams are EOS now");
+ gst::debug!(CAT, imp: self, "All streams are EOS now");
}
// If enough GOPs were queued, drain and create the output fragment
match self.drain(
- aggregator,
&mut state,
&settings,
timeout,
@@ -2144,8 +2117,8 @@ impl AggregatorImpl for FMP4Mux {
) {
Ok(res) => res,
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
- gst::element_warning!(
- aggregator,
+ gst::element_imp_warning!(
+ self,
gst::StreamError::Format,
["Longer GOPs than fragment duration"]
);
@@ -2166,38 +2139,30 @@ impl AggregatorImpl for FMP4Mux {
}
if let Some(caps) = caps {
- gst::debug!(
- CAT,
- obj: aggregator,
- "Setting caps on source pad: {:?}",
- caps
- );
- aggregator.set_src_caps(&caps);
+ gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps);
+ self.instance().set_src_caps(&caps);
}
if let Some(buffers) = buffers {
- gst::trace!(CAT, obj: aggregator, "Pushing buffer list {:?}", buffers);
- aggregator.finish_buffer_list(buffers)?;
+ gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers);
+ self.instance().finish_buffer_list(buffers)?;
}
if all_eos {
- gst::debug!(CAT, obj: aggregator, "Doing EOS handling");
+ gst::debug!(CAT, imp: self, "Doing EOS handling");
if settings.header_update_mode != super::HeaderUpdateMode::None {
- let updated_header = self.update_header(
- aggregator,
- &mut self.state.lock().unwrap(),
- &settings,
- true,
- );
+ let updated_header =
+ self.update_header(&mut self.state.lock().unwrap(), &settings, true);
match updated_header {
Ok(Some((buffer_list, caps))) => {
match settings.header_update_mode {
super::HeaderUpdateMode::None => unreachable!(),
super::HeaderUpdateMode::Rewrite => {
- let src_pad = aggregator.src_pad();
let mut q = gst::query::Seeking::new(gst::Format::Bytes);
- if src_pad.peer_query(&mut q) && q.result().0 {
+ if self.instance().src_pad().peer_query(&mut q) && q.result().0 {
+ let aggregator = self.instance();
+
aggregator.set_src_caps(&caps);
// Seek to the beginning with a default bytes segment
@@ -2209,7 +2174,7 @@ impl AggregatorImpl for FMP4Mux {
if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
gst::error!(
CAT,
- obj: aggregator,
+ imp: self,
"Failed pushing updated header buffer downstream: {:?}",
err,
);
@@ -2217,17 +2182,19 @@ impl AggregatorImpl for FMP4Mux {
} else {
gst::error!(
CAT,
- obj: aggregator,
+ imp: self,
"Can't rewrite header because downstream is not seekable"
);
}
}
super::HeaderUpdateMode::Update => {
+ let aggregator = self.instance();
+
aggregator.set_src_caps(&caps);
if let Err(err) = aggregator.finish_buffer_list(buffer_list) {
gst::error!(
CAT,
- obj: aggregator,
+ imp: self,
"Failed pushing updated header buffer downstream: {:?}",
err,
);
@@ -2239,7 +2206,7 @@ impl AggregatorImpl for FMP4Mux {
Err(err) => {
gst::error!(
CAT,
- obj: aggregator,
+ imp: self,
"Failed to generate updated header: {:?}",
err
);
diff --git a/generic/sodium/src/decrypter/imp.rs b/generic/sodium/src/decrypter/imp.rs
index 7f35582cc..546b25033 100644
--- a/generic/sodium/src/decrypter/imp.rs
+++ b/generic/sodium/src/decrypter/imp.rs
@@ -99,14 +99,14 @@ impl State {
// retrieval
fn decrypt_into_adapter(
&mut self,
- element: &super::Decrypter,
+ imp: &Decrypter,
pad: &gst::Pad,
buffer: &gst::Buffer,
chunk_index: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let map = buffer.map_readable().map_err(|_| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Format,
["Failed to map buffer readable"]
);
@@ -122,8 +122,8 @@ impl State {
for subbuffer in map.chunks(block_size) {
let plain =
box_::open_precomputed(subbuffer, &nonce, &self.precomputed_key).map_err(|_| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ imp,
gst::StreamError::Format,
["Failed to decrypt buffer"]
);
@@ -255,7 +255,6 @@ impl Decrypter {
fn src_activatemode_function(
&self,
_pad: &gst::Pad,
- element: &super::Decrypter,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
@@ -267,7 +266,7 @@ impl Decrypter {
// Set the nonce and block size from the headers
// right after we activate the pad
- self.check_headers(element)
+ self.check_headers()
}
gst::PadMode::Push => Err(gst::loggable_error!(CAT, "Push mode not supported")),
_ => Err(gst::loggable_error!(
@@ -278,12 +277,7 @@ impl Decrypter {
}
}
- fn src_query(
- &self,
- pad: &gst::Pad,
- element: &super::Decrypter,
- query: &mut gst::QueryRef,
- ) -> bool {
+ fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
@@ -306,7 +300,7 @@ impl Decrypter {
}
QueryViewMut::Duration(q) => {
if q.format() != gst::Format::Bytes {
- return pad.query_default(Some(element), query);
+ return pad.query_default(Some(&*self.instance()), query);
}
/* First let's query the bytes duration upstream */
@@ -348,11 +342,11 @@ impl Decrypter {
true
}
- _ => pad.query_default(Some(element), query),
+ _ => pad.query_default(Some(&*self.instance()), query),
}
}
- fn check_headers(&self, element: &super::Decrypter) -> Result<(), gst::LoggableError> {
+ fn check_headers(&self) -> Result<(), gst::LoggableError> {
let is_none = {
let mutex_state = self.state.lock().unwrap();
let state = mutex_state.as_ref().unwrap();
@@ -372,26 +366,22 @@ impl Decrypter {
"Failed to pull nonce from the stream, reason: {:?}",
err
);
- err.log_with_object(element);
err
})?;
if buffer.size() != crate::HEADERS_SIZE {
let err = gst::loggable_error!(CAT, "Headers buffer has wrong size");
- err.log_with_object(element);
return Err(err);
}
let map = buffer.map_readable().map_err(|_| {
let err = gst::loggable_error!(CAT, "Failed to map buffer readable");
- err.log_with_object(element);
err
})?;
let sodium_header_slice = &map[..crate::TYPEFIND_HEADER_SIZE];
if sodium_header_slice != crate::TYPEFIND_HEADER {
let err = gst::loggable_error!(CAT, "Buffer has wrong typefind header");
- err.log_with_object(element);
return Err(err);
}
@@ -400,7 +390,6 @@ impl Decrypter {
assert_eq!(nonce_slice.len(), box_::NONCEBYTES);
let nonce = box_::Nonce::from_slice(nonce_slice).ok_or_else(|| {
let err = gst::loggable_error!(CAT, "Failed to create nonce from buffer");
- err.log_with_object(&self.srcpad);
err
})?;
@@ -416,9 +405,9 @@ impl Decrypter {
let state = state.as_mut().unwrap();
state.initial_nonce = Some(nonce);
- gst::debug!(CAT, obj: element, "Setting nonce to: {:?}", nonce.0);
+ gst::debug!(CAT, imp: self, "Setting nonce to: {:?}", nonce.0);
state.block_size = Some(block_size);
- gst::debug!(CAT, obj: element, "Setting block size to: {}", block_size);
+ gst::debug!(CAT, imp: self, "Setting block size to: {}", block_size);
Ok(())
}
@@ -426,7 +415,6 @@ impl Decrypter {
fn pull_requested_buffer(
&self,
pad: &gst::Pad,
- element: &super::Decrypter,
requested_size: u32,
block_size: u32,
chunk_index: u64,
@@ -441,8 +429,8 @@ impl Decrypter {
// calculate how many chunks are needed, if we need something like 3.2
// round the number to 4 and cut the buffer afterwards.
let checked = requested_size.checked_add(block_size).ok_or_else(|| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ self,
gst::LibraryError::Failed,
[
"Addition overflow when adding requested pull size and block size: {} + {}",
@@ -459,8 +447,8 @@ impl Decrypter {
// Pull a buffer of all the chunks we will need
let checked_size = total_chunks.checked_mul(block_size).ok_or_else(|| {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ self,
gst::LibraryError::Failed,
[
"Overflowed trying to calculate the buffer size to pull: {} * {}",
@@ -494,7 +482,6 @@ impl Decrypter {
fn range(
&self,
pad: &gst::Pad,
- element: &super::Decrypter,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
requested_size: u32,
@@ -519,20 +506,15 @@ impl Decrypter {
assert!(pull_offset <= std::u32::MAX as u64);
let pull_offset = pull_offset as u32;
- let pulled_buffer = self.pull_requested_buffer(
- pad,
- element,
- requested_size + pull_offset,
- block_size,
- chunk_index,
- )?;
+ let pulled_buffer =
+ self.pull_requested_buffer(pad, requested_size + pull_offset, block_size, chunk_index)?;
let mut state = self.state.lock().unwrap();
// This will only be run after READY state,
// and will be guaranted to be initialized
let state = state.as_mut().unwrap();
- state.decrypt_into_adapter(element, &self.srcpad, &pulled_buffer, chunk_index)?;
+ state.decrypt_into_adapter(self, &self.srcpad, &pulled_buffer, chunk_index)?;
let adapter_offset = pull_offset as usize;
state.requested_buffer(&self.srcpad, buffer, requested_size, adapter_offset)
@@ -555,7 +537,7 @@ impl ObjectSubclass for Decrypter {
Decrypter::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
- |decrypter, element| decrypter.range(pad, element, offset, buffer, size),
+ |decrypter| decrypter.range(pad, offset, buffer, size),
)
})
.activatemode_function(|pad, parent, mode, active| {
@@ -567,16 +549,14 @@ impl ObjectSubclass for Decrypter {
"Panic activating srcpad with mode"
))
},
- |decrypter, element| {
- decrypter.src_activatemode_function(pad, element, mode, active)
- },
+ |decrypter| decrypter.src_activatemode_function(pad, mode, active),
)
})
.query_function(|pad, parent, query| {
Decrypter::catch_panic_pad_function(
parent,
|| false,
- |decrypter, element| decrypter.src_query(pad, element, query),
+ |decrypter| decrypter.src_query(pad, query),
)
})
.build();
@@ -612,20 +592,15 @@ impl ObjectImpl for Decrypter {
PROPERTIES.as_ref()
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"sender-key" => {
let mut props = self.props.lock().unwrap();
@@ -641,7 +616,7 @@ impl ObjectImpl for Decrypter {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"receiver-key" => {
let props = self.props.lock().unwrap();
@@ -696,10 +671,9 @@ impl ElementImpl for Decrypter {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::debug!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::debug!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
@@ -708,7 +682,7 @@ impl ElementImpl for Decrypter {
// Create an internal state struct from the provided properties or
// refuse to change state
let state_ = State::from_props(&props).map_err(|err| {
- element.post_error_message(err);
+ self.post_error_message(err);
gst::StateChangeError
})?;
@@ -721,7 +695,7 @@ impl ElementImpl for Decrypter {
_ => (),
}
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::ReadyToNull {
let _ = self.state.lock().unwrap().take();
diff --git a/generic/sodium/src/encrypter/imp.rs b/generic/sodium/src/encrypter/imp.rs
index b56080c23..b0cc8a4c2 100644
--- a/generic/sodium/src/encrypter/imp.rs
+++ b/generic/sodium/src/encrypter/imp.rs
@@ -166,7 +166,6 @@ impl Encrypter {
fn sink_chain(
&self,
pad: &gst::Pad,
- element: &super::Encrypter,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
@@ -196,7 +195,7 @@ impl Encrypter {
for buffer in buffers {
self.srcpad.push(buffer).map_err(|err| {
- gst::error!(CAT, obj: element, "Failed to push buffer {:?}", err);
+ gst::error!(CAT, imp: self, "Failed to push buffer {:?}", err);
err
})?;
}
@@ -204,7 +203,7 @@ impl Encrypter {
Ok(gst::FlowSuccess::Ok)
}
- fn sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad, "Handling event {:?}", event);
@@ -239,34 +238,29 @@ impl Encrypter {
for buffer in buffers {
if let Err(err) = self.srcpad.push(buffer) {
- gst::error!(CAT, obj: element, "Failed to push buffer at EOS {:?}", err);
+ gst::error!(CAT, imp: self, "Failed to push buffer at EOS {:?}", err);
return false;
}
}
- pad.event_default(Some(element), event)
+ pad.event_default(Some(&*self.instance()), event)
}
- _ => pad.event_default(Some(element), event),
+ _ => pad.event_default(Some(&*self.instance()), event),
}
}
- fn src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Seek(_) => false,
- _ => pad.event_default(Some(element), event),
+ _ => pad.event_default(Some(&*self.instance()), event),
}
}
- fn src_query(
- &self,
- pad: &gst::Pad,
- element: &super::Encrypter,
- query: &mut gst::QueryRef,
- ) -> bool {
+ fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
@@ -284,7 +278,7 @@ impl Encrypter {
}
QueryViewMut::Duration(q) => {
if q.format() != gst::Format::Bytes {
- return pad.query_default(Some(element), query);
+ return pad.query_default(Some(&*self.instance()), query);
}
/* First let's query the bytes duration upstream */
@@ -325,7 +319,7 @@ impl Encrypter {
true
}
- _ => pad.query_default(Some(element), query),
+ _ => pad.query_default(Some(&*self.instance()), query),
}
}
}
@@ -343,14 +337,14 @@ impl ObjectSubclass for Encrypter {
Encrypter::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
- |encrypter, element| encrypter.sink_chain(pad, element, buffer),
+ |encrypter| encrypter.sink_chain(pad, buffer),
)
})
.event_function(|pad, parent, event| {
Encrypter::catch_panic_pad_function(
parent,
|| false,
- |encrypter, element| encrypter.sink_event(pad, element, event),
+ |encrypter| encrypter.sink_event(pad, event),
)
})
.build();
@@ -361,14 +355,14 @@ impl ObjectSubclass for Encrypter {
Encrypter::catch_panic_pad_function(
parent,
|| false,
- |encrypter, element| encrypter.src_query(pad, element, query),
+ |encrypter| encrypter.src_query(pad, query),
)
})
.event_function(|pad, parent, event| {
Encrypter::catch_panic_pad_function(
parent,
|| false,
- |encrypter, element| encrypter.src_event(pad, element, event),
+ |encrypter| encrypter.src_event(pad, event),
)
})
.build();
@@ -410,20 +404,15 @@ impl ObjectImpl for Encrypter {
PROPERTIES.as_ref()
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"sender-key" => {
let mut props = self.props.lock().unwrap();
@@ -444,7 +433,7 @@ impl ObjectImpl for Encrypter {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"receiver-key" => {
let props = self.props.lock().unwrap();
@@ -504,10 +493,9 @@ impl ElementImpl for Encrypter {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::debug!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::debug!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
@@ -516,7 +504,7 @@ impl ElementImpl for Encrypter {
// Create an internal state struct from the provided properties or
// refuse to change state
let state_ = State::from_props(&props).map_err(|err| {
- element.post_error_message(err);
+ self.post_error_message(err);
gst::StateChangeError
})?;
@@ -529,7 +517,7 @@ impl ElementImpl for Encrypter {
_ => (),
}
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::ReadyToNull {
let _ = self.state.lock().unwrap().take();
diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
index de0406f58..62ea73b60 100644
--- a/generic/threadshare/examples/standalone/sink/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -608,12 +608,12 @@ impl TestSink {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
- fn prepare(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Preparing");
+ gst::debug!(CAT, imp: self, "Preparing");
} else {
- gst::trace!(CAT, obj: element, "Preparing");
+ gst::trace!(CAT, imp: self, "Preparing");
}
let context = {
@@ -629,70 +629,70 @@ impl TestSink {
// Enable backpressure for items
let (item_sender, item_receiver) = flume::bounded(0);
- let task_impl = TestSinkTask::new(element, item_receiver);
+ let task_impl = TestSinkTask::new(&*self.instance(), item_receiver);
self.task.prepare(task_impl, context).block_on()?;
*self.item_sender.lock().unwrap() = Some(item_sender);
if raise_log_level {
- gst::debug!(CAT, obj: element, "Prepared");
+ gst::debug!(CAT, imp: self, "Prepared");
} else {
- gst::trace!(CAT, obj: element, "Prepared");
+ gst::trace!(CAT, imp: self, "Prepared");
}
Ok(())
}
- fn unprepare(&self, element: &super::TestSink) {
+ fn unprepare(&self) {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ gst::debug!(CAT, imp: self, "Unpreparing");
} else {
- gst::trace!(CAT, obj: element, "Unpreparing");
+ gst::trace!(CAT, imp: self, "Unpreparing");
}
self.task.unprepare().block_on().unwrap();
if raise_log_level {
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
} else {
- gst::trace!(CAT, obj: element, "Unprepared");
+ gst::trace!(CAT, imp: self, "Unprepared");
}
}
- fn stop(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Stopping");
+ gst::debug!(CAT, imp: self, "Stopping");
} else {
- gst::trace!(CAT, obj: element, "Stopping");
+ gst::trace!(CAT, imp: self, "Stopping");
}
self.task.stop().block_on()?;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
} else {
- gst::trace!(CAT, obj: element, "Stopped");
+ gst::trace!(CAT, imp: self, "Stopped");
}
Ok(())
}
- fn start(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Starting");
+ gst::debug!(CAT, imp: self, "Starting");
} else {
- gst::trace!(CAT, obj: element, "Starting");
+ gst::trace!(CAT, imp: self, "Starting");
}
self.task.start().block_on()?;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
} else {
- gst::trace!(CAT, obj: element, "Started");
+ gst::trace!(CAT, imp: self, "Started");
}
Ok(())
@@ -760,13 +760,7 @@ impl ObjectImpl for TestSink {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => {
@@ -800,7 +794,7 @@ impl ObjectImpl for TestSink {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => settings.context.to_value(),
@@ -816,12 +810,12 @@ impl ObjectImpl for TestSink {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
-
- gstthreadshare::set_element_flags(obj, gst::ElementFlags::SINK);
+ obj.set_element_flags(gst::ElementFlags::SINK);
}
}
@@ -861,30 +855,29 @@ impl ElementImpl for TestSink {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- self.parent_change_state(element, transition)
+ self.parent_change_state(transition)
}
}
diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs
index 1680f75a7..1efa12751 100644
--- a/generic/threadshare/examples/standalone/src/imp.rs
+++ b/generic/threadshare/examples/standalone/src/imp.rs
@@ -318,12 +318,12 @@ pub struct TestSrc {
}
impl TestSrc {
- fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Preparing");
+ gst::debug!(CAT, imp: self, "Preparing");
} else {
- gst::trace!(CAT, obj: element, "Preparing");
+ gst::trace!(CAT, imp: self, "Preparing");
}
let settings = self.settings.lock().unwrap();
@@ -337,87 +337,87 @@ impl TestSrc {
drop(settings);
self.task
- .prepare(SrcTask::new(element.clone()), context)
+ .prepare(SrcTask::new(self.instance().clone()), context)
.block_on()?;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Prepared");
+ gst::debug!(CAT, imp: self, "Prepared");
} else {
- gst::trace!(CAT, obj: element, "Prepared");
+ gst::trace!(CAT, imp: self, "Prepared");
}
Ok(())
}
- fn unprepare(&self, element: &super::TestSrc) {
+ fn unprepare(&self) {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ gst::debug!(CAT, imp: self, "Unpreparing");
} else {
- gst::trace!(CAT, obj: element, "Unpreparing");
+ gst::trace!(CAT, imp: self, "Unpreparing");
}
self.task.unprepare().block_on().unwrap();
if raise_log_level {
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
} else {
- gst::trace!(CAT, obj: element, "Unprepared");
+ gst::trace!(CAT, imp: self, "Unprepared");
}
}
- fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Stopping");
+ gst::debug!(CAT, imp: self, "Stopping");
} else {
- gst::trace!(CAT, obj: element, "Stopping");
+ gst::trace!(CAT, imp: self, "Stopping");
}
self.task.stop().block_on()?;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
} else {
- gst::trace!(CAT, obj: element, "Stopped");
+ gst::trace!(CAT, imp: self, "Stopped");
}
Ok(())
}
- fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Starting");
+ gst::debug!(CAT, imp: self, "Starting");
} else {
- gst::trace!(CAT, obj: element, "Starting");
+ gst::trace!(CAT, imp: self, "Starting");
}
self.task.start().block_on()?;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
} else {
- gst::trace!(CAT, obj: element, "Started");
+ gst::trace!(CAT, imp: self, "Started");
}
Ok(())
}
- fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
+ fn pause(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Pausing");
+ gst::debug!(CAT, imp: self, "Pausing");
} else {
- gst::trace!(CAT, obj: element, "Pausing");
+ gst::trace!(CAT, imp: self, "Pausing");
}
self.task.pause().block_on()?;
if raise_log_level {
- gst::debug!(CAT, obj: element, "Paused");
+ gst::debug!(CAT, imp: self, "Paused");
} else {
- gst::trace!(CAT, obj: element, "Paused");
+ gst::trace!(CAT, imp: self, "Paused");
}
Ok(())
@@ -479,13 +479,7 @@ impl ObjectImpl for TestSrc {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => {
@@ -515,7 +509,7 @@ impl ObjectImpl for TestSrc {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => settings.context.to_value(),
@@ -531,12 +525,12 @@ impl ObjectImpl for TestSrc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
-
- gstthreadshare::set_element_flags(obj, gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
@@ -575,41 +569,40 @@ impl ElementImpl for TestSrc {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element).map_err(|_| gst::StateChangeError)?;
+ self.pause().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs
index a88e83523..8eee681ae 100644
--- a/generic/threadshare/src/appsrc/imp.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -319,24 +319,24 @@ pub struct AppSrc {
}
impl AppSrc {
- fn push_buffer(&self, element: &super::AppSrc, mut buffer: gst::Buffer) -> bool {
+ fn push_buffer(&self, mut buffer: gst::Buffer) -> bool {
let state = self.task.lock_state();
if *state != TaskState::Started && *state != TaskState::Paused {
- gst::debug!(CAT, obj: element, "Rejecting buffer due to element state");
+ gst::debug!(CAT, imp: self, "Rejecting buffer due to element state");
return false;
}
let do_timestamp = self.settings.lock().unwrap().do_timestamp;
if do_timestamp {
- if let Some(clock) = element.clock() {
- let base_time = element.base_time();
+ if let Some(clock) = self.instance().clock() {
+ let base_time = self.instance().base_time();
let now = clock.time();
let buffer = buffer.make_mut();
buffer.set_dts(now.opt_checked_sub(base_time).ok().flatten());
buffer.set_pts(None);
} else {
- gst::error!(CAT, obj: element, "Don't have a clock yet");
+ gst::error!(CAT, imp: self, "Don't have a clock yet");
return false;
}
}
@@ -349,13 +349,13 @@ impl AppSrc {
{
Ok(_) => true,
Err(err) => {
- gst::error!(CAT, obj: element, "Failed to queue buffer: {}", err);
+ gst::error!(CAT, imp: self, "Failed to queue buffer: {}", err);
false
}
}
}
- fn end_of_stream(&self, element: &super::AppSrc) -> bool {
+ fn end_of_stream(&self) -> bool {
let mut sender = self.sender.lock().unwrap();
let sender = match sender.as_mut() {
Some(sender) => sender,
@@ -365,14 +365,14 @@ impl AppSrc {
match sender.try_send(StreamItem::Event(gst::event::Eos::new())) {
Ok(_) => true,
Err(err) => {
- gst::error!(CAT, obj: element, "Failed to queue EOS: {}", err);
+ gst::error!(CAT, imp: self, "Failed to queue EOS: {}", err);
false
}
}
}
- fn prepare(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap();
let context =
@@ -396,41 +396,41 @@ impl AppSrc {
*self.sender.lock().unwrap() = Some(sender);
self.task
- .prepare(AppSrcTask::new(element.clone(), receiver), context)
+ .prepare(AppSrcTask::new(self.instance().clone(), receiver), context)
.block_on()?;
- gst::debug!(CAT, obj: element, "Prepared");
+ gst::debug!(CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::AppSrc) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
*self.sender.lock().unwrap() = None;
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
- fn start(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
Ok(())
}
- fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Pausing");
+ fn pause(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Pausing");
self.task.pause().block_on()?;
- gst::debug!(CAT, obj: element, "Paused");
+ gst::debug!(CAT, imp: self, "Paused");
Ok(())
}
}
@@ -503,7 +503,7 @@ impl ObjectImpl for AppSrc {
let buffer = args[1].get::<gst::Buffer>().expect("signal arg");
let appsrc = element.imp();
- Some(appsrc.push_buffer(&element, buffer).to_value())
+ Some(appsrc.push_buffer(buffer).to_value())
})
.build(),
/**
@@ -519,7 +519,7 @@ impl ObjectImpl for AppSrc {
let element = args[0].get::<super::AppSrc>().expect("signal arg");
let appsrc = element.imp();
- Some(appsrc.end_of_stream(&element).to_value())
+ Some(appsrc.end_of_stream().to_value())
})
.build(),
]
@@ -528,13 +528,7 @@ impl ObjectImpl for AppSrc {
SIGNALS.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => {
@@ -561,7 +555,7 @@ impl ObjectImpl for AppSrc {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => settings.context.to_value(),
@@ -573,12 +567,12 @@ impl ObjectImpl for AppSrc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
-
- crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
@@ -617,41 +611,40 @@ impl ElementImpl for AppSrc {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element).map_err(|_| gst::StateChangeError)?;
+ self.pause().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs
index 41d962de4..4175f4cff 100644
--- a/generic/threadshare/src/inputselector/imp.rs
+++ b/generic/threadshare/src/inputselector/imp.rs
@@ -92,8 +92,8 @@ impl InputSelectorPadSinkHandler {
async fn handle_item(
&self,
- pad: &PadSinkRef<'_>,
element: &super::InputSelector,
+ pad: &PadSinkRef<'_>,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let inputselector = element.imp();
@@ -173,7 +173,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
let pad_weak = pad.downgrade();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
- this.handle_item(&pad, &element, buffer).await
+ this.handle_item(&element, &pad, buffer).await
}
.boxed()
}
@@ -193,7 +193,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
- this.handle_item(&pad, &element, buffer).await?;
+ this.handle_item(&element, &pad, buffer).await?;
}
Ok(gst::FlowSuccess::Ok)
@@ -377,11 +377,11 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl InputSelector {
- fn unprepare(&self, element: &super::InputSelector) {
+ fn unprepare(&self) {
let mut state = self.state.lock().unwrap();
- gst::debug!(CAT, obj: element, "Unpreparing");
+ gst::debug!(CAT, imp: self, "Unpreparing");
*state = State::default();
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
}
@@ -433,13 +433,7 @@ impl ObjectImpl for InputSelector {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"context" => {
let mut settings = self.settings.lock().unwrap();
@@ -488,7 +482,7 @@ impl ObjectImpl for InputSelector {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"context" => {
let settings = self.settings.lock().unwrap();
@@ -507,9 +501,10 @@ impl ObjectImpl for InputSelector {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
@@ -558,16 +553,15 @@ impl ElementImpl for InputSelector {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
if let gst::StateChange::ReadyToNull = transition {
- self.unprepare(element);
+ self.unprepare();
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
@@ -584,7 +578,6 @@ impl ElementImpl for InputSelector {
fn request_new_pad(
&self,
- element: &Self::Type,
templ: &gst::PadTemplate,
_name: Option<String>,
_caps: Option<&gst::Caps>,
@@ -595,7 +588,7 @@ impl ElementImpl for InputSelector {
gst::Pad::from_template(templ, Some(format!("sink_{}", pads.pad_serial).as_str()));
pads.pad_serial += 1;
sink_pad.set_active(true).unwrap();
- element.add_pad(&sink_pad).unwrap();
+ self.instance().add_pad(&sink_pad).unwrap();
let sink_pad = PadSink::new(sink_pad, InputSelectorPadSinkHandler::default());
let ret = sink_pad.gst_pad().clone();
@@ -608,22 +601,30 @@ impl ElementImpl for InputSelector {
drop(pads);
drop(state);
- let _ = element.post_message(gst::message::Latency::builder().src(element).build());
+ let _ = self.instance().post_message(
+ gst::message::Latency::builder()
+ .src(&*self.instance())
+ .build(),
+ );
Some(ret)
}
- fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
+ fn release_pad(&self, pad: &gst::Pad) {
let mut pads = self.pads.lock().unwrap();
let sink_pad = pads.sink_pads.remove(pad).unwrap();
drop(sink_pad);
- element.remove_pad(pad).unwrap();
+ self.instance().remove_pad(pad).unwrap();
drop(pads);
- let _ = element.post_message(gst::message::Latency::builder().src(element).build());
+ let _ = self.instance().post_message(
+ gst::message::Latency::builder()
+ .src(&*self.instance())
+ .build(),
+ );
}
- fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> {
+ fn provide_clock(&self) -> Option<gst::Clock> {
Some(gst::SystemClock::obtain())
}
}
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
index 516c71c4f..f2ec87fa4 100644
--- a/generic/threadshare/src/jitterbuffer/imp.rs
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -1279,16 +1279,16 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl JitterBuffer {
- fn clear_pt_map(&self, element: &super::JitterBuffer) {
- gst::debug!(CAT, obj: element, "Clearing PT map");
+ fn clear_pt_map(&self) {
+ gst::debug!(CAT, imp: self, "Clearing PT map");
let mut state = self.state.lock().unwrap();
state.clock_rate = None;
state.jbuf.reset_skew();
}
- fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
let context = {
let settings = self.settings.lock().unwrap();
@@ -1297,33 +1297,37 @@ impl JitterBuffer {
self.task
.prepare(
- JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
+ JitterBufferTask::new(
+ &*self.instance(),
+ &self.src_pad_handler,
+ &self.sink_pad_handler,
+ ),
context,
)
.block_on()?;
- gst::debug!(CAT, obj: element, "Prepared");
+ gst::debug!(CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::JitterBuffer) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
- fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
Ok(())
}
- fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
}
@@ -1410,7 +1414,7 @@ impl ObjectImpl for JitterBuffer {
.class_handler(|_, args| {
let element = args[0].get::<super::JitterBuffer>().expect("signal arg");
let jb = element.imp();
- jb.clear_pt_map(&element);
+ jb.clear_pt_map();
None
})
.build(),
@@ -1424,13 +1428,7 @@ impl ObjectImpl for JitterBuffer {
SIGNALS.as_ref()
}
- fn set_property(
- &self,
- obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"latency" => {
let latency = {
@@ -1444,7 +1442,11 @@ impl ObjectImpl for JitterBuffer {
let state = self.state.lock().unwrap();
state.jbuf.set_delay(latency);
- let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
+ let _ = self.instance().post_message(
+ gst::message::Latency::builder()
+ .src(&*self.instance())
+ .build(),
+ );
}
"do-lost" => {
let mut settings = self.settings.lock().unwrap();
@@ -1475,7 +1477,7 @@ impl ObjectImpl for JitterBuffer {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"latency" => {
let settings = self.settings.lock().unwrap();
@@ -1514,9 +1516,10 @@ impl ObjectImpl for JitterBuffer {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
@@ -1567,32 +1570,31 @@ impl ElementImpl for JitterBuffer {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PlayingToPaused => {
@@ -1604,7 +1606,7 @@ impl ElementImpl for JitterBuffer {
Ok(success)
}
- fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> {
+ fn provide_clock(&self) -> Option<gst::Clock> {
Some(gst::SystemClock::obtain())
}
}
diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs
index 91debce1e..94671379b 100644
--- a/generic/threadshare/src/lib.rs
+++ b/generic/threadshare/src/lib.rs
@@ -28,7 +28,6 @@ mod jitterbuffer;
mod proxy;
mod queue;
-use glib::translate::*;
use gst::glib;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
@@ -55,34 +54,3 @@ gst::plugin_define!(
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);
-
-pub fn set_element_flags<T: glib::IsA<gst::Object> + glib::IsA<gst::Element>>(
- element: &T,
- flags: gst::ElementFlags,
-) {
- unsafe {
- let ptr: *mut gst::ffi::GstObject = element.as_ptr() as *mut _;
- let _guard = MutexGuard::lock(&(*ptr).lock);
- (*ptr).flags |= flags.into_glib();
- }
-}
-
-#[must_use = "if unused the Mutex will immediately unlock"]
-struct MutexGuard<'a>(&'a glib::ffi::GMutex);
-
-impl<'a> MutexGuard<'a> {
- pub fn lock(mutex: &'a glib::ffi::GMutex) -> Self {
- unsafe {
- glib::ffi::g_mutex_lock(mut_override(mutex));
- }
- MutexGuard(mutex)
- }
-}
-
-impl<'a> Drop for MutexGuard<'a> {
- fn drop(&mut self) {
- unsafe {
- glib::ffi::g_mutex_unlock(mut_override(self.0));
- }
- }
-}
diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs
index f706db702..e7ce6b303 100644
--- a/generic/threadshare/src/proxy/imp.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -227,9 +227,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
let proxysink = element.imp();
- proxysink
- .enqueue_item(&element, DataQueueItem::Buffer(buffer))
- .await
+ proxysink.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
.boxed()
}
@@ -248,7 +246,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list);
let proxysink = element.imp();
proxysink
- .enqueue_item(&element, DataQueueItem::BufferList(list))
+ .enqueue_item(DataQueueItem::BufferList(list))
.await
}
.boxed()
@@ -258,7 +256,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
&self,
pad: &PadSinkRef,
proxysink: &ProxySink,
- element: &gst::Element,
+ _element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@@ -277,7 +275,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
};
if let EventView::FlushStart(..) = event.view() {
- proxysink.stop(element.downcast_ref::<super::ProxySink>().unwrap());
+ proxysink.stop();
}
if let Some(src_pad) = src_pad {
@@ -311,13 +309,13 @@ impl PadSinkHandler for ProxySinkPadHandler {
let _ =
element.post_message(gst::message::Eos::builder().src(&element).build());
}
- EventView::FlushStop(..) => proxysink.start(&element),
+ EventView::FlushStop(..) => proxysink.start(),
_ => (),
}
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
proxysink
- .enqueue_item(&element, DataQueueItem::Event(event))
+ .enqueue_item(DataQueueItem::Event(event))
.await
.is_ok()
}
@@ -341,13 +339,13 @@ static SINK_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl ProxySink {
- async fn schedule_pending_queue(&self, element: &super::ProxySink) {
+ async fn schedule_pending_queue(&self) {
loop {
let more_queue_space_receiver = {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
- gst::log!(SINK_CAT, obj: element, "Trying to empty pending queue");
+ gst::log!(SINK_CAT, imp: self, "Trying to empty pending queue");
let ProxyContextInner {
pending_queue: ref mut pq,
@@ -372,7 +370,7 @@ impl ProxySink {
receiver
} else {
- gst::log!(SINK_CAT, obj: element, "Pending queue is empty now");
+ gst::log!(SINK_CAT, imp: self, "Pending queue is empty now");
*pq = None;
return;
}
@@ -383,22 +381,18 @@ impl ProxySink {
receiver
}
} else {
- gst::log!(SINK_CAT, obj: element, "Flushing, dropping pending queue");
+ gst::log!(SINK_CAT, imp: self, "Flushing, dropping pending queue");
*pq = None;
return;
}
};
- gst::log!(SINK_CAT, obj: element, "Waiting for more queue space");
+ gst::log!(SINK_CAT, imp: self, "Waiting for more queue space");
let _ = more_queue_space_receiver.await;
}
}
- async fn enqueue_item(
- &self,
- element: &super::ProxySink,
- item: DataQueueItem,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> {
let wait_fut = {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
@@ -463,18 +457,18 @@ impl ProxySink {
gst::log!(
SINK_CAT,
- obj: element,
+ imp: self,
"Proxy is full - Pushing first item on pending queue"
);
if schedule_now {
- gst::log!(SINK_CAT, obj: element, "Scheduling pending queue now");
+ gst::log!(SINK_CAT, imp: self, "Scheduling pending queue now");
pending_queue.scheduled = true;
- let wait_fut = self.schedule_pending_queue(element);
+ let wait_fut = self.schedule_pending_queue();
Some(wait_fut)
} else {
- gst::log!(SINK_CAT, obj: element, "Scheduling pending queue later");
+ gst::log!(SINK_CAT, imp: self, "Scheduling pending queue later");
None
}
@@ -494,11 +488,7 @@ impl ProxySink {
};
if let Some(wait_fut) = wait_fut {
- gst::log!(
- SINK_CAT,
- obj: element,
- "Blocking until queue has space again"
- );
+ gst::log!(SINK_CAT, imp: self, "Blocking until queue has space again");
wait_fut.await;
}
@@ -507,8 +497,8 @@ impl ProxySink {
shared_ctx.last_res
}
- fn prepare(&self, element: &super::ProxySink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(SINK_CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(SINK_CAT, imp: self, "Preparing");
let proxy_context = self.settings.lock().unwrap().proxy_context.to_string();
@@ -527,22 +517,22 @@ impl ProxySink {
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
- gst::debug!(SINK_CAT, obj: element, "Prepared");
+ gst::debug!(SINK_CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::ProxySink) {
- gst::debug!(SINK_CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(SINK_CAT, imp: self, "Unpreparing");
*self.proxy_ctx.lock().unwrap() = None;
- gst::debug!(SINK_CAT, obj: element, "Unprepared");
+ gst::debug!(SINK_CAT, imp: self, "Unprepared");
}
- fn start(&self, element: &super::ProxySink) {
+ fn start(&self) {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
- gst::debug!(SINK_CAT, obj: element, "Starting");
+ gst::debug!(SINK_CAT, imp: self, "Starting");
{
let settings = self.settings.lock().unwrap();
@@ -552,19 +542,19 @@ impl ProxySink {
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
- gst::debug!(SINK_CAT, obj: element, "Started");
+ gst::debug!(SINK_CAT, imp: self, "Started");
}
- fn stop(&self, element: &super::ProxySink) {
+ fn stop(&self) {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
- gst::debug!(SINK_CAT, obj: element, "Stopping");
+ gst::debug!(SINK_CAT, imp: self, "Stopping");
let _ = shared_ctx.pending_queue.take();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
- gst::debug!(SINK_CAT, obj: element, "Stopped");
+ gst::debug!(SINK_CAT, imp: self, "Stopped");
}
}
@@ -599,13 +589,7 @@ impl ObjectImpl for ProxySink {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"proxy-context" => {
@@ -618,7 +602,7 @@ impl ObjectImpl for ProxySink {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"proxy-context" => settings.proxy_context.to_value(),
@@ -626,12 +610,12 @@ impl ObjectImpl for ProxySink {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
-
- crate::set_element_flags(obj, gst::ElementFlags::SINK);
+ obj.set_element_flags(gst::ElementFlags::SINK);
}
}
@@ -671,31 +655,30 @@ impl ElementImpl for ProxySink {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(SINK_CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(SINK_CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop();
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::ReadyToPaused {
- self.start(element);
+ self.start();
}
Ok(success)
@@ -992,8 +975,8 @@ static SRC_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl ProxySrc {
- fn prepare(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(SRC_CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(SRC_CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -1012,7 +995,7 @@ impl ProxySrc {
})?;
let dataqueue = DataQueue::new(
- &element.clone().upcast(),
+ &self.instance().clone().upcast(),
self.src_pad.gst_pad(),
if settings.max_size_buffers == 0 {
None
@@ -1044,16 +1027,19 @@ impl ProxySrc {
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
self.task
- .prepare(ProxySrcTask::new(element.clone(), dataqueue), ts_ctx)
+ .prepare(
+ ProxySrcTask::new(self.instance().clone(), dataqueue),
+ ts_ctx,
+ )
.block_on()?;
- gst::debug!(SRC_CAT, obj: element, "Prepared");
+ gst::debug!(SRC_CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::ProxySrc) {
- gst::debug!(SRC_CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(SRC_CAT, imp: self, "Unpreparing");
{
let settings = self.settings.lock().unwrap();
@@ -1066,27 +1052,27 @@ impl ProxySrc {
*self.dataqueue.lock().unwrap() = None;
*self.proxy_ctx.lock().unwrap() = None;
- gst::debug!(SRC_CAT, obj: element, "Unprepared");
+ gst::debug!(SRC_CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(SRC_CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(SRC_CAT, imp: self, "Stopping");
self.task.stop().await_maybe_on_context()?;
- gst::debug!(SRC_CAT, obj: element, "Stopped");
+ gst::debug!(SRC_CAT, imp: self, "Stopped");
Ok(())
}
- fn start(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(SRC_CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(SRC_CAT, imp: self, "Starting");
self.task.start().await_maybe_on_context()?;
- gst::debug!(SRC_CAT, obj: element, "Started");
+ gst::debug!(SRC_CAT, imp: self, "Started");
Ok(())
}
- fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(SRC_CAT, obj: element, "Pausing");
+ fn pause(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(SRC_CAT, imp: self, "Pausing");
self.task.pause().block_on()?;
- gst::debug!(SRC_CAT, obj: element, "Paused");
+ gst::debug!(SRC_CAT, imp: self, "Paused");
Ok(())
}
}
@@ -1153,13 +1139,7 @@ impl ObjectImpl for ProxySrc {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => {
@@ -1193,7 +1173,7 @@ impl ObjectImpl for ProxySrc {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(),
@@ -1206,12 +1186,12 @@ impl ObjectImpl for ProxySrc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
-
- crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
@@ -1251,41 +1231,40 @@ impl ElementImpl for ProxySrc {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(SRC_CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(SRC_CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element).map_err(|_| gst::StateChangeError)?;
+ self.pause().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs
index a3c3b6a1c..b3d1df622 100644
--- a/generic/threadshare/src/queue/imp.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -96,9 +96,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
let queue = element.imp();
- queue
- .enqueue_item(&element, DataQueueItem::Buffer(buffer))
- .await
+ queue.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
.boxed()
}
@@ -116,9 +114,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
let queue = element.imp();
- queue
- .enqueue_item(&element, DataQueueItem::BufferList(list))
- .await
+ queue.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
@@ -183,7 +179,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
queue
- .enqueue_item(&element, DataQueueItem::Event(event))
+ .enqueue_item(DataQueueItem::Event(event))
.await
.is_ok()
}
@@ -495,7 +491,7 @@ impl Queue {
/* Schedules emptying of the pending queue. If there is an upstream
* TaskContext, the new task is spawned, it is otherwise
* returned, for the caller to block on */
- async fn schedule_pending_queue(&self, element: &super::Queue) {
+ async fn schedule_pending_queue(&self) {
loop {
let more_queue_space_receiver = {
let dataqueue = self.dataqueue.lock().unwrap();
@@ -504,7 +500,7 @@ impl Queue {
}
let mut pending_queue_grd = self.pending_queue.lock().unwrap();
- gst::log!(CAT, obj: element, "Trying to empty pending queue");
+ gst::log!(CAT, imp: self, "Trying to empty pending queue");
if let Some(pending_queue) = pending_queue_grd.as_mut() {
let mut failed_item = None;
@@ -521,30 +517,26 @@ impl Queue {
receiver
} else {
- gst::log!(CAT, obj: element, "Pending queue is empty now");
+ gst::log!(CAT, imp: self, "Pending queue is empty now");
*pending_queue_grd = None;
return;
}
} else {
- gst::log!(CAT, obj: element, "Flushing, dropping pending queue");
+ gst::log!(CAT, imp: self, "Flushing, dropping pending queue");
return;
}
};
- gst::log!(CAT, obj: element, "Waiting for more queue space");
+ gst::log!(CAT, imp: self, "Waiting for more queue space");
let _ = more_queue_space_receiver.await;
}
}
- async fn enqueue_item(
- &self,
- element: &super::Queue,
- item: DataQueueItem,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> {
let wait_fut = {
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().ok_or_else(|| {
- gst::error!(CAT, obj: element, "No DataQueue");
+ gst::error!(CAT, imp: self, "No DataQueue");
gst::FlowError::Error
})?;
@@ -573,18 +565,18 @@ impl Queue {
gst::log!(
CAT,
- obj: element,
+ imp: self,
"Queue is full - Pushing first item on pending queue"
);
if schedule_now {
- gst::log!(CAT, obj: element, "Scheduling pending queue now");
+ gst::log!(CAT, imp: self, "Scheduling pending queue now");
pending_queue.as_mut().unwrap().scheduled = true;
- let wait_fut = self.schedule_pending_queue(element);
+ let wait_fut = self.schedule_pending_queue();
Some(wait_fut)
} else {
- gst::log!(CAT, obj: element, "Scheduling pending queue later");
+ gst::log!(CAT, imp: self, "Scheduling pending queue later");
None
}
} else {
@@ -597,20 +589,20 @@ impl Queue {
};
if let Some(wait_fut) = wait_fut {
- gst::log!(CAT, obj: element, "Blocking until queue has space again");
+ gst::log!(CAT, imp: self, "Blocking until queue has space again");
wait_fut.await;
}
*self.last_res.lock().unwrap()
}
- fn prepare(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap().clone();
let dataqueue = DataQueue::new(
- &element.clone().upcast(),
+ &self.instance().clone().upcast(),
self.src_pad.gst_pad(),
if settings.max_size_buffers == 0 {
None
@@ -640,16 +632,16 @@ impl Queue {
})?;
self.task
- .prepare(QueueTask::new(element.clone(), dataqueue), context)
+ .prepare(QueueTask::new(self.instance().clone(), dataqueue), context)
.block_on()?;
- gst::debug!(CAT, obj: element, "Prepared");
+ gst::debug!(CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::Queue) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
@@ -658,20 +650,20 @@ impl Queue {
*self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
self.task.stop().await_maybe_on_context()?;
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
- fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
self.task.start().await_maybe_on_context()?;
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
Ok(())
}
}
@@ -738,13 +730,7 @@ impl ObjectImpl for Queue {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => {
@@ -772,7 +758,7 @@ impl ObjectImpl for Queue {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(),
@@ -784,9 +770,10 @@ impl ObjectImpl for Queue {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
}
@@ -836,31 +823,30 @@ impl ElementImpl for Queue {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::ReadyToPaused {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
Ok(success)
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs
index 89e144776..7b1b1496e 100644
--- a/generic/threadshare/src/runtime/pad.rs
+++ b/generic/threadshare/src/runtime/pad.rs
@@ -398,8 +398,9 @@ impl PadSrc {
"Panic in PadSrc activate"
))
},
- move |imp, element| {
+ move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
+ let element = imp.instance();
handler.src_activate(
&this_ref,
imp,
@@ -424,8 +425,9 @@ impl PadSrc {
"Panic in PadSrc activatemode"
))
},
- move |imp, element| {
+ move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
+ let element = imp.instance();
this_ref.activate_mode_hook(mode, active)?;
handler.src_activatemode(
&this_ref,
@@ -449,8 +451,9 @@ impl PadSrc {
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
- move |imp, element| {
+ move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
+ let element = imp.instance();
handler.src_event_full(
&this_ref,
imp,
@@ -469,8 +472,9 @@ impl PadSrc {
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
- move |imp, element| {
+ move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
+ let element = imp.instance();
if !query.is_serialized() {
handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
} else {
@@ -807,8 +811,9 @@ impl PadSink {
"Panic in PadSink activate"
))
},
- move |imp, element| {
+ move |imp| {
let this_ref = PadSinkRef::new(inner_arc);
+ let element = imp.instance();
handler.sink_activate(
&this_ref,
imp,
@@ -833,8 +838,9 @@ impl PadSink {
"Panic in PadSink activatemode"
))
},
- move |imp, element| {
+ move |imp| {
let this_ref = PadSinkRef::new(inner_arc);
+ let element = imp.instance();
this_ref.activate_mode_hook(mode, active)?;
handler.sink_activatemode(
@@ -857,7 +863,8 @@ impl PadSink {
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
- move |imp, element| {
+ move |imp| {
+ let element = imp.instance();
if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
@@ -898,7 +905,8 @@ impl PadSink {
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
- move |imp, element| {
+ move |imp| {
+ let element = imp.instance();
if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
@@ -943,7 +951,8 @@ impl PadSink {
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
- move |imp, element| {
+ move |imp| {
+ let element = imp.instance();
if event.is_serialized() {
if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
@@ -1001,8 +1010,9 @@ impl PadSink {
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
- move |imp, element| {
+ move |imp| {
let this_ref = PadSinkRef::new(inner_arc);
+ let element = imp.instance();
if !query.is_serialized() {
handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
} else {
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index bd272c693..41cdaced3 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -408,8 +408,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl TcpClientSrc {
- fn prepare(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap().clone();
let context =
@@ -459,40 +459,40 @@ impl TcpClientSrc {
let _ = self
.task
.prepare(
- TcpClientSrcTask::new(element.clone(), saddr, buffer_pool),
+ TcpClientSrcTask::new(self.instance().clone(), saddr, buffer_pool),
context,
)
.check()?;
- gst::debug!(CAT, obj: element, "Preparing asynchronously");
+ gst::debug!(CAT, imp: self, "Preparing asynchronously");
Ok(())
}
- fn unprepare(&self, element: &super::TcpClientSrc) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
- fn start(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
Ok(())
}
- fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Pausing");
+ fn pause(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Pausing");
self.task.pause().block_on()?;
- gst::debug!(CAT, obj: element, "Paused");
+ gst::debug!(CAT, imp: self, "Paused");
Ok(())
}
}
@@ -558,13 +558,7 @@ impl ObjectImpl for TcpClientSrc {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"host" => {
@@ -594,7 +588,7 @@ impl ObjectImpl for TcpClientSrc {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"host" => settings.host.to_value(),
@@ -607,12 +601,12 @@ impl ObjectImpl for TcpClientSrc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
-
- crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
@@ -651,41 +645,40 @@ impl ElementImpl for TcpClientSrc {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element).map_err(|_| gst::StateChangeError)?;
+ self.pause().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index bc9f6c208..b99e432c3 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -838,8 +838,8 @@ impl UdpSink {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
- fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
let context = {
let settings = self.settings.lock().unwrap();
@@ -855,39 +855,37 @@ impl UdpSink {
// Enable backpressure for items
let (item_sender, item_receiver) = flume::bounded(0);
let (cmd_sender, cmd_receiver) = flume::unbounded();
- let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver);
+ let task_impl = UdpSinkTask::new(&*self.instance(), item_receiver, cmd_receiver);
self.task.prepare(task_impl, context).block_on()?;
*self.item_sender.lock().unwrap() = Some(item_sender);
*self.cmd_sender.lock().unwrap() = Some(cmd_sender);
- gst::debug!(CAT, obj: element, "Started preparation");
+ gst::debug!(CAT, imp: self, "Started preparation");
Ok(())
}
- fn unprepare(&self, element: &super::UdpSink) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
- fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
Ok(())
}
-}
-impl UdpSink {
fn add_client(&self, settings: &mut Settings, client: SocketAddr) {
settings.clients.insert(client);
if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
@@ -919,10 +917,10 @@ impl UdpSink {
}
}
-fn try_into_socket_addr(element: &super::UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> {
+fn try_into_socket_addr(imp: &UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> {
let addr: IpAddr = match host.parse() {
Err(err) => {
- gst::error!(CAT, obj: element, "Failed to parse host {}: {}", host, err);
+ gst::error!(CAT, imp: imp, "Failed to parse host {}: {}", host, err);
return Err(());
}
Ok(addr) => addr,
@@ -930,7 +928,7 @@ fn try_into_socket_addr(element: &super::UdpSink, host: &str, port: i32) -> Resu
let port: u16 = match port.try_into() {
Err(err) => {
- gst::error!(CAT, obj: element, "Invalid port {}: {}", port, err);
+ gst::error!(CAT, imp: imp, "Invalid port {}: {}", port, err);
return Err(());
}
Ok(port) => port,
@@ -1071,9 +1069,9 @@ impl ObjectImpl for UdpSink {
let element = args[0].get::<super::UdpSink>().expect("signal arg");
let host = args[1].get::<String>().expect("signal arg");
let port = args[2].get::<i32>().expect("signal arg");
+ let udpsink = element.imp();
- if let Ok(addr) = try_into_socket_addr(&element, &host, port) {
- let udpsink = element.imp();
+ if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) {
let mut settings = udpsink.settings.lock().unwrap();
udpsink.add_client(&mut settings, addr);
}
@@ -1088,9 +1086,9 @@ impl ObjectImpl for UdpSink {
let element = args[0].get::<super::UdpSink>().expect("signal arg");
let host = args[1].get::<String>().expect("signal arg");
let port = args[2].get::<i32>().expect("signal arg");
+ let udpsink = element.imp();
- if let Ok(addr) = try_into_socket_addr(&element, &host, port) {
- let udpsink = element.imp();
+ if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) {
let mut settings = udpsink.settings.lock().unwrap();
udpsink.remove_client(&mut settings, addr);
}
@@ -1116,13 +1114,7 @@ impl ObjectImpl for UdpSink {
SIGNALS.as_ref()
}
- fn set_property(
- &self,
- obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"sync" => {
@@ -1196,9 +1188,9 @@ impl ObjectImpl for UdpSink {
rsplit[0]
.parse::<i32>()
.map_err(|err| {
- gst::error!(CAT, obj: obj, "Invalid port {}: {}", rsplit[0], err);
+ gst::error!(CAT, imp: self, "Invalid port {}: {}", rsplit[0], err);
})
- .and_then(|port| try_into_socket_addr(obj, rsplit[1], port))
+ .and_then(|port| try_into_socket_addr(self, rsplit[1], port))
.ok()
} else {
None
@@ -1222,7 +1214,7 @@ impl ObjectImpl for UdpSink {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"sync" => settings.sync.to_value(),
@@ -1268,12 +1260,12 @@ impl ObjectImpl for UdpSink {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
-
- crate::set_element_flags(obj, gst::ElementFlags::SINK);
+ obj.set_element_flags(gst::ElementFlags::SINK);
}
}
@@ -1313,34 +1305,33 @@ impl ElementImpl for UdpSink {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- self.parent_change_state(element, transition)
+ self.parent_change_state(transition)
}
- fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
+ fn send_event(&self, event: gst::Event) -> bool {
match event.view() {
EventView::Latency(ev) => {
let latency = Some(ev.latency());
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index 78c9acd26..5c7b79bf6 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -574,8 +574,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl UdpSrc {
- fn prepare(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap();
let context =
@@ -589,38 +589,38 @@ impl UdpSrc {
*self.configured_caps.lock().unwrap() = None;
self.task
- .prepare(UdpSrcTask::new(element.clone()), context)
+ .prepare(UdpSrcTask::new(self.instance().clone()), context)
.block_on()?;
- gst::debug!(CAT, obj: element, "Prepared");
+ gst::debug!(CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::UdpSrc) {
- gst::debug!(CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(CAT, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
- gst::debug!(CAT, obj: element, "Unprepared");
+ gst::debug!(CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Stopping");
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Stopping");
self.task.stop().block_on()?;
- gst::debug!(CAT, obj: element, "Stopped");
+ gst::debug!(CAT, imp: self, "Stopped");
Ok(())
}
- fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Starting");
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Starting");
self.task.start().block_on()?;
- gst::debug!(CAT, obj: element, "Started");
+ gst::debug!(CAT, imp: self, "Started");
Ok(())
}
- fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
- gst::debug!(CAT, obj: element, "Pausing");
+ fn pause(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(CAT, imp: self, "Pausing");
self.task.pause().block_on()?;
- gst::debug!(CAT, obj: element, "Paused");
+ gst::debug!(CAT, imp: self, "Paused");
Ok(())
}
}
@@ -716,13 +716,7 @@ impl ObjectImpl for UdpSrc {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"address" => {
@@ -767,7 +761,7 @@ impl ObjectImpl for UdpSrc {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"address" => settings.address.to_value(),
@@ -792,12 +786,12 @@ impl ObjectImpl for UdpSrc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
-
- crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
@@ -836,41 +830,40 @@ impl ElementImpl for UdpSrc {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element).map_err(|_| gst::StateChangeError)?;
+ self.pause().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 84b3d4642..312d28208 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -230,8 +230,8 @@ mod imp_src {
}
}
- fn prepare(&self, element: &super::ElementSrcTest) -> Result<(), gst::ErrorMessage> {
- gst::debug!(SRC_CAT, obj: element, "Preparing");
+ fn prepare(&self) -> Result<(), gst::ErrorMessage> {
+ gst::debug!(SRC_CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap().clone();
let context =
@@ -246,39 +246,42 @@ mod imp_src {
*self.sender.lock().unwrap() = Some(sender);
self.task
- .prepare(ElementSrcTestTask::new(element.clone(), receiver), context)
+ .prepare(
+ ElementSrcTestTask::new(self.instance().clone(), receiver),
+ context,
+ )
.block_on()?;
- gst::debug!(SRC_CAT, obj: element, "Prepared");
+ gst::debug!(SRC_CAT, imp: self, "Prepared");
Ok(())
}
- fn unprepare(&self, element: &super::ElementSrcTest) {
- gst::debug!(SRC_CAT, obj: element, "Unpreparing");
+ fn unprepare(&self) {
+ gst::debug!(SRC_CAT, imp: self, "Unpreparing");
*self.sender.lock().unwrap() = None;
self.task.unprepare().block_on().unwrap();
- gst::debug!(SRC_CAT, obj: element, "Unprepared");
+ gst::debug!(SRC_CAT, imp: self, "Unprepared");
}
- fn stop(&self, element: &super::ElementSrcTest) {
- gst::debug!(SRC_CAT, obj: element, "Stopping");
+ fn stop(&self) {
+ gst::debug!(SRC_CAT, imp: self, "Stopping");
self.task.stop().await_maybe_on_context().unwrap();
- gst::debug!(SRC_CAT, obj: element, "Stopped");
+ gst::debug!(SRC_CAT, imp: self, "Stopped");
}
- fn start(&self, element: &super::ElementSrcTest) {
- gst::debug!(SRC_CAT, obj: element, "Starting");
+ fn start(&self) {
+ gst::debug!(SRC_CAT, imp: self, "Starting");
self.task.start().await_maybe_on_context().unwrap();
- gst::debug!(SRC_CAT, obj: element, "Started");
+ gst::debug!(SRC_CAT, imp: self, "Started");
}
- fn pause(&self, element: &super::ElementSrcTest) {
- gst::debug!(SRC_CAT, obj: element, "Pausing");
+ fn pause(&self) {
+ gst::debug!(SRC_CAT, imp: self, "Pausing");
self.task.pause().block_on().unwrap();
- gst::debug!(SRC_CAT, obj: element, "Paused");
+ gst::debug!(SRC_CAT, imp: self, "Paused");
}
}
@@ -315,13 +318,7 @@ mod imp_src {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"context" => {
let context = value
@@ -335,9 +332,10 @@ mod imp_src {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap();
}
}
@@ -377,35 +375,34 @@ mod imp_src {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::log!(SRC_CAT, obj: element, "Changing state {:?}", transition);
+ gst::log!(SRC_CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
+ self.prepare().map_err(|err| {
+ self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element);
+ self.pause();
}
gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ self.unprepare();
}
_ => (),
}
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop();
}
gst::StateChange::PausedToPlaying => {
- self.start(element);
+ self.start();
}
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
@@ -416,7 +413,7 @@ mod imp_src {
Ok(success)
}
- fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
+ fn send_event(&self, event: gst::Event) -> bool {
match event.view() {
EventView::FlushStart(..) => {
self.task.flush_start().await_maybe_on_context().unwrap();
@@ -464,9 +461,7 @@ mod imp_sink {
.unwrap();
async move {
let elem_sink_test = element.imp();
- elem_sink_test
- .forward_item(&element, Item::Buffer(buffer))
- .await
+ elem_sink_test.forward_item(Item::Buffer(buffer)).await
}
.boxed()
}
@@ -484,9 +479,7 @@ mod imp_sink {
.unwrap();
async move {
let elem_sink_test = element.imp();
- elem_sink_test
- .forward_item(&element, Item::BufferList(list))
- .await
+ elem_sink_test.forward_item(Item::BufferList(list)).await
}
.boxed()
}
@@ -495,14 +488,14 @@ mod imp_sink {
&self,
pad: &PadSinkRef,
elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
+ _element: &gst::Element,
event: gst::Event,
) -> bool {
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
- elem_sink_test.stop(element.downcast_ref::<super::ElementSinkTest>().unwrap());
+ elem_sink_test.stop();
true
}
_ => false,
@@ -526,11 +519,11 @@ mod imp_sink {
let elem_sink_test = element.imp();
if let EventView::FlushStop(..) = event.view() {
- elem_sink_test.start(&element);
+ elem_sink_test.start();
}
elem_sink_test
- .forward_item(&element, Item::Event(event))
+ .forward_item(Item::Event(event))
.await
.is_ok()
}
@@ -546,13 +539,9 @@ mod imp_sink {
}
impl ElementSinkTest {
- async fn forward_item(
- &self,
- element: &super::ElementSinkTest,
- item: Item,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ async fn forward_item(&self, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> {
if !self.flushing.load(Ordering::SeqCst) {
- gst::debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
+ gst::debug!(SINK_CAT, imp: self, "Fowarding {:?}", item);
let mut sender = self
.sender
.lock()
@@ -568,7 +557,7 @@ mod imp_sink {
} else {
gst::debug!(
SINK_CAT,
- obj: element,
+ imp: self,
"Not fowarding {:?} due to flushing",
item
);
@@ -576,34 +565,32 @@ mod imp_sink {
}
}
- fn start(&self, element: &super::ElementSinkTest) {
- gst::debug!(SINK_CAT, obj: element, "Starting");
+ fn start(&self) {
+ gst::debug!(SINK_CAT, imp: self, "Starting");
self.flushing.store(false, Ordering::SeqCst);
- gst::debug!(SINK_CAT, obj: element, "Started");
+ gst::debug!(SINK_CAT, imp: self, "Started");
}
- fn stop(&self, element: &super::ElementSinkTest) {
- gst::debug!(SINK_CAT, obj: element, "Stopping");
+ fn stop(&self) {
+ gst::debug!(SINK_CAT, imp: self, "Stopping");
self.flushing.store(true, Ordering::SeqCst);
- gst::debug!(SINK_CAT, obj: element, "Stopped");
+ gst::debug!(SINK_CAT, imp: self, "Stopped");
}
- }
- impl ElementSinkTest {
- pub fn push_flush_start(&self, element: &super::ElementSinkTest) {
- gst::debug!(SINK_CAT, obj: element, "Pushing FlushStart");
+ pub fn push_flush_start(&self) {
+ gst::debug!(SINK_CAT, imp: self, "Pushing FlushStart");
self.sink_pad
.gst_pad()
.push_event(gst::event::FlushStart::new());
- gst::debug!(SINK_CAT, obj: element, "FlushStart pushed");
+ gst::debug!(SINK_CAT, imp: self, "FlushStart pushed");
}
- pub fn push_flush_stop(&self, element: &super::ElementSinkTest) {
- gst::debug!(SINK_CAT, obj: element, "Pushing FlushStop");
+ pub fn push_flush_stop(&self) {
+ gst::debug!(SINK_CAT, imp: self, "Pushing FlushStop");
self.sink_pad
.gst_pad()
.push_event(gst::event::FlushStop::new(true));
- gst::debug!(SINK_CAT, obj: element, "FlushStop pushed");
+ gst::debug!(SINK_CAT, imp: self, "FlushStop pushed");
}
}
@@ -647,13 +634,7 @@ mod imp_sink {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"sender" => {
let ItemSender { sender } = value
@@ -666,9 +647,10 @@ mod imp_sink {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
}
}
@@ -708,19 +690,18 @@ mod imp_sink {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::log!(SINK_CAT, obj: element, "Changing state {:?}", transition);
+ gst::log!(SINK_CAT, imp: self, "Changing state {:?}", transition);
if let gst::StateChange::PausedToReady = transition {
- self.stop(element);
+ self.stop();
}
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(transition)?;
if let gst::StateChange::ReadyToPaused = transition {
- self.start(element);
+ self.start();
}
Ok(success)
@@ -1225,13 +1206,13 @@ fn start_flush() {
let elem_sink_test = sink_element.imp();
- elem_sink_test.push_flush_start(&sink_element);
+ elem_sink_test.push_flush_start();
elem_src_test
.try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7])))
.unwrap_err();
- elem_sink_test.push_flush_stop(&sink_element);
+ elem_sink_test.push_flush_stop();
elem_src_test
.try_push(Item::Event(gst::event::Segment::new(