Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2021-10-09 13:17:05 +0300
committerFrançois Laignel <fengalin@free.fr>2021-10-18 16:09:47 +0300
commit27b9f0d868f436e9b2bcc3e51f393c40b56fcc02 (patch)
tree93c0db7b1cf26ea7d0e3a4d70a7d2844c2e00975 /generic/threadshare
parentbd8a7e8df7e8ebf751b2d00fe6a096d726683c00 (diff)
Improve usability thanks to opt-ops
The crate option-operations simplifies usage when dealing with `Option`s, which is often the case with `ClockTime`.
Diffstat (limited to 'generic/threadshare')
-rw-r--r--generic/threadshare/src/appsrc/imp.rs8
-rw-r--r--generic/threadshare/src/inputselector/imp.rs18
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs23
-rw-r--r--generic/threadshare/src/proxy/imp.rs3
-rw-r--r--generic/threadshare/src/queue/imp.rs3
-rw-r--r--generic/threadshare/src/runtime/task.rs8
-rw-r--r--generic/threadshare/src/socket.rs6
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs3
-rw-r--r--generic/threadshare/src/udpsink/imp.rs18
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs3
10 files changed, 25 insertions, 68 deletions
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs
index 2f5dd4284..a0d9cb3e4 100644
--- a/generic/threadshare/src/appsrc/imp.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -38,8 +38,7 @@ use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_MAX_BUFFERS: u32 = 10;
const DEFAULT_DO_TIMESTAMP: bool = false;
@@ -391,10 +390,7 @@ impl AppSrc {
let now = clock.time();
let buffer = buffer.make_mut();
- buffer.set_dts(
- now.zip(base_time)
- .and_then(|(now, base_time)| now.checked_sub(base_time)),
- );
+ buffer.set_dts(now.opt_checked_sub(base_time).ok().flatten());
buffer.set_pts(None);
} else {
gst_error!(CAT, obj: element, "Don't have a clock yet");
diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs
index 0bccc227b..121f882b9 100644
--- a/generic/threadshare/src/inputselector/imp.rs
+++ b/generic/threadshare/src/inputselector/imp.rs
@@ -35,8 +35,7 @@ use crate::runtime::prelude::*;
use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug, Clone)]
struct Settings {
@@ -82,13 +81,9 @@ impl InputSelectorPadSinkHandler {
) {
let now = element.current_running_time();
- match running_time
- .into()
- .zip(now)
- .and_then(|(running_time, now)| running_time.checked_sub(now))
- {
- Some(delay) => runtime::time::delay_for(delay.into()).await,
- None => runtime::executor::yield_now().await,
+ match running_time.into().opt_checked_sub(now) {
+ Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await,
+ _ => runtime::executor::yield_now().await,
}
}
@@ -315,10 +310,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
let (live, min, max) = peer_query.result();
if live {
min_latency = min.max(min_latency);
- max_latency = max
- .zip(max_latency)
- .map(|(max, max_latency)| max.min(max_latency))
- .or(max);
+ max_latency = max.opt_min(max_latency).or(max);
}
}
}
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
index a27e1fa07..36209335b 100644
--- a/generic/threadshare/src/jitterbuffer/imp.rs
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -212,10 +212,7 @@ impl SinkHandler {
) {
if inner.ips_rtptime != Some(rtptime) {
let pts = pts.into();
- let new_packet_spacing = inner
- .ips_pts
- .zip(pts)
- .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts));
+ let new_packet_spacing = pts.opt_checked_sub(inner.ips_pts).ok().flatten();
if let Some(new_packet_spacing) = new_packet_spacing {
let old_packet_spacing = state.packet_spacing;
@@ -744,7 +741,6 @@ impl SrcHandler {
&[
("seqnum", &(lost_seqnum as u32)),
("timestamp", &timestamp),
- // FIXME would probably make sense being a ClockTime
("duration", &duration.nseconds()),
("retry", &0),
],
@@ -864,8 +860,7 @@ impl SrcHandler {
if state.eos {
gst_debug!(CAT, obj: element, "EOS, not waiting");
- // FIXME use Duration::ZERO when MSVC >= 1.53.2
- return (now, Some((now, Duration::from_nanos(0))));
+ return (now, Some((now, Duration::ZERO)));
}
if state.earliest_pts.is_none() {
@@ -877,10 +872,8 @@ impl SrcHandler {
.map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2);
let delay = next_wakeup
- .zip(now)
- .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| {
- next_wakeup.saturating_sub(now)
- });
+ .opt_saturating_sub(now)
+ .unwrap_or(gst::ClockTime::ZERO);
gst_debug!(
CAT,
@@ -1113,8 +1106,7 @@ impl TaskImpl for JitterBufferTask {
);
let (delay_fut, abort_handle) = match next_wakeup {
- // FIXME use Duration::ZERO when MSVC >= 1.53.2
- Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
+ Some((_, delay)) if delay.is_zero() => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
@@ -1166,10 +1158,7 @@ impl TaskImpl for JitterBufferTask {
);
if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup
- .zip(now)
- .map_or(false, |(next_wakeup, now)| next_wakeup > now)
- {
+ if next_wakeup.opt_gt(now).unwrap_or(false) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs
index 0e5904594..3262e8056 100644
--- a/generic/threadshare/src/proxy/imp.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -53,8 +53,7 @@ const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug, Clone)]
struct SettingsSink {
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs
index ec8f89513..f9d375e02 100644
--- a/generic/threadshare/src/queue/imp.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -40,8 +40,7 @@ const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug, Clone)]
struct Settings {
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index aa6497959..1feade11b 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -1612,9 +1612,7 @@ mod tests {
task.prepare(TaskPrepareTest { prepare_receiver }, context.clone())
.unwrap();
- // FIXME use Duration::ZERO when MSVC >= 1.53.2
- let start_ctx =
- Context::acquire("prepare_start_ok_requester", Duration::from_nanos(0)).unwrap();
+ let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
let task_clone = task.clone();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
@@ -1737,9 +1735,7 @@ mod tests {
)
.unwrap();
- // FIXME use Duration::ZERO when MSVC >= 1.53.2
- let start_ctx =
- Context::acquire("prepare_start_error_requester", Duration::from_nanos(0)).unwrap();
+ let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
let task_clone = task.clone();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs
index 8d8da5caa..420e0f345 100644
--- a/generic/threadshare/src/socket.rs
+++ b/generic/threadshare/src/socket.rs
@@ -143,11 +143,7 @@ impl<T: SocketRead> Socket<T> {
Ok((len, saddr)) => {
let dts = if T::DO_TIMESTAMP {
let time = self.clock.as_ref().unwrap().time();
- let running_time = time
- .zip(self.base_time)
- // TODO Do we want None if time < base_time
- // or do we expect Some?
- .and_then(|(time, base_time)| time.checked_sub(base_time));
+ let running_time = time.opt_checked_sub(self.base_time).ok().flatten();
// FIXME maybe we should check if running_time.is_none
// so as to display another message
gst_debug!(
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index b832ee9fd..aaaa846cb 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -48,8 +48,7 @@ const DEFAULT_PORT: i32 = 4953;
const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_BLOCKSIZE: u32 = 4096;
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug, Clone)]
struct Settings {
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index 2d6bcb8e2..3a9fe5577 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -62,8 +62,7 @@ const DEFAULT_TTL_MC: u32 = 1;
const DEFAULT_QOS_DSCP: i32 = -1;
const DEFAULT_CLIENTS: &str = "";
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug, Clone)]
struct Settings {
@@ -412,10 +411,7 @@ impl UdpSinkPadHandler {
rtime = segment
.downcast_ref::<gst::format::Time>()
.and_then(|segment| {
- segment
- .to_running_time(buffer.pts())
- .zip(inner.latency)
- .map(|(rtime, latency)| rtime + latency)
+ segment.to_running_time(buffer.pts()).opt_add(inner.latency)
});
}
@@ -529,13 +525,9 @@ impl UdpSinkPadHandler {
) {
let now = element.current_running_time();
- match running_time
- .into()
- .zip(now)
- .and_then(|(running_time, now)| running_time.checked_sub(now))
- {
- Some(delay) => runtime::time::delay_for(delay.into()).await,
- None => runtime::executor::yield_now().await,
+ match running_time.into().opt_checked_sub(now) {
+ Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await,
+ _ => runtime::executor::yield_now().await,
}
}
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index cccf7c17f..0318d1c48 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -48,8 +48,7 @@ const DEFAULT_MTU: u32 = 1492;
const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &str = "";
-// FIXME use Duration::ZERO when MSVC >= 1.53.2
-const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
#[derive(Debug, Clone)]