Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2018-04-12 09:59:39 +0300
committerSebastian Dröge <sebastian@centricular.com>2018-11-05 14:36:47 +0300
commit2dfca38977b41e8394fc00841ce9ffed01b31a24 (patch)
tree69442622b06d0d7b98080b51aa57af1b47b165df
parentb56e1a987363611ab3887d965502df10f3889747 (diff)
threadshare: Turn the single-threaded executor until no futures are left to be run before waiting
Otherwise in e.g. a pipeline like ts-udpsrc ! ts-queue ! fakesink the first turn would only get a packet and queue it up, then we would wait due to throttling and only then we would forward the packet from the queue (but not poll the socket again), wait again due to throttling and only then poll and get the next packet. See https://github.com/tokio-rs/tokio/issues/310
-rw-r--r--gst-plugin-threadshare/Cargo.toml10
-rw-r--r--gst-plugin-threadshare/src/iocontext.rs9
2 files changed, 13 insertions, 6 deletions
diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml
index f95e4dddd..e42c2c7e5 100644
--- a/gst-plugin-threadshare/Cargo.toml
+++ b/gst-plugin-threadshare/Cargo.toml
@@ -10,11 +10,11 @@ gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys" }
glib = { git = "https://github.com/gtk-rs/glib" }
gstreamer = { git = "https://github.com/sdroege/gstreamer-rs" }
gst-plugin = { git = "https://github.com/sdroege/gst-plugin-rs" }
-tokio = { git = "https://github.com/tokio-rs/tokio" }
-tokio-reactor = { git = "https://github.com/tokio-rs/tokio" }
-tokio-executor = { git = "https://github.com/tokio-rs/tokio" }
-tokio-timer = { git = "https://github.com/tokio-rs/tokio" }
-tokio-threadpool = { git = "https://github.com/tokio-rs/tokio" }
+tokio = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
+tokio-reactor = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
+tokio-executor = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
+tokio-timer = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
+tokio-threadpool = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
futures = "0.1"
lazy_static = "1.0"
either = "1.0"
diff --git a/gst-plugin-threadshare/src/iocontext.rs b/gst-plugin-threadshare/src/iocontext.rs
index f82f30a5f..b992f5a99 100644
--- a/gst-plugin-threadshare/src/iocontext.rs
+++ b/gst-plugin-threadshare/src/iocontext.rs
@@ -134,6 +134,8 @@ impl IOContextRunner {
::tokio_reactor::with_default(&handle, &mut enter, |mut enter| {
::tokio_timer::with_default(&timer_handle, &mut enter, |enter| loop {
+ use tokio::executor::current_thread::Turn;
+
let now = time::Instant::now();
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
@@ -148,7 +150,11 @@ impl IOContextRunner {
}
gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name);
- current_thread.enter(enter).turn(None).unwrap();
+ while let Turn(true) = current_thread
+ .enter(enter)
+ .turn(Some(time::Duration::from_millis(0)))
+ .unwrap()
+ {}
gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name);
let elapsed = now.elapsed();
@@ -277,6 +283,7 @@ impl IOContext {
let shutdown = IOContextRunner::start(name, wait, reactor);
+ // FIXME: The executor threads are not throttled at all, only the reactor
let mut pool_builder = thread_pool::Builder::new();
pool_builder
.around_worker(move |w, enter| {