diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2023-12-14 06:10:57 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2023-12-14 06:10:57 +0300 |
commit | 3e4c792b2a44dd6f84398e7b256cec4aa05ca3eb (patch) | |
tree | 4b1d97d6ab1217dade377e5e76e49a66cb24735d /app/workers | |
parent | c4fad7502e407e6d744206fe1b235bc2e3a44d98 (diff) |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/bulk_imports/finish_batched_pipeline_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 65 |
2 files changed, 54 insertions, 13 deletions
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb index 60676f4bd15..2670dc5438d 100644 --- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb +++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb @@ -42,7 +42,7 @@ module BulkImports end def import_in_progress? - sorted_batches.any? { |b| b.started? || b.created? } + sorted_batches.in_progress.any? end def most_recent_batch_stale? diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 184b321ca7a..0bb9464c6de 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -4,9 +4,12 @@ module BulkImports class PipelineWorker include ApplicationWorker include ExclusiveLeaseGuard + include Gitlab::Utils::StrongMemoize FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds + LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true).freeze + DEFER_ON_HEALTH_DELAY = 5.minutes data_consistency :always @@ -52,7 +55,6 @@ module BulkImports try_obtain_lease do if pipeline_tracker.enqueued? || pipeline_tracker.started? logger.info(log_attributes(message: 'Pipeline starting')) - run end end @@ -84,7 +86,8 @@ module BulkImports return pipeline_tracker.finish! if export_status.batches_count < 1 - enqueue_batches + enqueue_limited_batches + re_enqueue unless all_batches_enqueued? else log_extra_metadata_on_done(:batched, false) @@ -194,22 +197,60 @@ module BulkImports Time.zone.now - (pipeline_tracker.created_at || entity.created_at) end - def lease_timeout - 30 + def enqueue_limited_batches + next_batch.numbers.each do |batch_number| + batch = pipeline_tracker.batches.create!(batch_number: batch_number) + + with_context(bulk_import_entity_id: entity.id) do + ::BulkImports::PipelineBatchWorker.perform_async(batch.id) + end + end + + log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, next_batch.numbers) + log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, next_batch.final?) end - def lease_key - "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}" + def all_batches_enqueued? + next_batch.final? end - def enqueue_batches - 1.upto(export_status.batches_count) do |batch_number| - batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord + def next_batch + all_batch_numbers = (1..export_status.batches_count).to_a - with_context(bulk_import_entity_id: entity.id) do - ::BulkImports::PipelineBatchWorker.perform_async(batch.id) - end + created_batch_numbers = pipeline_tracker.batches.pluck_batch_numbers + + remaining_batch_numbers = all_batch_numbers - created_batch_numbers + + if Feature.disabled?(:bulk_import_limit_concurrent_batches, context.portable) + return LimitedBatches.new(numbers: remaining_batch_numbers, final?: true) end + + limit = next_batch_count + + LimitedBatches.new( + numbers: remaining_batch_numbers.first(limit), + final?: remaining_batch_numbers.count <= limit + ) + end + strong_memoize_attr :next_batch + + # Calculate the number of batches, up to `batch_limit`, to process in the + # next round. + def next_batch_count + limit = batch_limit - pipeline_tracker.batches.in_progress.limit(batch_limit).count + [limit, 0].max + end + + def batch_limit + ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit + end + + def lease_timeout + 30 + end + + def lease_key + "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}" end end end |