diff options
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 11fb9722173..1c4ee154874 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -10,9 +10,11 @@ module BulkImports def run(context) raise MarkedAsFailedError if marked_as_failed?(context) - info(context, message: 'Pipeline started', pipeline_class: pipeline) + info(context, message: 'Pipeline started') - Array.wrap(extracted_data_from(context)).each do |entry| + extracted_data = extracted_data_from(context) + + extracted_data&.each do |entry| transformers.each do |transformer| entry = run_pipeline_step(:transformer, transformer.class.name, context) do transformer.transform(context, entry) @@ -24,25 +26,29 @@ module BulkImports end end - after_run.call(context) if after_run.present? + after_run(context, extracted_data) if respond_to?(:after_run) + + info(context, message: 'Pipeline finished') rescue MarkedAsFailedError log_skip(context) end private # rubocop:disable Lint/UselessAccessModifier - def run_pipeline_step(type, class_name, context) + def run_pipeline_step(step, class_name, context) raise MarkedAsFailedError if marked_as_failed?(context) - info(context, type => class_name) + info(context, pipeline_step: step, step_class: class_name) yield rescue MarkedAsFailedError - log_skip(context, type => class_name) + log_skip(context, step => class_name) rescue => e - log_import_failure(e, context) + log_import_failure(e, step, context) mark_as_failed(context) if abort_on_failure? + + nil end def extracted_data_from(context) @@ -72,10 +78,11 @@ module BulkImports info(context, log) end - def log_import_failure(exception, context) + def log_import_failure(exception, step, context) attributes = { bulk_import_entity_id: context.entity.id, pipeline_class: pipeline, + pipeline_step: step, exception_class: exception.class.to_s, exception_message: exception.message.truncate(255), correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id @@ -95,7 +102,8 @@ module BulkImports def log_base_params(context) { bulk_import_entity_id: context.entity.id, - bulk_import_entity_type: context.entity.source_type + bulk_import_entity_type: context.entity.source_type, + pipeline_class: pipeline } end |