Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r--  spec/workers/bulk_imports/pipeline_worker_spec.rb  341
1 files changed, 155 insertions, 186 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index e1259d5666d..d99b3e9de73 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -9,6 +9,10 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
def run; end
+ def self.relation
+ 'labels'
+ end
+
def self.file_extraction_pipeline?
false
end
@@ -28,6 +32,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
)
end
+ let(:worker) { described_class.new }
+
before do
stub_const('FakePipeline', pipeline_class)
@@ -38,13 +44,28 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
+ include_examples 'an idempotent worker' do
+ let(:job_args) { [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
+
+ it 'runs the pipeline and sets tracker to finished' do
+ allow(worker).to receive(:jid).and_return('jid')
+
+ perform_multiple(job_args, worker: worker)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:finished)
+ expect(pipeline_tracker.jid).to eq('jid')
+ end
+ end
+
it 'runs the given pipeline successfully' do
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect_next_instance_of(BulkImports::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
hash_including(
- 'pipeline_name' => 'FakePipeline',
+ 'pipeline_class' => 'FakePipeline',
'bulk_import_id' => entity.bulk_import_id,
'bulk_import_entity_id' => entity.id,
'bulk_import_entity_type' => entity.source_type,
@@ -53,9 +74,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
)
end
- allow(subject).to receive(:jid).and_return('jid')
+ allow(worker).to receive(:jid).and_return('jid')
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
pipeline_tracker.reload
@@ -63,74 +84,30 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.jid).to eq('jid')
end
- context 'when job version is nil' do
- before do
- allow(subject).to receive(:job_version).and_return(nil)
- end
-
- it 'runs the given pipeline successfully and enqueues entity worker' do
- expect(BulkImports::EntityWorker).to receive(:perform_async).with(entity.id)
-
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
-
- pipeline_tracker.reload
-
- expect(pipeline_tracker.status_name).to eq(:finished)
- end
-
- context 'when an error occurs' do
- it 'enqueues entity worker' do
- expect_next_instance_of(pipeline_class) do |pipeline|
- expect(pipeline)
- .to receive(:run)
- .and_raise(StandardError, 'Error!')
- end
-
- expect(BulkImports::EntityWorker).to receive(:perform_async).with(entity.id)
-
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
- end
- end
- end
-
context 'when exclusive lease cannot be obtained' do
it 'does not run the pipeline' do
- expect(subject).to receive(:try_obtain_lease).and_return(false)
- expect(subject).not_to receive(:run)
+ expect(worker).to receive(:try_obtain_lease).and_return(false)
+ expect(worker).not_to receive(:run)
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
end
- context 'when the pipeline raises an exception' do
- it 'logs the error' do
- pipeline_tracker = create(
- :bulk_import_tracker,
- entity: entity,
- pipeline_name: 'FakePipeline',
- status_event: 'enqueue'
- )
-
- allow(subject).to receive(:jid).and_return('jid')
-
- expect_next_instance_of(pipeline_class) do |pipeline|
- expect(pipeline)
- .to receive(:run)
- .and_raise(StandardError, 'Error!')
- end
+ describe '.sidekiq_retries_exhausted' do
+ it 'logs and sets status as failed' do
+ job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect_next_instance_of(BulkImports::Logger) do |logger|
expect(logger)
.to receive(:error)
.with(
hash_including(
- 'pipeline_name' => 'FakePipeline',
+ 'pipeline_class' => 'FakePipeline',
'bulk_import_entity_id' => entity.id,
'bulk_import_id' => entity.bulk_import_id,
'bulk_import_entity_type' => entity.source_type,
'source_full_path' => entity.source_full_path,
'class' => 'BulkImports::PipelineWorker',
- 'exception.backtrace' => anything,
'exception.message' => 'Error!',
'message' => 'Pipeline failed',
'source_version' => entity.bulk_import.source_version_info.to_s,
@@ -148,7 +125,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
'bulk_import_id' => entity.bulk_import.id,
'bulk_import_entity_type' => entity.source_type,
'source_full_path' => entity.source_full_path,
- 'pipeline_name' => pipeline_tracker.pipeline_name,
+ 'pipeline_class' => pipeline_tracker.pipeline_name,
'importer' => 'gitlab_migration',
'source_version' => entity.bulk_import.source_version_info.to_s
)
@@ -167,84 +144,127 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
)
)
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ expect_next_instance_of(described_class) do |worker|
+ expect(worker).to receive(:perform_failure).with(pipeline_tracker.id, entity.id, StandardError)
+ .and_call_original
+ allow(worker).to receive(:jid).and_return('jid')
+ end
+
+ described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new('Error!'))
pipeline_tracker.reload
expect(pipeline_tracker.status_name).to eq(:failed)
expect(pipeline_tracker.jid).to eq('jid')
end
+ end
- context 'when enqueued pipeline cannot be found' do
- shared_examples 'logs the error' do
- it 'logs the error' do
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
- status = pipeline_tracker.human_status_name
-
- expect(logger)
- .to receive(:error)
- .with(
- hash_including(
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'pipeline_tracker_id' => pipeline_tracker.id,
- 'pipeline_tracker_state' => status,
- 'pipeline_name' => pipeline_tracker.pipeline_name,
- 'source_full_path' => entity.source_full_path,
- 'source_version' => entity.bulk_import.source_version_info.to_s,
- 'importer' => 'gitlab_migration',
- 'message' => "Pipeline in #{status} state instead of expected enqueued state"
- )
- )
- end
-
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
- end
+ context 'with stop signal from database health check' do
+ around do |example|
+ with_sidekiq_server_middleware do |chain|
+ chain.add Gitlab::SidekiqMiddleware::SkipJobs
+ Sidekiq::Testing.inline! { example.run }
end
+ end
- context 'when pipeline is finished' do
- let(:pipeline_tracker) do
- create(
- :bulk_import_tracker,
- :finished,
- entity: entity,
- pipeline_name: 'FakePipeline'
- )
- end
+ before do
+ stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false)
- include_examples 'logs the error'
+ stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true)
+ allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal])
+ end
+
+ it 'defers the job by set time' do
+ expect_next_instance_of(described_class) do |worker|
+ expect(worker).not_to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
- context 'when pipeline is skipped' do
- let(:pipeline_tracker) do
- create(
- :bulk_import_tracker,
- :skipped,
- entity: entity,
- pipeline_name: 'FakePipeline'
- )
- end
+ expect(described_class).to receive(:perform_in).with(
+ described_class::DEFER_ON_HEALTH_DELAY,
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
- include_examples 'logs the error'
- end
+ described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ end
- context 'when tracker is started' do
- it 'marks tracker as failed' do
- pipeline_tracker = create(
- :bulk_import_tracker,
- :started,
- entity: entity,
- pipeline_name: 'FakePipeline'
- )
+ it 'lazy evaluates schema and tables', :aggregate_failures do
+ block = described_class.database_health_check_attrs[:block]
+
+ job_args = [pipeline_tracker.id, pipeline_tracker.stage, entity.id]
+
+ schema, table = block.call([job_args])
+
+ expect(schema).to eq(:gitlab_main_cell)
+ expect(table).to eq(['labels'])
+ end
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ context 'when `bulk_import_deferred_workers` feature flag is disabled' do
+ it 'does not defer job execution' do
+ stub_feature_flags(bulk_import_deferred_workers: false)
- expect(pipeline_tracker.reload.failed?).to eq(true)
+ expect_next_instance_of(described_class) do |worker|
+ expect(worker).to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
+
+ expect(described_class).not_to receive(:perform_in)
+
+ described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
end
+ end
+
+ context 'when pipeline is finished' do
+ let(:pipeline_tracker) do
+ create(
+ :bulk_import_tracker,
+ :finished,
+ entity: entity,
+ pipeline_name: 'FakePipeline'
+ )
+ end
+ it 'no-ops and returns' do
+ expect(described_class).not_to receive(:run)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ end
+ end
+
+ context 'when pipeline is skipped' do
+ let(:pipeline_tracker) do
+ create(
+ :bulk_import_tracker,
+ :skipped,
+ entity: entity,
+ pipeline_name: 'FakePipeline'
+ )
+ end
+
+ it 'no-ops and returns' do
+ expect(described_class).not_to receive(:run)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ end
+ end
+
+ context 'when tracker is started' do
+ it 'runs the pipeline' do
+ pipeline_tracker = create(
+ :bulk_import_tracker,
+ :started,
+ entity: entity,
+ pipeline_name: 'FakePipeline'
+ )
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ expect(pipeline_tracker.reload.finished?).to eq(true)
+ end
+ end
+
+ describe '#perform' do
context 'when entity is failed' do
it 'marks tracker as skipped and logs the skip' do
pipeline_tracker = create(
@@ -256,14 +276,14 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
entity.update!(status: -1)
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect_next_instance_of(BulkImports::Logger) do |logger|
allow(logger).to receive(:info)
expect(logger)
.to receive(:info)
.with(
hash_including(
- 'pipeline_name' => 'FakePipeline',
+ 'pipeline_class' => 'FakePipeline',
'bulk_import_entity_id' => entity.id,
'bulk_import_id' => entity.bulk_import_id,
'bulk_import_entity_type' => entity.source_type,
@@ -273,7 +293,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
)
end
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:skipped)
end
@@ -294,7 +314,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
before do
- allow(subject).to receive(:jid).and_return('jid')
+ allow(worker).to receive(:jid).and_return('jid')
expect_next_instance_of(pipeline_class) do |pipeline|
expect(pipeline)
@@ -308,12 +328,12 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(tracker).to receive(:retry).and_call_original
end
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect_next_instance_of(BulkImports::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
hash_including(
- 'pipeline_name' => 'FakePipeline',
+ 'pipeline_class' => 'FakePipeline',
'bulk_import_entity_id' => entity.id,
'bulk_import_id' => entity.bulk_import_id,
'bulk_import_entity_type' => entity.source_type,
@@ -331,7 +351,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
pipeline_tracker.entity.id
)
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
pipeline_tracker.reload
@@ -384,7 +404,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
allow(status).to receive(:batched?).and_return(false)
end
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:finished)
end
@@ -407,7 +427,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
entity.id
)
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
end
@@ -436,7 +456,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
entity.id
)
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:enqueued)
end
@@ -445,31 +465,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
context 'when empty export timeout is reached' do
let(:created_at) { 10.minutes.ago }
- it 'marks as failed and logs the error' do
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
- expect(logger)
- .to receive(:error)
- .with(
- hash_including(
- 'pipeline_name' => 'NdjsonPipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'class' => 'BulkImports::PipelineWorker',
- 'exception.backtrace' => anything,
- 'exception.class' => 'BulkImports::Pipeline::ExpiredError',
- 'exception.message' => 'Empty export status on source instance',
- 'importer' => 'gitlab_migration',
- 'message' => 'Pipeline failed',
- 'source_version' => entity.bulk_import.source_version_info.to_s
- )
- )
- end
-
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
-
- expect(pipeline_tracker.reload.status_name).to eq(:failed)
+ it 'raises sidekiq error' do
+ expect { worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) }
+ .to raise_exception(BulkImports::Pipeline::ExpiredError)
end
end
@@ -479,17 +477,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
it 'falls back to entity created_at' do
entity.update!(created_at: 10.minutes.ago)
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
- expect(logger)
- .to receive(:error)
- .with(
- hash_including('exception.message' => 'Empty export status on source instance')
- )
- end
-
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
-
- expect(pipeline_tracker.reload.status_name).to eq(:failed)
+ expect { worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) }
+ .to raise_exception(BulkImports::Pipeline::ExpiredError)
end
end
end
@@ -501,28 +490,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
allow(status).to receive(:error).and_return('Error!')
end
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
- expect(logger)
- .to receive(:error)
- .with(
- hash_including(
- 'pipeline_name' => 'NdjsonPipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'exception.backtrace' => anything,
- 'exception.class' => 'BulkImports::Pipeline::FailedError',
- 'exception.message' => 'Export from source instance failed: Error!',
- 'importer' => 'gitlab_migration',
- 'source_version' => entity.bulk_import.source_version_info.to_s
- )
- )
- end
-
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
-
- expect(pipeline_tracker.reload.status_name).to eq(:failed)
+ expect { worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) }
+ .to raise_exception(BulkImports::Pipeline::FailedError)
end
end
@@ -542,7 +511,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
it 'enqueues pipeline batches' do
expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
pipeline_tracker.reload
@@ -555,9 +524,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
let(:batches_count) { 0 }
it 'marks tracker as finished' do
- expect(subject).not_to receive(:enqueue_batches)
+ expect(worker).not_to receive(:enqueue_batches)
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:finished)
end