Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb')
-rw-r--r--lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb118
1 files changed, 118 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb b/lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb
new file mode 100644
index 00000000000..73f42beaf9e
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb
@@ -0,0 +1,118 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module PauseControl
+ class PauseControlService
+ # Class for managing queues for paused workers
+ # When a worker is paused all jobs are saved in a separate sorted sets in redis
+ LIMIT = 1000
+ PROJECT_CONTEXT_KEY = "#{Gitlab::ApplicationContext::LOG_KEY}.project".freeze
+
+ def initialize(worker_name)
+ @worker_name = worker_name
+
+ worker_name = @worker_name.underscore
+ @redis_set_key = "sidekiq:pause_control:paused_jobs:zset:{#{worker_name}}"
+ @redis_score_key = "sidekiq:pause_control:paused_jobs:score:{#{worker_name}}"
+ end
+
+ class << self
+ def add_to_waiting_queue!(worker_name, args, context)
+ new(worker_name).add_to_waiting_queue!(args, context)
+ end
+
+ def has_jobs_in_waiting_queue?(worker_name)
+ new(worker_name).has_jobs_in_waiting_queue?
+ end
+
+ def resume_processing!(worker_name)
+ new(worker_name).resume_processing!
+ end
+
+ def queue_size(worker_name)
+ new(worker_name).queue_size
+ end
+ end
+
+ def add_to_waiting_queue!(args, context)
+ with_redis do |redis|
+ redis.zadd(redis_set_key, generate_unique_score(redis), serialize(args, context))
+ end
+ end
+
+ def queue_size
+ with_redis { |redis| redis.zcard(redis_set_key) }
+ end
+
+ def has_jobs_in_waiting_queue?
+ with_redis { |redis| redis.exists?(redis_set_key) } # rubocop:disable CodeReuse/ActiveRecord
+ end
+
+ def resume_processing!(iterations: 1)
+ with_redis do |redis|
+ iterations.times do
+ jobs_with_scores = next_batch_from_waiting_queue(redis)
+ break if jobs_with_scores.empty?
+
+ parsed_jobs = jobs_with_scores.map { |j, _| deserialize(j) }
+
+ parsed_jobs.each { |j| send_to_processing_queue(j) }
+
+ remove_jobs_from_waiting_queue(redis, jobs_with_scores)
+ end
+
+ size = queue_size
+ redis.del(redis_score_key, redis_set_key) if size == 0
+
+ size
+ end
+ end
+
+ private
+
+ attr_reader :worker_name, :redis_set_key, :redis_score_key
+
+ def with_redis(&blk)
+ Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord
+ end
+
+ def serialize(args, context)
+ {
+ args: args,
+ # Only include part of the context that would not prevent deduplication
+ context: context.slice(PROJECT_CONTEXT_KEY)
+ }.to_json
+ end
+
+ def deserialize(json)
+ Gitlab::Json.parse(json)
+ end
+
+ def send_to_processing_queue(job)
+ Gitlab::ApplicationContext.with_raw_context(job['context']) do
+ args = job['args']
+
+ Gitlab::SidekiqLogging::PauseControlLogger.instance.resumed_log(worker_name, args)
+
+ worker_name.safe_constantize&.perform_async(*args)
+ end
+ end
+
+ def generate_unique_score(redis)
+ redis.incr(redis_score_key)
+ end
+
+ def next_batch_from_waiting_queue(redis)
+ redis.zrangebyscore(redis_set_key, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
+ end
+
+ def remove_jobs_from_waiting_queue(redis, jobs_with_scores)
+ first_score = jobs_with_scores.first.last
+ last_score = jobs_with_scores.last.last
+ redis.zremrangebyscore(redis_set_key, first_score, last_score)
+ end
+ end
+ end
+ end
+end