lib/bulk_imports/pipeline/runner.rb (gitlab.com/gitlab-org/gitlab-foss.git, blob 7b5e1e68459fe275f0d4b9bd0735dfdb2c3d4b01)

# frozen_string_literal: true

module BulkImports
  module Pipeline
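    # Shared extract -> transform -> load loop for bulk import pipelines.
    # Including classes provide the extractor, transformers, loader, context
    # and tracker that the loop below calls.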
    module Runner
      extend ActiveSupport::Concern

      MarkedAsFailedError = Class.new(StandardError)

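      # Runs the pipeline: extracts the data, then transforms and loads each
      # entry, skipping entries already processed and saving pagination state
      # on the tracker so an interrupted run can resume where it left off.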
      def run
        raise MarkedAsFailedError if context.entity.failed?

        info(message: 'Pipeline started')

        extracted_data = extracted_data_from

        if extracted_data
          extracted_data.each_with_index do |entry, index|
            refresh_entity_and_import if index % 1000 == 0

            raw_entry = entry.dup
            next if already_processed?(raw_entry, index)

            transformers.each do |transformer|
              entry = run_pipeline_step(:transformer, transformer.class.name) do
                transformer.transform(context, entry)
              end
            end

            run_pipeline_step(:loader, loader.class.name, entry) do
              loader.load(context, entry)
            end

            save_processed_entry(raw_entry, index)
          end

          tracker.update!(
            has_next_page: extracted_data.has_next_page?,
            next_page: extracted_data.next_page
          )

          run_pipeline_step(:after_run) do
            after_run(extracted_data)
          end

          # For batches, `#on_finish` is called once within `FinishBatchedPipelineWorker`
          # after all batches have completed.
          unless tracker.batched?
            run_pipeline_step(:on_finish) do
              on_finish
            end
          end
        end

        info(message: 'Pipeline finished')
      rescue MarkedAsFailedError
        skip!('Skipping pipeline due to failed entity')
      end

      def on_finish; end

      private # rubocop:disable Lint/UselessAccessModifier

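      # Runs one step (extractor, transformer, loader or callback) with
      # logging and error handling: retriable network errors are re-raised
      # as RetryPipelineError, while other errors are recorded as failures
      # and, when abort_on_failure? is true, fail the whole entity.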
      def run_pipeline_step(step, class_name = nil, entry = nil)
        raise MarkedAsFailedError if context.entity.failed?

        info(pipeline_step: step, step_class: class_name)

        yield
      rescue MarkedAsFailedError
        skip!(
          'Skipping pipeline due to failed entity',
          pipeline_step: step,
          step_class: class_name,
          importer: 'gitlab_migration'
        )
      rescue BulkImports::NetworkError => e
        raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay) if e.retriable?(context.tracker)

        log_and_fail(e, step, entry)
      rescue BulkImports::RetryPipelineError
        raise
      rescue StandardError => e
        log_and_fail(e, step, entry)
      end

      def extracted_data_from
        run_pipeline_step(:extractor, extractor.class.name) do
          extractor.extract(context)
        end
      end

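      # Key under which processed entries are tracked, scoped to the
      # pipeline class, entity and current batch.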
      def cache_key
        batch_number = context.extra[:batch_number] || 0

        "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
      end

      # Overridden by child pipelines with different caching strategies
      def already_processed?(*)
        false
      end

      def save_processed_entry(*); end
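
      # One illustrative override (not from this file): a child pipeline can
      # dedupe against a cache set stored under #cache_key, e.g. using
      # Gitlab::Cache::Import::Caching:
      #
      #   def already_processed?(_entry, index)
      #     Gitlab::Cache::Import::Caching.set_includes?(cache_key, index)
      #   end
      #
      #   def save_processed_entry(_entry, index)
      #     Gitlab::Cache::Import::Caching.set_add(cache_key, index)
      #   end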

      def after_run(extracted_data)
        run if extracted_data.has_next_page?
      end

      def log_and_fail(exception, step, entry = nil)
        log_import_failure(exception, step, entry)

        if abort_on_failure?
          tracker.fail_op!

          warn(message: 'Aborting entity migration due to pipeline failure')
          context.entity.fail_op!
        end

        nil
      end

      def skip!(message, extra = {})
        warn({ message: message }.merge(extra))

        tracker.skip!
      end

      def log_import_failure(exception, step, entry)
        failure_attributes = {
          bulk_import_entity_id: context.entity.id,
          pipeline_class: pipeline,
          pipeline_step: step,
          exception_class: exception.class.to_s,
          exception_message: exception.message,
          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
        }

        if entry
          failure_attributes[:source_url] = BulkImports::SourceUrlBuilder.new(context, entry).url
          failure_attributes[:source_title] = entry.try(:title) || entry.try(:name)
        end

        log_exception(
          exception,
          log_params(
            {
              bulk_import_id: context.bulk_import_id,
              pipeline_step: step,
              message: 'An object of a pipeline failed to import'
            }
          )
        )

        BulkImports::Failure.create(failure_attributes)
      end

      def info(extra = {})
        logger.info(log_params(extra))
      end

      def warn(extra = {})
        logger.warn(log_params(extra))
      end

      def log_params(extra)
        defaults = {
          bulk_import_id: context.bulk_import_id,
          pipeline_class: pipeline,
          context_extra: context.extra
        }

        defaults
          .merge(extra)
          .reject { |_key, value| value.blank? }
      end

      def logger
        @logger ||= Logger.build.with_entity(context.entity)
      end

      def log_exception(exception, payload)
        Gitlab::ExceptionLogFormatter.format!(exception, payload)
        logger.error(structured_payload(payload))
      end

      def structured_payload(payload = {})
        context = Gitlab::ApplicationContext.current.merge(
          'class' => self.class.name
        )

        payload.stringify_keys.merge(context)
      end

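      # Keep updated_at fresh on the entity and bulk import during a long run.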
      def refresh_entity_and_import
        context.entity.touch
        context.bulk_import.touch
      end
    end
  end
end
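
# Illustrative usage sketch, not part of the upstream file. A concrete
# pipeline mixes in the concern and supplies the collaborators the run loop
# calls (all names below are hypothetical):
#
#   class MyPipeline
#     include BulkImports::Pipeline::Runner
#
#     attr_reader :context, :tracker   # must expose what the loop uses above
#
#     def initialize(context, tracker)
#       @context = context
#       @tracker = tracker
#     end
#
#     def extractor = MyExtractor.new        # responds to #extract(context)
#     def transformers = [MyTransformer.new] # each responds to #transform(context, entry)
#     def loader = MyLoader.new              # responds to #load(context, entry)
#     def pipeline = self.class.name
#     def abort_on_failure? = false
#   end
#
#   MyPipeline.new(context, tracker).run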