diff options
Diffstat (limited to 'app/workers/click_house/events_sync_worker.rb')
-rw-r--r-- | app/workers/click_house/events_sync_worker.rb | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb index e884a43b1e3..21c10566a67 100644 --- a/app/workers/click_house/events_sync_worker.rb +++ b/app/workers/click_house/events_sync_worker.rb @@ -3,7 +3,9 @@ module ClickHouse class EventsSyncWorker include ApplicationWorker + include ClickHouseWorker include Gitlab::ExclusiveLeaseHelpers + include Gitlab::Utils::StrongMemoize idempotent! queue_namespace :cronjob @@ -91,8 +93,13 @@ module ClickHouse ) end + def last_event_id_in_postgresql + Event.maximum(:id) + end + strong_memoize_attr :last_event_id_in_postgresql + def enabled? - ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house) + ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house) end def next_batch @@ -110,24 +117,34 @@ module ClickHouse def process_batch(context) Enumerator.new do |yielder| - has_data = false - # rubocop: disable CodeReuse/ActiveRecord - Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation| - has_data = true - - relation.select(*EVENT_PROJECTIONS).each do |row| + has_more_data = false + batching_scope.each_batch(of: BATCH_SIZE) do |relation| + records = relation.select(*EVENT_PROJECTIONS).to_a + has_more_data = records.size == BATCH_SIZE + records.each do |row| yielder << row context.last_processed_id = row.id break if context.record_limit_reached? end - break if context.over_time? || context.record_limit_reached? + break if context.over_time? || context.record_limit_reached? || !has_more_data end - context.no_more_records! if has_data == false - # rubocop: enable CodeReuse/ActiveRecord + context.no_more_records! unless has_more_data end end + + # rubocop: disable CodeReuse/ActiveRecord + def batching_scope + return Event.none unless last_event_id_in_postgresql + + table = Event.arel_table + + Event + .where(table[:id].gt(context.last_record_id)) + .where(table[:id].lteq(last_event_id_in_postgresql)) + end + # rubocop: enable CodeReuse/ActiveRecord end end |