diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
10 files changed, 84 insertions, 67 deletions
diff --git a/lib/gitlab/sidekiq_middleware/client_metrics.rb b/lib/gitlab/sidekiq_middleware/client_metrics.rb index e3cc7b28c41..ef80ed706f3 100644 --- a/lib/gitlab/sidekiq_middleware/client_metrics.rb +++ b/lib/gitlab/sidekiq_middleware/client_metrics.rb @@ -13,9 +13,15 @@ module Gitlab def call(worker_class, job, queue, _redis_pool) # worker_class can either be the string or class of the worker being enqueued. - worker_class = worker_class.safe_constantize if worker_class.respond_to?(:safe_constantize) + worker_class = worker_class.to_s.safe_constantize + labels = create_labels(worker_class, queue, job) - labels[:scheduling] = job.key?('at') ? 'delayed' : 'immediate' + if job.key?('at') + labels[:scheduling] = 'delayed' + job[:scheduled_at] = job['at'] + else + labels[:scheduling] = 'immediate' + end @metrics.fetch(ENQUEUED).increment(labels, 1) diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index aeb58d7c153..e63164efc94 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -64,9 +64,9 @@ module Gitlab Sidekiq.redis do |redis| redis.multi do |multi| - redis.set(idempotency_key, jid, ex: expiry, nx: true) - read_wal_locations = check_existing_wal_locations!(redis, expiry) - read_jid = redis.get(idempotency_key) + multi.set(idempotency_key, jid, ex: expiry, nx: true) + read_wal_locations = check_existing_wal_locations!(multi, expiry) + read_jid = multi.get(idempotency_key) end end @@ -81,9 +81,9 @@ module Gitlab return unless job_wal_locations.present? Sidekiq.redis do |redis| - redis.multi do + redis.multi do |multi| job_wal_locations.each do |connection_name, location| - redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) + multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) end end end @@ -96,9 +96,9 @@ module Gitlab read_wal_locations = {} Sidekiq.redis do |redis| - redis.multi do + redis.multi do |multi| job_wal_locations.keys.each do |connection_name| - read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0) + read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0) end end end @@ -110,8 +110,8 @@ module Gitlab def delete! Sidekiq.redis do |redis| redis.multi do |multi| - redis.del(idempotency_key) - delete_wal_locations!(redis) + multi.del(idempotency_key) + delete_wal_locations!(multi) end end end @@ -140,13 +140,14 @@ module Gitlab def idempotent? return false unless worker_klass return false unless worker_klass.respond_to?(:idempotent?) + return false unless preserve_wal_location? || !worker_klass.utilizes_load_balancing_capabilities? worker_klass.idempotent? end private - attr_accessor :existing_wal_locations + attr_writer :existing_wal_locations attr_reader :queue_name, :job attr_writer :existing_jid @@ -154,8 +155,33 @@ module Gitlab @worker_klass ||= worker_class_name.to_s.safe_constantize end + def delete_wal_locations!(redis) + job_wal_locations.keys.each do |connection_name| + redis.del(wal_location_key(connection_name)) + redis.del(existing_wal_location_key(connection_name)) + end + end + + def check_existing_wal_locations!(redis, expiry) + read_wal_locations = {} + + job_wal_locations.each do |connection_name, location| + key = existing_wal_location_key(connection_name) + redis.set(key, location, ex: expiry, nx: true) + read_wal_locations[connection_name] = redis.get(key) + end + + read_wal_locations + end + + def job_wal_locations + return {} unless preserve_wal_location? + + job['wal_locations'] || {} + end + def pg_wal_lsn_diff(connection_name) - Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) + Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) end def strategy @@ -178,12 +204,6 @@ module Gitlab job['jid'] end - def job_wal_locations - return {} unless preserve_wal_location? - - job['wal_locations'] || {} - end - def existing_wal_location_key(connection_name) "#{idempotency_key}:#{connection_name}:existing_wal_location" end @@ -208,23 +228,8 @@ module Gitlab "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" end - def delete_wal_locations!(redis) - job_wal_locations.keys.each do |connection_name| - redis.del(wal_location_key(connection_name)) - redis.del(existing_wal_location_key(connection_name)) - end - end - - def check_existing_wal_locations!(redis, expiry) - read_wal_locations = {} - - job_wal_locations.each do |connection_name, location| - key = existing_wal_location_key(connection_name) - redis.set(key, location, ex: expiry, nx: true) - read_wal_locations[connection_name] = redis.get(key) - end - - read_wal_locations + def existing_wal_locations + @existing_wal_locations ||= {} end def preserve_wal_location? diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb index fc58d4f5323..b0da85b74a6 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb @@ -4,11 +4,15 @@ module Gitlab module SidekiqMiddleware module DuplicateJobs module Strategies - module DeduplicatesWhenScheduling + class DeduplicatesWhenScheduling < Base + extend ::Gitlab::Utils::Override + + override :initialize def initialize(duplicate_job) @duplicate_job = duplicate_job end + override :schedule def schedule(job) if deduplicatable_job? && check! && duplicate_job.duplicate? job['duplicate-of'] = duplicate_job.existing_jid @@ -25,6 +29,7 @@ module Gitlab yield end + override :perform def perform(job) update_job_wal_location!(job) end diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb index 5164b994267..25f1b8b7c51 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb @@ -7,11 +7,7 @@ module Gitlab # This strategy takes a lock before scheduling the job in a queue and # removes the lock after the job has executed preventing a new job to be queued # while a job is still executing. - class UntilExecuted < Base - extend ::Gitlab::Utils::Override - - include DeduplicatesWhenScheduling - + class UntilExecuted < DeduplicatesWhenScheduling override :perform def perform(job) super diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb index 1f7e3a4ea30..693e404af73 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb @@ -7,11 +7,7 @@ module Gitlab # This strategy takes a lock before scheduling the job in a queue and # removes the lock before the job starts allowing a new job to be queued # while a job is still executing. - class UntilExecuting < Base - extend ::Gitlab::Utils::Override - - include DeduplicatesWhenScheduling - + class UntilExecuting < DeduplicatesWhenScheduling override :perform def perform(job) super diff --git a/lib/gitlab/sidekiq_middleware/metrics_helper.rb b/lib/gitlab/sidekiq_middleware/metrics_helper.rb index 66930a34319..207d2d769b2 100644 --- a/lib/gitlab/sidekiq_middleware/metrics_helper.rb +++ b/lib/gitlab/sidekiq_middleware/metrics_helper.rb @@ -3,14 +3,21 @@ module Gitlab module SidekiqMiddleware module MetricsHelper + include ::Gitlab::SidekiqMiddleware::WorkerContext + TRUE_LABEL = "yes" FALSE_LABEL = "no" private def create_labels(worker_class, queue, job) - worker_name = (job['wrapped'].presence || worker_class).to_s - worker = find_worker(worker_name, worker_class) + worker = find_worker(worker_class, job) + + # This should never happen: we should always be able to find a + # worker class for a given Sidekiq job. But if we can't, we + # shouldn't blow up here, because we want to record this in our + # metrics. + worker_name = worker.try(:name) || worker.class.name labels = { queue: queue.to_s, worker: worker_name, @@ -23,9 +30,7 @@ module Gitlab labels[:urgency] = worker.get_urgency.to_s labels[:external_dependencies] = bool_as_label(worker.worker_has_external_dependencies?) - - feature_category = worker.get_feature_category - labels[:feature_category] = feature_category.to_s + labels[:feature_category] = worker.get_feature_category.to_s resource_boundary = worker.get_worker_resource_boundary labels[:boundary] = resource_boundary == :unknown ? "" : resource_boundary.to_s @@ -36,10 +41,6 @@ module Gitlab def bool_as_label(value) value ? TRUE_LABEL : FALSE_LABEL end - - def find_worker(worker_name, worker_class) - Gitlab::SidekiqConfig::DEFAULT_WORKERS.fetch(worker_name, worker_class) - end end end end diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb index 2d9767e0266..bea98403997 100644 --- a/lib/gitlab/sidekiq_middleware/server_metrics.rb +++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb @@ -53,10 +53,7 @@ module Gitlab def initialize @metrics = self.class.metrics - - if ::Gitlab::Database::LoadBalancing.enable? - @metrics[:sidekiq_load_balancing_count] = ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing') - end + @metrics[:sidekiq_load_balancing_count] = ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing') end def call(worker, job, queue) @@ -128,8 +125,6 @@ module Gitlab private def with_load_balancing_settings(job) - return unless ::Gitlab::Database::LoadBalancing.enable? - keys = %w[load_balancing_strategy worker_data_consistency] return unless keys.all? { |k| job.key?(k) } diff --git a/lib/gitlab/sidekiq_middleware/worker_context.rb b/lib/gitlab/sidekiq_middleware/worker_context.rb index 897a9211948..a5d92cf699c 100644 --- a/lib/gitlab/sidekiq_middleware/worker_context.rb +++ b/lib/gitlab/sidekiq_middleware/worker_context.rb @@ -10,6 +10,12 @@ module Gitlab context_or_nil.use(&block) end + + def find_worker(worker_class, job) + worker_name = (job['wrapped'].presence || worker_class).to_s + + Gitlab::SidekiqConfig::DEFAULT_WORKERS[worker_name]&.klass || worker_class + end end end end diff --git a/lib/gitlab/sidekiq_middleware/worker_context/client.rb b/lib/gitlab/sidekiq_middleware/worker_context/client.rb index 1a899b27ea3..7d3925e9dec 100644 --- a/lib/gitlab/sidekiq_middleware/worker_context/client.rb +++ b/lib/gitlab/sidekiq_middleware/worker_context/client.rb @@ -7,11 +7,11 @@ module Gitlab include Gitlab::SidekiqMiddleware::WorkerContext def call(worker_class_or_name, job, _queue, _redis_pool, &block) - worker_class = worker_class_or_name.to_s.safe_constantize + worker_class = find_worker(worker_class_or_name.to_s.safe_constantize, job) - # Mailers can't be constantized like this + # This is not a worker we know about, perhaps from a gem return yield unless worker_class - return yield unless worker_class.include?(::ApplicationWorker) + return yield unless worker_class.respond_to?(:context_for_arguments) context_for_args = worker_class.context_for_arguments(job['args']) @@ -19,7 +19,14 @@ module Gitlab # This should be inside the context for the arguments so # that we don't override the feature category on the worker # with the one from the caller. - Gitlab::ApplicationContext.with_context(feature_category: worker_class.get_feature_category.to_s, &block) + # + # We do not want to set anything explicitly in the context + # when the feature category is 'not_owned'. + if worker_class.feature_category_not_owned? + yield + else + Gitlab::ApplicationContext.with_context(feature_category: worker_class.get_feature_category.to_s, &block) + end end end end diff --git a/lib/gitlab/sidekiq_middleware/worker_context/server.rb b/lib/gitlab/sidekiq_middleware/worker_context/server.rb index 2d8fd8002d2..d026f4918c6 100644 --- a/lib/gitlab/sidekiq_middleware/worker_context/server.rb +++ b/lib/gitlab/sidekiq_middleware/worker_context/server.rb @@ -7,7 +7,7 @@ module Gitlab include Gitlab::SidekiqMiddleware::WorkerContext def call(worker, job, _queue, &block) - worker_class = worker.class + worker_class = find_worker(worker.class, job) # This is not a worker we know about, perhaps from a gem return yield unless worker_class.respond_to?(:get_worker_context) |