Diffstat (limited to 'app/workers/click_house/events_sync_worker.rb')
-rw-r--r-- | app/workers/click_house/events_sync_worker.rb | 93
1 file changed, 90 insertions(+), 3 deletions(-)
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index 054e7763297..5b7398cb071 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -12,6 +12,47 @@ module ClickHouse
 
     # the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime
     MAX_TTL = 2.5.minutes.to_i
+    MAX_RUNTIME = 120.seconds
+    BATCH_SIZE = 500
+    INSERT_BATCH_SIZE = 5000
+    CSV_MAPPING = {
+      id: :id,
+      path: :path,
+      author_id: :author_id,
+      target_id: :target_id,
+      target_type: :target_type,
+      action: :raw_action,
+      created_at: :casted_created_at,
+      updated_at: :casted_updated_at
+    }.freeze
+
+    # transforms the traversal_ids to a String:
+    # Example: group_id/subgroup_id/group_or_projectnamespace_id/
+    PATH_COLUMN = <<~SQL
+      (
+        CASE
+          WHEN project_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = (SELECT project_namespace_id FROM projects WHERE id = events.project_id LIMIT 1) LIMIT 1)
+          WHEN group_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = events.group_id LIMIT 1)
+          ELSE ''
+        END
+      ) AS path
+    SQL
+
+    EVENT_PROJECTIONS = [
+      :id,
+      PATH_COLUMN,
+      :author_id,
+      :target_id,
+      :target_type,
+      'action AS raw_action',
+      'EXTRACT(epoch FROM created_at) AS casted_created_at',
+      'EXTRACT(epoch FROM updated_at) AS casted_updated_at'
+    ].freeze
+
+    INSERT_EVENTS_QUERY = <<~SQL.squish
+      INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
+      SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
+    SQL
 
     def perform
       unless enabled?
@@ -22,12 +63,15 @@ module ClickHouse
 
       metadata = { status: :processed }
 
-      # Prevent parallel jobs
       begin
+        # Prevent parallel jobs
         in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
-          true
-        end
+          loop { break unless next_batch }
+
+          metadata.merge!(records_inserted: context.total_record_count, reached_end_of_table: context.no_more_records?)
+          ClickHouse::SyncCursor.update_cursor_for(:events, context.last_processed_id) if context.last_processed_id
+        end
       rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
         # Skip retrying, just let the next worker to start after a few minutes
         metadata = { status: :skipped }
       end
@@ -38,8 +82,51 @@ module ClickHouse
 
     private
 
+    def context
+      @context ||= ClickHouse::RecordSyncContext.new(
+        last_record_id: ClickHouse::SyncCursor.cursor_for(:events),
+        max_records_per_batch: INSERT_BATCH_SIZE,
+        runtime_limiter: Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
+      )
+    end
+
     def enabled?
       ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house)
     end
+
+    def next_batch
+      context.new_batch!
+
+      CsvBuilder::Gzip.new(process_batch(context), CSV_MAPPING).render do |tempfile, rows_written|
+        unless rows_written == 0
+          ClickHouse::Client.insert_csv(INSERT_EVENTS_QUERY, File.open(tempfile.path),
+            :main)
+        end
+      end
+
+      !(context.over_time? || context.no_more_records?)
+    end
+
+    def process_batch(context)
+      Enumerator.new do |yielder|
+        has_data = false
+        # rubocop: disable CodeReuse/ActiveRecord
+        Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation|
+          has_data = true
+
+          relation.select(*EVENT_PROJECTIONS).each do |row|
+            yielder << row
+            context.last_processed_id = row.id
+
+            break if context.record_limit_reached?
+          end
+
+          break if context.over_time? || context.record_limit_reached?
+        end
+
+        context.no_more_records! if has_data == false
+        # rubocop: enable CodeReuse/ActiveRecord
+      end
+    end
   end
 end
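
Note on the batching pattern added above: the worker keeps a cursor in ClickHouse::SyncCursor, streams events with an id greater than that cursor through an Enumerator in batches of BATCH_SIZE, advances last_processed_id row by row, and ends the run once INSERT_BATCH_SIZE rows or the MAX_RUNTIME budget is reached. The standalone Ruby sketch below only illustrates that loop shape under those assumptions; FakeStore, fetch_rows_after, and the small constants are hypothetical stand-ins, not GitLab code.

# Hypothetical, simplified illustration of the cursor + batch + time-budget loop.
BATCH_SIZE = 5          # rows fetched per query (stands in for each_batch size)
INSERT_BATCH_SIZE = 20  # max rows processed per run
MAX_RUNTIME = 2         # seconds allowed per run

FakeStore = Struct.new(:rows, :cursor) do
  # stands in for Event.where(id > cursor).each_batch
  def fetch_rows_after(id, limit)
    rows.select { |r| r > id }.first(limit)
  end
end

def process_batch(store, deadline, counter)
  Enumerator.new do |yielder|
    loop do
      batch = store.fetch_rows_after(store.cursor, BATCH_SIZE)
      break if batch.empty? # reached the end of the table

      batch.each do |row|
        yielder << row
        store.cursor = row          # cursor follows the last processed row
        counter[:count] += 1
        break if counter[:count] >= INSERT_BATCH_SIZE
      end

      break if Time.now >= deadline || counter[:count] >= INSERT_BATCH_SIZE
    end
  end
end

store = FakeStore.new((1..100).to_a, 0)
deadline = Time.now + MAX_RUNTIME

loop do
  counter = { count: 0 }
  rows = process_batch(store, deadline, counter).to_a
  # a real worker would render `rows` to CSV and insert them here
  break if rows.empty? || Time.now >= deadline
end

puts "cursor advanced to #{store.cursor}"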
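
Likewise, CSV_MAPPING drives serialization: its keys become the column list interpolated into INSERT_EVENTS_QUERY and its values are the attributes read from each projected row (hence the raw_action and casted_* aliases in EVENT_PROJECTIONS), with CsvBuilder::Gzip rendering the rows to a CSV tempfile (gzip-compressed, per its name) that ClickHouse::Client.insert_csv uploads. A rough, self-contained sketch of that mapping idea using Ruby's stdlib CSV, with a hypothetical Row struct instead of real event records:

require 'csv'

# Columns on the left, row attributes on the right -- same shape as CSV_MAPPING above.
MAPPING = { id: :id, action: :raw_action, created_at: :casted_created_at }.freeze

# Hypothetical row shape standing in for the projected Event records.
Row = Struct.new(:id, :raw_action, :casted_created_at, keyword_init: true)

rows = [
  Row.new(id: 1, raw_action: 5, casted_created_at: 1_700_000_000.0),
  Row.new(id: 2, raw_action: 9, casted_created_at: 1_700_000_060.0)
]

csv = CSV.generate do |out|
  rows.each { |row| out << MAPPING.values.map { |attr| row.public_send(attr) } }
end

puts csv
# => 1,5,1700000000.0
#    2,9,1700000060.0

# Simplified version of the insert statement: the column list comes from the mapping keys.
puts "INSERT INTO events (#{MAPPING.keys.join(', ')}) FORMAT CSV"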