1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# frozen_string_literal: true
module Ci
class RunnerManager < Ci::ApplicationRecord
include FromUnion
include RedisCacheable
include Ci::HasRunnerExecutor
# For legacy reasons, the table name is ci_runner_machines in the database
self.table_name = 'ci_runner_machines'
# The `UPDATE_CONTACT_COLUMN_EVERY` defines how often the Runner Machine DB entry can be updated
UPDATE_CONTACT_COLUMN_EVERY = (40.minutes)..(55.minutes)
belongs_to :runner
has_many :runner_manager_builds, inverse_of: :runner_manager, foreign_key: :runner_machine_id,
class_name: 'Ci::RunnerManagerBuild'
has_many :builds, through: :runner_manager_builds, class_name: 'Ci::Build'
belongs_to :runner_version, inverse_of: :runner_managers, primary_key: :version, foreign_key: :version,
class_name: 'Ci::RunnerVersion'
validates :runner, presence: true
validates :system_xid, presence: true, length: { maximum: 64 }
validates :version, length: { maximum: 2048 }
validates :revision, length: { maximum: 255 }
validates :platform, length: { maximum: 255 }
validates :architecture, length: { maximum: 255 }
validates :ip_address, length: { maximum: 1024 }
validates :config, json_schema: { filename: 'ci_runner_config' }
cached_attr_reader :version, :revision, :platform, :architecture, :ip_address, :contacted_at, :executor_type
# The `STALE_TIMEOUT` constant defines the how far past the last contact or creation date a runner manager
# will be considered stale
STALE_TIMEOUT = 7.days
scope :stale, -> do
created_some_time_ago = arel_table[:created_at].lteq(STALE_TIMEOUT.ago)
contacted_some_time_ago = arel_table[:contacted_at].lteq(STALE_TIMEOUT.ago)
from_union(
where(contacted_at: nil),
where(contacted_some_time_ago),
remove_duplicates: false).where(created_some_time_ago)
end
scope :for_runner, ->(runner_id) do
where(runner_id: runner_id)
end
scope :with_running_builds, -> do
where('EXISTS(?)',
Ci::Build.select(1)
.joins(:runner_manager_build)
.running
.where("#{::Ci::Build.quoted_table_name}.runner_id = #{quoted_table_name}.runner_id")
.where("#{::Ci::RunnerManagerBuild.quoted_table_name}.runner_machine_id = #{quoted_table_name}.id")
.limit(1)
)
end
scope :order_id_desc, -> { order(id: :desc) }
scope :with_version_prefix, ->(value) do
regex = version_regex_expression_for_version(value)
value += '.' if regex.end_with?('\.') && !value.end_with?('.')
substring = Arel::Nodes::NamedFunction.new('substring', [
Ci::RunnerManager.arel_table[:version],
Arel.sql("'#{regex}'::text")
])
where(substring.eq(sanitize_sql_like(value)))
end
scope :with_upgrade_status, ->(upgrade_status) do
joins(:runner_version).where(runner_version: { status: upgrade_status })
end
def self.online_contact_time_deadline
Ci::Runner.online_contact_time_deadline
end
def self.stale_deadline
STALE_TIMEOUT.ago
end
def self.aggregate_upgrade_status_by_runner_id
joins(:runner_version)
.group(:runner_id)
.maximum(:status)
.transform_values { |s| Ci::RunnerVersion.statuses.key(s).to_sym }
end
def heartbeat(values, update_contacted_at: true)
##
# We can safely ignore writes performed by a runner heartbeat. We do
# not want to upgrade database connection proxy to use the primary
# database after heartbeat write happens.
#
::Gitlab::Database::LoadBalancing::Session.without_sticky_writes do
values = values&.slice(:version, :revision, :platform, :architecture, :ip_address, :config, :executor) || {}
values[:contacted_at] = Time.current if update_contacted_at
if values.include?(:executor)
values[:executor_type] = Ci::Runner::EXECUTOR_NAME_TO_TYPES.fetch(values.delete(:executor), :unknown)
end
new_version = values[:version]
schedule_runner_version_update(new_version) if new_version && new_version != version
merge_cache_attributes(values)
# We save data without validation, it will always change due to `contacted_at`
update_columns(values) if persist_cached_data?
end
end
def status
return :stale if stale?
return :never_contacted unless contacted_at
online? ? :online : :offline
end
private
def online?
contacted_at && contacted_at > self.class.online_contact_time_deadline
end
def stale?
return false unless created_at
[created_at, contacted_at].compact.max <= self.class.stale_deadline
end
def persist_cached_data?
# Use a random threshold to prevent beating DB updates.
contacted_at_max_age = Random.rand(UPDATE_CONTACT_COLUMN_EVERY)
real_contacted_at = read_attribute(:contacted_at)
real_contacted_at.nil? ||
(Time.current - real_contacted_at) >= contacted_at_max_age
end
def schedule_runner_version_update(new_version)
return unless new_version && Gitlab::Ci::RunnerReleases.instance.enabled?
Ci::Runners::ProcessRunnerVersionUpdateWorker.perform_async(new_version)
end
def self.version_regex_expression_for_version(version)
case version
when /\d+\.\d+\.\d+/
'^\d+\.\d+\.\d+'
when /\d+\.\d+(\.)?/
'^\d+\.\d+\.'
else
'^\d+\.'
end
end
end
end
|