diff options
Diffstat (limited to 'spec/workers/bulk_imports/entity_worker_spec.rb')
-rw-r--r-- | spec/workers/bulk_imports/entity_worker_spec.rb | 226 |
1 files changed, 91 insertions, 135 deletions
diff --git a/spec/workers/bulk_imports/entity_worker_spec.rb b/spec/workers/bulk_imports/entity_worker_spec.rb index 8238721df01..5f948906c08 100644 --- a/spec/workers/bulk_imports/entity_worker_spec.rb +++ b/spec/workers/bulk_imports/entity_worker_spec.rb @@ -3,9 +3,11 @@ require 'spec_helper' RSpec.describe BulkImports::EntityWorker, feature_category: :importers do - let_it_be(:entity) { create(:bulk_import_entity) } + subject(:worker) { described_class.new } - let_it_be(:pipeline_tracker) do + let_it_be(:entity) { create(:bulk_import_entity, :started) } + + let_it_be_with_reload(:pipeline_tracker) do create( :bulk_import_tracker, entity: entity, @@ -14,173 +16,127 @@ RSpec.describe BulkImports::EntityWorker, feature_category: :importers do ) end - let(:job_args) { entity.id } + let_it_be_with_reload(:pipeline_tracker_2) do + create( + :bulk_import_tracker, + entity: entity, + pipeline_name: 'Stage1::Pipeline', + stage: 1 + ) + end + + include_examples 'an idempotent worker' do + let(:job_args) { entity.id } - it 'updates pipeline trackers to enqueued state when selected' do - worker = described_class.new + before do + allow(described_class).to receive(:perform_in) + allow(BulkImports::PipelineWorker).to receive(:perform_async) + end - next_tracker = worker.send(:next_pipeline_trackers_for, entity.id).first + it 'enqueues the pipeline workers of the first stage and then re-enqueues itself' do + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger).to receive(:info).with(hash_including('message' => 'Stage starting', 'entity_stage' => 0)) + expect(logger).to receive(:info).with(hash_including('message' => 'Stage running', 'entity_stage' => 0)) + end - next_tracker.reload + expect(BulkImports::PipelineWorker) + .to receive(:perform_async) + .with(pipeline_tracker.id, pipeline_tracker.stage, entity.id) - expect(next_tracker.enqueued?).to be_truthy + expect(described_class).to receive(:perform_in).twice.with(described_class::PERFORM_DELAY, entity.id) - expect(worker.send(:next_pipeline_trackers_for, entity.id)) - .not_to include(next_tracker) + expect { subject }.to change { pipeline_tracker.reload.status_name }.from(:created).to(:enqueued) + end end - include_examples 'an idempotent worker' do - it 'enqueues the first stage pipelines work' do + context 'when pipeline workers from a stage are running' do + before do + pipeline_tracker.enqueue! + end + + it 'does not enqueue the pipeline workers from the next stage and re-enqueues itself' do expect_next_instance_of(Gitlab::Import::Logger) do |logger| - # the worker runs twice but only executes once - expect(logger) - .to receive(:info).twice - .with( - hash_including( - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'current_stage' => nil, - 'message' => 'Stage starting', - 'source_version' => entity.bulk_import.source_version_info.to_s, - 'importer' => 'gitlab_migration' - ) - ) + expect(logger).to receive(:info).with(hash_including('message' => 'Stage running', 'entity_stage' => 0)) end - expect(BulkImports::PipelineWorker) - .to receive(:perform_async) - .with( - pipeline_tracker.id, - pipeline_tracker.stage, - entity.id - ) + expect(BulkImports::PipelineWorker).not_to receive(:perform_async) + expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, entity.id) - subject + worker.perform(entity.id) end + end - it 'logs and tracks the raised exceptions' do - exception = StandardError.new('Error!') - - expect(BulkImports::PipelineWorker) - .to receive(:perform_async) - .and_raise(exception) + context 'when there are no pipeline workers from the previous stage running' do + before do + pipeline_tracker.fail_op! + end + it 'enqueues the pipeline workers from the next stage and re-enqueues itself' do expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger) - .to receive(:info).twice - .with( - hash_including( - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'current_stage' => nil, - 'source_version' => entity.bulk_import.source_version_info.to_s, - 'importer' => 'gitlab_migration' - ) - ) + expect(logger).to receive(:info).with(hash_including('message' => 'Stage starting', 'entity_stage' => 1)) + end - expect(logger) - .to receive(:error) + expect(BulkImports::PipelineWorker) + .to receive(:perform_async) .with( - hash_including( - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'current_stage' => nil, - 'message' => 'Entity failed', - 'exception.backtrace' => anything, - 'exception.class' => 'StandardError', - 'exception.message' => 'Error!', - 'importer' => 'gitlab_migration', - 'source_version' => entity.bulk_import.source_version_info.to_s - ) + pipeline_tracker_2.id, + pipeline_tracker_2.stage, + entity.id ) - end - expect(Gitlab::ErrorTracking) - .to receive(:track_exception) - .with( - exception, - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - source_version: entity.bulk_import.source_version_info.to_s, - importer: 'gitlab_migration' - ) + expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, entity.id) - subject + worker.perform(entity.id) + end + end - expect(entity.reload.failed?).to eq(true) + context 'when there are no next stage to run' do + before do + pipeline_tracker.fail_op! + pipeline_tracker_2.fail_op! end - context 'in first stage' do - let(:job_args) { [entity.id, 0] } + it 'does not enqueue any pipeline worker and re-enqueues itself' do + expect(BulkImports::PipelineWorker).not_to receive(:perform_async) + expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, entity.id) - it 'do not enqueue a new pipeline job if the current stage still running' do - expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger) - .to receive(:info).twice - .with( - hash_including( - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'current_stage' => 0, - 'message' => 'Stage running', - 'source_version' => entity.bulk_import.source_version_info.to_s, - 'importer' => 'gitlab_migration' - ) - ) - end + worker.perform(entity.id) + end + end - expect(BulkImports::PipelineWorker) - .not_to receive(:perform_async) + context 'when entity status is not started' do + let(:entity) { create(:bulk_import_entity, :finished) } - subject - end + it 'does not re-enqueues itself' do + expect(described_class).not_to receive(:perform_in) + + worker.perform(entity.id) + end + end - it 'enqueues the next stage pipelines when the current stage is finished' do - next_stage_pipeline_tracker = create( - :bulk_import_tracker, - entity: entity, - pipeline_name: 'Stage1::Pipeline', - stage: 1 - ) + it 'logs and tracks the raised exceptions' do + exception = StandardError.new('Error!') - pipeline_tracker.fail_op! + expect(BulkImports::PipelineWorker) + .to receive(:perform_async) + .and_raise(exception) - expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger) - .to receive(:info).twice + expect(Gitlab::ErrorTracking) + .to receive(:track_exception) .with( + exception, hash_including( - 'bulk_import_entity_id' => entity.id, - 'bulk_import_id' => entity.bulk_import_id, - 'bulk_import_entity_type' => entity.source_type, - 'source_full_path' => entity.source_full_path, - 'current_stage' => 0, - 'source_version' => entity.bulk_import.source_version_info.to_s, - 'importer' => 'gitlab_migration' + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, + source_version: entity.bulk_import.source_version_info.to_s, + importer: 'gitlab_migration' ) ) - end - expect(BulkImports::PipelineWorker) - .to receive(:perform_async) - .with( - next_stage_pipeline_tracker.id, - next_stage_pipeline_tracker.stage, - entity.id - ) + worker.perform(entity.id) - subject - end - end + expect(entity.reload.failed?).to eq(true) end end |