1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# frozen_string_literal: true
module ClickHouse
# Cron-scheduled worker that incrementally replicates rows from the
# PostgreSQL `events` table into the ClickHouse `events` table. Progress is
# tracked with a persisted sync cursor (ClickHouse::SyncCursor) so each run
# resumes exactly where the previous one stopped.
class EventsSyncWorker
include ApplicationWorker
include ClickHouseWorker
include Gitlab::ExclusiveLeaseHelpers
include Gitlab::Utils::StrongMemoize
idempotent!
queue_namespace :cronjob
data_consistency :delayed
worker_has_external_dependencies! # the worker interacts with a ClickHouse database
feature_category :value_stream_management
# the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime
# NOTE: the lease TTL (150s) deliberately exceeds MAX_RUNTIME (120s) so the
# lock outlives the soft time budget of a single run.
MAX_TTL = 2.5.minutes.to_i
# Soft time budget for one run; enforced between batches by the runtime limiter.
MAX_RUNTIME = 120.seconds
# Rows fetched from PostgreSQL per each_batch page.
BATCH_SIZE = 500
# Maximum rows accumulated into a single CSV insert towards ClickHouse.
INSERT_BATCH_SIZE = 5000
# Maps ClickHouse column names (keys) to the aliased attributes on the
# projected PostgreSQL rows (values); consumed by CsvBuilder::Gzip below.
CSV_MAPPING = {
id: :id,
path: :path,
author_id: :author_id,
target_id: :target_id,
target_type: :target_type,
action: :raw_action,
created_at: :casted_created_at,
updated_at: :casted_updated_at
}.freeze
# transforms the traversal_ids to a String:
# Example: group_id/subgroup_id/group_or_projectnamespace_id/
# Project events resolve the namespace via the project's project_namespace_id;
# group events use group_id directly; events with neither yield ''.
PATH_COLUMN = <<~SQL
(
CASE
WHEN project_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = (SELECT project_namespace_id FROM projects WHERE id = events.project_id LIMIT 1) LIMIT 1)
WHEN group_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = events.group_id LIMIT 1)
ELSE ''
END
) AS path
SQL
# SELECT projections for the PostgreSQL `events` table; every alias here must
# appear as a value in CSV_MAPPING. Timestamps are cast to epoch seconds.
EVENT_PROJECTIONS = [
:id,
PATH_COLUMN,
:author_id,
:target_id,
:target_type,
'action AS raw_action',
'EXTRACT(epoch FROM created_at) AS casted_created_at',
'EXTRACT(epoch FROM updated_at) AS casted_updated_at'
].freeze
# Async insert statement; row data is streamed to ClickHouse in CSV format.
INSERT_EVENTS_QUERY = <<~SQL.squish
INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
SQL
# Entry point. Returns early (status: :disabled) when ClickHouse is not
# configured or the feature flag is off. Otherwise syncs batches under an
# exclusive lease and persists the cursor once the loop finishes; the result
# metadata is attached to the job log either way.
def perform
unless enabled?
log_extra_metadata_on_done(:result, { status: :disabled })
return
end
metadata = { status: :processed }
begin
# Prevent parallel jobs
in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
# Keep inserting batches until next_batch reports no more work
# (end of table reached or time budget exhausted).
loop { break unless next_batch }
metadata.merge!(records_inserted: context.total_record_count, reached_end_of_table: context.no_more_records?)
# Persist the cursor only after the loop, so an aborted run simply
# re-processes from the last committed position.
ClickHouse::SyncCursor.update_cursor_for(:events, context.last_processed_id) if context.last_processed_id
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# Skip retrying, just let the next worker to start after a few minutes
metadata = { status: :skipped }
end
log_extra_metadata_on_done(:result, metadata)
end
private
# Per-run sync state: the resume position loaded from the persisted cursor,
# the per-insert record cap and the runtime limiter. Memoized so every batch
# shares the same context instance.
def context
@context ||= ClickHouse::RecordSyncContext.new(
last_record_id: ClickHouse::SyncCursor.cursor_for(:events),
max_records_per_batch: INSERT_BATCH_SIZE,
runtime_limiter: Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
)
end
# Snapshot of the highest event id at the start of the run (memoized below);
# used as the upper bound of the scan so rows created mid-run are left for
# the next execution. Returns nil when the table is empty.
def last_event_id_in_postgresql
Event.maximum(:id)
end
strong_memoize_attr :last_event_id_in_postgresql
# True only when the :main ClickHouse database is configured AND the
# `event_sync_worker_for_click_house` feature flag is enabled.
def enabled?
ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house)
end
# Streams up to INSERT_BATCH_SIZE rows into ClickHouse as one gzipped CSV
# insert. Returns true when another batch should be attempted, false when
# the run should stop (time budget spent or table exhausted).
def next_batch
context.new_batch!
CsvBuilder::Gzip.new(process_batch(context), CSV_MAPPING).render do |tempfile, rows_written|
# Skip the INSERT entirely when the batch produced no rows.
unless rows_written == 0
ClickHouse::Client.insert_csv(INSERT_EVENTS_QUERY, File.open(tempfile.path),
:main)
end
end
!(context.over_time? || context.no_more_records?)
end
# Lazily yields projected event rows for the CSV builder, advancing the
# in-memory cursor after each yielded row. Stops early on the record cap or
# the time budget; flags the context as finished when the table is exhausted.
def process_batch(context)
Enumerator.new do |yielder|
has_more_data = false
batching_scope.each_batch(of: BATCH_SIZE) do |relation|
records = relation.select(*EVENT_PROJECTIONS).to_a
# A partially-filled page signals we've reached the end of the id range.
has_more_data = records.size == BATCH_SIZE
records.each do |row|
yielder << row
context.last_processed_id = row.id
# Only exits the inner each; the outer break below then fires too.
break if context.record_limit_reached?
end
break if context.over_time? || context.record_limit_reached? || !has_more_data
end
context.no_more_records! unless has_more_data
end
end
# rubocop: disable CodeReuse/ActiveRecord
# Events strictly after the persisted cursor, bounded above by the id
# snapshot taken at the start of the run. Empty relation when the events
# table has no rows at all.
def batching_scope
return Event.none unless last_event_id_in_postgresql
table = Event.arel_table
Event
.where(table[:id].gt(context.last_record_id))
.where(table[:id].lteq(last_event_id_in_postgresql))
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
|