1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# frozen_string_literal: true
# Usage:
#
# Worker that performs the tasks:
#
# class DummyWorker
#   include ApplicationWorker
#   include LimitedCapacity::Worker
#
#   # If perform_work raises any error, the job is not re-enqueued, so its slot
#   # stays empty until the next scheduled run.
#   # If you wish to get around this, exceptions must be handled by the implementer.
#   #
#   def perform_work(*args)
#   end
#
#   def remaining_work_count(*args)
#     5
#   end
#
#   def max_running_jobs
#     25
#   end
# end
#
# Cron worker to fill the pool of regular workers:
#
# class ScheduleDummyCronWorker
#   include ApplicationWorker
#   include CronjobQueue
#
#   def perform(*args)
#     DummyWorker.perform_with_capacity(*args)
#   end
# end
#
module LimitedCapacity
  # Concern for Sidekiq workers that should never have more than
  # `max_running_jobs` jobs running or queued at once. Each finished job
  # re-enqueues itself while there is work and capacity left, and a cron worker
  # calling `perform_with_capacity` refills the pool.
  #
  # Including classes must implement #perform_work, #remaining_work_count and
  # #max_running_jobs.
  module Worker
    extend ActiveSupport::Concern
    include Gitlab::Utils::StrongMemoize

    included do
      # Disable Sidekiq retries, log the error, and send the job to the dead queue.
      # This is done to have only one source that produces jobs and because the slot
      # would be occupied by a job that will be performed in the distant future.
      # We let the cron worker enqueue new jobs, this could be seen as our retry and
      # back off mechanism because the job might fail again if executed immediately.
      sidekiq_options retry: 0
      deduplicate :none
    end

    class_methods do
      # Enqueues as many jobs as there is both remaining work and free capacity
      # for, each with the same arguments. Meant to be called periodically, for
      # example from a cron worker, to refill the pool of running jobs.
      #
      # @param args [Array] forwarded to #remaining_work_count and to every
      #   enqueued job
      def perform_with_capacity(*args)
        worker = new
        worker.remove_failed_jobs
        worker.report_prometheus_metrics(*args)

        required_jobs_count = worker.required_jobs_count(*args)
        arguments = Array.new(required_jobs_count) { args }

        bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
      end
    end

    # Sidekiq entry point. Runs one unit of work if a slot is free, then tries
    # to refill that slot by enqueueing another job with the same arguments.
    #
    # The ensure clause also runs on the early `return` when there is no
    # capacity. re_enqueue re-checks has_capacity? itself in that case.
    # If perform_work raises, the error propagates and the job is NOT
    # re-enqueued; its slot stays empty until the next perform_with_capacity run.
    #
    # @param args [Array] forwarded to #perform_work and #remaining_work_count
    def perform(*args)
      return unless has_capacity?

      job_tracker.register(jid)
      report_running_jobs_metrics
      perform_work(*args)
    rescue StandardError => exception
      # Re-raise unchanged; this rescue only captures `exception` so the ensure
      # clause below can skip re-enqueueing after a failure.
      raise
    ensure
      job_tracker.remove(jid)
      # Forward args so the remaining-work gauge is computed for the same
      # arguments this job ran with, matching perform_with_capacity.
      report_prometheus_metrics(*args)
      re_enqueue(*args) unless exception
    end

    # Performs a single unit of work. Must be implemented by the including class.
    def perform_work(*args)
      raise NotImplementedError
    end

    # Number of work items still pending for `args`. Must be implemented by the
    # including class.
    def remaining_work_count(*args)
      raise NotImplementedError
    end

    # Upper bound on running plus queued jobs. Must be implemented by the
    # including class.
    def max_running_jobs
      raise NotImplementedError
    end

    # @return [Boolean] whether at least one more job may be started
    def has_capacity?
      remaining_capacity.positive?
    end

    # Free slots: max_running_jobs minus tracked running jobs minus jobs already
    # waiting in the Sidekiq queue, floored at zero.
    # NOTE(review): `queue_size` is provided by the including class (presumably
    # via ApplicationWorker), not by this module.
    def remaining_capacity
      [
        max_running_jobs - running_jobs_count - self.class.queue_size,
        0
      ].max
    end

    # @return [Boolean] whether remaining_work_count(*args) is positive
    def has_work?(*args)
      remaining_work_count(*args).positive?
    end

    # Delegates to JobTracker#clean_up.
    # Presumably drops tracked jids of jobs that are no longer running (e.g.
    # crashed jobs) so they stop consuming capacity — TODO confirm in JobTracker.
    def remove_failed_jobs
      job_tracker.clean_up
    end

    # Publishes the running, remaining-work and max-running gauges, labelled
    # with this worker's class name.
    def report_prometheus_metrics(*args)
      report_running_jobs_metrics
      remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args))
      max_running_jobs_gauge.set(prometheus_labels, max_running_jobs)
    end

    # Publishes only the running-jobs gauge.
    def report_running_jobs_metrics
      running_jobs_gauge.set(prometheus_labels, running_jobs_count)
    end

    # Number of jobs to enqueue now: the smaller of remaining work and free
    # capacity.
    def required_jobs_count(*args)
      [
        remaining_work_count(*args),
        remaining_capacity
      ].min
    end

    private

    # Number of jobs currently registered with the job tracker.
    def running_jobs_count
      job_tracker.count
    end

    # Job tracker keyed by this worker's class name, memoized per instance.
    def job_tracker
      strong_memoize(:job_tracker) do
        JobTracker.new(self.class.name)
      end
    end

    # Enqueues one more job with the same arguments if there is both free
    # capacity and remaining work.
    def re_enqueue(*args)
      return unless has_capacity?
      return unless has_work?(*args)

      self.class.perform_async(*args)
    end

    def running_jobs_gauge
      strong_memoize(:running_jobs_gauge) do
        Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs')
      end
    end

    def max_running_jobs_gauge
      strong_memoize(:max_running_jobs_gauge) do
        Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs')
      end
    end

    def remaining_work_gauge
      strong_memoize(:remaining_work_gauge) do
        Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
      end
    end

    # Labels attached to every gauge sample.
    def prometheus_labels
      { worker: self.class.name }
    end
  end
end
|