Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r--lib/bulk_imports/pipeline/runner.rb26
1 files changed, 22 insertions, 4 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 1e2d9152047..666916f8758 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -15,7 +15,10 @@ module BulkImports
extracted_data = extracted_data_from
if extracted_data
- extracted_data.each do |entry|
+ extracted_data.each_with_index do |entry, index|
+ raw_entry = entry.dup
+ next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index)
+
transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name) do
transformer.transform(context, entry)
@@ -25,6 +28,8 @@ module BulkImports
run_pipeline_step(:loader, loader.class.name) do
loader.load(context, entry)
end
+
+ save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers)
end
tracker.update!(
@@ -73,6 +78,19 @@ module BulkImports
end
end
+ def cache_key
+ batch_number = context.extra[:batch_number] || 0
+
+ "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
+ end
+
+ # Overridden by child pipelines with different caching strategies
+ def already_processed?(*)
+ false
+ end
+
+ def save_processed_entry(*); end
+
def after_run(extracted_data)
run if extracted_data.has_next_page?
end
@@ -80,9 +98,9 @@ module BulkImports
def log_and_fail(exception, step)
log_import_failure(exception, step)
- tracker.fail_op!
-
if abort_on_failure?
+ tracker.fail_op!
+
warn(message: 'Aborting entity migration due to pipeline failure')
context.entity.fail_op!
end
@@ -112,7 +130,7 @@ module BulkImports
{
bulk_import_id: context.bulk_import_id,
pipeline_step: step,
- message: 'Pipeline failed'
+ message: 'An object of a pipeline failed to import'
}
)
)