Diffstat (limited to 'app/services/bulk_imports/process_service.rb')
-rw-r--r-- | app/services/bulk_imports/process_service.rb | 129
1 file changed, 129 insertions, 0 deletions
diff --git a/app/services/bulk_imports/process_service.rb b/app/services/bulk_imports/process_service.rb
new file mode 100644
index 00000000000..14c5545cfd5
--- /dev/null
+++ b/app/services/bulk_imports/process_service.rb
@@ -0,0 +1,129 @@
+# frozen_string_literal: true
+
+module BulkImports
+  class ProcessService
+    PERFORM_DELAY = 5.seconds
+    DEFAULT_BATCH_SIZE = 5
+
+    attr_reader :bulk_import
+
+    def initialize(bulk_import)
+      @bulk_import = bulk_import
+    end
+
+    def execute
+      return unless bulk_import
+      return if bulk_import.completed?
+      return bulk_import.fail_op! if all_entities_failed?
+      return bulk_import.finish! if all_entities_processed? && bulk_import.started?
+      return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
+
+      process_bulk_import
+      re_enqueue
+    rescue StandardError => e
+      Gitlab::ErrorTracking.track_exception(e, bulk_import_id: bulk_import.id)
+
+      bulk_import.fail_op
+    end
+
+    private
+
+    def process_bulk_import
+      bulk_import.start! if bulk_import.created?
+
+      created_entities.first(next_batch_size).each do |entity|
+        create_tracker(entity)
+
+        entity.start!
+
+        BulkImports::ExportRequestWorker.perform_async(entity.id)
+      end
+    end
+
+    def entities
+      @entities ||= bulk_import.entities
+    end
+
+    def created_entities
+      entities.with_status(:created)
+    end
+
+    def all_entities_processed?
+      entities.all? { |entity| entity.finished? || entity.failed? }
+    end
+
+    def all_entities_failed?
+      entities.all?(&:failed?)
+    end
+
+    # A new BulkImportWorker job is enqueued to either
+    # - Process the new BulkImports::Entity created during import (e.g. for the subgroups)
+    # - Or to mark the `bulk_import` as finished
+    def re_enqueue
+      BulkImportWorker.perform_in(PERFORM_DELAY, bulk_import.id)
+    end
+
+    def started_entities
+      entities.with_status(:started)
+    end
+
+    def max_batch_size_exceeded?
+      started_entities.count >= DEFAULT_BATCH_SIZE
+    end
+
+    def next_batch_size
+      [DEFAULT_BATCH_SIZE - started_entities.count, 0].max
+    end
+
+    def create_tracker(entity)
+      entity.class.transaction do
+        entity.pipelines.each do |pipeline|
+          status = skip_pipeline?(pipeline, entity) ? :skipped : :created
+
+          entity.trackers.create!(
+            stage: pipeline[:stage],
+            pipeline_name: pipeline[:pipeline],
+            status: BulkImports::Tracker.state_machine.states[status].value
+          )
+        end
+      end
+    end
+
+    def skip_pipeline?(pipeline, entity)
+      return false unless entity.source_version.valid?
+
+      minimum_version, maximum_version = pipeline.values_at(:minimum_source_version, :maximum_source_version)
+
+      if source_version_out_of_range?(minimum_version, maximum_version, entity.source_version.without_patch)
+        log_skipped_pipeline(pipeline, entity, minimum_version, maximum_version)
+        return true
+      end
+
+      false
+    end
+
+    def source_version_out_of_range?(minimum_version, maximum_version, non_patch_source_version)
+      (minimum_version && non_patch_source_version < Gitlab::VersionInfo.parse(minimum_version)) ||
+        (maximum_version && non_patch_source_version > Gitlab::VersionInfo.parse(maximum_version))
+    end
+
+    def log_skipped_pipeline(pipeline, entity, minimum_version, maximum_version)
+      logger.info(
+        message: 'Pipeline skipped as source instance version not compatible with pipeline',
+        bulk_import_entity_id: entity.id,
+        bulk_import_id: entity.bulk_import_id,
+        bulk_import_entity_type: entity.source_type,
+        source_full_path: entity.source_full_path,
+        pipeline_name: pipeline[:pipeline],
+        minimum_source_version: minimum_version,
+        maximum_source_version: maximum_version,
+        source_version: entity.source_version.to_s,
+        importer: 'gitlab_migration'
+      )
+    end
+
+    def logger
+      @logger ||= Gitlab::Import::Logger.build
+    end
+  end
+end
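A note on the guard clauses in #execute: their order is load-bearing. all_entities_failed? must be checked before all_entities_processed?, because failed entities also satisfy the "processed" predicate (it accepts finished? or failed?). A minimal plain-Ruby sketch of the decision ladder; the decide helper and its boolean keywords are illustrative stand-ins for the real status queries, not part of the service:

# Sketch of the order-sensitive guard clauses in #execute, with booleans
# standing in for the entity-status queries on the bulk_import record.
def decide(completed:, all_failed:, all_processed:, started:, max_running:)
  return :noop       if completed       # terminal state already reached
  return :fail_op!   if all_failed      # must run before the finish! check
  return :finish!    if all_processed && started
  return :re_enqueue if max_running     # cap reached: poll again later

  :process_and_re_enqueue
end

puts decide(completed: false, all_failed: true, all_processed: true,
            started: true, max_running: false)
# => fail_op! -- a run where every entity failed also satisfies
#    all_entities_processed?, so checking failure first is what keeps the
#    import from being marked finished.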
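The batching arithmetic also reads well in isolation: max_batch_size_exceeded? stops new work once DEFAULT_BATCH_SIZE entities are in flight, and next_batch_size only tops the in-flight set up to that cap. A standalone sketch assuming nothing beyond plain Ruby; the Entity struct is a stand-in for the real model, not the ActiveRecord class:

# Minimal sketch of the batch-throttling arithmetic in ProcessService.
DEFAULT_BATCH_SIZE = 5

Entity = Struct.new(:id, :status)

entities = [
  Entity.new(1, :started), Entity.new(2, :started),
  Entity.new(3, :created), Entity.new(4, :created),
  Entity.new(5, :created), Entity.new(6, :created)
]

started = entities.count { |e| e.status == :started }

# Mirrors max_batch_size_exceeded?: no new work once 5 entities are in flight.
exceeded = started >= DEFAULT_BATCH_SIZE

# Mirrors next_batch_size: only top the in-flight set up to the cap.
next_batch = [DEFAULT_BATCH_SIZE - started, 0].max

puts "exceeded=#{exceeded} next_batch=#{next_batch}"
# => exceeded=false next_batch=3: three of the four :created entities start
#    now; the re-enqueued BulkImportWorker picks up the rest ~5 seconds later.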
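Finally, the version-range check in skip_pipeline? compares the source instance version, patch level dropped, against optional per-pipeline bounds. A sketch using the stdlib Gem::Version as a stand-in for Gitlab::VersionInfo (both are comparable version values); the EpicsPipeline hash is a hypothetical pipeline definition shaped like what entity.pipelines yields:

# Sketch of source_version_out_of_range?, with Gem::Version standing in
# for Gitlab::VersionInfo. nil bounds mean "no constraint on that side".
def out_of_range?(min, max, source)
  (min && source < Gem::Version.new(min)) ||
    (max && source > Gem::Version.new(max))
end

# Hypothetical pipeline definition with a version floor.
pipeline = { pipeline: 'EpicsPipeline',
             minimum_source_version: '15.0',
             maximum_source_version: nil }

# The service compares entity.source_version.without_patch,
# so e.g. a 14.10.5 source instance is compared as 14.10.
source = Gem::Version.new('14.10')

puts out_of_range?(pipeline[:minimum_source_version],
                   pipeline[:maximum_source_version],
                   source)
# => true, so the tracker is created as :skipped rather than :created and
#    log_skipped_pipeline records the incompatibility.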