Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/database/migrations')
-rw-r--r--lib/gitlab/database/migrations/batched_migration_last_id.rb50
-rw-r--r--lib/gitlab/database/migrations/runner.rb28
-rw-r--r--lib/gitlab/database/migrations/sidekiq_helpers.rb112
-rw-r--r--lib/gitlab/database/migrations/test_batched_background_runner.rb9
4 files changed, 191 insertions, 8 deletions
diff --git a/lib/gitlab/database/migrations/batched_migration_last_id.rb b/lib/gitlab/database/migrations/batched_migration_last_id.rb
new file mode 100644
index 00000000000..c77a2e9a375
--- /dev/null
+++ b/lib/gitlab/database/migrations/batched_migration_last_id.rb
@@ -0,0 +1,50 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module Migrations
+ class BatchedMigrationLastId
+ FILE_NAME = 'last-batched-background-migration-id.txt'
+
+ def initialize(connection, base_dir)
+ @connection = connection
+ @base_dir = base_dir
+ end
+
+ def store
+ File.open(file_path, 'wb') { |file| file.write(last_background_migration_id) }
+ end
+
+ # Reads the last id from the file
+ #
+ # @info casts the file content into an +Integer+.
+ # Casts any unexpected content to +nil+
+ #
+ # @example
+ # Integer('4', exception: false) # => 4
+ # Integer('', exception: false) # => nil
+ #
+ # @return [Integer, nil]
+ def read
+ return unless File.exist?(file_path)
+
+ Integer(File.read(file_path).presence, exception: false)
+ end
+
+ private
+
+ attr_reader :connection, :base_dir
+
+ def file_path
+ @file_path ||= base_dir.join(FILE_NAME)
+ end
+
+ def last_background_migration_id
+ Gitlab::Database::SharedModel.using_connection(connection) do
+ Gitlab::Database::BackgroundMigration::BatchedMigration.maximum(:id)
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/migrations/runner.rb b/lib/gitlab/database/migrations/runner.rb
index 27b161419b2..ed55081c9ab 100644
--- a/lib/gitlab/database/migrations/runner.rb
+++ b/lib/gitlab/database/migrations/runner.rb
@@ -29,16 +29,14 @@ module Gitlab
def batched_background_migrations(for_database:, legacy_mode: false)
runner = nil
- result_dir = if legacy_mode
- BASE_RESULT_DIR.join('background_migrations')
- else
- BASE_RESULT_DIR.join(for_database.to_s, 'background_migrations')
- end
+ result_dir = background_migrations_dir(for_database, legacy_mode)
# Only one loop iteration since we pass `only:` here
Gitlab::Database::EachDatabase.each_database_connection(only: for_database) do |connection|
+ from_id = batched_migrations_last_id(for_database).read
+
runner = Gitlab::Database::Migrations::TestBatchedBackgroundRunner
- .new(result_dir: result_dir, connection: connection)
+ .new(result_dir: result_dir, connection: connection, from_id: from_id)
end
runner
@@ -66,6 +64,18 @@ module Gitlab
end
# rubocop:enable Database/MultipleDatabases
+ def batched_migrations_last_id(for_database)
+ runner = nil
+ base_dir = background_migrations_dir(for_database, false)
+
+ Gitlab::Database::EachDatabase.each_database_connection(only: for_database) do |connection|
+ runner = Gitlab::Database::Migrations::BatchedMigrationLastId
+ .new(connection, base_dir)
+ end
+
+ runner
+ end
+
private
def migrations_for_up(database)
@@ -90,6 +100,12 @@ module Gitlab
existing_versions.include?(migration.version) && versions_this_branch.include?(migration.version)
end
end
+
+ def background_migrations_dir(db, legacy_mode)
+ return BASE_RESULT_DIR.join('background_migrations') if legacy_mode
+
+ BASE_RESULT_DIR.join(db.to_s, 'background_migrations')
+ end
end
attr_reader :direction, :result_dir, :migrations
diff --git a/lib/gitlab/database/migrations/sidekiq_helpers.rb b/lib/gitlab/database/migrations/sidekiq_helpers.rb
new file mode 100644
index 00000000000..c536b33bbdf
--- /dev/null
+++ b/lib/gitlab/database/migrations/sidekiq_helpers.rb
@@ -0,0 +1,112 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module Migrations
+ # rubocop:disable Cop/SidekiqApiUsage
+ # rubocop:disable Cop/SidekiqRedisCall
+ module SidekiqHelpers
+ # Constants for default sidekiq_remove_jobs values
+ DEFAULT_MAX_ATTEMPTS = 5
+ DEFAULT_TIMES_IN_A_ROW = 2
+
+ # Probabilistically removes job_klasses from their specific queues, the
+ # retry set and the scheduled set.
+ #
+ # If jobs are still being processed at the same time, then there is a
+ # small chance it will not remove all instances of job_klass. To
+ # minimize this risk, it repeatedly removes matching jobs from each
+ # until nothing is removed twice in a row.
+ #
+ # Before calling this method, you should make sure that job_klass is no
+ # longer being scheduled within the running application.
+ def sidekiq_remove_jobs(
+ job_klasses:,
+ times_in_a_row: DEFAULT_TIMES_IN_A_ROW,
+ max_attempts: DEFAULT_MAX_ATTEMPTS
+ )
+
+ kwargs = { times_in_a_row: times_in_a_row, max_attempts: max_attempts }
+
+ job_klasses_queues = job_klasses
+ .select { |job_klass| job_klass.to_s.safe_constantize.present? }
+ .map { |job_klass| job_klass.safe_constantize.queue }
+ .uniq
+
+ job_klasses_queues.each do |queue|
+ delete_jobs_for(
+ set: Sidekiq::Queue.new(queue),
+ job_klasses: job_klasses,
+ kwargs: kwargs
+ )
+ end
+
+ delete_jobs_for(
+ set: Sidekiq::RetrySet.new,
+ kwargs: kwargs,
+ job_klasses: job_klasses
+ )
+
+ delete_jobs_for(
+ set: Sidekiq::ScheduledSet.new,
+ kwargs: kwargs,
+ job_klasses: job_klasses
+ )
+ end
+
+ def sidekiq_queue_migrate(queue_from, to:)
+ while sidekiq_queue_length(queue_from) > 0
+ Sidekiq.redis do |conn|
+ conn.rpoplpush "queue:#{queue_from}", "queue:#{to}"
+ end
+ end
+ end
+
+ def sidekiq_queue_length(queue_name)
+ Sidekiq.redis do |conn|
+ conn.llen("queue:#{queue_name}")
+ end
+ end
+
+ private
+
+ # Handle the "jobs deleted" tracking that is needed in order to track
+ # whether a job was deleted or not.
+ def delete_jobs_for(set:, kwargs:, job_klasses:)
+ until_equal_to(0, **kwargs) do
+ set.count do |job|
+ job_klasses.include?(job.klass) && job.delete
+ end
+ end
+ end
+
+ # Control how many times in a row you want to see a job deleted 0
+ # times. The idea is that if you see 0 jobs deleted x number of times
+ # in a row you've *likely* covered the case in which the queue was
+ # mutating while this was running.
+ def until_equal_to(target, times_in_a_row:, max_attempts:)
+ streak = 0
+
+ result = { attempts: 0, success: false }
+
+ 1.upto(max_attempts) do |current_attempt|
+ # yield's return value is a count of "jobs_deleted"
+ if yield == target
+ streak += 1
+ elsif streak > 0
+ streak = 0
+ end
+
+ result[:attempts] = current_attempt
+ result[:success] = streak == times_in_a_row
+
+ break if result[:success]
+ end
+ result
+ end
+ end
+ # rubocop:enable Cop/SidekiqApiUsage
+ # rubocop:enable Cop/SidekiqRedisCall
+ end
+ end
+end
diff --git a/lib/gitlab/database/migrations/test_batched_background_runner.rb b/lib/gitlab/database/migrations/test_batched_background_runner.rb
index 46855ca1921..a16103f452c 100644
--- a/lib/gitlab/database/migrations/test_batched_background_runner.rb
+++ b/lib/gitlab/database/migrations/test_batched_background_runner.rb
@@ -6,16 +6,17 @@ module Gitlab
class TestBatchedBackgroundRunner < BaseBackgroundRunner
include Gitlab::Database::DynamicModelHelpers
- def initialize(result_dir:, connection:)
+ def initialize(result_dir:, connection:, from_id:)
super(result_dir: result_dir, connection: connection)
@connection = connection
+ @from_id = from_id
end
def jobs_by_migration_name
Gitlab::Database::SharedModel.using_connection(connection) do
Gitlab::Database::BackgroundMigration::BatchedMigration
.executable
- .created_after(3.hours.ago) # Simple way to exclude migrations already running before migration testing
+ .where('id > ?', from_id)
.to_h do |migration|
batching_strategy = migration.batch_class.new(connection: connection)
@@ -102,6 +103,10 @@ module Gitlab
end
end
end
+
+ private
+
+ attr_reader :from_id
end
end
end