Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-10-09 16:06:59 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-10-10 15:03:25 +0300
commit7ee4afacf413b2e3c386bb1070994ed4325994e6 (patch)
treeeddcc0e047ab4704e5a459dd551a55196e8a1848 /net/raptorq
parent7818ac658b02417fda071ce025b6d6a7fdb54a76 (diff)
Change *Impl trait methods to only take &self and not Self::Type in addition
Diffstat (limited to 'net/raptorq')
-rw-r--r--net/raptorq/src/raptorqdec/imp.rs141
-rw-r--r--net/raptorq/src/raptorqenc/imp.rs210
2 files changed, 143 insertions, 208 deletions
diff --git a/net/raptorq/src/raptorqdec/imp.rs b/net/raptorq/src/raptorqdec/imp.rs
index 7b30946a..0d6212c5 100644
--- a/net/raptorq/src/raptorqdec/imp.rs
+++ b/net/raptorq/src/raptorqdec/imp.rs
@@ -146,11 +146,7 @@ pub struct RaptorqDec {
}
impl RaptorqDec {
- fn process_source_block(
- &self,
- element: &super::RaptorqDec,
- state: &mut State,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ fn process_source_block(&self, state: &mut State) -> Result<gst::FlowSuccess, gst::FlowError> {
// Pull the information about the current Source Block from sequence.
// Data packets for current Source Block are in range: info.seq_range(),
// Repair Packets on the other hand, for a given block share the same key
@@ -169,7 +165,7 @@ impl RaptorqDec {
if data_packets_num == n {
gst::trace!(
CAT,
- obj: element,
+ imp: self,
"All packets ({}) received, dropping Source Block ({})",
data_packets_num,
seq_lo
@@ -304,7 +300,7 @@ impl RaptorqDec {
gst::debug!(
CAT,
- obj: element,
+ imp: self,
"Succesfully recovered packet: seqnum: {}, len: {}, ts: {}",
rtpbuf.seq(),
rtpbuf.payload_size(),
@@ -323,19 +319,18 @@ impl RaptorqDec {
fn store_media_packet(
&self,
- element: &super::RaptorqDec,
state: &mut State,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let this_seq = {
let rtpbuf = RTPBuffer::from_buffer_readable(buffer).map_err(|err| {
- gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err);
+ gst::error!(CAT, imp: self, "Failed to map rtp buffer : {}", err);
gst::FlowError::Error
})?;
gst::trace!(
CAT,
- obj: element,
+ imp: self,
"New data packet, seq {}, ts {}",
rtpbuf.seq(),
rtpbuf.timestamp()
@@ -383,18 +378,17 @@ impl RaptorqDec {
fn sink_chain(
&self,
_pad: &gst::Pad,
- element: &super::RaptorqDec,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
- self.store_media_packet(element, &mut state, &buffer)?;
+ self.store_media_packet(&mut state, &buffer)?;
// Retire the packets that have been around for too long
let expired = state.expire_packets();
for seq in expired {
gst::trace!(
CAT,
- obj: element,
+ imp: self,
"Source Block ({}) dropped, because max wait time has been exceeded",
seq as u16
);
@@ -405,16 +399,16 @@ impl RaptorqDec {
if thresh > 0 && state.media_packets.len() >= thresh {
gst::warning!(
CAT,
- obj: element,
+ imp: self,
"Too many buffered media packets, resetting decoder. This might \
be because we haven't received a repair packet for too long, or \
repair packets have no valid timestamps.",
);
- self.reset(element);
+ self.reset();
}
- self.process_source_block(element, &mut state)?;
+ self.process_source_block(&mut state)?;
drop(state);
self.srcpad.push(buffer)
@@ -423,17 +417,16 @@ impl RaptorqDec {
fn fec_sink_chain(
&self,
_pad: &gst::Pad,
- element: &super::RaptorqDec,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let rtpbuf = RTPBuffer::from_buffer_readable(&buffer).map_err(|err| {
- gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err);
+ gst::error!(CAT, imp: self, "Failed to map rtp buffer : {}", err);
gst::FlowError::Error
})?;
let payload = rtpbuf.payload().unwrap();
let payload_id = payload[0..7].try_into().map_err(|err| {
- gst::error!(CAT, obj: element, "Unexpected rtp fec payload : {}", err);
+ gst::error!(CAT, imp: self, "Unexpected rtp fec payload : {}", err);
gst::FlowError::Error
})?;
@@ -447,7 +440,7 @@ impl RaptorqDec {
gst::trace!(
CAT,
- obj: element,
+ imp: self,
"New repair packet, I: {}, LP: {}, LB: {}",
i,
lp,
@@ -496,30 +489,25 @@ impl RaptorqDec {
Ok(gst::FlowSuccess::Ok)
}
- fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqDec, event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
gst::debug!(CAT, "Handling event {:?}", event);
use gst::EventView;
if let EventView::FlushStop(_) = event.view() {
- self.reset(element);
+ self.reset();
}
- pad.event_default(Some(element), event)
+ pad.event_default(Some(&*self.instance()), event)
}
- fn fec_sink_event(
- &self,
- pad: &gst::Pad,
- element: &super::RaptorqDec,
- event: gst::Event,
- ) -> bool {
+ fn fec_sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
gst::debug!(CAT, "Handling event {:?}", event);
use gst::EventView;
if let EventView::Caps(c) = event.view() {
- if let Err(err) = self.start(element, c.caps()) {
- gst::element_error!(
- element,
+ if let Err(err) = self.start(c.caps()) {
+ gst::element_imp_error!(
+ self,
gst::CoreError::Event,
["Failed to start raptorqdec {:?}", err]
);
@@ -528,14 +516,10 @@ impl RaptorqDec {
}
}
- pad.event_default(Some(element), event)
+ pad.event_default(Some(&*self.instance()), event)
}
- fn iterate_internal_links(
- &self,
- pad: &gst::Pad,
- _element: &super::RaptorqDec,
- ) -> gst::Iterator<gst::Pad> {
+ fn iterate_internal_links(&self, pad: &gst::Pad) -> gst::Iterator<gst::Pad> {
if pad == &self.srcpad {
gst::Iterator::from_vec(vec![self.sinkpad.clone()])
} else if pad == &self.sinkpad {
@@ -545,21 +529,17 @@ impl RaptorqDec {
}
}
- fn start(
- &self,
- element: &super::RaptorqDec,
- incaps: &gst::CapsRef,
- ) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, incaps: &gst::CapsRef) -> Result<(), gst::ErrorMessage> {
let symbol_size = fmtp_param_from_caps::<usize>("t", incaps)?;
if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE {
- let details = format!(
- "Symbol size exceeds Maximum Encoding Symbol Size: {}",
- fecscheme::MAX_ENCODING_SYMBOL_SIZE
- );
-
- gst::element_error!(element, gst::CoreError::Failed, [&details]);
- return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ return Err(error_msg!(
+ gst::CoreError::Failed,
+ [
+ "Symbol size exceeds Maximum Encoding Symbol Size: {}",
+ fecscheme::MAX_ENCODING_SYMBOL_SIZE
+ ]
+ ));
}
let settings = self.settings.lock().unwrap();
@@ -573,7 +553,7 @@ impl RaptorqDec {
let media_packets_reset_threshold = settings.media_packets_reset_threshold as usize;
- gst::debug!(CAT, obj: element, "Configured for caps {}", incaps);
+ gst::debug!(CAT, imp: self, "Configured for caps {}", incaps);
let mut state = self.state.lock().unwrap();
@@ -584,11 +564,11 @@ impl RaptorqDec {
Ok(())
}
- fn stop(&self, element: &super::RaptorqDec) {
- self.reset(element);
+ fn stop(&self) {
+ self.reset();
}
- fn reset(&self, _element: &super::RaptorqDec) {
+ fn reset(&self) {
let mut state = self.state.lock().unwrap();
state.media_packets.clear();
@@ -615,21 +595,17 @@ impl ObjectSubclass for RaptorqDec {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
- |this, element| this.sink_chain(pad, element, buffer),
+ |this| this.sink_chain(pad, buffer),
)
})
.event_function(|pad, parent, event| {
- Self::catch_panic_pad_function(
- parent,
- || false,
- |this, element| this.sink_event(pad, element, event),
- )
+ Self::catch_panic_pad_function(parent, || false, |this| this.sink_event(pad, event))
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
- |this, element| this.iterate_internal_links(pad, element),
+ |this| this.iterate_internal_links(pad),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
@@ -641,7 +617,7 @@ impl ObjectSubclass for RaptorqDec {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
- |this, element| this.iterate_internal_links(pad, element),
+ |this| this.iterate_internal_links(pad),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
@@ -688,13 +664,7 @@ impl ObjectImpl for RaptorqDec {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"repair-window-tolerance" => {
let mut settings = self.settings.lock().unwrap();
@@ -711,7 +681,7 @@ impl ObjectImpl for RaptorqDec {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"repair-window-tolerance" => {
let settings = self.settings.lock().unwrap();
@@ -746,9 +716,10 @@ impl ObjectImpl for RaptorqDec {
_ => unimplemented!(),
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
}
@@ -815,27 +786,25 @@ impl ElementImpl for RaptorqDec {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused => {
- self.reset(element);
+ self.reset();
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop();
}
_ => (),
}
- self.parent_change_state(element, transition)
+ self.parent_change_state(transition)
}
fn request_new_pad(
&self,
- element: &Self::Type,
templ: &gst::PadTemplate,
name: Option<String>,
_caps: Option<&gst::Caps>,
@@ -843,8 +812,8 @@ impl ElementImpl for RaptorqDec {
let mut sinkpad_fec_guard = self.sinkpad_fec.lock().unwrap();
if sinkpad_fec_guard.is_some() {
- gst::element_error!(
- element,
+ gst::element_imp_error!(
+ self,
gst::CoreError::Pad,
["Not accepting more than one FEC stream"]
);
@@ -857,21 +826,21 @@ impl ElementImpl for RaptorqDec {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
- |this, element| this.fec_sink_chain(pad, element, buffer),
+ |this| this.fec_sink_chain(pad, buffer),
)
})
.event_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| false,
- |this, element| this.fec_sink_event(pad, element, event),
+ |this| this.fec_sink_event(pad, event),
)
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
- |this, element| this.iterate_internal_links(pad, element),
+ |this| this.iterate_internal_links(pad),
)
})
.build();
@@ -881,18 +850,18 @@ impl ElementImpl for RaptorqDec {
drop(sinkpad_fec_guard);
- element.add_pad(&sinkpad_fec).unwrap();
+ self.instance().add_pad(&sinkpad_fec).unwrap();
Some(sinkpad_fec)
}
- fn release_pad(&self, element: &Self::Type, _pad: &gst::Pad) {
+ fn release_pad(&self, _pad: &gst::Pad) {
let mut pad_guard = self.sinkpad_fec.lock().unwrap();
if let Some(pad) = pad_guard.take() {
drop(pad_guard);
pad.set_active(false).unwrap();
- element.remove_pad(&pad).unwrap();
+ self.instance().remove_pad(&pad).unwrap();
}
}
}
diff --git a/net/raptorq/src/raptorqenc/imp.rs b/net/raptorq/src/raptorqenc/imp.rs
index 1dbc8785..6333b540 100644
--- a/net/raptorq/src/raptorqenc/imp.rs
+++ b/net/raptorq/src/raptorqenc/imp.rs
@@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-use gst::{element_error, error_msg, glib, loggable_error};
+use gst::{element_imp_error, error_msg, glib, loggable_error};
use gst::prelude::*;
use gst::subclass::prelude::*;
@@ -105,7 +105,7 @@ pub struct RaptorqEnc {
impl RaptorqEnc {
fn process_source_block(
- element: &super::RaptorqEnc,
+ &self,
state: &mut State,
now_pts: Option<gst::ClockTime>,
now_dts: Option<gst::ClockTime>,
@@ -132,13 +132,7 @@ impl RaptorqEnc {
// placed in each repair packet.
let si = state.symbols_per_packet;
- gst::trace!(
- CAT,
- obj: element,
- "Source Block add ADU: si {}, li {}",
- si,
- li
- );
+ gst::trace!(CAT, imp: self, "Source Block add ADU: si {}, li {}", si, li);
let mut data = vec![0; si * state.symbol_size];
@@ -186,7 +180,7 @@ impl RaptorqEnc {
.map(|n| gst::ClockTime::from_mseconds((n * delay_step) as u64))
.collect::<Vec<_>>();
- let base_time = element.base_time();
+ let base_time = self.instance().base_time();
let running_time = state.segment.to_running_time(now_pts);
for (target_time, repair_packet) in Iterator::zip(
@@ -261,7 +255,7 @@ impl RaptorqEnc {
Ok(gst::FlowSuccess::Ok)
}
- fn start_task(&self, element: &super::RaptorqEnc) -> Result<(), gst::LoggableError> {
+ fn start_task(&self) -> Result<(), gst::LoggableError> {
let (sender, receiver) = mpsc::channel();
let mut state_guard = self.state.lock().unwrap();
@@ -270,36 +264,25 @@ impl RaptorqEnc {
state.sender = Some(sender);
drop(state_guard);
- let element_weak = element.downgrade();
- let pad_weak = self.srcpad_fec.downgrade();
+ let self_ = self.ref_counted();
let mut eos = false;
self.srcpad_fec
.start_task(move || {
while let Ok(msg) = receiver.recv() {
- let pad = match pad_weak.upgrade() {
- Some(pad) => pad,
- None => break,
- };
-
- let element = match element_weak.upgrade() {
- Some(element) => element,
- None => break,
- };
-
match msg {
SrcTaskMsg::Timeout((id, buf)) => {
- let mut timers = element.imp().pending_timers.lock().unwrap();
+ let mut timers = self_.pending_timers.lock().unwrap();
let _ = timers.remove(&id);
let push_eos = eos && timers.is_empty();
drop(timers);
- if let Err(err) = pad.push(buf) {
- gst::element_error!(
- element,
+ if let Err(err) = self_.srcpad_fec.push(buf) {
+ gst::element_imp_error!(
+ self_,
gst::CoreError::Pad,
["Failed to push on src FEC pad {:?}", err]
);
@@ -308,7 +291,7 @@ impl RaptorqEnc {
}
if push_eos {
- pad.push_event(gst::event::Eos::new());
+ self_.srcpad_fec.push_event(gst::event::Eos::new());
break;
}
}
@@ -317,9 +300,9 @@ impl RaptorqEnc {
Some(target) => target,
None => {
// No target, push buffer immediately
- if let Err(err) = pad.push(buf) {
- gst::element_error!(
- element,
+ if let Err(err) = self_.srcpad_fec.push(buf) {
+ gst::element_imp_error!(
+ self_,
gst::CoreError::Pad,
["Failed to push on src FEC pad {:?}", err]
);
@@ -330,13 +313,13 @@ impl RaptorqEnc {
}
};
- let clock = match element.clock() {
+ let clock = match self_.instance().clock() {
Some(clock) => clock,
None => {
// No clock provided, push buffer immediately
- if let Err(err) = pad.push(buf) {
- gst::element_error!(
- element,
+ if let Err(err) = self_.srcpad_fec.push(buf) {
+ gst::element_imp_error!(
+ self_,
gst::CoreError::Pad,
["Failed to push on src FEC pad {:?}", err]
);
@@ -348,7 +331,7 @@ impl RaptorqEnc {
};
let timeout_sender = {
- let state_guard = element.imp().state.lock().unwrap();
+ let state_guard = self_.state.lock().unwrap();
let state = match state_guard.as_ref() {
Some(state) => state,
None => break,
@@ -359,7 +342,7 @@ impl RaptorqEnc {
let timeout = clock.new_single_shot_id(target);
- let mut timers = element.imp().pending_timers.lock().unwrap();
+ let mut timers = self_.pending_timers.lock().unwrap();
timers.insert(timeout.clone().into());
timeout
@@ -370,8 +353,8 @@ impl RaptorqEnc {
.expect("Failed to wait async");
}
SrcTaskMsg::Eos => {
- if element.imp().pending_timers.lock().unwrap().is_empty() {
- pad.push_event(gst::event::Eos::new());
+ if self_.pending_timers.lock().unwrap().is_empty() {
+ self_.srcpad_fec.push_event(gst::event::Eos::new());
break;
}
@@ -381,12 +364,7 @@ impl RaptorqEnc {
}
// All senders dropped or error
- let pad = match pad_weak.upgrade() {
- Some(pad) => pad,
- None => return,
- };
-
- let _ = pad.pause_task();
+ let _ = self_.srcpad_fec.pause_task();
})
.map_err(|_| loggable_error!(CAT, "Failed to start pad task"))?;
@@ -396,12 +374,11 @@ impl RaptorqEnc {
fn src_activatemode(
&self,
_pad: &gst::Pad,
- element: &super::RaptorqEnc,
_mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if active {
- self.start_task(element)?;
+ self.start_task()?;
} else {
// element stop should be called at this point so that all mpsc
// senders used in task are dropped, otherwise channel can deadlock
@@ -414,28 +391,27 @@ impl RaptorqEnc {
fn sink_chain(
&self,
_pad: &gst::Pad,
- element: &super::RaptorqEnc,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().ok_or(gst::FlowError::NotNegotiated)?;
if buffer.size() > state.mtu {
- gst::error!(CAT, obj: element, "Packet length exceeds configured MTU");
+ gst::error!(CAT, imp: self, "Packet length exceeds configured MTU");
return Err(gst::FlowError::NotSupported);
}
let (curr_seq, now_rtpts) = match RTPBuffer::from_buffer_readable(&buffer) {
Ok(rtpbuf) => (rtpbuf.seq(), rtpbuf.timestamp()),
Err(_) => {
- gst::error!(CAT, obj: element, "Mapping to RTP packet failed");
+ gst::error!(CAT, imp: self, "Mapping to RTP packet failed");
return Err(gst::FlowError::NotSupported);
}
};
if let Some(last_seq) = state.seqnums.last() {
if last_seq.overflowing_add(1).0 != curr_seq {
- gst::error!(CAT, obj: element, "Got out of sequence packets");
+ gst::error!(CAT, imp: self, "Got out of sequence packets");
return Err(gst::FlowError::NotSupported);
}
}
@@ -450,22 +426,22 @@ impl RaptorqEnc {
let now_pts = buffer.pts();
let now_dts = buffer.dts_or_pts();
- Self::process_source_block(element, state, now_pts, now_dts, now_rtpts)?;
+ self.process_source_block(state, now_pts, now_dts, now_rtpts)?;
}
drop(state_guard);
self.srcpad.push(buffer)
}
- fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqEnc, event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
gst::debug!(CAT, "Handling event {:?}", event);
use gst::EventView;
match event.view() {
EventView::FlushStart(_) => {
- if let Err(err) = self.stop(element) {
- element_error!(
- element,
+ if let Err(err) = self.stop() {
+ element_imp_error!(
+ self,
gst::CoreError::Event,
["Failed to stop encoder after flush start {:?}", err]
);
@@ -475,9 +451,9 @@ impl RaptorqEnc {
let _ = self.srcpad_fec.set_active(false);
}
EventView::FlushStop(_) => {
- if let Err(err) = self.start(element) {
- element_error!(
- element,
+ if let Err(err) = self.start() {
+ element_imp_error!(
+ self,
gst::CoreError::Event,
["Failed to start encoder after flush stop {:?}", err]
);
@@ -499,7 +475,7 @@ impl RaptorqEnc {
// delayed repair packets.
if let Ok(clock_rate) = s.get::<i32>("clock-rate") {
if clock_rate <= 0 {
- element_error!(element, gst::CoreError::Event, ["Invalid clock rate"]);
+ element_imp_error!(self, gst::CoreError::Event, ["Invalid clock rate"]);
return false;
}
@@ -515,8 +491,8 @@ impl RaptorqEnc {
let segment = match segment.downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
- element_error!(
- element,
+ element_imp_error!(
+ self,
gst::CoreError::Event,
["Only time segments are supported"]
);
@@ -527,8 +503,10 @@ impl RaptorqEnc {
state.segment = segment.clone();
// Push stream events on FEC srcpad as well
- let pad = &self.srcpad_fec;
- let stream_id = pad.create_stream_id(element, Some("fec")).to_string();
+ let stream_id = self
+ .srcpad_fec
+ .create_stream_id(&*self.instance(), Some("fec"))
+ .to_string();
let kmax = extended_source_block_symbols(state.symbols_per_block as u32);
let scheme_id = fecscheme::FEC_SCHEME_ID;
@@ -548,9 +526,11 @@ impl RaptorqEnc {
drop(state_guard);
- pad.push_event(gst::event::StreamStart::new(&stream_id));
- pad.push_event(gst::event::Caps::new(&caps));
- pad.push_event(gst::event::Segment::new(&segment));
+ self.srcpad_fec
+ .push_event(gst::event::StreamStart::new(&stream_id));
+ self.srcpad_fec.push_event(gst::event::Caps::new(&caps));
+ self.srcpad_fec
+ .push_event(gst::event::Segment::new(&segment));
}
}
EventView::Eos(_) => {
@@ -563,14 +543,10 @@ impl RaptorqEnc {
_ => (),
}
- pad.event_default(Some(element), event)
+ pad.event_default(Some(&*self.instance()), event)
}
- fn iterate_internal_links(
- &self,
- pad: &gst::Pad,
- _element: &super::RaptorqEnc,
- ) -> gst::Iterator<gst::Pad> {
+ fn iterate_internal_links(&self, pad: &gst::Pad) -> gst::Iterator<gst::Pad> {
if pad == &self.sinkpad {
gst::Iterator::from_vec(vec![self.srcpad.clone()])
} else if pad == &self.srcpad {
@@ -580,7 +556,7 @@ impl RaptorqEnc {
}
}
- fn start(&self, element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> {
+ fn start(&self) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
let protected_packets_num = settings.protected_packets as usize;
@@ -598,38 +574,38 @@ impl RaptorqEnc {
let symbols_per_block = symbols_per_packet * protected_packets_num;
if symbol_size.rem_euclid(SYMBOL_ALIGNMENT) != 0 {
- let details = format!(
- "Symbol size is not multiple of Symbol Alignment {}",
- SYMBOL_ALIGNMENT
- );
-
- gst::element_error!(element, gst::CoreError::Failed, [&details]);
- return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ return Err(error_msg!(
+ gst::CoreError::Failed,
+ [
+ "Symbol size is not multiple of Symbol Alignment {}",
+ SYMBOL_ALIGNMENT
+ ]
+ ));
}
if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE {
- let details = format!(
- "Symbol size exceeds Maximum Encoding Symbol Size: {}",
- fecscheme::MAX_ENCODING_SYMBOL_SIZE
- );
-
- gst::element_error!(element, gst::CoreError::Failed, [&details]);
- return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ return Err(error_msg!(
+ gst::CoreError::Failed,
+ [
+ "Symbol size exceeds Maximum Encoding Symbol Size: {}",
+ fecscheme::MAX_ENCODING_SYMBOL_SIZE
+ ]
+ ));
}
if symbols_per_block > fecscheme::MAX_SOURCE_BLOCK_LEN {
- let details = format!(
- "Source block length exceeds Maximum Source Block Length: {}",
- fecscheme::MAX_SOURCE_BLOCK_LEN
- );
-
- gst::element_error!(element, gst::CoreError::Failed, [&details]);
- return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ return Err(error_msg!(
+ gst::CoreError::Failed,
+ [
+ "Source block length exceeds Maximum Source Block Length: {}",
+ fecscheme::MAX_SOURCE_BLOCK_LEN
+ ]
+ ));
}
gst::info!(
CAT,
- obj: element,
+ imp: self,
"Starting RaptorQ Encoder, Symbols per Block: {}, Symbol Size: {}",
symbols_per_block,
symbol_size
@@ -663,7 +639,7 @@ impl RaptorqEnc {
Ok(())
}
- fn stop(&self, _element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut timers = self.pending_timers.lock().unwrap();
for timer in timers.drain() {
timer.unschedule();
@@ -688,21 +664,17 @@ impl ObjectSubclass for RaptorqEnc {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
- |this, element| this.sink_chain(pad, element, buffer),
+ |this| this.sink_chain(pad, buffer),
)
})
.event_function(|pad, parent, event| {
- Self::catch_panic_pad_function(
- parent,
- || false,
- |this, element| this.sink_event(pad, element, event),
- )
+ Self::catch_panic_pad_function(parent, || false, |this| this.sink_event(pad, event))
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
- |this, element| this.iterate_internal_links(pad, element),
+ |this| this.iterate_internal_links(pad),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
@@ -714,7 +686,7 @@ impl ObjectSubclass for RaptorqEnc {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
- |this, element| this.iterate_internal_links(pad, element),
+ |this| this.iterate_internal_links(pad),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
@@ -726,14 +698,14 @@ impl ObjectSubclass for RaptorqEnc {
Self::catch_panic_pad_function(
parent,
|| Err(loggable_error!(CAT, "Panic activating src pad with mode")),
- |this, element| this.src_activatemode(pad, element, mode, active),
+ |this| this.src_activatemode(pad, mode, active),
)
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
- |this, element| this.iterate_internal_links(pad, element),
+ |this| this.iterate_internal_links(pad),
)
})
.build();
@@ -805,13 +777,7 @@ impl ObjectImpl for RaptorqEnc {
PROPERTIES.as_ref()
}
- fn set_property(
- &self,
- _obj: &Self::Type,
- _id: usize,
- value: &glib::Value,
- pspec: &glib::ParamSpec,
- ) {
+ fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"protected-packets" => {
let mut settings = self.settings.lock().unwrap();
@@ -847,7 +813,7 @@ impl ObjectImpl for RaptorqEnc {
}
}
- fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"protected-packets" => {
let settings = self.settings.lock().unwrap();
@@ -877,9 +843,10 @@ impl ObjectImpl for RaptorqEnc {
}
}
- fn constructed(&self, obj: &Self::Type) {
- self.parent_constructed(obj);
+ fn constructed(&self) {
+ self.parent_constructed();
+ let obj = self.instance();
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
obj.add_pad(&self.srcpad_fec).unwrap();
@@ -940,21 +907,20 @@ impl ElementImpl for RaptorqEnc {
fn change_state(
&self,
- element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+ gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
+ self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
+ self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
- self.parent_change_state(element, transition)
+ self.parent_change_state(transition)
}
}