diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r-- | lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb | 107 |
1 files changed, 65 insertions, 42 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index a0eee0a150c..f452abe8d13 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -81,7 +81,7 @@ module Gitlab strong_memoize(:latest_wal_locations) do if Feature.enabled?(:duplicate_jobs_cookie) - latest_wal_locations_cookie + get_cookie.fetch('wal_locations', {}) else latest_wal_locations_multi end @@ -90,7 +90,7 @@ module Gitlab def delete! if Feature.enabled?(:duplicate_jobs_cookie) - cookie.del + with_redis { |redis| redis.del(cookie_key) } else Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do with_redis do |redis| @@ -123,7 +123,7 @@ module Gitlab return unless reschedulable? if Feature.enabled?(:duplicate_jobs_cookie) - cookie.append({ 'deduplicated' => '1' }) + with_redis { |redis| redis.eval(DEDUPLICATED_SCRIPT, keys: [cookie_key]) } else with_redis do |redis| redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true) @@ -131,11 +131,21 @@ module Gitlab end end + DEDUPLICATED_SCRIPT = <<~LUA + local cookie_msgpack = redis.call("get", KEYS[1]) + if not cookie_msgpack then + return + end + local cookie = cmsgpack.unpack(cookie_msgpack) + cookie.deduplicated = "1" + redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1])) + LUA + def should_reschedule? return false unless reschedulable? if Feature.enabled?(:duplicate_jobs_cookie) - cookie.get['deduplicated'].present? + get_cookie['deduplicated'].present? else with_redis do |redis| redis.get(deduplicated_flag_key).present? @@ -172,16 +182,26 @@ module Gitlab attr_writer :existing_jid def check_cookie!(expiry) - my_cookie = { 'jid' => jid } - job_wal_locations.each do |connection_name, location| - my_cookie["existing_wal_location:#{connection_name}"] = location + my_cookie = { + 'jid' => jid, + 'offsets' => {}, + 'wal_locations' => {}, + 'existing_wal_locations' => job_wal_locations + } + + # There are 3 possible scenarios. In order of decreasing likelyhood: + # 1. SET NX succeeds. + # 2. SET NX fails, GET succeeds. + # 3. SET NX fails, the key expires and GET fails. In this case we must retry. + actual_cookie = {} + while actual_cookie.empty? + set_succeeded = with_redis { |r| r.set(cookie_key, my_cookie.to_msgpack, nx: true, ex: expiry) } + actual_cookie = set_succeeded ? my_cookie : get_cookie end - actual_cookie = cookie.set(my_cookie, expiry) ? my_cookie : cookie.get - job['idempotency_key'] = idempotency_key - self.existing_wal_locations = filter_prefix(actual_cookie, 'existing_wal_location:') + self.existing_wal_locations = actual_cookie['existing_wal_locations'] self.existing_jid = actual_cookie['jid'] end @@ -205,15 +225,40 @@ module Gitlab end def update_latest_wal_location_cookie! - new_wal_locations = {} + argv = [] job_wal_locations.each do |connection_name, location| - offset = pg_wal_lsn_diff(connection_name).to_i - new_wal_locations["wal_location:#{connection_name}:#{offset}"] = location + argv += [connection_name, pg_wal_lsn_diff(connection_name), location] end - cookie.append(new_wal_locations) + with_redis { |r| r.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [cookie_key], argv: argv) } end + # Generally speaking, updating a Redis key by deserializing and + # serializing it on the Redis server is bad for performance. However in + # the case of DuplicateJobs we know that key updates are rare, and the + # most common operations are setting, getting and deleting the key. The + # aim of this design is to make the common operations as fast as + # possible. + UPDATE_WAL_COOKIE_SCRIPT = <<~LUA + local cookie_msgpack = redis.call("get", KEYS[1]) + if not cookie_msgpack then + return + end + local cookie = cmsgpack.unpack(cookie_msgpack) + + for i = 1, #ARGV, 3 do + local connection = ARGV[i] + local current_offset = cookie.offsets[connection] + local new_offset = tonumber(ARGV[i+1]) + if not current_offset or current_offset < new_offset then + cookie.offsets[connection] = new_offset + cookie.wal_locations[connection] = ARGV[i+2] + end + end + + redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1])) + LUA + def update_latest_wal_location_multi! with_redis do |redis| redis.multi do |multi| @@ -228,21 +273,6 @@ module Gitlab end end - def latest_wal_locations_cookie - wal_locations = {} - offsets = {} - filter_prefix(cookie.get, 'wal_location:').each do |key, value| - connection, offset = key.split(':', 2) - offset = offset.to_i - if !offsets[connection] || offsets[connection] < offset - offsets[connection] = offset - wal_locations[connection] = value - end - end - - wal_locations - end - def latest_wal_locations_multi read_wal_locations = {} @@ -256,17 +286,6 @@ module Gitlab read_wal_locations.transform_values(&:value).compact end - # Filter_prefix extracts a sub-hash from a Ruby hash. For example, with - # input h = { 'foo:a' => '1', 'foo:b' => '2', 'bar' => '3' }, the output - # of filter_prefix(h, 'foo:') is { 'a' => '1', 'b' => '2' }. - def filter_prefix(hash, prefix) - out = {} - hash.each do |k, v| - out[k.delete_prefix(prefix)] = v if k.start_with?(prefix) - end - out - end - def worker_klass @worker_klass ||= worker_class_name.to_s.safe_constantize end @@ -331,8 +350,12 @@ module Gitlab "#{idempotency_key}:#{connection_name}:wal_location" end - def cookie - @cookie ||= Cookie.new("#{idempotency_key}:cookie") + def cookie_key + "#{idempotency_key}:cookie" + end + + def get_cookie + with_redis { |redis| MessagePack.unpack(redis.get(cookie_key) || "\x80") } end def idempotency_key |