Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/click_house/events_sync_worker.rb')
-rw-r--r-- app/workers/click_house/events_sync_worker.rb | 37
1 files changed, 27 insertions, 10 deletions
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index e884a43b1e3..21c10566a67 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -3,7 +3,9 @@
module ClickHouse
class EventsSyncWorker
include ApplicationWorker
+ include ClickHouseWorker
include Gitlab::ExclusiveLeaseHelpers
+ include Gitlab::Utils::StrongMemoize
idempotent!
queue_namespace :cronjob
@@ -91,8 +93,13 @@ module ClickHouse
)
end
+ def last_event_id_in_postgresql
+ Event.maximum(:id)
+ end
+ strong_memoize_attr :last_event_id_in_postgresql
+
def enabled?
- ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house)
+ ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house)
end
def next_batch
@@ -110,24 +117,34 @@ module ClickHouse
def process_batch(context)
Enumerator.new do |yielder|
- has_data = false
- # rubocop: disable CodeReuse/ActiveRecord
- Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation|
- has_data = true
-
- relation.select(*EVENT_PROJECTIONS).each do |row|
+ has_more_data = false
+ batching_scope.each_batch(of: BATCH_SIZE) do |relation|
+ records = relation.select(*EVENT_PROJECTIONS).to_a
+ has_more_data = records.size == BATCH_SIZE
+ records.each do |row|
yielder << row
context.last_processed_id = row.id
break if context.record_limit_reached?
end
- break if context.over_time? || context.record_limit_reached?
+ break if context.over_time? || context.record_limit_reached? || !has_more_data
end
- context.no_more_records! if has_data == false
- # rubocop: enable CodeReuse/ActiveRecord
+ context.no_more_records! unless has_more_data
end
end
+
+ # rubocop: disable CodeReuse/ActiveRecord
+ def batching_scope
+ return Event.none unless last_event_id_in_postgresql
+
+ table = Event.arel_table
+
+ Event
+ .where(table[:id].gt(context.last_record_id))
+ .where(table[:id].lteq(last_event_id_in_postgresql))
+ end
+ # rubocop: enable CodeReuse/ActiveRecord
end
end