gitlab.com/gitlab-org/gitlab-foss.git
Diffstat (limited to 'app/workers/bulk_imports/pipeline_batch_worker.rb')
-rw-r--r--  app/workers/bulk_imports/pipeline_batch_worker.rb | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
1 file changed, 72 insertions(+), 12 deletions(-)
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
index 6230d517641..1485275e616 100644
--- a/app/workers/bulk_imports/pipeline_batch_worker.rb
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -1,26 +1,65 @@
# frozen_string_literal: true

module BulkImports
- class PipelineBatchWorker # rubocop:disable Scalability/IdempotentWorker
+ class PipelineBatchWorker
include ApplicationWorker
include ExclusiveLeaseGuard

+ DEFER_ON_HEALTH_DELAY = 5.minutes
+
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
feature_category :importers
- sidekiq_options retry: false, dead: false
+ sidekiq_options dead: false, retry: 3
worker_has_external_dependencies!
worker_resource_boundary :memory
+ idempotent!
+
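+ # Errors raised in #run now bubble up to Sidekiq, which retries the job
+ # up to 3 times; this hook fires once those retries are exhausted and
+ # marks the batch as failed outside the regular perform flow.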
+ sidekiq_retries_exhausted do |msg, exception|
+ new.perform_failure(msg['args'].first, exception)
+ end
+
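+ # Defer the job while the database reports an unhealthy signal. When the
+ # pipeline declares its own schema and table, health is checked against
+ # that table instead of the :gitlab_main default.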
+ defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables|
+ batch = ::BulkImports::BatchTracker.find(job_args.first)
+ pipeline_tracker = batch.tracker
+ pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(
+ pipeline_tracker.pipeline_class,
+ pipeline_tracker.entity.portable_class
+ )
+
+ if pipeline_schema.db_schema && pipeline_schema.db_table
+ schema = pipeline_schema.db_schema
+ tables = [pipeline_schema.db_table]
+ end
+
+ [schema, tables]
+ end
+
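+ # Deferral only applies while the :bulk_import_deferred_workers feature
+ # flag is enabled.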
+ def self.defer_on_database_health_signal?
+ Feature.enabled?(:bulk_import_deferred_workers)
+ end

def perform(batch_id)
@batch = ::BulkImports::BatchTracker.find(batch_id)
+
@tracker = @batch.tracker
@pending_retry = false
+ return unless process_batch?
+
+ log_extra_metadata_on_done(:pipeline_class, @tracker.pipeline_name)
+
try_obtain_lease { run }
ensure
::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry
end
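+
+ # Entry point for sidekiq_retries_exhausted: reloads the records from the
+ # original job arguments, then fails the batch.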
+ def perform_failure(batch_id, exception)
+ @batch = ::BulkImports::BatchTracker.find(batch_id)
+ @tracker = @batch.tracker
+
+ fail_batch(exception)
+ end
+
private

attr_reader :batch, :tracker, :pending_retry
@@ -28,35 +67,31 @@ module BulkImports
def run
return batch.skip! if tracker.failed? || tracker.finished?
+ logger.info(log_attributes(message: 'Batch tracker started'))
batch.start!
tracker.pipeline_class.new(context).run
batch.finish!
+ logger.info(log_attributes(message: 'Batch tracker finished'))
rescue BulkImports::RetryPipelineError => e
@pending_retry = true
retry_batch(e)
- rescue StandardError => e
- fail_batch(e)
end

def fail_batch(exception)
batch.fail_op!
- Gitlab::ErrorTracking.track_exception(
- exception,
- batch_id: batch.id,
- tracker_id: tracker.id,
- pipeline_class: tracker.pipeline_name,
- pipeline_step: 'pipeline_batch_worker_run'
- )
+ Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed'))
BulkImports::Failure.create(
bulk_import_entity_id: batch.tracker.entity.id,
pipeline_class: tracker.pipeline_name,
pipeline_step: 'pipeline_batch_worker_run',
exception_class: exception.class.to_s,
- exception_message: exception.message.truncate(255),
+ exception_message: exception.message,
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
)
+
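+ # Notify the finish worker so the overall pipeline can still complete
+ # once this batch is marked as failed.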
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
end

def context
@@ -78,7 +113,32 @@ module BulkImports
end

def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+ log_extra_metadata_on_done(:re_enqueue, true)
+
self.class.perform_in(delay, batch.id)
end
+
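+ # Only batches still in the created or started state are processed;
+ # anything already finished, failed or skipped is left untouched.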
+ def process_batch?
+ batch.created? || batch.started?
+ end
+
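+ # Memoized structured importer logger.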
+ def logger
+ @logger ||= Logger.build
+ end
+
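+ # Identifying attributes merged into every log line and error report.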
+ def log_attributes(extra = {})
+ structured_payload(
+ {
+ batch_id: batch.id,
+ batch_number: batch.batch_number,
+ tracker_id: tracker.id,
+ bulk_import_id: tracker.entity.bulk_import_id,
+ bulk_import_entity_id: tracker.entity.id,
+ pipeline_class: tracker.pipeline_name,
+ pipeline_step: 'pipeline_batch_worker_run',
+ importer: Logger::IMPORTER_NAME
+ }.merge(extra)
+ )
+ end
end
end