diff options
author | Kamil TrzciĆski <ayufan@ayufan.eu> | 2019-08-14 18:56:37 +0300 |
---|---|---|
committer | Qingyu Zhao <qzhao@gitlab.com> | 2019-08-21 11:50:46 +0300 |
commit | 75e2302d0126c4bc8ea215ffb4e72612d44e73bb (patch) | |
tree | 9b7bb2eb248080aab20cd8de15cf73ceb8b97dd8 /lib | |
parent | ca622a3e13cf88d94c6b3c98554e9782d37d4ad5 (diff) |
Allow to interrupt running jobs
This adds a middleware to track all threads
for running jobs.
This makes sidekiq to watch for redis-delivered notifications.
This makes be able to send notification to interrupt
running sidekiq jobs.
This does not take into account any native code,
as `Thread.raise` generates exception once the control gets
back to Ruby.
The separate measure should be taken to interrupt gRPC, shellouts,
or anything else that escapes Ruby.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/gitlab/sidekiq_middleware/jobs_threads.rb | 49 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_status/monitor.rb | 46 |
2 files changed, 95 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb new file mode 100644 index 00000000000..d0603bcee2d --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/jobs_threads.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + class JobsThreads + @@jobs = {} # rubocop:disable Style/ClassVars + MUTEX = Mutex.new + + def call(worker, job, queue) + jid = job['jid'] + + MUTEX.synchronize do + @@jobs[jid] = Thread.current + end + + return if self.class.cancelled?(jid) + + yield + ensure + MUTEX.synchronize do + @@jobs.delete(jid) + end + end + + def self.interrupt(jid) + MUTEX.synchronize do + thread = @@jobs[jid] + break unless thread + + thread.raise(Interrupt) + thread + end + end + + def self.cancelled?(jid) + Sidekiq.redis {|c| c.exists("cancelled-#{jid}") } + end + + def self.mark_job_as_cancelled(jid) + Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) } + "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}" + end + + def self.jobs + @@jobs + end + end + end +end diff --git a/lib/gitlab/sidekiq_status/monitor.rb b/lib/gitlab/sidekiq_status/monitor.rb new file mode 100644 index 00000000000..3fd9f02b166 --- /dev/null +++ b/lib/gitlab/sidekiq_status/monitor.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqStatus + class Monitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze + + def start_working + Sidekiq.logger.info "Watching sidekiq monitor" + + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + Sidekiq.logger.info "Received #{message} on #{channel}..." + execute_job_cancel(message) + end + end + end + end + + def self.cancel_job(jid) + Gitlab::Redis::SharedState.with do |redis| + redis.publish(NOTIFICATION_CHANNEL, jid) + "Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}" + end + end + + private + + def execute_job_cancel(jid) + Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid) + + thread = Gitlab::SidekiqMiddleware::JobsThreads + .interrupt(jid) + + if thread + Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}." + else + Sidekiq.logger.info "Did not find thread for #{jid}." + end + end + end + end +end |