Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r--lib/gitlab/sidekiq_middleware/client_metrics.rb10
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb73
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb7
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb6
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb6
-rw-r--r--lib/gitlab/sidekiq_middleware/metrics_helper.rb19
-rw-r--r--lib/gitlab/sidekiq_middleware/server_metrics.rb7
-rw-r--r--lib/gitlab/sidekiq_middleware/worker_context.rb6
-rw-r--r--lib/gitlab/sidekiq_middleware/worker_context/client.rb15
-rw-r--r--lib/gitlab/sidekiq_middleware/worker_context/server.rb2
10 files changed, 84 insertions, 67 deletions
diff --git a/lib/gitlab/sidekiq_middleware/client_metrics.rb b/lib/gitlab/sidekiq_middleware/client_metrics.rb
index e3cc7b28c41..ef80ed706f3 100644
--- a/lib/gitlab/sidekiq_middleware/client_metrics.rb
+++ b/lib/gitlab/sidekiq_middleware/client_metrics.rb
@@ -13,9 +13,15 @@ module Gitlab
def call(worker_class, job, queue, _redis_pool)
# worker_class can either be the string or class of the worker being enqueued.
- worker_class = worker_class.safe_constantize if worker_class.respond_to?(:safe_constantize)
+ worker_class = worker_class.to_s.safe_constantize
+
labels = create_labels(worker_class, queue, job)
- labels[:scheduling] = job.key?('at') ? 'delayed' : 'immediate'
+ if job.key?('at')
+ labels[:scheduling] = 'delayed'
+ job[:scheduled_at] = job['at']
+ else
+ labels[:scheduling] = 'immediate'
+ end
@metrics.fetch(ENQUEUED).increment(labels, 1)
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index aeb58d7c153..e63164efc94 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -64,9 +64,9 @@ module Gitlab
Sidekiq.redis do |redis|
redis.multi do |multi|
- redis.set(idempotency_key, jid, ex: expiry, nx: true)
- read_wal_locations = check_existing_wal_locations!(redis, expiry)
- read_jid = redis.get(idempotency_key)
+ multi.set(idempotency_key, jid, ex: expiry, nx: true)
+ read_wal_locations = check_existing_wal_locations!(multi, expiry)
+ read_jid = multi.get(idempotency_key)
end
end
@@ -81,9 +81,9 @@ module Gitlab
return unless job_wal_locations.present?
Sidekiq.redis do |redis|
- redis.multi do
+ redis.multi do |multi|
job_wal_locations.each do |connection_name, location|
- redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
+ multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
end
end
end
@@ -96,9 +96,9 @@ module Gitlab
read_wal_locations = {}
Sidekiq.redis do |redis|
- redis.multi do
+ redis.multi do |multi|
job_wal_locations.keys.each do |connection_name|
- read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0)
+ read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0)
end
end
end
@@ -110,8 +110,8 @@ module Gitlab
def delete!
Sidekiq.redis do |redis|
redis.multi do |multi|
- redis.del(idempotency_key)
- delete_wal_locations!(redis)
+ multi.del(idempotency_key)
+ delete_wal_locations!(multi)
end
end
end
@@ -140,13 +140,14 @@ module Gitlab
def idempotent?
return false unless worker_klass
return false unless worker_klass.respond_to?(:idempotent?)
+ return false unless preserve_wal_location? || !worker_klass.utilizes_load_balancing_capabilities?
worker_klass.idempotent?
end
private
- attr_accessor :existing_wal_locations
+ attr_writer :existing_wal_locations
attr_reader :queue_name, :job
attr_writer :existing_jid
@@ -154,8 +155,33 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
+ def delete_wal_locations!(redis)
+ job_wal_locations.keys.each do |connection_name|
+ redis.del(wal_location_key(connection_name))
+ redis.del(existing_wal_location_key(connection_name))
+ end
+ end
+
+ def check_existing_wal_locations!(redis, expiry)
+ read_wal_locations = {}
+
+ job_wal_locations.each do |connection_name, location|
+ key = existing_wal_location_key(connection_name)
+ redis.set(key, location, ex: expiry, nx: true)
+ read_wal_locations[connection_name] = redis.get(key)
+ end
+
+ read_wal_locations
+ end
+
+ def job_wal_locations
+ return {} unless preserve_wal_location?
+
+ job['wal_locations'] || {}
+ end
+
def pg_wal_lsn_diff(connection_name)
- Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
+ Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
end
def strategy
@@ -178,12 +204,6 @@ module Gitlab
job['jid']
end
- def job_wal_locations
- return {} unless preserve_wal_location?
-
- job['wal_locations'] || {}
- end
-
def existing_wal_location_key(connection_name)
"#{idempotency_key}:#{connection_name}:existing_wal_location"
end
@@ -208,23 +228,8 @@ module Gitlab
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end
- def delete_wal_locations!(redis)
- job_wal_locations.keys.each do |connection_name|
- redis.del(wal_location_key(connection_name))
- redis.del(existing_wal_location_key(connection_name))
- end
- end
-
- def check_existing_wal_locations!(redis, expiry)
- read_wal_locations = {}
-
- job_wal_locations.each do |connection_name, location|
- key = existing_wal_location_key(connection_name)
- redis.set(key, location, ex: expiry, nx: true)
- read_wal_locations[connection_name] = redis.get(key)
- end
-
- read_wal_locations
+ def existing_wal_locations
+ @existing_wal_locations ||= {}
end
def preserve_wal_location?
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
index fc58d4f5323..b0da85b74a6 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
@@ -4,11 +4,15 @@ module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
- module DeduplicatesWhenScheduling
+ class DeduplicatesWhenScheduling < Base
+ extend ::Gitlab::Utils::Override
+
+ override :initialize
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
+ override :schedule
def schedule(job)
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
@@ -25,6 +29,7 @@ module Gitlab
yield
end
+ override :perform
def perform(job)
update_job_wal_location!(job)
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
index 5164b994267..25f1b8b7c51 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
@@ -7,11 +7,7 @@ module Gitlab
# This strategy takes a lock before scheduling the job in a queue and
# removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing.
- class UntilExecuted < Base
- extend ::Gitlab::Utils::Override
-
- include DeduplicatesWhenScheduling
-
+ class UntilExecuted < DeduplicatesWhenScheduling
override :perform
def perform(job)
super
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
index 1f7e3a4ea30..693e404af73 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
@@ -7,11 +7,7 @@ module Gitlab
# This strategy takes a lock before scheduling the job in a queue and
# removes the lock before the job starts allowing a new job to be queued
# while a job is still executing.
- class UntilExecuting < Base
- extend ::Gitlab::Utils::Override
-
- include DeduplicatesWhenScheduling
-
+ class UntilExecuting < DeduplicatesWhenScheduling
override :perform
def perform(job)
super
diff --git a/lib/gitlab/sidekiq_middleware/metrics_helper.rb b/lib/gitlab/sidekiq_middleware/metrics_helper.rb
index 66930a34319..207d2d769b2 100644
--- a/lib/gitlab/sidekiq_middleware/metrics_helper.rb
+++ b/lib/gitlab/sidekiq_middleware/metrics_helper.rb
@@ -3,14 +3,21 @@
module Gitlab
module SidekiqMiddleware
module MetricsHelper
+ include ::Gitlab::SidekiqMiddleware::WorkerContext
+
TRUE_LABEL = "yes"
FALSE_LABEL = "no"
private
def create_labels(worker_class, queue, job)
- worker_name = (job['wrapped'].presence || worker_class).to_s
- worker = find_worker(worker_name, worker_class)
+ worker = find_worker(worker_class, job)
+
+ # This should never happen: we should always be able to find a
+ # worker class for a given Sidekiq job. But if we can't, we
+ # shouldn't blow up here, because we want to record this in our
+ # metrics.
+ worker_name = worker.try(:name) || worker.class.name
labels = { queue: queue.to_s,
worker: worker_name,
@@ -23,9 +30,7 @@ module Gitlab
labels[:urgency] = worker.get_urgency.to_s
labels[:external_dependencies] = bool_as_label(worker.worker_has_external_dependencies?)
-
- feature_category = worker.get_feature_category
- labels[:feature_category] = feature_category.to_s
+ labels[:feature_category] = worker.get_feature_category.to_s
resource_boundary = worker.get_worker_resource_boundary
labels[:boundary] = resource_boundary == :unknown ? "" : resource_boundary.to_s
@@ -36,10 +41,6 @@ module Gitlab
def bool_as_label(value)
value ? TRUE_LABEL : FALSE_LABEL
end
-
- def find_worker(worker_name, worker_class)
- Gitlab::SidekiqConfig::DEFAULT_WORKERS.fetch(worker_name, worker_class)
- end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb
index 2d9767e0266..bea98403997 100644
--- a/lib/gitlab/sidekiq_middleware/server_metrics.rb
+++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb
@@ -53,10 +53,7 @@ module Gitlab
def initialize
@metrics = self.class.metrics
-
- if ::Gitlab::Database::LoadBalancing.enable?
- @metrics[:sidekiq_load_balancing_count] = ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing')
- end
+ @metrics[:sidekiq_load_balancing_count] = ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing')
end
def call(worker, job, queue)
@@ -128,8 +125,6 @@ module Gitlab
private
def with_load_balancing_settings(job)
- return unless ::Gitlab::Database::LoadBalancing.enable?
-
keys = %w[load_balancing_strategy worker_data_consistency]
return unless keys.all? { |k| job.key?(k) }
diff --git a/lib/gitlab/sidekiq_middleware/worker_context.rb b/lib/gitlab/sidekiq_middleware/worker_context.rb
index 897a9211948..a5d92cf699c 100644
--- a/lib/gitlab/sidekiq_middleware/worker_context.rb
+++ b/lib/gitlab/sidekiq_middleware/worker_context.rb
@@ -10,6 +10,12 @@ module Gitlab
context_or_nil.use(&block)
end
+
+ def find_worker(worker_class, job)
+ worker_name = (job['wrapped'].presence || worker_class).to_s
+
+ Gitlab::SidekiqConfig::DEFAULT_WORKERS[worker_name]&.klass || worker_class
+ end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/worker_context/client.rb b/lib/gitlab/sidekiq_middleware/worker_context/client.rb
index 1a899b27ea3..7d3925e9dec 100644
--- a/lib/gitlab/sidekiq_middleware/worker_context/client.rb
+++ b/lib/gitlab/sidekiq_middleware/worker_context/client.rb
@@ -7,11 +7,11 @@ module Gitlab
include Gitlab::SidekiqMiddleware::WorkerContext
def call(worker_class_or_name, job, _queue, _redis_pool, &block)
- worker_class = worker_class_or_name.to_s.safe_constantize
+ worker_class = find_worker(worker_class_or_name.to_s.safe_constantize, job)
- # Mailers can't be constantized like this
+ # This is not a worker we know about, perhaps from a gem
return yield unless worker_class
- return yield unless worker_class.include?(::ApplicationWorker)
+ return yield unless worker_class.respond_to?(:context_for_arguments)
context_for_args = worker_class.context_for_arguments(job['args'])
@@ -19,7 +19,14 @@ module Gitlab
# This should be inside the context for the arguments so
# that we don't override the feature category on the worker
# with the one from the caller.
- Gitlab::ApplicationContext.with_context(feature_category: worker_class.get_feature_category.to_s, &block)
+ #
+ # We do not want to set anything explicitly in the context
+ # when the feature category is 'not_owned'.
+ if worker_class.feature_category_not_owned?
+ yield
+ else
+ Gitlab::ApplicationContext.with_context(feature_category: worker_class.get_feature_category.to_s, &block)
+ end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/worker_context/server.rb b/lib/gitlab/sidekiq_middleware/worker_context/server.rb
index 2d8fd8002d2..d026f4918c6 100644
--- a/lib/gitlab/sidekiq_middleware/worker_context/server.rb
+++ b/lib/gitlab/sidekiq_middleware/worker_context/server.rb
@@ -7,7 +7,7 @@ module Gitlab
include Gitlab::SidekiqMiddleware::WorkerContext
def call(worker, job, _queue, &block)
- worker_class = worker.class
+ worker_class = find_worker(worker.class, job)
# This is not a worker we know about, perhaps from a gem
return yield unless worker_class.respond_to?(:get_worker_context)