gitlab.com/gitlab-org/gitlab-foss.git
author    GitLab Bot <gitlab-bot@gitlab.com>  2022-10-21 15:11:29 +0300
committer GitLab Bot <gitlab-bot@gitlab.com>  2022-10-21 15:11:29 +0300
commit    559b1da28e46a9969315beb11ee2d2056f75b06d (patch)
tree      fad20c706047f4aca44c1f030cb81d5b1e302cab /lib
parent    a065770457b66dc856897fc5282bf897b9e4f65b (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'lib')
-rw-r--r--  lib/gitlab/background_migration/backfill_project_namespace_on_issues.rb   37
-rw-r--r--  lib/gitlab/redis/multi_store.rb                                             1
-rw-r--r--  lib/gitlab/sidekiq_middleware/duplicate_jobs/cookie.rb                     62
-rw-r--r--  lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb             179
4 files changed, 231 insertions(+), 48 deletions(-)
diff --git a/lib/gitlab/background_migration/backfill_project_namespace_on_issues.rb b/lib/gitlab/background_migration/backfill_project_namespace_on_issues.rb
index 815c346bb39..2e7d0c41509 100644
--- a/lib/gitlab/background_migration/backfill_project_namespace_on_issues.rb
+++ b/lib/gitlab/background_migration/backfill_project_namespace_on_issues.rb
@@ -4,6 +4,8 @@ module Gitlab
   module BackgroundMigration
     # Back-fills the `issues.namespace_id` by setting it to corresponding project.project_namespace_id
     class BackfillProjectNamespaceOnIssues < BatchedMigrationJob
+      MAX_UPDATE_RETRIES = 3
+
       def perform
         each_sub_batch(
           operation_name: :update_all,
@@ -12,13 +14,42 @@ module Gitlab
               .select("issues.id AS issue_id, projects.project_namespace_id").where(issues: { namespace_id: nil })
           }
         ) do |sub_batch|
-          connection.execute <<~SQL
+          # Updating the issues table fails batches quite often; to mitigate that we
+          # attempt the same sub-batch up to MAX_UPDATE_RETRIES times before giving up.
+          update_with_retry(sub_batch)
+        end
+      end
+
+      private
+
+      # rubocop:disable Database/RescueQueryCanceled
+      # rubocop:disable Database/RescueStatementTimeout
+      def update_with_retry(sub_batch)
+        update_attempt = 1
+
+        begin
+          update_batch(sub_batch)
+        rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled => e
+          update_attempt += 1
+
+          if update_attempt <= MAX_UPDATE_RETRIES
+            sleep(5)
+            retry
+          end
+
+          raise e
+        end
+      end
+      # rubocop:enable Database/RescueQueryCanceled
+      # rubocop:enable Database/RescueStatementTimeout
+
+      def update_batch(sub_batch)
+        connection.execute <<~SQL
           UPDATE issues
           SET namespace_id = projects.project_namespace_id
           FROM (#{sub_batch.to_sql}) AS projects(issue_id, project_namespace_id)
           WHERE issues.id = issue_id
-          SQL
-        end
+        SQL
       end
     end
   end
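
The retry wrapper is the core of this change: the sub-batch is scoped to rows where namespace_id is still NULL, so re-running the UPDATE after a timeout is safe. A minimal, self-contained sketch of the same pattern outside the migration, with a hypothetical run_update that times out twice before succeeding:

require 'active_record'

MAX_UPDATE_RETRIES = 3

# Hypothetical stand-in for update_batch(sub_batch): times out twice, then succeeds.
def run_update
  @calls = (@calls || 0) + 1
  raise ActiveRecord::StatementTimeout, 'canceling statement due to statement timeout' if @calls < 3

  :updated
end

def update_with_retry
  attempt = 1

  begin
    run_update
  rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled => e
    attempt += 1

    if attempt <= MAX_UPDATE_RETRIES
      sleep(0.1) # the migration waits 5 seconds between attempts
      retry
    end

    raise e
  end
end

update_with_retry # => :updated, on the third and final allowed attempt

Note that update_attempt counts attempts, not retries, so a sub-batch runs at most MAX_UPDATE_RETRIES times before the error propagates and fails the batch.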
diff --git a/lib/gitlab/redis/multi_store.rb b/lib/gitlab/redis/multi_store.rb
index a7c36786d2d..12cb1fc6153 100644
--- a/lib/gitlab/redis/multi_store.rb
+++ b/lib/gitlab/redis/multi_store.rb
@@ -52,6 +52,7 @@ module Gitlab
        del
        flushdb
        rpush
+       eval
      ).freeze

      PIPELINED_COMMANDS = %i(
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/cookie.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/cookie.rb
new file mode 100644
index 00000000000..367ad4274a4
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/cookie.rb
@@ -0,0 +1,62 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqMiddleware
+    module DuplicateJobs
+      # Cookie is a serialization format we use to minimize the number of keys
+      # we read, write and delete in Redis. Keys and values must be strings.
+      # Newlines are not allowed in either keys or values. Keys cannot contain
+      # '='. This format has the useful property that serialize(h1) +
+      # serialize(h2) == h1.merge(h2).
+      class Cookie
+        def self.serialize(hash)
+          hash.map { |k, v| "#{k}=#{v}\n" }.join
+        end
+
+        def self.deserialize(string)
+          string.each_line(chomp: true).to_h { |line| line.split('=', 2) }
+        end
+
+        def initialize(key)
+          @key = key
+        end
+
+        def set(hash, expiry)
+          with_redis { |redis| redis.set(@key, self.class.serialize(hash), nx: true, ex: expiry) }
+        end
+
+        def get
+          with_redis { |redis| self.class.deserialize(redis.get(@key) || '') }
+        end
+
+        def del
+          with_redis { |redis| redis.del(@key) }
+        end
+
+        def append(hash)
+          with_redis do |redis|
+            redis.eval(
+              # Only append if the key exists. This way we are not responsible for
+              # setting the expiry of the key: that is the responsibility of #set.
+              'if redis.call("exists", KEYS[1]) > 0 then redis.call("append", KEYS[1], ARGV[1]) end',
+              keys: [@key],
+              argv: [self.class.serialize(hash)]
+            )
+          end
+        end
+
+        def with_redis(&block)
+          if Feature.enabled?(:use_primary_and_secondary_stores_for_duplicate_jobs) ||
+              Feature.enabled?(:use_primary_store_as_default_for_duplicate_jobs)
+            # TODO: Swap for Gitlab::Redis::SharedState after store transition
+            # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/923
+            Gitlab::Redis::DuplicateJobs.with(&block) # rubocop:disable CodeReuse/ActiveRecord
+          else
+            # Keep the old behavior intact if neither feature flag is turned on
+            Sidekiq.redis(&block) # rubocop:disable Cop/SidekiqRedisCall
+          end
+        end
+      end
+    end
+  end
+end
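
The format comment above is easiest to verify with a concrete example. Assuming the class is loaded (for instance in a Rails console), the snippet below illustrates why concatenating two serialized hashes deserializes to their merge, which is what lets #append extend an existing cookie without a read-modify-write cycle:

cookie = Gitlab::SidekiqMiddleware::DuplicateJobs::Cookie

h1 = { 'jid' => 'abc123' }
h2 = { 'deduplicated' => '1' }

cookie.serialize(h1)                        # => "jid=abc123\n"
cookie.serialize(h1) + cookie.serialize(h2) # => "jid=abc123\ndeduplicated=1\n"

cookie.deserialize(cookie.serialize(h1) + cookie.serialize(h2))
# => {"jid"=>"abc123", "deduplicated"=>"1"}, i.e. h1.merge(h2)

# A key that appears twice keeps its last value, matching Hash#merge semantics:
cookie.deserialize("jid=abc123\njid=def456\n") # => {"jid"=>"def456"}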
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index d42bd672bac..a0eee0a150c 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -21,7 +21,6 @@ module Gitlab
DEFAULT_DUPLICATE_KEY_TTL = 6.hours
WAL_LOCATION_TTL = 60.seconds
- MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none
DEDUPLICATED_FLAG_VALUE = 1
@@ -60,37 +59,20 @@ module Gitlab
# This method will return the jid that was set in redis
def check!(expiry = duplicate_key_ttl)
- read_jid = nil
- read_wal_locations = {}
-
- with_redis do |redis|
- redis.multi do |multi|
- multi.set(idempotency_key, jid, ex: expiry, nx: true)
- read_wal_locations = check_existing_wal_locations!(multi, expiry)
- read_jid = multi.get(idempotency_key)
- end
+ if Feature.enabled?(:duplicate_jobs_cookie)
+ check_cookie!(expiry)
+ else
+ check_multi!(expiry)
end
-
- job['idempotency_key'] = idempotency_key
-
- # We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command.
- self.existing_wal_locations = read_wal_locations.transform_values(&:value)
- self.existing_jid = read_jid.value
end
def update_latest_wal_location!
return unless job_wal_locations.present?
- with_redis do |redis|
- redis.multi do |multi|
- job_wal_locations.each do |connection_name, location|
- multi.eval(
- LUA_SET_WAL_SCRIPT,
- keys: [wal_location_key(connection_name)],
- argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]
- )
- end
- end
+ if Feature.enabled?(:duplicate_jobs_cookie)
+ update_latest_wal_location_cookie!
+ else
+ update_latest_wal_location_multi!
end
end
@@ -98,25 +80,24 @@ module Gitlab
return {} unless job_wal_locations.present?
strong_memoize(:latest_wal_locations) do
- read_wal_locations = {}
-
- with_redis do |redis|
- redis.multi do |multi|
- job_wal_locations.keys.each do |connection_name|
- read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0)
- end
- end
+ if Feature.enabled?(:duplicate_jobs_cookie)
+ latest_wal_locations_cookie
+ else
+ latest_wal_locations_multi
end
- read_wal_locations.transform_values(&:value).compact
end
end
def delete!
- Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
- with_redis do |redis|
- redis.multi do |multi|
- multi.del(idempotency_key, deduplicated_flag_key)
- delete_wal_locations!(multi)
+ if Feature.enabled?(:duplicate_jobs_cookie)
+ cookie.del
+ else
+ Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
+ with_redis do |redis|
+ redis.multi do |multi|
+ multi.del(idempotency_key, deduplicated_flag_key)
+ delete_wal_locations!(multi)
+ end
end
end
end
@@ -141,16 +122,24 @@ module Gitlab
def set_deduplicated_flag!(expiry = duplicate_key_ttl)
return unless reschedulable?
- with_redis do |redis|
- redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
+ if Feature.enabled?(:duplicate_jobs_cookie)
+ cookie.append({ 'deduplicated' => '1' })
+ else
+ with_redis do |redis|
+ redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
+ end
end
end
def should_reschedule?
return false unless reschedulable?
- with_redis do |redis|
- redis.get(deduplicated_flag_key).present?
+ if Feature.enabled?(:duplicate_jobs_cookie)
+ cookie.get['deduplicated'].present?
+ else
+ with_redis do |redis|
+ redis.get(deduplicated_flag_key).present?
+ end
end
end
@@ -182,6 +171,102 @@ module Gitlab
attr_reader :queue_name, :job
attr_writer :existing_jid
+ def check_cookie!(expiry)
+ my_cookie = { 'jid' => jid }
+ job_wal_locations.each do |connection_name, location|
+ my_cookie["existing_wal_location:#{connection_name}"] = location
+ end
+
+ actual_cookie = cookie.set(my_cookie, expiry) ? my_cookie : cookie.get
+
+ job['idempotency_key'] = idempotency_key
+
+ self.existing_wal_locations = filter_prefix(actual_cookie, 'existing_wal_location:')
+ self.existing_jid = actual_cookie['jid']
+ end
+
+ def check_multi!(expiry)
+ read_jid = nil
+ read_wal_locations = {}
+
+ with_redis do |redis|
+ redis.multi do |multi|
+ multi.set(idempotency_key, jid, ex: expiry, nx: true)
+ read_wal_locations = check_existing_wal_locations!(multi, expiry)
+ read_jid = multi.get(idempotency_key)
+ end
+ end
+
+ job['idempotency_key'] = idempotency_key
+
+ # We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command.
+ self.existing_wal_locations = read_wal_locations.transform_values(&:value)
+ self.existing_jid = read_jid.value
+ end
+
+ def update_latest_wal_location_cookie!
+ new_wal_locations = {}
+ job_wal_locations.each do |connection_name, location|
+ offset = pg_wal_lsn_diff(connection_name).to_i
+ new_wal_locations["wal_location:#{connection_name}:#{offset}"] = location
+ end
+
+ cookie.append(new_wal_locations)
+ end
+
+ def update_latest_wal_location_multi!
+ with_redis do |redis|
+ redis.multi do |multi|
+ job_wal_locations.each do |connection_name, location|
+ multi.eval(
+ LUA_SET_WAL_SCRIPT,
+ keys: [wal_location_key(connection_name)],
+ argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]
+ )
+ end
+ end
+ end
+ end
+
+ def latest_wal_locations_cookie
+ wal_locations = {}
+ offsets = {}
+ filter_prefix(cookie.get, 'wal_location:').each do |key, value|
+ connection, offset = key.split(':', 2)
+ offset = offset.to_i
+ if !offsets[connection] || offsets[connection] < offset
+ offsets[connection] = offset
+ wal_locations[connection] = value
+ end
+ end
+
+ wal_locations
+ end
+
+ def latest_wal_locations_multi
+ read_wal_locations = {}
+
+ with_redis do |redis|
+ redis.multi do |multi|
+ job_wal_locations.keys.each do |connection_name|
+ read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0)
+ end
+ end
+ end
+ read_wal_locations.transform_values(&:value).compact
+ end
+
+ # Filter_prefix extracts a sub-hash from a Ruby hash. For example, with
+ # input h = { 'foo:a' => '1', 'foo:b' => '2', 'bar' => '3' }, the output
+ # of filter_prefix(h, 'foo:') is { 'a' => '1', 'b' => '2' }.
+ def filter_prefix(hash, prefix)
+ out = {}
+ hash.each do |k, v|
+ out[k.delete_prefix(prefix)] = v if k.start_with?(prefix)
+ end
+ out
+ end
+
def worker_klass
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
@@ -210,7 +295,7 @@ module Gitlab
end
def pg_wal_lsn_diff(connection_name)
- model = Gitlab::Database.database_base_models[connection_name]
+ model = Gitlab::Database.database_base_models[connection_name.to_sym]
model.connection.load_balancer.wal_diff(
job_wal_locations[connection_name],
@@ -246,6 +331,10 @@ module Gitlab
"#{idempotency_key}:#{connection_name}:wal_location"
end
+ def cookie
+ @cookie ||= Cookie.new("#{idempotency_key}:cookie")
+ end
+
def idempotency_key
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end