Welcome to mirror list, hosted at ThFree Co, Russian Federation.

runner.rb « pipeline « bulk_imports « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
blob: d39f4121b51b66a1ec52ccc70e95fb6c0db1f8b7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# frozen_string_literal: true

module BulkImports
  module Pipeline
    # Drives one pipeline through its extract -> transform -> load steps,
    # logging progress and recording a failure for every step that raises.
    #
    # The methods below call `context`, `extractor`, `transformers`, `loader`,
    # `pipeline` and `abort_on_failure?`, none of which are defined in this
    # module; they are expected to be supplied by whatever includes it.
    # `after_run` is optional and only invoked when the includer responds to it.
    module Runner
      extend ActiveSupport::Concern

      # Raised internally to short-circuit work once the entity is failed.
      # It is always rescued inside this module.
      MarkedAsFailedError = Class.new(StandardError)

      # Runs the whole pipeline: extracts the data, passes every extracted
      # entry through each transformer in order, hands the result to the
      # loader and finally invokes the optional `after_run` hook.
      #
      # If the entity is already failed, nothing is executed and a skip
      # message is logged instead.
      def run
        raise MarkedAsFailedError if marked_as_failed?

        info(message: 'Pipeline started')

        extracted_data = extracted_data_from

        # `extracted_data` is nil when the extractor step failed or was
        # skipped, hence the safe navigation.
        extracted_data&.each do |entry|
          transformers.each do |transformer|
            # Each transformer receives the output of the previous one.
            # A step that raised or was skipped yields nil here.
            entry = run_pipeline_step(:transformer, transformer.class.name) do
              transformer.transform(context, entry)
            end
          end

          run_pipeline_step(:loader, loader.class.name) do
            loader.load(context, entry)
          end
        end

        if respond_to?(:after_run)
          run_pipeline_step(:after_run) do
            after_run(extracted_data)
          end
        end

        info(message: 'Pipeline finished')
      rescue MarkedAsFailedError
        log_skip
      end

      private # rubocop:disable Lint/UselessAccessModifier

      # Executes the given block as a named pipeline step.
      #
      # step       - Symbol naming the step (e.g. :extractor, :loader).
      # class_name - Optional String with the class implementing the step.
      #
      # Returns the block's value on success. Returns nil when the step is
      # skipped because the entity is failed, or when the block raises a
      # StandardError (which is recorded via `log_import_failure`, and marks
      # the entity as failed when `abort_on_failure?` is truthy).
      def run_pipeline_step(step, class_name = nil)
        raise MarkedAsFailedError if marked_as_failed?

        info(pipeline_step: step, step_class: class_name)

        yield
      rescue MarkedAsFailedError
        log_skip(step => class_name)

        # Return nil explicitly. Without it the method would return whatever
        # the logger call returns, and callers would treat that value as the
        # step's data (e.g. as `extracted_data` or as the next `entry`).
        nil
      rescue => e
        log_import_failure(e, step)

        mark_as_failed if abort_on_failure?

        nil
      end

      # Runs the extractor as a pipeline step and returns its result
      # (nil if the step failed or was skipped).
      def extracted_data_from
        run_pipeline_step(:extractor, extractor.class.name) do
          extractor.extract(context)
        end
      end

      # Logs a warning and transitions the entity via `fail_op!`.
      def mark_as_failed
        warn(message: 'Pipeline failed', pipeline_class: pipeline)

        context.entity.fail_op!
      end

      # Returns true when the entity reports itself as failed, false otherwise.
      # Always a strict boolean, regardless of what `failed?` returns.
      def marked_as_failed?
        return true if context.entity.failed?

        false
      end

      # Logs that work is being skipped because the entity is failed.
      #
      # extra - Hash of additional attributes merged into the log entry.
      def log_skip(extra = {})
        log = {
          message: 'Skipping due to failed pipeline status',
          pipeline_class: pipeline
        }.merge(extra)

        info(log)
      end

      # Records a BulkImports::Failure describing the exception raised by a
      # pipeline step.
      #
      # exception - The rescued exception.
      # step      - Symbol naming the step that raised.
      def log_import_failure(exception, step)
        attributes = {
          bulk_import_entity_id: context.entity.id,
          pipeline_class: pipeline,
          pipeline_step: step,
          exception_class: exception.class.to_s,
          # The message is cut to 255 characters before being stored.
          exception_message: exception.message.truncate(255),
          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
        }

        BulkImports::Failure.create(attributes)
      end

      # Logs at warn level. Note that this shadows Kernel#warn in includers.
      #
      # extra - Hash of attributes merged over the default log parameters.
      def warn(extra = {})
        logger.warn(log_params(extra))
      end

      # Logs at info level.
      #
      # extra - Hash of attributes merged over the default log parameters.
      def info(extra = {})
        logger.info(log_params(extra))
      end

      # Builds the log payload: entity id, entity source type and pipeline,
      # overridden/extended by `extra`, with nil values removed.
      def log_params(extra)
        defaults = {
          bulk_import_entity_id: context.entity.id,
          bulk_import_entity_type: context.entity.source_type,
          pipeline_class: pipeline
        }

        defaults.merge(extra).compact
      end

      # Memoized logger instance used by `info` and `warn`.
      def logger
        @logger ||= Gitlab::Import::Logger.build
      end
    end
  end
end