Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb67
1 files changed, 48 insertions, 19 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 24185f43795..2c1d28b33c5 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -1,43 +1,68 @@
# frozen_string_literal: true
module BulkImports
- class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
+ class PipelineWorker
include ApplicationWorker
include ExclusiveLeaseGuard
FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
+ DEFER_ON_HEALTH_DELAY = 5.minutes
+
data_consistency :always
feature_category :importers
- sidekiq_options retry: false, dead: false
+ sidekiq_options dead: false, retry: 3
worker_has_external_dependencies!
deduplicate :until_executing
worker_resource_boundary :memory
+ idempotent!
version 2
+ sidekiq_retries_exhausted do |msg, exception|
+ new.perform_failure(msg['args'][0], msg['args'][2], exception)
+ end
+
+ defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables|
+ pipeline_tracker = ::BulkImports::Tracker.find(job_args.first)
+ pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(
+ pipeline_tracker.pipeline_class,
+ pipeline_tracker.entity.portable_class
+ )
+
+ if pipeline_schema.db_schema && pipeline_schema.db_table
+ schema = pipeline_schema.db_schema
+ tables = [pipeline_schema.db_table]
+ end
+
+ [schema, tables]
+ end
+
+ def self.defer_on_database_health_signal?
+ Feature.enabled?(:bulk_import_deferred_workers)
+ end
+
# Keep _stage parameter for backwards compatibility.
def perform(pipeline_tracker_id, _stage, entity_id)
@entity = ::BulkImports::Entity.find(entity_id)
@pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
+ log_extra_metadata_on_done(:pipeline_class, @pipeline_tracker.pipeline_name)
+
try_obtain_lease do
- if pipeline_tracker.enqueued?
+ if pipeline_tracker.enqueued? || pipeline_tracker.started?
logger.info(log_attributes(message: 'Pipeline starting'))
run
- else
- message = "Pipeline in #{pipeline_tracker.human_status_name} state instead of expected enqueued state"
-
- logger.error(log_attributes(message: message))
-
- fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped?
end
end
- ensure
- # This is needed for in-flight migrations.
- # It will be removed in https://gitlab.com/gitlab-org/gitlab/-/issues/426299
- ::BulkImports::EntityWorker.perform_async(entity_id) if job_version.nil?
+ end
+
+ def perform_failure(pipeline_tracker_id, entity_id, exception)
+ @entity = ::BulkImports::Entity.find(entity_id)
+ @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
+
+ fail_tracker(exception)
end
private
@@ -53,20 +78,22 @@ module BulkImports
return re_enqueue if export_empty? || export_started?
if file_extraction_pipeline? && export_status.batched?
+ log_extra_metadata_on_done(:batched, true)
+
pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true)
return pipeline_tracker.finish! if export_status.batches_count < 1
enqueue_batches
else
+ log_extra_metadata_on_done(:batched, false)
+
pipeline_tracker.update!(status_event: 'start', jid: jid)
pipeline_tracker.pipeline_class.new(context).run
pipeline_tracker.finish!
end
rescue BulkImports::RetryPipelineError => e
retry_tracker(e)
- rescue StandardError => e
- fail_tracker(e)
end
def source_version
@@ -85,16 +112,18 @@ module BulkImports
pipeline_class: pipeline_tracker.pipeline_name,
pipeline_step: 'pipeline_worker_run',
exception_class: exception.class.to_s,
- exception_message: exception.message.truncate(255),
+ exception_message: exception.message,
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
)
end
def logger
- @logger ||= Gitlab::Import::Logger.build
+ @logger ||= Logger.build
end
def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+ log_extra_metadata_on_done(:re_enqueue, true)
+
self.class.perform_in(
delay,
pipeline_tracker.id,
@@ -159,10 +188,10 @@ module BulkImports
bulk_import_entity_type: entity.source_type,
source_full_path: entity.source_full_path,
pipeline_tracker_id: pipeline_tracker.id,
- pipeline_name: pipeline_tracker.pipeline_name,
+ pipeline_class: pipeline_tracker.pipeline_name,
pipeline_tracker_state: pipeline_tracker.human_status_name,
source_version: source_version,
- importer: 'gitlab_migration'
+ importer: Logger::IMPORTER_NAME
}.merge(extra)
)
end