diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r-- | lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb | 97 |
1 files changed, 96 insertions, 1 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index c1dc616cbb2..aeb58d7c153 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -17,10 +17,26 @@ module Gitlab # # When new jobs can be scheduled again, the strategy calls `#delete`. class DuplicateJob + include Gitlab::Utils::StrongMemoize + DUPLICATE_KEY_TTL = 6.hours + WAL_LOCATION_TTL = 60.seconds + MAX_REDIS_RETRIES = 5 DEFAULT_STRATEGY = :until_executing STRATEGY_NONE = :none + LUA_SET_WAL_SCRIPT = <<~EOS + local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3] + local existing_offset = redis.call("LINDEX", key, -1) + if existing_offset == false then + redis.call("RPUSH", key, wal, offset) + redis.call("EXPIRE", key, ttl) + elseif offset > tonumber(existing_offset) then + redis.call("LSET", key, 0, wal) + redis.call("LSET", key, -1, offset) + end + EOS + attr_reader :existing_jid def initialize(job, queue_name) @@ -44,22 +60,59 @@ module Gitlab # This method will return the jid that was set in redis def check!(expiry = DUPLICATE_KEY_TTL) read_jid = nil + read_wal_locations = {} Sidekiq.redis do |redis| redis.multi do |multi| redis.set(idempotency_key, jid, ex: expiry, nx: true) + read_wal_locations = check_existing_wal_locations!(redis, expiry) read_jid = redis.get(idempotency_key) end end job['idempotency_key'] = idempotency_key + # We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command. + self.existing_wal_locations = read_wal_locations.transform_values(&:value) self.existing_jid = read_jid.value end + def update_latest_wal_location! + return unless job_wal_locations.present? + + Sidekiq.redis do |redis| + redis.multi do + job_wal_locations.each do |connection_name, location| + redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) + end + end + end + end + + def latest_wal_locations + return {} unless job_wal_locations.present? + + strong_memoize(:latest_wal_locations) do + read_wal_locations = {} + + Sidekiq.redis do |redis| + redis.multi do + job_wal_locations.keys.each do |connection_name| + read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0) + end + end + end + + read_wal_locations.transform_values(&:value).compact + end + end + def delete! Sidekiq.redis do |redis| - redis.del(idempotency_key) + redis.multi do |multi| + redis.del(idempotency_key) + delete_wal_locations!(redis) + end end end @@ -93,6 +146,7 @@ module Gitlab private + attr_accessor :existing_wal_locations attr_reader :queue_name, :job attr_writer :existing_jid @@ -100,6 +154,10 @@ module Gitlab @worker_klass ||= worker_class_name.to_s.safe_constantize end + def pg_wal_lsn_diff(connection_name) + Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) + end + def strategy return DEFAULT_STRATEGY unless worker_klass return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?) @@ -120,6 +178,20 @@ module Gitlab job['jid'] end + def job_wal_locations + return {} unless preserve_wal_location? + + job['wal_locations'] || {} + end + + def existing_wal_location_key(connection_name) + "#{idempotency_key}:#{connection_name}:existing_wal_location" + end + + def wal_location_key(connection_name) + "#{idempotency_key}:#{connection_name}:wal_location" + end + def idempotency_key @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}" end @@ -135,6 +207,29 @@ module Gitlab def idempotency_string "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" end + + def delete_wal_locations!(redis) + job_wal_locations.keys.each do |connection_name| + redis.del(wal_location_key(connection_name)) + redis.del(existing_wal_location_key(connection_name)) + end + end + + def check_existing_wal_locations!(redis, expiry) + read_wal_locations = {} + + job_wal_locations.each do |connection_name, location| + key = existing_wal_location_key(connection_name) + redis.set(key, location, ex: expiry, nx: true) + read_wal_locations[connection_name] = redis.get(key) + end + + read_wal_locations + end + + def preserve_wal_location? + Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml) + end end end end |