Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r--spec/workers/bulk_imports/pipeline_worker_spec.rb236
1 files changed, 180 insertions, 56 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index 23fbc5688ec..03ec6267ca8 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -2,7 +2,7 @@
require 'spec_helper'
-RSpec.describe BulkImports::PipelineWorker do
+RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
let(:pipeline_class) do
Class.new do
def initialize(_); end
@@ -19,6 +19,15 @@ RSpec.describe BulkImports::PipelineWorker do
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
+ let(:pipeline_tracker) do
+ create(
+ :bulk_import_tracker,
+ entity: entity,
+ pipeline_name: 'FakePipeline',
+ status_event: 'enqueue'
+ )
+ end
+
before do
stub_const('FakePipeline', pipeline_class)
@@ -60,45 +69,12 @@ RSpec.describe BulkImports::PipelineWorker do
end
end
- it_behaves_like 'successfully runs the pipeline' do
- let(:pipeline_tracker) do
- create(
- :bulk_import_tracker,
- entity: entity,
- pipeline_name: 'FakePipeline',
- status_event: 'enqueue'
- )
- end
- end
+ it_behaves_like 'successfully runs the pipeline'
- context 'when the pipeline cannot be found' do
- it 'logs the error' do
- pipeline_tracker = create(
- :bulk_import_tracker,
- :finished,
- entity: entity,
- pipeline_name: 'FakePipeline'
- )
-
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
- expect(logger)
- .to receive(:error)
- .with(
- hash_including(
- 'pipeline_tracker_id' => pipeline_tracker.id,
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'source_version' => entity.bulk_import.source_version_info.to_s,
- 'message' => 'Unstarted pipeline not found'
- )
- )
- end
-
- expect(BulkImports::EntityWorker)
- .to receive(:perform_async)
- .with(entity.id, pipeline_tracker.stage)
+ context 'when exclusive lease cannot be obtained' do
+ it 'does not run the pipeline' do
+ expect(subject).to receive(:try_obtain_lease).and_return(false)
+ expect(subject).not_to receive(:run)
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
@@ -145,13 +121,15 @@ RSpec.describe BulkImports::PipelineWorker do
.to receive(:track_exception)
.with(
instance_of(StandardError),
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import.id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_name: pipeline_tracker.pipeline_name,
- importer: 'gitlab_migration',
- source_version: entity.bulk_import.source_version_info.to_s
+ hash_including(
+ 'bulk_import_entity_id' => entity.id,
+ 'bulk_import_id' => entity.bulk_import.id,
+ 'bulk_import_entity_type' => entity.source_type,
+ 'source_full_path' => entity.source_full_path,
+ 'pipeline_name' => pipeline_tracker.pipeline_name,
+ 'importer' => 'gitlab_migration',
+ 'source_version' => entity.bulk_import.source_version_info.to_s
+ )
)
expect(BulkImports::EntityWorker)
@@ -179,6 +157,111 @@ RSpec.describe BulkImports::PipelineWorker do
expect(pipeline_tracker.jid).to eq('jid')
end
+ shared_examples 'successfully runs the pipeline' do
+ it 'runs the given pipeline successfully' do
+ expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect(logger)
+ .to receive(:info)
+ .with(
+ hash_including(
+ 'pipeline_name' => 'FakePipeline',
+ 'bulk_import_id' => entity.bulk_import_id,
+ 'bulk_import_entity_id' => entity.id,
+ 'bulk_import_entity_type' => entity.source_type,
+ 'source_full_path' => entity.source_full_path
+ )
+ )
+ end
+
+ expect(BulkImports::EntityWorker)
+ .to receive(:perform_async)
+ .with(entity.id, pipeline_tracker.stage)
+
+ allow(subject).to receive(:jid).and_return('jid')
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:finished)
+ expect(pipeline_tracker.jid).to eq('jid')
+ end
+ end
+
+ context 'when enqueued pipeline cannot be found' do
+ shared_examples 'logs the error' do
+ it 'logs the error' do
+ expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ status = pipeline_tracker.human_status_name
+
+ expect(logger)
+ .to receive(:error)
+ .with(
+ hash_including(
+ 'bulk_import_entity_id' => entity.id,
+ 'bulk_import_id' => entity.bulk_import_id,
+ 'bulk_import_entity_type' => entity.source_type,
+ 'pipeline_tracker_id' => pipeline_tracker.id,
+ 'pipeline_tracker_state' => status,
+ 'pipeline_name' => pipeline_tracker.pipeline_name,
+ 'source_full_path' => entity.source_full_path,
+ 'source_version' => entity.bulk_import.source_version_info.to_s,
+ 'importer' => 'gitlab_migration',
+ 'message' => "Pipeline in #{status} state instead of expected enqueued state"
+ )
+ )
+ end
+
+ expect(BulkImports::EntityWorker)
+ .to receive(:perform_async)
+ .with(entity.id, pipeline_tracker.stage)
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ end
+ end
+
+ context 'when pipeline is finished' do
+ let(:pipeline_tracker) do
+ create(
+ :bulk_import_tracker,
+ :finished,
+ entity: entity,
+ pipeline_name: 'FakePipeline'
+ )
+ end
+
+ include_examples 'logs the error'
+ end
+
+ context 'when pipeline is skipped' do
+ let(:pipeline_tracker) do
+ create(
+ :bulk_import_tracker,
+ :skipped,
+ entity: entity,
+ pipeline_name: 'FakePipeline'
+ )
+ end
+
+ include_examples 'logs the error'
+ end
+
+ context 'when tracker is started' do
+ it 'marks tracker as failed' do
+ pipeline_tracker = create(
+ :bulk_import_tracker,
+ :started,
+ entity: entity,
+ pipeline_name: 'FakePipeline'
+ )
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ expect(pipeline_tracker.reload.failed?).to eq(true)
+ end
+ end
+ end
+
context 'when entity is failed' do
it 'marks tracker as skipped and logs the skip' do
pipeline_tracker = create(
@@ -343,23 +426,64 @@ RSpec.describe BulkImports::PipelineWorker do
end
context 'when export status is empty' do
- it 'reenqueues pipeline worker' do
+ before do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
allow(status).to receive(:started?).and_return(false)
allow(status).to receive(:empty?).and_return(true)
allow(status).to receive(:failed?).and_return(false)
end
- expect(described_class)
- .to receive(:perform_in)
- .with(
- described_class::FILE_EXTRACTION_PIPELINE_PERFORM_DELAY,
- pipeline_tracker.id,
- pipeline_tracker.stage,
- entity.id
- )
+ entity.update!(created_at: entity_created_at)
+ end
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ context 'when timeout is not reached' do
+ let(:entity_created_at) { 1.minute.ago }
+
+ it 'reenqueues pipeline worker' do
+ expect(described_class)
+ .to receive(:perform_in)
+ .with(
+ described_class::FILE_EXTRACTION_PIPELINE_PERFORM_DELAY,
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ expect(pipeline_tracker.reload.status_name).to eq(:enqueued)
+ end
+ end
+
+ context 'when timeout is reached' do
+ let(:entity_created_at) { 10.minutes.ago }
+
+ it 'marks as failed and logs the error' do
+ expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect(logger)
+ .to receive(:error)
+ .with(
+ hash_including(
+ 'pipeline_name' => 'NdjsonPipeline',
+ 'bulk_import_entity_id' => entity.id,
+ 'bulk_import_id' => entity.bulk_import_id,
+ 'bulk_import_entity_type' => entity.source_type,
+ 'source_full_path' => entity.source_full_path,
+ 'class' => 'BulkImports::PipelineWorker',
+ 'exception.backtrace' => anything,
+ 'exception.class' => 'BulkImports::Pipeline::ExpiredError',
+ 'exception.message' => 'Empty export status on source instance',
+ 'importer' => 'gitlab_migration',
+ 'message' => 'Pipeline failed',
+ 'source_version' => entity.bulk_import.source_version_info.to_s
+ )
+ )
+ end
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ expect(pipeline_tracker.reload.status_name).to eq(:failed)
+ end
end
end