diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2022-04-20 13:00:54 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2022-04-20 13:00:54 +0300 |
commit | 3cccd102ba543e02725d247893729e5c73b38295 (patch) | |
tree | f36a04ec38517f5deaaacb5acc7d949688d1e187 /app/workers | |
parent | 205943281328046ef7b4528031b90fbda70c75ac (diff) |
Add latest changes from gitlab-org/gitlab@14-10-stable-ee (tag: v14.10.0-rc42)
Diffstat (limited to 'app/workers')
30 files changed, 445 insertions, 199 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 48bdee4062b..bfb70e0d496 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -192,6 +192,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:bulk_imports_stuck_import + :worker_name: BulkImports::StuckImportWorker + :feature_category: :importers + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:ci_archive_traces_cron :worker_name: Ci::ArchiveTracesCronWorker :feature_category: :continuous_integration @@ -255,6 +264,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:ci_update_locked_unknown_artifacts + :worker_name: Ci::UpdateLockedUnknownArtifactsWorker + :feature_category: :build_artifacts + :has_external_dependencies: + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: cronjob:clusters_integrations_check_prometheus_health :worker_name: Clusters::Integrations::CheckPrometheusHealthWorker :feature_category: :incident_management @@ -318,6 +336,24 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:database_ci_namespace_mirrors_consistency_check + :worker_name: Database::CiNamespaceMirrorsConsistencyCheckWorker + :feature_category: :sharding + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: cronjob:database_ci_project_mirrors_consistency_check + :worker_name: Database::CiProjectMirrorsConsistencyCheckWorker + :feature_category: :sharding + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:database_drop_detached_partitions :worker_name: Database::DropDetachedPartitionsWorker :feature_category: :database @@ -579,15 +615,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: cronjob:quality_test_data_cleanup - :worker_name: 
Quality::TestDataCleanupWorker - :feature_category: :quality_management - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: cronjob:releases_manage_evidence :worker_name: Releases::ManageEvidenceWorker :feature_category: :release_evidence @@ -2578,15 +2605,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: namespaces_invite_team_email - :worker_name: Namespaces::InviteTeamEmailWorker - :feature_category: :experimentation_activation - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: - :tags: [] - :name: namespaces_onboarding_issue_created :worker_name: Namespaces::OnboardingIssueCreatedWorker :feature_category: :onboarding @@ -2771,7 +2789,7 @@ :worker_name: ProjectExportWorker :feature_category: :importers :has_external_dependencies: - :urgency: :throttled + :urgency: :low :resource_boundary: :memory :weight: 1 :idempotent: @@ -2812,6 +2830,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: projects_record_target_platforms + :worker_name: Projects::RecordTargetPlatformsWorker + :feature_category: :experimentation_activation + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: projects_refresh_build_artifacts_size_statistics :worker_name: Projects::RefreshBuildArtifactsSizeStatisticsWorker :feature_category: :build_artifacts diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index d560ebcc6e6..157586ca397 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -3,15 +3,12 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker - data_consistency :always + PERFORM_DELAY = 5.seconds + data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false - PERFORM_DELAY = 5.seconds - DEFAULT_BATCH_SIZE = 5 - def 
perform(bulk_import_id) @bulk_import = BulkImport.find_by_id(bulk_import_id) @@ -19,11 +16,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker return if @bulk_import.finished? || @bulk_import.failed? return @bulk_import.fail_op! if all_entities_failed? return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? - return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running @bulk_import.start! if @bulk_import.created? - created_entities.first(next_batch_size).each do |entity| + created_entities.find_each do |entity| entity.create_pipeline_trackers! BulkImports::ExportRequestWorker.perform_async(entity.id) @@ -45,10 +41,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker @entities ||= @bulk_import.entities end - def started_entities - entities.with_status(:started) - end - def created_entities entities.with_status(:created) end @@ -61,14 +53,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker entities.all? { |entity| entity.failed? } end - def max_batch_size_exceeded? - started_entities.count >= DEFAULT_BATCH_SIZE - end - - def next_batch_size - [DEFAULT_BATCH_SIZE - started_entities.count, 0].max - end - # A new BulkImportWorker job is enqueued to either # - Process the new BulkImports::Entity created during import (e.g. for the subgroups) # - Or to mark the `bulk_import` as finished diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index 70d6626df91..f6b1c693fe4 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -4,24 +4,32 @@ module BulkImports class EntityWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + idempotent! + deduplicate :until_executing data_consistency :always - feature_category :importers - sidekiq_options retry: false, dead: false - worker_has_external_dependencies! - idempotent! 
- deduplicate :until_executed, including_scheduled: true - def perform(entity_id, current_stage = nil) - return if stage_running?(entity_id, current_stage) + if stage_running?(entity_id, current_stage) + logger.info( + structured_payload( + entity_id: entity_id, + current_stage: current_stage, + message: 'Stage running' + ) + ) + + return + end logger.info( - worker: self.class.name, - entity_id: entity_id, - current_stage: current_stage + structured_payload( + entity_id: entity_id, + current_stage: current_stage, + message: 'Stage starting' + ) ) next_pipeline_trackers_for(entity_id).each do |pipeline_tracker| @@ -33,10 +41,11 @@ module BulkImports end rescue StandardError => e logger.error( - worker: self.class.name, - entity_id: entity_id, - current_stage: current_stage, - error_message: e.message + structured_payload( + entity_id: entity_id, + current_stage: current_stage, + message: e.message + ) ) Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id) diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index 21040178cee..0d3e4f013dd 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -42,10 +42,12 @@ module BulkImports correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id } - Gitlab::Import::Logger.warn( - attributes.merge( - bulk_import_id: entity.bulk_import.id, - bulk_import_entity_type: entity.source_type + Gitlab::Import::Logger.error( + structured_payload( + attributes.merge( + bulk_import_id: entity.bulk_import.id, + bulk_import_entity_type: entity.source_type + ) ) ) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 03ec2f058ca..1a98705c151 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -4,14 +4,11 @@ module BulkImports class PipelineWorker # rubocop:disable 
Scalability/IdempotentWorker include ApplicationWorker - data_consistency :always - - NDJSON_PIPELINE_PERFORM_DELAY = 1.minute + NDJSON_PIPELINE_PERFORM_DELAY = 10.seconds + data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false - worker_has_external_dependencies! def perform(pipeline_tracker_id, stage, entity_id) @@ -21,18 +18,20 @@ module BulkImports if pipeline_tracker.present? logger.info( - worker: self.class.name, - entity_id: pipeline_tracker.entity.id, - pipeline_name: pipeline_tracker.pipeline_name + structured_payload( + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name + ) ) run(pipeline_tracker) else logger.error( - worker: self.class.name, - entity_id: entity_id, - pipeline_tracker_id: pipeline_tracker_id, - message: 'Unstarted pipeline not found' + structured_payload( + entity_id: entity_id, + pipeline_tracker_id: pipeline_tracker_id, + message: 'Unstarted pipeline not found' + ) ) end @@ -66,10 +65,11 @@ module BulkImports rescue BulkImports::NetworkError => e if e.retriable?(pipeline_tracker) logger.error( - worker: self.class.name, - entity_id: pipeline_tracker.entity.id, - pipeline_name: pipeline_tracker.pipeline_name, - message: "Retrying error: #{e.message}" + structured_payload( + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name, + message: "Retrying error: #{e.message}" + ) ) pipeline_tracker.update!(status_event: 'retry', jid: jid) @@ -86,10 +86,11 @@ module BulkImports pipeline_tracker.update!(status_event: 'fail_op', jid: jid) logger.error( - worker: self.class.name, - entity_id: pipeline_tracker.entity.id, - pipeline_name: pipeline_tracker.pipeline_name, - message: exception.message + structured_payload( + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name, + message: exception.message + ) ) Gitlab::ErrorTracking.track_exception( diff --git 
a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb index 9324b79cc75..dcac841b3b2 100644 --- a/app/workers/bulk_imports/relation_export_worker.rb +++ b/app/workers/bulk_imports/relation_export_worker.rb @@ -3,12 +3,12 @@ module BulkImports class RelationExportWorker include ApplicationWorker - - data_consistency :always include ExceptionBacktrace idempotent! + deduplicate :until_executed loggable_arguments 2, 3 + data_consistency :always feature_category :importers sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb new file mode 100644 index 00000000000..3fa4221728b --- /dev/null +++ b/app/workers/bulk_imports/stuck_import_worker.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module BulkImports + class StuckImportWorker + include ApplicationWorker + + # This worker does not schedule other workers that require context. + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + idempotent! 
+ data_consistency :always + + feature_category :importers + + def perform + BulkImport.stale.find_each do |import| + import.cleanup_stale + end + + BulkImports::Entity.includes(:trackers).stale.find_each do |import| # rubocop: disable CodeReuse/ActiveRecord + ApplicationRecord.transaction do + import.cleanup_stale + + import.trackers.find_each do |tracker| + tracker.cleanup_stale + end + end + end + end + end +end diff --git a/app/workers/ci/update_locked_unknown_artifacts_worker.rb b/app/workers/ci/update_locked_unknown_artifacts_worker.rb new file mode 100644 index 00000000000..2d37ebb3c93 --- /dev/null +++ b/app/workers/ci/update_locked_unknown_artifacts_worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Ci + class UpdateLockedUnknownArtifactsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :sticky + urgency :throttled + + # rubocop:disable Scalability/CronWorkerContext + # This worker does not perform work scoped to a context + include CronjobQueue + # rubocop:enable Scalability/CronWorkerContext + + feature_category :build_artifacts + + def perform + return unless ::Feature.enabled?(:ci_job_artifacts_backlog_work) + + artifact_counts = Ci::JobArtifacts::UpdateUnknownLockedStatusService.new.execute + + log_extra_metadata_on_done(:removed_count, artifact_counts[:removed]) + log_extra_metadata_on_done(:locked_count, artifact_counts[:locked]) + end + end +end diff --git a/app/workers/concerns/chaos_queue.rb b/app/workers/concerns/chaos_queue.rb index a9c557f0175..23e58b5182b 100644 --- a/app/workers/concerns/chaos_queue.rb +++ b/app/workers/concerns/chaos_queue.rb @@ -5,6 +5,6 @@ module ChaosQueue included do queue_namespace :chaos - feature_category_not_owned! 
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned end end diff --git a/app/workers/concerns/git_garbage_collect_methods.rb b/app/workers/concerns/git_garbage_collect_methods.rb index 13b7e7b5b1f..308ffacfc6b 100644 --- a/app/workers/concerns/git_garbage_collect_methods.rb +++ b/app/workers/concerns/git_garbage_collect_methods.rb @@ -121,8 +121,12 @@ module GitGarbageCollectMethods end.new(repository) end + # The option to enable/disable bitmaps has been removed in https://gitlab.com/gitlab-org/gitlab/-/issues/353777 + # Now the options is always enabled + # This method and all the deprecated RPCs are going to be removed in + # https://gitlab.com/gitlab-org/gitlab/-/issues/353779 def bitmaps_enabled? - Gitlab::CurrentSettings.housekeeping_bitmaps_enabled + true end def flush_ref_caches(resource) diff --git a/app/workers/concerns/packages/cleanup_artifact_worker.rb b/app/workers/concerns/packages/cleanup_artifact_worker.rb index d4ad023b4a8..a01d7e8abba 100644 --- a/app/workers/concerns/packages/cleanup_artifact_worker.rb +++ b/app/workers/concerns/packages/cleanup_artifact_worker.rb @@ -14,7 +14,9 @@ module Packages artifact.destroy! rescue StandardError - artifact&.error! + unless artifact&.destroyed? + artifact&.update_column(:status, :error) + end end after_destroy @@ -48,7 +50,7 @@ module Packages to_delete = next_item if to_delete - to_delete.processing! + to_delete.update_column(:status, :processing) log_cleanup_item(to_delete) end diff --git a/app/workers/concerns/reactive_cacheable_worker.rb b/app/workers/concerns/reactive_cacheable_worker.rb index 78fcf8087c2..a598b8a9d7d 100644 --- a/app/workers/concerns/reactive_cacheable_worker.rb +++ b/app/workers/concerns/reactive_cacheable_worker.rb @@ -8,7 +8,10 @@ module ReactiveCacheableWorker sidekiq_options retry: 3 - feature_category_not_owned! + # Feature category is different depending on the model that is using the + # reactive cache. Identified by the `related_class` attribute. 
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned + loggable_arguments 0 def self.context_for_arguments(arguments) diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 6f91418e38c..8f7a3da5429 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -35,17 +35,9 @@ module WorkerAttributes class_methods do def feature_category(value, *extras) - raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned - set_class_attribute(:feature_category, value) end - # Special case: mark this work as not associated with a feature category - # this should be used for cross-cutting concerns, such as mailer workers. - def feature_category_not_owned! - set_class_attribute(:feature_category, :not_owned) - end - # Special case: if a worker is not owned, get the feature category # (if present) from the calling context. def get_feature_category diff --git a/app/workers/container_registry/migration/enqueuer_worker.rb b/app/workers/container_registry/migration/enqueuer_worker.rb index 5feaba870e6..8705deb0cb2 100644 --- a/app/workers/container_registry/migration/enqueuer_worker.rb +++ b/app/workers/container_registry/migration/enqueuer_worker.rb @@ -6,6 +6,9 @@ module ContainerRegistry include ApplicationWorker include CronjobQueue # rubocop:disable Scalability/CronWorkerContext include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard + + DEFAULT_LEASE_TIMEOUT = 30.minutes.to_i.freeze data_consistency :always feature_category :container_registry @@ -14,70 +17,103 @@ module ContainerRegistry idempotent! def perform - return unless migration.enabled? - return unless below_capacity? - return unless waiting_time_passed? + re_enqueue = false + try_obtain_lease do + break unless runnable? 
- re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration - rescue StandardError => e - Gitlab::ErrorTracking.log_exception( - e, - next_repository_id: next_repository&.id, - next_aborted_repository_id: next_aborted_repository&.id - ) - - next_repository&.abort_import + re_enqueue = handle_aborted_migration || handle_next_migration + end + re_enqueue_if_capacity if re_enqueue end private def handle_aborted_migration - return unless next_aborted_repository&.retry_aborted_migration + return unless next_aborted_repository - log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id) log_extra_metadata_on_done(:import_type, 'retry') + log_repository(next_aborted_repository) + + next_aborted_repository.retry_aborted_migration + + true + rescue StandardError => e + Gitlab::ErrorTracking.log_exception(e, next_aborted_repository_id: next_aborted_repository&.id) true + ensure + log_repository_migration_state(next_aborted_repository) end def handle_next_migration return unless next_repository + + log_extra_metadata_on_done(:import_type, 'next') + log_repository(next_repository) + # We return true because the repository was successfully processed (migration_state is changed) return true if tag_count_too_high? return unless next_repository.start_pre_import - log_extra_metadata_on_done(:container_repository_id, next_repository.id) - log_extra_metadata_on_done(:import_type, 'next') - true + rescue StandardError => e + Gitlab::ErrorTracking.log_exception(e, next_repository_id: next_repository&.id) + next_repository&.abort_import + + false + ensure + log_repository_migration_state(next_repository) end def tag_count_too_high? return false unless next_repository.tags_count > migration.max_tags_count next_repository.skip_import(reason: :too_many_tags) + log_extra_metadata_on_done(:tags_count_too_high, true) + log_extra_metadata_on_done(:max_tags_count_setting, migration.max_tags_count) true end def below_capacity? 
- current_capacity <= maximum_capacity + current_capacity < maximum_capacity end def waiting_time_passed? delay = migration.enqueue_waiting_time return true if delay == 0 - return true unless last_step_completed_repository + return true unless last_step_completed_repository&.last_import_step_done_at last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay end - def current_capacity - strong_memoize(:current_capacity) do - ContainerRepository.with_migration_states( - %w[pre_importing pre_import_done importing] - ).count + def runnable? + unless migration.enabled? + log_extra_metadata_on_done(:migration_enabled, false) + return false + end + + unless below_capacity? + log_extra_metadata_on_done(:max_capacity_setting, maximum_capacity) + log_extra_metadata_on_done(:below_capacity, false) + + return false + end + + unless waiting_time_passed? + log_extra_metadata_on_done(:waiting_time_passed, false) + log_extra_metadata_on_done(:current_waiting_time_setting, migration.enqueue_waiting_time) + + return false end + + true + end + + def current_capacity + ContainerRepository.with_migration_states( + %w[pre_importing pre_import_done importing] + ).count end def maximum_capacity @@ -107,10 +143,31 @@ module ContainerRegistry end def re_enqueue_if_capacity - return unless current_capacity < maximum_capacity + return unless below_capacity? 
self.class.perform_async end + + def log_repository(repository) + log_extra_metadata_on_done(:container_repository_id, repository&.id) + log_extra_metadata_on_done(:container_repository_path, repository&.path) + end + + def log_repository_migration_state(repository) + return unless repository + + log_extra_metadata_on_done(:container_repository_migration_state, repository.migration_state) + end + + # used by ExclusiveLeaseGuard + def lease_key + 'container_registry:migration:enqueuer_worker' + end + + # used by ExclusiveLeaseGuard + def lease_timeout + DEFAULT_LEASE_TIMEOUT + end end end end diff --git a/app/workers/container_registry/migration/guard_worker.rb b/app/workers/container_registry/migration/guard_worker.rb index 77ae111c1cb..bab6b8c2a72 100644 --- a/app/workers/container_registry/migration/guard_worker.rb +++ b/app/workers/container_registry/migration/guard_worker.rb @@ -29,46 +29,45 @@ module ContainerRegistry log_extra_metadata_on_done(:stale_migrations_count, repositories.to_a.size) repositories.each do |repository| - if abortable?(repository) + if actively_importing?(repository) + # if a repository is actively importing but not yet long_running, do nothing + if long_running_migration?(repository) + long_running_migration_ids << repository.id + cancel_long_running_migration(repository) + aborts_count += 1 + end + else repository.abort_import aborts_count += 1 - else - long_running_migration_ids << repository.id if long_running_migration?(repository) end end log_extra_metadata_on_done(:aborted_stale_migrations_count, aborts_count) if long_running_migration_ids.any? - log_extra_metadata_on_done(:long_running_stale_migration_container_repository_ids, long_running_migration_ids) + log_extra_metadata_on_done(:aborted_long_running_migration_ids, long_running_migration_ids) end end private - # This can ping the Container Registry API. 
- # We loop on a set of repositories to calls this function (see #perform) - # In the worst case scenario, we have a n+1 API calls situation here. - # - # This is reasonable because the maximum amount of repositories looped - # on is `25`. See ::ContainerRegistry::Migration.capacity. - # - # TODO We can remove this n+1 situation by having a Container Registry API - # endpoint that accepts multiple repository paths at once. This is issue + # A repository is actively_importing if it has an importing migration state + # and that state matches the state in the registry + # TODO We can have an API call n+1 situation here. It can be solved when the + # endpoint accepts multiple repository paths at once. This is issue # https://gitlab.com/gitlab-org/container-registry/-/issues/582 - def abortable?(repository) - # early return to save one Container Registry API request - return true unless repository.importing? || repository.pre_importing? - return true unless external_migration_in_progress?(repository) + def actively_importing?(repository) + return false unless repository.importing? || repository.pre_importing? + return false unless external_state_matches_migration_state?(repository) - false + true end def long_running_migration?(repository) migration_start_timestamp(repository).before?(long_running_migration_threshold) end - def external_migration_in_progress?(repository) + def external_state_matches_migration_state?(repository) status = repository.external_import_status (status == 'pre_import_in_progress' && repository.pre_importing?) 
|| @@ -96,6 +95,21 @@ module ContainerRegistry def long_running_migration_threshold @threshold ||= 30.minutes.ago end + + def cancel_long_running_migration(repository) + result = repository.migration_cancel + + case result[:status] + when :ok + repository.skip_import(reason: :migration_canceled) + when :bad_request + repository.reconcile_import_status(result[:state]) do + repository.abort_import + end + else + repository.abort_import + end + end end end end diff --git a/app/workers/database/batched_background_migration/ci_database_worker.rb b/app/workers/database/batched_background_migration/ci_database_worker.rb index 98ec6f98123..13314cf95e2 100644 --- a/app/workers/database/batched_background_migration/ci_database_worker.rb +++ b/app/workers/database/batched_background_migration/ci_database_worker.rb @@ -4,6 +4,10 @@ module Database class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker include SingleDatabaseWorker + def self.enabled? + Feature.enabled?(:execute_batched_migrations_on_schedule_ci_database, type: :ops, default_enabled: :yaml) + end + def self.tracking_database @tracking_database ||= Gitlab::Database::CI_DATABASE_NAME end diff --git a/app/workers/database/batched_background_migration/single_database_worker.rb b/app/workers/database/batched_background_migration/single_database_worker.rb index 78c82a6549f..aeadda4b8e1 100644 --- a/app/workers/database/batched_background_migration/single_database_worker.rb +++ b/app/workers/database/batched_background_migration/single_database_worker.rb @@ -23,6 +23,10 @@ module Database def tracking_database raise NotImplementedError, "#{self.name} does not implement #{__method__}" end + + def enabled? 
+ raise NotImplementedError, "#{self.name} does not implement #{__method__}" + end # :nocov: def lease_key @@ -41,7 +45,7 @@ module Database end Gitlab::Database::SharedModel.using_connection(base_model.connection) do - break unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops, default_enabled: :yaml) && active_migration + break unless self.class.enabled? && active_migration with_exclusive_lease(active_migration.interval) do # Now that we have the exclusive lease, reload migration in case another process has changed it. diff --git a/app/workers/database/batched_background_migration_worker.rb b/app/workers/database/batched_background_migration_worker.rb index 29804be832d..6a41fe70915 100644 --- a/app/workers/database/batched_background_migration_worker.rb +++ b/app/workers/database/batched_background_migration_worker.rb @@ -4,6 +4,10 @@ module Database class BatchedBackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker include BatchedBackgroundMigration::SingleDatabaseWorker + def self.enabled? + Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops, default_enabled: :yaml) + end + def self.tracking_database @tracking_database ||= Gitlab::Database::MAIN_DATABASE_NAME.to_sym end diff --git a/app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb b/app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb new file mode 100644 index 00000000000..2b4253947ac --- /dev/null +++ b/app/workers/database/ci_namespace_mirrors_consistency_check_worker.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Database + class CiNamespaceMirrorsConsistencyCheckWorker + include ApplicationWorker + include CronjobQueue # rubocop: disable Scalability/CronWorkerContext + + sidekiq_options retry: false + feature_category :sharding + data_consistency :sticky + idempotent! 
+ + version 1 + + def perform + return if Feature.disabled?(:ci_namespace_mirrors_consistency_check, default_enabled: :yaml) + + results = ConsistencyCheckService.new( + source_model: Namespace, + target_model: Ci::NamespaceMirror, + source_columns: %w[id traversal_ids], + target_columns: %w[namespace_id traversal_ids] + ).execute + + log_extra_metadata_on_done(:results, results) + end + end +end diff --git a/app/workers/database/ci_project_mirrors_consistency_check_worker.rb b/app/workers/database/ci_project_mirrors_consistency_check_worker.rb new file mode 100644 index 00000000000..e9413256617 --- /dev/null +++ b/app/workers/database/ci_project_mirrors_consistency_check_worker.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Database + class CiProjectMirrorsConsistencyCheckWorker + include ApplicationWorker + include CronjobQueue # rubocop: disable Scalability/CronWorkerContext + + sidekiq_options retry: false + feature_category :sharding + data_consistency :sticky + idempotent! + + version 1 + + def perform + return if Feature.disabled?(:ci_project_mirrors_consistency_check, default_enabled: :yaml) + + results = ConsistencyCheckService.new( + source_model: Project, + target_model: Ci::ProjectMirror, + source_columns: %w[id namespace_id], + target_columns: %w[project_id namespace_id] + ).execute + + log_extra_metadata_on_done(:results, results) + end + end +end diff --git a/app/workers/delete_stored_files_worker.rb b/app/workers/delete_stored_files_worker.rb index d1080c8df64..86167a7fafe 100644 --- a/app/workers/delete_stored_files_worker.rb +++ b/app/workers/delete_stored_files_worker.rb @@ -7,7 +7,7 @@ class DeleteStoredFilesWorker # rubocop:disable Scalability/IdempotentWorker sidekiq_options retry: 3 - feature_category_not_owned! 
+ feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned loggable_arguments 0 def perform(class_name, keys) diff --git a/app/workers/environments/auto_stop_worker.rb b/app/workers/environments/auto_stop_worker.rb index 672a4f4121e..aee6e977550 100644 --- a/app/workers/environments/auto_stop_worker.rb +++ b/app/workers/environments/auto_stop_worker.rb @@ -10,8 +10,10 @@ module Environments def perform(environment_id, params = {}) Environment.find_by_id(environment_id).try do |environment| - user = environment.stop_action&.user - environment.stop_with_action!(user) + stop_actions = environment.stop_actions + + user = stop_actions.last&.user + environment.stop_with_actions!(user) end end end diff --git a/app/workers/flush_counter_increments_worker.rb b/app/workers/flush_counter_increments_worker.rb index c4a3a5283cc..e21a7ee35e7 100644 --- a/app/workers/flush_counter_increments_worker.rb +++ b/app/workers/flush_counter_increments_worker.rb @@ -12,7 +12,10 @@ class FlushCounterIncrementsWorker sidekiq_options retry: 3 - feature_category_not_owned! 
+ # The increments in `ProjectStatistics` are owned by several teams depending + # on the counter + feature_category :not_owned # rubocop:disable Gitlab/AvoidFeatureCategoryNotOwned + urgency :low deduplicate :until_executing, including_scheduled: true diff --git a/app/workers/namespaces/invite_team_email_worker.rb b/app/workers/namespaces/invite_team_email_worker.rb deleted file mode 100644 index eabf33a7fba..00000000000 --- a/app/workers/namespaces/invite_team_email_worker.rb +++ /dev/null @@ -1,22 +0,0 @@ -# frozen_string_literal: true - -module Namespaces - class InviteTeamEmailWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - data_consistency :always - - feature_category :experimentation_activation - urgency :low - - def perform(group_id, user_id) - # rubocop: disable CodeReuse/ActiveRecord - user = User.find_by(id: user_id) - group = Group.find_by(id: group_id) - # rubocop: enable CodeReuse/ActiveRecord - return unless user && group - - Namespaces::InviteTeamEmailService.send_email(user, group) - end - end -end diff --git a/app/workers/namespaces/root_statistics_worker.rb b/app/workers/namespaces/root_statistics_worker.rb index b97dbca2c1c..e1271dae335 100644 --- a/app/workers/namespaces/root_statistics_worker.rb +++ b/app/workers/namespaces/root_statistics_worker.rb @@ -20,8 +20,17 @@ module Namespaces Namespaces::StatisticsRefresherService.new.execute(namespace) namespace.aggregation_schedule.destroy + + notify_storage_usage(namespace) rescue ::Namespaces::StatisticsRefresherService::RefresherError, ActiveRecord::RecordNotFound => ex Gitlab::ErrorTracking.track_exception(ex, namespace_id: namespace_id, namespace: namespace&.full_path) end + + private + + def notify_storage_usage(namespace) + end end end + +Namespaces::RootStatisticsWorker.prepend_mod_with('Namespaces::RootStatisticsWorker') diff --git a/app/workers/object_storage/background_move_worker.rb b/app/workers/object_storage/background_move_worker.rb index 
2204e504702..bb51f0d7e1f 100644 --- a/app/workers/object_storage/background_move_worker.rb +++ b/app/workers/object_storage/background_move_worker.rb @@ -8,7 +8,7 @@ module ObjectStorage include ObjectStorageQueue sidekiq_options retry: 5 - feature_category_not_owned! + feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned loggable_arguments 0, 1, 2, 3 def perform(uploader_class_name, subject_class_name, file_field, subject_id) diff --git a/app/workers/object_storage/migrate_uploads_worker.rb b/app/workers/object_storage/migrate_uploads_worker.rb index ea4a90cf9d2..b7d938e6b68 100644 --- a/app/workers/object_storage/migrate_uploads_worker.rb +++ b/app/workers/object_storage/migrate_uploads_worker.rb @@ -10,7 +10,7 @@ module ObjectStorage sidekiq_options retry: 3 include ObjectStorageQueue - feature_category_not_owned! + feature_category :not_owned # rubocop:todo Gitlab/AvoidFeatureCategoryNotOwned loggable_arguments 0, 1, 2, 3 SanityCheckError = Class.new(StandardError) diff --git a/app/workers/project_export_worker.rb b/app/workers/project_export_worker.rb index e3f8c4bcd9d..ee892d43313 100644 --- a/app/workers/project_export_worker.rb +++ b/app/workers/project_export_worker.rb @@ -8,7 +8,7 @@ class ProjectExportWorker # rubocop:disable Scalability/IdempotentWorker feature_category :importers worker_resource_boundary :memory - urgency :throttled + urgency :low loggable_arguments 2, 3 sidekiq_options retry: false, dead: false sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION @@ -21,7 +21,10 @@ class ProjectExportWorker # rubocop:disable Scalability/IdempotentWorker export_job&.start - ::Projects::ImportExport::ExportService.new(project, current_user, params).execute(after_export) + export_service = ::Projects::ImportExport::ExportService.new(project, current_user, params) + export_service.execute(after_export) + + log_exporters_duration(export_service) export_job&.finish rescue ActiveRecord::RecordNotFound => 
e @@ -46,4 +49,13 @@ class ProjectExportWorker # rubocop:disable Scalability/IdempotentWorker def log_failure(project_id, ex) logger.error("Failed to export project #{project_id}: #{ex.message}") end + + def log_exporters_duration(export_service) + export_service.exporters.each do |exporter| + exporter_key = "#{exporter.class.name.demodulize.underscore}_duration_s".to_sym # e.g. uploads_saver_duration_s + exporter_duration = exporter.duration_s&.round(6) + + log_extra_metadata_on_done(exporter_key, exporter_duration) + end + end end diff --git a/app/workers/projects/record_target_platforms_worker.rb b/app/workers/projects/record_target_platforms_worker.rb new file mode 100644 index 00000000000..5b1f85ecca0 --- /dev/null +++ b/app/workers/projects/record_target_platforms_worker.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +module Projects + class RecordTargetPlatformsWorker + include ApplicationWorker + include ExclusiveLeaseGuard + + LEASE_TIMEOUT = 1.hour.to_i + APPLE_PLATFORM_LANGUAGES = %w(swift objective-c).freeze + + feature_category :experimentation_activation + data_consistency :always + deduplicate :until_executed + urgency :low + idempotent! + + def perform(project_id) + @project = Project.find_by_id(project_id) + + return unless project + return unless uses_apple_platform_languages? + + try_obtain_lease do + @target_platforms = Projects::RecordTargetPlatformsService.new(project).execute + log_target_platforms_metadata + end + end + + private + + attr_reader :target_platforms, :project + + def uses_apple_platform_languages? + project.repository_languages.with_programming_language(*APPLE_PLATFORM_LANGUAGES).present? + end + + def log_target_platforms_metadata + return unless target_platforms.present? + + log_extra_metadata_on_done(:target_platforms, target_platforms) + end + + def lease_key + @lease_key ||= "#{self.class.name.underscore}:#{project.id}" + end + + def lease_timeout + LEASE_TIMEOUT + end + + def lease_release? 
+ false + end + end +end diff --git a/app/workers/quality/test_data_cleanup_worker.rb b/app/workers/quality/test_data_cleanup_worker.rb deleted file mode 100644 index 68b36cacbbf..00000000000 --- a/app/workers/quality/test_data_cleanup_worker.rb +++ /dev/null @@ -1,33 +0,0 @@ -# frozen_string_literal: true - -module Quality - class TestDataCleanupWorker - include ApplicationWorker - - data_consistency :always - feature_category :quality_management - urgency :low - - include CronjobQueue - idempotent! - - KEEP_RECENT_DATA_DAY = 3 - GROUP_PATH_PATTERN = 'test-group-fulfillment' - GROUP_OWNER_EMAIL_PATTERN = %w(test-user- gitlab-qa-user qa-user-).freeze - - # Remove test groups generated in E2E tests on gstg - # rubocop: disable CodeReuse/ActiveRecord - def perform - return unless Gitlab.staging? - - Group.where('path like ?', "#{GROUP_PATH_PATTERN}%").where('created_at < ?', KEEP_RECENT_DATA_DAY.days.ago).each do |group| - next unless GROUP_OWNER_EMAIL_PATTERN.any? { |pattern| group.owners.first.email.include?(pattern) } - - with_context(namespace: group, user: group.owners.first) do - Groups::DestroyService.new(group, group.owners.first).execute - end - end - end - # rubocop: enable CodeReuse/ActiveRecord - end -end |