Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb54
1 files changed, 49 insertions, 5 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index e63164efc94..f31262bfcc9 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -19,11 +19,12 @@ module Gitlab
class DuplicateJob
include Gitlab::Utils::StrongMemoize
- DUPLICATE_KEY_TTL = 6.hours
+ DEFAULT_DUPLICATE_KEY_TTL = 6.hours
WAL_LOCATION_TTL = 60.seconds
MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none
+ DEDUPLICATED_FLAG_VALUE = 1
LUA_SET_WAL_SCRIPT = <<~EOS
local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
@@ -58,7 +59,7 @@ module Gitlab
end
# This method will return the jid that was set in redis
- def check!(expiry = DUPLICATE_KEY_TTL)
+ def check!(expiry = duplicate_key_ttl)
read_jid = nil
read_wal_locations = {}
@@ -83,7 +84,11 @@ module Gitlab
Sidekiq.redis do |redis|
redis.multi do |multi|
job_wal_locations.each do |connection_name, location|
- multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
+ multi.eval(
+ LUA_SET_WAL_SCRIPT,
+ keys: [wal_location_key(connection_name)],
+ argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]
+ )
end
end
end
@@ -110,12 +115,18 @@ module Gitlab
def delete!
Sidekiq.redis do |redis|
redis.multi do |multi|
- multi.del(idempotency_key)
+ multi.del(idempotency_key, deduplicated_flag_key)
delete_wal_locations!(multi)
end
end
end
+ def reschedule
+ Gitlab::SidekiqLogging::DeduplicationLogger.instance.rescheduled_log(job)
+
+ worker_klass.perform_async(*arguments)
+ end
+
def scheduled?
scheduled_at.present?
end
@@ -126,6 +137,22 @@ module Gitlab
jid != existing_jid
end
+ def set_deduplicated_flag!(expiry = duplicate_key_ttl)
+ return unless reschedulable?
+
+ Sidekiq.redis do |redis|
+ redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
+ end
+ end
+
+ def should_reschedule?
+ return false unless reschedulable?
+
+ Sidekiq.redis do |redis|
+ redis.get(deduplicated_flag_key).present?
+ end
+ end
+
def scheduled_at
job['at']
end
@@ -145,6 +172,10 @@ module Gitlab
worker_klass.idempotent?
end
+ def duplicate_key_ttl
+ options[:ttl] || DEFAULT_DUPLICATE_KEY_TTL
+ end
+
private
attr_writer :existing_wal_locations
@@ -181,7 +212,12 @@ module Gitlab
end
def pg_wal_lsn_diff(connection_name)
- Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
+ model = Gitlab::Database.database_base_models[connection_name]
+
+ model.connection.load_balancer.wal_diff(
+ job_wal_locations[connection_name],
+ existing_wal_locations[connection_name]
+ )
end
def strategy
@@ -216,6 +252,10 @@ module Gitlab
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
+ def deduplicated_flag_key
+ "#{idempotency_key}:deduplicate_flag"
+ end
+
def idempotency_hash
Digest::SHA256.hexdigest(idempotency_string)
end
@@ -235,6 +275,10 @@ module Gitlab
def preserve_wal_location?
Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
end
+
+ def reschedulable?
+ !scheduled? && options[:if_deduplicated] == :reschedule_once
+ end
end
end
end