# frozen_string_literal: true module Gitlab module Database module LoadBalancing # A single database host used for load balancing. class Host attr_reader :pool, :last_checked_at, :intervals, :load_balancer, :host, :port delegate :connection, :release_connection, :enable_query_cache!, :disable_query_cache!, :query_cache_enabled, to: :pool CONNECTION_ERRORS = if defined?(PG) [ ActionView::Template::Error, ActiveRecord::StatementInvalid, PG::Error ].freeze else [ ActionView::Template::Error, ActiveRecord::StatementInvalid ].freeze end # host - The address of the database. # load_balancer - The LoadBalancer that manages this Host. def initialize(host, load_balancer, port: nil) @host = host @port = port @load_balancer = load_balancer @pool = load_balancer.create_replica_connection_pool( load_balancer.configuration.pool_size, host, port ) @online = true @last_checked_at = Time.zone.now interval = load_balancer.configuration.replica_check_interval @intervals = (interval..(interval * 2)).step(0.5).to_a end # Disconnects the pool, once all connections are no longer in use. # # timeout - The time after which the pool should be forcefully # disconnected. def disconnect!(timeout: 120) start_time = ::Gitlab::Metrics::System.monotonic_time while (::Gitlab::Metrics::System.monotonic_time - start_time) <= timeout break if pool.connections.none?(&:in_use?) sleep(2) end pool.disconnect! end def offline! ::Gitlab::Database::LoadBalancing::Logger.warn( event: :host_offline, message: 'Marking host as offline', db_host: @host, db_port: @port ) @online = false @pool.disconnect! end # Returns true if the host is online. def online? return @online unless check_replica_status? refresh_status if @online ::Gitlab::Database::LoadBalancing::Logger.info( event: :host_online, message: 'Host is online after replica status check', db_host: @host, db_port: @port ) else ::Gitlab::Database::LoadBalancing::Logger.warn( event: :host_offline, message: 'Host is offline after replica status check', db_host: @host, db_port: @port ) end @online rescue *CONNECTION_ERRORS offline! false end def refresh_status @online = replica_is_up_to_date? @last_checked_at = Time.zone.now end def check_replica_status? (Time.zone.now - last_checked_at) >= intervals.sample end def replica_is_up_to_date? replication_lag_below_threshold? || data_is_recent_enough? end def replication_lag_below_threshold? if (lag_time = replication_lag_time) lag_time <= load_balancer.configuration.max_replication_lag_time else false end end # Returns true if the replica has replicated enough data to be useful. def data_is_recent_enough? # It's possible for a replica to not replay WAL data for a while, # despite being up to date. This can happen when a primary does not # receive any writes for a while. # # To prevent this from happening we check if the lag size (in bytes) # of the replica is small enough for the replica to be useful. We # only do this if we haven't replicated in a while so we only need # to connect to the primary when truly necessary. if (lag_size = replication_lag_size) lag_size <= load_balancer.configuration.max_replication_difference else false end end # Returns the replication lag time of this secondary in seconds as a # float. # # This method will return nil if no lag time could be calculated. def replication_lag_time row = query_and_release('SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float as lag') row['lag'].to_f if row.any? end # Returns the number of bytes this secondary is lagging behind the # primary. # # This method will return nil if no lag size could be calculated. def replication_lag_size location = connection.quote(primary_write_location) row = query_and_release(<<-SQL.squish) SELECT pg_wal_lsn_diff(#{location}, pg_last_wal_replay_lsn())::float AS diff SQL row['diff'].to_i if row.any? rescue *CONNECTION_ERRORS nil end def primary_write_location load_balancer.primary_write_location end def database_replica_location row = query_and_release(<<-SQL.squish) SELECT pg_last_wal_replay_lsn()::text AS location SQL row['location'] if row.any? rescue *CONNECTION_ERRORS nil end # Returns true if this host has caught up to the given transaction # write location. # # location - The transaction write location as reported by a primary. def caught_up?(location) string = connection.quote(location) # In case the host is a primary pg_last_wal_replay_lsn/pg_last_xlog_replay_location() returns # NULL. The recovery check ensures we treat the host as up-to-date in # such a case. query = <<-SQL.squish SELECT NOT pg_is_in_recovery() OR pg_wal_lsn_diff(pg_last_wal_replay_lsn(), #{string}) >= 0 AS result SQL row = query_and_release(query) ::Gitlab::Utils.to_boolean(row['result']) rescue *CONNECTION_ERRORS false end def query_and_release(sql) connection.select_all(sql).first || {} rescue StandardError {} ensure release_connection end end end end end