Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb23
1 files changed, 19 insertions, 4 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index f03e0bc0656..e0db18cb987 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -31,7 +31,6 @@ module BulkImports
fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped?
end
end
-
ensure
::BulkImports::EntityWorker.perform_async(entity_id, stage)
end
@@ -49,9 +48,17 @@ module BulkImports
return re_enqueue if export_empty? || export_started?
- pipeline_tracker.update!(status_event: 'start', jid: jid)
- pipeline_tracker.pipeline_class.new(context).run
- pipeline_tracker.finish!
+ if file_extraction_pipeline? && export_status.batched?
+ pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true)
+
+ return pipeline_tracker.finish! if export_status.batches_count < 1
+
+ enqueue_batches
+ else
+ pipeline_tracker.update!(status_event: 'start', jid: jid)
+ pipeline_tracker.pipeline_class.new(context).run
+ pipeline_tracker.finish!
+ end
rescue BulkImports::RetryPipelineError => e
retry_tracker(e)
rescue StandardError => e
@@ -179,5 +186,13 @@ module BulkImports
time_since_tracker_created > Pipeline::NDJSON_EXPORT_TIMEOUT
end
+
+ def enqueue_batches
+ 1.upto(export_status.batches_count) do |batch_number|
+ batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+
+ ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+ end
+ end
end
end