Welcome to mirror list, hosted at ThFree Co, Russian Federation.

host.rb « load_balancing « database « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: f8ed5fcd4ccee106e2bc34f3fe8f7805df1199b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# frozen_string_literal: true

module Gitlab
  module Database
    module LoadBalancing
      # A single database host used for load balancing.
      class Host
        # Read-only accessors. All of these are assigned in #initialize;
        # last_checked_at is additionally updated by #refresh_status.
        attr_reader :pool, :last_checked_at, :intervals, :load_balancer, :host, :port

        # These calls are forwarded to the connection pool of this host.
        delegate :connection, :release_connection, :enable_query_cache!, :disable_query_cache!, :query_cache_enabled, to: :pool

        # Exception classes rescued around replica queries in this class:
        # #online? marks the host as offline when one of them is raised, while
        # the query helpers return nil (or false) instead of propagating it.
        CONNECTION_ERRORS = [
          ActionView::Template::Error,
          ActiveRecord::StatementInvalid,
          ActiveRecord::ConnectionNotEstablished,
          PG::Error
        ].freeze

        # This query checks that the current user has permissions before we try and query logical replication status. We
        # also only allow >= PG14 because these views are only accessible to superuser before PG14 even if the
        # has_table_privilege says otherwise.
        CAN_TRACK_LOGICAL_LSN_QUERY = <<~SQL.squish.freeze
          SELECT
            has_table_privilege('pg_replication_origin_status', 'select')
            AND
            has_function_privilege('pg_show_replication_origin_status()', 'execute')
            AND current_setting('server_version_num', true)::int >= 140000
            AS allowed
        SQL

        # The following is necessary to handle a mix of logical and physical replicas. We assume that if they have
        # pg_replication_origin_status then they are a logical replica. In a logical replica we need to use
        # `remote_lsn` rather than `pg_last_wal_replay_lsn` in order for our LSN to be comparable to the source
        # cluster. This logic would be broken if we have 2 logical subscriptions or if we have a logical subscription
        # in the source primary cluster. Read more at https://gitlab.com/gitlab-org/gitlab/-/merge_requests/121621
        LATEST_LSN_WITH_LOGICAL_QUERY = <<~SQL.squish.freeze
          CASE
          WHEN (SELECT TRUE FROM pg_replication_origin_status) THEN
            (SELECT remote_lsn FROM pg_replication_origin_status)
          WHEN pg_is_in_recovery() THEN
            pg_last_wal_replay_lsn()
          ELSE
            pg_current_wal_insert_lsn()
          END
        SQL

        # Fallback LSN expression selected by #latest_lsn_query when #can_track_logical_lsn? is false. It is the same
        # CASE as above without the pg_replication_origin_status branch: pg_last_wal_replay_lsn() while in recovery,
        # pg_current_wal_insert_lsn() otherwise.
        LATEST_LSN_WITHOUT_LOGICAL_QUERY = <<~SQL.squish.freeze
          CASE
          WHEN pg_is_in_recovery() THEN
            pg_last_wal_replay_lsn()
          ELSE
            pg_current_wal_insert_lsn()
          END
        SQL

        # Sets up the host together with its replica connection pool. A new
        # host starts out marked as online.
        #
        # host - The address of the database.
        # load_balancer - The LoadBalancer that manages this Host.
        # port - The port of the database (defaults to nil).
        def initialize(host, load_balancer, port: nil)
          @host = host
          @port = port
          @load_balancer = load_balancer

          pool_size = load_balancer.configuration.pool_size
          @pool = load_balancer.create_replica_connection_pool(pool_size, host, port)

          @online = true
          @last_checked_at = Time.zone.now

          # The status of the host is refreshed after a delay that is picked at
          # random from this list, which spans interval..2*interval in steps
          # of 0.5.
          shortest = load_balancer.configuration.replica_check_interval
          longest = shortest * 2
          @intervals = (shortest..longest).step(0.5).to_a
        end

        # Disconnects the pool once none of its connections are in use
        # anymore. The pool is polled every 2 seconds while waiting.
        #
        # timeout - The time after which the pool is disconnected even if
        #           connections are still in use.
        def disconnect!(timeout: 120)
          clock = ::Gitlab::Metrics::System
          started_at = clock.monotonic_time

          # The elapsed time is checked first, so the pool is not inspected
          # anymore once the timeout has been exceeded.
          until (clock.monotonic_time - started_at) > timeout || pool.connections.none?(&:in_use?)
            sleep(2)
          end

          pool.disconnect!
        end

        # Marks the host as offline: logs a warning, clears the online flag and
        # disconnects the pool.
        def offline!
          log_fields = {
            event: :host_offline,
            message: 'Marking host as offline',
            db_host: @host,
            db_port: @port
          }
          ::Gitlab::Database::LoadBalancing::Logger.warn(**log_fields)

          @online = false
          @pool.disconnect!
        end

        # Returns true if the host is online.
        #
        # When a replica status check is due, the status is refreshed first and
        # the outcome is logged. If the check raises one of CONNECTION_ERRORS
        # the host is marked as offline and false is returned.
        def online?
          if check_replica_status?
            refresh_status

            host_details = { db_host: @host, db_port: @port }

            if @online
              ::Gitlab::Database::LoadBalancing::Logger.info(
                event: :host_online,
                message: 'Host is online after replica status check',
                **host_details
              )
            else
              ::Gitlab::Database::LoadBalancing::Logger.warn(
                event: :host_offline,
                message: 'Host is offline after replica status check',
                **host_details
              )
            end
          end

          @online
        rescue *CONNECTION_ERRORS
          offline!
          false
        end

        # Re-evaluates whether this host is up to date, stores the result as the
        # online flag and records the time of the check. The timestamp is only
        # updated when the check itself did not raise.
        def refresh_status
          @latest_lsn_query = nil # Periodically clear the cached @latest_lsn_query value in case permissions change
          @online = replica_is_up_to_date?
          @last_checked_at = Time.zone.now
        end

        # Returns true when enough time has passed since the last status check.
        # The required delay is sampled at random from `intervals` on every call.
        def check_replica_status?
          elapsed = Time.zone.now - last_checked_at

          elapsed >= intervals.sample
        end

        # Returns true if the replica is considered usable: either its
        # replication lag time is below the threshold or its data is recent
        # enough. The second check only runs when the first one fails.
        def replica_is_up_to_date?
          replication_lag_below_threshold? || data_is_recent_enough?
        end

        # Returns true if the replication lag time is known and does not exceed
        # the configured max_replication_lag_time. Returns false when no lag
        # time could be obtained.
        def replication_lag_below_threshold?
          lag_time = replication_lag_time
          return false unless lag_time

          lag_time <= load_balancer.configuration.max_replication_lag_time
        end

        # Returns true if the replica has replicated enough data to be useful.
        #
        # A replica may not replay any WAL data for a while even though it is up
        # to date, for example when the primary does not receive writes for
        # some time. In that case the lag time alone is misleading, so the lag
        # size (in bytes) is compared against max_replication_difference
        # instead. We only do this if we haven't replicated in a while so we
        # only need to connect to the primary when truly necessary.
        def data_is_recent_enough?
          lag_size = replication_lag_size
          return false unless lag_size

          lag_size <= load_balancer.configuration.max_replication_difference
        end

        # Returns the replication lag time of this secondary in seconds as a
        # float.
        #
        # This method will return nil if the query produced no row.
        def replication_lag_time
          sql = 'SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float as lag'
          row = query_and_release(sql)
          return unless row.any?

          row['lag'].to_f
        end

        # Returns the number of bytes this secondary is lagging behind the
        # given write location.
        #
        # location - The write location to compare against; defaults to the
        #            primary write location reported by the load balancer.
        #
        # This method will return nil if the query produced no row or if one of
        # CONNECTION_ERRORS was raised.
        def replication_lag_size(location = primary_write_location)
          quoted_location = connection.quote(location)
          sql = "SELECT pg_wal_lsn_diff(#{quoted_location}, (#{latest_lsn_query}))::float AS diff".squish

          row = query_and_release(sql)
          return unless row.any?

          row['diff'].to_i
        rescue *CONNECTION_ERRORS
          nil
        end

        # Returns the primary write location as reported by the load balancer.
        # Used as the default comparison point in #replication_lag_size.
        def primary_write_location
          load_balancer.primary_write_location
        end

        # Returns the value of pg_last_wal_replay_lsn() on this host as text.
        #
        # This method will return nil if the query produced no row or if one of
        # CONNECTION_ERRORS was raised.
        def database_replica_location
          row = query_and_release('SELECT pg_last_wal_replay_lsn()::text AS location')
          return unless row.any?

          row['location']
        rescue *CONNECTION_ERRORS
          nil
        end

        # Returns true if this host has caught up to the given transaction
        # write location, i.e. the lag size relative to it is known and not
        # positive.
        #
        # location - The transaction write location as reported by a primary.
        def caught_up?(location)
          remaining = replication_lag_size(location)
          return false unless remaining.present?

          remaining.to_i <= 0
        end

        # Runs the given SQL on this host and returns the first result row, or
        # an empty Hash when there is no row or when any StandardError is
        # raised. The connection is released afterwards in all cases.
        #
        # sql - The SQL query to run.
        def query_and_release(sql)
          rows = connection.select_all(sql)
          first_row = rows.first

          first_row || {}
        rescue StandardError
          {}
        ensure
          release_connection
        end

        private

        # Runs CAN_TRACK_LOGICAL_LSN_QUERY on this host and converts its
        # `allowed` column to a boolean. Returns false if one of
        # CONNECTION_ERRORS is raised.
        def can_track_logical_lsn?
          allowed = query_and_release(CAN_TRACK_LOGICAL_LSN_QUERY)['allowed']

          ::Gitlab::Utils.to_boolean(allowed)
        rescue *CONNECTION_ERRORS
          false
        end

        # Returns the SQL expression used to read the latest LSN of this host.
        #
        # The LATEST_LSN_WITH_LOGICAL query requires permissions that may not be present in self-managed configurations.
        # We fallback gracefully to the query that does not correctly handle logical replicas for such configurations.
        # The choice is memoized in @latest_lsn_query until that variable is cleared again.
        def latest_lsn_query
          return @latest_lsn_query if @latest_lsn_query

          @latest_lsn_query =
            if can_track_logical_lsn?
              LATEST_LSN_WITH_LOGICAL_QUERY
            else
              LATEST_LSN_WITHOUT_LOGICAL_QUERY
            end
        end
      end
    end
  end
end