Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/concerns/limited_capacity')
-rw-r--r--app/workers/concerns/limited_capacity/job_tracker.rb42
-rw-r--r--app/workers/concerns/limited_capacity/worker.rb86
2 files changed, 44 insertions, 84 deletions
diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb
index 96b6e1a2024..47b13cd5bf6 100644
--- a/app/workers/concerns/limited_capacity/job_tracker.rb
+++ b/app/workers/concerns/limited_capacity/job_tracker.rb
@@ -3,21 +3,30 @@ module LimitedCapacity
class JobTracker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Utils::StrongMemoize
+ LUA_REGISTER_SCRIPT = <<~EOS
+ local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2]
+
+ if redis.call("scard", set_key) < tonumber(max_elements) then
+ redis.call("sadd", set_key, element)
+ return true
+ end
+
+ return false
+ EOS
+
def initialize(namespace)
@namespace = namespace
end
- def register(jid)
- _added, @count = with_redis_pipeline do |redis|
- register_job_keys(redis, jid)
- get_job_count(redis)
- end
+ def register(jid, max_jids)
+ with_redis do |redis|
+ redis.eval(LUA_REGISTER_SCRIPT, keys: [counter_key], argv: [jid, max_jids])
+ end.present?
end
def remove(jid)
- _removed, @count = with_redis_pipeline do |redis|
+ with_redis do |redis|
remove_job_keys(redis, jid)
- get_job_count(redis)
end
end
@@ -25,14 +34,13 @@ module LimitedCapacity
completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids)
return unless completed_jids.any?
- _removed, @count = with_redis_pipeline do |redis|
+ with_redis do |redis|
remove_job_keys(redis, completed_jids)
- get_job_count(redis)
end
end
def count
- @count ||= with_redis { |redis| get_job_count(redis) }
+ with_redis { |redis| redis.scard(counter_key) }
end
def running_jids
@@ -49,14 +57,6 @@ module LimitedCapacity
"worker:#{namespace.to_s.underscore}:running"
end
- def get_job_count(redis)
- redis.scard(counter_key)
- end
-
- def register_job_keys(redis, keys)
- redis.sadd(counter_key, keys)
- end
-
def remove_job_keys(redis, keys)
redis.srem(counter_key, keys)
end
@@ -64,11 +64,5 @@ module LimitedCapacity
def with_redis(&block)
Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord
end
-
- def with_redis_pipeline(&block)
- with_redis do |redis|
- redis.pipelined(&block)
- end
- end
end
end
diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb
index 9dd8d942146..b4cdfda680f 100644
--- a/app/workers/concerns/limited_capacity/worker.rb
+++ b/app/workers/concerns/limited_capacity/worker.rb
@@ -55,26 +55,14 @@ module LimitedCapacity
def perform_with_capacity(*args)
worker = self.new
worker.remove_failed_jobs
- worker.report_prometheus_metrics(*args)
- required_jobs_count = worker.required_jobs_count(*args)
- arguments = Array.new(required_jobs_count) { args }
+ arguments = Array.new(worker.max_running_jobs) { args }
self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
end
end
def perform(*args)
- return unless has_capacity?
-
- job_tracker.register(jid)
- report_running_jobs_metrics
- perform_work(*args)
- rescue => exception
- raise
- ensure
- job_tracker.remove(jid)
- report_prometheus_metrics(*args)
- re_enqueue(*args) unless exception
+ perform_registered(*args) if job_tracker.register(jid, max_running_jobs)
end
def perform_work(*args)
@@ -89,43 +77,32 @@ module LimitedCapacity
raise NotImplementedError
end
- def has_capacity?
- remaining_capacity > 0
- end
-
- def remaining_capacity
- [
- max_running_jobs - running_jobs_count - self.class.queue_size,
- 0
- ].max
- end
-
- def has_work?(*args)
- remaining_work_count(*args) > 0
- end
-
def remove_failed_jobs
job_tracker.clean_up
end
def report_prometheus_metrics(*args)
report_running_jobs_metrics
- remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args))
- max_running_jobs_gauge.set(prometheus_labels, max_running_jobs)
+ set_metric(:remaining_work_gauge, remaining_work_count(*args))
+ set_metric(:max_running_jobs_gauge, max_running_jobs)
end
- def report_running_jobs_metrics
- running_jobs_gauge.set(prometheus_labels, running_jobs_count)
- end
+ private
- def required_jobs_count(*args)
- [
- remaining_work_count(*args),
- remaining_capacity
- ].min
+ def perform_registered(*args)
+ report_running_jobs_metrics
+ perform_work(*args)
+ rescue StandardError => exception
+ raise
+ ensure
+ job_tracker.remove(jid)
+ report_prometheus_metrics(*args)
+ re_enqueue(*args) unless exception
end
- private
+ def report_running_jobs_metrics
+ set_metric(:running_jobs_gauge, running_jobs_count)
+ end
def running_jobs_count
job_tracker.count
@@ -138,32 +115,21 @@ module LimitedCapacity
end
def re_enqueue(*args)
- return unless has_capacity?
- return unless has_work?(*args)
+ return unless remaining_work_count(*args) > 0
self.class.perform_async(*args)
end
- def running_jobs_gauge
- strong_memoize(:running_jobs_gauge) do
- Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs')
- end
- end
-
- def max_running_jobs_gauge
- strong_memoize(:max_running_jobs_gauge) do
- Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs')
+ def set_metric(name, value)
+ metrics = strong_memoize(:metrics) do
+ {
+ running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'),
+ max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'),
+ remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
+ }
end
- end
-
- def remaining_work_gauge
- strong_memoize(:remaining_work_gauge) do
- Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
- end
- end
- def prometheus_labels
- { worker: self.class.name }
+ metrics[name].set({ worker: self.class.name }, value)
end
end
end