Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports/entity_worker.rb')
-rw-r--r--app/workers/bulk_imports/entity_worker.rb114
1 files changed, 51 insertions, 63 deletions
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index fb99d63d06e..9b60dcdeb8a 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -1,97 +1,68 @@
# frozen_string_literal: true
module BulkImports
- class EntityWorker # rubocop:disable Scalability/IdempotentWorker
+ class EntityWorker
include ApplicationWorker
idempotent!
- deduplicate :until_executing
+ deduplicate :until_executed
data_consistency :always
feature_category :importers
sidekiq_options retry: false, dead: false
worker_has_external_dependencies!
- def perform(entity_id, current_stage = nil)
+ PERFORM_DELAY = 5.seconds
+
+ # Keep `_current_stage` parameter for backwards compatibility.
+ # The parameter will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426311
+ def perform(entity_id, _current_stage = nil)
@entity = ::BulkImports::Entity.find(entity_id)
- if stage_running?(entity_id, current_stage)
- logger.info(
- structured_payload(
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- current_stage: current_stage,
- message: 'Stage running',
- source_version: source_version,
- importer: 'gitlab_migration'
- )
- )
+ return unless @entity.started?
- return
+ if running_tracker.present?
+ log_info(message: 'Stage running', entity_stage: running_tracker.stage)
+ else
+ start_next_stage
end
- logger.info(
- structured_payload(
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- current_stage: current_stage,
- message: 'Stage starting',
- source_version: source_version,
- importer: 'gitlab_migration'
- )
- )
-
- next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
- BulkImports::PipelineWorker.perform_async(
- pipeline_tracker.id,
- pipeline_tracker.stage,
- entity_id
- )
- end
+ re_enqueue
rescue StandardError => e
- log_exception(e,
- {
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- current_stage: current_stage,
- message: 'Entity failed',
- source_version: source_version,
- importer: 'gitlab_migration'
- }
- )
-
- Gitlab::ErrorTracking.track_exception(
- e,
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- source_version: source_version,
- importer: 'gitlab_migration'
- )
+ Gitlab::ErrorTracking.track_exception(e, log_params(message: 'Entity failed'))
- entity.fail_op!
+ @entity.fail_op!
end
private
attr_reader :entity
- def stage_running?(entity_id, stage)
- return unless stage
+ def re_enqueue
+ BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
+ end
- BulkImports::Tracker.stage_running?(entity_id, stage)
+ def running_tracker
+ @running_tracker ||= BulkImports::Tracker.running_trackers(entity.id).first
end
def next_pipeline_trackers_for(entity_id)
BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end
+ def start_next_stage
+ next_pipeline_trackers = next_pipeline_trackers_for(entity.id)
+
+ next_pipeline_trackers.each_with_index do |pipeline_tracker, index|
+ log_info(message: 'Stage starting', entity_stage: pipeline_tracker.stage) if index == 0
+
+ BulkImports::PipelineWorker.perform_async(
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
+ end
+ end
+
def source_version
entity.bulk_import.source_version_info.to_s
end
@@ -105,5 +76,22 @@ module BulkImports
logger.error(structured_payload(payload))
end
+
+ def log_info(payload)
+ logger.info(structured_payload(log_params(payload)))
+ end
+
+ def log_params(extra)
+ defaults = {
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ source_version: source_version,
+ importer: 'gitlab_migration'
+ }
+
+ defaults.merge(extra)
+ end
end
end