From 6438df3a1e0fb944485cebf07976160184697d72 Mon Sep 17 00:00:00 2001 From: Robert Speicher Date: Wed, 20 Jan 2021 13:34:23 -0600 Subject: Add latest changes from gitlab-org/gitlab@13-8-stable-ee --- .../common/extractors/graphql_extractor.rb | 12 +++----- lib/bulk_imports/importers/group_importer.rb | 14 ++++++--- lib/bulk_imports/importers/groups_importer.rb | 36 ---------------------- lib/bulk_imports/pipeline.rb | 20 ++++++------ lib/bulk_imports/pipeline/runner.rb | 30 ++++++++---------- 5 files changed, 37 insertions(+), 75 deletions(-) delete mode 100644 lib/bulk_imports/importers/groups_importer.rb (limited to 'lib/bulk_imports') diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb index c0cef61d2b2..af274ee1299 100644 --- a/lib/bulk_imports/common/extractors/graphql_extractor.rb +++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb @@ -11,14 +11,10 @@ module BulkImports def extract(context) client = graphql_client(context) - Enumerator.new do |yielder| - result = client.execute( - client.parse(query.to_s), - query.variables(context.entity) - ) - - yielder << result.original_hash.deep_dup - end + client.execute( + client.parse(query.to_s), + query.variables(context.entity) + ).original_hash.deep_dup end private diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index 82cb1ca03a2..6e1b86e9515 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -8,7 +8,6 @@ module BulkImports end def execute - entity.start! bulk_import = entity.bulk_import configuration = bulk_import.configuration @@ -18,9 +17,7 @@ module BulkImports configuration: configuration ) - BulkImports::Groups::Pipelines::GroupPipeline.new.run(context) - 'BulkImports::EE::Groups::Pipelines::EpicsPipeline'.constantize.new.run(context) if Gitlab.ee? - BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context) + pipelines.each { |pipeline| pipeline.new.run(context) } entity.finish! end @@ -28,6 +25,15 @@ module BulkImports private attr_reader :entity + + def pipelines + [ + BulkImports::Groups::Pipelines::GroupPipeline, + BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline + ] + end end end end + +BulkImports::Importers::GroupImporter.prepend_if_ee('EE::BulkImports::Importers::GroupImporter') diff --git a/lib/bulk_imports/importers/groups_importer.rb b/lib/bulk_imports/importers/groups_importer.rb deleted file mode 100644 index 8641577ff47..00000000000 --- a/lib/bulk_imports/importers/groups_importer.rb +++ /dev/null @@ -1,36 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Importers - class GroupsImporter - def initialize(bulk_import_id) - @bulk_import = BulkImport.find(bulk_import_id) - end - - def execute - bulk_import.start! unless bulk_import.started? - - if entities_to_import.empty? - bulk_import.finish! - else - entities_to_import.each do |entity| - BulkImports::Importers::GroupImporter.new(entity).execute - end - - # A new BulkImportWorker job is enqueued to either - # - Process the new BulkImports::Entity created for the subgroups - # - Or to mark the `bulk_import` as finished. - BulkImportWorker.perform_async(bulk_import.id) - end - end - - private - - attr_reader :bulk_import - - def entities_to_import - @entities_to_import ||= bulk_import.entities.with_status(:created) - end - end - end -end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index a44f8fc7193..06b81b5da14 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -10,16 +10,16 @@ module BulkImports private - def extractors - @extractors ||= self.class.extractors.map(&method(:instantiate)) + def extractor + @extractor ||= instantiate(self.class.get_extractor) end def transformers @transformers ||= self.class.transformers.map(&method(:instantiate)) end - def loaders - @loaders ||= self.class.loaders.map(&method(:instantiate)) + def loader + @loaders ||= instantiate(self.class.get_loader) end def after_run @@ -41,7 +41,7 @@ module BulkImports class_methods do def extractor(klass, options = nil) - add_attribute(:extractors, klass, options) + class_attributes[:extractor] = { klass: klass, options: options } end def transformer(klass, options = nil) @@ -49,23 +49,23 @@ module BulkImports end def loader(klass, options = nil) - add_attribute(:loaders, klass, options) + class_attributes[:loader] = { klass: klass, options: options } end def after_run(&block) class_attributes[:after_run] = block end - def extractors - class_attributes[:extractors] + def get_extractor + class_attributes[:extractor] end def transformers class_attributes[:transformers] end - def loaders - class_attributes[:loaders] + def get_loader + class_attributes[:loader] end def after_run_callback diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 88b96f0ab6e..11fb9722173 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -12,25 +12,15 @@ module BulkImports info(context, message: 'Pipeline started', pipeline_class: pipeline) - extractors.each do |extractor| - data = run_pipeline_step(:extractor, extractor.class.name, context) do - extractor.extract(context) + Array.wrap(extracted_data_from(context)).each do |entry| + transformers.each do |transformer| + entry = run_pipeline_step(:transformer, transformer.class.name, context) do + transformer.transform(context, entry) + end end - if data && data.respond_to?(:each) - data.each do |entry| - transformers.each do |transformer| - entry = run_pipeline_step(:transformer, transformer.class.name, context) do - transformer.transform(context, entry) - end - end - - loaders.each do |loader| - run_pipeline_step(:loader, loader.class.name, context) do - loader.load(context, entry) - end - end - end + run_pipeline_step(:loader, loader.class.name, context) do + loader.load(context, entry) end end @@ -55,6 +45,12 @@ module BulkImports mark_as_failed(context) if abort_on_failure? end + def extracted_data_from(context) + run_pipeline_step(:extractor, extractor.class.name, context) do + extractor.extract(context) + end + end + def mark_as_failed(context) warn(context, message: 'Pipeline failed', pipeline_class: pipeline) -- cgit v1.2.3