Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil TrzciƄski <ayufan@ayufan.eu>2019-08-14 18:56:37 +0300
committerQingyu Zhao <qzhao@gitlab.com>2019-08-21 11:50:46 +0300
commit75e2302d0126c4bc8ea215ffb4e72612d44e73bb (patch)
tree9b7bb2eb248080aab20cd8de15cf73ceb8b97dd8 /lib/gitlab/sidekiq_middleware
parentca622a3e13cf88d94c6b3c98554e9782d37d4ad5 (diff)
Allow to interrupt running jobs
This adds a middleware to track all threads for running jobs. This makes sidekiq to watch for redis-delivered notifications. This makes be able to send notification to interrupt running sidekiq jobs. This does not take into account any native code, as `Thread.raise` generates exception once the control gets back to Ruby. The separate measure should be taken to interrupt gRPC, shellouts, or anything else that escapes Ruby.
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r--lib/gitlab/sidekiq_middleware/jobs_threads.rb49
1 files changed, 49 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
new file mode 100644
index 00000000000..d0603bcee2d
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
@@ -0,0 +1,49 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ class JobsThreads
+ @@jobs = {} # rubocop:disable Style/ClassVars
+ MUTEX = Mutex.new
+
+ def call(worker, job, queue)
+ jid = job['jid']
+
+ MUTEX.synchronize do
+ @@jobs[jid] = Thread.current
+ end
+
+ return if self.class.cancelled?(jid)
+
+ yield
+ ensure
+ MUTEX.synchronize do
+ @@jobs.delete(jid)
+ end
+ end
+
+ def self.interrupt(jid)
+ MUTEX.synchronize do
+ thread = @@jobs[jid]
+ break unless thread
+
+ thread.raise(Interrupt)
+ thread
+ end
+ end
+
+ def self.cancelled?(jid)
+ Sidekiq.redis {|c| c.exists("cancelled-#{jid}") }
+ end
+
+ def self.mark_job_as_cancelled(jid)
+ Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) }
+ "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}"
+ end
+
+ def self.jobs
+ @@jobs
+ end
+ end
+ end
+end