diff options
Diffstat (limited to 'app/workers/click_house/concerns/consistency_worker.rb')
-rw-r--r-- | app/workers/click_house/concerns/consistency_worker.rb | 108 |
1 file changed, 108 insertions, 0 deletions
# frozen_string_literal: true

module ClickHouse
  module Concerns
    # This module can be used for batching over a ClickHouse database table column
    # and doing something with the yielded values. The module is responsible for
    # correctly restoring the state (cursor) in case the processing was
    # interrupted, or restarting the processing from the beginning of the table
    # when the table was fully processed.
    #
    # This module acts like a "template method" pattern where the including
    # classes need to define the following methods:
    #
    # - table: name of the ClickHouse table to iterate over
    # - batch_column: column used for batching by the ClickHouse iterator
    # - init_context: returns a memoized hash, initializing the context that controls the data processing
    # - pluck_column: which column value to take from the ClickHouse DB when iterating
    # - collect_values: filter, process and store the returned values from ClickHouse
    # - process_collected_values: once a limit is reached or no more data, do something
    #   with the collected values
    module ConsistencyWorker
      extend ActiveSupport::Concern
      include Gitlab::Utils::StrongMemoize

      # Soft wall-clock budget for one worker run; processing stops early when exceeded.
      MAX_RUNTIME = 150.seconds
      # TTL of the exclusive lease; longer than MAX_RUNTIME so the lock outlives the run.
      MAX_TTL = 5.minutes.to_i
      CLICK_HOUSE_BATCH_SIZE = 100_000
      POSTGRESQL_BATCH_SIZE = 2500
      # Metadata statuses meaning the run stopped early and should resume from the cursor.
      LIMIT_STATUSES = %i[limit_reached over_time].freeze

      included do
        include Gitlab::ExclusiveLeaseHelpers
      end

      def perform
        return unless enabled?

        init_context
        runtime_limiter

        click_house_each_batch do |values|
          collect_values(values)

          break if limit_was_reached?
        end

        process_collected_values

        # When the whole table was scanned, reset the cursor so the next run
        # starts from the beginning of the table.
        context[:last_processed_id] = 0 if table_fully_processed?

        ClickHouse::SyncCursor.update_cursor_for(sync_cursor, context[:last_processed_id])
        log_extra_metadata_on_done(:result, metadata)
      end

      private

      attr_reader :context

      # Iterates over the ClickHouse table under an exclusive lease and yields
      # sorted, distinct +pluck_column+ values in PostgreSQL-sized slices.
      def click_house_each_batch
        in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
          iterator.each_batch(column: batch_column, of: CLICK_HOUSE_BATCH_SIZE) do |scope|
            query = scope.select(Arel.sql("DISTINCT #{pluck_column}")).to_sql
            ids_from_click_house = connection.select(query).pluck(pluck_column).sort # rubocop: disable CodeReuse/ActiveRecord -- limited query

            ids_from_click_house.each_slice(POSTGRESQL_BATCH_SIZE) do |values|
              yield values
            end
          end
        end
      end

      def enabled?
        ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house)
      end

      def runtime_limiter
        @runtime_limiter ||= Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
      end

      def iterator
        builder = ClickHouse::QueryBuilder.new(table.to_s)
        ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: previous_id)
      end

      def sync_cursor
        "#{table}_consistency_check"
      end

      # Last processed id from the previous run, or nil when starting from the
      # beginning of the table (the stored cursor default is 0).
      def previous_id
        value = ClickHouse::SyncCursor.cursor_for(sync_cursor)
        value == 0 ? nil : value
      end
      strong_memoize_attr :previous_id

      def metadata
        @metadata ||= { status: :processed, modifications: 0 }
      end

      def connection
        @connection ||= ClickHouse::Connection.new(:main)
      end

      def table_fully_processed?
        metadata[:status] == :processed
      end

      def limit_was_reached?
        LIMIT_STATUSES.include?(metadata[:status])
      end
    end
  end
end