diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-02-18 13:34:06 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-02-18 13:34:06 +0300 |
commit | 859a6fb938bb9ee2a317c46dfa4fcc1af49608f0 (patch) | |
tree | d7f2700abe6b4ffcb2dcfc80631b2d87d0609239 /lib/bulk_imports/pipeline/runner.rb | |
parent | 446d496a6d000c73a304be52587cd9bbc7493136 (diff) |
Add latest changes from gitlab-org/gitlab@13-9-stable-eev13.9.0-rc42
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 72 |
1 files changed, 43 insertions, 29 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 11fb9722173..d39f4121b51 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -7,75 +7,86 @@ module BulkImports MarkedAsFailedError = Class.new(StandardError) - def run(context) - raise MarkedAsFailedError if marked_as_failed?(context) + def run + raise MarkedAsFailedError if marked_as_failed? - info(context, message: 'Pipeline started', pipeline_class: pipeline) + info(message: 'Pipeline started') - Array.wrap(extracted_data_from(context)).each do |entry| + extracted_data = extracted_data_from + + extracted_data&.each do |entry| transformers.each do |transformer| - entry = run_pipeline_step(:transformer, transformer.class.name, context) do + entry = run_pipeline_step(:transformer, transformer.class.name) do transformer.transform(context, entry) end end - run_pipeline_step(:loader, loader.class.name, context) do + run_pipeline_step(:loader, loader.class.name) do loader.load(context, entry) end end - after_run.call(context) if after_run.present? + if respond_to?(:after_run) + run_pipeline_step(:after_run) do + after_run(extracted_data) + end + end + + info(message: 'Pipeline finished') rescue MarkedAsFailedError - log_skip(context) + log_skip end private # rubocop:disable Lint/UselessAccessModifier - def run_pipeline_step(type, class_name, context) - raise MarkedAsFailedError if marked_as_failed?(context) + def run_pipeline_step(step, class_name = nil) + raise MarkedAsFailedError if marked_as_failed? - info(context, type => class_name) + info(pipeline_step: step, step_class: class_name) yield rescue MarkedAsFailedError - log_skip(context, type => class_name) + log_skip(step => class_name) rescue => e - log_import_failure(e, context) + log_import_failure(e, step) - mark_as_failed(context) if abort_on_failure? + mark_as_failed if abort_on_failure? + + nil end - def extracted_data_from(context) - run_pipeline_step(:extractor, extractor.class.name, context) do + def extracted_data_from + run_pipeline_step(:extractor, extractor.class.name) do extractor.extract(context) end end - def mark_as_failed(context) - warn(context, message: 'Pipeline failed', pipeline_class: pipeline) + def mark_as_failed + warn(message: 'Pipeline failed', pipeline_class: pipeline) context.entity.fail_op! end - def marked_as_failed?(context) + def marked_as_failed? return true if context.entity.failed? false end - def log_skip(context, extra = {}) + def log_skip(extra = {}) log = { message: 'Skipping due to failed pipeline status', pipeline_class: pipeline }.merge(extra) - info(context, log) + info(log) end - def log_import_failure(exception, context) + def log_import_failure(exception, step) attributes = { bulk_import_entity_id: context.entity.id, pipeline_class: pipeline, + pipeline_step: step, exception_class: exception.class.to_s, exception_message: exception.message.truncate(255), correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id @@ -84,19 +95,22 @@ module BulkImports BulkImports::Failure.create(attributes) end - def warn(context, extra = {}) - logger.warn(log_base_params(context).merge(extra)) + def warn(extra = {}) + logger.warn(log_params(extra)) end - def info(context, extra = {}) - logger.info(log_base_params(context).merge(extra)) + def info(extra = {}) + logger.info(log_params(extra)) end - def log_base_params(context) - { + def log_params(extra) + defaults = { bulk_import_entity_id: context.entity.id, - bulk_import_entity_type: context.entity.source_type + bulk_import_entity_type: context.entity.source_type, + pipeline_class: pipeline } + + defaults.merge(extra).compact end def logger |