Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb97
1 files changed, 96 insertions, 1 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index c1dc616cbb2..aeb58d7c153 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -17,10 +17,26 @@ module Gitlab
#
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
+ include Gitlab::Utils::StrongMemoize
+
DUPLICATE_KEY_TTL = 6.hours
+ WAL_LOCATION_TTL = 60.seconds
+ MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none
+ LUA_SET_WAL_SCRIPT = <<~EOS
+ local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
+ local existing_offset = redis.call("LINDEX", key, -1)
+ if existing_offset == false then
+ redis.call("RPUSH", key, wal, offset)
+ redis.call("EXPIRE", key, ttl)
+ elseif offset > tonumber(existing_offset) then
+ redis.call("LSET", key, 0, wal)
+ redis.call("LSET", key, -1, offset)
+ end
+ EOS
+
attr_reader :existing_jid
def initialize(job, queue_name)
@@ -44,22 +60,59 @@ module Gitlab
# This method will return the jid that was set in redis
def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil
+ read_wal_locations = {}
Sidekiq.redis do |redis|
redis.multi do |multi|
redis.set(idempotency_key, jid, ex: expiry, nx: true)
+ read_wal_locations = check_existing_wal_locations!(redis, expiry)
read_jid = redis.get(idempotency_key)
end
end
job['idempotency_key'] = idempotency_key
+ # We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command.
+ self.existing_wal_locations = read_wal_locations.transform_values(&:value)
self.existing_jid = read_jid.value
end
+ def update_latest_wal_location!
+ return unless job_wal_locations.present?
+
+ Sidekiq.redis do |redis|
+ redis.multi do
+ job_wal_locations.each do |connection_name, location|
+ redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
+ end
+ end
+ end
+ end
+
+ def latest_wal_locations
+ return {} unless job_wal_locations.present?
+
+ strong_memoize(:latest_wal_locations) do
+ read_wal_locations = {}
+
+ Sidekiq.redis do |redis|
+ redis.multi do
+ job_wal_locations.keys.each do |connection_name|
+ read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0)
+ end
+ end
+ end
+
+ read_wal_locations.transform_values(&:value).compact
+ end
+ end
+
def delete!
Sidekiq.redis do |redis|
- redis.del(idempotency_key)
+ redis.multi do |multi|
+ redis.del(idempotency_key)
+ delete_wal_locations!(redis)
+ end
end
end
@@ -93,6 +146,7 @@ module Gitlab
private
+ attr_accessor :existing_wal_locations
attr_reader :queue_name, :job
attr_writer :existing_jid
@@ -100,6 +154,10 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
+ def pg_wal_lsn_diff(connection_name)
+ Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
+ end
+
def strategy
return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
@@ -120,6 +178,20 @@ module Gitlab
job['jid']
end
+ def job_wal_locations
+ return {} unless preserve_wal_location?
+
+ job['wal_locations'] || {}
+ end
+
+ def existing_wal_location_key(connection_name)
+ "#{idempotency_key}:#{connection_name}:existing_wal_location"
+ end
+
+ def wal_location_key(connection_name)
+ "#{idempotency_key}:#{connection_name}:wal_location"
+ end
+
def idempotency_key
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
@@ -135,6 +207,29 @@ module Gitlab
def idempotency_string
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end
+
+ def delete_wal_locations!(redis)
+ job_wal_locations.keys.each do |connection_name|
+ redis.del(wal_location_key(connection_name))
+ redis.del(existing_wal_location_key(connection_name))
+ end
+ end
+
+ def check_existing_wal_locations!(redis, expiry)
+ read_wal_locations = {}
+
+ job_wal_locations.each do |connection_name, location|
+ key = existing_wal_location_key(connection_name)
+ redis.set(key, location, ex: expiry, nx: true)
+ read_wal_locations[connection_name] = redis.get(key)
+ end
+
+ read_wal_locations
+ end
+
+ def preserve_wal_location?
+ Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
+ end
end
end
end