gitlab.com/gitlab-org/gitlab-foss.git
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r--  lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb  107
1 file changed, 65 insertions(+), 42 deletions(-)
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index a0eee0a150c..f452abe8d13 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -81,7 +81,7 @@ module Gitlab
strong_memoize(:latest_wal_locations) do
if Feature.enabled?(:duplicate_jobs_cookie)
- latest_wal_locations_cookie
+ get_cookie.fetch('wal_locations', {})
else
latest_wal_locations_multi
end
@@ -90,7 +90,7 @@ module Gitlab
def delete!
if Feature.enabled?(:duplicate_jobs_cookie)
- cookie.del
+ with_redis { |redis| redis.del(cookie_key) }
else
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
with_redis do |redis|
@@ -123,7 +123,7 @@ module Gitlab
return unless reschedulable?
if Feature.enabled?(:duplicate_jobs_cookie)
- cookie.append({ 'deduplicated' => '1' })
+ with_redis { |redis| redis.eval(DEDUPLICATED_SCRIPT, keys: [cookie_key]) }
else
with_redis do |redis|
redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
@@ -131,11 +131,21 @@ module Gitlab
end
end
+ DEDUPLICATED_SCRIPT = <<~LUA
+ local cookie_msgpack = redis.call("get", KEYS[1])
+ if not cookie_msgpack then
+ return
+ end
+ local cookie = cmsgpack.unpack(cookie_msgpack)
+ cookie.deduplicated = "1"
+ redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1]))
+ LUA
+
def should_reschedule?
return false unless reschedulable?
if Feature.enabled?(:duplicate_jobs_cookie)
- cookie.get['deduplicated'].present?
+ get_cookie['deduplicated'].present?
else
with_redis do |redis|
redis.get(deduplicated_flag_key).present?
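
The cookie path above folds the old deduplicated_flag_key into a field of the msgpack cookie. A minimal sketch of that flow, assuming a local Redis, the redis and msgpack gems, and an illustrative key name (not GitLab code):

require 'redis'
require 'msgpack'

redis = Redis.new
key   = 'example:idempotency:cookie'

# Seed a cookie the way check_cookie! would: SET NX with a TTL.
redis.set(key, { 'jid' => 'abc123' }.to_msgpack, nx: true, ex: 600)

# Same script as in the hunk above: flip 'deduplicated' inside the cookie
# while preserving the remaining TTL.
DEDUPLICATED_SCRIPT = <<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)
  cookie.deduplicated = "1"
  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1]))
LUA

redis.eval(DEDUPLICATED_SCRIPT, keys: [key])

# should_reschedule? then only has to unpack the cookie and check the field.
MessagePack.unpack(redis.get(key))['deduplicated'] # => "1"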
@@ -172,16 +182,26 @@ module Gitlab
attr_writer :existing_jid
def check_cookie!(expiry)
- my_cookie = { 'jid' => jid }
- job_wal_locations.each do |connection_name, location|
- my_cookie["existing_wal_location:#{connection_name}"] = location
+ my_cookie = {
+ 'jid' => jid,
+ 'offsets' => {},
+ 'wal_locations' => {},
+ 'existing_wal_locations' => job_wal_locations
+ }
+
+ # There are 3 possible scenarios. In order of decreasing likelihood:
+ # 1. SET NX succeeds.
+ # 2. SET NX fails, GET succeeds.
+ # 3. SET NX fails, the key expires and GET fails. In this case we must retry.
+ actual_cookie = {}
+ while actual_cookie.empty?
+ set_succeeded = with_redis { |r| r.set(cookie_key, my_cookie.to_msgpack, nx: true, ex: expiry) }
+ actual_cookie = set_succeeded ? my_cookie : get_cookie
end
- actual_cookie = cookie.set(my_cookie, expiry) ? my_cookie : cookie.get
-
job['idempotency_key'] = idempotency_key
- self.existing_wal_locations = filter_prefix(actual_cookie, 'existing_wal_location:')
+ self.existing_wal_locations = actual_cookie['existing_wal_locations']
self.existing_jid = actual_cookie['jid']
end
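
The SET NX / GET loop in check_cookie! is a small claim-or-follow protocol: the first job installs its cookie, every duplicate reads the winner's cookie, and the rare case where the key expires between the failed SET and the GET is handled by retrying. A standalone sketch under the same assumptions as above (illustrative key and jid, local Redis, not GitLab code):

require 'redis'
require 'msgpack'

redis  = Redis.new
key    = 'example:idempotency:cookie'
expiry = 600

my_cookie = {
  'jid' => 'abc123',
  'offsets' => {},
  'wal_locations' => {},
  'existing_wal_locations' => {}
}

actual_cookie = {}
while actual_cookie.empty?
  # SET NX returns true if we claimed the key, false if another job owns it.
  set_succeeded = redis.set(key, my_cookie.to_msgpack, nx: true, ex: expiry)
  actual_cookie = set_succeeded ? my_cookie : MessagePack.unpack(redis.get(key) || "\x80")
end

# If the jids differ, this job is a duplicate of the one that won the SET NX.
duplicate = actual_cookie['jid'] != my_cookie['jid']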
@@ -205,15 +225,40 @@ module Gitlab
end
def update_latest_wal_location_cookie!
- new_wal_locations = {}
+ argv = []
job_wal_locations.each do |connection_name, location|
- offset = pg_wal_lsn_diff(connection_name).to_i
- new_wal_locations["wal_location:#{connection_name}:#{offset}"] = location
+ argv += [connection_name, pg_wal_lsn_diff(connection_name), location]
end
- cookie.append(new_wal_locations)
+ with_redis { |r| r.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [cookie_key], argv: argv) }
end
+ # Generally speaking, updating a Redis key by deserializing and
+ # serializing it on the Redis server is bad for performance. However in
+ # the case of DuplicateJobs we know that key updates are rare, and the
+ # most common operations are setting, getting and deleting the key. The
+ # aim of this design is to make the common operations as fast as
+ # possible.
+ UPDATE_WAL_COOKIE_SCRIPT = <<~LUA
+ local cookie_msgpack = redis.call("get", KEYS[1])
+ if not cookie_msgpack then
+ return
+ end
+ local cookie = cmsgpack.unpack(cookie_msgpack)
+
+ for i = 1, #ARGV, 3 do
+ local connection = ARGV[i]
+ local current_offset = cookie.offsets[connection]
+ local new_offset = tonumber(ARGV[i+1])
+ if not current_offset or current_offset < new_offset then
+ cookie.offsets[connection] = new_offset
+ cookie.wal_locations[connection] = ARGV[i+2]
+ end
+ end
+
+ redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1]))
+ LUA
+
def update_latest_wal_location_multi!
with_redis do |redis|
redis.multi do |multi|
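
A worked illustration of the cookie update path (illustrative key, made-up offsets and WAL positions, not GitLab code): ARGV is a flat list of (connection, offset, wal_location) triples, and a connection's entry is only overwritten when the incoming offset is newer than the one already recorded in the cookie.

require 'redis'
require 'msgpack'

# Same script as in the hunk above.
UPDATE_WAL_COOKIE_SCRIPT = <<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)

  for i = 1, #ARGV, 3 do
    local connection = ARGV[i]
    local current_offset = cookie.offsets[connection]
    local new_offset = tonumber(ARGV[i+1])
    if not current_offset or current_offset < new_offset then
      cookie.offsets[connection] = new_offset
      cookie.wal_locations[connection] = ARGV[i+2]
    end
  end

  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1]))
LUA

redis = Redis.new
key   = 'example:idempotency:cookie'
redis.set(key, { 'offsets' => {}, 'wal_locations' => {} }.to_msgpack, ex: 600)

# The newer offset (250) is stored; the stale one (100) is ignored.
redis.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [key], argv: ['main', 250, '0/D525836'])
redis.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [key], argv: ['main', 100, '0/D525000'])

MessagePack.unpack(redis.get(key))['wal_locations'] # => {"main"=>"0/D525836"}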
@@ -228,21 +273,6 @@ module Gitlab
end
end
- def latest_wal_locations_cookie
- wal_locations = {}
- offsets = {}
- filter_prefix(cookie.get, 'wal_location:').each do |key, value|
- connection, offset = key.split(':', 2)
- offset = offset.to_i
- if !offsets[connection] || offsets[connection] < offset
- offsets[connection] = offset
- wal_locations[connection] = value
- end
- end
-
- wal_locations
- end
-
def latest_wal_locations_multi
read_wal_locations = {}
@@ -256,17 +286,6 @@ module Gitlab
read_wal_locations.transform_values(&:value).compact
end
- # Filter_prefix extracts a sub-hash from a Ruby hash. For example, with
- # input h = { 'foo:a' => '1', 'foo:b' => '2', 'bar' => '3' }, the output
- # of filter_prefix(h, 'foo:') is { 'a' => '1', 'b' => '2' }.
- def filter_prefix(hash, prefix)
- out = {}
- hash.each do |k, v|
- out[k.delete_prefix(prefix)] = v if k.start_with?(prefix)
- end
- out
- end
-
def worker_klass
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
@@ -331,8 +350,12 @@ module Gitlab
"#{idempotency_key}:#{connection_name}:wal_location"
end
- def cookie
- @cookie ||= Cookie.new("#{idempotency_key}:cookie")
+ def cookie_key
+ "#{idempotency_key}:cookie"
+ end
+
+ def get_cookie
+ with_redis { |redis| MessagePack.unpack(redis.get(cookie_key) || "\x80") }
end
def idempotency_key
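
A last small sketch for get_cookie's fallback (same assumptions: local Redis, msgpack gem, illustrative key): "\x80" is the MessagePack encoding of an empty map, so a missing key decodes to an empty hash instead of raising on nil.

require 'redis'
require 'msgpack'

MessagePack.unpack("\x80")      # => {}
empty_map = {}.to_msgpack       # => "\x80" (binary)

redis = Redis.new
MessagePack.unpack(redis.get('example:missing:cookie') || "\x80") # => {}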