gitlab.com/gitlab-org/gitlab-foss.git
Diffstat (limited to 'spec/lib/gitlab/background_migration/job_coordinator_spec.rb')
-rw-r--r--  spec/lib/gitlab/background_migration/job_coordinator_spec.rb  344
1 file changed, 344 insertions(+), 0 deletions(-)
diff --git a/spec/lib/gitlab/background_migration/job_coordinator_spec.rb b/spec/lib/gitlab/background_migration/job_coordinator_spec.rb
new file mode 100644
index 00000000000..a0543ca9958
--- /dev/null
+++ b/spec/lib/gitlab/background_migration/job_coordinator_spec.rb
@@ -0,0 +1,344 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
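+# The coordinator under test pairs a background migration worker class with a
+# database, and steals and performs that worker's Sidekiq jobs (see the
+# examples below).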
+RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do
+  let(:database) { :main }
+  let(:worker_class) { BackgroundMigrationWorker }
+  let(:coordinator) { described_class.new(database, worker_class) }
+
+  describe '.for_database' do
+    it 'returns an executor with the correct worker class and database' do
+      coordinator = described_class.for_database(database)
+
+      expect(coordinator.database).to eq(database)
+      expect(coordinator.worker_class).to eq(worker_class)
+    end
+
+    context 'when passed in as a string' do
+      it 'returns an executor with the correct worker class and database' do
+        coordinator = described_class.for_database(database.to_s)
+
+        expect(coordinator.database).to eq(database)
+        expect(coordinator.worker_class).to eq(worker_class)
+      end
+    end
+
+    context 'when an invalid value is given' do
+      it 'raises an error' do
+        expect do
+          described_class.for_database('notvalid')
+        end.to raise_error(ArgumentError, "database must be one of [main], got 'notvalid'")
+      end
+    end
+  end
+
+  describe '#queue' do
+    it 'returns background migration worker queue' do
+      expect(coordinator.queue).to eq(worker_class.sidekiq_options['queue'])
+    end
+  end
+
+  describe '#with_shared_connection' do
+    it 'yields to the block after properly configuring SharedModel' do
+      expect(Gitlab::Database::SharedModel).to receive(:using_connection)
+        .with(ActiveRecord::Base.connection).and_yield
+
+      expect { |b| coordinator.with_shared_connection(&b) }.to yield_with_no_args
+    end
+  end
+
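+  # The examples below exercise stealing from the regular queue, the scheduled
+  # set, and (when retry_dead_jobs: true) the retry and dead sets.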
+  describe '#steal' do
+    context 'when there are enqueued jobs present' do
+      let(:queue) do
+        [
+          double(args: ['Foo', [10, 20]], klass: worker_class.name),
+          double(args: ['Bar', [20, 30]], klass: worker_class.name),
+          double(args: ['Foo', [20, 30]], klass: 'MergeWorker')
+        ]
+      end
+
+      before do
+        allow(Sidekiq::Queue).to receive(:new)
+          .with(coordinator.queue)
+          .and_return(queue)
+      end
+
+      context 'when queue contains unprocessed jobs' do
+        it 'steals jobs from a queue' do
+          expect(queue[0]).to receive(:delete).and_return(true)
+
+          expect(coordinator).to receive(:perform).with('Foo', [10, 20])
+
+          coordinator.steal('Foo')
+        end
+
+        it 'sets up the shared connection while stealing jobs' do
+          connection = double('connection')
+          allow(coordinator).to receive(:connection).and_return(connection)
+
+          expect(coordinator).to receive(:with_shared_connection).and_call_original
+
+          expect(queue[0]).to receive(:delete).and_return(true)
+
+          expect(coordinator).to receive(:perform).with('Foo', [10, 20]) do
+            expect(Gitlab::Database::SharedModel.connection).to be(connection)
+          end
+
+          coordinator.steal('Foo') do
+            expect(Gitlab::Database::SharedModel.connection).to be(connection)
+
+            true # the job is only performed if the block returns true
+          end
+        end
+
+        it 'does not steal job that has already been taken' do
+          expect(queue[0]).to receive(:delete).and_return(false)
+
+          expect(coordinator).not_to receive(:perform)
+
+          coordinator.steal('Foo')
+        end
+
+        it 'does not steal jobs for a different migration' do
+          expect(coordinator).not_to receive(:perform)
+
+          expect(queue[0]).not_to receive(:delete)
+
+          coordinator.steal('Baz')
+        end
+
+        context 'when a custom predicate is given' do
+          it 'steals jobs that match the predicate' do
+            expect(queue[0]).to receive(:delete).and_return(true)
+
+            expect(coordinator).to receive(:perform).with('Foo', [10, 20])
+
+            coordinator.steal('Foo') { |job| job.args.second.first == 10 && job.args.second.second == 20 }
+          end
+
+          it 'does not steal jobs that do not match the predicate' do
+            expect(coordinator).not_to receive(:perform)
+
+            expect(queue[0]).not_to receive(:delete)
+
+            coordinator.steal('Foo') { |(arg1, _)| arg1 == 5 }
+          end
+        end
+      end
+
+      context 'when one of the jobs raises an error' do
+        let(:migration) { spy(:migration) }
+
+        let(:queue) do
+          [double(args: ['Foo', [10, 20]], klass: worker_class.name),
+           double(args: ['Foo', [20, 30]], klass: worker_class.name)]
+        end
+
+        before do
+          stub_const('Gitlab::BackgroundMigration::Foo', migration)
+
+          allow(queue[0]).to receive(:delete).and_return(true)
+          allow(queue[1]).to receive(:delete).and_return(true)
+        end
+
+        it 'enqueues the migration again and re-raises the error' do
+          allow(migration).to receive(:perform).with(10, 20).and_raise(Exception, 'Migration error').once
+
+          expect(worker_class).to receive(:perform_async).with('Foo', [10, 20]).once
+
+          expect { coordinator.steal('Foo') }.to raise_error(Exception)
+        end
+      end
+    end
+
+    context 'when there are scheduled jobs present', :redis do
+      it 'steals all jobs from the scheduled sets' do
+        Sidekiq::Testing.disable! do
+          worker_class.perform_in(10.minutes, 'Object')
+
+          expect(Sidekiq::ScheduledSet.new).to be_one
+          expect(coordinator).to receive(:perform).with('Object', any_args)
+
+          coordinator.steal('Object')
+
+          expect(Sidekiq::ScheduledSet.new).to be_none
+        end
+      end
+    end
+
+    context 'when there are enqueued and scheduled jobs present', :redis do
+      it 'steals from the scheduled sets queue first' do
+        Sidekiq::Testing.disable! do
+          expect(coordinator).to receive(:perform).with('Object', [1]).ordered
+          expect(coordinator).to receive(:perform).with('Object', [2]).ordered
+
+          worker_class.perform_async('Object', [2])
+          worker_class.perform_in(10.minutes, 'Object', [1])
+
+          coordinator.steal('Object')
+        end
+      end
+    end
+
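+    # The retry and dead sets are stubbed here; they are only drained when
+    # retry_dead_jobs: true is passed to #steal.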
+    context 'when retry_dead_jobs is true', :redis do
+      let(:retry_queue) do
+        [double(args: ['Object', [3]], klass: worker_class.name, delete: true)]
+      end
+
+      let(:dead_queue) do
+        [double(args: ['Object', [4]], klass: worker_class.name, delete: true)]
+      end
+
+      before do
+        allow(Sidekiq::RetrySet).to receive(:new).and_return(retry_queue)
+        allow(Sidekiq::DeadSet).to receive(:new).and_return(dead_queue)
+      end
+
+      it 'steals from the dead and retry queue' do
+        Sidekiq::Testing.disable! do
+          expect(coordinator).to receive(:perform).with('Object', [1]).ordered
+          expect(coordinator).to receive(:perform).with('Object', [2]).ordered
+          expect(coordinator).to receive(:perform).with('Object', [3]).ordered
+          expect(coordinator).to receive(:perform).with('Object', [4]).ordered
+
+          worker_class.perform_async('Object', [2])
+          worker_class.perform_in(10.minutes, 'Object', [1])
+
+          coordinator.steal('Object', retry_dead_jobs: true)
+        end
+      end
+    end
+  end
+
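+  # #perform resolves the migration class by name under Gitlab::BackgroundMigration
+  # and runs it inside the coordinator's shared database connection.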
+  describe '#perform' do
+    let(:migration) { spy(:migration) }
+    let(:connection) { double('connection') }
+
+    before do
+      stub_const('Gitlab::BackgroundMigration::Foo', migration)
+
+      allow(coordinator).to receive(:connection).and_return(connection)
+    end
+
+    it 'performs a background migration with the configured shared connection' do
+      expect(coordinator).to receive(:with_shared_connection).and_call_original
+
+      expect(migration).to receive(:perform).with(10, 20).once do
+        expect(Gitlab::Database::SharedModel.connection).to be(connection)
+      end
+
+      coordinator.perform('Foo', [10, 20])
+    end
+  end
+
+  describe '#remaining', :redis do
+    context 'when there are jobs remaining' do
+      before do
+        Sidekiq::Testing.disable! do
+          MergeWorker.perform_async('Foo')
+          MergeWorker.perform_in(10.minutes, 'Foo')
+
+          5.times do
+            worker_class.perform_async('Foo')
+          end
+          3.times do
+            worker_class.perform_in(10.minutes, 'Foo')
+          end
+        end
+      end
+
+      it 'returns the enqueued jobs plus the scheduled jobs' do
+        expect(coordinator.remaining).to eq(8)
+      end
+    end
+
+    context 'when there are no jobs remaining' do
+      it 'returns zero' do
+        expect(coordinator.remaining).to be_zero
+      end
+    end
+  end
+
+  describe '#exists?', :redis do
+    context 'when there are enqueued jobs present' do
+      before do
+        Sidekiq::Testing.disable! do
+          MergeWorker.perform_async('Bar')
+          worker_class.perform_async('Foo')
+        end
+      end
+
+      it 'returns true if specific job exists' do
+        expect(coordinator.exists?('Foo')).to eq(true)
+      end
+
+      it 'returns false if specific job does not exist' do
+        expect(coordinator.exists?('Bar')).to eq(false)
+      end
+    end
+
+    context 'when there are scheduled jobs present' do
+      before do
+        Sidekiq::Testing.disable! do
+          MergeWorker.perform_in(10.minutes, 'Bar')
+          worker_class.perform_in(10.minutes, 'Foo')
+        end
+      end
+
+      it 'returns true if specific job exists' do
+        expect(coordinator.exists?('Foo')).to eq(true)
+      end
+
+      it 'returns false if specific job does not exist' do
+        expect(coordinator.exists?('Bar')).to eq(false)
+      end
+    end
+  end
+
+  describe '#dead_jobs?' do
+    let(:queue) do
+      [
+        double(args: ['Foo', [10, 20]], klass: worker_class.name),
+        double(args: ['Bar'], klass: 'MergeWorker')
+      ]
+    end
+
+    context 'when there are dead jobs present' do
+      before do
+        allow(Sidekiq::DeadSet).to receive(:new).and_return(queue)
+      end
+
+      it 'returns true if specific job exists' do
+        expect(coordinator.dead_jobs?('Foo')).to eq(true)
+      end
+
+      it 'returns false if specific job does not exist' do
+        expect(coordinator.dead_jobs?('Bar')).to eq(false)
+      end
+    end
+  end
+
+  describe '#retrying_jobs?' do
+    let(:queue) do
+      [
+        double(args: ['Foo', [10, 20]], klass: worker_class.name),
+        double(args: ['Bar'], klass: 'MergeWorker')
+      ]
+    end
+
+    context 'when there are retrying jobs present' do
+      before do
+        allow(Sidekiq::RetrySet).to receive(:new).and_return(queue)
+      end
+
+      it 'returns true if specific job exists' do
+        expect(coordinator.retrying_jobs?('Foo')).to eq(true)
+      end
+
+      it 'returns false if specific job does not exist' do
+        expect(coordinator.retrying_jobs?('Bar')).to eq(false)
+      end
+    end
+  end
+end