blob: 62f64f2b9ff877883aa3aa23e426e8e7ad8373eb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# frozen_string_literal: true
module ClickHouse
# rubocop: disable CodeReuse/ActiveRecord -- Building worker-specific ActiveRecord and ClickHouse queries
class EventAuthorsConsistencyCronWorker
include ApplicationWorker
include ClickHouseWorker
include ClickHouse::Concerns::ConsistencyWorker # defines perform
include Gitlab::ExclusiveLeaseHelpers
idempotent!
queue_namespace :cronjob
data_consistency :delayed
worker_has_external_dependencies! # the worker interacts with a ClickHouse database
feature_category :value_stream_management
MAX_AUTHOR_DELETIONS = 2000
private
def collect_values(ids)
missing_user_ids_from_batch = missing_user_ids(ids)
context[:last_processed_id] = missing_user_ids_from_batch.last
context[:author_records_to_delete].concat(missing_user_ids_from_batch)
to_be_deleted_size = context[:author_records_to_delete].size
metadata[:modifications] = to_be_deleted_size
if to_be_deleted_size >= MAX_AUTHOR_DELETIONS
metadata[:status] = :limit_reached
return
end
metadata[:status] = :over_time if runtime_limiter.over_time?
end
def process_collected_values
ids = context[:author_records_to_delete]
query = ClickHouse::Client::Query.new(
raw_query: 'ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})',
placeholders: { author_ids: ids.to_json }
)
connection.execute(query)
query = ClickHouse::Client::Query.new(
raw_query: 'ALTER TABLE event_authors DELETE WHERE author_id IN ({author_ids:Array(UInt64)})',
placeholders: { author_ids: ids.to_json }
)
connection.execute(query)
end
def init_context
@context = { author_records_to_delete: [], last_processed_id: 0 }
end
def table
'event_authors'
end
def batch_column
'author_id'
end
def pluck_column
'author_id'
end
def missing_user_ids(ids)
value_list = Arel::Nodes::ValuesList.new(ids.map { |id| [id] })
User
.from("(#{value_list.to_sql}) AS user_ids(id)")
.where('NOT EXISTS (SELECT 1 FROM users WHERE id = user_ids.id)')
.pluck(:id)
end
end
# rubocop: enable CodeReuse/ActiveRecord
end
|