diff options
Diffstat (limited to 'app/workers/concerns/limited_capacity/worker.rb')
-rw-r--r-- | app/workers/concerns/limited_capacity/worker.rb | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb new file mode 100644 index 00000000000..c0d6bfff2f5 --- /dev/null +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -0,0 +1,164 @@ +# frozen_string_literal: true + +# Usage: +# +# Worker that performs the tasks: +# +# class DummyWorker +# include ApplicationWorker +# include LimitedCapacity::Worker +# +# # For each job that raises any error, a worker instance will be disabled +# # until the next schedule-run. +# # If you wish to get around this, exceptions must by handled by the implementer. +# # +# def perform_work(*args) +# end +# +# def remaining_work_count(*args) +# 5 +# end +# +# def max_running_jobs +# 25 +# end +# end +# +# Cron worker to fill the pool of regular workers: +# +# class ScheduleDummyCronWorker +# include ApplicationWorker +# include CronjobQueue +# +# def perform(*args) +# DummyWorker.perform_with_capacity(*args) +# end +# end +# + +module LimitedCapacity + module Worker + extend ActiveSupport::Concern + include Gitlab::Utils::StrongMemoize + + included do + # Disable Sidekiq retries, log the error, and send the job to the dead queue. + # This is done to have only one source that produces jobs and because the slot + # would be occupied by a job that will be performed in the distant future. + # We let the cron worker enqueue new jobs, this could be seen as our retry and + # back off mechanism because the job might fail again if executed immediately. + sidekiq_options retry: 0 + deduplicate :none + end + + class_methods do + def perform_with_capacity(*args) + worker = self.new + worker.remove_failed_jobs + worker.report_prometheus_metrics(*args) + required_jobs_count = worker.required_jobs_count(*args) + + arguments = Array.new(required_jobs_count) { args } + self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext + end + end + + def perform(*args) + return unless has_capacity? + + job_tracker.register(jid) + perform_work(*args) + rescue => exception + raise + ensure + job_tracker.remove(jid) + report_prometheus_metrics + re_enqueue(*args) unless exception + end + + def perform_work(*args) + raise NotImplementedError + end + + def remaining_work_count(*args) + raise NotImplementedError + end + + def max_running_jobs + raise NotImplementedError + end + + def has_capacity? + remaining_capacity > 0 + end + + def remaining_capacity + [ + max_running_jobs - running_jobs_count - self.class.queue_size, + 0 + ].max + end + + def has_work?(*args) + remaining_work_count(*args) > 0 + end + + def remove_failed_jobs + job_tracker.clean_up + end + + def report_prometheus_metrics(*args) + running_jobs_gauge.set(prometheus_labels, running_jobs_count) + remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args)) + max_running_jobs_gauge.set(prometheus_labels, max_running_jobs) + end + + def required_jobs_count(*args) + [ + remaining_work_count(*args), + remaining_capacity + ].min + end + + private + + def running_jobs_count + job_tracker.count + end + + def job_tracker + strong_memoize(:job_tracker) do + JobTracker.new(self.class.name) + end + end + + def re_enqueue(*args) + return unless has_capacity? + return unless has_work?(*args) + + self.class.perform_async(*args) + end + + def running_jobs_gauge + strong_memoize(:running_jobs_gauge) do + Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs') + end + end + + def max_running_jobs_gauge + strong_memoize(:max_running_jobs_gauge) do + Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs') + end + end + + def remaining_work_gauge + strong_memoize(:remaining_work_gauge) do + Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') + end + end + + def prometheus_labels + { worker: self.class.name } + end + end +end |