diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2020-07-20 15:26:25 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2020-07-20 15:26:25 +0300 |
commit | a09983ae35713f5a2bbb100981116d31ce99826e (patch) | |
tree | 2ee2af7bd104d57086db360a7e6d8c9d5d43667a /lib/gitlab/database | |
parent | 18c5ab32b738c0b6ecb4d0df3994000482f34bd8 (diff) |
Add latest changes from gitlab-org/gitlab@13-2-stable-ee
Diffstat (limited to 'lib/gitlab/database')
11 files changed, 744 insertions, 189 deletions
diff --git a/lib/gitlab/database/background_migration_job.rb b/lib/gitlab/database/background_migration_job.rb new file mode 100644 index 00000000000..445735b232a --- /dev/null +++ b/lib/gitlab/database/background_migration_job.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module Gitlab + module Database + class BackgroundMigrationJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord + self.table_name = :background_migration_jobs + + scope :for_migration_class, -> (class_name) { where(class_name: normalize_class_name(class_name)) } + scope :for_migration_execution, -> (class_name, arguments) do + for_migration_class(class_name).where('arguments = ?', arguments.to_json) + end + + scope :for_partitioning_migration, -> (class_name, table_name) do + for_migration_class(class_name).where('arguments ->> 2 = ?', table_name) + end + + enum status: { + pending: 0, + succeeded: 1 + } + + def self.mark_all_as_succeeded(class_name, arguments) + self.pending.for_migration_execution(class_name, arguments) + .update_all("status = #{statuses[:succeeded]}, updated_at = NOW()") + end + + def self.normalize_class_name(class_name) + return class_name unless class_name.present? && class_name.start_with?('::') + + class_name[2..] + end + + def class_name=(value) + write_attribute(:class_name, self.class.normalize_class_name(value)) + end + end + end +end diff --git a/lib/gitlab/database/dynamic_model_helpers.rb b/lib/gitlab/database/dynamic_model_helpers.rb new file mode 100644 index 00000000000..892f8291780 --- /dev/null +++ b/lib/gitlab/database/dynamic_model_helpers.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module DynamicModelHelpers + def define_batchable_model(table_name) + Class.new(ActiveRecord::Base) do + include EachBatch + + self.table_name = table_name + self.inheritance_column = :_type_disabled + end + end + end + end +end diff --git a/lib/gitlab/database/migration_helpers.rb b/lib/gitlab/database/migration_helpers.rb index fd09c31e994..006a24da8fe 100644 --- a/lib/gitlab/database/migration_helpers.rb +++ b/lib/gitlab/database/migration_helpers.rb @@ -3,10 +3,10 @@ module Gitlab module Database module MigrationHelpers + include Migrations::BackgroundMigrationHelpers + # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS MAX_IDENTIFIER_NAME_LENGTH = 63 - BACKGROUND_MIGRATION_BATCH_SIZE = 1000 # Number of rows to process per job - BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1000 # Number of jobs to bulk queue at a time PERMITTED_TIMESTAMP_COLUMNS = %i[created_at updated_at deleted_at].to_set.freeze DEFAULT_TIMESTAMP_COLUMNS = %i[created_at updated_at].freeze @@ -136,6 +136,10 @@ module Gitlab 'in the body of your migration class' end + index_name = index_name[:name] if index_name.is_a?(Hash) + + raise 'remove_concurrent_index_by_name must get an index name as the second argument' if index_name.blank? + options = options.merge({ algorithm: :concurrently }) unless index_exists_by_name?(table_name, index_name) @@ -477,7 +481,7 @@ module Gitlab # type is used. # batch_column_name - option is for tables without primary key, in this # case another unique integer column can be used. Example: :user_id - def rename_column_concurrently(table, old, new, type: nil, batch_column_name: :id) + def rename_column_concurrently(table, old, new, type: nil, type_cast_function: nil, batch_column_name: :id) unless column_exists?(table, batch_column_name) raise "Column #{batch_column_name} does not exist on #{table}" end @@ -488,7 +492,7 @@ module Gitlab check_trigger_permissions!(table) - create_column_from(table, old, new, type: type, batch_column_name: batch_column_name) + create_column_from(table, old, new, type: type, batch_column_name: batch_column_name, type_cast_function: type_cast_function) install_rename_triggers(table, old, new) end @@ -536,10 +540,10 @@ module Gitlab # table - The table containing the column. # column - The name of the column to change. # new_type - The new column type. - def change_column_type_concurrently(table, column, new_type) + def change_column_type_concurrently(table, column, new_type, type_cast_function: nil) temp_column = "#{column}_for_type_change" - rename_column_concurrently(table, column, temp_column, type: new_type) + rename_column_concurrently(table, column, temp_column, type: new_type, type_cast_function: type_cast_function) end # Performs cleanup of a concurrent type change. @@ -786,10 +790,6 @@ module Gitlab end end - def perform_background_migration_inline? - Rails.env.test? || Rails.env.development? - end - # Performs a concurrent column rename when using PostgreSQL. def install_rename_triggers_for_postgresql(trigger, table, old, new) execute <<-EOF.strip_heredoc @@ -973,106 +973,6 @@ into similar problems in the future (e.g. when new tables are created). end end - # Bulk queues background migration jobs for an entire table, batched by ID range. - # "Bulk" meaning many jobs will be pushed at a time for efficiency. - # If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`. - # - # model_class - The table being iterated over - # job_class_name - The background migration job class as a string - # batch_size - The maximum number of rows per job - # - # Example: - # - # class Route < ActiveRecord::Base - # include EachBatch - # self.table_name = 'routes' - # end - # - # bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes') - # - # Where the model_class includes EachBatch, and the background migration exists: - # - # class Gitlab::BackgroundMigration::ProcessRoutes - # def perform(start_id, end_id) - # # do something - # end - # end - def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE) - raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id') - - jobs = [] - table_name = model_class.quoted_table_name - - model_class.each_batch(of: batch_size) do |relation| - start_id, end_id = relation.pluck("MIN(#{table_name}.id)", "MAX(#{table_name}.id)").first - - if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE - # Note: This code path generally only helps with many millions of rows - # We push multiple jobs at a time to reduce the time spent in - # Sidekiq/Redis operations. We're using this buffer based approach so we - # don't need to run additional queries for every range. - bulk_migrate_async(jobs) - jobs.clear - end - - jobs << [job_class_name, [start_id, end_id]] - end - - bulk_migrate_async(jobs) unless jobs.empty? - end - - # Queues background migration jobs for an entire table, batched by ID range. - # Each job is scheduled with a `delay_interval` in between. - # If you use a small interval, then some jobs may run at the same time. - # - # model_class - The table or relation being iterated over - # job_class_name - The background migration job class as a string - # delay_interval - The duration between each job's scheduled time (must respond to `to_f`) - # batch_size - The maximum number of rows per job - # other_arguments - Other arguments to send to the job - # - # *Returns the final migration delay* - # - # Example: - # - # class Route < ActiveRecord::Base - # include EachBatch - # self.table_name = 'routes' - # end - # - # queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute) - # - # Where the model_class includes EachBatch, and the background migration exists: - # - # class Gitlab::BackgroundMigration::ProcessRoutes - # def perform(start_id, end_id) - # # do something - # end - # end - def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0) - raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id') - - # To not overload the worker too much we enforce a minimum interval both - # when scheduling and performing jobs. - if delay_interval < BackgroundMigrationWorker.minimum_interval - delay_interval = BackgroundMigrationWorker.minimum_interval - end - - final_delay = 0 - - model_class.each_batch(of: batch_size) do |relation, index| - start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first - - # `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for - # the same time, which is not helpful in most cases where we wish to - # spread the work over time. - final_delay = initial_delay + delay_interval * index - migrate_in(final_delay, job_class_name, [start_id, end_id] + other_job_arguments) - end - - final_delay - end - # Fetches indexes on a column by name for postgres. # # This will include indexes using an expression on the column, for example: @@ -1131,30 +1031,6 @@ into similar problems in the future (e.g. when new tables are created). execute(sql) end - def migrate_async(*args) - with_migration_context do - BackgroundMigrationWorker.perform_async(*args) - end - end - - def migrate_in(*args) - with_migration_context do - BackgroundMigrationWorker.perform_in(*args) - end - end - - def bulk_migrate_in(*args) - with_migration_context do - BackgroundMigrationWorker.bulk_perform_in(*args) - end - end - - def bulk_migrate_async(*args) - with_migration_context do - BackgroundMigrationWorker.bulk_perform_async(*args) - end - end - # Returns the name for a check constraint # # type: @@ -1396,7 +1272,7 @@ into similar problems in the future (e.g. when new tables are created). "ON DELETE #{on_delete.upcase}" end - def create_column_from(table, old, new, type: nil, batch_column_name: :id) + def create_column_from(table, old, new, type: nil, batch_column_name: :id, type_cast_function: nil) old_col = column_for(table, old) new_type = type || old_col.type @@ -1410,7 +1286,13 @@ into similar problems in the future (e.g. when new tables are created). # necessary since we copy over old values further down. change_column_default(table, new, old_col.default) unless old_col.default.nil? - update_column_in_batches(table, new, Arel::Table.new(table)[old], batch_column_name: batch_column_name) + old_value = Arel::Table.new(table)[old] + + if type_cast_function.present? + old_value = Arel::Nodes::NamedFunction.new(type_cast_function, [old_value]) + end + + update_column_in_batches(table, new, old_value, batch_column_name: batch_column_name) add_not_null_constraint(table, new) unless old_col.null @@ -1437,10 +1319,6 @@ into similar problems in the future (e.g. when new tables are created). your migration class ERROR end - - def with_migration_context(&block) - Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block) - end end end end diff --git a/lib/gitlab/database/migrations/background_migration_helpers.rb b/lib/gitlab/database/migrations/background_migration_helpers.rb new file mode 100644 index 00000000000..a6cc03aa9eb --- /dev/null +++ b/lib/gitlab/database/migrations/background_migration_helpers.rb @@ -0,0 +1,157 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module Migrations + module BackgroundMigrationHelpers + BACKGROUND_MIGRATION_BATCH_SIZE = 1_000 # Number of rows to process per job + BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1_000 # Number of jobs to bulk queue at a time + + # Bulk queues background migration jobs for an entire table, batched by ID range. + # "Bulk" meaning many jobs will be pushed at a time for efficiency. + # If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`. + # + # model_class - The table being iterated over + # job_class_name - The background migration job class as a string + # batch_size - The maximum number of rows per job + # + # Example: + # + # class Route < ActiveRecord::Base + # include EachBatch + # self.table_name = 'routes' + # end + # + # bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes') + # + # Where the model_class includes EachBatch, and the background migration exists: + # + # class Gitlab::BackgroundMigration::ProcessRoutes + # def perform(start_id, end_id) + # # do something + # end + # end + def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE) + raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id') + + jobs = [] + table_name = model_class.quoted_table_name + + model_class.each_batch(of: batch_size) do |relation| + start_id, end_id = relation.pluck("MIN(#{table_name}.id)", "MAX(#{table_name}.id)").first + + if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE + # Note: This code path generally only helps with many millions of rows + # We push multiple jobs at a time to reduce the time spent in + # Sidekiq/Redis operations. We're using this buffer based approach so we + # don't need to run additional queries for every range. + bulk_migrate_async(jobs) + jobs.clear + end + + jobs << [job_class_name, [start_id, end_id]] + end + + bulk_migrate_async(jobs) unless jobs.empty? + end + + # Queues background migration jobs for an entire table, batched by ID range. + # Each job is scheduled with a `delay_interval` in between. + # If you use a small interval, then some jobs may run at the same time. + # + # model_class - The table or relation being iterated over + # job_class_name - The background migration job class as a string + # delay_interval - The duration between each job's scheduled time (must respond to `to_f`) + # batch_size - The maximum number of rows per job + # other_arguments - Other arguments to send to the job + # track_jobs - When this flag is set, creates a record in the background_migration_jobs table for each job that + # is scheduled to be run. These records can be used to trace execution of the background job, but there is no + # builtin support to manage that automatically at this time. You should only set this flag if you are aware of + # how it works, and intend to manually cleanup the database records in your background job. + # + # *Returns the final migration delay* + # + # Example: + # + # class Route < ActiveRecord::Base + # include EachBatch + # self.table_name = 'routes' + # end + # + # queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute) + # + # Where the model_class includes EachBatch, and the background migration exists: + # + # class Gitlab::BackgroundMigration::ProcessRoutes + # def perform(start_id, end_id) + # # do something + # end + # end + def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0, track_jobs: false) + raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id') + + # To not overload the worker too much we enforce a minimum interval both + # when scheduling and performing jobs. + if delay_interval < BackgroundMigrationWorker.minimum_interval + delay_interval = BackgroundMigrationWorker.minimum_interval + end + + final_delay = 0 + + model_class.each_batch(of: batch_size) do |relation, index| + start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first + + # `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for + # the same time, which is not helpful in most cases where we wish to + # spread the work over time. + final_delay = initial_delay + delay_interval * index + full_job_arguments = [start_id, end_id] + other_job_arguments + + track_in_database(job_class_name, full_job_arguments) if track_jobs + migrate_in(final_delay, job_class_name, full_job_arguments) + end + + final_delay + end + + def perform_background_migration_inline? + Rails.env.test? || Rails.env.development? + end + + def migrate_async(*args) + with_migration_context do + BackgroundMigrationWorker.perform_async(*args) + end + end + + def migrate_in(*args) + with_migration_context do + BackgroundMigrationWorker.perform_in(*args) + end + end + + def bulk_migrate_in(*args) + with_migration_context do + BackgroundMigrationWorker.bulk_perform_in(*args) + end + end + + def bulk_migrate_async(*args) + with_migration_context do + BackgroundMigrationWorker.bulk_perform_async(*args) + end + end + + def with_migration_context(&block) + Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block) + end + + private + + def track_in_database(class_name, arguments) + Gitlab::Database::BackgroundMigrationJob.create!(class_name: class_name, arguments: arguments) + end + end + end + end +end diff --git a/lib/gitlab/database/partitioning/monthly_strategy.rb b/lib/gitlab/database/partitioning/monthly_strategy.rb new file mode 100644 index 00000000000..ecc05d9654a --- /dev/null +++ b/lib/gitlab/database/partitioning/monthly_strategy.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module Partitioning + class MonthlyStrategy + attr_reader :model, :partitioning_key + + # We create this many partitions in the future + HEADROOM = 6.months + + delegate :table_name, to: :model + + def initialize(model, partitioning_key) + @model = model + @partitioning_key = partitioning_key + end + + def current_partitions + result = connection.select_all(<<~SQL) + select + pg_class.relname, + parent_class.relname as base_table, + pg_get_expr(pg_class.relpartbound, inhrelid) as condition + from pg_class + inner join pg_inherits i on pg_class.oid = inhrelid + inner join pg_class parent_class on parent_class.oid = inhparent + inner join pg_namespace ON pg_namespace.oid = pg_class.relnamespace + where pg_namespace.nspname = #{connection.quote(Gitlab::Database::DYNAMIC_PARTITIONS_SCHEMA)} + and parent_class.relname = #{connection.quote(table_name)} + and pg_class.relispartition + order by pg_class.relname + SQL + + result.map do |record| + TimePartition.from_sql(table_name, record['relname'], record['condition']) + end + end + + # Check the currently existing partitions and determine which ones are missing + def missing_partitions + desired_partitions - current_partitions + end + + private + + def desired_partitions + [].tap do |parts| + min_date, max_date = relevant_range + + parts << partition_for(upper_bound: min_date) + + while min_date < max_date + next_date = min_date.next_month + + parts << partition_for(lower_bound: min_date, upper_bound: next_date) + + min_date = next_date + end + end + end + + # This determines the relevant time range for which we expect to have data + # (and therefore need to create partitions for). + # + # Note: We typically expect the first partition to be half-unbounded, i.e. + # to start from MINVALUE to a specific date `x`. The range returned + # does not include the range of the first, half-unbounded partition. + def relevant_range + if first_partition = current_partitions.min + # Case 1: First partition starts with MINVALUE, i.e. from is nil -> start with first real partition + # Case 2: Rather unexpectedly, first partition does not start with MINVALUE, i.e. from is not nil + # In this case, use first partition beginning as a start + min_date = first_partition.from || first_partition.to + end + + # In case we don't have a partition yet + min_date ||= Date.today + min_date = min_date.beginning_of_month + + max_date = Date.today.end_of_month + HEADROOM + + [min_date, max_date] + end + + def partition_for(lower_bound: nil, upper_bound:) + TimePartition.new(table_name, lower_bound, upper_bound) + end + + def connection + ActiveRecord::Base.connection + end + end + end + end +end diff --git a/lib/gitlab/database/partitioning/partition_creator.rb b/lib/gitlab/database/partitioning/partition_creator.rb new file mode 100644 index 00000000000..348dd1ba660 --- /dev/null +++ b/lib/gitlab/database/partitioning/partition_creator.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module Partitioning + class PartitionCreator + def self.register(model) + raise ArgumentError, "Only models with a #partitioning_strategy can be registered." unless model.respond_to?(:partitioning_strategy) + + models << model + end + + def self.models + @models ||= Set.new + end + + LEASE_TIMEOUT = 1.minute + LEASE_KEY = 'database_partition_creation_%s' + + attr_reader :models + + def initialize(models = self.class.models) + @models = models + end + + def create_partitions + return unless Feature.enabled?(:postgres_dynamic_partition_creation, default_enabled: true) + + models.each do |model| + # Double-checking before getting the lease: + # The prevailing situation is no missing partitions + next if missing_partitions(model).empty? + + only_with_exclusive_lease(model) do + partitions_to_create = missing_partitions(model) + + next if partitions_to_create.empty? + + create(model, partitions_to_create) + end + rescue => e + Gitlab::AppLogger.error("Failed to create partition(s) for #{model.table_name}: #{e.class}: #{e.message}") + end + end + + private + + def missing_partitions(model) + return [] unless connection.table_exists?(model.table_name) + + model.partitioning_strategy.missing_partitions + end + + def only_with_exclusive_lease(model) + lease = Gitlab::ExclusiveLease.new(LEASE_KEY % model.table_name, timeout: LEASE_TIMEOUT) + + yield if lease.try_obtain + ensure + lease&.cancel + end + + def create(model, partitions) + connection.transaction do + with_lock_retries do + partitions.each do |partition| + connection.execute partition.to_sql + + Gitlab::AppLogger.info("Created partition #{partition.partition_name} for table #{partition.table}") + end + end + end + end + + def with_lock_retries(&block) + Gitlab::Database::WithLockRetries.new({ + klass: self.class, + logger: Gitlab::AppLogger + }).run(&block) + end + + def connection + ActiveRecord::Base.connection + end + end + end + end +end diff --git a/lib/gitlab/database/partitioning/time_partition.rb b/lib/gitlab/database/partitioning/time_partition.rb new file mode 100644 index 00000000000..7dca60c0854 --- /dev/null +++ b/lib/gitlab/database/partitioning/time_partition.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module Partitioning + class TimePartition + include Comparable + + def self.from_sql(table, partition_name, definition) + matches = definition.match(/FOR VALUES FROM \('?(?<from>.+)'?\) TO \('?(?<to>.+)'?\)/) + + raise ArgumentError, "Unknown partition definition: #{definition}" unless matches + + raise NotImplementedError, "Open-end time partitions with MAXVALUE are not supported yet" if matches[:to] == 'MAXVALUE' + + from = matches[:from] == 'MINVALUE' ? nil : matches[:from] + to = matches[:to] + + new(table, from, to, partition_name: partition_name) + end + + attr_reader :table, :from, :to + + def initialize(table, from, to, partition_name: nil) + @table = table.to_s + @from = date_or_nil(from) + @to = date_or_nil(to) + @partition_name = partition_name + end + + def partition_name + return @partition_name if @partition_name + + suffix = from&.strftime('%Y%m') || '000000' + + "#{table}_#{suffix}" + end + + def to_sql + from_sql = from ? conn.quote(from.strftime('%Y-%m-%d')) : 'MINVALUE' + to_sql = conn.quote(to.strftime('%Y-%m-%d')) + + <<~SQL + CREATE TABLE IF NOT EXISTS #{fully_qualified_partition} + PARTITION OF #{conn.quote_table_name(table)} + FOR VALUES FROM (#{from_sql}) TO (#{to_sql}) + SQL + end + + def ==(other) + table == other.table && partition_name == other.partition_name && from == other.from && to == other.to + end + alias_method :eql?, :== + + def hash + [table, partition_name, from, to].hash + end + + def <=>(other) + return if table != other.table + + partition_name <=> other.partition_name + end + + private + + def date_or_nil(obj) + return unless obj + return obj if obj.is_a?(Date) + + Date.parse(obj) + end + + def fully_qualified_partition + "%s.%s" % [conn.quote_table_name(Gitlab::Database::DYNAMIC_PARTITIONS_SCHEMA), conn.quote_table_name(partition_name)] + end + + def conn + @conn ||= ActiveRecord::Base.connection + end + end + end + end +end diff --git a/lib/gitlab/database/partitioning_migration_helpers/backfill_partitioned_table.rb b/lib/gitlab/database/partitioning_migration_helpers/backfill_partitioned_table.rb new file mode 100644 index 00000000000..f9ad1e60776 --- /dev/null +++ b/lib/gitlab/database/partitioning_migration_helpers/backfill_partitioned_table.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module PartitioningMigrationHelpers + # Class that will generically copy data from a given table into its corresponding partitioned table + class BackfillPartitionedTable + include ::Gitlab::Database::DynamicModelHelpers + + SUB_BATCH_SIZE = 2_500 + PAUSE_SECONDS = 0.25 + + def perform(start_id, stop_id, source_table, partitioned_table, source_column) + return unless Feature.enabled?(:backfill_partitioned_audit_events, default_enabled: true) + + if transaction_open? + raise "Aborting job to backfill partitioned #{source_table} table! Do not run this job in a transaction block!" + end + + unless table_exists?(partitioned_table) + logger.warn "exiting backfill migration because partitioned table #{partitioned_table} does not exist. " \ + "This could be due to the migration being rolled back after migration jobs were enqueued in sidekiq" + return + end + + bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column) + parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id) + + parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch| + sub_start_id, sub_stop_id = sub_batch.pluck(Arel.sql("MIN(#{source_column}), MAX(#{source_column})")).first + + bulk_copy.copy_between(sub_start_id, sub_stop_id) + sleep(PAUSE_SECONDS) + end + + mark_jobs_as_succeeded(start_id, stop_id, source_table, partitioned_table, source_column) + end + + private + + def connection + ActiveRecord::Base.connection + end + + def transaction_open? + connection.transaction_open? + end + + def table_exists?(table) + connection.table_exists?(table) + end + + def logger + @logger ||= ::Gitlab::BackgroundMigration::Logger.build + end + + def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id) + define_batchable_model(source_table).where(source_key_column => start_id..stop_id) + end + + def mark_jobs_as_succeeded(*arguments) + BackgroundMigrationJob.mark_all_as_succeeded(self.class.name, arguments) + end + + # Helper class to copy data between two tables via upserts + class BulkCopy + DELIMITER = ', ' + + attr_reader :source_table, :destination_table, :source_column + + def initialize(source_table, destination_table, source_column) + @source_table = source_table + @destination_table = destination_table + @source_column = source_column + end + + def copy_between(start_id, stop_id) + connection.execute(<<~SQL) + INSERT INTO #{destination_table} (#{column_listing}) + SELECT #{column_listing} + FROM #{source_table} + WHERE #{source_column} BETWEEN #{start_id} AND #{stop_id} + FOR UPDATE + ON CONFLICT (#{conflict_targets}) DO NOTHING + SQL + end + + private + + def connection + @connection ||= ActiveRecord::Base.connection + end + + def column_listing + @column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER) + end + + def conflict_targets + connection.primary_key(destination_table).join(DELIMITER) + end + end + end + end + end +end diff --git a/lib/gitlab/database/partitioning_migration_helpers/foreign_key_helpers.rb b/lib/gitlab/database/partitioning_migration_helpers/foreign_key_helpers.rb index 9e687009cd7..1fb9476b7d9 100644 --- a/lib/gitlab/database/partitioning_migration_helpers/foreign_key_helpers.rb +++ b/lib/gitlab/database/partitioning_migration_helpers/foreign_key_helpers.rb @@ -99,7 +99,7 @@ module Gitlab drop_function(fn_name, if_exists: true) else create_or_replace_fk_function(fn_name, final_keys) - create_trigger(trigger_name, fn_name, fires: "AFTER DELETE ON #{to_table}") + create_trigger(to_table, trigger_name, fn_name, fires: 'AFTER DELETE') end end end diff --git a/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb b/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb index f77fbe98df1..b676767f41d 100644 --- a/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb +++ b/lib/gitlab/database/partitioning_migration_helpers/table_management_helpers.rb @@ -5,10 +5,16 @@ module Gitlab module PartitioningMigrationHelpers module TableManagementHelpers include ::Gitlab::Database::SchemaHelpers + include ::Gitlab::Database::DynamicModelHelpers + include ::Gitlab::Database::Migrations::BackgroundMigrationHelpers - WHITELISTED_TABLES = %w[audit_events].freeze + ALLOWED_TABLES = %w[audit_events].freeze ERROR_SCOPE = 'table partitioning' + MIGRATION_CLASS_NAME = "::#{module_parent_name}::BackfillPartitionedTable" + BATCH_INTERVAL = 2.minutes.freeze + BATCH_SIZE = 50_000 + # Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column. # One partition is created per month between the given `min_date` and `max_date`. # @@ -18,14 +24,25 @@ module Gitlab # # partition_table_by_date :audit_events, :created_at, min_date: Date.new(2020, 1), max_date: Date.new(2020, 6) # - # Required options are: + # Options are: # :min_date - a date specifying the lower bounds of the partition range - # :max_date - a date specifying the upper bounds of the partitioning range + # :max_date - a date specifying the upper bounds of the partitioning range, defaults to today + 1 month # - def partition_table_by_date(table_name, column_name, min_date:, max_date:) - assert_table_is_whitelisted(table_name) + # Unless min_date is specified explicitly, we default to + # 1. The minimum value for the partitioning column in the table + # 2. If no data is present yet, the current month + def partition_table_by_date(table_name, column_name, min_date: nil, max_date: nil) + assert_table_is_allowed(table_name) + assert_not_in_transaction_block(scope: ERROR_SCOPE) + max_date ||= Date.today + 1.month + + min_date ||= connection.select_one(<<~SQL)['minimum'] || max_date - 1.month + SELECT date_trunc('MONTH', MIN(#{column_name})) AS minimum + FROM #{table_name} + SQL + raise "max_date #{max_date} must be greater than min_date #{min_date}" if min_date >= max_date primary_key = connection.primary_key(table_name) @@ -34,10 +51,12 @@ module Gitlab partition_column = find_column_definition(table_name, column_name) raise "partition column #{column_name} does not exist on #{table_name}" if partition_column.nil? - new_table_name = partitioned_table_name(table_name) - create_range_partitioned_copy(new_table_name, table_name, partition_column, primary_key) - create_daterange_partitions(new_table_name, partition_column.name, min_date, max_date) - create_sync_trigger(table_name, new_table_name, primary_key) + partitioned_table_name = make_partitioned_table_name(table_name) + + create_range_partitioned_copy(table_name, partitioned_table_name, partition_column, primary_key) + create_daterange_partitions(partitioned_table_name, partition_column.name, min_date, max_date) + create_trigger_to_sync_tables(table_name, partitioned_table_name, primary_key) + enqueue_background_migration(table_name, partitioned_table_name, primary_key) end # Clean up a partitioned copy of an existing table. This deletes the partitioned table and all partitions. @@ -47,39 +66,58 @@ module Gitlab # drop_partitioned_table_for :audit_events # def drop_partitioned_table_for(table_name) - assert_table_is_whitelisted(table_name) + assert_table_is_allowed(table_name) assert_not_in_transaction_block(scope: ERROR_SCOPE) + cleanup_migration_jobs(table_name) + with_lock_retries do - trigger_name = sync_trigger_name(table_name) + trigger_name = make_sync_trigger_name(table_name) drop_trigger(table_name, trigger_name) end - function_name = sync_function_name(table_name) + function_name = make_sync_function_name(table_name) drop_function(function_name) - part_table_name = partitioned_table_name(table_name) - drop_table(part_table_name) + partitioned_table_name = make_partitioned_table_name(table_name) + drop_table(partitioned_table_name) + end + + def create_hash_partitions(table_name, number_of_partitions) + transaction do + (0..number_of_partitions - 1).each do |partition| + decimals = Math.log10(number_of_partitions).ceil + suffix = "%0#{decimals}d" % partition + partition_name = "#{table_name}_#{suffix}" + schema = Gitlab::Database::STATIC_PARTITIONS_SCHEMA + + execute(<<~SQL) + CREATE TABLE #{schema}.#{partition_name} + PARTITION OF #{table_name} + FOR VALUES WITH (MODULUS #{number_of_partitions}, REMAINDER #{partition}); + SQL + end + end end private - def assert_table_is_whitelisted(table_name) - return if WHITELISTED_TABLES.include?(table_name.to_s) + def assert_table_is_allowed(table_name) + return if ALLOWED_TABLES.include?(table_name.to_s) - raise "partitioning helpers are in active development, and #{table_name} is not whitelisted for use, " \ + raise "partitioning helpers are in active development, and #{table_name} is not allowed for use, " \ "for more information please contact the database team" end - def partitioned_table_name(table) + def make_partitioned_table_name(table) tmp_table_name("#{table}_part") end - def sync_function_name(table) + def make_sync_function_name(table) object_name(table, 'table_sync_function') end - def sync_trigger_name(table) + def make_sync_trigger_name(table) object_name(table, 'table_sync_trigger') end @@ -87,11 +125,11 @@ module Gitlab connection.columns(table).find { |c| c.name == column.to_s } end - def create_range_partitioned_copy(table_name, template_table_name, partition_column, primary_key) - if table_exists?(table_name) + def create_range_partitioned_copy(source_table_name, partitioned_table_name, partition_column, primary_key) + if table_exists?(partitioned_table_name) # rubocop:disable Gitlab/RailsLogger Rails.logger.warn "Partitioned table not created because it already exists" \ - " (this may be due to an aborted migration or similar): table_name: #{table_name} " + " (this may be due to an aborted migration or similar): table_name: #{partitioned_table_name} " # rubocop:enable Gitlab/RailsLogger return end @@ -99,20 +137,20 @@ module Gitlab tmp_column_name = object_name(partition_column.name, 'partition_key') transaction do execute(<<~SQL) - CREATE TABLE #{table_name} ( - LIKE #{template_table_name} INCLUDING ALL EXCLUDING INDEXES, + CREATE TABLE #{partitioned_table_name} ( + LIKE #{source_table_name} INCLUDING ALL EXCLUDING INDEXES, #{tmp_column_name} #{partition_column.sql_type} NOT NULL, PRIMARY KEY (#{[primary_key, tmp_column_name].join(", ")}) ) PARTITION BY RANGE (#{tmp_column_name}) SQL - remove_column(table_name, partition_column.name) - rename_column(table_name, tmp_column_name, partition_column.name) - change_column_default(table_name, primary_key, nil) + remove_column(partitioned_table_name, partition_column.name) + rename_column(partitioned_table_name, tmp_column_name, partition_column.name) + change_column_default(partitioned_table_name, primary_key, nil) - if column_of_type?(table_name, primary_key, :integer) + if column_of_type?(partitioned_table_name, primary_key, :integer) # Default to int8 primary keys to prevent overflow - change_column(table_name, primary_key, :bigint) + change_column(partitioned_table_name, primary_key, :bigint) end end end @@ -125,7 +163,8 @@ module Gitlab min_date = min_date.beginning_of_month.to_date max_date = max_date.next_month.beginning_of_month.to_date - create_range_partition_safely("#{table_name}_000000", table_name, 'MINVALUE', to_sql_date_literal(min_date)) + upper_bound = to_sql_date_literal(min_date) + create_range_partition_safely("#{table_name}_000000", table_name, 'MINVALUE', upper_bound) while min_date < max_date partition_name = "#{table_name}_#{min_date.strftime('%Y%m')}" @@ -143,7 +182,7 @@ module Gitlab end def create_range_partition_safely(partition_name, table_name, lower_bound, upper_bound) - if table_exists?(partition_name) + if table_exists?(table_for_range_partition(partition_name)) # rubocop:disable Gitlab/RailsLogger Rails.logger.warn "Partition not created because it already exists" \ " (this may be due to an aborted migration or similar): partition_name: #{partition_name}" @@ -154,34 +193,42 @@ module Gitlab create_range_partition(partition_name, table_name, lower_bound, upper_bound) end - def create_sync_trigger(source_table, target_table, unique_key) - function_name = sync_function_name(source_table) - trigger_name = sync_trigger_name(source_table) + def create_trigger_to_sync_tables(source_table_name, partitioned_table_name, unique_key) + function_name = make_sync_function_name(source_table_name) + trigger_name = make_sync_trigger_name(source_table_name) with_lock_retries do - create_sync_function(function_name, target_table, unique_key) - create_comment('FUNCTION', function_name, "Partitioning migration: table sync for #{source_table} table") + create_sync_function(function_name, partitioned_table_name, unique_key) + create_comment('FUNCTION', function_name, "Partitioning migration: table sync for #{source_table_name} table") - create_trigger(trigger_name, function_name, fires: "AFTER INSERT OR UPDATE OR DELETE ON #{source_table}") + create_sync_trigger(source_table_name, trigger_name, function_name) end end - def create_sync_function(name, target_table, unique_key) + def create_sync_function(name, partitioned_table_name, unique_key) + if function_exists?(name) + # rubocop:disable Gitlab/RailsLogger + Rails.logger.warn "Partitioning sync function not created because it already exists" \ + " (this may be due to an aborted migration or similar): function name: #{name}" + # rubocop:enable Gitlab/RailsLogger + return + end + delimiter = ",\n " - column_names = connection.columns(target_table).map(&:name) + column_names = connection.columns(partitioned_table_name).map(&:name) set_statements = build_set_statements(column_names, unique_key) insert_values = column_names.map { |name| "NEW.#{name}" } create_trigger_function(name, replace: false) do <<~SQL IF (TG_OP = 'DELETE') THEN - DELETE FROM #{target_table} where #{unique_key} = OLD.#{unique_key}; + DELETE FROM #{partitioned_table_name} where #{unique_key} = OLD.#{unique_key}; ELSIF (TG_OP = 'UPDATE') THEN - UPDATE #{target_table} + UPDATE #{partitioned_table_name} SET #{set_statements.join(delimiter)} - WHERE #{target_table}.#{unique_key} = NEW.#{unique_key}; + WHERE #{partitioned_table_name}.#{unique_key} = NEW.#{unique_key}; ELSIF (TG_OP = 'INSERT') THEN - INSERT INTO #{target_table} (#{column_names.join(delimiter)}) + INSERT INTO #{partitioned_table_name} (#{column_names.join(delimiter)}) VALUES (#{insert_values.join(delimiter)}); END IF; RETURN NULL; @@ -190,7 +237,35 @@ module Gitlab end def build_set_statements(column_names, unique_key) - column_names.reject { |name| name == unique_key }.map { |column_name| "#{column_name} = NEW.#{column_name}" } + column_names.reject { |name| name == unique_key }.map { |name| "#{name} = NEW.#{name}" } + end + + def create_sync_trigger(table_name, trigger_name, function_name) + if trigger_exists?(table_name, trigger_name) + # rubocop:disable Gitlab/RailsLogger + Rails.logger.warn "Partitioning sync trigger not created because it already exists" \ + " (this may be due to an aborted migration or similar): trigger name: #{trigger_name}" + # rubocop:enable Gitlab/RailsLogger + return + end + + create_trigger(table_name, trigger_name, function_name, fires: 'AFTER INSERT OR UPDATE OR DELETE') + end + + def enqueue_background_migration(source_table_name, partitioned_table_name, source_key) + source_model = define_batchable_model(source_table_name) + + queue_background_migration_jobs_by_range_at_intervals( + source_model, + MIGRATION_CLASS_NAME, + BATCH_INTERVAL, + batch_size: BATCH_SIZE, + other_job_arguments: [source_table_name.to_s, partitioned_table_name, source_key], + track_jobs: true) + end + + def cleanup_migration_jobs(table_name) + ::Gitlab::Database::BackgroundMigrationJob.for_partitioning_migration(MIGRATION_CLASS_NAME, table_name).delete_all end end end diff --git a/lib/gitlab/database/schema_helpers.rb b/lib/gitlab/database/schema_helpers.rb index 8e544307d81..34daafd06de 100644 --- a/lib/gitlab/database/schema_helpers.rb +++ b/lib/gitlab/database/schema_helpers.rb @@ -16,15 +16,30 @@ module Gitlab SQL end - def create_trigger(name, function_name, fires: nil) + def function_exists?(name) + connection.select_value("SELECT 1 FROM pg_proc WHERE proname = '#{name}'") + end + + def create_trigger(table_name, name, function_name, fires:) execute(<<~SQL) CREATE TRIGGER #{name} - #{fires} + #{fires} ON #{table_name} FOR EACH ROW EXECUTE PROCEDURE #{function_name}() SQL end + def trigger_exists?(table_name, name) + connection.select_value(<<~SQL) + SELECT 1 + FROM pg_trigger + INNER JOIN pg_class + ON pg_trigger.tgrelid = pg_class.oid + WHERE pg_class.relname = '#{table_name}' + AND pg_trigger.tgname = '#{name}' + SQL + end + def drop_function(name, if_exists: true) exists_clause = optional_clause(if_exists, "IF EXISTS") execute("DROP FUNCTION #{exists_clause} #{name}()") @@ -69,9 +84,13 @@ module Gitlab private + def table_for_range_partition(partition_name) + "#{Gitlab::Database::DYNAMIC_PARTITIONS_SCHEMA}.#{partition_name}" + end + def create_range_partition(partition_name, table_name, lower_bound, upper_bound) execute(<<~SQL) - CREATE TABLE #{partition_name} PARTITION OF #{table_name} + CREATE TABLE #{table_for_range_partition(partition_name)} PARTITION OF #{table_name} FOR VALUES FROM (#{lower_bound}) TO (#{upper_bound}) SQL end |