diff options
Diffstat (limited to 'lib/bulk_imports/pipeline')
-rw-r--r-- | lib/bulk_imports/pipeline/context.rb | 31 | ||||
-rw-r--r-- | lib/bulk_imports/pipeline/extracted_data.rb | 26 | ||||
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 72 |
3 files changed, 82 insertions, 47 deletions
diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index ad19f5cad7d..dd121b2dbed 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,30 +3,25 @@ module BulkImports module Pipeline class Context - include Gitlab::Utils::LazyAttributes + attr_reader :entity, :bulk_import + attr_accessor :extra - Attribute = Struct.new(:name, :type) - - PIPELINE_ATTRIBUTES = [ - Attribute.new(:current_user, User), - Attribute.new(:entity, ::BulkImports::Entity), - Attribute.new(:configuration, ::BulkImports::Configuration) - ].freeze - - def initialize(args) - assign_attributes(args) + def initialize(entity, extra = {}) + @entity = entity + @bulk_import = entity.bulk_import + @extra = extra end - private + def group + entity.group + end - PIPELINE_ATTRIBUTES.each do |attr| - lazy_attr_reader attr.name, type: attr.type + def current_user + bulk_import.user end - def assign_attributes(values) - values.slice(*PIPELINE_ATTRIBUTES.map(&:name)).each do |name, value| - instance_variable_set("@#{name}", value) - end + def configuration + bulk_import.configuration end end end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb new file mode 100644 index 00000000000..685a91a4afe --- /dev/null +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module BulkImports + module Pipeline + class ExtractedData + attr_reader :data + + def initialize(data: nil, page_info: {}) + @data = Array.wrap(data) + @page_info = page_info + end + + def has_next_page? + @page_info['has_next_page'] + end + + def next_page + @page_info['end_cursor'] + end + + def each(&block) + data.each(&block) + end + end + end +end diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 11fb9722173..d39f4121b51 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -7,75 +7,86 @@ module BulkImports MarkedAsFailedError = Class.new(StandardError) - def run(context) - raise MarkedAsFailedError if marked_as_failed?(context) + def run + raise MarkedAsFailedError if marked_as_failed? - info(context, message: 'Pipeline started', pipeline_class: pipeline) + info(message: 'Pipeline started') - Array.wrap(extracted_data_from(context)).each do |entry| + extracted_data = extracted_data_from + + extracted_data&.each do |entry| transformers.each do |transformer| - entry = run_pipeline_step(:transformer, transformer.class.name, context) do + entry = run_pipeline_step(:transformer, transformer.class.name) do transformer.transform(context, entry) end end - run_pipeline_step(:loader, loader.class.name, context) do + run_pipeline_step(:loader, loader.class.name) do loader.load(context, entry) end end - after_run.call(context) if after_run.present? + if respond_to?(:after_run) + run_pipeline_step(:after_run) do + after_run(extracted_data) + end + end + + info(message: 'Pipeline finished') rescue MarkedAsFailedError - log_skip(context) + log_skip end private # rubocop:disable Lint/UselessAccessModifier - def run_pipeline_step(type, class_name, context) - raise MarkedAsFailedError if marked_as_failed?(context) + def run_pipeline_step(step, class_name = nil) + raise MarkedAsFailedError if marked_as_failed? - info(context, type => class_name) + info(pipeline_step: step, step_class: class_name) yield rescue MarkedAsFailedError - log_skip(context, type => class_name) + log_skip(step => class_name) rescue => e - log_import_failure(e, context) + log_import_failure(e, step) - mark_as_failed(context) if abort_on_failure? + mark_as_failed if abort_on_failure? + + nil end - def extracted_data_from(context) - run_pipeline_step(:extractor, extractor.class.name, context) do + def extracted_data_from + run_pipeline_step(:extractor, extractor.class.name) do extractor.extract(context) end end - def mark_as_failed(context) - warn(context, message: 'Pipeline failed', pipeline_class: pipeline) + def mark_as_failed + warn(message: 'Pipeline failed', pipeline_class: pipeline) context.entity.fail_op! end - def marked_as_failed?(context) + def marked_as_failed? return true if context.entity.failed? false end - def log_skip(context, extra = {}) + def log_skip(extra = {}) log = { message: 'Skipping due to failed pipeline status', pipeline_class: pipeline }.merge(extra) - info(context, log) + info(log) end - def log_import_failure(exception, context) + def log_import_failure(exception, step) attributes = { bulk_import_entity_id: context.entity.id, pipeline_class: pipeline, + pipeline_step: step, exception_class: exception.class.to_s, exception_message: exception.message.truncate(255), correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id @@ -84,19 +95,22 @@ module BulkImports BulkImports::Failure.create(attributes) end - def warn(context, extra = {}) - logger.warn(log_base_params(context).merge(extra)) + def warn(extra = {}) + logger.warn(log_params(extra)) end - def info(context, extra = {}) - logger.info(log_base_params(context).merge(extra)) + def info(extra = {}) + logger.info(log_params(extra)) end - def log_base_params(context) - { + def log_params(extra) + defaults = { bulk_import_entity_id: context.entity.id, - bulk_import_entity_type: context.entity.source_type + bulk_import_entity_type: context.entity.source_type, + pipeline_class: pipeline } + + defaults.merge(extra).compact end def logger |