gitlab.com/gitlab-org/gitlab-foss.git

Diffstat (limited to 'app/workers/click_house/event_authors_consistency_cron_worker.rb')
 -rw-r--r--  app/workers/click_house/event_authors_consistency_cron_worker.rb | 108 ++++++++------------------
 1 file changed, 33 insertions(+), 75 deletions(-)
diff --git a/app/workers/click_house/event_authors_consistency_cron_worker.rb b/app/workers/click_house/event_authors_consistency_cron_worker.rb
index c35aadba593..62f64f2b9ff 100644
--- a/app/workers/click_house/event_authors_consistency_cron_worker.rb
+++ b/app/workers/click_house/event_authors_consistency_cron_worker.rb
@@ -5,8 +5,8 @@ module ClickHouse
   class EventAuthorsConsistencyCronWorker
     include ApplicationWorker
     include ClickHouseWorker
+    include ClickHouse::Concerns::ConsistencyWorker # defines perform
     include Gitlab::ExclusiveLeaseHelpers
-    include Gitlab::Utils::StrongMemoize
 
     idempotent!
     queue_namespace :cronjob
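
The interesting change is the new include: `ClickHouse::Concerns::ConsistencyWorker` now owns `perform`, and the worker shrinks to the hook methods the concern calls. The concern itself is not part of this diff; the sketch below shows one plausible shape for it, reconstructed from the code removed in the next hunk. The hook names (`init_context`, `collect_values`, `process_collected_values`, `table`, `batch_column`, `pluck_column`) come from this diff; everything else is an assumption, not the actual GitLab module.

```ruby
# A minimal sketch, assuming the concern packages the lock/iterate/flush
# plumbing the old perform carried. Not the real implementation.
module ClickHouse
  module Concerns
    module ConsistencyWorker
      extend ActiveSupport::Concern

      included do
        attr_reader :context, :metadata, :runtime_limiter
      end

      def perform
        return unless ClickHouse::Client.database_configured?(:main) # assumed guard

        init_context # hook: seeds @context for this run
        @metadata = { status: :processed, modifications: 0 }
        @runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(150.seconds)

        in_lock(self.class.to_s, ttl: 5.minutes.to_i, retries: 0) do
          iterator.each_batch(column: batch_column.to_sym, of: 100_000) do |scope|
            query = scope.select(Arel.sql("DISTINCT #{pluck_column}")).to_sql
            ids = connection.select(query).pluck(pluck_column).sort

            collect_values(ids) # hook: fills context, flips metadata[:status]
            break unless metadata[:status] == :processed
          end

          process_collected_values # hook: flushes collected ids to ClickHouse
          # (cursor bookkeeping via ClickHouse::SyncCursor presumably lives here)
          log_extra_metadata_on_done(:result, metadata)
        end
      end

      private

      def iterator
        builder = ClickHouse::QueryBuilder.new(table) # hook: table name
        ClickHouse::Iterator.new(query_builder: builder, connection: connection)
      end

      def connection
        @connection ||= ClickHouse::Connection.new(:main)
      end
    end
  end
end
```
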
@@ -14,83 +14,57 @@ module ClickHouse
     worker_has_external_dependencies! # the worker interacts with a ClickHouse database
     feature_category :value_stream_management
 
-    MAX_TTL = 5.minutes.to_i
-    MAX_RUNTIME = 150.seconds
     MAX_AUTHOR_DELETIONS = 2000
-    CLICK_HOUSE_BATCH_SIZE = 100_000
-    POSTGRESQL_BATCH_SIZE = 2500
 
-    def perform
-      return unless enabled?
-
-      runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
-
-      in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
-        author_records_to_delete = []
-        last_processed_id = 0
-        iterator.each_batch(column: :author_id, of: CLICK_HOUSE_BATCH_SIZE) do |scope|
-          query = scope.select(Arel.sql('DISTINCT author_id')).to_sql
-          ids_from_click_house = connection.select(query).pluck('author_id').sort
-
-          ids_from_click_house.each_slice(POSTGRESQL_BATCH_SIZE) do |ids|
-            author_records_to_delete.concat(missing_user_ids(ids))
-            last_processed_id = ids.last
-
-            to_be_deleted_size = author_records_to_delete.size
-            if to_be_deleted_size >= MAX_AUTHOR_DELETIONS
-              metadata.merge!(status: :deletion_limit_reached, deletions: to_be_deleted_size)
-              break
-            end
-
-            if runtime_limiter.over_time?
-              metadata.merge!(status: :over_time, deletions: to_be_deleted_size)
-              break
-            end
-          end
-
-          break if limit_was_reached?
-        end
+    private
-        delete_records_from_click_house(author_records_to_delete)
+    def collect_values(ids)
+      missing_user_ids_from_batch = missing_user_ids(ids)
+      context[:last_processed_id] = missing_user_ids_from_batch.last
+      context[:author_records_to_delete].concat(missing_user_ids_from_batch)
-        last_processed_id = 0 if table_fully_processed?
-        ClickHouse::SyncCursor.update_cursor_for(:event_authors_consistency_check, last_processed_id)
+      to_be_deleted_size = context[:author_records_to_delete].size
+      metadata[:modifications] = to_be_deleted_size
-        log_extra_metadata_on_done(:result, metadata)
+      if to_be_deleted_size >= MAX_AUTHOR_DELETIONS
+        metadata[:status] = :limit_reached
+        return
       end
+
+      metadata[:status] = :over_time if runtime_limiter.over_time?
     end
 
-    private
+    def process_collected_values
+      ids = context[:author_records_to_delete]
+      query = ClickHouse::Client::Query.new(
+        raw_query: 'ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})',
+        placeholders: { author_ids: ids.to_json }
+      )
-    def metadata
-      @metadata ||= { status: :processed, deletions: 0 }
-    end
+      connection.execute(query)
-    def limit_was_reached?
-      metadata[:status] == :deletion_limit_reached || metadata[:status] == :over_time
-    end
+      query = ClickHouse::Client::Query.new(
+        raw_query: 'ALTER TABLE event_authors DELETE WHERE author_id IN ({author_ids:Array(UInt64)})',
+        placeholders: { author_ids: ids.to_json }
+      )
-    def table_fully_processed?
-      metadata[:status] == :processed
+      connection.execute(query)
     end
 
-    def enabled?
-      ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house)
+    def init_context
+      @context = { author_records_to_delete: [], last_processed_id: 0 }
     end
 
-    def previous_author_id
-      value = ClickHouse::SyncCursor.cursor_for(:event_authors_consistency_check)
-      value == 0 ? nil : value
+    def table
+      'event_authors'
     end
 
-    strong_memoize_attr :previous_author_id
-    def iterator
-      builder = ClickHouse::QueryBuilder.new('event_authors')
-      ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: previous_author_id)
+    def batch_column
+      'author_id'
     end
 
-    def connection
-      @connection ||= ClickHouse::Connection.new(:main)
+    def pluck_column
+      'author_id'
     end
 
     def missing_user_ids(ids)
@@ -100,22 +74,6 @@ module ClickHouse
         .where('NOT EXISTS (SELECT 1 FROM users WHERE id = user_ids.id)')
         .pluck(:id)
     end
-
-    def delete_records_from_click_house(ids)
-      query = ClickHouse::Client::Query.new(
-        raw_query: "ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})",
-        placeholders: { author_ids: ids.to_json }
-      )
-
-      connection.execute(query)
-
-      query = ClickHouse::Client::Query.new(
-        raw_query: "ALTER TABLE event_authors DELETE WHERE author_id IN ({author_ids:Array(UInt64)})",
-        placeholders: { author_ids: ids.to_json }
-      )
-
-      connection.execute(query)
-    end
   end
   # rubocop: enable CodeReuse/ActiveRecord
 end
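
Note the placeholder syntax in both mutations: `{author_ids:Array(UInt64)}` is ClickHouse's server-side query-parameter form `{name:Type}`, and the ids are passed JSON-encoded so ClickHouse parses them as an array, which keeps the id list out of the SQL string itself. A standalone sketch of the same call shape, using only the `ClickHouse::Client::Query` and `ClickHouse::Connection` APIs visible in this diff (the `:main` database name is taken from the removed `enabled?` check):

```ruby
ids = [1, 2, 3]

# ALTER TABLE ... DELETE is an asynchronous mutation in ClickHouse; rows
# disappear when the mutation is applied, not when execute returns.
query = ClickHouse::Client::Query.new(
  raw_query: 'ALTER TABLE events DELETE WHERE author_id IN ({author_ids:Array(UInt64)})',
  placeholders: { author_ids: ids.to_json } # "[1,2,3]" parsed as Array(UInt64)
)

ClickHouse::Connection.new(:main).execute(query)
```

That the mutation is asynchronous and relatively heavy is presumably also why the worker caps a single run at MAX_AUTHOR_DELETIONS (2000) author ids rather than deleting without bound.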