gitlab.com/gitlab-org/gitlab-foss.git
Diffstat (limited to 'lib/gitlab/sidekiq_migrate_jobs.rb')
 lib/gitlab/sidekiq_migrate_jobs.rb | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 57 insertions(+), 9 deletions(-)
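The change below reworks Gitlab::SidekiqMigrateJobs: the worker-to-queue mappings hash moves from execute into the constructor, execute becomes migrate_set (for the scheduled and retry sorted sets), a new migrate_queues method drains queue lists that fall outside the routing rules, and JSON handling switches from Sidekiq.load_json/dump_json to Gitlab::Json. As a point of reference, a minimal sketch of the mappings argument described by the comment on #initialize; the worker and queue names are illustrative, not taken from the diff:

# Hypothetical example of the mappings argument:
# worker class name => target queue name under the routing rules.
mappings = {
  'PostReceive'        => 'default',
  'EmailsOnPushWorker' => 'mailers'
}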
diff --git a/lib/gitlab/sidekiq_migrate_jobs.rb b/lib/gitlab/sidekiq_migrate_jobs.rb
index 62d62bf82c4..2467dd7ca43 100644
--- a/lib/gitlab/sidekiq_migrate_jobs.rb
+++ b/lib/gitlab/sidekiq_migrate_jobs.rb
@@ -3,16 +3,18 @@
 module Gitlab
   class SidekiqMigrateJobs
     LOG_FREQUENCY = 1_000
+    LOG_FREQUENCY_QUEUES = 10
 
-    attr_reader :sidekiq_set, :logger
+    attr_reader :logger, :mappings
 
-    def initialize(sidekiq_set, logger: nil)
-      @sidekiq_set = sidekiq_set
+    # mappings is a hash of WorkerClassName => target_queue_name
+    def initialize(mappings, logger: nil)
+      @mappings = mappings
       @logger = logger
     end
 
-    # mappings is a hash of WorkerClassName => target_queue_name
-    def execute(mappings)
+    # Migrate jobs in SortedSets, i.e. scheduled and retry sets.
+    def migrate_set(sidekiq_set)
       source_queues_regex = Regexp.union(mappings.keys)
       cursor = 0
       scanned = 0
@@ -33,7 +35,7 @@ module Gitlab
 
           next unless job.match?(source_queues_regex)
 
-          job_hash = Sidekiq.load_json(job)
+          job_hash = Gitlab::Json.load(job)
           destination_queue = mappings[job_hash['class']]
 
           next unless mappings.has_key?(job_hash['class'])
@@ -41,7 +43,7 @@ module Gitlab
 
           job_hash['queue'] = destination_queue
 
-          migrated += migrate_job(job, score, job_hash)
+          migrated += migrate_job_in_set(sidekiq_set, job, score, job_hash)
         end
       end while cursor.to_i != 0
 
@@ -53,14 +55,54 @@
       }
     end
 
+    # Migrates jobs from queues that are outside the mappings
+    def migrate_queues
+      routing_rules_queues = mappings.values.uniq
+      logger&.info("List of queues based on routing rules: #{routing_rules_queues}")
+      Sidekiq.redis do |conn|
+        # Redis 6 supports conn.scan_each(match: "queue:*", type: 'list')
+        conn.scan_each(match: "queue:*") do |key|
+          # Redis 5 compatibility
+          next unless conn.type(key) == 'list'
+
+          queue_from = key.split(':', 2).last
+          next if routing_rules_queues.include?(queue_from)
+
+          logger&.info("Migrating #{queue_from} queue")
+
+          migrated = 0
+          while queue_length(queue_from) > 0
+            begin
+              if migrated >= 0 && migrated % LOG_FREQUENCY_QUEUES == 0
+                logger&.info("Migrating from #{queue_from}. Total: #{queue_length(queue_from)}. Migrated: #{migrated}.")
+              end
+
+              job = conn.rpop "queue:#{queue_from}"
+              job_hash = Gitlab::Json.load(job)
+              next unless mappings.has_key?(job_hash['class'])
+
+              destination_queue = mappings[job_hash['class']]
+              job_hash['queue'] = destination_queue
+              conn.lpush("queue:#{destination_queue}", Gitlab::Json.dump(job_hash))
+              migrated += 1
+            rescue JSON::ParserError
+              logger&.error("Unmarshal JSON payload from SidekiqMigrateJobs failed. Job: #{job}")
+              next
+            end
+          end
+          logger&.info("Finished migrating #{queue_from} queue")
+        end
+      end
+    end
+
     private
 
-    def migrate_job(job, score, job_hash)
+    def migrate_job_in_set(sidekiq_set, job, score, job_hash)
       Sidekiq.redis do |connection|
         removed = connection.zrem(sidekiq_set, job)
 
         if removed
-          connection.zadd(sidekiq_set, score, Sidekiq.dump_json(job_hash))
+          connection.zadd(sidekiq_set, score, Gitlab::Json.dump(job_hash))
 
           1
         else
@@ -68,5 +110,11 @@ module Gitlab
         end
       end
     end
+
+    def queue_length(queue_name)
+      Sidekiq.redis do |conn|
+        conn.llen("queue:#{queue_name}")
+      end
+    end
   end
 end
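For context, a minimal usage sketch of the reworked class, assuming it runs inside a GitLab Rails/Sidekiq environment (for example from a Rake task); the mappings, logger, and surrounding invocation are illustrative and not part of this diff, while 'schedule' and 'retry' are Sidekiq's standard sorted-set keys:

# Illustrative invocation only; not part of the diff above.
require 'logger'

mappings = { 'PostReceive' => 'default' } # hypothetical worker => target queue
migrator = Gitlab::SidekiqMigrateJobs.new(mappings, logger: Logger.new($stdout))

# migrate_set rewrites the 'queue' field of matching jobs in a sorted set
# (ZREM + ZADD via migrate_job_in_set, preserving each job's score).
%w[schedule retry].each { |set| migrator.migrate_set(set) }

# migrate_queues drains every queue:* list that is not a routing-rules
# target, popping each job (RPOP) and pushing it (LPUSH) onto the queue
# its worker class now maps to.
migrator.migrate_queues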