diff options
Diffstat (limited to 'app/workers/concerns')
-rw-r--r-- | app/workers/concerns/application_worker.rb | 13 | ||||
-rw-r--r-- | app/workers/concerns/chaos_queue.rb | 1 | ||||
-rw-r--r-- | app/workers/concerns/git_garbage_collect_methods.rb | 4 | ||||
-rw-r--r-- | app/workers/concerns/gitlab/github_import/object_importer.rb | 22 | ||||
-rw-r--r-- | app/workers/concerns/gitlab/github_import/stage_methods.rb | 2 | ||||
-rw-r--r-- | app/workers/concerns/gitlab/jira_import/import_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/concerns/limited_capacity/job_tracker.rb | 42 | ||||
-rw-r--r-- | app/workers/concerns/limited_capacity/worker.rb | 86 | ||||
-rw-r--r-- | app/workers/concerns/reactive_cacheable_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/concerns/waitable_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/concerns/worker_attributes.rb | 26 |
11 files changed, 85 insertions, 117 deletions
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 0de26e27631..843be4896a3 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -16,6 +16,7 @@ module ApplicationWorker included do set_queue + after_set_class_attribute { set_queue } def structured_payload(payload = {}) context = Gitlab::ApplicationContext.current.merge( @@ -47,22 +48,14 @@ module ApplicationWorker class_methods do def inherited(subclass) subclass.set_queue + subclass.after_set_class_attribute { subclass.set_queue } end def set_queue - queue_name = [queue_namespace, base_queue_name].compact.join(':') - + queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self) sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue end - def base_queue_name - name - .sub(/\AGitlab::/, '') - .sub(/Worker\z/, '') - .underscore - .tr('/', '_') - end - def queue_namespace(new_namespace = nil) if new_namespace sidekiq_options queue_namespace: new_namespace diff --git a/app/workers/concerns/chaos_queue.rb b/app/workers/concerns/chaos_queue.rb index a9c557f0175..2ccd55157c6 100644 --- a/app/workers/concerns/chaos_queue.rb +++ b/app/workers/concerns/chaos_queue.rb @@ -6,5 +6,6 @@ module ChaosQueue included do queue_namespace :chaos feature_category_not_owned! + tags :exclude_from_gitlab_com end end diff --git a/app/workers/concerns/git_garbage_collect_methods.rb b/app/workers/concerns/git_garbage_collect_methods.rb index 17a80d1ddb3..c46deeb716f 100644 --- a/app/workers/concerns/git_garbage_collect_methods.rb +++ b/app/workers/concerns/git_garbage_collect_methods.rb @@ -97,10 +97,10 @@ module GitGarbageCollectMethods end rescue GRPC::NotFound => e Gitlab::GitLogger.error("#{__method__} failed:\nRepository not found") - raise Gitlab::Git::Repository::NoRepository.new(e) + raise Gitlab::Git::Repository::NoRepository, e rescue GRPC::BadStatus => e Gitlab::GitLogger.error("#{__method__} failed:\n#{e}") - raise Gitlab::Git::CommandError.new(e) + raise Gitlab::Git::CommandError, e end def get_gitaly_client(task, repository) diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb index 575cd4862b0..6ebf7c7c263 100644 --- a/app/workers/concerns/gitlab/github_import/object_importer.rb +++ b/app/workers/concerns/gitlab/github_import/object_importer.rb @@ -9,6 +9,8 @@ module Gitlab included do include ApplicationWorker + + sidekiq_options retry: 3 include GithubImport::Queue include ReschedulingMethods include Gitlab::NotifyUponDeath @@ -25,15 +27,19 @@ module Gitlab # client - An instance of `Gitlab::GithubImport::Client` # hash - A Hash containing the details of the object to import. def import(project, client, hash) + object = representation_class.from_json_hash(hash) + + # To better express in the logs what object is being imported. + self.github_id = object.attributes.fetch(:github_id) + info(project.id, message: 'starting importer') - object = representation_class.from_json_hash(hash) importer_class.new(object, project, client).execute counter.increment info(project.id, message: 'importer finished') - rescue => e - error(project.id, e) + rescue StandardError => e + error(project.id, e, hash) end def counter @@ -63,16 +69,19 @@ module Gitlab private + attr_accessor :github_id + def info(project_id, extra = {}) logger.info(log_attributes(project_id, extra)) end - def error(project_id, exception) + def error(project_id, exception, data = {}) logger.error( log_attributes( project_id, message: 'importer failed', - 'error.message': exception.message + 'error.message': exception.message, + 'github.data': data ) ) @@ -86,7 +95,8 @@ module Gitlab extra.merge( import_source: :github, project_id: project_id, - importer: importer_class.name + importer: importer_class.name, + github_id: github_id ) end end diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb index e5985fb94da..916b273a28f 100644 --- a/app/workers/concerns/gitlab/github_import/stage_methods.rb +++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb @@ -14,7 +14,7 @@ module Gitlab try_import(client, project) info(project_id, message: 'stage finished') - rescue => e + rescue StandardError => e error(project_id, e) end diff --git a/app/workers/concerns/gitlab/jira_import/import_worker.rb b/app/workers/concerns/gitlab/jira_import/import_worker.rb index fdc6e64bbaa..107b6e2e9be 100644 --- a/app/workers/concerns/gitlab/jira_import/import_worker.rb +++ b/app/workers/concerns/gitlab/jira_import/import_worker.rb @@ -7,6 +7,8 @@ module Gitlab included do include ApplicationWorker + + sidekiq_options retry: 3 include ProjectImportOptions include Gitlab::JiraImport::QueueOptions end diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb index 96b6e1a2024..47b13cd5bf6 100644 --- a/app/workers/concerns/limited_capacity/job_tracker.rb +++ b/app/workers/concerns/limited_capacity/job_tracker.rb @@ -3,21 +3,30 @@ module LimitedCapacity class JobTracker # rubocop:disable Scalability/IdempotentWorker include Gitlab::Utils::StrongMemoize + LUA_REGISTER_SCRIPT = <<~EOS + local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2] + + if redis.call("scard", set_key) < tonumber(max_elements) then + redis.call("sadd", set_key, element) + return true + end + + return false + EOS + def initialize(namespace) @namespace = namespace end - def register(jid) - _added, @count = with_redis_pipeline do |redis| - register_job_keys(redis, jid) - get_job_count(redis) - end + def register(jid, max_jids) + with_redis do |redis| + redis.eval(LUA_REGISTER_SCRIPT, keys: [counter_key], argv: [jid, max_jids]) + end.present? end def remove(jid) - _removed, @count = with_redis_pipeline do |redis| + with_redis do |redis| remove_job_keys(redis, jid) - get_job_count(redis) end end @@ -25,14 +34,13 @@ module LimitedCapacity completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids) return unless completed_jids.any? - _removed, @count = with_redis_pipeline do |redis| + with_redis do |redis| remove_job_keys(redis, completed_jids) - get_job_count(redis) end end def count - @count ||= with_redis { |redis| get_job_count(redis) } + with_redis { |redis| redis.scard(counter_key) } end def running_jids @@ -49,14 +57,6 @@ module LimitedCapacity "worker:#{namespace.to_s.underscore}:running" end - def get_job_count(redis) - redis.scard(counter_key) - end - - def register_job_keys(redis, keys) - redis.sadd(counter_key, keys) - end - def remove_job_keys(redis, keys) redis.srem(counter_key, keys) end @@ -64,11 +64,5 @@ module LimitedCapacity def with_redis(&block) Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord end - - def with_redis_pipeline(&block) - with_redis do |redis| - redis.pipelined(&block) - end - end end end diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb index 9dd8d942146..b4cdfda680f 100644 --- a/app/workers/concerns/limited_capacity/worker.rb +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -55,26 +55,14 @@ module LimitedCapacity def perform_with_capacity(*args) worker = self.new worker.remove_failed_jobs - worker.report_prometheus_metrics(*args) - required_jobs_count = worker.required_jobs_count(*args) - arguments = Array.new(required_jobs_count) { args } + arguments = Array.new(worker.max_running_jobs) { args } self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext end end def perform(*args) - return unless has_capacity? - - job_tracker.register(jid) - report_running_jobs_metrics - perform_work(*args) - rescue => exception - raise - ensure - job_tracker.remove(jid) - report_prometheus_metrics(*args) - re_enqueue(*args) unless exception + perform_registered(*args) if job_tracker.register(jid, max_running_jobs) end def perform_work(*args) @@ -89,43 +77,32 @@ module LimitedCapacity raise NotImplementedError end - def has_capacity? - remaining_capacity > 0 - end - - def remaining_capacity - [ - max_running_jobs - running_jobs_count - self.class.queue_size, - 0 - ].max - end - - def has_work?(*args) - remaining_work_count(*args) > 0 - end - def remove_failed_jobs job_tracker.clean_up end def report_prometheus_metrics(*args) report_running_jobs_metrics - remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args)) - max_running_jobs_gauge.set(prometheus_labels, max_running_jobs) + set_metric(:remaining_work_gauge, remaining_work_count(*args)) + set_metric(:max_running_jobs_gauge, max_running_jobs) end - def report_running_jobs_metrics - running_jobs_gauge.set(prometheus_labels, running_jobs_count) - end + private - def required_jobs_count(*args) - [ - remaining_work_count(*args), - remaining_capacity - ].min + def perform_registered(*args) + report_running_jobs_metrics + perform_work(*args) + rescue StandardError => exception + raise + ensure + job_tracker.remove(jid) + report_prometheus_metrics(*args) + re_enqueue(*args) unless exception end - private + def report_running_jobs_metrics + set_metric(:running_jobs_gauge, running_jobs_count) + end def running_jobs_count job_tracker.count @@ -138,32 +115,21 @@ module LimitedCapacity end def re_enqueue(*args) - return unless has_capacity? - return unless has_work?(*args) + return unless remaining_work_count(*args) > 0 self.class.perform_async(*args) end - def running_jobs_gauge - strong_memoize(:running_jobs_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs') - end - end - - def max_running_jobs_gauge - strong_memoize(:max_running_jobs_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs') + def set_metric(name, value) + metrics = strong_memoize(:metrics) do + { + running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'), + max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'), + remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') + } end - end - - def remaining_work_gauge - strong_memoize(:remaining_work_gauge) do - Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') - end - end - def prometheus_labels - { worker: self.class.name } + metrics[name].set({ worker: self.class.name }, value) end end end diff --git a/app/workers/concerns/reactive_cacheable_worker.rb b/app/workers/concerns/reactive_cacheable_worker.rb index 9e882c8ac7a..78fcf8087c2 100644 --- a/app/workers/concerns/reactive_cacheable_worker.rb +++ b/app/workers/concerns/reactive_cacheable_worker.rb @@ -6,6 +6,8 @@ module ReactiveCacheableWorker included do include ApplicationWorker + sidekiq_options retry: 3 + feature_category_not_owned! loggable_arguments 0 diff --git a/app/workers/concerns/waitable_worker.rb b/app/workers/concerns/waitable_worker.rb index c3abcdafcf2..e62bd8d9885 100644 --- a/app/workers/concerns/waitable_worker.rb +++ b/app/workers/concerns/waitable_worker.rb @@ -33,7 +33,7 @@ module WaitableWorker args_list.each do |args| new.perform(*args) - rescue + rescue StandardError failed << args end diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 6f99fd089ac..6dee9402691 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -36,13 +36,13 @@ module WorkerAttributes def feature_category(value, *extras) raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned - class_attributes[:feature_category] = value + set_class_attribute(:feature_category, value) end # Special case: mark this work as not associated with a feature category # this should be used for cross-cutting concerns, such as mailer workers. def feature_category_not_owned! - class_attributes[:feature_category] = :not_owned + set_class_attribute(:feature_category, :not_owned) end def get_feature_category @@ -64,7 +64,7 @@ module WorkerAttributes def urgency(urgency) raise "Invalid urgency: #{urgency}" unless VALID_URGENCIES.include?(urgency) - class_attributes[:urgency] = urgency + set_class_attribute(:urgency, urgency) end def get_urgency @@ -75,8 +75,8 @@ module WorkerAttributes raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency) raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency] - class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag - class_attributes[:data_consistency] = data_consistency + set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag + set_class_attribute(:data_consistency, data_consistency) validate_worker_attributes! end @@ -105,7 +105,7 @@ module WorkerAttributes # doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for # details def worker_has_external_dependencies! - class_attributes[:external_dependencies] = true + set_class_attribute(:external_dependencies, true) end # Returns a truthy value if the worker has external dependencies. @@ -118,7 +118,7 @@ module WorkerAttributes def worker_resource_boundary(boundary) raise "Invalid boundary" unless VALID_RESOURCE_BOUNDARIES.include? boundary - class_attributes[:resource_boundary] = boundary + set_class_attribute(:resource_boundary, boundary) end def get_worker_resource_boundary @@ -126,7 +126,7 @@ module WorkerAttributes end def idempotent! - class_attributes[:idempotent] = true + set_class_attribute(:idempotent, true) validate_worker_attributes! end @@ -136,7 +136,7 @@ module WorkerAttributes end def weight(value) - class_attributes[:weight] = value + set_class_attribute(:weight, value) end def get_weight @@ -146,7 +146,7 @@ module WorkerAttributes end def tags(*values) - class_attributes[:tags] = values + set_class_attribute(:tags, values) end def get_tags @@ -154,8 +154,8 @@ module WorkerAttributes end def deduplicate(strategy, options = {}) - class_attributes[:deduplication_strategy] = strategy - class_attributes[:deduplication_options] = options + set_class_attribute(:deduplication_strategy, strategy) + set_class_attribute(:deduplication_options, options) end def get_deduplicate_strategy @@ -168,7 +168,7 @@ module WorkerAttributes end def big_payload! - class_attributes[:big_payload] = true + set_class_attribute(:big_payload, true) end def big_payload? |