diff options
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 26 |
1 file changed, 22 insertions, 4 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 1e2d9152047..666916f8758 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -15,7 +15,10 @@ module BulkImports
         extracted_data = extracted_data_from

         if extracted_data
-          extracted_data.each do |entry|
+          extracted_data.each_with_index do |entry, index|
+            raw_entry = entry.dup
+            next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index)
+
             transformers.each do |transformer|
               entry = run_pipeline_step(:transformer, transformer.class.name) do
                 transformer.transform(context, entry)
@@ -25,6 +28,8 @@ module BulkImports
             run_pipeline_step(:loader, loader.class.name) do
               loader.load(context, entry)
             end
+
+            save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers)
           end

           tracker.update!(
@@ -73,6 +78,19 @@ module BulkImports
         end
       end

+      def cache_key
+        batch_number = context.extra[:batch_number] || 0
+
+        "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
+      end
+
+      # Overridden by child pipelines with different caching strategies
+      def already_processed?(*)
+        false
+      end
+
+      def save_processed_entry(*); end
+
       def after_run(extracted_data)
         run if extracted_data.has_next_page?
       end
@@ -80,9 +98,9 @@ module BulkImports
       def log_and_fail(exception, step)
         log_import_failure(exception, step)

-        tracker.fail_op!
-
         if abort_on_failure?
+          tracker.fail_op!
+
           warn(message: 'Aborting entity migration due to pipeline failure')
           context.entity.fail_op!
         end
@@ -112,7 +130,7 @@ module BulkImports
           {
             bulk_import_id: context.bulk_import_id,
             pipeline_step: step,
-            message: 'Pipeline failed'
+            message: 'An object of a pipeline failed to import'
           }
         )
       )