diff options
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 24185f43795..2c1d28b33c5 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -1,43 +1,68 @@ # frozen_string_literal: true module BulkImports - class PipelineWorker # rubocop:disable Scalability/IdempotentWorker + class PipelineWorker include ApplicationWorker include ExclusiveLeaseGuard FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false + sidekiq_options dead: false, retry: 3 worker_has_external_dependencies! deduplicate :until_executing worker_resource_boundary :memory + idempotent! version 2 + sidekiq_retries_exhausted do |msg, exception| + new.perform_failure(msg['args'][0], msg['args'][2], exception) + end + + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + pipeline_tracker = ::BulkImports::Tracker.find(job_args.first) + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end + # Keep _stage parameter for backwards compatibility. def perform(pipeline_tracker_id, _stage, entity_id) @entity = ::BulkImports::Entity.find(entity_id) @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id) + log_extra_metadata_on_done(:pipeline_class, @pipeline_tracker.pipeline_name) + try_obtain_lease do - if pipeline_tracker.enqueued? + if pipeline_tracker.enqueued? || pipeline_tracker.started? logger.info(log_attributes(message: 'Pipeline starting')) run - else - message = "Pipeline in #{pipeline_tracker.human_status_name} state instead of expected enqueued state" - - logger.error(log_attributes(message: message)) - - fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped? end end - ensure - # This is needed for in-flight migrations. - # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299 - ::BulkImports::EntityWorker.perform_async(entity_id) if job_version.nil? + end + + def perform_failure(pipeline_tracker_id, entity_id, exception) + @entity = ::BulkImports::Entity.find(entity_id) + @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id) + + fail_tracker(exception) end private @@ -53,20 +78,22 @@ module BulkImports return re_enqueue if export_empty? || export_started? if file_extraction_pipeline? && export_status.batched? + log_extra_metadata_on_done(:batched, true) + pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true) return pipeline_tracker.finish! if export_status.batches_count < 1 enqueue_batches else + log_extra_metadata_on_done(:batched, false) + pipeline_tracker.update!(status_event: 'start', jid: jid) pipeline_tracker.pipeline_class.new(context).run pipeline_tracker.finish! end rescue BulkImports::RetryPipelineError => e retry_tracker(e) - rescue StandardError => e - fail_tracker(e) end def source_version @@ -85,16 +112,18 @@ module BulkImports pipeline_class: pipeline_tracker.pipeline_name, pipeline_step: 'pipeline_worker_run', exception_class: exception.class.to_s, - exception_message: exception.message.truncate(255), + exception_message: exception.message, correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id ) end def logger - @logger ||= Gitlab::Import::Logger.build + @logger ||= Logger.build end def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) + log_extra_metadata_on_done(:re_enqueue, true) + self.class.perform_in( delay, pipeline_tracker.id, @@ -159,10 +188,10 @@ module BulkImports bulk_import_entity_type: entity.source_type, source_full_path: entity.source_full_path, pipeline_tracker_id: pipeline_tracker.id, - pipeline_name: pipeline_tracker.pipeline_name, + pipeline_class: pipeline_tracker.pipeline_name, pipeline_tracker_state: pipeline_tracker.human_status_name, source_version: source_version, - importer: 'gitlab_migration' + importer: Logger::IMPORTER_NAME }.merge(extra) ) end |