Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb')
-rw-r--r--lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb45
1 files changed, 16 insertions, 29 deletions
diff --git a/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb
index f7b8d2514ba..1cb91a5c45b 100644
--- a/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb
+++ b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb
@@ -4,21 +4,24 @@ module Gitlab
module Database
module LoadBalancing
class SidekiqServerMiddleware
+ include WalTrackingReceiver
+
JobReplicaNotUpToDate = Class.new(::Gitlab::SidekiqMiddleware::RetryError)
- MINIMUM_DELAY_INTERVAL_SECONDS = 0.8
+ REPLICA_WAIT_SLEEP_SECONDS = 0.5
def call(worker, job, _queue)
- worker_class = worker.class
- strategy = select_load_balancing_strategy(worker_class, job)
+ # ActiveJobs have wrapped class stored in 'wrapped' key
+ resolved_class = job['wrapped']&.safe_constantize || worker.class
+ strategy = select_load_balancing_strategy(resolved_class, job)
job['load_balancing_strategy'] = strategy.to_s
if use_primary?(strategy)
::Gitlab::Database::LoadBalancing::Session.current.use_primary!
elsif strategy == :retry
- raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
- " Replica was not up to date."
+ raise JobReplicaNotUpToDate, "Sidekiq job #{resolved_class} JID-#{job['jid']} couldn't use the replica."\
+ " Replica was not up to date."
else
# this means we selected an up-to-date replica, but there is nothing to do in this case.
end
@@ -49,7 +52,10 @@ module Gitlab
# Happy case: we can read from a replica.
return replica_strategy(worker_class, job) if databases_in_sync?(wal_locations)
- sleep_if_needed(job)
+ 3.times do
+ sleep REPLICA_WAIT_SLEEP_SECONDS
+ break if databases_in_sync?(wal_locations)
+ end
if databases_in_sync?(wal_locations)
replica_strategy(worker_class, job)
@@ -62,24 +68,18 @@ module Gitlab
end
end
- def sleep_if_needed(job)
- remaining_delay = MINIMUM_DELAY_INTERVAL_SECONDS - (Time.current.to_f - job['created_at'].to_f)
-
- sleep remaining_delay if remaining_delay > 0 && remaining_delay < MINIMUM_DELAY_INTERVAL_SECONDS
- end
-
def get_wal_locations(job)
job['dedup_wal_locations'] || job['wal_locations']
end
def load_balancing_available?(worker_class)
- worker_class.include?(::ApplicationWorker) &&
+ worker_class.include?(::WorkerAttributes) &&
worker_class.utilizes_load_balancing_capabilities? &&
worker_class.get_data_consistency_feature_flag_enabled?
end
def can_retry?(worker_class, job)
- worker_class.get_data_consistency == :delayed && not_yet_retried?(job)
+ worker_class.get_data_consistency == :delayed && not_yet_requeued?(job)
end
def replica_strategy(worker_class, job)
@@ -87,27 +87,14 @@ module Gitlab
end
def retried_before?(worker_class, job)
- worker_class.get_data_consistency == :delayed && !not_yet_retried?(job)
+ worker_class.get_data_consistency == :delayed && !not_yet_requeued?(job)
end
- def not_yet_retried?(job)
+ def not_yet_requeued?(job)
# if `retry_count` is `nil` it indicates that this job was never retried
# the `0` indicates that this is a first retry
job['retry_count'].nil?
end
-
- def databases_in_sync?(wal_locations)
- ::Gitlab::Database::LoadBalancing.each_load_balancer.all? do |lb|
- if (location = wal_locations.with_indifferent_access[lb.name])
- lb.select_up_to_date_host(location)
- else
- # If there's no entry for a load balancer it means the Sidekiq
- # job doesn't care for it. In this case we'll treat the load
- # balancer as being in sync.
- true
- end
- end
- end
end
end
end