Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2021-11-18 16:16:36 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2021-11-18 16:16:36 +0300
commit311b0269b4eb9839fa63f80c8d7a58f32b8138a0 (patch)
tree07e7870bca8aed6d61fdcc810731c50d2c40af47 /lib/gitlab/sidekiq_middleware
parent27909cef6c4170ed9205afa7426b8d3de47cbb0c (diff)
Add latest changes from gitlab-org/gitlab@14-5-stable-eev14.5.0-rc42
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb54
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb2
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/base.rb4
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb15
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb3
-rw-r--r--lib/gitlab/sidekiq_middleware/query_analyzer.rb11
-rw-r--r--lib/gitlab/sidekiq_middleware/size_limiter/validator.rb39
7 files changed, 83 insertions, 45 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index e63164efc94..f31262bfcc9 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -19,11 +19,12 @@ module Gitlab
class DuplicateJob
include Gitlab::Utils::StrongMemoize
- DUPLICATE_KEY_TTL = 6.hours
+ DEFAULT_DUPLICATE_KEY_TTL = 6.hours
WAL_LOCATION_TTL = 60.seconds
MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none
+ DEDUPLICATED_FLAG_VALUE = 1
LUA_SET_WAL_SCRIPT = <<~EOS
local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
@@ -58,7 +59,7 @@ module Gitlab
end
# This method will return the jid that was set in redis
- def check!(expiry = DUPLICATE_KEY_TTL)
+ def check!(expiry = duplicate_key_ttl)
read_jid = nil
read_wal_locations = {}
@@ -83,7 +84,11 @@ module Gitlab
Sidekiq.redis do |redis|
redis.multi do |multi|
job_wal_locations.each do |connection_name, location|
- multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
+ multi.eval(
+ LUA_SET_WAL_SCRIPT,
+ keys: [wal_location_key(connection_name)],
+ argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]
+ )
end
end
end
@@ -110,12 +115,18 @@ module Gitlab
def delete!
Sidekiq.redis do |redis|
redis.multi do |multi|
- multi.del(idempotency_key)
+ multi.del(idempotency_key, deduplicated_flag_key)
delete_wal_locations!(multi)
end
end
end
+ def reschedule
+ Gitlab::SidekiqLogging::DeduplicationLogger.instance.rescheduled_log(job)
+
+ worker_klass.perform_async(*arguments)
+ end
+
def scheduled?
scheduled_at.present?
end
@@ -126,6 +137,22 @@ module Gitlab
jid != existing_jid
end
+ def set_deduplicated_flag!(expiry = duplicate_key_ttl)
+ return unless reschedulable?
+
+ Sidekiq.redis do |redis|
+ redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
+ end
+ end
+
+ def should_reschedule?
+ return false unless reschedulable?
+
+ Sidekiq.redis do |redis|
+ redis.get(deduplicated_flag_key).present?
+ end
+ end
+
def scheduled_at
job['at']
end
@@ -145,6 +172,10 @@ module Gitlab
worker_klass.idempotent?
end
+ def duplicate_key_ttl
+ options[:ttl] || DEFAULT_DUPLICATE_KEY_TTL
+ end
+
private
attr_writer :existing_wal_locations
@@ -181,7 +212,12 @@ module Gitlab
end
def pg_wal_lsn_diff(connection_name)
- Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
+ model = Gitlab::Database.database_base_models[connection_name]
+
+ model.connection.load_balancer.wal_diff(
+ job_wal_locations[connection_name],
+ existing_wal_locations[connection_name]
+ )
end
def strategy
@@ -216,6 +252,10 @@ module Gitlab
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
+ def deduplicated_flag_key
+ "#{idempotency_key}:deduplicate_flag"
+ end
+
def idempotency_hash
Digest::SHA256.hexdigest(idempotency_string)
end
@@ -235,6 +275,10 @@ module Gitlab
def preserve_wal_location?
Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
end
+
+ def reschedulable?
+ !scheduled? && options[:if_deduplicated] == :reschedule_once
+ end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb
index a35edc5774e..6d5d41902ea 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb
@@ -5,7 +5,7 @@ module Gitlab
module DuplicateJobs
class Server
def call(worker, job, queue, &block)
- DuplicateJob.new(job, queue).perform(&block)
+ ::Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(job, queue).perform(&block)
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/base.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/base.rb
index df5df590281..9b3066bae6c 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/base.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/base.rb
@@ -26,8 +26,8 @@ module Gitlab
end
def check!
- # The default expiry time is the DuplicateJob::DUPLICATE_KEY_TTL already
- # Only the strategies de-duplicating when scheduling
+ # The default expiry time is the worker class'
+ # configured deduplication TTL or DuplicateJob::DEFAULT_DUPLICATE_KEY_TTL.
duplicate_job.check!
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
index b0da85b74a6..0fc95534e2a 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/deduplicates_when_scheduling.rb
@@ -6,6 +6,7 @@ module Gitlab
module Strategies
class DeduplicatesWhenScheduling < Base
extend ::Gitlab::Utils::Override
+ include ::Gitlab::Utils::StrongMemoize
override :initialize
def initialize(duplicate_job)
@@ -19,8 +20,9 @@ module Gitlab
if duplicate_job.idempotent?
duplicate_job.update_latest_wal_location!
+ duplicate_job.set_deduplicated_flag!(expiry)
- Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
+ Gitlab::SidekiqLogging::DeduplicationLogger.instance.deduplicated_log(
job, "dropped #{strategy_name}", duplicate_job.options)
return false
end
@@ -49,11 +51,16 @@ module Gitlab
end
def expiry
- return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
+ strong_memoize(:expiry) do
+ next duplicate_job.duplicate_key_ttl unless duplicate_job.scheduled?
- time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
+ time_diff = [
+ duplicate_job.scheduled_at.to_i - Time.now.to_i,
+ 0
+ ].max
- time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
+ time_diff + duplicate_job.duplicate_key_ttl
+ end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
index 25f1b8b7c51..8c7e15364f8 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executed.rb
@@ -14,7 +14,10 @@ module Gitlab
yield
+ should_reschedule = duplicate_job.should_reschedule?
+ # Deleting before rescheduling to make sure we don't deduplicate again.
duplicate_job.delete!
+ duplicate_job.reschedule if should_reschedule
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/query_analyzer.rb b/lib/gitlab/sidekiq_middleware/query_analyzer.rb
new file mode 100644
index 00000000000..4478fcd3594
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/query_analyzer.rb
@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ class QueryAnalyzer
+ def call(worker, job, queue)
+ ::Gitlab::Database::QueryAnalyzer.instance.within { yield }
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
index 71316bbd243..6186c9ad1f4 100644
--- a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
+++ b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
@@ -55,18 +55,15 @@ module Gitlab
attr_reader :mode, :size_limit, :compression_threshold
- def initialize(
- worker_class, job,
- mode: Gitlab::CurrentSettings.sidekiq_job_limiter_mode,
- compression_threshold: Gitlab::CurrentSettings.sidekiq_job_limiter_compression_threshold_bytes,
- size_limit: Gitlab::CurrentSettings.sidekiq_job_limiter_limit_bytes
- )
+ def initialize(worker_class, job)
@worker_class = worker_class
@job = job
- set_mode(mode)
- set_compression_threshold(compression_threshold)
- set_size_limit(size_limit)
+ current_settings = Gitlab::CurrentSettings.current_application_settings
+
+ @mode = current_settings.sidekiq_job_limiter_mode
+ @compression_threshold = current_settings.sidekiq_job_limiter_compression_threshold_bytes
+ @size_limit = current_settings.sidekiq_job_limiter_limit_bytes
end
def validate!
@@ -90,30 +87,6 @@ module Gitlab
private
- def set_mode(mode)
- @mode = (mode || TRACK_MODE).to_s.strip
- unless MODES.include?(@mode)
- ::Sidekiq.logger.warn "Invalid Sidekiq size limiter mode: #{@mode}. Fallback to #{TRACK_MODE} mode."
- @mode = TRACK_MODE
- end
- end
-
- def set_compression_threshold(compression_threshold)
- @compression_threshold = (compression_threshold || DEFAULT_COMPRESSION_THRESHOLD_BYTES).to_i
- if @compression_threshold <= 0
- ::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}"
- @compression_threshold = DEFAULT_COMPRESSION_THRESHOLD_BYTES
- end
- end
-
- def set_size_limit(size_limit)
- @size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i
- if @size_limit < 0
- ::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}"
- @size_limit = DEFAULT_SIZE_LIMIT
- end
- end
-
def exceed_limit_error(job_args)
ExceedLimitError.new(@worker_class, job_args.bytesize, @size_limit).tap do |exception|
# This should belong to Gitlab::ErrorTracking. We'll remove this