Welcome to mirror list, hosted at ThFree Co, Russian Federation.

runner.rb « pipeline « bulk_imports « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 81f8dee30d939b69f1c3614b5c942f47544cb4ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# frozen_string_literal: true

module BulkImports
  module Pipeline
    # Mixin that drives one pipeline through its extract -> transform -> load
    # steps, logging progress and recording skips and failures along the way.
    #
    # The module calls `context`, `tracker`, `extractor`, `transformers`,
    # `loader`, `pipeline` and `abort_on_failure?` without defining them, so the
    # including class is expected to provide all of them.
    module Runner
      extend ActiveSupport::Concern

      # Raised internally when the entity is already marked as failed. It is
      # always rescued inside this module and turned into a tracker skip.
      MarkedAsFailedError = Class.new(StandardError)

      # Runs the pipeline for the current page of extracted data.
      #
      # Each extracted entry is passed through every transformer in order and
      # the outcome is handed to the loader. Afterwards the pagination state is
      # stored on the tracker and the `after_run` hook is executed. If the
      # entity is already failed, the tracker is skipped instead.
      #
      # @raise [BulkImports::RetryPipelineError] when a step hits a retriable
      #   network error (propagated from `run_pipeline_step`)
      def run
        raise MarkedAsFailedError if context.entity.failed?

        info(message: 'Pipeline started')

        page = extracted_data_from

        # `page` is nil when the extractor step failed and was logged.
        if page
          page.each do |entry|
            # Thread the entry through the transformer chain: every step
            # receives whatever `run_pipeline_step` returned for the previous
            # one (nil when that step failed and was logged).
            transformed = transformers.reduce(entry) do |current, transformer|
              run_pipeline_step(:transformer, transformer.class.name) do
                transformer.transform(context, current)
              end
            end

            run_pipeline_step(:loader, loader.class.name) { loader.load(context, transformed) }
          end

          tracker.update!(has_next_page: page.has_next_page?, next_page: page.next_page)

          run_pipeline_step(:after_run) { after_run(page) }
        end

        info(message: 'Pipeline finished')
      rescue MarkedAsFailedError
        skip!('Skipping pipeline due to failed entity')
      end

      private # rubocop:disable Lint/UselessAccessModifier

      # Executes one pipeline step (the given block) with logging and error
      # handling.
      #
      # @param step [Symbol] step identifier used in logs and failure records
      # @param class_name [String, nil] name of the class performing the step
      # @yield the actual work of the step
      # @return the block's value; nil when the step failed and was logged;
      #   the result of `skip!` when the entity is already failed
      # @raise [BulkImports::RetryPipelineError] for retriable network errors,
      #   or when the block itself raised one
      def run_pipeline_step(step, class_name = nil)
        raise MarkedAsFailedError if context.entity.failed?

        info(pipeline_step: step, step_class: class_name)

        yield
      rescue MarkedAsFailedError
        skip!(
          'Skipping pipeline due to failed entity',
          pipeline_step: step,
          step_class: class_name,
          importer: 'gitlab_migration'
        )
      rescue BulkImports::NetworkError => network_error
        return log_and_fail(network_error, step) unless network_error.retriable?(context.tracker)

        raise BulkImports::RetryPipelineError.new(network_error.message, network_error.retry_delay)
      rescue BulkImports::RetryPipelineError
        # Listed before StandardError so a retry request raised by the block
        # (e.g. from a nested run) is passed on rather than logged as a failure.
        raise
      rescue StandardError => error
        log_and_fail(error, step)
      end

      # Runs the extractor as a pipeline step.
      #
      # @return the extractor's result, or whatever `run_pipeline_step` returns
      #   when the step is skipped or fails
      def extracted_data_from
        run_pipeline_step(:extractor, extractor.class.name) { extractor.extract(context) }
      end

      # Hook executed after a page has been processed: runs the pipeline again
      # while the extracted data reports a further page.
      #
      # @param extracted_data [#has_next_page?] the page that was just processed
      def after_run(extracted_data)
        return unless extracted_data.has_next_page?

        run
      end

      # Records the failure, marks the tracker as failed and, when the pipeline
      # is configured to abort on failure, marks the whole entity as failed.
      #
      # @param exception [StandardError] the error raised by the step
      # @param step [Symbol] the step that failed
      # @return [nil] always, so a failed step yields nil to its caller
      def log_and_fail(exception, step)
        log_import_failure(exception, step)

        tracker.fail_op!

        return unless abort_on_failure?

        warn(message: 'Aborting entity migration due to pipeline failure')
        context.entity.fail_op!

        nil
      end

      # Logs a warning and marks the tracker as skipped.
      #
      # @param message [String] warning text
      # @param extra [Hash] additional log fields, merged over the message
      # @return the result of `tracker.skip!`
      def skip!(message, extra = {})
        payload = { message: message }.merge(extra)
        warn(payload)

        tracker.skip!
      end

      # Writes the exception to the log and creates a BulkImports::Failure
      # record describing it.
      #
      # @param exception [StandardError] the error to record
      # @param step [Symbol] the step in which it occurred
      def log_import_failure(exception, step)
        attributes = {
          bulk_import_entity_id: context.entity.id,
          pipeline_class: pipeline,
          pipeline_step: step,
          exception_class: exception.class.to_s,
          exception_message: exception.message.truncate(255),
          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
        }

        log_payload = log_params(
          bulk_import_id: context.bulk_import_id,
          pipeline_step: step,
          message: 'Pipeline failed'
        )
        log_exception(exception, log_payload)

        BulkImports::Failure.create(attributes)
      end

      # Logs at info level with the common pipeline fields added.
      #
      # @param extra [Hash] fields specific to this log line
      def info(extra = {})
        params = log_params(extra)
        logger.info(params)
      end

      # Logs at warn level with the common pipeline fields added.
      # Note that this takes precedence over Kernel#warn in including classes.
      #
      # @param extra [Hash] fields specific to this log line
      def warn(extra = {})
        params = log_params(extra)
        logger.warn(params)
      end

      # Builds the log payload: common fields, overridden by `extra`, with all
      # blank values removed.
      #
      # @param extra [Hash] fields that take precedence over the defaults
      # @return [Hash]
      def log_params(extra)
        entity = context.entity

        common_fields = {
          bulk_import_id: context.bulk_import_id,
          bulk_import_entity_id: entity.id,
          bulk_import_entity_type: entity.source_type,
          source_full_path: entity.source_full_path,
          pipeline_class: pipeline,
          context_extra: context.extra,
          source_version: entity.bulk_import.source_version_info.to_s,
          importer: 'gitlab_migration'
        }

        merged = common_fields.merge(extra)
        merged.reject { |_key, value| value.blank? }
      end

      # Memoized import logger.
      def logger
        @logger ||= Gitlab::Import::Logger.build
      end

      # Formats the exception into the payload and logs it at error level.
      #
      # @param exception [StandardError]
      # @param payload [Hash] log fields; passed to the formatter's `format!`
      def log_exception(exception, payload)
        Gitlab::ExceptionLogFormatter.format!(exception, payload)

        entry = structured_payload(payload)
        logger.error(entry)
      end

      # Returns the payload with string keys, merged with the current
      # application context plus this object's class name (context values win
      # on key clashes).
      #
      # @param payload [Hash]
      # @return [Hash]
      def structured_payload(payload = {})
        application_context = Gitlab::ApplicationContext.current.merge('class' => self.class.name)

        payload.stringify_keys.merge(application_context)
      end
    end
  end
end