diff options
Diffstat (limited to 'lib/gitlab/background_migration/job_coordinator.rb')
-rw-r--r-- | lib/gitlab/background_migration/job_coordinator.rb | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/lib/gitlab/background_migration/job_coordinator.rb b/lib/gitlab/background_migration/job_coordinator.rb new file mode 100644 index 00000000000..1c8819eaa62 --- /dev/null +++ b/lib/gitlab/background_migration/job_coordinator.rb @@ -0,0 +1,134 @@ +# frozen_string_literal: true + +module Gitlab + module BackgroundMigration + # Class responsible for executing background migrations based on the given database. + # + # Chooses the correct worker class when selecting jobs from the queue based on the + # convention of how the queues and worker classes are setup for each database. + # + # Also provides a database connection to the correct tracking database. + class JobCoordinator + VALID_DATABASES = %i[main].freeze + WORKER_CLASS_NAME = 'BackgroundMigrationWorker' + + def self.for_database(database) + database = database.to_sym + + unless VALID_DATABASES.include?(database) + raise ArgumentError, "database must be one of [#{VALID_DATABASES.join(', ')}], got '#{database}'" + end + + namespace = database.to_s.capitalize unless database == :main + namespaced_worker_class = [namespace, WORKER_CLASS_NAME].compact.join('::') + + new(database, "::#{namespaced_worker_class}".constantize) + end + + attr_reader :database, :worker_class + + def queue + @queue ||= worker_class.sidekiq_options['queue'] + end + + def with_shared_connection(&block) + Gitlab::Database::SharedModel.using_connection(connection, &block) + end + + def steal(steal_class, retry_dead_jobs: false) + with_shared_connection do + queues = [ + Sidekiq::ScheduledSet.new, + Sidekiq::Queue.new(self.queue) + ] + + if retry_dead_jobs + queues << Sidekiq::RetrySet.new + queues << Sidekiq::DeadSet.new + end + + queues.each do |queue| + queue.each do |job| + migration_class, migration_args = job.args + + next unless job.klass == worker_class.name + next unless migration_class == steal_class + next if block_given? && !(yield job) + + begin + perform(migration_class, migration_args) if job.delete + rescue Exception # rubocop:disable Lint/RescueException + worker_class # enqueue this migration again + .perform_async(migration_class, migration_args) + + raise + end + end + end + end + end + + def perform(class_name, arguments) + with_shared_connection do + migration_class_for(class_name).new.perform(*arguments) + end + end + + def remaining + enqueued = Sidekiq::Queue.new(self.queue) + scheduled = Sidekiq::ScheduledSet.new + + [enqueued, scheduled].sum do |set| + set.count do |job| + job.klass == worker_class.name + end + end + end + + def exists?(migration_class, additional_queues = []) + enqueued = Sidekiq::Queue.new(self.queue) + scheduled = Sidekiq::ScheduledSet.new + + enqueued_job?([enqueued, scheduled], migration_class) + end + + def dead_jobs?(migration_class) + dead_set = Sidekiq::DeadSet.new + + enqueued_job?([dead_set], migration_class) + end + + def retrying_jobs?(migration_class) + retry_set = Sidekiq::RetrySet.new + + enqueued_job?([retry_set], migration_class) + end + + def migration_class_for(class_name) + Gitlab::BackgroundMigration.const_get(class_name, false) + end + + def enqueued_job?(queues, migration_class) + queues.any? do |queue| + queue.any? do |job| + job.klass == worker_class.name && job.args.first == migration_class + end + end + end + + private + + def initialize(database, worker_class) + @database = database + @worker_class = worker_class + end + + def connection + @connection ||= Gitlab::Database + .database_base_models + .fetch(database, Gitlab::Database::PRIMARY_DATABASE_NAME) + .connection + end + end + end +end |