Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2023-04-25 15:18:56 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2023-04-25 15:18:56 +0300
commitd2d913b606702ecefa01f03362602fde256e3f75 (patch)
tree07643306ee63f789188a9133823aac3c92c94dfb /app/workers
parentaf69e63b6655a450849a8fa2640ae6ce5a8db681 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml18
-rw-r--r--app/workers/bulk_imports/finish_batched_relation_export_worker.rb62
-rw-r--r--app/workers/bulk_imports/relation_batch_export_worker.rb16
-rw-r--r--app/workers/bulk_imports/relation_export_worker.rb9
-rw-r--r--app/workers/pipeline_process_worker.rb12
5 files changed, 114 insertions, 3 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 582f5037e48..446cba267ad 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -2316,6 +2316,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: bulk_imports_finish_batched_relation_export
+ :worker_name: BulkImports::FinishBatchedRelationExportWorker
+ :feature_category: :importers
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: bulk_imports_pipeline
:worker_name: BulkImports::PipelineWorker
:feature_category: :importers
@@ -2325,6 +2334,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bulk_imports_relation_batch_export
+ :worker_name: BulkImports::RelationBatchExportWorker
+ :feature_category: :importers
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: bulk_imports_relation_export
:worker_name: BulkImports::RelationExportWorker
:feature_category: :importers
diff --git a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
new file mode 100644
index 00000000000..aa7bbffa732
--- /dev/null
+++ b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
@@ -0,0 +1,62 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class FinishBatchedRelationExportWorker
+ include ApplicationWorker
+
+ idempotent!
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ feature_category :importers
+
+ REENQUEUE_DELAY = 5.seconds
+ TIMEOUT = 6.hours
+
+ def perform(export_id)
+ @export = Export.find_by_id(export_id)
+
+ return unless export
+ return if export.finished? || export.failed?
+ return re_enqueue if export_in_progress?
+ return fail_export! if export_timeout?
+
+ finish_export!
+ end
+
+ private
+
+ attr_reader :export
+
+ def fail_export!
+ expire_cache!
+
+ export.batches.map(&:fail_op!)
+ export.fail_op!
+ end
+
+ def re_enqueue
+ self.class.perform_in(REENQUEUE_DELAY.ago, export.id)
+ end
+
+ def export_timeout?
+ export.updated_at < TIMEOUT.ago
+ end
+
+ def export_in_progress?
+ export.batches.any?(&:started?)
+ end
+
+ def finish_export!
+ expire_cache!
+
+ export.finish!
+ end
+
+ def expire_cache!
+ export.batches.each do |batch|
+ key = BulkImports::BatchedRelationExportService.cache_key(export.id, batch.id)
+
+ Gitlab::Cache::Import::Caching.expire(key, 0)
+ end
+ end
+ end
+end
diff --git a/app/workers/bulk_imports/relation_batch_export_worker.rb b/app/workers/bulk_imports/relation_batch_export_worker.rb
new file mode 100644
index 00000000000..4ce36929e15
--- /dev/null
+++ b/app/workers/bulk_imports/relation_batch_export_worker.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class RelationBatchExportWorker
+ include ApplicationWorker
+
+ idempotent!
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ feature_category :importers
+ sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
+
+ def perform(user_id, batch_id)
+ RelationBatchExportService.new(user_id, batch_id).execute
+ end
+ end
+end
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index 9d1ed30caf6..531edc6c7a7 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -13,11 +13,16 @@ module BulkImports
sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
worker_resource_boundary :memory
- def perform(user_id, portable_id, portable_class, relation)
+ def perform(user_id, portable_id, portable_class, relation, batched = false)
user = User.find(user_id)
portable = portable(portable_id, portable_class)
+ config = BulkImports::FileTransfer.config_for(portable)
- RelationExportService.new(user, portable, relation, jid).execute
+ if Gitlab::Utils.to_boolean(batched) && config.batchable_relation?(relation)
+ BatchedRelationExportService.new(user, portable, relation, jid).execute
+ else
+ RelationExportService.new(user, portable, relation, jid).execute
+ end
end
private
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index b4712aaeafb..caa2591f1ba 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -14,7 +14,7 @@ class PipelineProcessWorker
loggable_arguments 1
idempotent!
- deduplicate :until_executing
+ deduplicate :until_executing # Remove when FF `ci_pipeline_process_worker_dedup_until_executed` is removed
def perform(pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
@@ -23,4 +23,14 @@ class PipelineProcessWorker
.execute
end
end
+
+ # When FF `ci_pipeline_process_worker_dedup_until_executed` is removed, remove this method and
+ # add `deduplicate :until_executed, if_deduplicated: :reschedule_once`, ttl: 1.minute to the class
+ def self.perform_async(pipeline_id)
+ return super unless Feature.enabled?(:ci_pipeline_process_worker_dedup_until_executed)
+
+ set(
+ deduplicate: { strategy: :until_executed, options: { if_deduplicated: :reschedule_once, ttl: 1.minute } }
+ ).perform_async(pipeline_id)
+ end
end