Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r--lib/bulk_imports/pipeline/runner.rb40
1 files changed, 27 insertions, 13 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 666916f8758..e2a14c35e79 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -17,7 +17,7 @@ module BulkImports
if extracted_data
extracted_data.each_with_index do |entry, index|
raw_entry = entry.dup
- next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index)
+ next if already_processed?(raw_entry, index)
transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name) do
@@ -25,11 +25,11 @@ module BulkImports
end
end
- run_pipeline_step(:loader, loader.class.name) do
+ run_pipeline_step(:loader, loader.class.name, entry) do
loader.load(context, entry)
end
- save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers)
+ save_processed_entry(raw_entry, index)
end
tracker.update!(
@@ -40,6 +40,14 @@ module BulkImports
run_pipeline_step(:after_run) do
after_run(extracted_data)
end
+
+ # For batches, `#on_finish` is called once within `FinishBatchedPipelineWorker`
+ # after all batches have completed.
+ unless tracker.batched?
+ run_pipeline_step(:on_finish) do
+ on_finish
+ end
+ end
end
info(message: 'Pipeline finished')
@@ -47,9 +55,11 @@ module BulkImports
skip!('Skipping pipeline due to failed entity')
end
+ def on_finish; end
+
private # rubocop:disable Lint/UselessAccessModifier
- def run_pipeline_step(step, class_name = nil)
+ def run_pipeline_step(step, class_name = nil, entry = nil)
raise MarkedAsFailedError if context.entity.failed?
info(pipeline_step: step, step_class: class_name)
@@ -65,11 +75,11 @@ module BulkImports
rescue BulkImports::NetworkError => e
raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay) if e.retriable?(context.tracker)
- log_and_fail(e, step)
+ log_and_fail(e, step, entry)
rescue BulkImports::RetryPipelineError
raise
rescue StandardError => e
- log_and_fail(e, step)
+ log_and_fail(e, step, entry)
end
def extracted_data_from
@@ -95,8 +105,8 @@ module BulkImports
run if extracted_data.has_next_page?
end
- def log_and_fail(exception, step)
- log_import_failure(exception, step)
+ def log_and_fail(exception, step, entry = nil)
+ log_import_failure(exception, step, entry)
if abort_on_failure?
tracker.fail_op!
@@ -114,16 +124,21 @@ module BulkImports
tracker.skip!
end
- def log_import_failure(exception, step)
+ def log_import_failure(exception, step, entry)
failure_attributes = {
bulk_import_entity_id: context.entity.id,
pipeline_class: pipeline,
pipeline_step: step,
exception_class: exception.class.to_s,
- exception_message: exception.message.truncate(255),
+ exception_message: exception.message,
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
+ if entry
+ failure_attributes[:source_url] = BulkImports::SourceUrlBuilder.new(context, entry).url
+ failure_attributes[:source_title] = entry.try(:title) || entry.try(:name)
+ end
+
log_exception(
exception,
log_params(
@@ -154,8 +169,7 @@ module BulkImports
source_full_path: context.entity.source_full_path,
pipeline_class: pipeline,
context_extra: context.extra,
- source_version: context.entity.bulk_import.source_version_info.to_s,
- importer: 'gitlab_migration'
+ source_version: context.entity.bulk_import.source_version_info.to_s
}
defaults
@@ -164,7 +178,7 @@ module BulkImports
end
def logger
- @logger ||= Gitlab::Import::Logger.build
+ @logger ||= Logger.build
end
def log_exception(exception, payload)