From 0653e08efd039a5905f3fa4f6e9cef9f5d2f799c Mon Sep 17 00:00:00 2001 From: GitLab Bot Date: Mon, 20 Sep 2021 13:18:24 +0000 Subject: Add latest changes from gitlab-org/gitlab@14-3-stable-ee --- .../duplicate_jobs/duplicate_job.rb | 97 +++++++++++++++++++++- .../strategies/deduplicates_when_scheduling.rb | 10 +++ .../duplicate_jobs/strategies/until_executed.rb | 7 +- .../duplicate_jobs/strategies/until_executing.rb | 6 +- .../sidekiq_middleware/size_limiter/validator.rb | 52 ++++++++---- 5 files changed, 152 insertions(+), 20 deletions(-) (limited to 'lib/gitlab/sidekiq_middleware') diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index c1dc616cbb2..aeb58d7c153 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -17,10 +17,26 @@ module Gitlab # # When new jobs can be scheduled again, the strategy calls `#delete`. 
class DuplicateJob + include Gitlab::Utils::StrongMemoize + DUPLICATE_KEY_TTL = 6.hours + WAL_LOCATION_TTL = 60.seconds + MAX_REDIS_RETRIES = 5 DEFAULT_STRATEGY = :until_executing STRATEGY_NONE = :none + LUA_SET_WAL_SCRIPT = <<~EOS + local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3] + local existing_offset = redis.call("LINDEX", key, -1) + if existing_offset == false then + redis.call("RPUSH", key, wal, offset) + redis.call("EXPIRE", key, ttl) + elseif offset > tonumber(existing_offset) then + redis.call("LSET", key, 0, wal) + redis.call("LSET", key, -1, offset) + end + EOS + attr_reader :existing_jid def initialize(job, queue_name) @@ -44,22 +60,59 @@ module Gitlab # This method will return the jid that was set in redis def check!(expiry = DUPLICATE_KEY_TTL) read_jid = nil + read_wal_locations = {} Sidekiq.redis do |redis| redis.multi do |multi| redis.set(idempotency_key, jid, ex: expiry, nx: true) + read_wal_locations = check_existing_wal_locations!(redis, expiry) read_jid = redis.get(idempotency_key) end end job['idempotency_key'] = idempotency_key + # We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command. + self.existing_wal_locations = read_wal_locations.transform_values(&:value) self.existing_jid = read_jid.value end + def update_latest_wal_location! + return unless job_wal_locations.present? + + Sidekiq.redis do |redis| + redis.multi do + job_wal_locations.each do |connection_name, location| + redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) + end + end + end + end + + def latest_wal_locations + return {} unless job_wal_locations.present? 
+ + strong_memoize(:latest_wal_locations) do + read_wal_locations = {} + + Sidekiq.redis do |redis| + redis.multi do + job_wal_locations.keys.each do |connection_name| + read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0) + end + end + end + + read_wal_locations.transform_values(&:value).compact + end + end + def delete! Sidekiq.redis do |redis| - redis.del(idempotency_key) + redis.multi do |multi| + redis.del(idempotency_key) + delete_wal_locations!(redis) + end end end @@ -93,6 +146,7 @@ module Gitlab private + attr_accessor :existing_wal_locations attr_reader :queue_name, :job attr_writer :existing_jid @@ -100,6 +154,10 @@ module Gitlab @worker_klass ||= worker_class_name.to_s.safe_constantize end + def pg_wal_lsn_diff(connection_name) + Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) + end + def strategy return DEFAULT_STRATEGY unless worker_klass return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?) @@ -120,6 +178,20 @@ module Gitlab job['jid'] end + def job_wal_locations + return {} unless preserve_wal_location? 
+ + job['wal_locations'] || {} + end + + def existing_wal_location_key(connection_name) + "#{idempotency_key}:#{connection_name}:existing_wal_location" + end + + def wal_location_key(connection_name) + "#{idempotency_key}:#{connection_name}:wal_location" + end + def idempotency_key @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}" end @@ -135,6 +207,29 @@ module Gitlab def idempotency_string "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" end + + def delete_wal_locations!(redis) + job_wal_locations.keys.each do |connection_name| + redis.del(wal_location_key(connection_name)) + redis.del(existing_wal_location_key(connection_name)) + end + end + + def check_existing_wal_locations!(redis, expiry) + read_wal_locations = {} + + job_wal_locations.each do |connection_name, location| + key = existing_wal_location_key(connection_name) + redis.set(key, location, ex: expiry, nx: true) + read_wal_locations[connection_name] = redis.get(key) + end + + read_wal_locations + end + + def preserve_wal_location? + Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml) + end end end end diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb index 469033a5e52..fc58d4f5323 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb @@ -14,6 +14,8 @@ module Gitlab job['duplicate-of'] = duplicate_job.existing_jid if duplicate_job.idempotent? + duplicate_job.update_latest_wal_location! 
+ Gitlab::SidekiqLogging::DeduplicationLogger.instance.log( job, "dropped #{strategy_name}", duplicate_job.options) return false @@ -23,8 +25,16 @@ module Gitlab yield end + def perform(job) + update_job_wal_location!(job) + end + private + def update_job_wal_location!(job) + job['dedup_wal_locations'] = duplicate_job.latest_wal_locations if duplicate_job.latest_wal_locations.present? + end + def deduplicatable_job? !duplicate_job.scheduled? || duplicate_job.options[:including_scheduled] end diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb index 738efa36fc8..5164b994267 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb @@ -8,9 +8,14 @@ module Gitlab # removes the lock after the job has executed preventing a new job to be queued # while a job is still executing. class UntilExecuted < Base + extend ::Gitlab::Utils::Override + include DeduplicatesWhenScheduling - def perform(_job) + override :perform + def perform(job) + super + yield duplicate_job.delete! diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb index 68d66383b2b..1f7e3a4ea30 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb @@ -8,9 +8,13 @@ module Gitlab # removes the lock before the job starts allowing a new job to be queued # while a job is still executing. class UntilExecuting < Base + extend ::Gitlab::Utils::Override + include DeduplicatesWhenScheduling - def perform(_job) + override :perform + def perform(job) + super duplicate_job.delete! 
yield diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb index b37eeb8bad1..a83522a489a 100644 --- a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb +++ b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb @@ -4,12 +4,12 @@ module Gitlab module SidekiqMiddleware module SizeLimiter # Handle a Sidekiq job payload limit based on current configuration. - # This validator pulls the configuration from the environment variables: - # - GITLAB_SIDEKIQ_SIZE_LIMITER_MODE: the current mode of the size - # limiter. This must be either `track` or `compress`. - # - GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES: the - # threshold before the input job payload is compressed. - # - GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES: the size limit in bytes. + # This validator pulls the configuration from application settings: + # - limiter_mode: the current mode of the size + # limiter. This must be either `track` or `compress`. + # - compression_threshold_bytes: the threshold before the input job + # payload is compressed. + # - limit_bytes: the size limit in bytes. # # In track mode, if a job payload limit exceeds the size limit, an # event is sent to Sentry and the job is scheduled like normal. @@ -18,12 +18,29 @@ module Gitlab # then compressed. If the compressed payload still exceeds the limit, the # job is discarded, and a ExceedLimitError exception is raised. class Validator - def self.validate!(worker_class, job) - new(worker_class, job).validate! + # Avoid limiting the size of jobs for `BackgroundMigrationWorker` classes. + # We can't read the configuration from `ApplicationSetting` for those jobs + # when migrating a path that modifies the `application_settings` table. 
+ # Reading the application settings through `ApplicationSetting#current` + # causes a `SELECT` with a list of column names, but that list of column + # names might not match what the table currently looks like causing + # an error when scheduling background migrations. + # + # The worker classes aren't constants here, because that would force + # Application Settings to be loaded earlier causing failures loading + # the environment in rake tasks + EXEMPT_WORKER_NAMES = ["BackgroundMigrationWorker", "Database::BatchedBackgroundMigrationWorker"].to_set + + class << self + def validate!(worker_class, job) + return if EXEMPT_WORKER_NAMES.include?(worker_class.to_s) + + new(worker_class, job).validate! + end end DEFAULT_SIZE_LIMIT = 0 - DEFAULT_COMPRESION_THRESHOLD_BYTES = 100_000 # 100kb + DEFAULT_COMPRESSION_THRESHOLD_BYTES = 100_000 # 100kb MODES = [ TRACK_MODE = 'track', @@ -34,9 +51,9 @@ module Gitlab def initialize( worker_class, job, - mode: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_MODE'], - compression_threshold: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES'], - size_limit: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES'] + mode: Gitlab::CurrentSettings.sidekiq_job_limiter_mode, + compression_threshold: Gitlab::CurrentSettings.sidekiq_job_limiter_compression_threshold_bytes, + size_limit: Gitlab::CurrentSettings.sidekiq_job_limiter_limit_bytes ) @worker_class = worker_class @job = job @@ -47,11 +64,11 @@ module Gitlab end def validate! - return unless @size_limit > 0 - return if allow_big_payload? - job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args'])) + + return if @size_limit == 0 return if job_args.bytesize <= @size_limit + return if allow_big_payload? exception = exceed_limit_error(job_args) if compress_mode? 
@@ -72,10 +89,10 @@ module Gitlab end def set_compression_threshold(compression_threshold) - @compression_threshold = (compression_threshold || DEFAULT_COMPRESION_THRESHOLD_BYTES).to_i + @compression_threshold = (compression_threshold || DEFAULT_COMPRESSION_THRESHOLD_BYTES).to_i if @compression_threshold <= 0 ::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}" - @compression_threshold = DEFAULT_COMPRESION_THRESHOLD_BYTES + @compression_threshold = DEFAULT_COMPRESSION_THRESHOLD_BYTES end end @@ -83,6 +100,7 @@ module Gitlab @size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i if @size_limit < 0 ::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}" + @size_limit = DEFAULT_SIZE_LIMIT end end -- cgit v1.2.3