Welcome to mirror list, hosted at ThFree Co, Russian Federation.

events_sync_worker.rb « click_house « workers « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 5b7398cb071bb6aaa7bea08b0fba4bc82467dc76 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# frozen_string_literal: true

module ClickHouse
  class EventsSyncWorker
    include ApplicationWorker
    include Gitlab::ExclusiveLeaseHelpers

    idempotent!
    data_consistency :delayed
    worker_has_external_dependencies! # the worker interacts with a ClickHouse database
    feature_category :value_stream_management

    # the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime
    MAX_TTL = 2.5.minutes.to_i
    MAX_RUNTIME = 120.seconds
    BATCH_SIZE = 500
    INSERT_BATCH_SIZE = 5000
    CSV_MAPPING = {
      id: :id,
      path: :path,
      author_id: :author_id,
      target_id: :target_id,
      target_type: :target_type,
      action: :raw_action,
      created_at: :casted_created_at,
      updated_at: :casted_updated_at
    }.freeze

    # transforms the traversal_ids to a String:
    # Example: group_id/subgroup_id/group_or_projectnamespace_id/
    PATH_COLUMN = <<~SQL
      (
        CASE
          WHEN project_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = (SELECT project_namespace_id FROM projects WHERE id = events.project_id LIMIT 1) LIMIT 1)
          WHEN group_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = events.group_id LIMIT 1)
          ELSE ''
        END
      ) AS path
    SQL

    EVENT_PROJECTIONS = [
      :id,
      PATH_COLUMN,
      :author_id,
      :target_id,
      :target_type,
      'action AS raw_action',
      'EXTRACT(epoch FROM created_at) AS casted_created_at',
      'EXTRACT(epoch FROM updated_at) AS casted_updated_at'
    ].freeze

    INSERT_EVENTS_QUERY = <<~SQL.squish
      INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
      SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
    SQL

    def perform
      unless enabled?
        log_extra_metadata_on_done(:result, { status: :disabled })

        return
      end

      metadata = { status: :processed }

      begin
        # Prevent parallel jobs
        in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
          loop { break unless next_batch }

          metadata.merge!(records_inserted: context.total_record_count, reached_end_of_table: context.no_more_records?)

          ClickHouse::SyncCursor.update_cursor_for(:events, context.last_processed_id) if context.last_processed_id
        end
      rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
        # Skip retrying, just let the next worker to start after a few minutes
        metadata = { status: :skipped }
      end

      log_extra_metadata_on_done(:result, metadata)
    end

    private

    def context
      @context ||= ClickHouse::RecordSyncContext.new(
        last_record_id: ClickHouse::SyncCursor.cursor_for(:events),
        max_records_per_batch: INSERT_BATCH_SIZE,
        runtime_limiter: Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
      )
    end

    def enabled?
      ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house)
    end

    def next_batch
      context.new_batch!

      CsvBuilder::Gzip.new(process_batch(context), CSV_MAPPING).render do |tempfile, rows_written|
        unless rows_written == 0
          ClickHouse::Client.insert_csv(INSERT_EVENTS_QUERY, File.open(tempfile.path),
            :main)
        end
      end

      !(context.over_time? || context.no_more_records?)
    end

    def process_batch(context)
      Enumerator.new do |yielder|
        has_data = false
        # rubocop: disable CodeReuse/ActiveRecord
        Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation|
          has_data = true

          relation.select(*EVENT_PROJECTIONS).each do |row|
            yielder << row
            context.last_processed_id = row.id

            break if context.record_limit_reached?
          end

          break if context.over_time? || context.record_limit_reached?
        end

        context.no_more_records! if has_data == false
        # rubocop: enable CodeReuse/ActiveRecord
      end
    end
  end
end