Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2023-12-14 06:10:57 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2023-12-14 06:10:57 +0300
commit3e4c792b2a44dd6f84398e7b256cec4aa05ca3eb (patch)
tree4b1d97d6ab1217dade377e5e76e49a66cb24735d /app/workers
parentc4fad7502e407e6d744206fe1b235bc2e3a44d98 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/bulk_imports/finish_batched_pipeline_worker.rb2
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb65
2 files changed, 54 insertions, 13 deletions
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 60676f4bd15..2670dc5438d 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -42,7 +42,7 @@ module BulkImports
end
def import_in_progress?
- sorted_batches.any? { |b| b.started? || b.created? }
+ sorted_batches.in_progress.any?
end
def most_recent_batch_stale?
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 184b321ca7a..0bb9464c6de 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,9 +4,12 @@ module BulkImports
class PipelineWorker
include ApplicationWorker
include ExclusiveLeaseGuard
+ include Gitlab::Utils::StrongMemoize
FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
+ LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true).freeze
+
DEFER_ON_HEALTH_DELAY = 5.minutes
data_consistency :always
@@ -52,7 +55,6 @@ module BulkImports
try_obtain_lease do
if pipeline_tracker.enqueued? || pipeline_tracker.started?
logger.info(log_attributes(message: 'Pipeline starting'))
-
run
end
end
@@ -84,7 +86,8 @@ module BulkImports
return pipeline_tracker.finish! if export_status.batches_count < 1
- enqueue_batches
+ enqueue_limited_batches
+ re_enqueue unless all_batches_enqueued?
else
log_extra_metadata_on_done(:batched, false)
@@ -194,22 +197,60 @@ module BulkImports
Time.zone.now - (pipeline_tracker.created_at || entity.created_at)
end
- def lease_timeout
- 30
+ def enqueue_limited_batches
+ next_batch.numbers.each do |batch_number|
+ batch = pipeline_tracker.batches.create!(batch_number: batch_number)
+
+ with_context(bulk_import_entity_id: entity.id) do
+ ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+ end
+ end
+
+ log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, next_batch.numbers)
+ log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, next_batch.final?)
end
- def lease_key
- "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
+ def all_batches_enqueued?
+ next_batch.final?
end
- def enqueue_batches
- 1.upto(export_status.batches_count) do |batch_number|
- batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+ def next_batch
+ all_batch_numbers = (1..export_status.batches_count).to_a
- with_context(bulk_import_entity_id: entity.id) do
- ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
- end
+ created_batch_numbers = pipeline_tracker.batches.pluck_batch_numbers
+
+ remaining_batch_numbers = all_batch_numbers - created_batch_numbers
+
+ if Feature.disabled?(:bulk_import_limit_concurrent_batches, context.portable)
+ return LimitedBatches.new(numbers: remaining_batch_numbers, final?: true)
end
+
+ limit = next_batch_count
+
+ LimitedBatches.new(
+ numbers: remaining_batch_numbers.first(limit),
+ final?: remaining_batch_numbers.count <= limit
+ )
+ end
+ strong_memoize_attr :next_batch
+
+ # Calculate the number of batches, up to `batch_limit`, to process in the
+ # next round.
+ def next_batch_count
+ limit = batch_limit - pipeline_tracker.batches.in_progress.limit(batch_limit).count
+ [limit, 0].max
+ end
+
+ def batch_limit
+ ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit
+ end
+
+ def lease_timeout
+ 30
+ end
+
+ def lease_key
+ "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
end
end
end