1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# frozen_string_literal: true
module Gitlab
  # Walks a Redis sorted set of serialized Sidekiq jobs and rewrites the
  # 'queue' attribute of every job whose 'class' appears in a caller-supplied
  # mapping. A job is rewritten by removing the old member (ZREM) and adding
  # the updated JSON back under the same score (ZADD).
  #
  # NOTE(review): ZREM and ZADD are issued as two separate commands, so the
  # rewrite is presumably not atomic - confirm that is acceptable for callers.
  class SidekiqMigrateJobs
    # A progress line is logged each time this many records have been scanned.
    LOG_FREQUENCY = 1_000

    attr_reader :sidekiq_set, :logger

    # @param sidekiq_set [String] key of the sorted set to process; it is
    #   passed as-is to ZCARD / ZSCAN / ZREM / ZADD
    # @param logger [#info, nil] optional logger; nothing is logged when nil
    def initialize(sidekiq_set, logger: nil)
      @sidekiq_set = sidekiq_set
      @logger = logger
    end

    # Scans the whole set once and migrates every matching job.
    #
    # mappings is a hash of WorkerClassName => target_queue_name
    #
    # @param mappings [Hash{String => String}]
    # @return [Hash{Symbol => Integer}] +:scanned+ is the number of members
    #   returned by the scan, +:migrated+ the number actually rewritten
    def execute(mappings)
      # Cheap pre-filter on the raw JSON so that only candidates get parsed.
      source_queues_regex = Regexp.union(mappings.keys)
      cursor = 0
      scanned = 0
      migrated = 0

      estimated_size = Sidekiq.redis { |c| c.zcard(sidekiq_set) }
      logger&.info("Processing #{sidekiq_set} set. Estimated size: #{estimated_size}.")

      loop do
        cursor, jobs = Sidekiq.redis { |c| c.zscan(sidekiq_set, cursor) }

        jobs.each do |(job, score)|
          if scanned > 0 && scanned % LOG_FREQUENCY == 0
            logger&.info("In progress. Scanned records: #{scanned}. Migrated records: #{migrated}.")
          end

          scanned += 1

          next unless job.match?(source_queues_regex)

          job_hash = Sidekiq.load_json(job)
          worker_class = job_hash['class']

          # The regex can also hit a class name that merely appears elsewhere
          # in the payload (e.g. in the arguments), so re-check the real class.
          next unless mappings.key?(worker_class)

          destination_queue = mappings[worker_class]
          next if job_hash['queue'] == destination_queue

          job_hash['queue'] = destination_queue
          migrated += migrate_job(job, score, job_hash)
        end

        # A cursor of zero ends the iteration; +to_i+ tolerates the cursor
        # coming back as a String.
        break if cursor.to_i == 0
      end

      logger&.info("Done. Scanned records: #{scanned}. Migrated records: #{migrated}.")

      {
        scanned: scanned,
        migrated: migrated
      }
    end

    private

    # Replaces +job+ in the set with the JSON dump of +job_hash+, keeping
    # +score+. The new member is only added when the old one was really
    # removed by this call, so a job that disappeared in the meantime is not
    # brought back.
    #
    # @return [Integer] 1 when the job was rewritten, 0 otherwise
    def migrate_job(job, score, job_hash)
      Sidekiq.redis do |connection|
        next 0 unless member_removed?(connection.zrem(sidekiq_set, job))

        connection.zadd(sidekiq_set, score, Sidekiq.dump_json(job_hash))
        1
      end
    end

    # Interprets a ZREM reply. Depending on the client it is either a Boolean
    # or the Integer count of removed members; an Integer 0 is truthy in Ruby
    # and therefore has to be checked explicitly.
    def member_removed?(zrem_result)
      return zrem_result > 0 if zrem_result.is_a?(Integer)

      zrem_result ? true : false
    end
  end
end
|