Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r--spec/workers/bulk_imports/pipeline_worker_spec.rb142
1 files changed, 104 insertions, 38 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index 209ae8862b6..b5f20e9ff76 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -22,9 +22,10 @@ RSpec.describe BulkImports::PipelineWorker do
before do
stub_const('FakePipeline', pipeline_class)
+ allow(entity).to receive(:pipeline_exists?).with('FakePipeline').and_return(true)
allow_next_instance_of(BulkImports::Groups::Stage) do |instance|
allow(instance).to receive(:pipelines)
- .and_return([[0, pipeline_class]])
+ .and_return([{ stage: 0, pipeline: pipeline_class }])
end
end
@@ -101,18 +102,26 @@ RSpec.describe BulkImports::PipelineWorker do
pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
- pipeline_name: 'InexistentPipeline',
+ pipeline_name: 'FakePipeline',
status_event: 'enqueue'
)
+ allow(subject).to receive(:jid).and_return('jid')
+
+ expect_next_instance_of(pipeline_class) do |pipeline|
+ expect(pipeline)
+ .to receive(:run)
+ .and_raise(StandardError, 'Error!')
+ end
+
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:error)
.with(
hash_including(
- 'pipeline_name' => 'InexistentPipeline',
+ 'pipeline_name' => 'FakePipeline',
'entity_id' => entity.id,
- 'message' => "'InexistentPipeline' is not a valid BulkImport Pipeline"
+ 'message' => 'Error!'
)
)
end
@@ -120,7 +129,7 @@ RSpec.describe BulkImports::PipelineWorker do
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(
- instance_of(BulkImports::Error),
+ instance_of(StandardError),
entity_id: entity.id,
pipeline_name: pipeline_tracker.pipeline_name
)
@@ -129,7 +138,18 @@ RSpec.describe BulkImports::PipelineWorker do
.to receive(:perform_async)
.with(entity.id, pipeline_tracker.stage)
- allow(subject).to receive(:jid).and_return('jid')
+ expect(BulkImports::Failure)
+ .to receive(:create)
+ .with(
+ a_hash_including(
+ bulk_import_entity_id: entity.id,
+ pipeline_class: 'FakePipeline',
+ pipeline_step: 'pipeline_worker_run',
+ exception_class: 'StandardError',
+ exception_message: 'Error!',
+ correlation_id_value: anything
+ )
+ )
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
@@ -144,18 +164,19 @@ RSpec.describe BulkImports::PipelineWorker do
pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
- pipeline_name: 'Pipeline',
+ pipeline_name: 'FakePipeline',
status_event: 'enqueue'
)
entity.update!(status: -1)
+ expect(BulkImports::Failure).to receive(:create)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:error)
.with(
hash_including(
- 'pipeline_name' => 'Pipeline',
+ 'pipeline_name' => 'FakePipeline',
'entity_id' => entity.id,
'message' => 'Failed entity status'
)
@@ -168,56 +189,78 @@ RSpec.describe BulkImports::PipelineWorker do
end
end
- context 'when it is a network error' do
- it 'reenqueue on retriable network errors' do
- pipeline_tracker = create(
+ context 'when network error is raised' do
+ let(:pipeline_tracker) do
+ create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'FakePipeline',
status_event: 'enqueue'
)
+ end
- exception = BulkImports::NetworkError.new(
- response: double(code: 429, headers: {})
- )
+ let(:exception) do
+ BulkImports::NetworkError.new(response: instance_double(HTTParty::Response, code: 429, headers: {}))
+ end
+
+ before do
+ allow(subject).to receive(:jid).and_return('jid')
expect_next_instance_of(pipeline_class) do |pipeline|
expect(pipeline)
.to receive(:run)
.and_raise(exception)
end
+ end
- allow(subject).to receive(:jid).and_return('jid')
-
- expect_any_instance_of(BulkImports::Tracker) do |tracker|
- expect(tracker).to receive(:retry).and_call_original
- end
+ context 'when error is retriable' do
+ it 'reenqueues the worker' do
+ expect_any_instance_of(BulkImports::Tracker) do |tracker|
+ expect(tracker).to receive(:retry).and_call_original
+ end
+
+ expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect(logger)
+ .to receive(:info)
+ .with(
+ hash_including(
+ 'pipeline_name' => 'FakePipeline',
+ 'entity_id' => entity.id
+ )
+ )
+ end
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
- expect(logger)
- .to receive(:info)
+ expect(described_class)
+ .to receive(:perform_in)
.with(
- hash_including(
- 'pipeline_name' => 'FakePipeline',
- 'entity_id' => entity.id
- )
+ 60.seconds,
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ pipeline_tracker.entity.id
)
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.enqueued?).to be_truthy
end
- expect(described_class)
- .to receive(:perform_in)
- .with(
- 60.seconds,
- pipeline_tracker.id,
- pipeline_tracker.stage,
- pipeline_tracker.entity.id
- )
+ context 'when error is not retriable' do
+ let(:exception) do
+ BulkImports::NetworkError.new(response: instance_double(HTTParty::Response, code: 503, headers: {}))
+ end
- subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ it 'marks tracker as failed and logs the error' do
+ expect(described_class).not_to receive(:perform_in)
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
- pipeline_tracker.reload
+ pipeline_tracker.reload
- expect(pipeline_tracker.enqueued?).to be_truthy
+ expect(pipeline_tracker.failed?).to eq(true)
+ end
+ end
end
end
end
@@ -253,13 +296,14 @@ RSpec.describe BulkImports::PipelineWorker do
allow_next_instance_of(BulkImports::Groups::Stage) do |instance|
allow(instance).to receive(:pipelines)
- .and_return([[0, file_extraction_pipeline]])
+ .and_return([{ stage: 0, pipeline: file_extraction_pipeline }])
end
end
it 'runs the pipeline successfully' do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
allow(status).to receive(:started?).and_return(false)
+ allow(status).to receive(:empty?).and_return(false)
allow(status).to receive(:failed?).and_return(false)
end
@@ -272,6 +316,28 @@ RSpec.describe BulkImports::PipelineWorker do
it 'reenqueues pipeline worker' do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
allow(status).to receive(:started?).and_return(true)
+ allow(status).to receive(:empty?).and_return(false)
+ allow(status).to receive(:failed?).and_return(false)
+ end
+
+ expect(described_class)
+ .to receive(:perform_in)
+ .with(
+ described_class::FILE_EXTRACTION_PIPELINE_PERFORM_DELAY,
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
+
+ subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+ end
+ end
+
+ context 'when export status is empty' do
+ it 'reenqueues pipeline worker' do
+ allow_next_instance_of(BulkImports::ExportStatus) do |status|
+ allow(status).to receive(:started?).and_return(false)
+ allow(status).to receive(:empty?).and_return(true)
allow(status).to receive(:failed?).and_return(false)
end