1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# frozen_string_literal: true
module BulkImports
module NdjsonPipeline
extend ActiveSupport::Concern
include Pipeline
include Pipeline::IndexCacheStrategy
included do
file_extraction_pipeline!
def transform(context, data)
return unless data
relation_hash, relation_index = data
return unless relation_hash
relation_object = deep_transform_relation!(relation_hash, relation, relation_definition) do |key, hash|
relation_factory.create(
relation_index: relation_index,
relation_sym: key.to_sym,
relation_hash: hash,
importable: context.portable,
members_mapper: members_mapper,
object_builder: object_builder,
user: context.current_user,
excluded_keys: import_export_config.relation_excluded_keys(key)
)
end
relation_object.assign_attributes(portable_class_sym => portable)
relation_object
end
def load(_context, object)
return unless object
if object.new_record?
saver = Gitlab::ImportExport::Base::RelationObjectSaver.new(
relation_object: object,
relation_key: relation,
relation_definition: relation_definition,
importable: portable
)
saver.execute
capture_invalid_subrelations(saver.invalid_subrelations)
else
if object.invalid?
Gitlab::Import::Errors.merge_nested_errors(object)
raise(ActiveRecord::RecordInvalid, object)
end
object.save!
end
end
def deep_transform_relation!(relation_hash, relation_key, relation_definition, &block)
relation_key = relation_key_override(relation_key)
relation_definition.each do |sub_relation_key, sub_relation_definition|
sub_relation = relation_hash[sub_relation_key]
next unless sub_relation
current_item =
if sub_relation.is_a?(Array)
sub_relation
.map { |entry| deep_transform_relation!(entry, sub_relation_key, sub_relation_definition, &block) }
.tap { |entry| entry.compact! }
.presence
else
deep_transform_relation!(sub_relation, sub_relation_key, sub_relation_definition, &block)
end
if current_item
relation_hash[sub_relation_key] = current_item
else
relation_hash.delete(sub_relation_key)
end
end
yield(relation_key, relation_hash)
end
def after_run(_)
extractor.remove_tmpdir if extractor.respond_to?(:remove_tmpdir)
end
def relation_class(relation_key)
relation_key.to_s.classify.constantize
rescue NameError
relation_key.to_s.constantize
end
def relation_key_override(relation_key)
relation_key_overrides[relation_key.to_sym]&.to_s || relation_key
end
def relation_key_overrides
"Gitlab::ImportExport::#{portable.class}::RelationFactory::OVERRIDES".constantize
end
def object_builder
"Gitlab::ImportExport::#{portable.class}::ObjectBuilder".constantize
end
def relation_factory
"Gitlab::ImportExport::#{portable.class}::RelationFactory".constantize
end
def relation
self.class.relation
end
def members_mapper
@members_mapper ||= BulkImports::UsersMapper.new(context: context)
end
def portable_class_sym
portable.class.to_s.downcase.to_sym
end
def relation_definition
import_export_config.top_relation_tree(relation)
end
def capture_invalid_subrelations(invalid_subrelations)
invalid_subrelations.each do |record|
BulkImports::Failure.create(
bulk_import_entity_id: tracker.entity.id,
pipeline_class: tracker.pipeline_name,
exception_class: 'RecordInvalid',
exception_message: record.errors.full_messages.to_sentence.truncate(255),
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
)
end
end
end
end
end
|