diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/all_queues.yml | 9 | ||||
-rw-r--r-- | app/workers/click_house/event_authors_consistency_cron_worker.rb | 121 |
2 files changed, 130 insertions, 0 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index fc0695c7f62..dfad9f7f673 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -345,6 +345,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:click_house_event_authors_consistency_cron + :worker_name: ClickHouse::EventAuthorsConsistencyCronWorker + :feature_category: :value_stream_management + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:click_house_events_sync :worker_name: ClickHouse::EventsSyncWorker :feature_category: :value_stream_management diff --git a/app/workers/click_house/event_authors_consistency_cron_worker.rb b/app/workers/click_house/event_authors_consistency_cron_worker.rb new file mode 100644 index 00000000000..5c52cda0204 --- /dev/null +++ b/app/workers/click_house/event_authors_consistency_cron_worker.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +module ClickHouse + # rubocop: disable CodeReuse/ActiveRecord -- Building worker-specific ActiveRecord and ClickHouse queries + class EventAuthorsConsistencyCronWorker + include ApplicationWorker + include ClickHouseWorker + include Gitlab::ExclusiveLeaseHelpers + include Gitlab::Utils::StrongMemoize + + idempotent! + queue_namespace :cronjob + data_consistency :delayed + worker_has_external_dependencies! # the worker interacts with a ClickHouse database + feature_category :value_stream_management + + MAX_TTL = 5.minutes.to_i + MAX_RUNTIME = 150.seconds + MAX_AUTHOR_DELETIONS = 2000 + CLICK_HOUSE_BATCH_SIZE = 100_000 + POSTGRESQL_BATCH_SIZE = 2500 + + def perform + return unless enabled? + + runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME) + + in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do + author_records_to_delete = [] + last_processed_id = 0 + iterator.each_batch(column: :author_id, of: CLICK_HOUSE_BATCH_SIZE) do |scope| + query = scope.select(Arel.sql('DISTINCT author_id')).to_sql + ids_from_click_house = connection.select(query).pluck('author_id').sort + + ids_from_click_house.each_slice(POSTGRESQL_BATCH_SIZE) do |ids| + author_records_to_delete.concat(missing_user_ids(ids)) + last_processed_id = ids.last + + to_be_deleted_size = author_records_to_delete.size + if to_be_deleted_size >= MAX_AUTHOR_DELETIONS + metadata.merge!(status: :deletion_limit_reached, deletions: to_be_deleted_size) + break + end + + if runtime_limiter.over_time? + metadata.merge!(status: :over_time, deletions: to_be_deleted_size) + break + end + end + + break if limit_was_reached? + end + + delete_records_from_click_house(author_records_to_delete) + + last_processed_id = 0 if table_fully_processed? + ClickHouse::SyncCursor.update_cursor_for(:event_authors_consistency_check, last_processed_id) + + log_extra_metadata_on_done(:result, metadata) + end + end + + private + + def metadata + @metadata ||= { status: :processed, deletions: 0 } + end + + def limit_was_reached? + metadata[:status] == :deletion_limit_reached || metadata[:status] == :over_time + end + + def table_fully_processed? + metadata[:status] == :processed + end + + def enabled? + ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house) + end + + def previous_author_id + value = ClickHouse::SyncCursor.cursor_for(:event_authors_consistency_check) + value == 0 ? nil : value + end + strong_memoize_attr :previous_author_id + + def iterator + builder = ClickHouse::QueryBuilder.new('event_authors') + ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: previous_author_id) + end + + def connection + @connection ||= ClickHouse::Connection.new(:main) + end + + def missing_user_ids(ids) + value_list = Arel::Nodes::ValuesList.new(ids.map { |id| [id] }) + User + .from("(#{value_list.to_sql}) AS user_ids(id)") + .where('NOT EXISTS (SELECT 1 FROM users WHERE id = user_ids.id)') + .pluck(:id) + end + + def delete_records_from_click_house(ids) + query = ClickHouse::Client::Query.new( + raw_query: "DELETE FROM events WHERE author_id IN ({author_ids:Array(UInt64)})", + placeholders: { author_ids: ids.to_json } + ) + + connection.execute(query) + + query = ClickHouse::Client::Query.new( + raw_query: "DELETE FROM event_authors WHERE author_id IN ({author_ids:Array(UInt64)})", + placeholders: { author_ids: ids.to_json } + ) + + connection.execute(query) + end + end + # rubocop: enable CodeReuse/ActiveRecord +end |