1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
# frozen_string_literal: true
module BulkImports
  module Pipeline
    # Runner drives a single import pipeline through its
    # extract -> transform -> load steps, records paging progress on the
    # tracker, and converts raised errors into structured, logged failures.
    #
    # It is mixed into concrete pipeline classes, which are expected to
    # provide `extractor`, `transformers`, `loader`, `context`, `tracker`,
    # `pipeline` and `abort_on_failure?` (defined by the including pipeline /
    # its DSL — not visible in this file).
    module Runner
      extend ActiveSupport::Concern

      # Raised internally when the owning entity has been marked as failed;
      # rescued below so the pipeline is skipped rather than crashed.
      MarkedAsFailedError = Class.new(StandardError)

      # Entry point. Extracts a page of data, runs every transformer over each
      # entry, loads the result, then persists paging state on the tracker.
      # `after_run` (below) calls `run` again while more pages remain, so this
      # method is mutually recursive with it.
      def run
        raise MarkedAsFailedError if context.entity.failed?

        info(message: 'Pipeline started')

        extracted_data = extracted_data_from

        if extracted_data
          extracted_data.each_with_index do |entry, index|
            # Keep an untransformed copy: dedup bookkeeping below is keyed on
            # the raw entry, not the transformed one.
            raw_entry = entry.dup
            next if already_processed?(raw_entry, index)

            # Each transformer's output feeds the next (entry is rebound).
            transformers.each do |transformer|
              entry = run_pipeline_step(:transformer, transformer.class.name) do
                transformer.transform(context, entry)
              end
            end

            run_pipeline_step(:loader, loader.class.name, entry) do
              loader.load(context, entry)
            end

            save_processed_entry(raw_entry, index)
          end

          # Persist paging state so a retried/resumed pipeline continues from
          # the correct page.
          tracker.update!(
            has_next_page: extracted_data.has_next_page?,
            next_page: extracted_data.next_page
          )

          run_pipeline_step(:after_run) do
            after_run(extracted_data)
          end

          # For batches, `#on_finish` is called once within `FinishBatchedPipelineWorker`
          # after all batches have completed.
          unless tracker.batched?
            run_pipeline_step(:on_finish) do
              on_finish
            end
          end
        end

        info(message: 'Pipeline finished')
      rescue MarkedAsFailedError
        skip!('Skipping pipeline due to failed entity')
      end

      # Per-pipeline completion hook; intentionally a no-op here. Overridden
      # by pipelines that need finalization work.
      def on_finish; end

      private # rubocop:disable Lint/UselessAccessModifier

      # Wraps one pipeline step: re-checks the entity, logs the step, runs the
      # block, and translates errors. The rescue order matters:
      #   1. MarkedAsFailedError  -> skip the pipeline (tracker.skip!)
      #   2. NetworkError         -> promoted to RetryPipelineError when the
      #                              tracker still has retries left, otherwise
      #                              logged as a failure
      #   3. RetryPipelineError   -> re-raised so the worker can reschedule
      #   4. StandardError        -> logged as a failure (returns nil)
      # `entry` is only used to enrich the failure record (source URL/title).
      def run_pipeline_step(step, class_name = nil, entry = nil)
        raise MarkedAsFailedError if context.entity.failed?

        info(pipeline_step: step, step_class: class_name)

        yield
      rescue MarkedAsFailedError
        skip!(
          'Skipping pipeline due to failed entity',
          pipeline_step: step,
          step_class: class_name,
          importer: 'gitlab_migration'
        )
      rescue BulkImports::NetworkError => e
        raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay) if e.retriable?(context.tracker)

        log_and_fail(e, step, entry)
      rescue BulkImports::RetryPipelineError
        raise
      rescue StandardError => e
        log_and_fail(e, step, entry)
      end

      # Runs the extractor as a tracked pipeline step and returns its result
      # (nil when the step failed and was swallowed by run_pipeline_step).
      def extracted_data_from
        run_pipeline_step(:extractor, extractor.class.name) do
          extractor.extract(context)
        end
      end

      # Cache key for processed-entry bookkeeping, unique per pipeline class,
      # entity and batch. Non-batched runs fall back to batch number 0.
      def cache_key
        batch_number = context.extra[:batch_number] || 0

        "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
      end

      # Overridden by child pipelines with different caching strategies
      def already_processed?(*)
        false
      end

      # Dedup bookkeeping hook; no-op by default (see already_processed?).
      def save_processed_entry(*); end

      # Default post-page hook: keep pulling pages by re-entering #run until
      # the extractor reports no next page.
      def after_run(extracted_data)
        run if extracted_data.has_next_page?
      end

      # Records the failure and, when the pipeline is marked abort-on-failure
      # (per the including pipeline's `abort_on_failure?`), fails both the
      # tracker and the whole entity. Always returns nil so a failed step
      # yields a nil result to its caller.
      def log_and_fail(exception, step, entry = nil)
        log_import_failure(exception, step, entry)

        if abort_on_failure?
          tracker.fail_op!
          warn(message: 'Aborting entity migration due to pipeline failure')
          context.entity.fail_op!
        end

        nil
      end

      # Marks the tracker as skipped after logging why.
      def skip!(message, extra = {})
        warn({ message: message }.merge(extra))

        tracker.skip!
      end

      # Persists a BulkImports::Failure row for the broken object and logs the
      # exception. When the failing entry is known, its source URL/title are
      # attached for easier triage.
      # NOTE(review): uses `create` (not `create!`), so a validation failure on
      # the Failure record is silently discarded — confirm this is intended.
      def log_import_failure(exception, step, entry)
        failure_attributes = {
          bulk_import_entity_id: context.entity.id,
          pipeline_class: pipeline,
          pipeline_step: step,
          exception_class: exception.class.to_s,
          exception_message: exception.message,
          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
        }

        if entry
          failure_attributes[:source_url] = BulkImports::SourceUrlBuilder.new(context, entry).url
          failure_attributes[:source_title] = entry.try(:title) || entry.try(:name)
        end

        log_exception(
          exception,
          log_params(
            {
              bulk_import_id: context.bulk_import_id,
              pipeline_step: step,
              message: 'An object of a pipeline failed to import'
            }
          )
        )

        BulkImports::Failure.create(failure_attributes)
      end

      # Structured info-level log line with the default pipeline context.
      def info(extra = {})
        logger.info(log_params(extra))
      end

      # Structured warn-level log line with the default pipeline context.
      def warn(extra = {})
        logger.warn(log_params(extra))
      end

      # Merges the caller's payload over the standard pipeline identifiers,
      # then drops blank values so log lines stay compact.
      def log_params(extra)
        defaults = {
          bulk_import_id: context.bulk_import_id,
          bulk_import_entity_id: context.entity.id,
          bulk_import_entity_type: context.entity.source_type,
          source_full_path: context.entity.source_full_path,
          pipeline_class: pipeline,
          context_extra: context.extra,
          source_version: context.entity.bulk_import.source_version_info.to_s
        }

        defaults
          .merge(extra)
          .reject { |_key, value| value.blank? }
      end

      # Memoized pipeline logger (BulkImports' Logger — resolved against the
      # surrounding namespace, not ::Logger from stdlib).
      def logger
        @logger ||= Logger.build
      end

      # Formats the exception into the payload and emits it at error level.
      def log_exception(exception, payload)
        Gitlab::ExceptionLogFormatter.format!(exception, payload)

        logger.error(structured_payload(payload))
      end

      # Merges the application context (plus this class name) into the payload
      # with string keys. The local `context` here deliberately shadows the
      # `context` reader used elsewhere in this module.
      def structured_payload(payload = {})
        context = Gitlab::ApplicationContext.current.merge(
          'class' => self.class.name
        )

        payload.stringify_keys.merge(context)
      end
    end
  end
end
|