diff options
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r-- | spec/workers/bulk_imports/pipeline_worker_spec.rb | 251 |
1 files changed, 188 insertions, 63 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index d99b3e9de73..368c7537641 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -16,12 +16,16 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do def self.file_extraction_pipeline? false end + + def self.abort_on_failure? + false + end end end let_it_be(:bulk_import) { create(:bulk_import) } let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) } - let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } + let_it_be_with_reload(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } let(:pipeline_tracker) do create( @@ -44,7 +48,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do end end - include_examples 'an idempotent worker' do + it_behaves_like 'an idempotent worker' do let(:job_args) { [pipeline_tracker.id, pipeline_tracker.stage, entity.id] } it 'runs the pipeline and sets tracker to finished' do @@ -61,17 +65,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do it 'runs the given pipeline successfully' do expect_next_instance_of(BulkImports::Logger) do |logger| - expect(logger) - .to receive(:info) - .with( - hash_including( - 'pipeline_class' => 'FakePipeline', - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_id' => entity.id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path - ) - ) + expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original + expect(logger).to receive(:with_entity).with(pipeline_tracker.entity).and_call_original + expect(logger).to receive(:info) end allow(worker).to receive(:jid).and_return('jid') @@ -98,22 +94,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] } expect_next_instance_of(BulkImports::Logger) do |logger| - expect(logger) - .to receive(:error) - .with( - hash_including( - 'pipeline_class' => 'FakePipeline', - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'class' => 'BulkImports::PipelineWorker', - 'exception.message' => 'Error!', - 'message' => 'Pipeline failed', - 'source_version' => entity.bulk_import.source_version_info.to_s, - 'importer' => 'gitlab_migration' - ) - ) + expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original + expect(logger).to receive(:with_entity).with(pipeline_tracker.entity).and_call_original + expect(logger).to receive(:error) end expect(Gitlab::ErrorTracking) @@ -121,13 +104,13 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do .with( instance_of(StandardError), hash_including( - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import.id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'pipeline_class' => pipeline_tracker.pipeline_name, - 'importer' => 'gitlab_migration', - 'source_version' => entity.bulk_import.source_version_info.to_s + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import.id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, + pipeline_class: pipeline_tracker.pipeline_name, + importer: 'gitlab_migration', + source_version: entity.bulk_import.source_version_info.to_s ) ) @@ -156,6 +139,21 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do expect(pipeline_tracker.status_name).to eq(:failed) expect(pipeline_tracker.jid).to eq('jid') + expect(entity.reload.status_name).to eq(:created) + end + + context 'when pipeline has abort_on_failure' do + before do + allow(pipeline_class).to receive(:abort_on_failure?).and_return(true) + end + + it 'marks entity as failed' do + job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] } + + described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new('Error!')) + + expect(entity.reload.status_name).to eq(:failed) + end end end @@ -266,6 +264,10 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do describe '#perform' do context 'when entity is failed' do + before do + entity.update!(status: -1) + end + it 'marks tracker as skipped and logs the skip' do pipeline_tracker = create( :bulk_import_tracker, @@ -274,23 +276,12 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do status_event: 'enqueue' ) - entity.update!(status: -1) - expect_next_instance_of(BulkImports::Logger) do |logger| allow(logger).to receive(:info) expect(logger) .to receive(:info) - .with( - hash_including( - 'pipeline_class' => 'FakePipeline', - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'message' => 'Skipping pipeline due to failed entity' - ) - ) + .with(hash_including(message: 'Skipping pipeline due to failed entity')) end worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) @@ -323,23 +314,15 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do end end - it 'reenqueues the worker' do + it 're_enqueues the worker' do expect_any_instance_of(BulkImports::Tracker) do |tracker| expect(tracker).to receive(:retry).and_call_original end expect_next_instance_of(BulkImports::Logger) do |logger| - expect(logger) - .to receive(:info) - .with( - hash_including( - 'pipeline_class' => 'FakePipeline', - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path - ) - ) + expect(logger).to receive(:with_tracker).and_call_original + expect(logger).to receive(:with_entity).and_call_original + expect(logger).to receive(:info) end expect(described_class) @@ -495,8 +478,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do end end - context 'when export is batched' do - let(:batches_count) { 2 } + context 'when export is batched', :aggregate_failures do + let(:batches_count) { 3 } before do allow_next_instance_of(BulkImports::ExportStatus) do |status| @@ -506,10 +489,30 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do allow(status).to receive(:empty?).and_return(false) allow(status).to receive(:failed?).and_return(false) end + allow(worker).to receive(:log_extra_metadata_on_done).and_call_original end it 'enqueues pipeline batches' do + expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times + expect(worker).to receive(:log_extra_metadata_on_done).with(:batched, true) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true) + + worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.status_name).to eq(:started) + expect(pipeline_tracker.batched).to eq(true) + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3) + expect(described_class.jobs).to be_empty + end + + it 'enqueues only missing pipelines batches' do + create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2) expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true) worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) @@ -517,7 +520,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do expect(pipeline_tracker.status_name).to eq(:started) expect(pipeline_tracker.batched).to eq(true) - expect(pipeline_tracker.batches.count).to eq(batches_count) + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3) + expect(described_class.jobs).to be_empty end context 'when batches count is less than 1' do @@ -531,6 +535,127 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do expect(pipeline_tracker.reload.status_name).to eq(:finished) end end + + context 'when pipeline batch enqueuing should be limited' do + using RSpec::Parameterized::TableSyntax + + before do + allow(::Gitlab::CurrentSettings).to receive(:bulk_import_concurrent_pipeline_batch_limit).and_return(2) + end + + it 'only enqueues limited batches and reenqueues itself' do + expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false) + + worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.status_name).to eq(:started) + expect(pipeline_tracker.batched).to eq(true) + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2) + expect(described_class.jobs).to contain_exactly( + hash_including( + 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id], + 'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i) + ) + ) + end + + context 'when there is a batch in progress' do + where(:status) { BulkImports::BatchTracker::IN_PROGRESS_STATES } + + with_them do + before do + create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker) + end + + it 'counts the in progress batch against the limit' do + expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).once + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false) + + worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.status_name).to eq(:started) + expect(pipeline_tracker.batched).to eq(true) + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2) + expect(described_class.jobs).to contain_exactly( + hash_including( + 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id], + 'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i) + ) + ) + end + end + end + + context 'when there is a batch that has finished' do + where(:status) do + all_statuses = BulkImports::BatchTracker.state_machines[:status].states.map(&:name) + all_statuses - BulkImports::BatchTracker::IN_PROGRESS_STATES + end + + with_them do + before do + create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker) + end + + it 'does not count the finished batch against the limit' do + expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2, 3]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true) + + worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3) + expect(described_class.jobs).to be_empty + end + end + end + + context 'when the feature flag is disabled' do + before do + stub_feature_flags(bulk_import_limit_concurrent_batches: false) + end + + it 'does not limit batches' do + expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true) + + worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.status_name).to eq(:started) + expect(pipeline_tracker.batched).to eq(true) + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3) + expect(described_class.jobs).to be_empty + end + + it 'still enqueues only missing pipelines batches' do + create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2) + expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3]) + expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true) + + worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.status_name).to eq(:started) + expect(pipeline_tracker.batched).to eq(true) + expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3) + expect(described_class.jobs).to be_empty + end + end + end end end end |