diff options
Diffstat (limited to 'app/workers/click_house/event_authors_consistency_cron_worker.rb')
-rw-r--r-- | app/workers/click_house/event_authors_consistency_cron_worker.rb | 108 |
1 file changed, 33 insertions, 75 deletions
diff --git a/app/workers/click_house/event_authors_consistency_cron_worker.rb b/app/workers/click_house/event_authors_consistency_cron_worker.rb index c35aadba593..62f64f2b9ff 100644 --- a/app/workers/click_house/event_authors_consistency_cron_worker.rb +++ b/app/workers/click_house/event_authors_consistency_cron_worker.rb @@ -5,8 +5,8 @@ module ClickHouse class EventAuthorsConsistencyCronWorker include ApplicationWorker include ClickHouseWorker + include ClickHouse::Concerns::ConsistencyWorker # defines perform include Gitlab::ExclusiveLeaseHelpers - include Gitlab::Utils::StrongMemoize idempotent! queue_namespace :cronjob @@ -14,83 +14,57 @@ module ClickHouse worker_has_external_dependencies! # the worker interacts with a ClickHouse database feature_category :value_stream_management - MAX_TTL = 5.minutes.to_i - MAX_RUNTIME = 150.seconds MAX_AUTHOR_DELETIONS = 2000 - CLICK_HOUSE_BATCH_SIZE = 100_000 - POSTGRESQL_BATCH_SIZE = 2500 - def perform - return unless enabled? - - runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME) - - in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do - author_records_to_delete = [] - last_processed_id = 0 - iterator.each_batch(column: :author_id, of: CLICK_HOUSE_BATCH_SIZE) do |scope| - query = scope.select(Arel.sql('DISTINCT author_id')).to_sql - ids_from_click_house = connection.select(query).pluck('author_id').sort - - ids_from_click_house.each_slice(POSTGRESQL_BATCH_SIZE) do |ids| - author_records_to_delete.concat(missing_user_ids(ids)) - last_processed_id = ids.last - - to_be_deleted_size = author_records_to_delete.size - if to_be_deleted_size >= MAX_AUTHOR_DELETIONS - metadata.merge!(status: :deletion_limit_reached, deletions: to_be_deleted_size) - break - end - - if runtime_limiter.over_time? - metadata.merge!(status: :over_time, deletions: to_be_deleted_size) - break - end - end - - break if limit_was_reached? 
- end + private - delete_records_from_click_house(author_records_to_delete) + def collect_values(ids) + missing_user_ids_from_batch = missing_user_ids(ids) + context[:last_processed_id] = missing_user_ids_from_batch.last + context[:author_records_to_delete].concat(missing_user_ids_from_batch) - last_processed_id = 0 if table_fully_processed? - ClickHouse::SyncCursor.update_cursor_for(:event_authors_consistency_check, last_processed_id) + to_be_deleted_size = context[:author_records_to_delete].size + metadata[:modifications] = to_be_deleted_size - log_extra_metadata_on_done(:result, metadata) + if to_be_deleted_size >= MAX_AUTHOR_DELETIONS + metadata[:status] = :limit_reached + return end + + metadata[:status] = :over_time if runtime_limiter.over_time? end - private + def process_collected_values + ids = context[:author_records_to_delete] + query = ClickHouse::Client::Query.new( + raw_query: 'ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})', + placeholders: { author_ids: ids.to_json } + ) - def metadata - @metadata ||= { status: :processed, deletions: 0 } - end + connection.execute(query) - def limit_was_reached? - metadata[:status] == :deletion_limit_reached || metadata[:status] == :over_time - end + query = ClickHouse::Client::Query.new( + raw_query: 'ALTER TABLE event_authors DELETE WHERE author_id IN ({author_ids:Array(UInt64)})', + placeholders: { author_ids: ids.to_json } + ) - def table_fully_processed? - metadata[:status] == :processed + connection.execute(query) end - def enabled? - ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house) + def init_context + @context = { author_records_to_delete: [], last_processed_id: 0 } end - def previous_author_id - value = ClickHouse::SyncCursor.cursor_for(:event_authors_consistency_check) - value == 0 ? 
nil : value + def table + 'event_authors' end - strong_memoize_attr :previous_author_id - def iterator - builder = ClickHouse::QueryBuilder.new('event_authors') - ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: previous_author_id) + def batch_column + 'author_id' end - def connection - @connection ||= ClickHouse::Connection.new(:main) + def pluck_column + 'author_id' end def missing_user_ids(ids) @@ -100,22 +74,6 @@ module ClickHouse .where('NOT EXISTS (SELECT 1 FROM users WHERE id = user_ids.id)') .pluck(:id) end - - def delete_records_from_click_house(ids) - query = ClickHouse::Client::Query.new( - raw_query: "ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})", - placeholders: { author_ids: ids.to_json } - ) - - connection.execute(query) - - query = ClickHouse::Client::Query.new( - raw_query: "ALTER TABLE event_authors DELETE WHERE author_id IN ({author_ids:Array(UInt64)})", - placeholders: { author_ids: ids.to_json } - ) - - connection.execute(query) - end end # rubocop: enable CodeReuse/ActiveRecord end |