diff options
Diffstat (limited to 'spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb')
-rw-r--r-- | spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb | 322 |
1 files changed, 283 insertions, 39 deletions
diff --git a/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb b/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb index d4391d3023a..9ed2a0642fc 100644 --- a/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb +++ b/spec/lib/gitlab/sidekiq_migrate_jobs_spec.rb @@ -16,34 +16,42 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do clear_queues end - describe '#execute', :aggregate_failures do + describe '#migrate_set', :aggregate_failures do shared_examples 'processing a set' do - let(:migrator) { described_class.new(set_name) } + let(:migrator) { described_class.new(mappings) } let(:set_after) do Sidekiq.redis { |c| c.zrange(set_name, 0, -1, with_scores: true) } - .map { |item, score| [Sidekiq.load_json(item), score] } + .map { |item, score| [Gitlab::Json.load(item), score] } end context 'when the set is empty' do + let(:mappings) { { 'AuthorizedProjectsWorker' => 'new_queue' } } + it 'returns the number of scanned and migrated jobs' do - expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 0, migrated: 0) + expect(migrator.migrate_set(set_name)).to eq( + scanned: 0, + migrated: 0) end end context 'when the set is not empty' do + let(:mappings) { {} } + it 'returns the number of scanned and migrated jobs' do create_jobs - expect(migrator.execute({})).to eq(scanned: 4, migrated: 0) + expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0) end end context 'when there are no matching jobs' do + let(:mappings) { { 'PostReceive' => 'new_queue' } } + it 'does not change any queue names' do create_jobs(include_post_receive: false) - expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 3, migrated: 0) + expect(migrator.migrate_set(set_name)).to eq(scanned: 3, migrated: 0) expect(set_after.length).to eq(3) expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects', @@ -53,10 +61,13 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do context 'when there are matching jobs' do it 'migrates only the workers matching the given worker from the set' do + migrator = described_class.new({ 'AuthorizedProjectsWorker' => 'new_queue' }) freeze_time do create_jobs - expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 4, migrated: 3) + expect(migrator.migrate_set(set_name)).to eq( + scanned: 4, + migrated: 3) set_after.each.with_index do |(item, score), i| if item['class'] == 'AuthorizedProjectsWorker' @@ -71,11 +82,14 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do end it 'allows migrating multiple workers at once' do + migrator = described_class.new({ + 'AuthorizedProjectsWorker' => 'new_queue', + 'PostReceive' => 'another_queue' + }) freeze_time do create_jobs - expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'another_queue')) - .to eq(scanned: 4, migrated: 4) + expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4) set_after.each.with_index do |(item, score), i| if item['class'] == 'AuthorizedProjectsWorker' @@ -90,11 +104,14 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do end it 'allows migrating multiple workers to the same queue' do + migrator = described_class.new({ + 'AuthorizedProjectsWorker' => 'new_queue', + 'PostReceive' => 'new_queue' + }) freeze_time do create_jobs - expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue')) - .to eq(scanned: 4, migrated: 4) + expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4) set_after.each.with_index do |(item, score), i| expect(item).to include('queue' => 'new_queue', 'args' => [i]) @@ -104,16 +121,17 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do end it 'does not try to migrate jobs that are removed from the set during the migration' do + migrator = described_class.new({ 'PostReceive' => 'new_queue' }) freeze_time do create_jobs - allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args| - Sidekiq.redis { |c| c.zrem(set_name, args.first) } + allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args| + Sidekiq.redis { |c| c.zrem(set_name, args.second) } meth.call(*args) end - expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 0) + expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0) expect(set_after.length).to eq(3) expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects')) @@ -121,11 +139,12 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do end it 'does not try to migrate unmatched jobs that are added to the set during the migration' do + migrator = described_class.new({ 'PostReceive' => 'new_queue' }) create_jobs calls = 0 - allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args| + allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args| if calls == 0 travel_to(5.hours.from_now) { create_jobs(include_post_receive: false) } end @@ -135,18 +154,19 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do meth.call(*args) end - expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 1) + expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 1) expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count)) .to eq('authorized_projects' => 6, 'new_queue' => 1) end it 'iterates through the entire set of jobs' do + migrator = described_class.new({ 'NonExistentWorker' => 'new_queue' }) 50.times do |i| travel_to(i.hours.from_now) { create_jobs } end - expect(migrator.execute('NonExistentWorker' => 'new_queue')).to eq(scanned: 200, migrated: 0) + expect(migrator.migrate_set(set_name)).to eq(scanned: 200, migrated: 0) expect(set_after.length).to eq(200) end @@ -158,14 +178,16 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do stub_const("#{described_class}::LOG_FREQUENCY", 2) logger = Logger.new(StringIO.new) - migrator = described_class.new(set_name, logger: logger) + migrator = described_class.new({ + 'AuthorizedProjectsWorker' => 'new_queue', + 'PostReceive' => 'another_queue' + }, logger: logger) expect(logger).to receive(:info).with(a_string_matching('Processing')).once.ordered expect(logger).to receive(:info).with(a_string_matching('In progress')).once.ordered expect(logger).to receive(:info).with(a_string_matching('Done')).once.ordered - expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue')) - .to eq(scanned: 4, migrated: 4) + expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4) end end end @@ -186,25 +208,6 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do end context 'retried jobs' do - let(:set_name) { 'retry' } - # Account for Sidekiq retry jitter - # https://github.com/mperham/sidekiq/blob/3575ccb44c688dd08bfbfd937696260b12c622fb/lib/sidekiq/job_retry.rb#L217 - let(:schedule_jitter) { 10 } - - # Try to mimic as closely as possible what Sidekiq will actually - # do to retry a job. - def retry_in(klass, time, args) - message = { 'class' => klass.name, 'args' => [args], 'retry' => true }.to_json - - allow(klass).to receive(:sidekiq_retry_in_block).and_return(proc { time }) - - begin - Sidekiq::JobRetry.new.local(klass, message, klass.queue) { raise 'boom' } - rescue Sidekiq::JobRetry::Skip - # Sidekiq scheduled the retry - end - end - def create_jobs(include_post_receive: true) retry_in(AuthorizedProjectsWorker, 1.hour, 0) retry_in(AuthorizedProjectsWorker, 2.hours, 1) @@ -212,7 +215,248 @@ RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do retry_in(AuthorizedProjectsWorker, 4.hours, 3) end + include_context 'when handling retried jobs' it_behaves_like 'processing a set' end end + + describe '#migrate_queues', :aggregate_failures do + let(:migrator) { described_class.new(mappings, logger: logger) } + let(:logger) { nil } + + def list_queues + queues = Sidekiq.redis do |conn| + conn.scan_each(match: "queue:*").to_a + end + queues.uniq.map { |queue| queue.split(':', 2).last } + end + + def list_jobs(queue_name) + Sidekiq.redis { |conn| conn.lrange("queue:#{queue_name}", 0, -1) } + .map { |item| Gitlab::Json.load(item) } + end + + def pre_migrate_checks; end + + before do + queue_name_from_worker_name = Gitlab::SidekiqConfig::WorkerRouter.method(:queue_name_from_worker_name) + EmailReceiverWorker.sidekiq_options(queue: queue_name_from_worker_name.call(EmailReceiverWorker)) + EmailReceiverWorker.perform_async('foo') + EmailReceiverWorker.perform_async('bar') + + # test worker that has ':' inside the queue name + AuthorizedProjectUpdate::ProjectRecalculateWorker.sidekiq_options( + queue: queue_name_from_worker_name.call(AuthorizedProjectUpdate::ProjectRecalculateWorker) + ) + AuthorizedProjectUpdate::ProjectRecalculateWorker.perform_async + end + + after do + # resets the queue name to its original + EmailReceiverWorker.set_queue + AuthorizedProjectUpdate::ProjectRecalculateWorker.set_queue + end + + shared_examples 'migrating queues' do + it 'migrates the jobs to the correct destination queue' do + queues = list_queues + expect(queues).to include(*queues_included_pre_migrate) + expect(queues).not_to include(*queues_excluded_pre_migrate) + pre_migrate_checks + + migrator.migrate_queues + + queues = list_queues + expect(queues).not_to include(*queues_excluded_post_migrate) + expect(queues).to include(*queues_included_post_migrate) + post_migrate_checks + end + end + + context 'with all workers mapped to default queue' do + let(:mappings) do + { 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default' } + end + + let(:queues_included_pre_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_excluded_pre_migrate) { ['default'] } + let(:queues_excluded_post_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_included_post_migrate) { ['default'] } + + def post_migrate_checks + jobs = list_jobs('default') + expect(jobs.length).to eq(3) + sorted = jobs.sort_by { |job| [job["class"], job["args"]] } + expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker', + 'queue' => 'default') + expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default') + expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default') + end + + it_behaves_like 'migrating queues' + end + + context 'with custom mapping to different queues' do + let(:mappings) do + { 'EmailReceiverWorker' => 'new_email', + 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'new_authorized' } + end + + let(:queues_included_pre_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_excluded_pre_migrate) { %w[new_email new_authorized] } + let(:queues_excluded_post_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_included_post_migrate) { %w[new_email new_authorized] } + + def post_migrate_checks + email_jobs = list_jobs('new_email') + expect(email_jobs.length).to eq(2) + expect(email_jobs[0]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'new_email') + expect(email_jobs[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'new_email') + + export_jobs = list_jobs('new_authorized') + expect(export_jobs.length).to eq(1) + expect(export_jobs[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker', + 'queue' => 'new_authorized') + end + + it_behaves_like 'migrating queues' + end + + context 'with illegal JSON payload' do + let(:job) { '{foo: 1}' } + let(:mappings) do + { 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default' } + end + + let(:queues_included_pre_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_excluded_pre_migrate) { ['default'] } + let(:queues_excluded_post_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_included_post_migrate) { ['default'] } + let(:logger) { Logger.new(StringIO.new) } + + before do + Sidekiq.redis do |conn| + conn.lpush("queue:email_receiver", job) + end + end + + def pre_migrate_checks + expect(logger).to receive(:error) + .with(a_string_matching('Unmarshal JSON payload from SidekiqMigrateJobs failed')) + .once + end + + def post_migrate_checks + jobs = list_jobs('default') + expect(jobs.length).to eq(3) + sorted = jobs.sort_by { |job| [job["class"], job["args"]] } + expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker', + 'queue' => 'default') + expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default') + expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default') + end + + it_behaves_like 'migrating queues' + end + + context 'when multiple workers are in the same queue' do + before do + ExportCsvWorker.sidekiq_options(queue: 'email_receiver') # follows EmailReceiverWorker's queue + ExportCsvWorker.perform_async('fizz') + end + + after do + ExportCsvWorker.set_queue + end + + context 'when the queue exists in mappings' do + let(:mappings) do + { 'EmailReceiverWorker' => 'email_receiver', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default', + 'ExportCsvWorker' => 'default' } + end + + let(:queues_included_pre_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_excluded_pre_migrate) { ['default'] } + let(:queues_excluded_post_migrate) do + ['authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_included_post_migrate) { %w[default email_receiver] } + + it_behaves_like 'migrating queues' + def post_migrate_checks + # jobs from email_receiver are not migrated at all + jobs = list_jobs('email_receiver') + expect(jobs.length).to eq(3) + sorted = jobs.sort_by { |job| [job["class"], job["args"]] } + expect(sorted[0]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'email_receiver') + expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'email_receiver') + expect(sorted[2]).to include('class' => 'ExportCsvWorker', 'args' => ['fizz'], 'queue' => 'email_receiver') + end + end + + context 'when the queue doesnt exist in mappings' do + let(:mappings) do + { 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default', + 'ExportCsvWorker' => 'default' } + end + + let(:queues_included_pre_migrate) do + ['email_receiver', + 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_excluded_pre_migrate) { ['default'] } + let(:queues_excluded_post_migrate) do + ['email_receiver', 'authorized_project_update:authorized_project_update_project_recalculate'] + end + + let(:queues_included_post_migrate) { ['default'] } + + it_behaves_like 'migrating queues' + def post_migrate_checks + # jobs from email_receiver are all migrated + jobs = list_jobs('email_receiver') + expect(jobs.length).to eq(0) + + jobs = list_jobs('default') + expect(jobs.length).to eq(4) + sorted = jobs.sort_by { |job| [job["class"], job["args"]] } + expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker', + 'queue' => 'default') + expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default') + expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default') + expect(sorted[3]).to include('class' => 'ExportCsvWorker', 'args' => ['fizz'], 'queue' => 'default') + end + end + end + end end |