Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author GitLab Bot <gitlab-bot@gitlab.com> 2021-09-20 16:18:24 +0300
committer GitLab Bot <gitlab-bot@gitlab.com> 2021-09-20 16:18:24 +0300
commit 0653e08efd039a5905f3fa4f6e9cef9f5d2f799c (patch)
tree 4dcc884cf6d81db44adae4aa99f8ec1233a41f55 /lib/gitlab/sidekiq_middleware
parent 744144d28e3e7fddc117924fef88de5d9674fe4c (diff)
Add latest changes from gitlab-org/gitlab@14-3-stable-ee (tag: v14.3.0-rc42)
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r-- lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb | 97
-rw-r--r-- lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb | 10
-rw-r--r-- lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb | 7
-rw-r--r-- lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb | 6
-rw-r--r-- lib/gitlab/sidekiq_middleware/size_limiter/validator.rb | 52
5 files changed, 152 insertions, 20 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index c1dc616cbb2..aeb58d7c153 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -17,10 +17,26 @@ module Gitlab
#
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
+ include Gitlab::Utils::StrongMemoize
+
DUPLICATE_KEY_TTL = 6.hours
+ WAL_LOCATION_TTL = 60.seconds
+ MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none
+ LUA_SET_WAL_SCRIPT = <<~EOS
+ local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
+ local existing_offset = redis.call("LINDEX", key, -1)
+ if existing_offset == false then
+ redis.call("RPUSH", key, wal, offset)
+ redis.call("EXPIRE", key, ttl)
+ elseif offset > tonumber(existing_offset) then
+ redis.call("LSET", key, 0, wal)
+ redis.call("LSET", key, -1, offset)
+ end
+ EOS
+
attr_reader :existing_jid
def initialize(job, queue_name)
@@ -44,22 +60,59 @@ module Gitlab
# This method will return the jid that was set in redis
def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil
+ read_wal_locations = {}
Sidekiq.redis do |redis|
redis.multi do |multi|
redis.set(idempotency_key, jid, ex: expiry, nx: true)
+ read_wal_locations = check_existing_wal_locations!(redis, expiry)
read_jid = redis.get(idempotency_key)
end
end
job['idempotency_key'] = idempotency_key
+ # We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command.
+ self.existing_wal_locations = read_wal_locations.transform_values(&:value)
self.existing_jid = read_jid.value
end
+ def update_latest_wal_location!
+ return unless job_wal_locations.present?
+
+ Sidekiq.redis do |redis|
+ redis.multi do
+ job_wal_locations.each do |connection_name, location|
+ redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
+ end
+ end
+ end
+ end
+
+ def latest_wal_locations
+ return {} unless job_wal_locations.present?
+
+ strong_memoize(:latest_wal_locations) do
+ read_wal_locations = {}
+
+ Sidekiq.redis do |redis|
+ redis.multi do
+ job_wal_locations.keys.each do |connection_name|
+ read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0)
+ end
+ end
+ end
+
+ read_wal_locations.transform_values(&:value).compact
+ end
+ end
+
def delete!
Sidekiq.redis do |redis|
- redis.del(idempotency_key)
+ redis.multi do |multi|
+ redis.del(idempotency_key)
+ delete_wal_locations!(redis)
+ end
end
end
@@ -93,6 +146,7 @@ module Gitlab
private
+ attr_accessor :existing_wal_locations
attr_reader :queue_name, :job
attr_writer :existing_jid
@@ -100,6 +154,10 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
+ def pg_wal_lsn_diff(connection_name)
+ Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
+ end
+
def strategy
return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
@@ -120,6 +178,20 @@ module Gitlab
job['jid']
end
+ def job_wal_locations
+ return {} unless preserve_wal_location?
+
+ job['wal_locations'] || {}
+ end
+
+ def existing_wal_location_key(connection_name)
+ "#{idempotency_key}:#{connection_name}:existing_wal_location"
+ end
+
+ def wal_location_key(connection_name)
+ "#{idempotency_key}:#{connection_name}:wal_location"
+ end
+
def idempotency_key
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
@@ -135,6 +207,29 @@ module Gitlab
def idempotency_string
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end
+
+ def delete_wal_locations!(redis)
+ job_wal_locations.keys.each do |connection_name|
+ redis.del(wal_location_key(connection_name))
+ redis.del(existing_wal_location_key(connection_name))
+ end
+ end
+
+ def check_existing_wal_locations!(redis, expiry)
+ read_wal_locations = {}
+
+ job_wal_locations.each do |connection_name, location|
+ key = existing_wal_location_key(connection_name)
+ redis.set(key, location, ex: expiry, nx: true)
+ read_wal_locations[connection_name] = redis.get(key)
+ end
+
+ read_wal_locations
+ end
+
+ def preserve_wal_location?
+ Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
+ end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
index 469033a5e52..fc58d4f5323 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
@@ -14,6 +14,8 @@ module Gitlab
job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.idempotent?
+ duplicate_job.update_latest_wal_location!
+
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped #{strategy_name}", duplicate_job.options)
return false
@@ -23,8 +25,16 @@ module Gitlab
yield
end
+ def perform(job)
+ update_job_wal_location!(job)
+ end
+
private
+ def update_job_wal_location!(job)
+ job['dedup_wal_locations'] = duplicate_job.latest_wal_locations if duplicate_job.latest_wal_locations.present?
+ end
+
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
index 738efa36fc8..5164b994267 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
@@ -8,9 +8,14 @@ module Gitlab
# removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing.
class UntilExecuted < Base
+ extend ::Gitlab::Utils::Override
+
include DeduplicatesWhenScheduling
- def perform(_job)
+ override :perform
+ def perform(job)
+ super
+
yield
duplicate_job.delete!
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
index 68d66383b2b..1f7e3a4ea30 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
@@ -8,9 +8,13 @@ module Gitlab
# removes the lock before the job starts allowing a new job to be queued
# while a job is still executing.
class UntilExecuting < Base
+ extend ::Gitlab::Utils::Override
+
include DeduplicatesWhenScheduling
- def perform(_job)
+ override :perform
+ def perform(job)
+ super
duplicate_job.delete!
yield
diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
index b37eeb8bad1..a83522a489a 100644
--- a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
+++ b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
@@ -4,12 +4,12 @@ module Gitlab
module SidekiqMiddleware
module SizeLimiter
# Handle a Sidekiq job payload limit based on current configuration.
- # This validator pulls the configuration from the environment variables:
- # - GITLAB_SIDEKIQ_SIZE_LIMITER_MODE: the current mode of the size
- # limiter. This must be either `track` or `compress`.
- # - GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES: the
- # threshold before the input job payload is compressed.
- # - GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES: the size limit in bytes.
+ # This validator pulls the configuration from application settings:
+ # - limiter_mode: the current mode of the size
+ # limiter. This must be either `track` or `compress`.
+ # - compression_threshold_bytes: the threshold before the input job
+ # payload is compressed.
+ # - limit_bytes: the size limit in bytes.
#
# In track mode, if a job payload limit exceeds the size limit, an
# event is sent to Sentry and the job is scheduled like normal.
@@ -18,12 +18,29 @@ module Gitlab
# then compressed. If the compressed payload still exceeds the limit, the
# job is discarded, and a ExceedLimitError exception is raised.
class Validator
- def self.validate!(worker_class, job)
- new(worker_class, job).validate!
+ # Avoid limiting the size of jobs for `BackgroundMigrationWorker` classes.
+ # We can't read the configuration from `ApplicationSetting` for those jobs
+ # when migrating a path that modifies the `application_settings` table.
+ # Reading the application settings through `ApplicationSetting#current`
+ # causes a `SELECT` with a list of column names, but that list of column
+ # names might not match what the table currently looks like causing
+ # an error when scheduling background migrations.
+ #
+ # The worker classes aren't constants here, because that would force
+ # Application Settings to be loaded earlier causing failures loading
+ # the environment in rake tasks
+ EXEMPT_WORKER_NAMES = ["BackgroundMigrationWorker", "Database::BatchedBackgroundMigrationWorker"].to_set
+
+ class << self
+ def validate!(worker_class, job)
+ return if EXEMPT_WORKER_NAMES.include?(worker_class.to_s)
+
+ new(worker_class, job).validate!
+ end
end
DEFAULT_SIZE_LIMIT = 0
- DEFAULT_COMPRESION_THRESHOLD_BYTES = 100_000 # 100kb
+ DEFAULT_COMPRESSION_THRESHOLD_BYTES = 100_000 # 100kb
MODES = [
TRACK_MODE = 'track',
@@ -34,9 +51,9 @@ module Gitlab
def initialize(
worker_class, job,
- mode: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_MODE'],
- compression_threshold: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES'],
- size_limit: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES']
+ mode: Gitlab::CurrentSettings.sidekiq_job_limiter_mode,
+ compression_threshold: Gitlab::CurrentSettings.sidekiq_job_limiter_compression_threshold_bytes,
+ size_limit: Gitlab::CurrentSettings.sidekiq_job_limiter_limit_bytes
)
@worker_class = worker_class
@job = job
@@ -47,11 +64,11 @@ module Gitlab
end
def validate!
- return unless @size_limit > 0
- return if allow_big_payload?
-
job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args']))
+
+ return if @size_limit == 0
return if job_args.bytesize <= @size_limit
+ return if allow_big_payload?
exception = exceed_limit_error(job_args)
if compress_mode?
@@ -72,10 +89,10 @@ module Gitlab
end
def set_compression_threshold(compression_threshold)
- @compression_threshold = (compression_threshold || DEFAULT_COMPRESION_THRESHOLD_BYTES).to_i
+ @compression_threshold = (compression_threshold || DEFAULT_COMPRESSION_THRESHOLD_BYTES).to_i
if @compression_threshold <= 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}"
- @compression_threshold = DEFAULT_COMPRESION_THRESHOLD_BYTES
+ @compression_threshold = DEFAULT_COMPRESSION_THRESHOLD_BYTES
end
end
@@ -83,6 +100,7 @@ module Gitlab
@size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i
if @size_limit < 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}"
+ @size_limit = DEFAULT_SIZE_LIMIT
end
end