Welcome to mirror list, hosted at ThFree Co, Russian Federation.

event_authors_consistency_cron_worker.rb « click_house « workers « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 62f64f2b9ff877883aa3aa23e426e8e7ad8373eb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# frozen_string_literal: true

module ClickHouse
  # rubocop: disable CodeReuse/ActiveRecord -- Building worker-specific ActiveRecord and ClickHouse queries
  class EventAuthorsConsistencyCronWorker
    include ApplicationWorker
    include ClickHouseWorker
    include ClickHouse::Concerns::ConsistencyWorker # defines perform
    include Gitlab::ExclusiveLeaseHelpers

    idempotent!
    queue_namespace :cronjob
    data_consistency :delayed
    worker_has_external_dependencies! # the worker interacts with a ClickHouse database
    feature_category :value_stream_management

    MAX_AUTHOR_DELETIONS = 2000

    private

    # Records which author ids from one batch no longer have a users row.
    #
    # Missing ids are appended to context[:author_records_to_delete] and the
    # running total is mirrored into metadata[:modifications]. metadata[:status]
    # becomes :limit_reached once MAX_AUTHOR_DELETIONS ids are queued, otherwise
    # :over_time when the runtime limiter reports that the time budget is used up.
    #
    # @param ids [Array] author ids taken from the current batch
    # @return [void]
    def collect_values(ids)
      missing_ids = missing_user_ids(ids)

      # A batch in which every author still exists yields no missing ids; keep the
      # previously recorded position instead of overwriting it with nil.
      # NOTE(review): the position tracks the last *missing* id, not the last id
      # of the batch -- confirm this is what the cursor consumer expects.
      context[:last_processed_id] = missing_ids.last unless missing_ids.empty?
      context[:author_records_to_delete].concat(missing_ids)

      queued_count = context[:author_records_to_delete].size
      metadata[:modifications] = queued_count

      if queued_count >= MAX_AUTHOR_DELETIONS
        metadata[:status] = :limit_reached
        return
      end

      metadata[:status] = :over_time if runtime_limiter.over_time?
    end

    # Deletes the ClickHouse rows of every author id queued in
    # context[:author_records_to_delete].
    #
    # Two ALTER TABLE ... DELETE statements are executed, first against events
    # and then against event_authors, both bound to the same id list.
    # NOTE(review): the events-before-event_authors order is kept from the
    # original; presumably it lets a failed run be retried -- confirm.
    #
    # @return [void]
    def process_collected_values
      ids = context[:author_records_to_delete]

      # Nothing was collected: do not issue delete mutations for an empty id list.
      return if ids.empty?

      serialized_ids = ids.to_json
      delete_statements = [
        'ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})',
        'ALTER TABLE event_authors DELETE WHERE author_id IN ({author_ids:Array(UInt64)})'
      ]

      delete_statements.each do |raw_query|
        query = ClickHouse::Client::Query.new(
          raw_query: raw_query,
          placeholders: { author_ids: serialized_ids }
        )

        connection.execute(query)
      end
    end

    # Resets the per-run state: an empty deletion queue and a position of 0.
    #
    # @return [Hash] the freshly assigned context
    def init_context
      @context = {
        author_records_to_delete: [],
        last_processed_id: 0
      }
    end

    # Name of the table this worker reports as its own.
    def table = 'event_authors'

    # Name of the column reported for batching.
    def batch_column = 'author_id'

    # Name of the column whose values are reported for plucking.
    def pluck_column = 'author_id'

    # Returns the subset of +ids+ that has no matching row in the users table.
    #
    # The ids are handed to the database as an inline VALUES list aliased as
    # user_ids(id); only those failing the NOT EXISTS probe against users are
    # plucked.
    #
    # @param ids [Array] candidate author ids
    # @return [Array] the ids without a users row
    def missing_user_ids(ids)
      # An empty id list has nothing to look up; returning early also avoids
      # building a VALUES clause with no rows.
      return [] if ids.empty?

      value_list = Arel::Nodes::ValuesList.new(ids.map { |id| [id] })
      User
        .from("(#{value_list.to_sql}) AS user_ids(id)")
        .where('NOT EXISTS (SELECT 1 FROM users WHERE id = user_ids.id)')
        .pluck(:id)
    end
  end
  # rubocop: enable CodeReuse/ActiveRecord
end