diff options
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 66 |
1 files changed, 66 insertions, 0 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb new file mode 100644 index 00000000000..04038e50399 --- /dev/null +++ b/lib/bulk_imports/pipeline/runner.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +module BulkImports + module Pipeline + module Runner + extend ActiveSupport::Concern + + included do + private + + def extractors + @extractors ||= self.class.extractors.map(&method(:instantiate)) + end + + def transformers + @transformers ||= self.class.transformers.map(&method(:instantiate)) + end + + def loaders + @loaders ||= self.class.loaders.map(&method(:instantiate)) + end + + def pipeline_name + @pipeline ||= self.class.name + end + + def instantiate(class_config) + class_config[:klass].new(class_config[:options]) + end + end + + def run(context) + info(context, message: "Pipeline started", pipeline: pipeline_name) + + extractors.each do |extractor| + extractor.extract(context).each do |entry| + info(context, extractor: extractor.class.name) + + transformers.each do |transformer| + info(context, transformer: transformer.class.name) + entry = transformer.transform(context, entry) + end + + loaders.each do |loader| + info(context, loader: loader.class.name) + loader.load(context, entry) + end + end + end + end + + private # rubocop:disable Lint/UselessAccessModifier + + def info(context, extra = {}) + logger.info({ + entity: context.entity.id, + entity_type: context.entity.source_type + }.merge(extra)) + end + + def logger + @logger ||= Gitlab::Import::Logger.build + end + end + end +end |