Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb')
-rw-r--r--spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb322
1 files changed, 283 insertions, 39 deletions
diff --git a/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb b/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb
index d4391d3023a..9ed2a0642fc 100644
--- a/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb
+++ b/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb
@@ -16,34 +16,42 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
clear_queues
end
- describe '#execute', :aggregate_failures do
+ describe '#migrate_set', :aggregate_failures do
shared_examples 'processing a set' do
- let(:migrator) { described_class.new(set_name) }
+ let(:migrator) { described_class.new(mappings) }
let(:set_after) do
Sidekiq.redis { |c| c.zrange(set_name, 0, -1, with_scores: true) }
- .map { |item, score| [Sidekiq.load_json(item), score] }
+ .map { |item, score| [Gitlab::Json.load(item), score] }
end
context 'when the set is empty' do
+ let(:mappings) { { 'AuthorizedProjectsWorker' => 'new_queue' } }
+
it 'returns the number of scanned and migrated jobs' do
- expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 0, migrated: 0)
+ expect(migrator.migrate_set(set_name)).to eq(
+ scanned: 0,
+ migrated: 0)
end
end
context 'when the set is not empty' do
+ let(:mappings) { {} }
+
it 'returns the number of scanned and migrated jobs' do
create_jobs
- expect(migrator.execute({})).to eq(scanned: 4, migrated: 0)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0)
end
end
context 'when there are no matching jobs' do
+ let(:mappings) { { 'PostReceive' => 'new_queue' } }
+
it 'does not change any queue names' do
create_jobs(include_post_receive: false)
- expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 3, migrated: 0)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 3, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects',
@@ -53,10 +61,13 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
context 'when there are matching jobs' do
it 'migrates only the workers matching the given worker from the set' do
+ migrator = described_class.new({ 'AuthorizedProjectsWorker' => 'new_queue' })
freeze_time do
create_jobs
- expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 4, migrated: 3)
+ expect(migrator.migrate_set(set_name)).to eq(
+ scanned: 4,
+ migrated: 3)
set_after.each.with_index do |(item, score), i|
if item['class'] == 'AuthorizedProjectsWorker'
@@ -71,11 +82,14 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
end
it 'allows migrating multiple workers at once' do
+ migrator = described_class.new({
+ 'AuthorizedProjectsWorker' => 'new_queue',
+ 'PostReceive' => 'another_queue'
+ })
freeze_time do
create_jobs
- expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'another_queue'))
- .to eq(scanned: 4, migrated: 4)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4)
set_after.each.with_index do |(item, score), i|
if item['class'] == 'AuthorizedProjectsWorker'
@@ -90,11 +104,14 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
end
it 'allows migrating multiple workers to the same queue' do
+ migrator = described_class.new({
+ 'AuthorizedProjectsWorker' => 'new_queue',
+ 'PostReceive' => 'new_queue'
+ })
freeze_time do
create_jobs
- expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
- .to eq(scanned: 4, migrated: 4)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4)
set_after.each.with_index do |(item, score), i|
expect(item).to include('queue' => 'new_queue', 'args' => [i])
@@ -104,16 +121,17 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
end
it 'does not try to migrate jobs that are removed from the set during the migration' do
+ migrator = described_class.new({ 'PostReceive' => 'new_queue' })
freeze_time do
create_jobs
- allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
- Sidekiq.redis { |c| c.zrem(set_name, args.first) }
+ allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args|
+ Sidekiq.redis { |c| c.zrem(set_name, args.second) }
meth.call(*args)
end
- expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 0)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects'))
@@ -121,11 +139,12 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
end
it 'does not try to migrate unmatched jobs that are added to the set during the migration' do
+ migrator = described_class.new({ 'PostReceive' => 'new_queue' })
create_jobs
calls = 0
- allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
+ allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args|
if calls == 0
travel_to(5.hours.from_now) { create_jobs(include_post_receive: false) }
end
@@ -135,18 +154,19 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
meth.call(*args)
end
- expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 1)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 1)
expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
.to eq('authorized_projects' => 6, 'new_queue' => 1)
end
it 'iterates through the entire set of jobs' do
+ migrator = described_class.new({ 'NonExistentWorker' => 'new_queue' })
50.times do |i|
travel_to(i.hours.from_now) { create_jobs }
end
- expect(migrator.execute('NonExistentWorker' => 'new_queue')).to eq(scanned: 200, migrated: 0)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 200, migrated: 0)
expect(set_after.length).to eq(200)
end
@@ -158,14 +178,16 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
stub_const("#{described_class}::LOG_FREQUENCY", 2)
logger = Logger.new(StringIO.new)
- migrator = described_class.new(set_name, logger: logger)
+ migrator = described_class.new({
+ 'AuthorizedProjectsWorker' => 'new_queue',
+ 'PostReceive' => 'another_queue'
+ }, logger: logger)
expect(logger).to receive(:info).with(a_string_matching('Processing')).once.ordered
expect(logger).to receive(:info).with(a_string_matching('In progress')).once.ordered
expect(logger).to receive(:info).with(a_string_matching('Done')).once.ordered
- expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
- .to eq(scanned: 4, migrated: 4)
+ expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4)
end
end
end
@@ -186,25 +208,6 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
end
context 'retried jobs' do
- let(:set_name) { 'retry' }
- # Account for Sidekiq retry jitter
- # https://github.com/mperham/sidekiq/blob/3575ccb44c688dd08bfbfd937696260b12c622fb/lib/sidekiq/job_retry.rb#L217
- let(:schedule_jitter) { 10 }
-
- # Try to mimic as closely as possible what Sidekiq will actually
- # do to retry a job.
- def retry_in(klass, time, args)
- message = { 'class' => klass.name, 'args' => [args], 'retry' => true }.to_json
-
- allow(klass).to receive(:sidekiq_retry_in_block).and_return(proc { time })
-
- begin
- Sidekiq::JobRetry.new.local(klass, message, klass.queue) { raise 'boom' }
- rescue Sidekiq::JobRetry::Skip
- # Sidekiq scheduled the retry
- end
- end
-
def create_jobs(include_post_receive: true)
retry_in(AuthorizedProjectsWorker, 1.hour, 0)
retry_in(AuthorizedProjectsWorker, 2.hours, 1)
@@ -212,7 +215,248 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
retry_in(AuthorizedProjectsWorker, 4.hours, 3)
end
+ include_context 'when handling retried jobs'
it_behaves_like 'processing a set'
end
end
+
+ describe '#migrate_queues', :aggregate_failures do
+ let(:migrator) { described_class.new(mappings, logger: logger) }
+ let(:logger) { nil }
+
+ def list_queues
+ queues = Sidekiq.redis do |conn|
+ conn.scan_each(match: "queue:*").to_a
+ end
+ queues.uniq.map { |queue| queue.split(':', 2).last }
+ end
+
+ def list_jobs(queue_name)
+ Sidekiq.redis { |conn| conn.lrange("queue:#{queue_name}", 0, -1) }
+ .map { |item| Gitlab::Json.load(item) }
+ end
+
+ def pre_migrate_checks; end
+
+ before do
+ queue_name_from_worker_name = Gitlab::SidekiqConfig::WorkerRouter.method(:queue_name_from_worker_name)
+ EmailReceiverWorker.sidekiq_options(queue: queue_name_from_worker_name.call(EmailReceiverWorker))
+ EmailReceiverWorker.perform_async('foo')
+ EmailReceiverWorker.perform_async('bar')
+
+ # test worker that has ':' inside the queue name
+ AuthorizedProjectUpdate::ProjectRecalculateWorker.sidekiq_options(
+ queue: queue_name_from_worker_name.call(AuthorizedProjectUpdate::ProjectRecalculateWorker)
+ )
+ AuthorizedProjectUpdate::ProjectRecalculateWorker.perform_async
+ end
+
+ after do
+ # resets the queue name to its original
+ EmailReceiverWorker.set_queue
+ AuthorizedProjectUpdate::ProjectRecalculateWorker.set_queue
+ end
+
+ shared_examples 'migrating queues' do
+ it 'migrates the jobs to the correct destination queue' do
+ queues = list_queues
+ expect(queues).to include(*queues_included_pre_migrate)
+ expect(queues).not_to include(*queues_excluded_pre_migrate)
+ pre_migrate_checks
+
+ migrator.migrate_queues
+
+ queues = list_queues
+ expect(queues).not_to include(*queues_excluded_post_migrate)
+ expect(queues).to include(*queues_included_post_migrate)
+ post_migrate_checks
+ end
+ end
+
+ context 'with all workers mapped to default queue' do
+ let(:mappings) do
+ { 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default' }
+ end
+
+ let(:queues_included_pre_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_excluded_pre_migrate) { ['default'] }
+ let(:queues_excluded_post_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_included_post_migrate) { ['default'] }
+
+ def post_migrate_checks
+ jobs = list_jobs('default')
+ expect(jobs.length).to eq(3)
+ sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
+ expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
+ 'queue' => 'default')
+ expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default')
+ expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default')
+ end
+
+ it_behaves_like 'migrating queues'
+ end
+
+ context 'with custom mapping to different queues' do
+ let(:mappings) do
+ { 'EmailReceiverWorker' => 'new_email',
+ 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'new_authorized' }
+ end
+
+ let(:queues_included_pre_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_excluded_pre_migrate) { %w[new_email new_authorized] }
+ let(:queues_excluded_post_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_included_post_migrate) { %w[new_email new_authorized] }
+
+ def post_migrate_checks
+ email_jobs = list_jobs('new_email')
+ expect(email_jobs.length).to eq(2)
+ expect(email_jobs[0]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'new_email')
+ expect(email_jobs[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'new_email')
+
+ export_jobs = list_jobs('new_authorized')
+ expect(export_jobs.length).to eq(1)
+ expect(export_jobs[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
+ 'queue' => 'new_authorized')
+ end
+
+ it_behaves_like 'migrating queues'
+ end
+
+ context 'with illegal JSON payload' do
+ let(:job) { '{foo: 1}' }
+ let(:mappings) do
+ { 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default' }
+ end
+
+ let(:queues_included_pre_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_excluded_pre_migrate) { ['default'] }
+ let(:queues_excluded_post_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_included_post_migrate) { ['default'] }
+ let(:logger) { Logger.new(StringIO.new) }
+
+ before do
+ Sidekiq.redis do |conn|
+ conn.lpush("queue:email_receiver", job)
+ end
+ end
+
+ def pre_migrate_checks
+ expect(logger).to receive(:error)
+ .with(a_string_matching('Unmarshal JSON payload from SidekiqMigrateJobs failed'))
+ .once
+ end
+
+ def post_migrate_checks
+ jobs = list_jobs('default')
+ expect(jobs.length).to eq(3)
+ sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
+ expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
+ 'queue' => 'default')
+ expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default')
+ expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default')
+ end
+
+ it_behaves_like 'migrating queues'
+ end
+
+ context 'when multiple workers are in the same queue' do
+ before do
+ ExportCsvWorker.sidekiq_options(queue: 'email_receiver') # follows EmailReceiverWorker's queue
+ ExportCsvWorker.perform_async('fizz')
+ end
+
+ after do
+ ExportCsvWorker.set_queue
+ end
+
+ context 'when the queue exists in mappings' do
+ let(:mappings) do
+ { 'EmailReceiverWorker' => 'email_receiver', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default',
+ 'ExportCsvWorker' => 'default' }
+ end
+
+ let(:queues_included_pre_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_excluded_pre_migrate) { ['default'] }
+ let(:queues_excluded_post_migrate) do
+ ['authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_included_post_migrate) { %w[default email_receiver] }
+
+ it_behaves_like 'migrating queues'
+ def post_migrate_checks
+ # jobs from email_receiver are not migrated at all
+ jobs = list_jobs('email_receiver')
+ expect(jobs.length).to eq(3)
+ sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
+ expect(sorted[0]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'email_receiver')
+ expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'email_receiver')
+ expect(sorted[2]).to include('class' => 'ExportCsvWorker', 'args' => ['fizz'], 'queue' => 'email_receiver')
+ end
+ end
+
+ context 'when the queue doesnt exist in mappings' do
+ let(:mappings) do
+ { 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default',
+ 'ExportCsvWorker' => 'default' }
+ end
+
+ let(:queues_included_pre_migrate) do
+ ['email_receiver',
+ 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_excluded_pre_migrate) { ['default'] }
+ let(:queues_excluded_post_migrate) do
+ ['email_receiver', 'authorized_project_update:authorized_project_update_project_recalculate']
+ end
+
+ let(:queues_included_post_migrate) { ['default'] }
+
+ it_behaves_like 'migrating queues'
+ def post_migrate_checks
+ # jobs from email_receiver are all migrated
+ jobs = list_jobs('email_receiver')
+ expect(jobs.length).to eq(0)
+
+ jobs = list_jobs('default')
+ expect(jobs.length).to eq(4)
+ sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
+ expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
+ 'queue' => 'default')
+ expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default')
+ expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default')
+ expect(sorted[3]).to include('class' => 'ExportCsvWorker', 'args' => ['fizz'], 'queue' => 'default')
+ end
+ end
+ end
+ end
end