Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/click_house/events_sync_worker.rb')
-rw-r--r-- app/workers/click_house/events_sync_worker.rb | 93
1 files changed, 90 insertions, 3 deletions
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index 054e7763297..5b7398cb071 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -12,6 +12,47 @@ module ClickHouse
# the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime
MAX_TTL = 2.5.minutes.to_i
+ MAX_RUNTIME = 120.seconds
+ BATCH_SIZE = 500
+ INSERT_BATCH_SIZE = 5000
+ CSV_MAPPING = {
+ id: :id,
+ path: :path,
+ author_id: :author_id,
+ target_id: :target_id,
+ target_type: :target_type,
+ action: :raw_action,
+ created_at: :casted_created_at,
+ updated_at: :casted_updated_at
+ }.freeze
+
+ # transforms the traversal_ids to a String:
+ # Example: group_id/subgroup_id/group_or_projectnamespace_id/
+ PATH_COLUMN = <<~SQL
+ (
+ CASE
+ WHEN project_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = (SELECT project_namespace_id FROM projects WHERE id = events.project_id LIMIT 1) LIMIT 1)
+ WHEN group_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = events.group_id LIMIT 1)
+ ELSE ''
+ END
+ ) AS path
+ SQL
+
+ EVENT_PROJECTIONS = [
+ :id,
+ PATH_COLUMN,
+ :author_id,
+ :target_id,
+ :target_type,
+ 'action AS raw_action',
+ 'EXTRACT(epoch FROM created_at) AS casted_created_at',
+ 'EXTRACT(epoch FROM updated_at) AS casted_updated_at'
+ ].freeze
+
+ INSERT_EVENTS_QUERY = <<~SQL.squish
+ INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
+ SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
+ SQL
def perform
unless enabled?
@@ -22,12 +63,15 @@ module ClickHouse
metadata = { status: :processed }
- # Prevent parallel jobs
begin
+ # Prevent parallel jobs
in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
- true
- end
+ loop { break unless next_batch }
+
+ metadata.merge!(records_inserted: context.total_record_count, reached_end_of_table: context.no_more_records?)
+ ClickHouse::SyncCursor.update_cursor_for(:events, context.last_processed_id) if context.last_processed_id
+ end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# Skip retrying, just let the next worker to start after a few minutes
metadata = { status: :skipped }
@@ -38,8 +82,51 @@ module ClickHouse
private
+ def context
+ @context ||= ClickHouse::RecordSyncContext.new(
+ last_record_id: ClickHouse::SyncCursor.cursor_for(:events),
+ max_records_per_batch: INSERT_BATCH_SIZE,
+ runtime_limiter: Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
+ )
+ end
+
def enabled?
ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house)
end
+
+ def next_batch
+ context.new_batch!
+
+ CsvBuilder::Gzip.new(process_batch(context), CSV_MAPPING).render do |tempfile, rows_written|
+ unless rows_written == 0
+ ClickHouse::Client.insert_csv(INSERT_EVENTS_QUERY, File.open(tempfile.path),
+ :main)
+ end
+ end
+
+ !(context.over_time? || context.no_more_records?)
+ end
+
+ def process_batch(context)
+ Enumerator.new do |yielder|
+ has_data = false
+ # rubocop: disable CodeReuse/ActiveRecord
+ Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation|
+ has_data = true
+
+ relation.select(*EVENT_PROJECTIONS).each do |row|
+ yielder << row
+ context.last_processed_id = row.id
+
+ break if context.record_limit_reached?
+ end
+
+ break if context.over_time? || context.record_limit_reached?
+ end
+
+ context.no_more_records! if has_data == false
+ # rubocop: enable CodeReuse/ActiveRecord
+ end
+ end
end
end