diff options
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 40 |
1 files changed, 27 insertions, 13 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 666916f8758..e2a14c35e79 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -17,7 +17,7 @@ module BulkImports if extracted_data extracted_data.each_with_index do |entry, index| raw_entry = entry.dup - next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index) + next if already_processed?(raw_entry, index) transformers.each do |transformer| entry = run_pipeline_step(:transformer, transformer.class.name) do @@ -25,11 +25,11 @@ module BulkImports end end - run_pipeline_step(:loader, loader.class.name) do + run_pipeline_step(:loader, loader.class.name, entry) do loader.load(context, entry) end - save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers) + save_processed_entry(raw_entry, index) end tracker.update!( @@ -40,6 +40,14 @@ module BulkImports run_pipeline_step(:after_run) do after_run(extracted_data) end + + # For batches, `#on_finish` is called once within `FinishBatchedPipelineWorker` + # after all batches have completed. + unless tracker.batched? + run_pipeline_step(:on_finish) do + on_finish + end + end end info(message: 'Pipeline finished') @@ -47,9 +55,11 @@ module BulkImports skip!('Skipping pipeline due to failed entity') end + def on_finish; end + private # rubocop:disable Lint/UselessAccessModifier - def run_pipeline_step(step, class_name = nil) + def run_pipeline_step(step, class_name = nil, entry = nil) raise MarkedAsFailedError if context.entity.failed? info(pipeline_step: step, step_class: class_name) @@ -65,11 +75,11 @@ module BulkImports rescue BulkImports::NetworkError => e raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay) if e.retriable?(context.tracker) - log_and_fail(e, step) + log_and_fail(e, step, entry) rescue BulkImports::RetryPipelineError raise rescue StandardError => e - log_and_fail(e, step) + log_and_fail(e, step, entry) end def extracted_data_from @@ -95,8 +105,8 @@ module BulkImports run if extracted_data.has_next_page? end - def log_and_fail(exception, step) - log_import_failure(exception, step) + def log_and_fail(exception, step, entry = nil) + log_import_failure(exception, step, entry) if abort_on_failure? tracker.fail_op! @@ -114,16 +124,21 @@ module BulkImports tracker.skip! end - def log_import_failure(exception, step) + def log_import_failure(exception, step, entry) failure_attributes = { bulk_import_entity_id: context.entity.id, pipeline_class: pipeline, pipeline_step: step, exception_class: exception.class.to_s, - exception_message: exception.message.truncate(255), + exception_message: exception.message, correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id } + if entry + failure_attributes[:source_url] = BulkImports::SourceUrlBuilder.new(context, entry).url + failure_attributes[:source_title] = entry.try(:title) || entry.try(:name) + end + log_exception( exception, log_params( @@ -154,8 +169,7 @@ module BulkImports source_full_path: context.entity.source_full_path, pipeline_class: pipeline, context_extra: context.extra, - source_version: context.entity.bulk_import.source_version_info.to_s, - importer: 'gitlab_migration' + source_version: context.entity.bulk_import.source_version_info.to_s } defaults @@ -164,7 +178,7 @@ module BulkImports end def logger - @logger ||= Gitlab::Import::Logger.build + @logger ||= Logger.build end def log_exception(exception, payload) |