Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_batch_worker_spec.rb')
-rw-r--r--spec/workers/bulk_imports/pipeline_batch_worker_spec.rb213
1 files changed, 163 insertions, 50 deletions
diff --git a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb
index 78ce52c41b4..c459c17b1bc 100644
--- a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb
@@ -9,9 +9,17 @@ RSpec.describe BulkImports::PipelineBatchWorker, feature_category: :importers do
let(:pipeline_class) do
Class.new do
- def initialize(_); end
+ def initialize(context)
+ @context = context
+ end
+
+ def self.relation
+ 'labels'
+ end
- def run; end
+ def run
+ @context.tracker.finish!
+ end
def self.file_extraction_pipeline?
false
@@ -35,7 +43,6 @@ RSpec.describe BulkImports::PipelineBatchWorker, feature_category: :importers do
before do
stub_const('FakePipeline', pipeline_class)
- allow(subject).to receive(:jid).and_return('jid')
allow(entity).to receive(:pipeline_exists?).with('FakePipeline').and_return(true)
allow_next_instance_of(BulkImports::Groups::Stage) do |instance|
allow(instance)
@@ -44,51 +51,94 @@ RSpec.describe BulkImports::PipelineBatchWorker, feature_category: :importers do
end
end
+ include_examples 'an idempotent worker' do
+ let(:job_args) { batch.id }
+ let(:tracker) { create(:bulk_import_tracker, :started, entity: entity, pipeline_name: 'FakePipeline') }
+
+ it 'processes the batch once' do
+ allow_next_instance_of(pipeline_class) do |instance|
+ expect(instance).to receive(:run).once.and_call_original
+ end
+
+ perform_multiple(job_args)
+
+ expect(batch.reload).to be_finished
+ end
+ end
+
describe '#perform' do
it 'runs the given pipeline batch successfully' do
expect(BulkImports::FinishBatchedPipelineWorker).to receive(:perform_async).with(tracker.id)
+ expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:info).with(a_hash_including('message' => 'Batch tracker started'))
+ expect(logger).to receive(:info).with(a_hash_including('message' => 'Batch tracker finished'))
+ end
- subject.perform(batch.id)
+ worker.perform(batch.id)
expect(batch.reload).to be_finished
end
- context 'when tracker is failed' do
- let(:tracker) { create(:bulk_import_tracker, :failed) }
+ context 'with tracker status' do
+ context 'when tracker is failed' do
+ let(:tracker) { create(:bulk_import_tracker, :failed) }
- it 'skips the batch' do
- subject.perform(batch.id)
+ it 'skips the batch' do
+ worker.perform(batch.id)
- expect(batch.reload).to be_skipped
+ expect(batch.reload).to be_skipped
+ end
end
- end
- context 'when tracker is finished' do
- let(:tracker) { create(:bulk_import_tracker, :finished) }
+ context 'when tracker is finished' do
+ let(:tracker) { create(:bulk_import_tracker, :finished) }
- it 'skips the batch' do
- subject.perform(batch.id)
+ it 'skips the batch' do
+ worker.perform(batch.id)
- expect(batch.reload).to be_skipped
+ expect(batch.reload).to be_skipped
+ end
end
end
- context 'when batch status is started' do
- let(:batch) { create(:bulk_import_batch_tracker, :started, tracker: tracker) }
+ context 'with batch status' do
+ context 'when batch status is started' do
+ let(:batch) { create(:bulk_import_batch_tracker, :started, tracker: tracker) }
+
+ it 'finishes the batch' do
+ worker.perform(batch.id)
+
+ expect(batch.reload).to be_finished
+ end
+ end
+
+ context 'when batch status is created' do
+ let(:batch) { create(:bulk_import_batch_tracker, :created, tracker: tracker) }
- it 'runs the given pipeline batch successfully' do
- subject.perform(batch.id)
+ it 'finishes the batch' do
+ worker.perform(batch.id)
- expect(batch.reload).to be_finished
+ expect(batch.reload).to be_finished
+ end
+ end
+
+ context 'when batch status is finished' do
+ let(:batch) { create(:bulk_import_batch_tracker, :finished, tracker: tracker) }
+
+ it 'stays finished' do
+ worker.perform(batch.id)
+
+ expect(batch.reload).to be_finished
+ end
end
end
context 'when exclusive lease cannot be obtained' do
it 'does not run the pipeline' do
- expect(subject).to receive(:try_obtain_lease).and_return(false)
- expect(subject).not_to receive(:run)
+ expect(worker).to receive(:try_obtain_lease).and_return(false)
+ expect(worker).not_to receive(:run)
- subject.perform(batch.id)
+ worker.perform(batch.id)
end
end
@@ -104,44 +154,107 @@ RSpec.describe BulkImports::PipelineBatchWorker, feature_category: :importers do
expect(described_class).to receive(:perform_in).with(60, batch.id)
expect(BulkImports::FinishBatchedPipelineWorker).not_to receive(:perform_async).with(tracker.id)
- subject.perform(batch.id)
+ worker.perform(batch.id)
expect(batch.reload).to be_created
end
end
- context 'when pipeline is not retryable' do
- it 'fails the batch and creates a failure record' do
+ context 'when pipeline raises an error' do
+ it 'keeps batch status as `started` and lets the error bubble up' do
allow_next_instance_of(pipeline_class) do |instance|
allow(instance).to receive(:run).and_raise(StandardError, 'Something went wrong')
end
- expect(Gitlab::ErrorTracking).to receive(:track_exception).with(
- instance_of(StandardError),
- hash_including(
- batch_id: batch.id,
- tracker_id: tracker.id,
- pipeline_class: 'FakePipeline',
- pipeline_step: 'pipeline_batch_worker_run'
- )
- )
-
- expect(BulkImports::Failure).to receive(:create).with(
- bulk_import_entity_id: entity.id,
- pipeline_class: 'FakePipeline',
- pipeline_step: 'pipeline_batch_worker_run',
- exception_class: 'StandardError',
- exception_message: 'Something went wrong',
- correlation_id_value: anything
- )
-
- expect(BulkImports::FinishBatchedPipelineWorker).to receive(:perform_async).with(tracker.id)
-
- subject.perform(batch.id)
-
- expect(batch.reload).to be_failed
+ expect { worker.perform(batch.id) }.to raise_exception(StandardError)
+
+ expect(batch.reload).to be_started
end
end
end
end
+
+ describe '.sidekiq_retries_exhausted' do
+ it 'sets batch status to failed' do
+ job = { 'args' => [batch.id] }
+
+ expect(Gitlab::ErrorTracking).to receive(:track_exception).with(
+ instance_of(StandardError),
+ hash_including(
+ 'message' => 'Batch tracker failed',
+ 'batch_id' => batch.id,
+ 'tracker_id' => tracker.id,
+ 'pipeline_class' => 'FakePipeline',
+ 'pipeline_step' => 'pipeline_batch_worker_run',
+ 'importer' => 'gitlab_migration'
+ )
+ )
+
+ expect(BulkImports::Failure).to receive(:create).with(
+ bulk_import_entity_id: entity.id,
+ pipeline_class: 'FakePipeline',
+ pipeline_step: 'pipeline_batch_worker_run',
+ exception_class: 'StandardError',
+ exception_message: 'Something went wrong',
+ correlation_id_value: anything
+ )
+
+ expect(BulkImports::FinishBatchedPipelineWorker).to receive(:perform_async).with(tracker.id)
+
+ described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new("Something went wrong"))
+
+ expect(batch.reload).to be_failed
+ end
+ end
+
+ context 'with stop signal from database health check' do
+ around do |example|
+ with_sidekiq_server_middleware do |chain|
+ chain.add Gitlab::SidekiqMiddleware::SkipJobs
+ Sidekiq::Testing.inline! { example.run }
+ end
+ end
+
+ before do
+ stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false)
+
+ stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true)
+ allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal])
+ end
+
+ it 'defers the job by set time' do
+ expect_next_instance_of(described_class) do |worker|
+ expect(worker).not_to receive(:perform).with(batch.id)
+ end
+
+ expect(described_class).to receive(:perform_in).with(described_class::DEFER_ON_HEALTH_DELAY, batch.id)
+
+ described_class.perform_async(batch.id)
+ end
+
+ it 'lazy evaluates schema and tables', :aggregate_failures do
+ block = described_class.database_health_check_attrs[:block]
+
+ job_args = [batch.id]
+
+ schema, table = block.call([job_args])
+
+ expect(schema).to eq(:gitlab_main_cell)
+ expect(table).to eq(['labels'])
+ end
+
+ context 'when `bulk_import_deferred_workers` feature flag is disabled' do
+ it 'does not defer job execution' do
+ stub_feature_flags(bulk_import_deferred_workers: false)
+
+ expect_next_instance_of(described_class) do |worker|
+ expect(worker).to receive(:perform).with(batch.id)
+ end
+
+ expect(described_class).not_to receive(:perform_in)
+
+ described_class.perform_async(batch.id)
+ end
+ end
+ end
end