diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r-- | lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb | 54 |
1 files changed, 49 insertions, 5 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index e63164efc94..f31262bfcc9 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -19,11 +19,12 @@ module Gitlab class DuplicateJob include Gitlab::Utils::StrongMemoize - DUPLICATE_KEY_TTL = 6.hours + DEFAULT_DUPLICATE_KEY_TTL = 6.hours WAL_LOCATION_TTL = 60.seconds MAX_REDIS_RETRIES = 5 DEFAULT_STRATEGY = :until_executing STRATEGY_NONE = :none + DEDUPLICATED_FLAG_VALUE = 1 LUA_SET_WAL_SCRIPT = <<~EOS local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3] @@ -58,7 +59,7 @@ module Gitlab end # This method will return the jid that was set in redis - def check!(expiry = DUPLICATE_KEY_TTL) + def check!(expiry = duplicate_key_ttl) read_jid = nil read_wal_locations = {} @@ -83,7 +84,11 @@ module Gitlab Sidekiq.redis do |redis| redis.multi do |multi| job_wal_locations.each do |connection_name, location| - multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) + multi.eval( + LUA_SET_WAL_SCRIPT, + keys: [wal_location_key(connection_name)], + argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL] + ) end end end @@ -110,12 +115,18 @@ module Gitlab def delete! Sidekiq.redis do |redis| redis.multi do |multi| - multi.del(idempotency_key) + multi.del(idempotency_key, deduplicated_flag_key) delete_wal_locations!(multi) end end end + def reschedule + Gitlab::SidekiqLogging::DeduplicationLogger.instance.rescheduled_log(job) + + worker_klass.perform_async(*arguments) + end + def scheduled? scheduled_at.present? end @@ -126,6 +137,22 @@ module Gitlab jid != existing_jid end + def set_deduplicated_flag!(expiry = duplicate_key_ttl) + return unless reschedulable? + + Sidekiq.redis do |redis| + redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true) + end + end + + def should_reschedule? + return false unless reschedulable? + + Sidekiq.redis do |redis| + redis.get(deduplicated_flag_key).present? + end + end + def scheduled_at job['at'] end @@ -145,6 +172,10 @@ module Gitlab worker_klass.idempotent? end + def duplicate_key_ttl + options[:ttl] || DEFAULT_DUPLICATE_KEY_TTL + end + private attr_writer :existing_wal_locations @@ -181,7 +212,12 @@ module Gitlab end def pg_wal_lsn_diff(connection_name) - Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) + model = Gitlab::Database.database_base_models[connection_name] + + model.connection.load_balancer.wal_diff( + job_wal_locations[connection_name], + existing_wal_locations[connection_name] + ) end def strategy @@ -216,6 +252,10 @@ module Gitlab @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}" end + def deduplicated_flag_key + "#{idempotency_key}:deduplicate_flag" + end + def idempotency_hash Digest::SHA256.hexdigest(idempotency_string) end @@ -235,6 +275,10 @@ module Gitlab def preserve_wal_location? Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml) end + + def reschedulable? + !scheduled? && options[:if_deduplicated] == :reschedule_once + end end end end |