Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/container_registry')
-rw-r--r--app/workers/container_registry/migration/enqueuer_worker.rb116
-rw-r--r--app/workers/container_registry/migration/guard_worker.rb101
-rw-r--r--app/workers/container_registry/migration/observer_worker.rb40
3 files changed, 257 insertions, 0 deletions
diff --git a/app/workers/container_registry/migration/enqueuer_worker.rb b/app/workers/container_registry/migration/enqueuer_worker.rb
new file mode 100644
index 00000000000..5feaba870e6
--- /dev/null
+++ b/app/workers/container_registry/migration/enqueuer_worker.rb
@@ -0,0 +1,116 @@
+# frozen_string_literal: true
+
+module ContainerRegistry
+ module Migration
+ class EnqueuerWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+ include Gitlab::Utils::StrongMemoize
+
+ data_consistency :always
+ feature_category :container_registry
+ urgency :low
+ deduplicate :until_executing, including_scheduled: true
+ idempotent!
+
+ def perform
+ return unless migration.enabled?
+ return unless below_capacity?
+ return unless waiting_time_passed?
+
+ re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration
+ rescue StandardError => e
+ Gitlab::ErrorTracking.log_exception(
+ e,
+ next_repository_id: next_repository&.id,
+ next_aborted_repository_id: next_aborted_repository&.id
+ )
+
+ next_repository&.abort_import
+ end
+
+ private
+
+ def handle_aborted_migration
+ return unless next_aborted_repository&.retry_aborted_migration
+
+ log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id)
+ log_extra_metadata_on_done(:import_type, 'retry')
+
+ true
+ end
+
+ def handle_next_migration
+ return unless next_repository
+ # We return true because the repository was successfully processed (migration_state is changed)
+ return true if tag_count_too_high?
+ return unless next_repository.start_pre_import
+
+ log_extra_metadata_on_done(:container_repository_id, next_repository.id)
+ log_extra_metadata_on_done(:import_type, 'next')
+
+ true
+ end
+
+ def tag_count_too_high?
+ return false unless next_repository.tags_count > migration.max_tags_count
+
+ next_repository.skip_import(reason: :too_many_tags)
+
+ true
+ end
+
+ def below_capacity?
+ current_capacity <= maximum_capacity
+ end
+
+ def waiting_time_passed?
+ delay = migration.enqueue_waiting_time
+ return true if delay == 0
+ return true unless last_step_completed_repository
+
+ last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
+ end
+
+ def current_capacity
+ strong_memoize(:current_capacity) do
+ ContainerRepository.with_migration_states(
+ %w[pre_importing pre_import_done importing]
+ ).count
+ end
+ end
+
+ def maximum_capacity
+ migration.capacity
+ end
+
+ def next_repository
+ strong_memoize(:next_repository) do
+ ContainerRepository.ready_for_import.take # rubocop:disable CodeReuse/ActiveRecord
+ end
+ end
+
+ def next_aborted_repository
+ strong_memoize(:next_aborted_repository) do
+ ContainerRepository.with_migration_state('import_aborted').take # rubocop:disable CodeReuse/ActiveRecord
+ end
+ end
+
+ def last_step_completed_repository
+ strong_memoize(:last_step_completed_repository) do
+ ContainerRepository.recently_done_migration_step.first
+ end
+ end
+
+ def migration
+ ::ContainerRegistry::Migration
+ end
+
+ def re_enqueue_if_capacity
+ return unless current_capacity < maximum_capacity
+
+ self.class.perform_async
+ end
+ end
+ end
+end
diff --git a/app/workers/container_registry/migration/guard_worker.rb b/app/workers/container_registry/migration/guard_worker.rb
new file mode 100644
index 00000000000..77ae111c1cb
--- /dev/null
+++ b/app/workers/container_registry/migration/guard_worker.rb
@@ -0,0 +1,101 @@
+# frozen_string_literal: true
+
+module ContainerRegistry
+ module Migration
+ class GuardWorker
+ include ApplicationWorker
+ # This is a general worker with no context.
+ # It is not scoped to a project, user or group.
+ # We don't have a context.
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+ data_consistency :always
+ feature_category :container_registry
+ urgency :low
+ worker_resource_boundary :unknown
+ deduplicate :until_executed
+ idempotent!
+
+ def perform
+ return unless Gitlab.com?
+
+ repositories = ::ContainerRepository.with_stale_migration(step_before_timestamp)
+ .limit(max_capacity)
+ aborts_count = 0
+ long_running_migration_ids = []
+
+ # the #to_a is safe as the amount of entries is limited.
+ # In addition, we're calling #each in the next line and we don't want two different SQL queries for these two lines
+ log_extra_metadata_on_done(:stale_migrations_count, repositories.to_a.size)
+
+ repositories.each do |repository|
+ if abortable?(repository)
+ repository.abort_import
+ aborts_count += 1
+ else
+ long_running_migration_ids << repository.id if long_running_migration?(repository)
+ end
+ end
+
+ log_extra_metadata_on_done(:aborted_stale_migrations_count, aborts_count)
+
+ if long_running_migration_ids.any?
+ log_extra_metadata_on_done(:long_running_stale_migration_container_repository_ids, long_running_migration_ids)
+ end
+ end
+
+ private
+
+ # This can ping the Container Registry API.
+ # We loop on a set of repositories to calls this function (see #perform)
+ # In the worst case scenario, we have a n+1 API calls situation here.
+ #
+ # This is reasonable because the maximum amount of repositories looped
+ # on is `25`. See ::ContainerRegistry::Migration.capacity.
+ #
+ # TODO We can remove this n+1 situation by having a Container Registry API
+ # endpoint that accepts multiple repository paths at once. This is issue
+ # https://gitlab.com/gitlab-org/container-registry/-/issues/582
+ def abortable?(repository)
+ # early return to save one Container Registry API request
+ return true unless repository.importing? || repository.pre_importing?
+ return true unless external_migration_in_progress?(repository)
+
+ false
+ end
+
+ def long_running_migration?(repository)
+ migration_start_timestamp(repository).before?(long_running_migration_threshold)
+ end
+
+ def external_migration_in_progress?(repository)
+ status = repository.external_import_status
+
+ (status == 'pre_import_in_progress' && repository.pre_importing?) ||
+ (status == 'import_in_progress' && repository.importing?)
+ end
+
+ def migration_start_timestamp(repository)
+ if repository.pre_importing?
+ repository.migration_pre_import_started_at
+ else
+ repository.migration_import_started_at
+ end
+ end
+
+ def step_before_timestamp
+ ::ContainerRegistry::Migration.max_step_duration.seconds.ago
+ end
+
+ def max_capacity
+ # doubling the actual capacity to prevent issues in case the capacity
+ # is not properly applied
+ ::ContainerRegistry::Migration.capacity * 2
+ end
+
+ def long_running_migration_threshold
+ @threshold ||= 30.minutes.ago
+ end
+ end
+ end
+end
diff --git a/app/workers/container_registry/migration/observer_worker.rb b/app/workers/container_registry/migration/observer_worker.rb
new file mode 100644
index 00000000000..757c4fd11a5
--- /dev/null
+++ b/app/workers/container_registry/migration/observer_worker.rb
@@ -0,0 +1,40 @@
+# frozen_string_literal: true
+
+module ContainerRegistry
+ module Migration
+ class ObserverWorker
+ include ApplicationWorker
+ # This worker does not perform work scoped to a context
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+ COUNT_BATCH_SIZE = 50000
+
+ data_consistency :sticky
+ feature_category :container_registry
+ urgency :low
+ deduplicate :until_executed, including_scheduled: true
+ idempotent!
+
+ def perform
+ return unless ::ContainerRegistry::Migration.enabled?
+
+ use_replica_if_available do
+ ContainerRepository::MIGRATION_STATES.each do |state|
+ relation = ContainerRepository.with_migration_state(state)
+ count = ::Gitlab::Database::BatchCount.batch_count(
+ relation, batch_size: COUNT_BATCH_SIZE
+ )
+ name = "#{state}_count".to_sym
+ log_extra_metadata_on_done(name, count)
+ end
+ end
+ end
+
+ private
+
+ def use_replica_if_available(&block)
+ ::Gitlab::Database::LoadBalancing::Session.current.use_replicas_for_read_queries(&block)
+ end
+ end
+ end
+end