1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module Importer
      # Fetches GitHub issue/merge-request timeline events for a project via
      # the single "issue timeline" endpoint and hands each event to a worker.
      # Batching, dedup caching and job-waiter plumbing come from the
      # ParallelScheduling and SingleEndpointNotesImporting mixins.
      class SingleEndpointIssueEventsImporter
        include ParallelScheduling
        include SingleEndpointNotesImporting

        # Cache-key template used to remember which result page was already
        # processed for a given issuable (see #page_counter_id).
        PROCESSED_PAGE_CACHE_KEY = 'issues/%{issuable_iid}/%{collection}'
        # Number of issuables loaded per database batch in #each_associated_page.
        BATCH_SIZE = 100

        # @param project [Project] the project being imported into
        # @param client [Gitlab::GithubImport::Client] GitHub API client
        # @param parallel [Boolean] when true events are imported through
        #   Sidekiq workers; when false they are processed inline
        def initialize(project, client, parallel: true)
          @project = project
          @client = client
          @parallel = parallel
          # The *_CACHE_KEY templates are declared by the ParallelScheduling
          # mixin; interpolate them with this importer's collection name.
          @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY %
            { project: project.id, collection: collection_method }
          @job_waiter_cache_key = JOB_WAITER_CACHE_KEY %
            { project: project.id, collection: collection_method }
          @job_waiter_remaining_cache_key = JOB_WAITER_REMAINING_CACHE_KEY %
            { project: project.id, collection: collection_method }
        end

        # The single timeline endpoint returns events without any reference to
        # the issuable they belong to. To make it possible to identify the
        # issue in the separate worker, we patch the Sawyer response hash here
        # with the issuable's number.
        def each_associated(parent_record, associated)
          associated = associated.to_h
          compose_associated_id!(parent_record, associated)

          # Skip events already handled or of a type this importer ignores.
          return if already_imported?(associated) || supported_events.exclude?(associated[:event])

          cache_event(parent_record, associated)
          increment_object_counter(associated[:event])

          pull_request = parent_record.is_a? MergeRequest
          associated[:issue] = { number: parent_record.iid, pull_request: pull_request }
          yield(associated)

          mark_as_imported(associated)
        end

        # On GitHub, issues and merge requests use the same events API, and
        # their iids are unique across both collections, so batches of each
        # can be streamed through the same block.
        def each_associated_page(&block)
          issues_collection.each_batch(of: BATCH_SIZE, column: :iid) { |batch| process_batch(batch, &block) }
          merge_requests_collection.each_batch(of: BATCH_SIZE, column: :iid) { |batch| process_batch(batch, &block) }
        end

        # Importer class invoked for each event representation.
        def importer_class
          IssueEventImporter
        end

        # Representation wrapping a raw event hash from the API.
        def representation_class
          Representation::IssueEvent
        end

        # Worker scheduled per fetched event (used in parallel mode).
        def sidekiq_worker_class
          ImportIssueEventWorker
        end

        def object_type
          :issue_event
        end

        # Bumps the "fetched" counter: a per-event-type counter when extended
        # events are enabled and the event type is mapped, otherwise the
        # generic :issue_event counter.
        def increment_object_counter(event_name)
          counter_type = importer_class::EVENT_COUNTER_MAP[event_name] if import_settings.extended_events?
          counter_type ||= object_type

          Gitlab::GithubImport::ObjectCounter.increment(project, counter_type, :fetched)
        end

        def collection_method
          :issue_timeline
        end

        # Issues whose timeline has not been fully imported yet.
        def issues_collection
          project.issues.where.not(iid: already_imported_parents).select(:id, :iid) # rubocop: disable CodeReuse/ActiveRecord
        end

        # Merge requests whose timeline has not been fully imported yet.
        def merge_requests_collection
          project.merge_requests.where.not(iid: already_imported_parents).select(:id, :iid) # rubocop: disable CodeReuse/ActiveRecord
        end

        # Cache key tracking issuable iids that were already fully processed.
        def parent_imported_cache_key
          "github-importer/issues/#{collection_method}/already-imported/#{project.id}"
        end

        # Per-issuable key suffix for the processed-page counter.
        def page_counter_id(issuable)
          PROCESSED_PAGE_CACHE_KEY % { issuable_iid: issuable.iid, collection: collection_method }
        end

        def id_for_already_imported_cache(event)
          event[:id]
        end

        def collection_options
          { state: 'all', sort: 'created', direction: 'asc' }
        end

        # Cross-referenced events on GitHub don't have an id, so build a
        # deterministic one from the issuable iid and the referencing issue id
        # to make deduplication possible.
        def compose_associated_id!(issuable, event)
          return if event[:event] != 'cross-referenced'

          event[:id] = "cross-reference##{issuable.iid}-in-#{event.dig(:source, :issue, :id)}"
        end

        def import_settings
          @import_settings ||= Gitlab::GithubImport::Settings.new(project)
        end

        # After all events of an issuable have been fetched, schedule a worker
        # that replays the cached events for it (extended-events mode only).
        def after_batch_processed(parent)
          return unless import_settings.extended_events?

          events = events_cache.events(parent)

          return if events.empty?

          hash = Representation::ReplayEvent.new(issuable_type: parent.class.name.to_s, issuable_iid: parent.iid)
            .to_hash.deep_stringify_keys

          ReplayEventsWorker.perform_async(project.id, hash, job_waiter.key.to_s)
          # Keep the job waiter's remaining-jobs counter in sync with the
          # extra worker scheduled above.
          job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key)
        end

        # Extended-events mode supports a wider set of event types.
        def supported_events
          return importer_class::EXTENDED_SUPPORTED_EVENTS if import_settings.extended_events?

          importer_class::SUPPORTED_EVENTS
        end

        # Caches replay-supported events so #after_batch_processed can schedule
        # a replay once the issuable's whole timeline has been fetched.
        def cache_event(parent_record, associated)
          return unless import_settings.extended_events?
          return if Importer::ReplayEventsImporter::SUPPORTED_EVENTS.exclude?(associated[:event])

          representation = representation_class.from_api_response(associated)
          events_cache.add(parent_record, representation)
        end

        def events_cache
          @events_cache ||= EventsCache.new(project)
        end
      end
    end
  end
end
|