diff options
Diffstat (limited to 'app/workers/concerns/limited_capacity/worker.rb')
-rw-r--r-- | app/workers/concerns/limited_capacity/worker.rb | 86 |
1 files changed, 26 insertions, 60 deletions
diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb index 9dd8d942146..b4cdfda680f 100644 --- a/app/workers/concerns/limited_capacity/worker.rb +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -55,26 +55,14 @@ module LimitedCapacity def perform_with_capacity(*args) worker = self.new worker.remove_failed_jobs - worker.report_prometheus_metrics(*args) - required_jobs_count = worker.required_jobs_count(*args) - arguments = Array.new(required_jobs_count) { args } + arguments = Array.new(worker.max_running_jobs) { args } self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext end end def perform(*args) - return unless has_capacity? - - job_tracker.register(jid) - report_running_jobs_metrics - perform_work(*args) - rescue => exception - raise - ensure - job_tracker.remove(jid) - report_prometheus_metrics(*args) - re_enqueue(*args) unless exception + perform_registered(*args) if job_tracker.register(jid, max_running_jobs) end def perform_work(*args) @@ -89,43 +77,32 @@ module LimitedCapacity raise NotImplementedError end - def has_capacity? - remaining_capacity > 0 - end - - def remaining_capacity - [ - max_running_jobs - running_jobs_count - self.class.queue_size, - 0 - ].max - end - - def has_work?(*args) - remaining_work_count(*args) > 0 - end - def remove_failed_jobs job_tracker.clean_up end def report_prometheus_metrics(*args) report_running_jobs_metrics - remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args)) - max_running_jobs_gauge.set(prometheus_labels, max_running_jobs) + set_metric(:remaining_work_gauge, remaining_work_count(*args)) + set_metric(:max_running_jobs_gauge, max_running_jobs) end - def report_running_jobs_metrics - running_jobs_gauge.set(prometheus_labels, running_jobs_count) - end + private - def required_jobs_count(*args) - [ - remaining_work_count(*args), - remaining_capacity - ].min + def perform_registered(*args) + report_running_jobs_metrics + perform_work(*args) + rescue StandardError => exception + raise + ensure + job_tracker.remove(jid) + report_prometheus_metrics(*args) + re_enqueue(*args) unless exception end - private + def report_running_jobs_metrics + set_metric(:running_jobs_gauge, running_jobs_count) + end def running_jobs_count job_tracker.count @@ -138,32 +115,21 @@ module LimitedCapacity end def re_enqueue(*args) - return unless has_capacity? - return unless has_work?(*args) + return unless remaining_work_count(*args) > 0 self.class.perform_async(*args) end - def running_jobs_gauge - strong_memoize(:running_jobs_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs') - end - end - - def max_running_jobs_gauge - strong_memoize(:max_running_jobs_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs') + def set_metric(name, value) + metrics = strong_memoize(:metrics) do + { + running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'), + max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'), + remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') + } end - end - - def remaining_work_gauge - strong_memoize(:remaining_work_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') - end - end - def prometheus_labels - { worker: self.class.name } + metrics[name].set({ worker: self.class.name }, value) end end end |