
gitlab.com/gitlab-org/gitlab-foss.git
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
 spec/workers/bulk_imports/pipeline_worker_spec.rb | 251 +++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 188 insertions(+), 63 deletions(-)
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index d99b3e9de73..368c7537641 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -16,12 +16,16 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
def self.file_extraction_pipeline?
false
end
+
+ def self.abort_on_failure?
+ false
+ end
end
end
let_it_be(:bulk_import) { create(:bulk_import) }
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
- let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
+ let_it_be_with_reload(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:pipeline_tracker) do
create(
@@ -44,7 +48,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
- include_examples 'an idempotent worker' do
+ it_behaves_like 'an idempotent worker' do
let(:job_args) { [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
it 'runs the pipeline and sets tracker to finished' do
@@ -61,17 +65,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
it 'runs the given pipeline successfully' do
expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger)
- .to receive(:info)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path
- )
- )
+ expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original
+ expect(logger).to receive(:with_entity).with(pipeline_tracker.entity).and_call_original
+ expect(logger).to receive(:info)
end
allow(worker).to receive(:jid).and_return('jid')
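
Note on the hunk above: the spec no longer asserts a full attribute hash at each log call; instead it expects the worker to push context onto the logger once, via with_tracker and with_entity. A minimal sketch of what such a chainable context logger could look like follows; only the with_tracker/with_entity names come from the spec, everything else is an assumption and not the real BulkImports::Logger implementation.

    require 'json'

    # Hypothetical context-carrying logger: attributes merged once via
    # with_tracker/with_entity appear on every subsequent log line.
    class ContextLogger
      def initialize(sink = $stdout)
        @sink = sink
        @context = {}
      end

      # Stash tracker attributes so later lines include them automatically.
      def with_tracker(tracker)
        @context[:pipeline_class] = tracker.pipeline_name
        self
      end

      # Stash the entity attributes the old call sites passed by hand.
      def with_entity(entity)
        @context[:bulk_import_entity_id] = entity.id
        @context[:bulk_import_entity_type] = entity.source_type
        self
      end

      def info(attrs = {})
        @sink.puts(@context.merge(attrs).to_json)
      end
      alias error info
    end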
@@ -98,22 +94,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger)
- .to receive(:error)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'class' => 'BulkImports::PipelineWorker',
- 'exception.message' => 'Error!',
- 'message' => 'Pipeline failed',
- 'source_version' => entity.bulk_import.source_version_info.to_s,
- 'importer' => 'gitlab_migration'
- )
- )
+ expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original
+ expect(logger).to receive(:with_entity).with(pipeline_tracker.entity).and_call_original
+ expect(logger).to receive(:error)
end
expect(Gitlab::ErrorTracking)
@@ -121,13 +104,13 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
.with(
instance_of(StandardError),
hash_including(
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import.id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'pipeline_class' => pipeline_tracker.pipeline_name,
- 'importer' => 'gitlab_migration',
- 'source_version' => entity.bulk_import.source_version_info.to_s
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import.id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ pipeline_class: pipeline_tracker.pipeline_name,
+ importer: 'gitlab_migration',
+ source_version: entity.bulk_import.source_version_info.to_s
)
)
@@ -156,6 +139,21 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.status_name).to eq(:failed)
expect(pipeline_tracker.jid).to eq('jid')
+ expect(entity.reload.status_name).to eq(:created)
+ end
+
+ context 'when pipeline has abort_on_failure' do
+ before do
+ allow(pipeline_class).to receive(:abort_on_failure?).and_return(true)
+ end
+
+ it 'marks entity as failed' do
+ job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
+
+ described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new('Error!'))
+
+ expect(entity.reload.status_name).to eq(:failed)
+ end
end
end
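
The hunk above, together with the new abort_on_failure? stub, pins down what happens once Sidekiq retries run out: the tracker is failed either way, but the entity is failed only when the pipeline class opts in. A hedged sketch of the retries-exhausted hook this implies, reconstructed from the expectations rather than the worker's actual source (fail_op! and pipeline_class are assumed helper names):

    # Assumed shape of the hook exercised via sidekiq_retries_exhausted_block.
    sidekiq_retries_exhausted do |job, exception|
      tracker = BulkImports::Tracker.find(job['args'].first)
      tracker.fail_op! # the tracker always ends up :failed

      # New behaviour: abort the whole entity only for opted-in pipelines;
      # otherwise the entity keeps its current state (:created above).
      tracker.entity.fail_op! if tracker.pipeline_class.abort_on_failure?
    end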
@@ -266,6 +264,10 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
describe '#perform' do
context 'when entity is failed' do
+ before do
+ entity.update!(status: -1)
+ end
+
it 'marks tracker as skipped and logs the skip' do
pipeline_tracker = create(
:bulk_import_tracker,
@@ -274,23 +276,12 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
status_event: 'enqueue'
)
- entity.update!(status: -1)
-
expect_next_instance_of(BulkImports::Logger) do |logger|
allow(logger).to receive(:info)
expect(logger)
.to receive(:info)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'message' => 'Skipping pipeline due to failed entity'
- )
- )
+ .with(hash_including(message: 'Skipping pipeline due to failed entity'))
end
worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
@@ -323,23 +314,15 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
- it 'reenqueues the worker' do
+ it 're_enqueues the worker' do
expect_any_instance_of(BulkImports::Tracker) do |tracker|
expect(tracker).to receive(:retry).and_call_original
end
expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger)
- .to receive(:info)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path
- )
- )
+ expect(logger).to receive(:with_tracker).and_call_original
+ expect(logger).to receive(:with_entity).and_call_original
+ expect(logger).to receive(:info)
end
expect(described_class)
@@ -495,8 +478,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
- context 'when export is batched' do
- let(:batches_count) { 2 }
+ context 'when export is batched', :aggregate_failures do
+ let(:batches_count) { 3 }
before do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
@@ -506,10 +489,30 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
allow(status).to receive(:empty?).and_return(false)
allow(status).to receive(:failed?).and_return(false)
end
+ allow(worker).to receive(:log_extra_metadata_on_done).and_call_original
end
it 'enqueues pipeline batches' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:batched, true)
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+
+ it 'enqueues only missing pipelines batches' do
+ create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
@@ -517,7 +520,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.status_name).to eq(:started)
expect(pipeline_tracker.batched).to eq(true)
- expect(pipeline_tracker.batches.count).to eq(batches_count)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
end
context 'when batches count is less than 1' do
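
The 'enqueues only missing pipelines batches' example above pins down idempotency: when batch 2 already has a tracker row, a re-run enqueues only batches 1 and 3. A minimal sketch of the set difference this implies (the method and its signature are assumptions for illustration):

    # Assumed enqueue step: derive batch numbers that still lack a
    # BulkImports::BatchTracker row, then enqueue one worker per number.
    def enqueue_batches(tracker, batches_count)
      existing = tracker.batches.pluck(:batch_number)
      missing  = (1..batches_count).to_a - existing

      missing.each do |batch_number|
        batch = tracker.batches.create!(batch_number: batch_number)
        BulkImports::PipelineBatchWorker.perform_async(batch.id)
      end

      missing # e.g. [1, 3] when batch 2 was already tracked
    end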
@@ -531,6 +535,127 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.reload.status_name).to eq(:finished)
end
end
+
+ context 'when pipeline batch enqueuing should be limited' do
+ using RSpec::Parameterized::TableSyntax
+
+ before do
+ allow(::Gitlab::CurrentSettings).to receive(:bulk_import_concurrent_pipeline_batch_limit).and_return(2)
+ end
+
+ it 'only enqueues limited batches and reenqueues itself' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+ expect(described_class.jobs).to contain_exactly(
+ hash_including(
+ 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+ 'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+ )
+ )
+ end
+
+ context 'when there is a batch in progress' do
+ where(:status) { BulkImports::BatchTracker::IN_PROGRESS_STATES }
+
+ with_them do
+ before do
+ create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+ end
+
+ it 'counts the in progress batch against the limit' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).once
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+ expect(described_class.jobs).to contain_exactly(
+ hash_including(
+ 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+ 'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+ )
+ )
+ end
+ end
+ end
+
+ context 'when there is a batch that has finished' do
+ where(:status) do
+ all_statuses = BulkImports::BatchTracker.state_machines[:status].states.map(&:name)
+ all_statuses - BulkImports::BatchTracker::IN_PROGRESS_STATES
+ end
+
+ with_them do
+ before do
+ create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+ end
+
+ it 'does not count the finished batch against the limit' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+ end
+ end
+
+ context 'when the feature flag is disabled' do
+ before do
+ stub_feature_flags(bulk_import_limit_concurrent_batches: false)
+ end
+
+ it 'does not limit batches' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+
+ it 'still enqueues only missing pipelines batches' do
+ create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+ end
+ end
end
end
end
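
Taken together, the new examples describe the limiting behaviour: enqueuing is capped by the bulk_import_concurrent_pipeline_batch_limit application setting, in-progress batches count against the cap while finished ones do not, the worker re-schedules itself roughly 10 seconds out until the final batch is enqueued, and the whole mechanism sits behind the bulk_import_limit_concurrent_batches feature flag. A hedged sketch of that loop, reconstructed from the expectations rather than the worker's real code (only the setting, flag, and worker names come from the spec):

    RE_ENQUEUE_DELAY = 10.seconds

    # Assumed batch-limiting logic implied by the spec.
    def enqueue_limited_batches(tracker, batches_count)
      missing = (1..batches_count).to_a - tracker.batches.pluck(:batch_number)

      if Feature.enabled?(:bulk_import_limit_concurrent_batches)
        limit = ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit

        # Batches still running consume slots; finished ones free them.
        # (How state is queried here is an assumption.)
        in_progress = tracker.batches.count do |batch|
          BulkImports::BatchTracker::IN_PROGRESS_STATES.include?(batch.status_name)
        end

        missing = missing.first([limit - in_progress, 0].max)
      end

      missing.each do |batch_number|
        batch = tracker.batches.create!(batch_number: batch_number)
        BulkImports::PipelineBatchWorker.perform_async(batch.id)
      end

      final_enqueued = missing.include?(batches_count)
      log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, missing)
      log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, final_enqueued)

      # Come back for the remainder once running batches have drained.
      unless final_enqueued
        self.class.perform_in(RE_ENQUEUE_DELAY, tracker.id, tracker.stage, tracker.entity.id)
      end
    end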