diff options
Diffstat (limited to 'lib/gitlab/database')
23 files changed, 1814 insertions, 19 deletions
diff --git a/lib/gitlab/database/background_migration/batched_job.rb b/lib/gitlab/database/background_migration/batched_job.rb index 869b97b8ac0..9a1dc4ee17d 100644 --- a/lib/gitlab/database/background_migration/batched_job.rb +++ b/lib/gitlab/database/background_migration/batched_job.rb @@ -30,7 +30,7 @@ module Gitlab scope :successful_in_execution_order, -> { where.not(finished_at: nil).succeeded.order(:finished_at) } - delegate :aborted?, :job_class, :table_name, :column_name, :job_arguments, + delegate :job_class, :table_name, :column_name, :job_arguments, to: :batched_migration, prefix: :migration attribute :pause_ms, :integer, default: 100 diff --git a/lib/gitlab/database/background_migration/batched_migration.rb b/lib/gitlab/database/background_migration/batched_migration.rb index e85162f355e..36e89023c86 100644 --- a/lib/gitlab/database/background_migration/batched_migration.rb +++ b/lib/gitlab/database/background_migration/batched_migration.rb @@ -14,12 +14,20 @@ module Gitlab class_name: 'Gitlab::Database::BackgroundMigration::BatchedJob', foreign_key: :batched_background_migration_id + validates :job_arguments, uniqueness: { + scope: [:job_class_name, :table_name, :column_name] + } + scope :queue_order, -> { order(id: :asc) } + scope :queued, -> { where(status: [:active, :paused]) } + scope :for_configuration, ->(job_class_name, table_name, column_name, job_arguments) do + where(job_class_name: job_class_name, table_name: table_name, column_name: column_name) + .where("job_arguments = ?", job_arguments.to_json) # rubocop:disable Rails/WhereEquals + end enum status: { paused: 0, active: 1, - aborted: 2, finished: 3, failed: 4 } @@ -30,6 +38,14 @@ module Gitlab active.queue_order.first end + def self.successful_rows_counts(migrations) + BatchedJob + .succeeded + .where(batched_background_migration_id: migrations) + .group(:batched_background_migration_id) + .sum(:batch_size) + end + def interval_elapsed?(variance: 0) return true unless last_job diff --git a/lib/gitlab/database/consistency.rb b/lib/gitlab/database/consistency.rb index e99ea7a3232..17c16640e4c 100644 --- a/lib/gitlab/database/consistency.rb +++ b/lib/gitlab/database/consistency.rb @@ -4,28 +4,18 @@ module Gitlab module Database ## # This class is used to make it possible to ensure read consistency in - # GitLab EE without the need of overriding a lot of methods / classes / + # GitLab without the need of overriding a lot of methods / classes / # classs. # - # This is a CE class that does nothing in CE, because database load - # balancing is EE-only feature, but you can still use it in CE. It will - # start ensuring read consistency once it is overridden in EE. - # - # Using this class in CE helps to avoid creeping discrepancy between CE / - # EE only to force usage of the primary database in EE. - # class Consistency ## - # In CE there is no database load balancing, so all reads are expected to - # be consistent by the ACID guarantees of a single PostgreSQL instance. - # - # This method is overridden in EE. + # Within the block, disable the database load balancing for calls that + # require read consistency after recent writes. # def self.with_read_consistency(&block) - yield + ::Gitlab::Database::LoadBalancing::Session + .current.use_primary(&block) end end end end - -::Gitlab::Database::Consistency.singleton_class.prepend_mod_with('Gitlab::Database::Consistency') diff --git a/lib/gitlab/database/dynamic_model_helpers.rb b/lib/gitlab/database/dynamic_model_helpers.rb index 892f8291780..7439591be99 100644 --- a/lib/gitlab/database/dynamic_model_helpers.rb +++ b/lib/gitlab/database/dynamic_model_helpers.rb @@ -11,6 +11,25 @@ module Gitlab self.inheritance_column = :_type_disabled end end + + def each_batch(table_name, scope: ->(table) { table.all }, of: 1000) + if transaction_open? + raise <<~MSG.squish + each_batch should not run inside a transaction, you can disable + transactions by calling disable_ddl_transaction! in the body of + your migration class + MSG + end + + scope.call(define_batchable_model(table_name)) + .each_batch(of: of) { |batch| yield batch } + end + + def each_batch_range(table_name, scope: ->(table) { table.all }, of: 1000) + each_batch(table_name, scope: scope, of: of) do |batch| + yield batch.pluck('MIN(id), MAX(id)').first + end + end end end end diff --git a/lib/gitlab/database/load_balancing.rb b/lib/gitlab/database/load_balancing.rb new file mode 100644 index 00000000000..88743cd2e75 --- /dev/null +++ b/lib/gitlab/database/load_balancing.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # The exceptions raised for connection errors. + CONNECTION_ERRORS = if defined?(PG) + [ + PG::ConnectionBad, + PG::ConnectionDoesNotExist, + PG::ConnectionException, + PG::ConnectionFailure, + PG::UnableToSend, + # During a failover this error may be raised when + # writing to a primary. + PG::ReadOnlySqlTransaction + ].freeze + else + [].freeze + end + + ProxyNotConfiguredError = Class.new(StandardError) + + # The connection proxy to use for load balancing (if enabled). + def self.proxy + unless @proxy + Gitlab::ErrorTracking.track_exception( + ProxyNotConfiguredError.new( + "Attempting to access the database load balancing proxy, but it wasn't configured.\n" \ + "Did you forget to call '#{self.name}.configure_proxy'?" + )) + end + + @proxy + end + + # Returns a Hash containing the load balancing configuration. + def self.configuration + Gitlab::Database.config[:load_balancing] || {} + end + + # Returns the maximum replica lag size in bytes. + def self.max_replication_difference + (configuration['max_replication_difference'] || 8.megabytes).to_i + end + + # Returns the maximum lag time for a replica. + def self.max_replication_lag_time + (configuration['max_replication_lag_time'] || 60.0).to_f + end + + # Returns the interval (in seconds) to use for checking the status of a + # replica. + def self.replica_check_interval + (configuration['replica_check_interval'] || 60).to_f + end + + # Returns the additional hosts to use for load balancing. + def self.hosts + configuration['hosts'] || [] + end + + def self.service_discovery_enabled? + configuration.dig('discover', 'record').present? + end + + def self.service_discovery_configuration + conf = configuration['discover'] || {} + + { + nameserver: conf['nameserver'] || 'localhost', + port: conf['port'] || 8600, + record: conf['record'], + record_type: conf['record_type'] || 'A', + interval: conf['interval'] || 60, + disconnect_timeout: conf['disconnect_timeout'] || 120, + use_tcp: conf['use_tcp'] || false + } + end + + def self.pool_size + Gitlab::Database.config[:pool] + end + + # Returns true if load balancing is to be enabled. + def self.enable? + return false if Gitlab::Runtime.rake? + return false if Gitlab::Runtime.sidekiq? && !Gitlab::Utils.to_boolean(ENV['ENABLE_LOAD_BALANCING_FOR_SIDEKIQ'], default: false) + return false unless self.configured? + + true + end + + # Returns true if load balancing has been configured. Since + # Sidekiq does not currently use load balancing, we + # may want Web application servers to detect replication lag by + # posting the write location of the database if load balancing is + # configured. + def self.configured? + hosts.any? || service_discovery_enabled? + end + + def self.start_service_discovery + return unless service_discovery_enabled? + + ServiceDiscovery.new(service_discovery_configuration).start + end + + # Configures proxying of requests. + def self.configure_proxy(proxy = ConnectionProxy.new(hosts)) + @proxy = proxy + + # This hijacks the "connection" method to ensure both + # `ActiveRecord::Base.connection` and all models use the same load + # balancing proxy. + ActiveRecord::Base.singleton_class.prepend(ActiveRecordProxy) + end + + def self.active_record_models + ActiveRecord::Base.descendants + end + + DB_ROLES = [ + ROLE_PRIMARY = :primary, + ROLE_REPLICA = :replica, + ROLE_UNKNOWN = :unknown + ].freeze + + # Returns the role (primary/replica) of the database the connection is + # connecting to. At the moment, the connection can only be retrieved by + # Gitlab::Database::LoadBalancer#read or #read_write or from the + # ActiveRecord directly. Therefore, if the load balancer doesn't + # recognize the connection, this method returns the primary role + # directly. In future, we may need to check for other sources. + def self.db_role_for_connection(connection) + return ROLE_PRIMARY if !enable? || @proxy.blank? + + proxy.load_balancer.db_role_for_connection(connection) + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/active_record_proxy.rb b/lib/gitlab/database/load_balancing/active_record_proxy.rb new file mode 100644 index 00000000000..7763497e770 --- /dev/null +++ b/lib/gitlab/database/load_balancing/active_record_proxy.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Module injected into ActiveRecord::Base to allow hijacking of the + # "connection" method. + module ActiveRecordProxy + def connection + LoadBalancing.proxy + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/connection_proxy.rb b/lib/gitlab/database/load_balancing/connection_proxy.rb new file mode 100644 index 00000000000..3a09689a724 --- /dev/null +++ b/lib/gitlab/database/load_balancing/connection_proxy.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +# rubocop:disable GitlabSecurity/PublicSend + +module Gitlab + module Database + module LoadBalancing + # Redirecting of ActiveRecord connections. + # + # The ConnectionProxy class redirects ActiveRecord connection requests to + # the right load balancer pool, depending on the type of query. + class ConnectionProxy + WriteInsideReadOnlyTransactionError = Class.new(StandardError) + READ_ONLY_TRANSACTION_KEY = :load_balacing_read_only_transaction + + attr_reader :load_balancer + + # These methods perform writes after which we need to stick to the + # primary. + STICKY_WRITES = %i( + delete + delete_all + insert + update + update_all + ).freeze + + NON_STICKY_READS = %i( + sanitize_limit + select + select_one + select_rows + quote_column_name + ).freeze + + # hosts - The hosts to use for load balancing. + def initialize(hosts = []) + @load_balancer = LoadBalancer.new(hosts) + end + + def select_all(arel, name = nil, binds = [], preparable: nil) + if arel.respond_to?(:locked) && arel.locked + # SELECT ... FOR UPDATE queries should be sent to the primary. + write_using_load_balancer(:select_all, [arel, name, binds], + sticky: true) + else + read_using_load_balancer(:select_all, [arel, name, binds]) + end + end + + NON_STICKY_READS.each do |name| + define_method(name) do |*args, &block| + read_using_load_balancer(name, args, &block) + end + end + + STICKY_WRITES.each do |name| + define_method(name) do |*args, &block| + write_using_load_balancer(name, args, sticky: true, &block) + end + end + + def transaction(*args, &block) + if current_session.fallback_to_replicas_for_ambiguous_queries? + track_read_only_transaction! + read_using_load_balancer(:transaction, args, &block) + else + write_using_load_balancer(:transaction, args, sticky: true, &block) + end + + ensure + untrack_read_only_transaction! + end + + # Delegates all unknown messages to a read-write connection. + def method_missing(name, *args, &block) + if current_session.fallback_to_replicas_for_ambiguous_queries? + read_using_load_balancer(name, args, &block) + else + write_using_load_balancer(name, args, &block) + end + end + + # Performs a read using the load balancer. + # + # name - The name of the method to call on a connection object. + def read_using_load_balancer(name, args, &block) + if current_session.use_primary? && + !current_session.use_replicas_for_read_queries? + @load_balancer.read_write do |connection| + connection.send(name, *args, &block) + end + else + @load_balancer.read do |connection| + connection.send(name, *args, &block) + end + end + end + + # Performs a write using the load balancer. + # + # name - The name of the method to call on a connection object. + # sticky - If set to true the session will stick to the master after + # the write. + def write_using_load_balancer(name, args, sticky: false, &block) + if read_only_transaction? + raise WriteInsideReadOnlyTransactionError, 'A write query is performed inside a read-only transaction' + end + + @load_balancer.read_write do |connection| + # Sticking has to be enabled before calling the method. Not doing so + # could lead to methods called in a block still being performed on a + # secondary instead of on a primary (when necessary). + current_session.write! if sticky + + connection.send(name, *args, &block) + end + end + + private + + def current_session + ::Gitlab::Database::LoadBalancing::Session.current + end + + def track_read_only_transaction! + Thread.current[READ_ONLY_TRANSACTION_KEY] = true + end + + def untrack_read_only_transaction! + Thread.current[READ_ONLY_TRANSACTION_KEY] = nil + end + + def read_only_transaction? + Thread.current[READ_ONLY_TRANSACTION_KEY] == true + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/host.rb b/lib/gitlab/database/load_balancing/host.rb new file mode 100644 index 00000000000..3e74b5ea727 --- /dev/null +++ b/lib/gitlab/database/load_balancing/host.rb @@ -0,0 +1,209 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # A single database host used for load balancing. + class Host + attr_reader :pool, :last_checked_at, :intervals, :load_balancer, :host, :port + + delegate :connection, :release_connection, :enable_query_cache!, :disable_query_cache!, :query_cache_enabled, to: :pool + + CONNECTION_ERRORS = + if defined?(PG) + [ + ActionView::Template::Error, + ActiveRecord::StatementInvalid, + PG::Error + ].freeze + else + [ + ActionView::Template::Error, + ActiveRecord::StatementInvalid + ].freeze + end + + # host - The address of the database. + # load_balancer - The LoadBalancer that manages this Host. + def initialize(host, load_balancer, port: nil) + @host = host + @port = port + @load_balancer = load_balancer + @pool = Database.create_connection_pool(LoadBalancing.pool_size, host, port) + @online = true + @last_checked_at = Time.zone.now + + interval = LoadBalancing.replica_check_interval + @intervals = (interval..(interval * 2)).step(0.5).to_a + end + + # Disconnects the pool, once all connections are no longer in use. + # + # timeout - The time after which the pool should be forcefully + # disconnected. + def disconnect!(timeout = 120) + start_time = Metrics::System.monotonic_time + + while (Metrics::System.monotonic_time - start_time) <= timeout + break if pool.connections.none?(&:in_use?) + + sleep(2) + end + + pool.disconnect! + end + + def offline! + LoadBalancing::Logger.warn( + event: :host_offline, + message: 'Marking host as offline', + db_host: @host, + db_port: @port + ) + + @online = false + @pool.disconnect! + end + + # Returns true if the host is online. + def online? + return @online unless check_replica_status? + + refresh_status + + if @online + LoadBalancing::Logger.info( + event: :host_online, + message: 'Host is online after replica status check', + db_host: @host, + db_port: @port + ) + else + LoadBalancing::Logger.warn( + event: :host_offline, + message: 'Host is offline after replica status check', + db_host: @host, + db_port: @port + ) + end + + @online + rescue *CONNECTION_ERRORS + offline! + false + end + + def refresh_status + @online = replica_is_up_to_date? + @last_checked_at = Time.zone.now + end + + def check_replica_status? + (Time.zone.now - last_checked_at) >= intervals.sample + end + + def replica_is_up_to_date? + replication_lag_below_threshold? || data_is_recent_enough? + end + + def replication_lag_below_threshold? + if (lag_time = replication_lag_time) + lag_time <= LoadBalancing.max_replication_lag_time + else + false + end + end + + # Returns true if the replica has replicated enough data to be useful. + def data_is_recent_enough? + # It's possible for a replica to not replay WAL data for a while, + # despite being up to date. This can happen when a primary does not + # receive any writes for a while. + # + # To prevent this from happening we check if the lag size (in bytes) + # of the replica is small enough for the replica to be useful. We + # only do this if we haven't replicated in a while so we only need + # to connect to the primary when truly necessary. + if (lag_size = replication_lag_size) + lag_size <= LoadBalancing.max_replication_difference + else + false + end + end + + # Returns the replication lag time of this secondary in seconds as a + # float. + # + # This method will return nil if no lag time could be calculated. + def replication_lag_time + row = query_and_release('SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float as lag') + + row['lag'].to_f if row.any? + end + + # Returns the number of bytes this secondary is lagging behind the + # primary. + # + # This method will return nil if no lag size could be calculated. + def replication_lag_size + location = connection.quote(primary_write_location) + row = query_and_release(<<-SQL.squish) + SELECT pg_wal_lsn_diff(#{location}, pg_last_wal_replay_lsn())::float + AS diff + SQL + + row['diff'].to_i if row.any? + rescue *CONNECTION_ERRORS + nil + end + + def primary_write_location + load_balancer.primary_write_location + ensure + load_balancer.release_primary_connection + end + + def database_replica_location + row = query_and_release(<<-SQL.squish) + SELECT pg_last_wal_replay_lsn()::text AS location + SQL + + row['location'] if row.any? + rescue *CONNECTION_ERRORS + nil + end + + # Returns true if this host has caught up to the given transaction + # write location. + # + # location - The transaction write location as reported by a primary. + def caught_up?(location) + string = connection.quote(location) + + # In case the host is a primary pg_last_wal_replay_lsn/pg_last_xlog_replay_location() returns + # NULL. The recovery check ensures we treat the host as up-to-date in + # such a case. + query = <<-SQL.squish + SELECT NOT pg_is_in_recovery() + OR pg_wal_lsn_diff(pg_last_wal_replay_lsn(), #{string}) >= 0 + AS result + SQL + + row = query_and_release(query) + + ::Gitlab::Utils.to_boolean(row['result']) + rescue *CONNECTION_ERRORS + false + end + + def query_and_release(sql) + connection.select_all(sql).first || {} + rescue StandardError + {} + ensure + release_connection + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/host_list.rb b/lib/gitlab/database/load_balancing/host_list.rb new file mode 100644 index 00000000000..24800012947 --- /dev/null +++ b/lib/gitlab/database/load_balancing/host_list.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # A list of database hosts to use for connections. + class HostList + # hosts - The list of secondary hosts to add. + def initialize(hosts = []) + @hosts = hosts.shuffle + @pools = Set.new + @index = 0 + @mutex = Mutex.new + @hosts_gauge = Gitlab::Metrics.gauge(:db_load_balancing_hosts, 'Current number of load balancing hosts') + + set_metrics! + update_pools + end + + def hosts + @mutex.synchronize { @hosts.dup } + end + + def shuffle + @mutex.synchronize do + unsafe_shuffle + end + end + + def length + @mutex.synchronize { @hosts.length } + end + + def host_names_and_ports + @mutex.synchronize { @hosts.map { |host| [host.host, host.port] } } + end + + def manage_pool?(pool) + @pools.include?(pool) + end + + def hosts=(hosts) + @mutex.synchronize do + @hosts = hosts + unsafe_shuffle + update_pools + end + + set_metrics! + end + + # Sets metrics before returning next host + def next + next_host.tap do |_| + set_metrics! + end + end + + private + + def unsafe_shuffle + @hosts = @hosts.shuffle + @index = 0 + end + + # Returns the next available host. + # + # Returns a Gitlab::Database::LoadBalancing::Host instance, or nil if no + # hosts were available. + def next_host + @mutex.synchronize do + break if @hosts.empty? + + started_at = @index + + loop do + host = @hosts[@index] + @index = (@index + 1) % @hosts.length + + break host if host.online? + + # Return nil once we have cycled through all hosts and none were + # available. + break if @index == started_at + end + end + end + + def set_metrics! + @hosts_gauge.set({}, @hosts.length) + end + + def update_pools + @pools = Set.new(@hosts.map(&:pool)) + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/load_balancer.rb b/lib/gitlab/database/load_balancing/load_balancer.rb new file mode 100644 index 00000000000..a833bb8491f --- /dev/null +++ b/lib/gitlab/database/load_balancing/load_balancer.rb @@ -0,0 +1,275 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Load balancing for ActiveRecord connections. + # + # Each host in the load balancer uses the same credentials as the primary + # database. + # + # This class *requires* that `ActiveRecord::Base.retrieve_connection` + # always returns a connection to the primary. + class LoadBalancer + CACHE_KEY = :gitlab_load_balancer_host + VALID_HOSTS_CACHE_KEY = :gitlab_load_balancer_valid_hosts + + attr_reader :host_list + + # hosts - The hostnames/addresses of the additional databases. + def initialize(hosts = []) + @host_list = HostList.new(hosts.map { |addr| Host.new(addr, self) }) + @connection_db_roles = {}.compare_by_identity + @connection_db_roles_count = {}.compare_by_identity + end + + # Yields a connection that can be used for reads. + # + # If no secondaries were available this method will use the primary + # instead. + def read(&block) + connection = nil + conflict_retried = 0 + + while host + ensure_caching! + + begin + connection = host.connection + track_connection_role(connection, ROLE_REPLICA) + + return yield connection + rescue StandardError => error + untrack_connection_role(connection) + + if serialization_failure?(error) + # This error can occur when a query conflicts. See + # https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT + # for more information. + # + # In this event we'll cycle through the secondaries at most 3 + # times before using the primary instead. + will_retry = conflict_retried < @host_list.length * 3 + + LoadBalancing::Logger.warn( + event: :host_query_conflict, + message: 'Query conflict on host', + conflict_retried: conflict_retried, + will_retry: will_retry, + db_host: host.host, + db_port: host.port, + host_list_length: @host_list.length + ) + + if will_retry + conflict_retried += 1 + release_host + else + break + end + elsif connection_error?(error) + host.offline! + release_host + else + raise error + end + end + end + + LoadBalancing::Logger.warn( + event: :no_secondaries_available, + message: 'No secondaries were available, using primary instead', + conflict_retried: conflict_retried, + host_list_length: @host_list.length + ) + + read_write(&block) + ensure + untrack_connection_role(connection) + end + + # Yields a connection that can be used for both reads and writes. + def read_write + connection = nil + # In the event of a failover the primary may be briefly unavailable. + # Instead of immediately grinding to a halt we'll retry the operation + # a few times. + retry_with_backoff do + connection = ActiveRecord::Base.retrieve_connection + track_connection_role(connection, ROLE_PRIMARY) + + yield connection + end + ensure + untrack_connection_role(connection) + end + + # Recognize the role (primary/replica) of the database this connection + # is connecting to. If the connection is not issued by this load + # balancer, return nil + def db_role_for_connection(connection) + return @connection_db_roles[connection] if @connection_db_roles[connection] + return ROLE_REPLICA if @host_list.manage_pool?(connection.pool) + return ROLE_PRIMARY if connection.pool == ActiveRecord::Base.connection_pool + end + + # Returns a host to use for queries. + # + # Hosts are scoped per thread so that multiple threads don't + # accidentally re-use the same host + connection. + def host + RequestStore[CACHE_KEY] ||= current_host_list.next + end + + # Releases the host and connection for the current thread. + def release_host + if host = RequestStore[CACHE_KEY] + host.disable_query_cache! + host.release_connection + end + + RequestStore.delete(CACHE_KEY) + RequestStore.delete(VALID_HOSTS_CACHE_KEY) + end + + def release_primary_connection + ActiveRecord::Base.connection_pool.release_connection + end + + # Returns the transaction write location of the primary. + def primary_write_location + location = read_write do |connection| + ::Gitlab::Database.get_write_location(connection) + end + + return location if location + + raise 'Failed to determine the write location of the primary database' + end + + # Returns true if all hosts have caught up to the given transaction + # write location. + def all_caught_up?(location) + @host_list.hosts.all? { |host| host.caught_up?(location) } + end + + # Returns true if there was at least one host that has caught up with the given transaction. + # + # In case of a retry, this method also stores the set of hosts that have caught up. + def select_caught_up_hosts(location) + all_hosts = @host_list.hosts + valid_hosts = all_hosts.select { |host| host.caught_up?(location) } + + return false if valid_hosts.empty? + + # Hosts can come online after the time when this scan was done, + # so we need to remember the ones that can be used. If the host went + # offline, we'll just rely on the retry mechanism to use the primary. + set_consistent_hosts_for_request(HostList.new(valid_hosts)) + + # Since we will be using a subset from the original list, let's just + # pick a random host and mix up the original list to ensure we don't + # only end up using one replica. + RequestStore[CACHE_KEY] = valid_hosts.sample + @host_list.shuffle + + true + end + + # Returns true if there was at least one host that has caught up with the given transaction. + # Similar to `#select_caught_up_hosts`, picks a random host, to rotate replicas we use. + # Unlike `#select_caught_up_hosts`, does not iterate over all hosts if finds any. + def select_up_to_date_host(location) + all_hosts = @host_list.hosts.shuffle + host = all_hosts.find { |host| host.caught_up?(location) } + + return false unless host + + RequestStore[CACHE_KEY] = host + + true + end + + def set_consistent_hosts_for_request(hosts) + RequestStore[VALID_HOSTS_CACHE_KEY] = hosts + end + + # Yields a block, retrying it upon error using an exponential backoff. + def retry_with_backoff(retries = 3, time = 2) + retried = 0 + last_error = nil + + while retried < retries + begin + return yield + rescue StandardError => error + raise error unless connection_error?(error) + + # We need to release the primary connection as otherwise Rails + # will keep raising errors when using the connection. + release_primary_connection + + last_error = error + sleep(time) + retried += 1 + time **= 2 + end + end + + raise last_error + end + + def connection_error?(error) + case error + when ActiveRecord::StatementInvalid, ActionView::Template::Error + # After connecting to the DB Rails will wrap query errors using this + # class. + connection_error?(error.cause) + when *CONNECTION_ERRORS + true + else + # When PG tries to set the client encoding but fails due to a + # connection error it will raise a PG::Error instance. Catching that + # would catch all errors (even those we don't want), so instead we + # check for the message of the error. + error.message.start_with?('invalid encoding name:') + end + end + + def serialization_failure?(error) + if error.cause + serialization_failure?(error.cause) + else + error.is_a?(PG::TRSerializationFailure) + end + end + + private + + def ensure_caching! + host.enable_query_cache! unless host.query_cache_enabled + end + + def track_connection_role(connection, role) + @connection_db_roles[connection] = role + @connection_db_roles_count[connection] ||= 0 + @connection_db_roles_count[connection] += 1 + end + + def untrack_connection_role(connection) + return if connection.blank? || @connection_db_roles_count[connection].blank? + + @connection_db_roles_count[connection] -= 1 + if @connection_db_roles_count[connection] <= 0 + @connection_db_roles.delete(connection) + @connection_db_roles_count.delete(connection) + end + end + + def current_host_list + RequestStore[VALID_HOSTS_CACHE_KEY] || @host_list + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/logger.rb b/lib/gitlab/database/load_balancing/logger.rb new file mode 100644 index 00000000000..ee67ffcc99c --- /dev/null +++ b/lib/gitlab/database/load_balancing/logger.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + class Logger < ::Gitlab::JsonLogger + def self.file_name_noext + 'database_load_balancing' + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/rack_middleware.rb b/lib/gitlab/database/load_balancing/rack_middleware.rb new file mode 100644 index 00000000000..4734ff99bd3 --- /dev/null +++ b/lib/gitlab/database/load_balancing/rack_middleware.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Rack middleware to handle sticking when serving Rails requests. Grape + # API calls are handled separately as different API endpoints need to + # stick based on different objects. + class RackMiddleware + STICK_OBJECT = 'load_balancing.stick_object' + + # Unsticks or continues sticking the current request. + # + # This method also updates the Rack environment so #call can later + # determine if we still need to stick or not. + # + # env - The Rack environment. + # namespace - The namespace to use for sticking. + # id - The identifier to use for sticking. + def self.stick_or_unstick(env, namespace, id) + return unless LoadBalancing.enable? + + Sticking.unstick_or_continue_sticking(namespace, id) + + env[STICK_OBJECT] ||= Set.new + env[STICK_OBJECT] << [namespace, id] + end + + def initialize(app) + @app = app + end + + def call(env) + # Ensure that any state that may have run before the first request + # doesn't linger around. + clear + + unstick_or_continue_sticking(env) + + result = @app.call(env) + + stick_if_necessary(env) + + result + ensure + clear + end + + # Determine if we need to stick based on currently available user data. + # + # Typically this code will only be reachable for Rails requests as + # Grape data is not yet available at this point. + def unstick_or_continue_sticking(env) + namespaces_and_ids = sticking_namespaces_and_ids(env) + + namespaces_and_ids.each do |namespace, id| + Sticking.unstick_or_continue_sticking(namespace, id) + end + end + + # Determine if we need to stick after handling a request. + def stick_if_necessary(env) + namespaces_and_ids = sticking_namespaces_and_ids(env) + + namespaces_and_ids.each do |namespace, id| + Sticking.stick_if_necessary(namespace, id) + end + end + + def clear + load_balancer.release_host + Session.clear_session + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + + # Determines the sticking namespace and identifier based on the Rack + # environment. + # + # For Rails requests this uses warden, but Grape and others have to + # manually set the right environment variable. + def sticking_namespaces_and_ids(env) + warden = env['warden'] + + if warden && warden.user + [[:user, warden.user.id]] + elsif env[STICK_OBJECT].present? + env[STICK_OBJECT].to_a + else + [] + end + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/resolver.rb b/lib/gitlab/database/load_balancing/resolver.rb new file mode 100644 index 00000000000..a291080cc3d --- /dev/null +++ b/lib/gitlab/database/load_balancing/resolver.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require 'net/dns' +require 'resolv' + +module Gitlab + module Database + module LoadBalancing + class Resolver + UnresolvableNameserverError = Class.new(StandardError) + + def initialize(nameserver) + @nameserver = nameserver + end + + def resolve + address = ip_address || ip_address_from_hosts_file || + ip_address_from_dns + + unless address + raise UnresolvableNameserverError, + "could not resolve #{@nameserver}" + end + + address + end + + private + + def ip_address + IPAddr.new(@nameserver) + rescue IPAddr::InvalidAddressError + end + + def ip_address_from_hosts_file + ip = Resolv::Hosts.new.getaddress(@nameserver) + IPAddr.new(ip) + rescue Resolv::ResolvError + end + + def ip_address_from_dns + answer = Net::DNS::Resolver.start(@nameserver, Net::DNS::A).answer + return if answer.empty? + + answer.first.address + rescue Net::DNS::Resolver::NoResponseError + raise UnresolvableNameserverError, "no response from DNS server(s)" + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/service_discovery.rb b/lib/gitlab/database/load_balancing/service_discovery.rb new file mode 100644 index 00000000000..9b42b25be1c --- /dev/null +++ b/lib/gitlab/database/load_balancing/service_discovery.rb @@ -0,0 +1,187 @@ +# frozen_string_literal: true + +require 'net/dns' +require 'resolv' + +module Gitlab + module Database + module LoadBalancing + # Service discovery of secondary database hosts. + # + # Service discovery works by periodically looking up a DNS record. If the + # DNS record returns a new list of hosts, this class will update the load + # balancer with said hosts. Requests may continue to use the old hosts + # until they complete. + class ServiceDiscovery + attr_reader :interval, :record, :record_type, :disconnect_timeout + + MAX_SLEEP_ADJUSTMENT = 10 + + RECORD_TYPES = { + 'A' => Net::DNS::A, + 'SRV' => Net::DNS::SRV + }.freeze + + Address = Struct.new(:hostname, :port) do + def to_s + port ? "#{hostname}:#{port}" : hostname + end + + def <=>(other) + self.to_s <=> other.to_s + end + end + + # nameserver - The nameserver to use for DNS lookups. + # port - The port of the nameserver. + # record - The DNS record to look up for retrieving the secondaries. + # record_type - The type of DNS record to look up + # interval - The time to wait between lookups. + # disconnect_timeout - The time after which an old host should be + # forcefully disconnected. + # use_tcp - Use TCP instaed of UDP to look up resources + def initialize(nameserver:, port:, record:, record_type: 'A', interval: 60, disconnect_timeout: 120, use_tcp: false) + @nameserver = nameserver + @port = port + @record = record + @record_type = record_type_for(record_type) + @interval = interval + @disconnect_timeout = disconnect_timeout + @use_tcp = use_tcp + end + + def start + Thread.new do + loop do + interval = + begin + refresh_if_necessary + rescue StandardError => error + # Any exceptions that might occur should be reported to + # Sentry, instead of silently terminating this thread. + Gitlab::ErrorTracking.track_exception(error) + + Gitlab::AppLogger.error( + "Service discovery encountered an error: #{error.message}" + ) + + self.interval + end + + # We slightly randomize the sleep() interval. This should reduce + # the likelihood of _all_ processes refreshing at the same time, + # possibly putting unnecessary pressure on the DNS server. + sleep(interval + rand(MAX_SLEEP_ADJUSTMENT)) + end + end + end + + # Refreshes the hosts, but only if the DNS record returned a new list of + # addresses. + # + # The return value is the amount of time (in seconds) to wait before + # checking the DNS record for any changes. + def refresh_if_necessary + interval, from_dns = addresses_from_dns + + current = addresses_from_load_balancer + + replace_hosts(from_dns) if from_dns != current + + interval + end + + # Replaces all the hosts in the load balancer with the new ones, + # disconnecting the old connections. + # + # addresses - An Array of Address structs to use for the new hosts. + def replace_hosts(addresses) + old_hosts = load_balancer.host_list.hosts + + load_balancer.host_list.hosts = addresses.map do |addr| + Host.new(addr.hostname, load_balancer, port: addr.port) + end + + # We must explicitly disconnect the old connections, otherwise we may + # leak database connections over time. For example, if a request + # started just before we added the new hosts it will use an old + # host/connection. While this connection will be checked in and out, + # it won't be explicitly disconnected. + old_hosts.each do |host| + host.disconnect!(disconnect_timeout) + end + end + + # Returns an Array containing: + # + # 1. The time to wait for the next check. + # 2. An array containing the hostnames of the DNS record. + def addresses_from_dns + response = resolver.search(record, record_type) + resources = response.answer + + addresses = + case record_type + when Net::DNS::A + addresses_from_a_record(resources) + when Net::DNS::SRV + addresses_from_srv_record(response) + end + + # Addresses are sorted so we can directly compare the old and new + # addresses, without having to use any additional data structures. + [new_wait_time_for(resources), addresses.sort] + end + + def new_wait_time_for(resources) + wait = resources.first&.ttl || interval + + # The preconfigured interval acts as a minimum amount of time to + # wait. + wait < interval ? interval : wait + end + + def addresses_from_load_balancer + load_balancer.host_list.host_names_and_ports.map do |hostname, port| + Address.new(hostname, port) + end.sort + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + + def resolver + @resolver ||= Net::DNS::Resolver.new( + nameservers: Resolver.new(@nameserver).resolve, + port: @port, + use_tcp: @use_tcp + ) + end + + private + + def record_type_for(type) + RECORD_TYPES.fetch(type) do + raise(ArgumentError, "Unsupported record type: #{type}") + end + end + + def addresses_from_srv_record(response) + srv_resolver = SrvResolver.new(resolver, response.additional) + + response.answer.map do |r| + address = srv_resolver.address_for(r.host.to_s) + next unless address + + Address.new(address.to_s, r.port) + end.compact + end + + def addresses_from_a_record(resources) + resources.map { |r| Address.new(r.address.to_s) } + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/session.rb b/lib/gitlab/database/load_balancing/session.rb new file mode 100644 index 00000000000..3682c9265c2 --- /dev/null +++ b/lib/gitlab/database/load_balancing/session.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Tracking of load balancing state per user session. + # + # A session starts at the beginning of a request and ends once the request + # has been completed. Sessions can be used to keep track of what hosts + # should be used for queries. + class Session + CACHE_KEY = :gitlab_load_balancer_session + + def self.current + RequestStore[CACHE_KEY] ||= new + end + + def self.clear_session + RequestStore.delete(CACHE_KEY) + end + + def self.without_sticky_writes(&block) + current.ignore_writes(&block) + end + + def initialize + @use_primary = false + @performed_write = false + @ignore_writes = false + @fallback_to_replicas_for_ambiguous_queries = false + @use_replicas_for_read_queries = false + end + + def use_primary? + @use_primary + end + + alias_method :using_primary?, :use_primary? + + def use_primary! + @use_primary = true + end + + def use_primary(&blk) + used_primary = @use_primary + @use_primary = true + yield + ensure + @use_primary = used_primary || @performed_write + end + + def ignore_writes(&block) + @ignore_writes = true + + yield + ensure + @ignore_writes = false + end + + # Indicates that the read SQL statements from anywhere inside this + # blocks should use a replica, regardless of the current primary + # stickiness or whether a write query is already performed in the + # current session. This interface is reserved mostly for performance + # purpose. This is a good tool to push expensive queries, which can + # tolerate the replica lags, to the replicas. + # + # Write and ambiguous queries inside this block are still handled by + # the primary. + def use_replicas_for_read_queries(&blk) + previous_flag = @use_replicas_for_read_queries + @use_replicas_for_read_queries = true + yield + ensure + @use_replicas_for_read_queries = previous_flag + end + + def use_replicas_for_read_queries? + @use_replicas_for_read_queries == true + end + + # Indicate that the ambiguous SQL statements from anywhere inside this + # block should use a replica. The ambiguous statements include: + # - Transactions. + # - Custom queries (via exec_query, execute, etc.) + # - In-flight connection configuration change (SET LOCAL statement_timeout = 5000) + # + # This is a weak enforcement. This helper incorporates well with + # primary stickiness: + # - If the queries are about to write + # - The current session already performed writes + # - It prefers to use primary, aka, use_primary or use_primary! were called + def fallback_to_replicas_for_ambiguous_queries(&blk) + previous_flag = @fallback_to_replicas_for_ambiguous_queries + @fallback_to_replicas_for_ambiguous_queries = true + yield + ensure + @fallback_to_replicas_for_ambiguous_queries = previous_flag + end + + def fallback_to_replicas_for_ambiguous_queries? + @fallback_to_replicas_for_ambiguous_queries == true && !use_primary? && !performed_write? + end + + def write! + @performed_write = true + + return if @ignore_writes + + use_primary! + end + + def performed_write? + @performed_write + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb new file mode 100644 index 00000000000..524d69c00c0 --- /dev/null +++ b/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + class SidekiqClientMiddleware + def call(worker_class, job, _queue, _redis_pool) + worker_class = worker_class.to_s.safe_constantize + + mark_data_consistency_location(worker_class, job) + + yield + end + + private + + def mark_data_consistency_location(worker_class, job) + # Mailers can't be constantized + return unless worker_class + return unless worker_class.include?(::ApplicationWorker) + return unless worker_class.get_data_consistency_feature_flag_enabled? + + return if location_already_provided?(job) + + job['worker_data_consistency'] = worker_class.get_data_consistency + + return unless worker_class.utilizes_load_balancing_capabilities? + + if Session.current.use_primary? + job['database_write_location'] = load_balancer.primary_write_location + else + job['database_replica_location'] = load_balancer.host.database_replica_location + end + end + + def location_already_provided?(job) + job['database_replica_location'] || job['database_write_location'] + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb new file mode 100644 index 00000000000..9bd0adf8dbd --- /dev/null +++ b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + class SidekiqServerMiddleware + JobReplicaNotUpToDate = Class.new(StandardError) + + def call(worker, job, _queue) + if requires_primary?(worker.class, job) + Session.current.use_primary! + end + + yield + ensure + clear + end + + private + + def clear + load_balancer.release_host + Session.clear_session + end + + def requires_primary?(worker_class, job) + return true unless worker_class.include?(::ApplicationWorker) + return true unless worker_class.utilizes_load_balancing_capabilities? + return true unless worker_class.get_data_consistency_feature_flag_enabled? + + location = job['database_write_location'] || job['database_replica_location'] + + return true unless location + + job_data_consistency = worker_class.get_data_consistency + job[:data_consistency] = job_data_consistency.to_s + + if replica_caught_up?(location) + job[:database_chosen] = 'replica' + false + elsif job_data_consistency == :delayed && not_yet_retried?(job) + job[:database_chosen] = 'retry' + raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\ + " Replica was not up to date." + else + job[:database_chosen] = 'primary' + true + end + end + + def not_yet_retried?(job) + # if `retry_count` is `nil` it indicates that this job was never retried + # the `0` indicates that this is a first retry + job['retry_count'].nil? + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + + def replica_caught_up?(location) + if Feature.enabled?(:sidekiq_load_balancing_rotate_up_to_date_replica) + load_balancer.select_up_to_date_host(location) + else + load_balancer.host.caught_up?(location) + end + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/srv_resolver.rb b/lib/gitlab/database/load_balancing/srv_resolver.rb new file mode 100644 index 00000000000..20da525f4d2 --- /dev/null +++ b/lib/gitlab/database/load_balancing/srv_resolver.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Hostnames returned in SRV records cannot sometimes be resolved by a local + # resolver, however, there's a possibility that their A/AAAA records are + # returned as part of the SRV query in the additional section, so we try + # to extract the IPs from there first, failing back to querying the + # hostnames A/AAAA records one by one, using the same resolver that + # queried the SRV record. + class SrvResolver + include Gitlab::Utils::StrongMemoize + + attr_reader :resolver, :additional + + def initialize(resolver, additional) + @resolver = resolver + @additional = additional + end + + def address_for(host) + addresses_from_additional[host] || resolve_host(host) + end + + private + + def addresses_from_additional + strong_memoize(:addresses_from_additional) do + additional.each_with_object({}) do |rr, h| + h[rr.name] = rr.address if rr.is_a?(Net::DNS::RR::A) || rr.is_a?(Net::DNS::RR::AAAA) + end + end + end + + def resolve_host(host) + record = resolver.search(host, Net::DNS::ANY).answer.find do |rr| + rr.is_a?(Net::DNS::RR::A) || rr.is_a?(Net::DNS::RR::AAAA) + end + + record&.address + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/sticking.rb b/lib/gitlab/database/load_balancing/sticking.rb new file mode 100644 index 00000000000..efbd7099300 --- /dev/null +++ b/lib/gitlab/database/load_balancing/sticking.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Module used for handling sticking connections to a primary, if + # necessary. + # + # ## Examples + # + # Sticking a user to the primary: + # + # Sticking.stick_if_necessary(:user, current_user.id) + # + # To unstick if possible, or continue using the primary otherwise: + # + # Sticking.unstick_or_continue_sticking(:user, current_user.id) + module Sticking + # The number of seconds after which a session should stop reading from + # the primary. + EXPIRATION = 30 + + # Sticks to the primary if a write was performed. + def self.stick_if_necessary(namespace, id) + return unless LoadBalancing.enable? + + stick(namespace, id) if Session.current.performed_write? + end + + # Checks if we are caught-up with all the work + def self.all_caught_up?(namespace, id) + location = last_write_location_for(namespace, id) + + return true unless location + + load_balancer.all_caught_up?(location).tap do |caught_up| + unstick(namespace, id) if caught_up + end + end + + # Selects hosts that have caught up with the primary. This ensures + # atomic selection of the host to prevent the host list changing + # in another thread. + # + # Returns true if one host was selected. + def self.select_caught_up_replicas(namespace, id) + location = last_write_location_for(namespace, id) + + # Unlike all_caught_up?, we return false if no write location exists. + # We want to be sure we talk to a replica that has caught up for a specific + # write location. If no such location exists, err on the side of caution. + return false unless location + + load_balancer.select_caught_up_hosts(location).tap do |selected| + unstick(namespace, id) if selected + end + end + + # Sticks to the primary if necessary, otherwise unsticks an object (if + # it was previously stuck to the primary). + def self.unstick_or_continue_sticking(namespace, id) + Session.current.use_primary! unless all_caught_up?(namespace, id) + end + + # Select a replica that has caught up with the primary. If one has not been + # found, stick to the primary. + def self.select_valid_host(namespace, id) + replica_selected = select_caught_up_replicas(namespace, id) + + Session.current.use_primary! unless replica_selected + end + + # Starts sticking to the primary for the given namespace and id, using + # the latest WAL pointer from the primary. + def self.stick(namespace, id) + return unless LoadBalancing.enable? + + mark_primary_write_location(namespace, id) + Session.current.use_primary! + end + + def self.bulk_stick(namespace, ids) + return unless LoadBalancing.enable? + + with_primary_write_location do |location| + ids.each do |id| + set_write_location_for(namespace, id, location) + end + end + + Session.current.use_primary! + end + + def self.with_primary_write_location + return unless LoadBalancing.configured? + + # Load balancing could be enabled for the Web application server, + # but it's not activated for Sidekiq. We should update Redis with + # the write location just in case load balancing is being used. + location = + if LoadBalancing.enable? + load_balancer.primary_write_location + else + Gitlab::Database.get_write_location(ActiveRecord::Base.connection) + end + + return if location.blank? + + yield(location) + end + + def self.mark_primary_write_location(namespace, id) + with_primary_write_location do |location| + set_write_location_for(namespace, id, location) + end + end + + # Stops sticking to the primary. + def self.unstick(namespace, id) + Gitlab::Redis::SharedState.with do |redis| + redis.del(redis_key_for(namespace, id)) + end + end + + def self.set_write_location_for(namespace, id, location) + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key_for(namespace, id), location, ex: EXPIRATION) + end + end + + def self.last_write_location_for(namespace, id) + Gitlab::Redis::SharedState.with do |redis| + redis.get(redis_key_for(namespace, id)) + end + end + + def self.redis_key_for(namespace, id) + "database-load-balancing/write-location/#{namespace}/#{id}" + end + + def self.load_balancer + LoadBalancing.proxy.load_balancer + end + end + end + end +end diff --git a/lib/gitlab/database/migration_helpers.rb b/lib/gitlab/database/migration_helpers.rb index 3a94e109d2a..d155abefdc8 100644 --- a/lib/gitlab/database/migration_helpers.rb +++ b/lib/gitlab/database/migration_helpers.rb @@ -5,7 +5,7 @@ module Gitlab module MigrationHelpers include Migrations::BackgroundMigrationHelpers include DynamicModelHelpers - include Migrations::RenameTableHelpers + include RenameTableHelpers # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS MAX_IDENTIFIER_NAME_LENGTH = 63 @@ -1091,6 +1091,25 @@ module Gitlab execute("DELETE FROM batched_background_migrations WHERE #{conditions}") end + def ensure_batched_background_migration_is_finished(job_class_name:, table_name:, column_name:, job_arguments:) + migration = Gitlab::Database::BackgroundMigration::BatchedMigration + .for_configuration(job_class_name, table_name, column_name, job_arguments).first + + configuration = { + job_class_name: job_class_name, + table_name: table_name, + column_name: column_name, + job_arguments: job_arguments + } + + if migration.nil? + Gitlab::AppLogger.warn "Could not find batched background migration for the given configuration: #{configuration}" + elsif !migration.finished? + raise "Expected batched background migration for the given configuration to be marked as 'finished', " \ + "but it is '#{migration.status}': #{configuration}" + end + end + # Returns an Array containing the indexes for the given column def indexes_for(table, column) column = column.to_s diff --git a/lib/gitlab/database/migrations/background_migration_helpers.rb b/lib/gitlab/database/migrations/background_migration_helpers.rb index 8d5ea652bfc..fa30ffb62f5 100644 --- a/lib/gitlab/database/migrations/background_migration_helpers.rb +++ b/lib/gitlab/database/migrations/background_migration_helpers.rb @@ -131,12 +131,51 @@ module Gitlab final_delay end + # Requeue pending jobs previously queued with #queue_background_migration_jobs_by_range_at_intervals + # + # This method is useful to schedule jobs that had previously failed. + # + # job_class_name - The background migration job class as a string + # delay_interval - The duration between each job's scheduled time + # batch_size - The maximum number of jobs to fetch to memory from the database. + def requeue_background_migration_jobs_by_range_at_intervals(job_class_name, delay_interval, batch_size: BATCH_SIZE, initial_delay: 0) + # To not overload the worker too much we enforce a minimum interval both + # when scheduling and performing jobs. + delay_interval = [delay_interval, BackgroundMigrationWorker.minimum_interval].max + + final_delay = 0 + job_counter = 0 + + jobs = Gitlab::Database::BackgroundMigrationJob.pending.where(class_name: job_class_name) + jobs.each_batch(of: batch_size) do |job_batch| + job_batch.each do |job| + final_delay = initial_delay + delay_interval * job_counter + + migrate_in(final_delay, job_class_name, job.arguments) + + job_counter += 1 + end + end + + duration = initial_delay + delay_interval * job_counter + say <<~SAY + Scheduled #{job_counter} #{job_class_name} jobs with an interval of #{delay_interval} seconds. + + The migration is expected to take at least #{duration} seconds. Expect all jobs to have completed after #{Time.zone.now + duration}." + SAY + + duration + end + # Creates a batched background migration for the given table. A batched migration runs one job # at a time, computing the bounds of the next batch based on the current migration settings and the previous # batch bounds. Each job's execution status is tracked in the database as the migration runs. The given job # class must be present in the Gitlab::BackgroundMigration module, and the batch class (if specified) must be # present in the Gitlab::BackgroundMigration::BatchingStrategies module. # + # If migration with same job_class_name, table_name, column_name, and job_aruments already exists, this helper + # will log an warning and not create a new one. + # # job_class_name - The background migration job class as a string # batch_table_name - The name of the table the migration will batch over # batch_column_name - The name of the column the migration will batch over @@ -180,6 +219,13 @@ module Gitlab sub_batch_size: SUB_BATCH_SIZE ) + if Gitlab::Database::BackgroundMigration::BatchedMigration.for_configuration(job_class_name, batch_table_name, batch_column_name, job_arguments).exists? + Gitlab::AppLogger.warn "Batched background migration not enqueued because it already exists: " \ + "job_class_name: #{job_class_name}, table_name: #{batch_table_name}, column_name: #{batch_column_name}, " \ + "job_arguments: #{job_arguments.inspect}" + return + end + job_interval = BATCH_MIN_DELAY if job_interval < BATCH_MIN_DELAY batch_max_value ||= connection.select_value(<<~SQL) @@ -194,13 +240,13 @@ module Gitlab job_class_name: job_class_name, table_name: batch_table_name, column_name: batch_column_name, + job_arguments: job_arguments, interval: job_interval, min_value: batch_min_value, max_value: batch_max_value, batch_class_name: batch_class_name, batch_size: batch_size, sub_batch_size: sub_batch_size, - job_arguments: job_arguments, status: migration_status) # This guard is necessary since #total_tuple_count was only introduced schema-wise, diff --git a/lib/gitlab/database/postgresql_adapter/empty_query_ping.rb b/lib/gitlab/database/postgresql_adapter/empty_query_ping.rb index 906312478ac..88affaa9757 100644 --- a/lib/gitlab/database/postgresql_adapter/empty_query_ping.rb +++ b/lib/gitlab/database/postgresql_adapter/empty_query_ping.rb @@ -1,5 +1,8 @@ # frozen_string_literal: true +# This patch will be included in the next Rails release: https://github.com/rails/rails/pull/42368 +raise 'This patch can be removed' if Rails::VERSION::MAJOR > 6 + # rubocop:disable Gitlab/ModuleWithInstanceVariables module Gitlab module Database diff --git a/lib/gitlab/database/postgresql_adapter/type_map_cache.rb b/lib/gitlab/database/postgresql_adapter/type_map_cache.rb new file mode 100644 index 00000000000..ff66d9115ab --- /dev/null +++ b/lib/gitlab/database/postgresql_adapter/type_map_cache.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +# Caches loading of additional types from the DB +# https://github.com/rails/rails/blob/v6.0.3.2/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb#L521-L589 + +# rubocop:disable Gitlab/ModuleWithInstanceVariables + +module Gitlab + module Database + module PostgresqlAdapter + module TypeMapCache + extend ActiveSupport::Concern + + TYPE_MAP_CACHE_MONITOR = ::Monitor.new + + class_methods do + def type_map_cache + TYPE_MAP_CACHE_MONITOR.synchronize do + @type_map_cache ||= {} + end + end + end + + def initialize_type_map(map = type_map) + TYPE_MAP_CACHE_MONITOR.synchronize do + cached_type_map = self.class.type_map_cache[@connection_parameters.hash] + break @type_map = cached_type_map if cached_type_map + + super + self.class.type_map_cache[@connection_parameters.hash] = map + end + end + + def reload_type_map + TYPE_MAP_CACHE_MONITOR.synchronize do + self.class.type_map_cache[@connection_parameters.hash] = nil + end + + super + end + end + end + end +end |