Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: GitLab Bot <gitlab-bot@gitlab.com> 2023-12-19 14:01:45 +0300
committer: GitLab Bot <gitlab-bot@gitlab.com> 2023-12-19 14:01:45 +0300
commit: 9297025d0b7ddf095eb618dfaaab2ff8f2018d8b (patch)
tree: 865198c01d1824a9b098127baa3ab980c9cd2c06 /spec/workers/bulk_imports
parent: 6372471f43ee03c05a7c1f8b0c6ac6b8a7431dbe (diff)
Add latest changes from gitlab-org/gitlab@16-7-stable-ee (tag: v16.7.0-rc42)
Diffstat (limited to 'spec/workers/bulk_imports')
-rw-r--r--spec/workers/bulk_imports/entity_worker_spec.rb26
-rw-r--r--spec/workers/bulk_imports/export_request_worker_spec.rb13
-rw-r--r--spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb46
-rw-r--r--spec/workers/bulk_imports/pipeline_worker_spec.rb251
-rw-r--r--spec/workers/bulk_imports/stuck_import_worker_spec.rb59
-rw-r--r--spec/workers/bulk_imports/transform_references_worker_spec.rb257
6 files changed, 545 insertions, 107 deletions
diff --git a/spec/workers/bulk_imports/entity_worker_spec.rb b/spec/workers/bulk_imports/entity_worker_spec.rb
index 690555aa08f..325b31c85db 100644
--- a/spec/workers/bulk_imports/entity_worker_spec.rb
+++ b/spec/workers/bulk_imports/entity_worker_spec.rb
@@ -49,10 +49,6 @@ RSpec.describe BulkImports::EntityWorker, feature_category: :importers do
end
end
- it 'has the option to reschedule once if deduplicated' do
- expect(described_class.get_deduplication_options).to include({ if_deduplicated: :reschedule_once })
- end
-
context 'when pipeline workers from a stage are running' do
before do
pipeline_tracker.enqueue!
@@ -77,6 +73,8 @@ RSpec.describe BulkImports::EntityWorker, feature_category: :importers do
it 'enqueues the pipeline workers from the next stage and re-enqueues itself' do
expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:with_entity).with(entity).and_call_original
+
expect(logger).to receive(:info).with(hash_including('message' => 'Stage starting', 'entity_stage' => 1))
end
@@ -92,6 +90,26 @@ RSpec.describe BulkImports::EntityWorker, feature_category: :importers do
worker.perform(entity.id)
end
+
+ context 'when exclusive lease cannot be obtained' do
+ it 'does not start next stage and re-enqueue worker' do
+ expect_next_instance_of(Gitlab::ExclusiveLease) do |lease|
+ expect(lease).to receive(:try_obtain).and_return(false)
+ end
+
+ expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:info).with(
+ hash_including(
+ 'message' => 'Cannot obtain an exclusive lease. There must be another instance already in execution.'
+ )
+ )
+ end
+
+ expect(described_class).to receive(:perform_in)
+
+ worker.perform(entity.id)
+ end
+ end
end
context 'when there are no next stage to run' do
diff --git a/spec/workers/bulk_imports/export_request_worker_spec.rb b/spec/workers/bulk_imports/export_request_worker_spec.rb
index e9d0b6b24b2..2cc6348bb27 100644
--- a/spec/workers/bulk_imports/export_request_worker_spec.rb
+++ b/spec/workers/bulk_imports/export_request_worker_spec.rb
@@ -72,17 +72,14 @@ RSpec.describe BulkImports::ExportRequestWorker, feature_category: :importers do
entity.update!(source_xid: nil)
expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:with_entity).with(entity).and_call_original
+
expect(logger).to receive(:error).with(
a_hash_including(
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
'exception.backtrace' => anything,
'exception.class' => 'NoMethodError',
'exception.message' => /^undefined method `model_id' for nil:NilClass/,
- 'message' => 'Failed to fetch source entity id',
- 'source_version' => entity.bulk_import.source_version_info.to_s
+ 'message' => 'Failed to fetch source entity id'
)
).twice
end
@@ -148,7 +145,9 @@ RSpec.describe BulkImports::ExportRequestWorker, feature_category: :importers do
entity = create(:bulk_import_entity, bulk_import: bulk_import)
error = 'Exhausted error!'
- expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+ expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:with_entity).with(entity).and_call_original
+
expect(logger)
.to receive(:error)
.with(hash_including('message' => "Request to export #{entity.source_type} failed"))
diff --git a/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb b/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb
index 59ae4205c0f..2dd5b23b3d2 100644
--- a/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/finish_batched_pipeline_worker_spec.rb
@@ -33,6 +33,8 @@ RSpec.describe BulkImports::FinishBatchedPipelineWorker, feature_category: :impo
)
end
+ let!(:batch_1) { create(:bulk_import_batch_tracker, :finished, tracker: pipeline_tracker) }
+
subject(:worker) { described_class.new }
describe '#perform' do
@@ -45,27 +47,30 @@ RSpec.describe BulkImports::FinishBatchedPipelineWorker, feature_category: :impo
end
end
- context 'when import is in progress' do
- it 'marks the tracker as finished' do
- expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger).to receive(:info).with(
- a_hash_including('message' => 'Tracker finished')
- )
- end
+ it 'marks the tracker as finished' do
+ expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original
+ expect(logger).to receive(:with_entity).with(entity).and_call_original
- expect { subject.perform(pipeline_tracker.id) }
- .to change { pipeline_tracker.reload.finished? }
- .from(false).to(true)
+ expect(logger).to receive(:info).with(
+ a_hash_including('message' => 'Tracker finished')
+ )
end
- it "calls the pipeline's `#on_finish`" do
- expect_next_instance_of(pipeline_class) do |pipeline|
- expect(pipeline).to receive(:on_finish)
- end
+ expect { subject.perform(pipeline_tracker.id) }
+ .to change { pipeline_tracker.reload.finished? }
+ .from(false).to(true)
+ end
- subject.perform(pipeline_tracker.id)
+ it "calls the pipeline's `#on_finish`" do
+ expect_next_instance_of(pipeline_class) do |pipeline|
+ expect(pipeline).to receive(:on_finish)
end
+ subject.perform(pipeline_tracker.id)
+ end
+
+ context 'when import is in progress' do
it 're-enqueues for any started batches' do
create(:bulk_import_batch_tracker, :started, tracker: pipeline_tracker)
@@ -88,14 +93,17 @@ RSpec.describe BulkImports::FinishBatchedPipelineWorker, feature_category: :impo
end
context 'when pipeline tracker is stale' do
- let(:pipeline_tracker) { create(:bulk_import_tracker, :started, :batched, :stale, entity: entity) }
+ before do
+ batch_1.update!(updated_at: 5.hours.ago)
+ end
it 'fails pipeline tracker and its batches' do
- create(:bulk_import_batch_tracker, :finished, tracker: pipeline_tracker)
-
expect_next_instance_of(BulkImports::Logger) do |logger|
+ expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original
+ expect(logger).to receive(:with_entity).with(entity).and_call_original
+
expect(logger).to receive(:error).with(
- a_hash_including('message' => 'Tracker stale. Failing batches and tracker')
+ a_hash_including('message' => 'Batch stale. Failing batches and tracker')
)
end
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index d99b3e9de73..368c7537641 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -16,12 +16,16 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
def self.file_extraction_pipeline?
false
end
+
+ def self.abort_on_failure?
+ false
+ end
end
end
let_it_be(:bulk_import) { create(:bulk_import) }
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
- let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
+ let_it_be_with_reload(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:pipeline_tracker) do
create(
@@ -44,7 +48,7 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
- include_examples 'an idempotent worker' do
+ it_behaves_like 'an idempotent worker' do
let(:job_args) { [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
it 'runs the pipeline and sets tracker to finished' do
@@ -61,17 +65,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
it 'runs the given pipeline successfully' do
expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger)
- .to receive(:info)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path
- )
- )
+ expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original
+ expect(logger).to receive(:with_entity).with(pipeline_tracker.entity).and_call_original
+ expect(logger).to receive(:info)
end
allow(worker).to receive(:jid).and_return('jid')
@@ -98,22 +94,9 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger)
- .to receive(:error)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'class' => 'BulkImports::PipelineWorker',
- 'exception.message' => 'Error!',
- 'message' => 'Pipeline failed',
- 'source_version' => entity.bulk_import.source_version_info.to_s,
- 'importer' => 'gitlab_migration'
- )
- )
+ expect(logger).to receive(:with_tracker).with(pipeline_tracker).and_call_original
+ expect(logger).to receive(:with_entity).with(pipeline_tracker.entity).and_call_original
+ expect(logger).to receive(:error)
end
expect(Gitlab::ErrorTracking)
@@ -121,13 +104,13 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
.with(
instance_of(StandardError),
hash_including(
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import.id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'pipeline_class' => pipeline_tracker.pipeline_name,
- 'importer' => 'gitlab_migration',
- 'source_version' => entity.bulk_import.source_version_info.to_s
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import.id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ pipeline_class: pipeline_tracker.pipeline_name,
+ importer: 'gitlab_migration',
+ source_version: entity.bulk_import.source_version_info.to_s
)
)
@@ -156,6 +139,21 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.status_name).to eq(:failed)
expect(pipeline_tracker.jid).to eq('jid')
+ expect(entity.reload.status_name).to eq(:created)
+ end
+
+ context 'when pipeline has abort_on_failure' do
+ before do
+ allow(pipeline_class).to receive(:abort_on_failure?).and_return(true)
+ end
+
+ it 'marks entity as failed' do
+ job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
+
+ described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new('Error!'))
+
+ expect(entity.reload.status_name).to eq(:failed)
+ end
end
end
@@ -266,6 +264,10 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
describe '#perform' do
context 'when entity is failed' do
+ before do
+ entity.update!(status: -1)
+ end
+
it 'marks tracker as skipped and logs the skip' do
pipeline_tracker = create(
:bulk_import_tracker,
@@ -274,23 +276,12 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
status_event: 'enqueue'
)
- entity.update!(status: -1)
-
expect_next_instance_of(BulkImports::Logger) do |logger|
allow(logger).to receive(:info)
expect(logger)
.to receive(:info)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path,
- 'message' => 'Skipping pipeline due to failed entity'
- )
- )
+ .with(hash_including(message: 'Skipping pipeline due to failed entity'))
end
worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
@@ -323,23 +314,15 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
- it 'reenqueues the worker' do
+ it 're_enqueues the worker' do
expect_any_instance_of(BulkImports::Tracker) do |tracker|
expect(tracker).to receive(:retry).and_call_original
end
expect_next_instance_of(BulkImports::Logger) do |logger|
- expect(logger)
- .to receive(:info)
- .with(
- hash_including(
- 'pipeline_class' => 'FakePipeline',
- 'bulk_import_entity_id' => entity.id,
- 'bulk_import_id' => entity.bulk_import_id,
- 'bulk_import_entity_type' => entity.source_type,
- 'source_full_path' => entity.source_full_path
- )
- )
+ expect(logger).to receive(:with_tracker).and_call_original
+ expect(logger).to receive(:with_entity).and_call_original
+ expect(logger).to receive(:info)
end
expect(described_class)
@@ -495,8 +478,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
end
end
- context 'when export is batched' do
- let(:batches_count) { 2 }
+ context 'when export is batched', :aggregate_failures do
+ let(:batches_count) { 3 }
before do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
@@ -506,10 +489,30 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
allow(status).to receive(:empty?).and_return(false)
allow(status).to receive(:failed?).and_return(false)
end
+ allow(worker).to receive(:log_extra_metadata_on_done).and_call_original
end
it 'enqueues pipeline batches' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:batched, true)
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+
+ it 'enqueues only missing pipelines batches' do
+ create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
@@ -517,7 +520,8 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.status_name).to eq(:started)
expect(pipeline_tracker.batched).to eq(true)
- expect(pipeline_tracker.batches.count).to eq(batches_count)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
end
context 'when batches count is less than 1' do
@@ -531,6 +535,127 @@ RSpec.describe BulkImports::PipelineWorker, feature_category: :importers do
expect(pipeline_tracker.reload.status_name).to eq(:finished)
end
end
+
+ context 'when pipeline batch enqueuing should be limited' do
+ using RSpec::Parameterized::TableSyntax
+
+ before do
+ allow(::Gitlab::CurrentSettings).to receive(:bulk_import_concurrent_pipeline_batch_limit).and_return(2)
+ end
+
+ it 'only enqueues limited batches and reenqueues itself' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+ expect(described_class.jobs).to contain_exactly(
+ hash_including(
+ 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+ 'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+ )
+ )
+ end
+
+ context 'when there is a batch in progress' do
+ where(:status) { BulkImports::BatchTracker::IN_PROGRESS_STATES }
+
+ with_them do
+ before do
+ create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+ end
+
+ it 'counts the in progress batch against the limit' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).once
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+ expect(described_class.jobs).to contain_exactly(
+ hash_including(
+ 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+ 'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+ )
+ )
+ end
+ end
+ end
+
+ context 'when there is a batch that has finished' do
+ where(:status) do
+ all_statuses = BulkImports::BatchTracker.state_machines[:status].states.map(&:name)
+ all_statuses - BulkImports::BatchTracker::IN_PROGRESS_STATES
+ end
+
+ with_them do
+ before do
+ create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+ end
+
+ it 'does not count the finished batch against the limit' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+ end
+ end
+
+ context 'when the feature flag is disabled' do
+ before do
+ stub_feature_flags(bulk_import_limit_concurrent_batches: false)
+ end
+
+ it 'does not limit batches' do
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+
+ it 'still enqueues only missing pipelines batches' do
+ create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
+ expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+ expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+ worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+ pipeline_tracker.reload
+
+ expect(pipeline_tracker.status_name).to eq(:started)
+ expect(pipeline_tracker.batched).to eq(true)
+ expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+ expect(described_class.jobs).to be_empty
+ end
+ end
+ end
end
end
end
diff --git a/spec/workers/bulk_imports/stuck_import_worker_spec.rb b/spec/workers/bulk_imports/stuck_import_worker_spec.rb
index eadf3864190..09fd1e1b524 100644
--- a/spec/workers/bulk_imports/stuck_import_worker_spec.rb
+++ b/spec/workers/bulk_imports/stuck_import_worker_spec.rb
@@ -5,10 +5,21 @@ require 'spec_helper'
RSpec.describe BulkImports::StuckImportWorker, feature_category: :importers do
let_it_be(:created_bulk_import) { create(:bulk_import, :created) }
let_it_be(:started_bulk_import) { create(:bulk_import, :started) }
- let_it_be(:stale_created_bulk_import) { create(:bulk_import, :created, created_at: 3.days.ago) }
- let_it_be(:stale_started_bulk_import) { create(:bulk_import, :started, created_at: 3.days.ago) }
- let_it_be(:stale_created_bulk_import_entity) { create(:bulk_import_entity, :created, created_at: 3.days.ago) }
- let_it_be(:stale_started_bulk_import_entity) { create(:bulk_import_entity, :started, created_at: 3.days.ago) }
+ let_it_be(:stale_created_bulk_import) do
+ create(:bulk_import, :created, updated_at: 3.days.ago)
+ end
+
+ let_it_be(:stale_started_bulk_import) do
+ create(:bulk_import, :started, updated_at: 3.days.ago)
+ end
+
+ let_it_be(:stale_created_bulk_import_entity) do
+ create(:bulk_import_entity, :created, updated_at: 3.days.ago)
+ end
+
+ let_it_be(:stale_started_bulk_import_entity) do
+ create(:bulk_import_entity, :started, updated_at: 3.days.ago)
+ end
let_it_be(:started_bulk_import_tracker) do
create(:bulk_import_tracker, :started, entity: stale_started_bulk_import_entity)
@@ -37,16 +48,12 @@ RSpec.describe BulkImports::StuckImportWorker, feature_category: :importers do
it 'updates the status of bulk import entities to timeout' do
expect_next_instance_of(BulkImports::Logger) do |logger|
allow(logger).to receive(:error)
- expect(logger).to receive(:error).with(
- message: 'BulkImports::Entity stale',
- bulk_import_entity_id: stale_created_bulk_import_entity.id,
- bulk_import_id: stale_created_bulk_import_entity.bulk_import_id
- )
- expect(logger).to receive(:error).with(
- message: 'BulkImports::Entity stale',
- bulk_import_entity_id: stale_started_bulk_import_entity.id,
- bulk_import_id: stale_started_bulk_import_entity.bulk_import_id
- )
+
+ expect(logger).to receive(:with_entity).with(stale_created_bulk_import_entity).and_call_original
+ expect(logger).to receive(:error).with(message: 'BulkImports::Entity stale')
+
+ expect(logger).to receive(:with_entity).with(stale_started_bulk_import_entity).and_call_original
+ expect(logger).to receive(:error).with(message: 'BulkImports::Entity stale')
end
expect { subject }.to change { stale_created_bulk_import_entity.reload.status_name }.from(:created).to(:timeout)
@@ -61,5 +68,29 @@ RSpec.describe BulkImports::StuckImportWorker, feature_category: :importers do
expect { subject }.to not_change { created_bulk_import.reload.status }
.and not_change { started_bulk_import.reload.status }
end
+
+ context 'when bulk import has been updated recently', :clean_gitlab_redis_shared_state do
+ before do
+ stale_created_bulk_import.update!(updated_at: 2.minutes.ago)
+ stale_started_bulk_import.update!(updated_at: 2.minutes.ago)
+ end
+
+ it 'does not update the status of the import' do
+ expect { subject }.to not_change { stale_created_bulk_import.reload.status_name }
+ .and not_change { stale_started_bulk_import.reload.status_name }
+ end
+ end
+
+ context 'when bulk import entity has been updated recently', :clean_gitlab_redis_shared_state do
+ before do
+ stale_created_bulk_import_entity.update!(updated_at: 2.minutes.ago)
+ stale_started_bulk_import_entity.update!(updated_at: 2.minutes.ago)
+ end
+
+ it 'does not update the status of the entity' do
+ expect { subject }.to not_change { stale_created_bulk_import_entity.reload.status_name }
+ .and not_change { stale_started_bulk_import_entity.reload.status_name }
+ end
+ end
end
end
diff --git a/spec/workers/bulk_imports/transform_references_worker_spec.rb b/spec/workers/bulk_imports/transform_references_worker_spec.rb
new file mode 100644
index 00000000000..6295ecb47d6
--- /dev/null
+++ b/spec/workers/bulk_imports/transform_references_worker_spec.rb
@@ -0,0 +1,257 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe BulkImports::TransformReferencesWorker, feature_category: :importers do
+ let_it_be(:project) do
+ project = create(:project)
+ project.add_owner(user)
+ project
+ end
+
+ let_it_be(:user) { create(:user) }
+ let_it_be(:bulk_import) { create(:bulk_import) }
+
+ let_it_be(:entity) do
+ create(:bulk_import_entity, :project_entity, project: project, bulk_import: bulk_import,
+ source_full_path: 'source/full/path')
+ end
+
+ let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
+ let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import, url: 'https://my.gitlab.com') }
+
+ let_it_be_with_refind(:issue) do
+ create(:issue,
+ project: project,
+ description: 'https://my.gitlab.com/source/full/path/-/issues/1')
+ end
+
+ let_it_be(:merge_request) do
+ create(:merge_request,
+ source_project: project,
+ description: 'https://my.gitlab.com/source/full/path/-/merge_requests/1 @source_username? @bob, @alice!')
+ end
+
+ let_it_be(:issue_note) do
+ create(:note,
+ noteable: issue,
+ project: project,
+ note: 'https://my.gitlab.com/source/full/path/-/issues/1 @older_username, not_a@username, and @old_username.')
+ end
+
+ let_it_be(:merge_request_note) do
+ create(:note,
+ noteable: merge_request,
+ project: project,
+ note: 'https://my.gitlab.com/source/full/path/-/merge_requests/1 @same_username')
+ end
+
+ let_it_be(:system_note) do
+ create(:note,
+ project: project,
+ system: true,
+ noteable: issue,
+ note: "mentioned in merge request !#{merge_request.iid} created by @old_username",
+ note_html: 'note html'
+ )
+ end
+
+ let(:expected_url) do
+ expected_url = URI('')
+ expected_url.scheme = ::Gitlab.config.gitlab.https ? 'https' : 'http'
+ expected_url.host = ::Gitlab.config.gitlab.host
+ expected_url.port = ::Gitlab.config.gitlab.port
+ expected_url.path = "/#{project.full_path}"
+ expected_url
+ end
+
+ subject { described_class.new.perform([object.id], object.class.to_s, tracker.id) }
+
+ before do
+ allow(Gitlab::Cache::Import::Caching)
+ .to receive(:values_from_hash)
+ .and_return({
+ 'old_username' => 'new_username',
+ 'older_username' => 'newer_username',
+ 'source_username' => 'destination_username',
+ 'bob' => 'alice-gdk',
+ 'alice' => 'bob-gdk',
+ 'manuelgrabowski' => 'manuelgrabowski-admin',
+ 'manuelgrabowski-admin' => 'manuelgrabowski',
+ 'boaty-mc-boatface' => 'boatymcboatface',
+ 'boatymcboatface' => 'boaty-mc-boatface'
+ })
+ end
+
+ it_behaves_like 'an idempotent worker' do
+ let(:job_args) { [[issue.id], 'Issue', tracker.id] }
+ end
+
+ it 'transforms and saves multiple objects' do
+ old_note = merge_request_note.note
+ merge_request_note_2 = create(:note, noteable: merge_request, project: project, note: old_note)
+
+ described_class.new.perform([merge_request_note.id, merge_request_note_2.id], 'Note', tracker.id)
+
+ expect(merge_request_note.reload.note).not_to eq(old_note)
+ expect(merge_request_note_2.reload.note).not_to eq(old_note)
+ end
+
+ shared_examples 'transforms and saves references' do
+ it 'transforms references and saves the object' do
+ expect_any_instance_of(object.class) do |object|
+ expect(object).to receive(:save!)
+ end
+
+ expect { subject }.not_to change { object.updated_at }
+
+ expect(body).to eq(expected_body)
+ end
+
+ context 'when an error is raised' do
+ before do
+ allow(BulkImports::UsersMapper).to receive(:new).and_raise(StandardError)
+ end
+
+ it 'tracks the error and creates an import failure' do
+ expect(Gitlab::ErrorTracking).to receive(:track_exception)
+ .with(anything, hash_including(bulk_import_id: bulk_import.id))
+
+ expect(BulkImports::Failure).to receive(:create)
+ .with(hash_including(bulk_import_entity_id: entity.id, pipeline_class: 'ReferencesPipeline'))
+
+ subject
+ end
+ end
+ end
+
+ context 'for issue description' do
+ let(:object) { issue }
+ let(:body) { object.reload.description }
+ let(:expected_body) { "http://localhost:80/#{object.namespace.full_path}/-/issues/1" }
+
+ include_examples 'transforms and saves references'
+
+ shared_examples 'returns object unchanged' do
+ it 'returns object unchanged' do
+ issue.update!(description: description)
+
+ subject
+
+ expect(issue.reload.description).to eq(description)
+ end
+
+ it 'does not save the object' do
+ expect_any_instance_of(object.class) do |object|
+ expect(object).to receive(:save!)
+ end
+
+ subject
+ end
+ end
+
+ context 'when object does not have reference or username' do
+ let(:description) { 'foo' }
+
+ include_examples 'returns object unchanged'
+ end
+
+ context 'when there are no matched urls or usernames' do
+ let(:description) { 'https://my.gitlab.com/another/project/path/-/issues/1 @random_username' }
+
+ include_examples 'returns object unchanged'
+ end
+
+ context 'when url path does not start with source full path' do
+ let(:description) { 'https://my.gitlab.com/another/source/full/path/-/issues/1' }
+
+ include_examples 'returns object unchanged'
+ end
+
+ context 'when host does not match and url path starts with source full path' do
+ let(:description) { 'https://another.gitlab.com/source/full/path/-/issues/1' }
+
+ include_examples 'returns object unchanged'
+ end
+
+ context 'when url does not match at all' do
+ let(:description) { 'https://website.example/foo/bar' }
+
+ include_examples 'returns object unchanged'
+ end
+ end
+
+ context 'for merge request description' do
+ let(:object) { merge_request }
+ let(:body) { object.reload.description }
+ let(:expected_body) do
+ "#{expected_url}/-/merge_requests/#{merge_request.iid} @destination_username? @alice-gdk, @bob-gdk!"
+ end
+
+ include_examples 'transforms and saves references'
+ end
+
+ context 'for issue notes' do
+ let(:object) { issue_note }
+ let(:body) { object.reload.note }
+ let(:expected_body) { "#{expected_url}/-/issues/#{issue.iid} @newer_username, not_a@username, and @new_username." }
+
+ include_examples 'transforms and saves references'
+ end
+
+ context 'for merge request notes' do
+ let(:object) { merge_request_note }
+ let(:body) { object.reload.note }
+ let(:expected_body) { "#{expected_url}/-/merge_requests/#{merge_request.iid} @same_username" }
+
+ include_examples 'transforms and saves references'
+ end
+
+ context 'for system notes' do
+ let(:object) { system_note }
+ let(:body) { object.reload.note }
+ let(:expected_body) { "mentioned in merge request !#{merge_request.iid} created by @new_username" }
+
+ include_examples 'transforms and saves references'
+
+ context 'when the note includes a username' do
+ let_it_be(:object) do
+ create(:note,
+ project: project,
+ system: true,
+ noteable: issue,
+ note: 'mentioned in merge request created by @source_username.',
+ note_html: 'empty'
+ )
+ end
+
+ let(:body) { object.reload.note }
+ let(:expected_body) { 'mentioned in merge request created by @destination_username.' }
+
+ include_examples 'transforms and saves references'
+ end
+ end
+
+ context 'when old and new usernames are interchanged' do
+ # e.g
+ # |------------------------|-------------------------|
+ # | old_username | new_username |
+ # |------------------------|-------------------------|
+ # | @manuelgrabowski-admin | @manuelgrabowski |
+ # | @manuelgrabowski | @manuelgrabowski-admin |
+ # |------------------------|-------------------------|
+
+ let_it_be(:object) do
+ create(:note,
+ project: project,
+ noteable: merge_request,
+ note: '@manuelgrabowski-admin, @boaty-mc-boatface'
+ )
+ end
+
+ let(:body) { object.reload.note }
+ let(:expected_body) { '@manuelgrabowski, @boatymcboatface' }
+
+ include_examples 'transforms and saves references'
+ end
+end