Welcome to mirror list, hosted at ThFree Co, Russian Federation.

runner.rb « pipeline « bulk_imports « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 11fb972217318270b7ad92f63b1d8d28cb165477 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# frozen_string_literal: true

module BulkImports
  module Pipeline
    module Runner
      extend ActiveSupport::Concern

      MarkedAsFailedError = Class.new(StandardError)

      def run(context)
        raise MarkedAsFailedError if marked_as_failed?(context)

        info(context, message: 'Pipeline started', pipeline_class: pipeline)

        Array.wrap(extracted_data_from(context)).each do |entry|
          transformers.each do |transformer|
            entry = run_pipeline_step(:transformer, transformer.class.name, context) do
              transformer.transform(context, entry)
            end
          end

          run_pipeline_step(:loader, loader.class.name, context) do
            loader.load(context, entry)
          end
        end

        after_run.call(context) if after_run.present?
      rescue MarkedAsFailedError
        log_skip(context)
      end

      private # rubocop:disable Lint/UselessAccessModifier

      def run_pipeline_step(type, class_name, context)
        raise MarkedAsFailedError if marked_as_failed?(context)

        info(context, type => class_name)

        yield
      rescue MarkedAsFailedError
        log_skip(context, type => class_name)
      rescue => e
        log_import_failure(e, context)

        mark_as_failed(context) if abort_on_failure?
      end

      def extracted_data_from(context)
        run_pipeline_step(:extractor, extractor.class.name, context) do
          extractor.extract(context)
        end
      end

      def mark_as_failed(context)
        warn(context, message: 'Pipeline failed', pipeline_class: pipeline)

        context.entity.fail_op!
      end

      def marked_as_failed?(context)
        return true if context.entity.failed?

        false
      end

      def log_skip(context, extra = {})
        log = {
          message: 'Skipping due to failed pipeline status',
          pipeline_class: pipeline
        }.merge(extra)

        info(context, log)
      end

      def log_import_failure(exception, context)
        attributes = {
          bulk_import_entity_id: context.entity.id,
          pipeline_class: pipeline,
          exception_class: exception.class.to_s,
          exception_message: exception.message.truncate(255),
          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
        }

        BulkImports::Failure.create(attributes)
      end

      def warn(context, extra = {})
        logger.warn(log_base_params(context).merge(extra))
      end

      def info(context, extra = {})
        logger.info(log_base_params(context).merge(extra))
      end

      def log_base_params(context)
        {
          bulk_import_entity_id: context.entity.id,
          bulk_import_entity_type: context.entity.source_type
        }
      end

      def logger
        @logger ||= Gitlab::Import::Logger.build
      end
    end
  end
end