Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/concerns')
-rw-r--r--app/workers/concerns/application_worker.rb13
-rw-r--r--app/workers/concerns/chaos_queue.rb1
-rw-r--r--app/workers/concerns/git_garbage_collect_methods.rb4
-rw-r--r--app/workers/concerns/gitlab/github_import/object_importer.rb22
-rw-r--r--app/workers/concerns/gitlab/github_import/stage_methods.rb2
-rw-r--r--app/workers/concerns/gitlab/jira_import/import_worker.rb2
-rw-r--r--app/workers/concerns/limited_capacity/job_tracker.rb42
-rw-r--r--app/workers/concerns/limited_capacity/worker.rb86
-rw-r--r--app/workers/concerns/reactive_cacheable_worker.rb2
-rw-r--r--app/workers/concerns/waitable_worker.rb2
-rw-r--r--app/workers/concerns/worker_attributes.rb26
11 files changed, 85 insertions, 117 deletions
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 0de26e27631..843be4896a3 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -16,6 +16,7 @@ module ApplicationWorker
included do
set_queue
+ after_set_class_attribute { set_queue }
def structured_payload(payload = {})
context = Gitlab::ApplicationContext.current.merge(
@@ -47,22 +48,14 @@ module ApplicationWorker
class_methods do
def inherited(subclass)
subclass.set_queue
+ subclass.after_set_class_attribute { subclass.set_queue }
end
def set_queue
- queue_name = [queue_namespace, base_queue_name].compact.join(':')
-
+ queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
end
- def base_queue_name
- name
- .sub(/\AGitlab::/, '')
- .sub(/Worker\z/, '')
- .underscore
- .tr('/', '_')
- end
-
def queue_namespace(new_namespace = nil)
if new_namespace
sidekiq_options queue_namespace: new_namespace
diff --git a/app/workers/concerns/chaos_queue.rb b/app/workers/concerns/chaos_queue.rb
index a9c557f0175..2ccd55157c6 100644
--- a/app/workers/concerns/chaos_queue.rb
+++ b/app/workers/concerns/chaos_queue.rb
@@ -6,5 +6,6 @@ module ChaosQueue
included do
queue_namespace :chaos
feature_category_not_owned!
+ tags :exclude_from_gitlab_com
end
end
diff --git a/app/workers/concerns/git_garbage_collect_methods.rb b/app/workers/concerns/git_garbage_collect_methods.rb
index 17a80d1ddb3..c46deeb716f 100644
--- a/app/workers/concerns/git_garbage_collect_methods.rb
+++ b/app/workers/concerns/git_garbage_collect_methods.rb
@@ -97,10 +97,10 @@ module GitGarbageCollectMethods
end
rescue GRPC::NotFound => e
Gitlab::GitLogger.error("#{__method__} failed:\nRepository not found")
- raise Gitlab::Git::Repository::NoRepository.new(e)
+ raise Gitlab::Git::Repository::NoRepository, e
rescue GRPC::BadStatus => e
Gitlab::GitLogger.error("#{__method__} failed:\n#{e}")
- raise Gitlab::Git::CommandError.new(e)
+ raise Gitlab::Git::CommandError, e
end
def get_gitaly_client(task, repository)
diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb
index 575cd4862b0..6ebf7c7c263 100644
--- a/app/workers/concerns/gitlab/github_import/object_importer.rb
+++ b/app/workers/concerns/gitlab/github_import/object_importer.rb
@@ -9,6 +9,8 @@ module Gitlab
included do
include ApplicationWorker
+
+ sidekiq_options retry: 3
include GithubImport::Queue
include ReschedulingMethods
include Gitlab::NotifyUponDeath
@@ -25,15 +27,19 @@ module Gitlab
# client - An instance of `Gitlab::GithubImport::Client`
# hash - A Hash containing the details of the object to import.
def import(project, client, hash)
+ object = representation_class.from_json_hash(hash)
+
+ # To better express in the logs what object is being imported.
+ self.github_id = object.attributes.fetch(:github_id)
+
info(project.id, message: 'starting importer')
- object = representation_class.from_json_hash(hash)
importer_class.new(object, project, client).execute
counter.increment
info(project.id, message: 'importer finished')
- rescue => e
- error(project.id, e)
+ rescue StandardError => e
+ error(project.id, e, hash)
end
def counter
@@ -63,16 +69,19 @@ module Gitlab
private
+ attr_accessor :github_id
+
def info(project_id, extra = {})
logger.info(log_attributes(project_id, extra))
end
- def error(project_id, exception)
+ def error(project_id, exception, data = {})
logger.error(
log_attributes(
project_id,
message: 'importer failed',
- 'error.message': exception.message
+ 'error.message': exception.message,
+ 'github.data': data
)
)
@@ -86,7 +95,8 @@ module Gitlab
extra.merge(
import_source: :github,
project_id: project_id,
- importer: importer_class.name
+ importer: importer_class.name,
+ github_id: github_id
)
end
end
diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb
index e5985fb94da..916b273a28f 100644
--- a/app/workers/concerns/gitlab/github_import/stage_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb
@@ -14,7 +14,7 @@ module Gitlab
try_import(client, project)
info(project_id, message: 'stage finished')
- rescue => e
+ rescue StandardError => e
error(project_id, e)
end
diff --git a/app/workers/concerns/gitlab/jira_import/import_worker.rb b/app/workers/concerns/gitlab/jira_import/import_worker.rb
index fdc6e64bbaa..107b6e2e9be 100644
--- a/app/workers/concerns/gitlab/jira_import/import_worker.rb
+++ b/app/workers/concerns/gitlab/jira_import/import_worker.rb
@@ -7,6 +7,8 @@ module Gitlab
included do
include ApplicationWorker
+
+ sidekiq_options retry: 3
include ProjectImportOptions
include Gitlab::JiraImport::QueueOptions
end
diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb
index 96b6e1a2024..47b13cd5bf6 100644
--- a/app/workers/concerns/limited_capacity/job_tracker.rb
+++ b/app/workers/concerns/limited_capacity/job_tracker.rb
@@ -3,21 +3,30 @@ module LimitedCapacity
class JobTracker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Utils::StrongMemoize
+ LUA_REGISTER_SCRIPT = <<~EOS
+ local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2]
+
+ if redis.call("scard", set_key) < tonumber(max_elements) then
+ redis.call("sadd", set_key, element)
+ return true
+ end
+
+ return false
+ EOS
+
def initialize(namespace)
@namespace = namespace
end
- def register(jid)
- _added, @count = with_redis_pipeline do |redis|
- register_job_keys(redis, jid)
- get_job_count(redis)
- end
+ def register(jid, max_jids)
+ with_redis do |redis|
+ redis.eval(LUA_REGISTER_SCRIPT, keys: [counter_key], argv: [jid, max_jids])
+ end.present?
end
def remove(jid)
- _removed, @count = with_redis_pipeline do |redis|
+ with_redis do |redis|
remove_job_keys(redis, jid)
- get_job_count(redis)
end
end
@@ -25,14 +34,13 @@ module LimitedCapacity
completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids)
return unless completed_jids.any?
- _removed, @count = with_redis_pipeline do |redis|
+ with_redis do |redis|
remove_job_keys(redis, completed_jids)
- get_job_count(redis)
end
end
def count
- @count ||= with_redis { |redis| get_job_count(redis) }
+ with_redis { |redis| redis.scard(counter_key) }
end
def running_jids
@@ -49,14 +57,6 @@ module LimitedCapacity
"worker:#{namespace.to_s.underscore}:running"
end
- def get_job_count(redis)
- redis.scard(counter_key)
- end
-
- def register_job_keys(redis, keys)
- redis.sadd(counter_key, keys)
- end
-
def remove_job_keys(redis, keys)
redis.srem(counter_key, keys)
end
@@ -64,11 +64,5 @@ module LimitedCapacity
def with_redis(&block)
Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord
end
-
- def with_redis_pipeline(&block)
- with_redis do |redis|
- redis.pipelined(&block)
- end
- end
end
end
diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb
index 9dd8d942146..b4cdfda680f 100644
--- a/app/workers/concerns/limited_capacity/worker.rb
+++ b/app/workers/concerns/limited_capacity/worker.rb
@@ -55,26 +55,14 @@ module LimitedCapacity
def perform_with_capacity(*args)
worker = self.new
worker.remove_failed_jobs
- worker.report_prometheus_metrics(*args)
- required_jobs_count = worker.required_jobs_count(*args)
- arguments = Array.new(required_jobs_count) { args }
+ arguments = Array.new(worker.max_running_jobs) { args }
self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
end
end
def perform(*args)
- return unless has_capacity?
-
- job_tracker.register(jid)
- report_running_jobs_metrics
- perform_work(*args)
- rescue => exception
- raise
- ensure
- job_tracker.remove(jid)
- report_prometheus_metrics(*args)
- re_enqueue(*args) unless exception
+ perform_registered(*args) if job_tracker.register(jid, max_running_jobs)
end
def perform_work(*args)
@@ -89,43 +77,32 @@ module LimitedCapacity
raise NotImplementedError
end
- def has_capacity?
- remaining_capacity > 0
- end
-
- def remaining_capacity
- [
- max_running_jobs - running_jobs_count - self.class.queue_size,
- 0
- ].max
- end
-
- def has_work?(*args)
- remaining_work_count(*args) > 0
- end
-
def remove_failed_jobs
job_tracker.clean_up
end
def report_prometheus_metrics(*args)
report_running_jobs_metrics
- remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args))
- max_running_jobs_gauge.set(prometheus_labels, max_running_jobs)
+ set_metric(:remaining_work_gauge, remaining_work_count(*args))
+ set_metric(:max_running_jobs_gauge, max_running_jobs)
end
- def report_running_jobs_metrics
- running_jobs_gauge.set(prometheus_labels, running_jobs_count)
- end
+ private
- def required_jobs_count(*args)
- [
- remaining_work_count(*args),
- remaining_capacity
- ].min
+ def perform_registered(*args)
+ report_running_jobs_metrics
+ perform_work(*args)
+ rescue StandardError => exception
+ raise
+ ensure
+ job_tracker.remove(jid)
+ report_prometheus_metrics(*args)
+ re_enqueue(*args) unless exception
end
- private
+ def report_running_jobs_metrics
+ set_metric(:running_jobs_gauge, running_jobs_count)
+ end
def running_jobs_count
job_tracker.count
@@ -138,32 +115,21 @@ module LimitedCapacity
end
def re_enqueue(*args)
- return unless has_capacity?
- return unless has_work?(*args)
+ return unless remaining_work_count(*args) > 0
self.class.perform_async(*args)
end
- def running_jobs_gauge
- strong_memoize(:running_jobs_gauge) do
- Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs')
- end
- end
-
- def max_running_jobs_gauge
- strong_memoize(:max_running_jobs_gauge) do
- Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs')
+ def set_metric(name, value)
+ metrics = strong_memoize(:metrics) do
+ {
+ running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'),
+ max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'),
+ remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
+ }
end
- end
-
- def remaining_work_gauge
- strong_memoize(:remaining_work_gauge) do
- Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
- end
- end
- def prometheus_labels
- { worker: self.class.name }
+ metrics[name].set({ worker: self.class.name }, value)
end
end
end
diff --git a/app/workers/concerns/reactive_cacheable_worker.rb b/app/workers/concerns/reactive_cacheable_worker.rb
index 9e882c8ac7a..78fcf8087c2 100644
--- a/app/workers/concerns/reactive_cacheable_worker.rb
+++ b/app/workers/concerns/reactive_cacheable_worker.rb
@@ -6,6 +6,8 @@ module ReactiveCacheableWorker
included do
include ApplicationWorker
+ sidekiq_options retry: 3
+
feature_category_not_owned!
loggable_arguments 0
diff --git a/app/workers/concerns/waitable_worker.rb b/app/workers/concerns/waitable_worker.rb
index c3abcdafcf2..e62bd8d9885 100644
--- a/app/workers/concerns/waitable_worker.rb
+++ b/app/workers/concerns/waitable_worker.rb
@@ -33,7 +33,7 @@ module WaitableWorker
args_list.each do |args|
new.perform(*args)
- rescue
+ rescue StandardError
failed << args
end
diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb
index 6f99fd089ac..6dee9402691 100644
--- a/app/workers/concerns/worker_attributes.rb
+++ b/app/workers/concerns/worker_attributes.rb
@@ -36,13 +36,13 @@ module WorkerAttributes
def feature_category(value, *extras)
raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned
- class_attributes[:feature_category] = value
+ set_class_attribute(:feature_category, value)
end
# Special case: mark this work as not associated with a feature category
# this should be used for cross-cutting concerns, such as mailer workers.
def feature_category_not_owned!
- class_attributes[:feature_category] = :not_owned
+ set_class_attribute(:feature_category, :not_owned)
end
def get_feature_category
@@ -64,7 +64,7 @@ module WorkerAttributes
def urgency(urgency)
raise "Invalid urgency: #{urgency}" unless VALID_URGENCIES.include?(urgency)
- class_attributes[:urgency] = urgency
+ set_class_attribute(:urgency, urgency)
end
def get_urgency
@@ -75,8 +75,8 @@ module WorkerAttributes
raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
- class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag
- class_attributes[:data_consistency] = data_consistency
+ set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag
+ set_class_attribute(:data_consistency, data_consistency)
validate_worker_attributes!
end
@@ -105,7 +105,7 @@ module WorkerAttributes
# doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for
# details
def worker_has_external_dependencies!
- class_attributes[:external_dependencies] = true
+ set_class_attribute(:external_dependencies, true)
end
# Returns a truthy value if the worker has external dependencies.
@@ -118,7 +118,7 @@ module WorkerAttributes
def worker_resource_boundary(boundary)
raise "Invalid boundary" unless VALID_RESOURCE_BOUNDARIES.include? boundary
- class_attributes[:resource_boundary] = boundary
+ set_class_attribute(:resource_boundary, boundary)
end
def get_worker_resource_boundary
@@ -126,7 +126,7 @@ module WorkerAttributes
end
def idempotent!
- class_attributes[:idempotent] = true
+ set_class_attribute(:idempotent, true)
validate_worker_attributes!
end
@@ -136,7 +136,7 @@ module WorkerAttributes
end
def weight(value)
- class_attributes[:weight] = value
+ set_class_attribute(:weight, value)
end
def get_weight
@@ -146,7 +146,7 @@ module WorkerAttributes
end
def tags(*values)
- class_attributes[:tags] = values
+ set_class_attribute(:tags, values)
end
def get_tags
@@ -154,8 +154,8 @@ module WorkerAttributes
end
def deduplicate(strategy, options = {})
- class_attributes[:deduplication_strategy] = strategy
- class_attributes[:deduplication_options] = options
+ set_class_attribute(:deduplication_strategy, strategy)
+ set_class_attribute(:deduplication_options, options)
end
def get_deduplicate_strategy
@@ -168,7 +168,7 @@ module WorkerAttributes
end
def big_payload!
- class_attributes[:big_payload] = true
+ set_class_attribute(:big_payload, true)
end
def big_payload?