diff options
Diffstat (limited to 'app/workers/concerns/limited_capacity')
-rw-r--r-- | app/workers/concerns/limited_capacity/job_tracker.rb | 42 | ||||
-rw-r--r-- | app/workers/concerns/limited_capacity/worker.rb | 86 |
2 files changed, 44 insertions, 84 deletions
diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb index 96b6e1a2024..47b13cd5bf6 100644 --- a/app/workers/concerns/limited_capacity/job_tracker.rb +++ b/app/workers/concerns/limited_capacity/job_tracker.rb @@ -3,21 +3,30 @@ module LimitedCapacity class JobTracker # rubocop:disable Scalability/IdempotentWorker include Gitlab::Utils::StrongMemoize + LUA_REGISTER_SCRIPT = <<~EOS + local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2] + + if redis.call("scard", set_key) < tonumber(max_elements) then + redis.call("sadd", set_key, element) + return true + end + + return false + EOS + def initialize(namespace) @namespace = namespace end - def register(jid) - _added, @count = with_redis_pipeline do |redis| - register_job_keys(redis, jid) - get_job_count(redis) - end + def register(jid, max_jids) + with_redis do |redis| + redis.eval(LUA_REGISTER_SCRIPT, keys: [counter_key], argv: [jid, max_jids]) + end.present? end def remove(jid) - _removed, @count = with_redis_pipeline do |redis| + with_redis do |redis| remove_job_keys(redis, jid) - get_job_count(redis) end end @@ -25,14 +34,13 @@ module LimitedCapacity completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids) return unless completed_jids.any? - _removed, @count = with_redis_pipeline do |redis| + with_redis do |redis| remove_job_keys(redis, completed_jids) - get_job_count(redis) end end def count - @count ||= with_redis { |redis| get_job_count(redis) } + with_redis { |redis| redis.scard(counter_key) } end def running_jids @@ -49,14 +57,6 @@ module LimitedCapacity "worker:#{namespace.to_s.underscore}:running" end - def get_job_count(redis) - redis.scard(counter_key) - end - - def register_job_keys(redis, keys) - redis.sadd(counter_key, keys) - end - def remove_job_keys(redis, keys) redis.srem(counter_key, keys) end @@ -64,11 +64,5 @@ module LimitedCapacity def with_redis(&block) Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord end - - def with_redis_pipeline(&block) - with_redis do |redis| - redis.pipelined(&block) - end - end end end diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb index 9dd8d942146..b4cdfda680f 100644 --- a/app/workers/concerns/limited_capacity/worker.rb +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -55,26 +55,14 @@ module LimitedCapacity def perform_with_capacity(*args) worker = self.new worker.remove_failed_jobs - worker.report_prometheus_metrics(*args) - required_jobs_count = worker.required_jobs_count(*args) - arguments = Array.new(required_jobs_count) { args } + arguments = Array.new(worker.max_running_jobs) { args } self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext end end def perform(*args) - return unless has_capacity? - - job_tracker.register(jid) - report_running_jobs_metrics - perform_work(*args) - rescue => exception - raise - ensure - job_tracker.remove(jid) - report_prometheus_metrics(*args) - re_enqueue(*args) unless exception + perform_registered(*args) if job_tracker.register(jid, max_running_jobs) end def perform_work(*args) @@ -89,43 +77,32 @@ module LimitedCapacity raise NotImplementedError end - def has_capacity? - remaining_capacity > 0 - end - - def remaining_capacity - [ - max_running_jobs - running_jobs_count - self.class.queue_size, - 0 - ].max - end - - def has_work?(*args) - remaining_work_count(*args) > 0 - end - def remove_failed_jobs job_tracker.clean_up end def report_prometheus_metrics(*args) report_running_jobs_metrics - remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args)) - max_running_jobs_gauge.set(prometheus_labels, max_running_jobs) + set_metric(:remaining_work_gauge, remaining_work_count(*args)) + set_metric(:max_running_jobs_gauge, max_running_jobs) end - def report_running_jobs_metrics - running_jobs_gauge.set(prometheus_labels, running_jobs_count) - end + private - def required_jobs_count(*args) - [ - remaining_work_count(*args), - remaining_capacity - ].min + def perform_registered(*args) + report_running_jobs_metrics + perform_work(*args) + rescue StandardError => exception + raise + ensure + job_tracker.remove(jid) + report_prometheus_metrics(*args) + re_enqueue(*args) unless exception end - private + def report_running_jobs_metrics + set_metric(:running_jobs_gauge, running_jobs_count) + end def running_jobs_count job_tracker.count @@ -138,32 +115,21 @@ module LimitedCapacity end def re_enqueue(*args) - return unless has_capacity? - return unless has_work?(*args) + return unless remaining_work_count(*args) > 0 self.class.perform_async(*args) end - def running_jobs_gauge - strong_memoize(:running_jobs_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs') - end - end - - def max_running_jobs_gauge - strong_memoize(:max_running_jobs_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs') + def set_metric(name, value) + metrics = strong_memoize(:metrics) do + { + running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'), + max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'), + remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') + } end - end - - def remaining_work_gauge - strong_memoize(:remaining_work_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') - end - end - def prometheus_labels - { worker: self.class.name } + metrics[name].set({ worker: self.class.name }, value) end end end |