diff options
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r-- | spec/workers/bulk_imports/pipeline_worker_spec.rb | 142 |
1 files changed, 104 insertions, 38 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 209ae8862b6..b5f20e9ff76 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -22,9 +22,10 @@ RSpec.describe BulkImports::PipelineWorker do before do stub_const('FakePipeline', pipeline_class) + allow(entity).to receive(:pipeline_exists?).with('FakePipeline').and_return(true) allow_next_instance_of(BulkImports::Groups::Stage) do |instance| allow(instance).to receive(:pipelines) - .and_return([[0, pipeline_class]]) + .and_return([{ stage: 0, pipeline: pipeline_class }]) end end @@ -101,18 +102,26 @@ RSpec.describe BulkImports::PipelineWorker do pipeline_tracker = create( :bulk_import_tracker, entity: entity, - pipeline_name: 'InexistentPipeline', + pipeline_name: 'FakePipeline', status_event: 'enqueue' ) + allow(subject).to receive(:jid).and_return('jid') + + expect_next_instance_of(pipeline_class) do |pipeline| + expect(pipeline) + .to receive(:run) + .and_raise(StandardError, 'Error!') + end + expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect(logger) .to receive(:error) .with( hash_including( - 'pipeline_name' => 'InexistentPipeline', + 'pipeline_name' => 'FakePipeline', 'entity_id' => entity.id, - 'message' => "'InexistentPipeline' is not a valid BulkImport Pipeline" + 'message' => 'Error!' ) ) end @@ -120,7 +129,7 @@ RSpec.describe BulkImports::PipelineWorker do expect(Gitlab::ErrorTracking) .to receive(:track_exception) .with( - instance_of(BulkImports::Error), + instance_of(StandardError), entity_id: entity.id, pipeline_name: pipeline_tracker.pipeline_name ) @@ -129,7 +138,18 @@ RSpec.describe BulkImports::PipelineWorker do .to receive(:perform_async) .with(entity.id, pipeline_tracker.stage) - allow(subject).to receive(:jid).and_return('jid') + expect(BulkImports::Failure) + .to receive(:create) + .with( + a_hash_including( + bulk_import_entity_id: entity.id, + pipeline_class: 'FakePipeline', + pipeline_step: 'pipeline_worker_run', + exception_class: 'StandardError', + exception_message: 'Error!', + correlation_id_value: anything + ) + ) subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) @@ -144,18 +164,19 @@ RSpec.describe BulkImports::PipelineWorker do pipeline_tracker = create( :bulk_import_tracker, entity: entity, - pipeline_name: 'Pipeline', + pipeline_name: 'FakePipeline', status_event: 'enqueue' ) entity.update!(status: -1) + expect(BulkImports::Failure).to receive(:create) expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect(logger) .to receive(:error) .with( hash_including( - 'pipeline_name' => 'Pipeline', + 'pipeline_name' => 'FakePipeline', 'entity_id' => entity.id, 'message' => 'Failed entity status' ) @@ -168,56 +189,78 @@ RSpec.describe BulkImports::PipelineWorker do end end - context 'when it is a network error' do - it 'reenqueue on retriable network errors' do - pipeline_tracker = create( + context 'when network error is raised' do + let(:pipeline_tracker) do + create( :bulk_import_tracker, entity: entity, pipeline_name: 'FakePipeline', status_event: 'enqueue' ) + end - exception = BulkImports::NetworkError.new( - response: double(code: 429, headers: {}) - ) + let(:exception) do + BulkImports::NetworkError.new(response: instance_double(HTTParty::Response, code: 429, headers: {})) + end + + before do + allow(subject).to receive(:jid).and_return('jid') expect_next_instance_of(pipeline_class) do |pipeline| expect(pipeline) .to receive(:run) .and_raise(exception) end + end - allow(subject).to receive(:jid).and_return('jid') - - expect_any_instance_of(BulkImports::Tracker) do |tracker| - expect(tracker).to receive(:retry).and_call_original - end + context 'when error is retriable' do + it 'reenqueues the worker' do + expect_any_instance_of(BulkImports::Tracker) do |tracker| + expect(tracker).to receive(:retry).and_call_original + end + + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger) + .to receive(:info) + .with( + hash_including( + 'pipeline_name' => 'FakePipeline', + 'entity_id' => entity.id + ) + ) + end - expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger) - .to receive(:info) + expect(described_class) + .to receive(:perform_in) .with( - hash_including( - 'pipeline_name' => 'FakePipeline', - 'entity_id' => entity.id - ) + 60.seconds, + pipeline_tracker.id, + pipeline_tracker.stage, + pipeline_tracker.entity.id ) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + pipeline_tracker.reload + + expect(pipeline_tracker.enqueued?).to be_truthy end - expect(described_class) - .to receive(:perform_in) - .with( - 60.seconds, - pipeline_tracker.id, - pipeline_tracker.stage, - pipeline_tracker.entity.id - ) + context 'when error is not retriable' do + let(:exception) do + BulkImports::NetworkError.new(response: instance_double(HTTParty::Response, code: 503, headers: {})) + end - subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + it 'marks tracker as failed and logs the error' do + expect(described_class).not_to receive(:perform_in) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) - pipeline_tracker.reload + pipeline_tracker.reload - expect(pipeline_tracker.enqueued?).to be_truthy + expect(pipeline_tracker.failed?).to eq(true) + end + end end end end @@ -253,13 +296,14 @@ RSpec.describe BulkImports::PipelineWorker do allow_next_instance_of(BulkImports::Groups::Stage) do |instance| allow(instance).to receive(:pipelines) - .and_return([[0, file_extraction_pipeline]]) + .and_return([{ stage: 0, pipeline: file_extraction_pipeline }]) end end it 'runs the pipeline successfully' do allow_next_instance_of(BulkImports::ExportStatus) do |status| allow(status).to receive(:started?).and_return(false) + allow(status).to receive(:empty?).and_return(false) allow(status).to receive(:failed?).and_return(false) end @@ -272,6 +316,28 @@ RSpec.describe BulkImports::PipelineWorker do it 'reenqueues pipeline worker' do allow_next_instance_of(BulkImports::ExportStatus) do |status| allow(status).to receive(:started?).and_return(true) + allow(status).to receive(:empty?).and_return(false) + allow(status).to receive(:failed?).and_return(false) + end + + expect(described_class) + .to receive(:perform_in) + .with( + described_class::FILE_EXTRACTION_PIPELINE_PERFORM_DELAY, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + end + + context 'when export status is empty' do + it 'reenqueues pipeline worker' do + allow_next_instance_of(BulkImports::ExportStatus) do |status| + allow(status).to receive(:started?).and_return(false) + allow(status).to receive(:empty?).and_return(true) allow(status).to receive(:failed?).and_return(false) end |