Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-04-20 13:00:54 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2022-04-20 13:00:54 +0300
commit3cccd102ba543e02725d247893729e5c73b38295 (patch)
treef36a04ec38517f5deaaacb5acc7d949688d1e187 /app/workers
parent205943281328046ef7b4528031b90fbda70c75ac (diff)
Add latest changes from gitlab-org/gitlab@14-10-stable-eev14.10.0-rc42
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml65
-rw-r--r--app/workers/bulk_import_worker.rb22
-rw-r--r--app/workers/bulk_imports/entity_worker.rb37
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb10
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb41
-rw-r--r--app/workers/bulk_imports/relation_export_worker.rb4
-rw-r--r--app/workers/bulk_imports/stuck_import_worker.rb31
-rw-r--r--app/workers/ci/update_locked_unknown_artifacts_worker.rb26
-rw-r--r--app/workers/concerns/chaos_queue.rb2
-rw-r--r--app/workers/concerns/git_garbage_collect_methods.rb6
-rw-r--r--app/workers/concerns/packages/cleanup_artifact_worker.rb6
-rw-r--r--app/workers/concerns/reactive_cacheable_worker.rb5
-rw-r--r--app/workers/concerns/worker_attributes.rb8
-rw-r--r--app/workers/container_registry/migration/enqueuer_worker.rb107
-rw-r--r--app/workers/container_registry/migration/guard_worker.rb52
-rw-r--r--app/workers/database/batched_background_migration/ci_database_worker.rb4
-rw-r--r--app/workers/database/batched_background_migration/single_database_worker.rb6
-rw-r--r--app/workers/database/batched_background_migration_worker.rb4
-rw-r--r--app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb28
-rw-r--r--app/workers/database/ci_project_mirrors_consistency_check_worker.rb28
-rw-r--r--app/workers/delete_stored_files_worker.rb2
-rw-r--r--app/workers/environments/auto_stop_worker.rb6
-rw-r--r--app/workers/flush_counter_increments_worker.rb5
-rw-r--r--app/workers/namespaces/invite_team_email_worker.rb22
-rw-r--r--app/workers/namespaces/root_statistics_worker.rb9
-rw-r--r--app/workers/object_storage/background_move_worker.rb2
-rw-r--r--app/workers/object_storage/migrate_uploads_worker.rb2
-rw-r--r--app/workers/project_export_worker.rb16
-rw-r--r--app/workers/projects/record_target_platforms_worker.rb55
-rw-r--r--app/workers/quality/test_data_cleanup_worker.rb33
30 files changed, 445 insertions, 199 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 48bdee4062b..bfb70e0d496 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -192,6 +192,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: cronjob:bulk_imports_stuck_import
+ :worker_name: BulkImports::StuckImportWorker
+ :feature_category: :importers
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:ci_archive_traces_cron
:worker_name: Ci::ArchiveTracesCronWorker
:feature_category: :continuous_integration
@@ -255,6 +264,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: cronjob:ci_update_locked_unknown_artifacts
+ :worker_name: Ci::UpdateLockedUnknownArtifactsWorker
+ :feature_category: :build_artifacts
+ :has_external_dependencies:
+ :urgency: :throttled
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent:
+ :tags: []
- :name: cronjob:clusters_integrations_check_prometheus_health
:worker_name: Clusters::Integrations::CheckPrometheusHealthWorker
:feature_category: :incident_management
@@ -318,6 +336,24 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: cronjob:database_ci_namespace_mirrors_consistency_check
+ :worker_name: Database::CiNamespaceMirrorsConsistencyCheckWorker
+ :feature_category: :sharding
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: cronjob:database_ci_project_mirrors_consistency_check
+ :worker_name: Database::CiProjectMirrorsConsistencyCheckWorker
+ :feature_category: :sharding
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:database_drop_detached_partitions
:worker_name: Database::DropDetachedPartitionsWorker
:feature_category: :database
@@ -579,15 +615,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: cronjob:quality_test_data_cleanup
- :worker_name: Quality::TestDataCleanupWorker
- :feature_category: :quality_management
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: cronjob:releases_manage_evidence
:worker_name: Releases::ManageEvidenceWorker
:feature_category: :release_evidence
@@ -2578,15 +2605,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: namespaces_invite_team_email
- :worker_name: Namespaces::InviteTeamEmailWorker
- :feature_category: :experimentation_activation
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent:
- :tags: []
- :name: namespaces_onboarding_issue_created
:worker_name: Namespaces::OnboardingIssueCreatedWorker
:feature_category: :onboarding
@@ -2771,7 +2789,7 @@
:worker_name: ProjectExportWorker
:feature_category: :importers
:has_external_dependencies:
- :urgency: :throttled
+ :urgency: :low
:resource_boundary: :memory
:weight: 1
:idempotent:
@@ -2812,6 +2830,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: projects_record_target_platforms
+ :worker_name: Projects::RecordTargetPlatformsWorker
+ :feature_category: :experimentation_activation
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: projects_refresh_build_artifacts_size_statistics
:worker_name: Projects::RefreshBuildArtifactsSizeStatisticsWorker
:feature_category: :build_artifacts
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
index d560ebcc6e6..157586ca397 100644
--- a/app/workers/bulk_import_worker.rb
+++ b/app/workers/bulk_import_worker.rb
@@ -3,15 +3,12 @@
class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
- data_consistency :always
+ PERFORM_DELAY = 5.seconds
+ data_consistency :always
feature_category :importers
-
sidekiq_options retry: false, dead: false
- PERFORM_DELAY = 5.seconds
- DEFAULT_BATCH_SIZE = 5
-
def perform(bulk_import_id)
@bulk_import = BulkImport.find_by_id(bulk_import_id)
@@ -19,11 +16,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
return if @bulk_import.finished? || @bulk_import.failed?
return @bulk_import.fail_op! if all_entities_failed?
return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
- return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
@bulk_import.start! if @bulk_import.created?
- created_entities.first(next_batch_size).each do |entity|
+ created_entities.find_each do |entity|
entity.create_pipeline_trackers!
BulkImports::ExportRequestWorker.perform_async(entity.id)
@@ -45,10 +41,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
@entities ||= @bulk_import.entities
end
- def started_entities
- entities.with_status(:started)
- end
-
def created_entities
entities.with_status(:created)
end
@@ -61,14 +53,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
entities.all? { |entity| entity.failed? }
end
- def max_batch_size_exceeded?
- started_entities.count >= DEFAULT_BATCH_SIZE
- end
-
- def next_batch_size
- [DEFAULT_BATCH_SIZE - started_entities.count, 0].max
- end
-
# A new BulkImportWorker job is enqueued to either
# - Process the new BulkImports::Entity created during import (e.g. for the subgroups)
# - Or to mark the `bulk_import` as finished
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index 70d6626df91..f6b1c693fe4 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -4,24 +4,32 @@ module BulkImports
class EntityWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ idempotent!
+ deduplicate :until_executing
data_consistency :always
-
feature_category :importers
-
sidekiq_options retry: false, dead: false
-
worker_has_external_dependencies!
- idempotent!
- deduplicate :until_executed, including_scheduled: true
-
def perform(entity_id, current_stage = nil)
- return if stage_running?(entity_id, current_stage)
+ if stage_running?(entity_id, current_stage)
+ logger.info(
+ structured_payload(
+ entity_id: entity_id,
+ current_stage: current_stage,
+ message: 'Stage running'
+ )
+ )
+
+ return
+ end
logger.info(
- worker: self.class.name,
- entity_id: entity_id,
- current_stage: current_stage
+ structured_payload(
+ entity_id: entity_id,
+ current_stage: current_stage,
+ message: 'Stage starting'
+ )
)
next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
@@ -33,10 +41,11 @@ module BulkImports
end
rescue StandardError => e
logger.error(
- worker: self.class.name,
- entity_id: entity_id,
- current_stage: current_stage,
- error_message: e.message
+ structured_payload(
+ entity_id: entity_id,
+ current_stage: current_stage,
+ message: e.message
+ )
)
Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 21040178cee..0d3e4f013dd 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -42,10 +42,12 @@ module BulkImports
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
- Gitlab::Import::Logger.warn(
- attributes.merge(
- bulk_import_id: entity.bulk_import.id,
- bulk_import_entity_type: entity.source_type
+ Gitlab::Import::Logger.error(
+ structured_payload(
+ attributes.merge(
+ bulk_import_id: entity.bulk_import.id,
+ bulk_import_entity_type: entity.source_type
+ )
)
)
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 03ec2f058ca..1a98705c151 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,14 +4,11 @@ module BulkImports
class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
- data_consistency :always
-
- NDJSON_PIPELINE_PERFORM_DELAY = 1.minute
+ NDJSON_PIPELINE_PERFORM_DELAY = 10.seconds
+ data_consistency :always
feature_category :importers
-
sidekiq_options retry: false, dead: false
-
worker_has_external_dependencies!
def perform(pipeline_tracker_id, stage, entity_id)
@@ -21,18 +18,20 @@ module BulkImports
if pipeline_tracker.present?
logger.info(
- worker: self.class.name,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name
+ structured_payload(
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name
+ )
)
run(pipeline_tracker)
else
logger.error(
- worker: self.class.name,
- entity_id: entity_id,
- pipeline_tracker_id: pipeline_tracker_id,
- message: 'Unstarted pipeline not found'
+ structured_payload(
+ entity_id: entity_id,
+ pipeline_tracker_id: pipeline_tracker_id,
+ message: 'Unstarted pipeline not found'
+ )
)
end
@@ -66,10 +65,11 @@ module BulkImports
rescue BulkImports::NetworkError => e
if e.retriable?(pipeline_tracker)
logger.error(
- worker: self.class.name,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: "Retrying error: #{e.message}"
+ structured_payload(
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: "Retrying error: #{e.message}"
+ )
)
pipeline_tracker.update!(status_event: 'retry', jid: jid)
@@ -86,10 +86,11 @@ module BulkImports
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error(
- worker: self.class.name,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: exception.message
+ structured_payload(
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: exception.message
+ )
)
Gitlab::ErrorTracking.track_exception(
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index 9324b79cc75..dcac841b3b2 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -3,12 +3,12 @@
module BulkImports
class RelationExportWorker
include ApplicationWorker
-
- data_consistency :always
include ExceptionBacktrace
idempotent!
+ deduplicate :until_executed
loggable_arguments 2, 3
+ data_consistency :always
feature_category :importers
sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb
new file mode 100644
index 00000000000..3fa4221728b
--- /dev/null
+++ b/app/workers/bulk_imports/stuck_import_worker.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class StuckImportWorker
+ include ApplicationWorker
+
+ # This worker does not schedule other workers that require context.
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+ idempotent!
+ data_consistency :always
+
+ feature_category :importers
+
+ def perform
+ BulkImport.stale.find_each do |import|
+ import.cleanup_stale
+ end
+
+ BulkImports::Entity.includes(:trackers).stale.find_each do |import| # rubocop: disable CodeReuse/ActiveRecord
+ ApplicationRecord.transaction do
+ import.cleanup_stale
+
+ import.trackers.find_each do |tracker|
+ tracker.cleanup_stale
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/update_locked_unknown_artifacts_worker.rb b/app/workers/ci/update_locked_unknown_artifacts_worker.rb
new file mode 100644
index 00000000000..2d37ebb3c93
--- /dev/null
+++ b/app/workers/ci/update_locked_unknown_artifacts_worker.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+module Ci
+ class UpdateLockedUnknownArtifactsWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+
+ data_consistency :sticky
+ urgency :throttled
+
+ # rubocop:disable Scalability/CronWorkerContext
+ # This worker does not perform work scoped to a context
+ include CronjobQueue
+ # rubocop:enable Scalability/CronWorkerContext
+
+ feature_category :build_artifacts
+
+ def perform
+ return unless ::Feature.enabled?(:ci_job_artifacts_backlog_work)
+
+ artifact_counts = Ci::JobArtifacts::UpdateUnknownLockedStatusService.new.execute
+
+ log_extra_metadata_on_done(:removed_count, artifact_counts[:removed])
+ log_extra_metadata_on_done(:locked_count, artifact_counts[:locked])
+ end
+ end
+end
diff --git a/app/workers/concerns/chaos_queue.rb b/app/workers/concerns/chaos_queue.rb
index a9c557f0175..23e58b5182b 100644
--- a/app/workers/concerns/chaos_queue.rb
+++ b/app/workers/concerns/chaos_queue.rb
@@ -5,6 +5,6 @@ module ChaosQueue
included do
queue_namespace :chaos
- feature_category_not_owned!
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned
end
end
diff --git a/app/workers/concerns/git_garbage_collect_methods.rb b/app/workers/concerns/git_garbage_collect_methods.rb
index 13b7e7b5b1f..308ffacfc6b 100644
--- a/app/workers/concerns/git_garbage_collect_methods.rb
+++ b/app/workers/concerns/git_garbage_collect_methods.rb
@@ -121,8 +121,12 @@ module GitGarbageCollectMethods
end.new(repository)
end
+ # The option to enable/disable bitmaps has been removed in https://gitlab.com/gitlab-org/gitlab/-/issues/353777
+ # Now the options is always enabled
+ # This method and all the deprecated RPCs are going to be removed in
+ # https://gitlab.com/gitlab-org/gitlab/-/issues/353779
def bitmaps_enabled?
- Gitlab::CurrentSettings.housekeeping_bitmaps_enabled
+ true
end
def flush_ref_caches(resource)
diff --git a/app/workers/concerns/packages/cleanup_artifact_worker.rb b/app/workers/concerns/packages/cleanup_artifact_worker.rb
index d4ad023b4a8..a01d7e8abba 100644
--- a/app/workers/concerns/packages/cleanup_artifact_worker.rb
+++ b/app/workers/concerns/packages/cleanup_artifact_worker.rb
@@ -14,7 +14,9 @@ module Packages
artifact.destroy!
rescue StandardError
- artifact&.error!
+ unless artifact&.destroyed?
+ artifact&.update_column(:status, :error)
+ end
end
after_destroy
@@ -48,7 +50,7 @@ module Packages
to_delete = next_item
if to_delete
- to_delete.processing!
+ to_delete.update_column(:status, :processing)
log_cleanup_item(to_delete)
end
diff --git a/app/workers/concerns/reactive_cacheable_worker.rb b/app/workers/concerns/reactive_cacheable_worker.rb
index 78fcf8087c2..a598b8a9d7d 100644
--- a/app/workers/concerns/reactive_cacheable_worker.rb
+++ b/app/workers/concerns/reactive_cacheable_worker.rb
@@ -8,7 +8,10 @@ module ReactiveCacheableWorker
sidekiq_options retry: 3
- feature_category_not_owned!
+ # Feature category is different depending on the model that is using the
+ # reactive cache. Identified by the `related_class` attribute.
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned
+
loggable_arguments 0
def self.context_for_arguments(arguments)
diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb
index 6f91418e38c..8f7a3da5429 100644
--- a/app/workers/concerns/worker_attributes.rb
+++ b/app/workers/concerns/worker_attributes.rb
@@ -35,17 +35,9 @@ module WorkerAttributes
class_methods do
def feature_category(value, *extras)
- raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned
-
set_class_attribute(:feature_category, value)
end
- # Special case: mark this work as not associated with a feature category
- # this should be used for cross-cutting concerns, such as mailer workers.
- def feature_category_not_owned!
- set_class_attribute(:feature_category, :not_owned)
- end
-
# Special case: if a worker is not owned, get the feature category
# (if present) from the calling context.
def get_feature_category
diff --git a/app/workers/container_registry/migration/enqueuer_worker.rb b/app/workers/container_registry/migration/enqueuer_worker.rb
index 5feaba870e6..8705deb0cb2 100644
--- a/app/workers/container_registry/migration/enqueuer_worker.rb
+++ b/app/workers/container_registry/migration/enqueuer_worker.rb
@@ -6,6 +6,9 @@ module ContainerRegistry
include ApplicationWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Gitlab::Utils::StrongMemoize
+ include ExclusiveLeaseGuard
+
+ DEFAULT_LEASE_TIMEOUT = 30.minutes.to_i.freeze
data_consistency :always
feature_category :container_registry
@@ -14,70 +17,103 @@ module ContainerRegistry
idempotent!
def perform
- return unless migration.enabled?
- return unless below_capacity?
- return unless waiting_time_passed?
+ re_enqueue = false
+ try_obtain_lease do
+ break unless runnable?
- re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration
- rescue StandardError => e
- Gitlab::ErrorTracking.log_exception(
- e,
- next_repository_id: next_repository&.id,
- next_aborted_repository_id: next_aborted_repository&.id
- )
-
- next_repository&.abort_import
+ re_enqueue = handle_aborted_migration || handle_next_migration
+ end
+ re_enqueue_if_capacity if re_enqueue
end
private
def handle_aborted_migration
- return unless next_aborted_repository&.retry_aborted_migration
+ return unless next_aborted_repository
- log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id)
log_extra_metadata_on_done(:import_type, 'retry')
+ log_repository(next_aborted_repository)
+
+ next_aborted_repository.retry_aborted_migration
+
+ true
+ rescue StandardError => e
+ Gitlab::ErrorTracking.log_exception(e, next_aborted_repository_id: next_aborted_repository&.id)
true
+ ensure
+ log_repository_migration_state(next_aborted_repository)
end
def handle_next_migration
return unless next_repository
+
+ log_extra_metadata_on_done(:import_type, 'next')
+ log_repository(next_repository)
+
# We return true because the repository was successfully processed (migration_state is changed)
return true if tag_count_too_high?
return unless next_repository.start_pre_import
- log_extra_metadata_on_done(:container_repository_id, next_repository.id)
- log_extra_metadata_on_done(:import_type, 'next')
-
true
+ rescue StandardError => e
+ Gitlab::ErrorTracking.log_exception(e, next_repository_id: next_repository&.id)
+ next_repository&.abort_import
+
+ false
+ ensure
+ log_repository_migration_state(next_repository)
end
def tag_count_too_high?
return false unless next_repository.tags_count > migration.max_tags_count
next_repository.skip_import(reason: :too_many_tags)
+ log_extra_metadata_on_done(:tags_count_too_high, true)
+ log_extra_metadata_on_done(:max_tags_count_setting, migration.max_tags_count)
true
end
def below_capacity?
- current_capacity <= maximum_capacity
+ current_capacity < maximum_capacity
end
def waiting_time_passed?
delay = migration.enqueue_waiting_time
return true if delay == 0
- return true unless last_step_completed_repository
+ return true unless last_step_completed_repository&.last_import_step_done_at
last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
end
- def current_capacity
- strong_memoize(:current_capacity) do
- ContainerRepository.with_migration_states(
- %w[pre_importing pre_import_done importing]
- ).count
+ def runnable?
+ unless migration.enabled?
+ log_extra_metadata_on_done(:migration_enabled, false)
+ return false
+ end
+
+ unless below_capacity?
+ log_extra_metadata_on_done(:max_capacity_setting, maximum_capacity)
+ log_extra_metadata_on_done(:below_capacity, false)
+
+ return false
+ end
+
+ unless waiting_time_passed?
+ log_extra_metadata_on_done(:waiting_time_passed, false)
+ log_extra_metadata_on_done(:current_waiting_time_setting, migration.enqueue_waiting_time)
+
+ return false
end
+
+ true
+ end
+
+ def current_capacity
+ ContainerRepository.with_migration_states(
+ %w[pre_importing pre_import_done importing]
+ ).count
end
def maximum_capacity
@@ -107,10 +143,31 @@ module ContainerRegistry
end
def re_enqueue_if_capacity
- return unless current_capacity < maximum_capacity
+ return unless below_capacity?
self.class.perform_async
end
+
+ def log_repository(repository)
+ log_extra_metadata_on_done(:container_repository_id, repository&.id)
+ log_extra_metadata_on_done(:container_repository_path, repository&.path)
+ end
+
+ def log_repository_migration_state(repository)
+ return unless repository
+
+ log_extra_metadata_on_done(:container_repository_migration_state, repository.migration_state)
+ end
+
+ # used by ExclusiveLeaseGuard
+ def lease_key
+ 'container_registry:migration:enqueuer_worker'
+ end
+
+ # used by ExclusiveLeaseGuard
+ def lease_timeout
+ DEFAULT_LEASE_TIMEOUT
+ end
end
end
end
diff --git a/app/workers/container_registry/migration/guard_worker.rb b/app/workers/container_registry/migration/guard_worker.rb
index 77ae111c1cb..bab6b8c2a72 100644
--- a/app/workers/container_registry/migration/guard_worker.rb
+++ b/app/workers/container_registry/migration/guard_worker.rb
@@ -29,46 +29,45 @@ module ContainerRegistry
log_extra_metadata_on_done(:stale_migrations_count, repositories.to_a.size)
repositories.each do |repository|
- if abortable?(repository)
+ if actively_importing?(repository)
+ # if a repository is actively importing but not yet long_running, do nothing
+ if long_running_migration?(repository)
+ long_running_migration_ids << repository.id
+ cancel_long_running_migration(repository)
+ aborts_count += 1
+ end
+ else
repository.abort_import
aborts_count += 1
- else
- long_running_migration_ids << repository.id if long_running_migration?(repository)
end
end
log_extra_metadata_on_done(:aborted_stale_migrations_count, aborts_count)
if long_running_migration_ids.any?
- log_extra_metadata_on_done(:long_running_stale_migration_container_repository_ids, long_running_migration_ids)
+ log_extra_metadata_on_done(:aborted_long_running_migration_ids, long_running_migration_ids)
end
end
private
- # This can ping the Container Registry API.
- # We loop on a set of repositories to calls this function (see #perform)
- # In the worst case scenario, we have a n+1 API calls situation here.
- #
- # This is reasonable because the maximum amount of repositories looped
- # on is `25`. See ::ContainerRegistry::Migration.capacity.
- #
- # TODO We can remove this n+1 situation by having a Container Registry API
- # endpoint that accepts multiple repository paths at once. This is issue
+ # A repository is actively_importing if it has an importing migration state
+ # and that state matches the state in the registry
+ # TODO We can have an API call n+1 situation here. It can be solved when the
+ # endpoint accepts multiple repository paths at once. This is issue
# https://gitlab.com/gitlab-org/container-registry/-/issues/582
- def abortable?(repository)
- # early return to save one Container Registry API request
- return true unless repository.importing? || repository.pre_importing?
- return true unless external_migration_in_progress?(repository)
+ def actively_importing?(repository)
+ return false unless repository.importing? || repository.pre_importing?
+ return false unless external_state_matches_migration_state?(repository)
- false
+ true
end
def long_running_migration?(repository)
migration_start_timestamp(repository).before?(long_running_migration_threshold)
end
- def external_migration_in_progress?(repository)
+ def external_state_matches_migration_state?(repository)
status = repository.external_import_status
(status == 'pre_import_in_progress' && repository.pre_importing?) ||
@@ -96,6 +95,21 @@ module ContainerRegistry
def long_running_migration_threshold
@threshold ||= 30.minutes.ago
end
+
+ def cancel_long_running_migration(repository)
+ result = repository.migration_cancel
+
+ case result[:status]
+ when :ok
+ repository.skip_import(reason: :migration_canceled)
+ when :bad_request
+ repository.reconcile_import_status(result[:state]) do
+ repository.abort_import
+ end
+ else
+ repository.abort_import
+ end
+ end
end
end
end
diff --git a/app/workers/database/batched_background_migration/ci_database_worker.rb b/app/workers/database/batched_background_migration/ci_database_worker.rb
index 98ec6f98123..13314cf95e2 100644
--- a/app/workers/database/batched_background_migration/ci_database_worker.rb
+++ b/app/workers/database/batched_background_migration/ci_database_worker.rb
@@ -4,6 +4,10 @@ module Database
class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
include SingleDatabaseWorker
+ def self.enabled?
+ Feature.enabled?(:execute_batched_migrations_on_schedule_ci_database, type: :ops, default_enabled: :yaml)
+ end
+
def self.tracking_database
@tracking_database ||= Gitlab::Database::CI_DATABASE_NAME
end
diff --git a/app/workers/database/batched_background_migration/single_database_worker.rb b/app/workers/database/batched_background_migration/single_database_worker.rb
index 78c82a6549f..aeadda4b8e1 100644
--- a/app/workers/database/batched_background_migration/single_database_worker.rb
+++ b/app/workers/database/batched_background_migration/single_database_worker.rb
@@ -23,6 +23,10 @@ module Database
def tracking_database
raise NotImplementedError, "#{self.name} does not implement #{__method__}"
end
+
+ def enabled?
+ raise NotImplementedError, "#{self.name} does not implement #{__method__}"
+ end
# :nocov:
def lease_key
@@ -41,7 +45,7 @@ module Database
end
Gitlab::Database::SharedModel.using_connection(base_model.connection) do
- break unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops, default_enabled: :yaml) && active_migration
+ break unless self.class.enabled? && active_migration
with_exclusive_lease(active_migration.interval) do
# Now that we have the exclusive lease, reload migration in case another process has changed it.
diff --git a/app/workers/database/batched_background_migration_worker.rb b/app/workers/database/batched_background_migration_worker.rb
index 29804be832d..6a41fe70915 100644
--- a/app/workers/database/batched_background_migration_worker.rb
+++ b/app/workers/database/batched_background_migration_worker.rb
@@ -4,6 +4,10 @@ module Database
class BatchedBackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker
include BatchedBackgroundMigration::SingleDatabaseWorker
+ def self.enabled?
+ Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops, default_enabled: :yaml)
+ end
+
def self.tracking_database
@tracking_database ||= Gitlab::Database::MAIN_DATABASE_NAME.to_sym
end
diff --git a/app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb b/app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb
new file mode 100644
index 00000000000..2b4253947ac
--- /dev/null
+++ b/app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+module Database
+ class CiNamespaceMirrorsConsistencyCheckWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop: disable Scalability/CronWorkerContext
+
+ sidekiq_options retry: false
+ feature_category :sharding
+ data_consistency :sticky
+ idempotent!
+
+ version 1
+
+ def perform
+ return if Feature.disabled?(:ci_namespace_mirrors_consistency_check, default_enabled: :yaml)
+
+ results = ConsistencyCheckService.new(
+ source_model: Namespace,
+ target_model: Ci::NamespaceMirror,
+ source_columns: %w[id traversal_ids],
+ target_columns: %w[namespace_id traversal_ids]
+ ).execute
+
+ log_extra_metadata_on_done(:results, results)
+ end
+ end
+end
diff --git a/app/workers/database/ci_project_mirrors_consistency_check_worker.rb b/app/workers/database/ci_project_mirrors_consistency_check_worker.rb
new file mode 100644
index 00000000000..e9413256617
--- /dev/null
+++ b/app/workers/database/ci_project_mirrors_consistency_check_worker.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+module Database
+ class CiProjectMirrorsConsistencyCheckWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop: disable Scalability/CronWorkerContext
+
+ sidekiq_options retry: false
+ feature_category :sharding
+ data_consistency :sticky
+ idempotent!
+
+ version 1
+
+ def perform
+ return if Feature.disabled?(:ci_project_mirrors_consistency_check, default_enabled: :yaml)
+
+ results = ConsistencyCheckService.new(
+ source_model: Project,
+ target_model: Ci::ProjectMirror,
+ source_columns: %w[id namespace_id],
+ target_columns: %w[project_id namespace_id]
+ ).execute
+
+ log_extra_metadata_on_done(:results, results)
+ end
+ end
+end
diff --git a/app/workers/delete_stored_files_worker.rb b/app/workers/delete_stored_files_worker.rb
index d1080c8df64..86167a7fafe 100644
--- a/app/workers/delete_stored_files_worker.rb
+++ b/app/workers/delete_stored_files_worker.rb
@@ -7,7 +7,7 @@ class DeleteStoredFilesWorker # rubocop:disable Scalability/IdempotentWorker
sidekiq_options retry: 3
- feature_category_not_owned!
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned
loggable_arguments 0
def perform(class_name, keys)
diff --git a/app/workers/environments/auto_stop_worker.rb b/app/workers/environments/auto_stop_worker.rb
index 672a4f4121e..aee6e977550 100644
--- a/app/workers/environments/auto_stop_worker.rb
+++ b/app/workers/environments/auto_stop_worker.rb
@@ -10,8 +10,10 @@ module Environments
def perform(environment_id, params = {})
Environment.find_by_id(environment_id).try do |environment|
- user = environment.stop_action&.user
- environment.stop_with_action!(user)
+ stop_actions = environment.stop_actions
+
+ user = stop_actions.last&.user
+ environment.stop_with_actions!(user)
end
end
end
diff --git a/app/workers/flush_counter_increments_worker.rb b/app/workers/flush_counter_increments_worker.rb
index c4a3a5283cc..e21a7ee35e7 100644
--- a/app/workers/flush_counter_increments_worker.rb
+++ b/app/workers/flush_counter_increments_worker.rb
@@ -12,7 +12,10 @@ class FlushCounterIncrementsWorker
sidekiq_options retry: 3
- feature_category_not_owned!
+ # The increments in `ProjectStatistics` are owned by several teams depending
+ # on the counter
+ feature_category :not_owned # rubocop:disable Gitlab/AvoidFeatureCategoryNotOwned
+
urgency :low
deduplicate :until_executing, including_scheduled: true
diff --git a/app/workers/namespaces/invite_team_email_worker.rb b/app/workers/namespaces/invite_team_email_worker.rb
deleted file mode 100644
index eabf33a7fba..00000000000
--- a/app/workers/namespaces/invite_team_email_worker.rb
+++ /dev/null
@@ -1,22 +0,0 @@
-# frozen_string_literal: true
-
-module Namespaces
- class InviteTeamEmailWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- feature_category :experimentation_activation
- urgency :low
-
- def perform(group_id, user_id)
- # rubocop: disable CodeReuse/ActiveRecord
- user = User.find_by(id: user_id)
- group = Group.find_by(id: group_id)
- # rubocop: enable CodeReuse/ActiveRecord
- return unless user && group
-
- Namespaces::InviteTeamEmailService.send_email(user, group)
- end
- end
-end
diff --git a/app/workers/namespaces/root_statistics_worker.rb b/app/workers/namespaces/root_statistics_worker.rb
index b97dbca2c1c..e1271dae335 100644
--- a/app/workers/namespaces/root_statistics_worker.rb
+++ b/app/workers/namespaces/root_statistics_worker.rb
@@ -20,8 +20,17 @@ module Namespaces
Namespaces::StatisticsRefresherService.new.execute(namespace)
namespace.aggregation_schedule.destroy
+
+ notify_storage_usage(namespace)
rescue ::Namespaces::StatisticsRefresherService::RefresherError, ActiveRecord::RecordNotFound => ex
Gitlab::ErrorTracking.track_exception(ex, namespace_id: namespace_id, namespace: namespace&.full_path)
end
+
+ private
+
+ def notify_storage_usage(namespace)
+ end
end
end
+
+Namespaces::RootStatisticsWorker.prepend_mod_with('Namespaces::RootStatisticsWorker')
diff --git a/app/workers/object_storage/background_move_worker.rb b/app/workers/object_storage/background_move_worker.rb
index 2204e504702..bb51f0d7e1f 100644
--- a/app/workers/object_storage/background_move_worker.rb
+++ b/app/workers/object_storage/background_move_worker.rb
@@ -8,7 +8,7 @@ module ObjectStorage
include ObjectStorageQueue
sidekiq_options retry: 5
- feature_category_not_owned!
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned
loggable_arguments 0, 1, 2, 3
def perform(uploader_class_name, subject_class_name, file_field, subject_id)
diff --git a/app/workers/object_storage/migrate_uploads_worker.rb b/app/workers/object_storage/migrate_uploads_worker.rb
index ea4a90cf9d2..b7d938e6b68 100644
--- a/app/workers/object_storage/migrate_uploads_worker.rb
+++ b/app/workers/object_storage/migrate_uploads_worker.rb
@@ -10,7 +10,7 @@ module ObjectStorage
sidekiq_options retry: 3
include ObjectStorageQueue
- feature_category_not_owned!
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned
loggable_arguments 0, 1, 2, 3
SanityCheckError = Class.new(StandardError)
diff --git a/app/workers/project_export_worker.rb b/app/workers/project_export_worker.rb
index e3f8c4bcd9d..ee892d43313 100644
--- a/app/workers/project_export_worker.rb
+++ b/app/workers/project_export_worker.rb
@@ -8,7 +8,7 @@ class ProjectExportWorker # rubocop:disable Scalability/IdempotentWorker
feature_category :importers
worker_resource_boundary :memory
- urgency :throttled
+ urgency :low
loggable_arguments 2, 3
sidekiq_options retry: false, dead: false
sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
@@ -21,7 +21,10 @@ class ProjectExportWorker # rubocop:disable Scalability/IdempotentWorker
export_job&.start
- ::Projects::ImportExport::ExportService.new(project, current_user, params).execute(after_export)
+ export_service = ::Projects::ImportExport::ExportService.new(project, current_user, params)
+ export_service.execute(after_export)
+
+ log_exporters_duration(export_service)
export_job&.finish
rescue ActiveRecord::RecordNotFound => e
@@ -46,4 +49,13 @@ class ProjectExportWorker # rubocop:disable Scalability/IdempotentWorker
def log_failure(project_id, ex)
logger.error("Failed to export project #{project_id}: #{ex.message}")
end
+
+ def log_exporters_duration(export_service)
+ export_service.exporters.each do |exporter|
+ exporter_key = "#{exporter.class.name.demodulize.underscore}_duration_s".to_sym # e.g. uploads_saver_duration_s
+ exporter_duration = exporter.duration_s&.round(6)
+
+ log_extra_metadata_on_done(exporter_key, exporter_duration)
+ end
+ end
end
diff --git a/app/workers/projects/record_target_platforms_worker.rb b/app/workers/projects/record_target_platforms_worker.rb
new file mode 100644
index 00000000000..5b1f85ecca0
--- /dev/null
+++ b/app/workers/projects/record_target_platforms_worker.rb
@@ -0,0 +1,55 @@
+# frozen_string_literal: true
+
+module Projects
+ class RecordTargetPlatformsWorker
+ include ApplicationWorker
+ include ExclusiveLeaseGuard
+
+ LEASE_TIMEOUT = 1.hour.to_i
+ APPLE_PLATFORM_LANGUAGES = %w(swift objective-c).freeze
+
+ feature_category :experimentation_activation
+ data_consistency :always
+ deduplicate :until_executed
+ urgency :low
+ idempotent!
+
+ def perform(project_id)
+ @project = Project.find_by_id(project_id)
+
+ return unless project
+ return unless uses_apple_platform_languages?
+
+ try_obtain_lease do
+ @target_platforms = Projects::RecordTargetPlatformsService.new(project).execute
+ log_target_platforms_metadata
+ end
+ end
+
+ private
+
+ attr_reader :target_platforms, :project
+
+ def uses_apple_platform_languages?
+ project.repository_languages.with_programming_language(*APPLE_PLATFORM_LANGUAGES).present?
+ end
+
+ def log_target_platforms_metadata
+ return unless target_platforms.present?
+
+ log_extra_metadata_on_done(:target_platforms, target_platforms)
+ end
+
+ def lease_key
+ @lease_key ||= "#{self.class.name.underscore}:#{project.id}"
+ end
+
+ def lease_timeout
+ LEASE_TIMEOUT
+ end
+
+ def lease_release?
+ false
+ end
+ end
+end
diff --git a/app/workers/quality/test_data_cleanup_worker.rb b/app/workers/quality/test_data_cleanup_worker.rb
deleted file mode 100644
index 68b36cacbbf..00000000000
--- a/app/workers/quality/test_data_cleanup_worker.rb
+++ /dev/null
@@ -1,33 +0,0 @@
-# frozen_string_literal: true
-
-module Quality
- class TestDataCleanupWorker
- include ApplicationWorker
-
- data_consistency :always
- feature_category :quality_management
- urgency :low
-
- include CronjobQueue
- idempotent!
-
- KEEP_RECENT_DATA_DAY = 3
- GROUP_PATH_PATTERN = 'test-group-fulfillment'
- GROUP_OWNER_EMAIL_PATTERN = %w(test-user- gitlab-qa-user qa-user-).freeze
-
- # Remove test groups generated in E2E tests on gstg
- # rubocop: disable CodeReuse/ActiveRecord
- def perform
- return unless Gitlab.staging?
-
- Group.where('path like ?', "#{GROUP_PATH_PATTERN}%").where('created_at < ?', KEEP_RECENT_DATA_DAY.days.ago).each do |group|
- next unless GROUP_OWNER_EMAIL_PATTERN.any? { |pattern| group.owners.first.email.include?(pattern) }
-
- with_context(namespace: group, user: group.owners.first) do
- Groups::DestroyService.new(group, group.owners.first).execute
- end
- end
- end
- # rubocop: enable CodeReuse/ActiveRecord
- end
-end