diff options
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r-- | spec/workers/bulk_imports/pipeline_worker_spec.rb | 236 |
1 files changed, 180 insertions, 56 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 23fbc5688ec..03ec6267ca8 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -2,7 +2,7 @@ require 'spec_helper' -RSpec.describe BulkImports::PipelineWorker do +RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do let(:pipeline_class) do Class.new do def initialize(_); end @@ -19,6 +19,15 @@ RSpec.describe BulkImports::PipelineWorker do let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } + let(:pipeline_tracker) do + create( + :bulk_import_tracker, + entity: entity, + pipeline_name: 'FakePipeline', + status_event: 'enqueue' + ) + end + before do stub_const('FakePipeline', pipeline_class) @@ -60,45 +69,12 @@ RSpec.describe BulkImports::PipelineWorker do end end - it_behaves_like 'successfully runs the pipeline' do - let(:pipeline_tracker) do - create( - :bulk_import_tracker, - entity: entity, - pipeline_name: 'FakePipeline', - status_event: 'enqueue' - ) - end - end + it_behaves_like 'successfully runs the pipeline' - context 'when the pipeline cannot be found' do - it 'logs the error' do - pipeline_tracker = create( - :bulk_import_tracker, - :finished, - entity: entity, - pipeline_name: 'FakePipeline' - ) - - expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger) - .to receive(:error) - .with( - hash_including( - 'pipeline_tracker_id' => pipeline_tracker.id, - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'source_version' => entity.bulk_import.source_version_info.to_s, - 'message' => 'Unstarted pipeline not found' - ) - ) - end - - expect(BulkImports::EntityWorker) - .to receive(:perform_async) - .with(entity.id, pipeline_tracker.stage) + context 'when exclusive lease cannot be obtained' do + it 'does not run the pipeline' do + expect(subject).to receive(:try_obtain_lease).and_return(false) + expect(subject).not_to receive(:run) subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) end @@ -145,13 +121,15 @@ RSpec.describe BulkImports::PipelineWorker do .to receive(:track_exception) .with( instance_of(StandardError), - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import.id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - pipeline_name: pipeline_tracker.pipeline_name, - importer: 'gitlab_migration', - source_version: entity.bulk_import.source_version_info.to_s + hash_including( + 'bulk_import_entity_id' => entity.id, + 'bulk_import_id' => entity.bulk_import.id, + 'bulk_import_entity_type' => entity.source_type, + 'source_full_path' => entity.source_full_path, + 'pipeline_name' => pipeline_tracker.pipeline_name, + 'importer' => 'gitlab_migration', + 'source_version' => entity.bulk_import.source_version_info.to_s + ) ) expect(BulkImports::EntityWorker) @@ -179,6 +157,111 @@ RSpec.describe BulkImports::PipelineWorker do expect(pipeline_tracker.jid).to eq('jid') end + shared_examples 'successfully runs the pipeline' do + it 'runs the given pipeline successfully' do + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger) + .to receive(:info) + .with( + hash_including( + 'pipeline_name' => 'FakePipeline', + 'bulk_import_id' => entity.bulk_import_id, + 'bulk_import_entity_id' => entity.id, + 'bulk_import_entity_type' => entity.source_type, + 'source_full_path' => entity.source_full_path + ) + ) + end + + expect(BulkImports::EntityWorker) + .to receive(:perform_async) + .with(entity.id, pipeline_tracker.stage) + + allow(subject).to receive(:jid).and_return('jid') + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.status_name).to eq(:finished) + expect(pipeline_tracker.jid).to eq('jid') + end + end + + context 'when enqueued pipeline cannot be found' do + shared_examples 'logs the error' do + it 'logs the error' do + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + status = pipeline_tracker.human_status_name + + expect(logger) + .to receive(:error) + .with( + hash_including( + 'bulk_import_entity_id' => entity.id, + 'bulk_import_id' => entity.bulk_import_id, + 'bulk_import_entity_type' => entity.source_type, + 'pipeline_tracker_id' => pipeline_tracker.id, + 'pipeline_tracker_state' => status, + 'pipeline_name' => pipeline_tracker.pipeline_name, + 'source_full_path' => entity.source_full_path, + 'source_version' => entity.bulk_import.source_version_info.to_s, + 'importer' => 'gitlab_migration', + 'message' => "Pipeline in #{status} state instead of expected enqueued state" + ) + ) + end + + expect(BulkImports::EntityWorker) + .to receive(:perform_async) + .with(entity.id, pipeline_tracker.stage) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + end + + context 'when pipeline is finished' do + let(:pipeline_tracker) do + create( + :bulk_import_tracker, + :finished, + entity: entity, + pipeline_name: 'FakePipeline' + ) + end + + include_examples 'logs the error' + end + + context 'when pipeline is skipped' do + let(:pipeline_tracker) do + create( + :bulk_import_tracker, + :skipped, + entity: entity, + pipeline_name: 'FakePipeline' + ) + end + + include_examples 'logs the error' + end + + context 'when tracker is started' do + it 'marks tracker as failed' do + pipeline_tracker = create( + :bulk_import_tracker, + :started, + entity: entity, + pipeline_name: 'FakePipeline' + ) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + expect(pipeline_tracker.reload.failed?).to eq(true) + end + end + end + context 'when entity is failed' do it 'marks tracker as skipped and logs the skip' do pipeline_tracker = create( @@ -343,23 +426,64 @@ RSpec.describe BulkImports::PipelineWorker do end context 'when export status is empty' do - it 'reenqueues pipeline worker' do + before do allow_next_instance_of(BulkImports::ExportStatus) do |status| allow(status).to receive(:started?).and_return(false) allow(status).to receive(:empty?).and_return(true) allow(status).to receive(:failed?).and_return(false) end - expect(described_class) - .to receive(:perform_in) - .with( - described_class::FILE_EXTRACTION_PIPELINE_PERFORM_DELAY, - pipeline_tracker.id, - pipeline_tracker.stage, - entity.id - ) + entity.update!(created_at: entity_created_at) + end - subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + context 'when timeout is not reached' do + let(:entity_created_at) { 1.minute.ago } + + it 'reenqueues pipeline worker' do + expect(described_class) + .to receive(:perform_in) + .with( + described_class::FILE_EXTRACTION_PIPELINE_PERFORM_DELAY, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + expect(pipeline_tracker.reload.status_name).to eq(:enqueued) + end + end + + context 'when timeout is reached' do + let(:entity_created_at) { 10.minutes.ago } + + it 'marks as failed and logs the error' do + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger) + .to receive(:error) + .with( + hash_including( + 'pipeline_name' => 'NdjsonPipeline', + 'bulk_import_entity_id' => entity.id, + 'bulk_import_id' => entity.bulk_import_id, + 'bulk_import_entity_type' => entity.source_type, + 'source_full_path' => entity.source_full_path, + 'class' => 'BulkImports::PipelineWorker', + 'exception.backtrace' => anything, + 'exception.class' => 'BulkImports::Pipeline::ExpiredError', + 'exception.message' => 'Empty export status on source instance', + 'importer' => 'gitlab_migration', + 'message' => 'Pipeline failed', + 'source_version' => entity.bulk_import.source_version_info.to_s + ) + ) + end + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + expect(pipeline_tracker.reload.status_name).to eq(:failed) + end end end |