1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# frozen_string_literal: true
module Ci
  module Catalog
    module Resources
      # Queue of catalog resources that need to be synchronized with `projects`.
      #
      # Rows are not only created by application code: a PG trigger
      # (DB function name: `insert_catalog_resource_sync_event`) adds a SyncEvent
      # whenever the `projects` record associated with a catalog resource updates
      # any of the relevant columns referenced in
      # `Ci::Catalog::Resource#sync_with_project`.
      class SyncEvent < ::ApplicationRecord
        include PartitionedTable
        include IgnorableColumns

        # Once the oldest record of the active partition is older than this,
        # `next_partition_if` (below) reports that a new partition is needed.
        PARTITION_DURATION = 1.day

        self.table_name = 'p_catalog_resource_sync_events'
        self.primary_key = :id
        self.sequence_name = :p_catalog_resource_sync_events_id_seq

        # NOTE(review): `partition_id` is ignored as a model attribute, yet the scopes
        # below still reference the column explicitly; `select_with_partition` is the
        # way to read it back (under the `partition` alias).
        ignore_column :partition_id, remove_never: true

        belongs_to :catalog_resource, class_name: 'Ci::Catalog::Resource', inverse_of: :sync_events
        belongs_to :project, inverse_of: :catalog_resource_sync_events

        # Restricts the relation to rows stored in the given partition value.
        scope :for_partition, ->(partition_value) { where(partition_id: partition_value) }

        # Selects only `id`, `catalog_resource_id` and `partition_id`, exposing the
        # latter as `partition` so that loaded records respond to `#partition`.
        scope :select_with_partition, -> do
          partition_alias = arel_table[:partition_id].as('partition')

          select(:id, :catalog_resource_id, partition_alias)
        end

        # Pending events, loaded with the columns described on `select_with_partition`.
        scope :unprocessed_events, -> { select_with_partition.status_pending }
        scope :preload_synced_relation, -> { preload(catalog_resource: :project) }

        enum status: { pending: 1, processed: 2 }, _prefix: :status

        partitioned_by :partition_id, strategy: :sliding_list,
          # True when the lowest-id record of the active partition exists and was
          # created more than PARTITION_DURATION ago; false otherwise.
          next_partition_if: ->(active_partition) do
            first_event_in_partition = Ci::Catalog::Resources::SyncEvent
              .select(:id, :created_at)
              .for_partition(active_partition.value)
              .order(:id)
              .limit(1)
              .take

            next false unless first_event_in_partition.present?

            first_event_in_partition.created_at < PARTITION_DURATION.ago
          end,
          # True when the partition no longer contains any pending event.
          detach_partition_if: ->(partition) do
            pending_events_in_partition = Ci::Catalog::Resources::SyncEvent
              .for_partition(partition.value)
              .status_pending

            !pending_events_in_partition.exists?
          end

        class << self
          # Flags the given events as processed, issuing one `update_all` per partition.
          #
          # `records` must respond to `#partition` and `#id`, i.e. they have to be
          # loaded through `.select_with_partition` (see `update_by_partition`).
          # Only rows that are still pending are updated.
          def mark_records_processed(records)
            update_by_partition(records) { |pending_scope| pending_scope.update_all(status: :processed) }
          end

          # Schedules ProcessSyncEventsWorker when the
          # `ci_process_catalog_resource_sync_events` feature flag is enabled;
          # returns nil without scheduling anything otherwise.
          def enqueue_worker
            if Feature.enabled?(:ci_process_catalog_resource_sync_events)
              ::Ci::Catalog::Resources::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker -- Worker is scheduled in model callback functions
            end
          end

          # Returns the width of the id range spanned by pending events
          # (`MAX(id) - MIN(id) + 1`), or 0 when there are no pending events.
          # This is computed from ids only; it is not an exact row count.
          def upper_bound_count
            aggregate_row = select('COALESCE(MAX(id) - MIN(id) + 1, 0) AS upper_bound_count')
              .status_pending
              .to_a
              .first

            aggregate_row.upper_bound_count
          end

          private

          # Groups `records` by partition and yields, for each group, a relation of
          # the still-pending rows in that partition whose ids are in the group.
          #
          # You must use .select_with_partition before calling this method
          # as it requires the partition to be explicitly selected.
          def update_by_partition(records)
            records.group_by(&:partition).each do |partition_value, partition_records|
              record_ids = partition_records.map(&:id)

              yield status_pending.for_partition(partition_value).where(id: record_ids)
            end
          end
        end
      end
    end
  end
end
|