diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2023-12-19 14:01:45 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2023-12-19 14:01:45 +0300 |
commit | 9297025d0b7ddf095eb618dfaaab2ff8f2018d8b (patch) | |
tree | 865198c01d1824a9b098127baa3ab980c9cd2c06 /app/workers | |
parent | 6372471f43ee03c05a7c1f8b0c6ac6b8a7431dbe (diff) |
Add latest changes from gitlab-org/gitlab@16-7-stable-ee (tag: v16.7.0-rc42)
Diffstat (limited to 'app/workers')
57 files changed, 772 insertions, 208 deletions
diff --git a/app/workers/abuse/trust_score_worker.rb b/app/workers/abuse/trust_score_worker.rb new file mode 100644 index 00000000000..061042ffa8a --- /dev/null +++ b/app/workers/abuse/trust_score_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module Abuse + class TrustScoreWorker + include ApplicationWorker + + data_consistency :delayed + + idempotent! + feature_category :instance_resiliency + urgency :low + + def perform(user_id, source, score, correlation_id = '') + user = User.find_by_id(user_id) + unless user + logger.info(structured_payload(message: "User not found.", user_id: user_id)) + return + end + + Abuse::TrustScore.create!(user: user, source: source, score: score.to_f, correlation_id_value: correlation_id) + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 0bb88efe183..ec5156bb1d0 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -246,6 +246,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:ci_catalog_resources_process_sync_events + :worker_name: Ci::Catalog::Resources::ProcessSyncEventsWorker + :feature_category: :pipeline_composition + :has_external_dependencies: false + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:ci_delete_unit_tests :worker_name: Ci::DeleteUnitTestsWorker :feature_category: :code_testing @@ -275,7 +284,7 @@ :tags: [] - :name: cronjob:ci_runners_reconcile_existing_runner_versions_cron :worker_name: Ci::Runners::ReconcileExistingRunnerVersionsCronWorker - :feature_category: :runner_fleet + :feature_category: :fleet_visibility :has_external_dependencies: false :urgency: :low :resource_boundary: :unknown @@ -284,7 +293,7 @@ :tags: [] - :name: cronjob:ci_runners_stale_machines_cleanup_cron :worker_name: Ci::Runners::StaleMachinesCleanupCronWorker - :feature_category: :runner_fleet + :feature_category: :fleet_visibility :has_external_dependencies: false :urgency: :low 
:resource_boundary: :unknown @@ -1749,6 +1758,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: package_cleanup:packages_nuget_cleanup_stale_symbols + :worker_name: Packages::Nuget::CleanupStaleSymbolsWorker + :feature_category: :package_registry + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: package_repositories:packages_debian_generate_distribution :worker_name: Packages::Debian::GenerateDistributionWorker :feature_category: :package_registry @@ -2041,7 +2059,7 @@ :worker_name: PipelineMetricsWorker :feature_category: :continuous_integration :has_external_dependencies: false - :urgency: :high + :urgency: :low :resource_boundary: :unknown :weight: 3 :idempotent: false @@ -2298,6 +2316,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: abuse_trust_score + :worker_name: Abuse::TrustScoreWorker + :feature_category: :instance_resiliency + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: analytics_usage_trends_counter_job :worker_name: Analytics::UsageTrends::CounterJobWorker :feature_category: :devops_reports @@ -2559,6 +2586,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: bitbucket_server_import_stage_import_users + :worker_name: Gitlab::BitbucketServerImport::Stage::ImportUsersWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: bulk_import :worker_name: BulkImportWorker :feature_category: :importers @@ -2636,7 +2672,7 @@ :feature_category: :importers :has_external_dependencies: false :urgency: :low - :resource_boundary: :unknown + :resource_boundary: :memory :weight: 1 :idempotent: true :tags: [] @@ -2649,6 +2685,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: bulk_imports_transform_references + :worker_name: 
BulkImports::TransformReferencesWorker + :feature_category: :importers + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: chat_notification :worker_name: ChatNotificationWorker :feature_category: :integrations @@ -2703,6 +2748,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: ci_low_urgency_cancel_redundant_pipelines + :worker_name: Ci::LowUrgencyCancelRedundantPipelinesWorker + :feature_category: :continuous_integration + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: ci_parse_secure_file_metadata :worker_name: Ci::ParseSecureFileMetadataWorker :feature_category: :mobile_devops @@ -2714,7 +2768,7 @@ :tags: [] - :name: ci_runners_process_runner_version_update :worker_name: Ci::Runners::ProcessRunnerVersionUpdateWorker - :feature_category: :runner_fleet + :feature_category: :fleet_visibility :has_external_dependencies: false :urgency: :low :resource_boundary: :unknown @@ -3459,6 +3513,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: pages_deactivate_mr_deployments + :worker_name: Pages::DeactivateMrDeploymentsWorker + :feature_category: :pages + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: pages_domain_ssl_renewal :worker_name: PagesDomainSslRenewalWorker :feature_category: :pages diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index e510a8c0d06..258ccea1f63 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -3,9 +3,10 @@ module BulkImports class EntityWorker include ApplicationWorker + include ExclusiveLeaseGuard idempotent! 
- deduplicate :until_executed, if_deduplicated: :reschedule_once + deduplicate :until_executing data_consistency :always feature_category :importers sidekiq_options retry: 3, dead: false @@ -27,7 +28,10 @@ module BulkImports if running_tracker.present? log_info(message: 'Stage running', entity_stage: running_tracker.stage) else - start_next_stage + # Use lease guard to prevent duplicated workers from starting multiple stages + try_obtain_lease do + start_next_stage + end end re_enqueue @@ -38,7 +42,9 @@ module BulkImports Gitlab::ErrorTracking.track_exception( exception, - log_params(message: "Request to export #{entity.source_type} failed") + { + message: "Request to export #{entity.source_type} failed" + }.merge(logger.default_attributes) ) entity.fail_op! @@ -49,7 +55,9 @@ module BulkImports attr_reader :entity def re_enqueue - BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id) + with_context(bulk_import_entity_id: entity.id) do + BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id) + end end def running_tracker @@ -66,43 +74,34 @@ module BulkImports next_pipeline_trackers.each_with_index do |pipeline_tracker, index| log_info(message: 'Stage starting', entity_stage: pipeline_tracker.stage) if index == 0 - BulkImports::PipelineWorker.perform_async( - pipeline_tracker.id, - pipeline_tracker.stage, - entity.id - ) + with_context(bulk_import_entity_id: entity.id) do + BulkImports::PipelineWorker.perform_async( + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + end end end - def source_version - entity.bulk_import.source_version_info.to_s + def lease_timeout + PERFORM_DELAY end - def logger - @logger ||= Logger.build + def lease_key + "gitlab:bulk_imports:entity_worker:#{entity.id}" end - def log_exception(exception, payload) - Gitlab::ExceptionLogFormatter.format!(exception, payload) - - logger.error(structured_payload(payload)) + def log_lease_taken + log_info(message: lease_taken_message) end - def log_info(payload) - 
logger.info(structured_payload(log_params(payload))) + def logger + @logger ||= Logger.build.with_entity(entity) end - def log_params(extra) - defaults = { - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - source_version: source_version, - importer: Logger::IMPORTER_NAME - } - - defaults.merge(extra) + def log_info(payload) + logger.info(structured_payload(payload)) end end end diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index f7456ddccb1..bfe561cca5c 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -20,7 +20,9 @@ module BulkImports set_source_xid request_export - BulkImports::EntityWorker.perform_async(entity_id) + with_context(bulk_import_entity_id: entity_id) do + BulkImports::EntityWorker.perform_async(entity_id) + end end def perform_failure(exception, entity_id) @@ -73,16 +75,7 @@ module BulkImports ::GlobalID.parse(response.dig(*entity_query.data_path, 'id')).model_id rescue StandardError => e - log_exception(e, - { - message: 'Failed to fetch source entity id', - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - source_version: entity.bulk_import.source_version_info.to_s - } - ) + log_exception(e, message: 'Failed to fetch source entity id') nil end @@ -96,7 +89,7 @@ module BulkImports end def logger - @logger ||= Logger.build + @logger ||= Logger.build.with_entity(entity) end def log_exception(exception, payload) @@ -106,16 +99,7 @@ module BulkImports end def log_and_fail(exception) - log_exception(exception, - { - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, 
- message: "Request to export #{entity.source_type} failed", - source_version: entity.bulk_import.source_version_info.to_s - } - ) + log_exception(exception, message: "Request to export #{entity.source_type} failed") BulkImports::Failure.create(failure_attributes(exception)) diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb index 40d26e14dc1..2670dc5438d 100644 --- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb +++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb @@ -6,6 +6,7 @@ module BulkImports include ExceptionBacktrace REQUEUE_DELAY = 5.seconds + STALE_AFTER = 4.hours idempotent! deduplicate :until_executing @@ -18,46 +19,50 @@ module BulkImports @tracker = Tracker.find(pipeline_tracker_id) @context = ::BulkImports::Pipeline::Context.new(tracker) - return unless tracker.batched? - return unless tracker.started? + return unless tracker.batched? && tracker.started? + + @sorted_batches = tracker.batches.by_last_updated + return fail_stale_tracker_and_batches if most_recent_batch_stale? + return re_enqueue if import_in_progress? - if tracker.stale? - logger.error(log_attributes(message: 'Tracker stale. Failing batches and tracker')) - tracker.batches.map(&:fail_op!) - tracker.fail_op! - else - tracker.pipeline_class.new(@context).on_finish - logger.info(log_attributes(message: 'Tracker finished')) - tracker.finish! - end + tracker.pipeline_class.new(@context).on_finish + logger.info(log_attributes(message: 'Tracker finished')) + tracker.finish! end private - attr_reader :tracker + attr_reader :tracker, :sorted_batches def re_enqueue - self.class.perform_in(REQUEUE_DELAY, tracker.id) + with_context(bulk_import_entity_id: tracker.entity.id) do + self.class.perform_in(REQUEUE_DELAY, tracker.id) + end end def import_in_progress? - tracker.batches.any? { |b| b.started? || b.created? } + sorted_batches.in_progress.any? + end + + def most_recent_batch_stale? 
+ return false unless sorted_batches.any? + + sorted_batches.first.updated_at < STALE_AFTER.ago + end + + def fail_stale_tracker_and_batches + logger.error(log_attributes(message: 'Batch stale. Failing batches and tracker')) + sorted_batches.map(&:fail_op!) + tracker.fail_op! end def logger - @logger ||= Logger.build + @logger ||= Logger.build.with_tracker(tracker) end def log_attributes(extra = {}) - structured_payload( - { - tracker_id: tracker.id, - bulk_import_id: tracker.entity.id, - bulk_import_entity_id: tracker.entity.bulk_import_id, - pipeline_class: tracker.pipeline_name - }.merge(extra) - ) + structured_payload(extra) end end end diff --git a/app/workers/bulk_imports/finish_project_import_worker.rb b/app/workers/bulk_imports/finish_project_import_worker.rb index 815101c89f3..18b8c016493 100644 --- a/app/workers/bulk_imports/finish_project_import_worker.rb +++ b/app/workers/bulk_imports/finish_project_import_worker.rb @@ -5,7 +5,7 @@ module BulkImports include ApplicationWorker feature_category :importers - sidekiq_options retry: 5 + sidekiq_options retry: 3 data_consistency :sticky idempotent! diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 1485275e616..c24cc64e5c0 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -9,7 +9,7 @@ module BulkImports data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers - sidekiq_options dead: false, retry: 3 + sidekiq_options dead: false, retry: 6 worker_has_external_dependencies! worker_resource_boundary :memory idempotent! @@ -42,6 +42,7 @@ module BulkImports @batch = ::BulkImports::BatchTracker.find(batch_id) @tracker = @batch.tracker + @entity = @tracker.entity @pending_retry = false return unless process_batch? 
@@ -50,7 +51,11 @@ module BulkImports try_obtain_lease { run } ensure - ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry + unless pending_retry + with_context(bulk_import_entity_id: entity.id) do + ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) + end + end end def perform_failure(batch_id, exception) @@ -62,7 +67,7 @@ module BulkImports private - attr_reader :batch, :tracker, :pending_retry + attr_reader :batch, :tracker, :pending_retry, :entity def run return batch.skip! if tracker.failed? || tracker.finished? @@ -83,7 +88,7 @@ module BulkImports Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed')) BulkImports::Failure.create( - bulk_import_entity_id: batch.tracker.entity.id, + bulk_import_entity_id: tracker.entity.id, pipeline_class: tracker.pipeline_name, pipeline_step: 'pipeline_batch_worker_run', exception_class: exception.class.to_s, @@ -91,7 +96,9 @@ module BulkImports correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id ) - ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) + with_context(bulk_import_entity_id: tracker.entity.id) do + ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) + end end def context @@ -115,7 +122,9 @@ module BulkImports def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) log_extra_metadata_on_done(:re_enqueue, true) - self.class.perform_in(delay, batch.id) + with_context(bulk_import_entity_id: entity.id) do + self.class.perform_in(delay, batch.id) + end end def process_batch? 
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 2c1d28b33c5..0bb9464c6de 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -4,14 +4,17 @@ module BulkImports class PipelineWorker include ApplicationWorker include ExclusiveLeaseGuard + include Gitlab::Utils::StrongMemoize FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds + LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true).freeze + DEFER_ON_HEALTH_DELAY = 5.minutes data_consistency :always feature_category :importers - sidekiq_options dead: false, retry: 3 + sidekiq_options dead: false, retry: 6 worker_has_external_dependencies! deduplicate :until_executing worker_resource_boundary :memory @@ -52,7 +55,6 @@ module BulkImports try_obtain_lease do if pipeline_tracker.enqueued? || pipeline_tracker.started? logger.info(log_attributes(message: 'Pipeline starting')) - run end end @@ -62,7 +64,7 @@ module BulkImports @entity = ::BulkImports::Entity.find(entity_id) @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id) - fail_tracker(exception) + fail_pipeline(exception) end private @@ -84,7 +86,8 @@ module BulkImports return pipeline_tracker.finish! if export_status.batches_count < 1 - enqueue_batches + enqueue_limited_batches + re_enqueue unless all_batches_enqueued? else log_extra_metadata_on_done(:batched, false) @@ -96,13 +99,11 @@ module BulkImports retry_tracker(e) end - def source_version - entity.bulk_import.source_version_info.to_s - end - - def fail_tracker(exception) + def fail_pipeline(exception) pipeline_tracker.update!(status_event: 'fail_op', jid: jid) + entity.fail_op! if pipeline_tracker.abort_on_failure? 
+ log_exception(exception, log_attributes(message: 'Pipeline failed')) Gitlab::ErrorTracking.track_exception(exception, log_attributes) @@ -118,18 +119,20 @@ module BulkImports end def logger - @logger ||= Logger.build + @logger ||= Logger.build.with_tracker(pipeline_tracker) end def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) log_extra_metadata_on_done(:re_enqueue, true) - self.class.perform_in( - delay, - pipeline_tracker.id, - pipeline_tracker.stage, - entity.id - ) + with_context(bulk_import_entity_id: entity.id) do + self.class.perform_in( + delay, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + end end def context @@ -181,19 +184,7 @@ module BulkImports end def log_attributes(extra = {}) - structured_payload( - { - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - pipeline_tracker_id: pipeline_tracker.id, - pipeline_class: pipeline_tracker.pipeline_name, - pipeline_tracker_state: pipeline_tracker.human_status_name, - source_version: source_version, - importer: Logger::IMPORTER_NAME - }.merge(extra) - ) + logger.default_attributes.merge(extra) end def log_exception(exception, payload) @@ -206,20 +197,60 @@ module BulkImports Time.zone.now - (pipeline_tracker.created_at || entity.created_at) end - def lease_timeout - 30 + def enqueue_limited_batches + next_batch.numbers.each do |batch_number| + batch = pipeline_tracker.batches.create!(batch_number: batch_number) + + with_context(bulk_import_entity_id: entity.id) do + ::BulkImports::PipelineBatchWorker.perform_async(batch.id) + end + end + + log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, next_batch.numbers) + log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, next_batch.final?) end - def lease_key - "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}" + def all_batches_enqueued? + next_batch.final? 
end - def enqueue_batches - 1.upto(export_status.batches_count) do |batch_number| - batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord + def next_batch + all_batch_numbers = (1..export_status.batches_count).to_a + + created_batch_numbers = pipeline_tracker.batches.pluck_batch_numbers - ::BulkImports::PipelineBatchWorker.perform_async(batch.id) + remaining_batch_numbers = all_batch_numbers - created_batch_numbers + + if Feature.disabled?(:bulk_import_limit_concurrent_batches, context.portable) + return LimitedBatches.new(numbers: remaining_batch_numbers, final?: true) end + + limit = next_batch_count + + LimitedBatches.new( + numbers: remaining_batch_numbers.first(limit), + final?: remaining_batch_numbers.count <= limit + ) + end + strong_memoize_attr :next_batch + + # Calculate the number of batches, up to `batch_limit`, to process in the + # next round. + def next_batch_count + limit = batch_limit - pipeline_tracker.batches.in_progress.limit(batch_limit).count + [limit, 0].max + end + + def batch_limit + ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit + end + + def lease_timeout + 30 + end + + def lease_key + "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}" end end end diff --git a/app/workers/bulk_imports/relation_batch_export_worker.rb b/app/workers/bulk_imports/relation_batch_export_worker.rb index 87ceb775075..08c5fb81460 100644 --- a/app/workers/bulk_imports/relation_batch_export_worker.rb +++ b/app/workers/bulk_imports/relation_batch_export_worker.rb @@ -7,7 +7,8 @@ module BulkImports idempotent! 
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers - sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3 + sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 6 + worker_resource_boundary :memory sidekiq_retries_exhausted do |job, exception| batch = BulkImports::ExportBatch.find(job['args'][1]) diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb index 168626fee85..90941e7583b 100644 --- a/app/workers/bulk_imports/relation_export_worker.rb +++ b/app/workers/bulk_imports/relation_export_worker.rb @@ -10,7 +10,7 @@ module BulkImports loggable_arguments 2, 3 data_consistency :always feature_category :importers - sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3 + sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 6 worker_resource_boundary :memory sidekiq_retries_exhausted do |job, exception| diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb index 6c8569b0aa0..cb4e10a29b2 100644 --- a/app/workers/bulk_imports/stuck_import_worker.rb +++ b/app/workers/bulk_imports/stuck_import_worker.rb @@ -12,24 +12,27 @@ module BulkImports feature_category :importers + # Using Keyset pagination for scopes that involve timestamp indexes def perform - BulkImport.stale.find_each do |import| - logger.error(message: 'BulkImport stale', bulk_import_id: import.id) - import.cleanup_stale + Gitlab::Pagination::Keyset::Iterator.new(scope: bulk_import_scope).each_batch do |imports| + imports.each do |import| + logger.error(message: 'BulkImport stale', bulk_import_id: import.id) + import.cleanup_stale + end end - BulkImports::Entity.includes(:trackers).stale.find_each do |entity| # rubocop: disable CodeReuse/ActiveRecord - ApplicationRecord.transaction do - 
logger.error( - message: 'BulkImports::Entity stale', - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_id: entity.id - ) + Gitlab::Pagination::Keyset::Iterator.new(scope: entity_scope).each_batch do |entities| + entities.each do |entity| + ApplicationRecord.transaction do + logger.with_entity(entity).error( + message: 'BulkImports::Entity stale' + ) - entity.cleanup_stale + entity.cleanup_stale - entity.trackers.find_each do |tracker| - tracker.cleanup_stale + entity.trackers.find_each do |tracker| + tracker.cleanup_stale + end end end end @@ -38,5 +41,13 @@ module BulkImports def logger @logger ||= Logger.build end + + def bulk_import_scope + BulkImport.stale.order_by_updated_at_and_id(:asc) + end + + def entity_scope + BulkImports::Entity.with_trackers.stale.order_by_updated_at_and_id(:asc) + end end end diff --git a/app/workers/bulk_imports/transform_references_worker.rb b/app/workers/bulk_imports/transform_references_worker.rb new file mode 100644 index 00000000000..383ad2fd733 --- /dev/null +++ b/app/workers/bulk_imports/transform_references_worker.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +module BulkImports + class TransformReferencesWorker + include ApplicationWorker + + idempotent! + data_consistency :delayed + sidekiq_options retry: 3, dead: false + feature_category :importers + + # rubocop: disable CodeReuse/ActiveRecord + def perform(object_ids, klass, tracker_id) + @tracker = BulkImports::Tracker.find_by_id(tracker_id) + + return unless tracker + + project = tracker.entity.project + + klass.constantize.where(id: object_ids, project: project).find_each do |object| + transform_and_save(object) + end + end + # rubocop: enable CodeReuse/ActiveRecord + + attr_reader :tracker + + private + + def transform_and_save(object) + body = object_body(object).dup + + return if body.blank? + + object.refresh_markdown_cache! 
+ + body.gsub!(username_regex(mapped_usernames), mapped_usernames) + + if object_has_reference?(body) + matching_urls(object).each do |old_url, new_url| + body.gsub!(old_url, new_url) if body.include?(old_url) + end + end + + object.assign_attributes(body_field(object) => body) + object.save!(touch: false) if object_body_changed?(object) + + object + rescue StandardError => e + log_and_fail(e) + end + + def object_body(object) + call_object_method(object) + end + + def object_body_changed?(object) + call_object_method(object, suffix: '_changed?') + end + + def call_object_method(object, suffix: nil) + method = body_field(object) + method = "#{method}#{suffix}" if suffix.present? + + object.public_send(method) # rubocop:disable GitlabSecurity/PublicSend -- the method being called is dependent on several factors + end + + def body_field(object) + object.is_a?(Note) ? 'note' : 'description' + end + + def mapped_usernames + @mapped_usernames ||= ::BulkImports::UsersMapper.new(context: context) + .map_usernames.transform_keys { |key| "@#{key}" } + .transform_values { |value| "@#{value}" } + end + + def username_regex(mapped_usernames) + @username_regex ||= Regexp.new(mapped_usernames.keys.sort_by(&:length) + .reverse.map { |x| Regexp.escape(x) }.join('|')) + end + + def matching_urls(object) + URI.extract(object_body(object), %w[http https]).each_with_object([]) do |url, array| + parsed_url = URI.parse(url) + + next unless source_host == parsed_url.host + next unless parsed_url.path&.start_with?("/#{source_full_path}") + + array << [url, new_url(object, parsed_url)] + end + end + + def new_url(object, parsed_old_url) + parsed_old_url.host = ::Gitlab.config.gitlab.host + parsed_old_url.port = ::Gitlab.config.gitlab.port + parsed_old_url.scheme = ::Gitlab.config.gitlab.https ? 
'https' : 'http' + parsed_old_url.to_s.gsub!(source_full_path, full_path(object)) + end + + def source_host + @source_host ||= URI.parse(context.configuration.url).host + end + + def source_full_path + @source_full_path ||= context.entity.source_full_path + end + + def full_path(object) + object.project.full_path + end + + def object_has_reference?(body) + body.include?(source_full_path) + end + + def log_and_fail(exception) + Gitlab::ErrorTracking.track_exception(exception, log_params) + BulkImports::Failure.create(failure_attributes(exception)) + end + + def log_params + { + message: 'Failed to update references', + bulk_import_id: context.bulk_import_id, + bulk_import_entity_id: tracker.bulk_import_entity_id, + source_full_path: context.entity.source_full_path, + source_version: context.bulk_import.source_version, + importer: 'gitlab_migration' + } + end + + def failure_attributes(exception) + { + bulk_import_entity_id: context.entity.id, + pipeline_class: 'ReferencesPipeline', + exception_class: exception.class.to_s, + exception_message: exception.message.truncate(255), + correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id + } + end + + def context + @context ||= BulkImports::Pipeline::Context.new(tracker) + end + end +end diff --git a/app/workers/ci/catalog/resources/process_sync_events_worker.rb b/app/workers/ci/catalog/resources/process_sync_events_worker.rb new file mode 100644 index 00000000000..15e06393aff --- /dev/null +++ b/app/workers/ci/catalog/resources/process_sync_events_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Ci + module Catalog + module Resources + # This worker can be called multiple times simultaneously but only one can process events + # at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`. + # + # This worker is enqueued in 3 ways: + # 1. By Project model callback after updating one of the columns referenced in + # `Ci::Catalog::Resource#sync_with_project`. 
+ # 2. Every minute by cron job. This ensures we process SyncEvents from direct/bulk + # database updates that do not use the Project AR model. + # 3. By `Ci::ProcessSyncEventsService` if there are any remaining pending + # SyncEvents after processing. + # + class ProcessSyncEventsWorker + include ApplicationWorker + include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required + + feature_category :pipeline_composition + + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- We should not sync stale data + urgency :high + + idempotent! + deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute + + def perform + results = ::Ci::ProcessSyncEventsService.new( + ::Ci::Catalog::Resources::SyncEvent, ::Ci::Catalog::Resource + ).execute + + results.each do |key, value| + log_extra_metadata_on_done(key, value) + end + end + end + end + end +end diff --git a/app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb b/app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb new file mode 100644 index 00000000000..4eb55a9ecd4 --- /dev/null +++ b/app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +module Ci + # Scheduled pipelines rarely cancel other pipelines and we don't need to + # use high urgency + class LowUrgencyCancelRedundantPipelinesWorker < CancelRedundantPipelinesWorker + urgency :low + idempotent! + end +end diff --git a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb index 53bed0fa9da..3184fee2071 100644 --- a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb +++ b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb @@ -13,6 +13,7 @@ module Ci feature_category :code_testing idempotent! 
+ deduplicate :until_executed def perform(pipeline_id) pipeline = Ci::Pipeline.find_by_id(pipeline_id) diff --git a/app/workers/ci/runners/process_runner_version_update_worker.rb b/app/workers/ci/runners/process_runner_version_update_worker.rb index f1ad0c8563e..acb1aac78a4 100644 --- a/app/workers/ci/runners/process_runner_version_update_worker.rb +++ b/app/workers/ci/runners/process_runner_version_update_worker.rb @@ -7,7 +7,7 @@ module Ci data_consistency :always - feature_category :runner_fleet + feature_category :fleet_visibility urgency :low idempotent! diff --git a/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb b/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb index 722c513a4bb..7bcfed1580f 100644 --- a/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb +++ b/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb @@ -9,7 +9,7 @@ module Ci include CronjobQueue # rubocop:disable Scalability/CronWorkerContext data_consistency :sticky - feature_category :runner_fleet + feature_category :fleet_visibility urgency :low deduplicate :until_executed diff --git a/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb b/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb index 9407e7c0e0a..9831e3e98b7 100644 --- a/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb +++ b/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb @@ -9,7 +9,7 @@ module Ci include CronjobQueue # rubocop:disable Scalability/CronWorkerContext data_consistency :sticky - feature_category :runner_fleet + feature_category :fleet_visibility urgency :low idempotent! 
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb index e884a43b1e3..21c10566a67 100644 --- a/app/workers/click_house/events_sync_worker.rb +++ b/app/workers/click_house/events_sync_worker.rb @@ -3,7 +3,9 @@ module ClickHouse class EventsSyncWorker include ApplicationWorker + include ClickHouseWorker include Gitlab::ExclusiveLeaseHelpers + include Gitlab::Utils::StrongMemoize idempotent! queue_namespace :cronjob @@ -91,8 +93,13 @@ module ClickHouse ) end + def last_event_id_in_postgresql + Event.maximum(:id) + end + strong_memoize_attr :last_event_id_in_postgresql + def enabled? - ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house) + ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house) end def next_batch @@ -110,24 +117,34 @@ module ClickHouse def process_batch(context) Enumerator.new do |yielder| - has_data = false - # rubocop: disable CodeReuse/ActiveRecord - Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation| - has_data = true - - relation.select(*EVENT_PROJECTIONS).each do |row| + has_more_data = false + batching_scope.each_batch(of: BATCH_SIZE) do |relation| + records = relation.select(*EVENT_PROJECTIONS).to_a + has_more_data = records.size == BATCH_SIZE + records.each do |row| yielder << row context.last_processed_id = row.id break if context.record_limit_reached? end - break if context.over_time? || context.record_limit_reached? + break if context.over_time? || context.record_limit_reached? || !has_more_data end - context.no_more_records! if has_data == false - # rubocop: enable CodeReuse/ActiveRecord + context.no_more_records! 
unless has_more_data end end + + # rubocop: disable CodeReuse/ActiveRecord + def batching_scope + return Event.none unless last_event_id_in_postgresql + + table = Event.arel_table + + Event + .where(table[:id].gt(context.last_record_id)) + .where(table[:id].lteq(last_event_id_in_postgresql)) + end + # rubocop: enable CodeReuse/ActiveRecord end end diff --git a/app/workers/concerns/click_house_worker.rb b/app/workers/concerns/click_house_worker.rb new file mode 100644 index 00000000000..6399796f6df --- /dev/null +++ b/app/workers/concerns/click_house_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module ClickHouseWorker + extend ActiveSupport::Concern + + class_methods do + def register_click_house_worker? + click_house_worker_attrs.present? + end + + def click_house_worker_attrs + get_class_attribute(:click_house_worker_attrs) + end + + def click_house_migration_lock(ttl) + raise ArgumentError unless ttl.is_a?(ActiveSupport::Duration) + + set_class_attribute( + :click_house_worker_attrs, + (click_house_worker_attrs || {}).merge(migration_lock_ttl: ttl) + ) + end + end + + included do + click_house_migration_lock(ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL) + + pause_control :click_house_migration + end +end diff --git a/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb b/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb index 1090d82c922..fbcb5d81c8a 100644 --- a/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb +++ b/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb @@ -7,6 +7,8 @@ module Gitlab module ObjectImporter extend ActiveSupport::Concern + FAILED_IMPORT_STATES = %w[canceled failed].freeze + included do include ApplicationWorker @@ -33,8 +35,10 @@ module Gitlab return unless project - if project.import_state&.canceled? 
- info(project.id, message: 'project import canceled') + import_state = project.import_status + + if FAILED_IMPORT_STATES.include?(import_state) + info(project.id, message: "project import #{import_state}") return end diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb index fcc7a96fa2b..15156e1deef 100644 --- a/app/workers/concerns/gitlab/github_import/object_importer.rb +++ b/app/workers/concerns/gitlab/github_import/object_importer.rb @@ -16,6 +16,7 @@ module Gitlab feature_category :importers worker_has_external_dependencies! + sidekiq_options retry: 5 sidekiq_retries_exhausted do |msg| args = msg['args'] jid = msg['jid'] @@ -57,12 +58,7 @@ module Gitlab end info(project.id, message: 'importer finished') - rescue NoMethodError => e - # This exception will be more useful in development when a new - # Representation is created but the developer forgot to add a - # `#github_identifiers` method. - track_and_raise_exception(project, e, fail_import: true) - rescue ActiveRecord::RecordInvalid, NotRetriableError => e + rescue ActiveRecord::RecordInvalid, NotRetriableError, NoMethodError => e # We do not raise exception to prevent job retry track_exception(project, e) rescue StandardError => e diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb index 7cc23dd7c0b..5aabc74a3d5 100644 --- a/app/workers/concerns/gitlab/github_import/queue.rb +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -14,7 +14,7 @@ module Gitlab # the dead queue. This does mean some resources may not be imported, but # this is better than a project being stuck in the "import" state # forever. 
- sidekiq_options dead: false, retry: 5 + sidekiq_options dead: false end end end diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb index 316d30d94da..e2808f45821 100644 --- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb +++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb @@ -8,6 +8,8 @@ module Gitlab extend ActiveSupport::Concern include JobDelayCalculator + attr_reader :project + ENQUEUED_JOB_COUNT = 'github-importer/enqueued_job_count/%{project}/%{collection}' included do @@ -17,8 +19,10 @@ module Gitlab # project_id - The ID of the GitLab project to import the note into. # hash - A Hash containing the details of the GitHub object to import. # notify_key - The Redis key to notify upon completion, if any. + def perform(project_id, hash, notify_key = nil) - project = Project.find_by_id(project_id) + @project = Project.find_by_id(project_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables -- GitHub Import + # uses modules everywhere. Too big to refactor. 
return notify_waiter(notify_key) unless project diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb index 5c63c667a03..5f6812ab84f 100644 --- a/app/workers/concerns/gitlab/github_import/stage_methods.rb +++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb @@ -9,6 +9,11 @@ module Gitlab included do include ApplicationWorker + include GithubImport::Queue + + sidekiq_options retry: 6 + + sidekiq_options status_expiration: Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION sidekiq_retries_exhausted do |msg, e| Gitlab::Import::ImportFailureService.track( @@ -37,8 +42,6 @@ module Gitlab # - Continue their loop from where it left off: # https://gitlab.com/gitlab-org/gitlab/-/blob/024235ec/lib/gitlab/github_import/importer/pull_requests/review_requests_importer.rb#L15 def resumes_work_when_interrupted! - return unless Feature.enabled?(:github_importer_raise_max_interruptions) - sidekiq_options max_retries_after_interruption: MAX_RETRIES_AFTER_INTERRUPTION end end @@ -79,7 +82,7 @@ module Gitlab # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def try_import(client, project) - project.import_state.refresh_jid_expiration + RefreshImportJidWorker.perform_in_the_future(project.id, jid) import(client, project) rescue RateLimitError diff --git a/app/workers/concerns/update_repository_storage_worker.rb b/app/workers/concerns/update_repository_storage_worker.rb index 01744d1e57d..fd437ebc158 100644 --- a/app/workers/concerns/update_repository_storage_worker.rb +++ b/app/workers/concerns/update_repository_storage_worker.rb @@ -11,7 +11,19 @@ module UpdateRepositoryStorageWorker urgency :throttled end - def perform(container_id, new_repository_storage_key, repository_storage_move_id = nil) + LEASE_TIMEOUT = 30.minutes.to_i + + # `container_id` and `new_repository_storage_key` arguments have been deprecated. 
+ # `repository_storage_move_id` is now a mandatory argument. + # We are using *args for backwards compatibility. Previously defined as: + # perform(container_id, new_repository_storage_key, repository_storage_move_id = nil) + def perform(*args) + if args.length == 1 + repository_storage_move_id = args[0] + else + container_id, new_repository_storage_key, repository_storage_move_id = *args + end + repository_storage_move = if repository_storage_move_id find_repository_storage_move(repository_storage_move_id) @@ -24,7 +36,35 @@ module UpdateRepositoryStorageWorker ) end - update_repository_storage(repository_storage_move) + container_id ||= repository_storage_move.container_id + + # Use exclusive lock to prevent multiple storage migrations at the same time + # + # Note: instead of using a randomly generated `uuid`, we provide a worker jid value. + # That allows us to track the worker that requested a lease. + lease_key = [self.class.name.underscore, container_id].join(':') + exclusive_lease = Gitlab::ExclusiveLease.new(lease_key, uuid: jid, timeout: LEASE_TIMEOUT) + lease = exclusive_lease.try_obtain + + if lease + begin + update_repository_storage(repository_storage_move) + ensure + exclusive_lease.cancel + end + else + # If there is an ongoing storage migration, then the current one should be marked as failed + repository_storage_move.do_fail! + + # A special case + # Sidekiq can receive an interrupt signal during the processing. + # It kills existing workers and reschedules their jobs using the same jid. + # But it can cause a situation when the migration is only half complete (see https://gitlab.com/gitlab-org/gitlab/-/issues/429049#note_1635650597) + # + # Here we detect this case and release the lock. 
+ uuid = Gitlab::ExclusiveLease.get_uuid(lease_key) + exclusive_lease.cancel if uuid == jid + end end private diff --git a/app/workers/container_registry/cleanup_worker.rb b/app/workers/container_registry/cleanup_worker.rb index 9ec02dd613e..cd61c5ebcb4 100644 --- a/app/workers/container_registry/cleanup_worker.rb +++ b/app/workers/container_registry/cleanup_worker.rb @@ -38,7 +38,7 @@ module ContainerRegistry # Deleting stale ongoing repair details would put the project back to the analysis pool ContainerRegistry::DataRepairDetail .ongoing_since(STALE_REPAIR_DETAIL_THRESHOLD.ago) - .each_batch(of: BATCH_SIZE) do |batch| # rubocop:disable Style/SymbolProc + .each_batch(of: BATCH_SIZE) do |batch| batch.delete_all end end diff --git a/app/workers/delete_user_worker.rb b/app/workers/delete_user_worker.rb index 6a375a0cdd4..4634ea8ff4f 100644 --- a/app/workers/delete_user_worker.rb +++ b/app/workers/delete_user_worker.rb @@ -14,7 +14,7 @@ class DeleteUserWorker # rubocop:disable Scalability/IdempotentWorker delete_user = User.find_by_id(delete_user_id) return unless delete_user.present? - return if delete_user.banned? && ::Feature.enabled?(:delay_delete_own_user) + return if skip_own_account_deletion?(delete_user) current_user = User.find_by_id(current_user_id) return unless current_user.present? @@ -23,4 +23,34 @@ class DeleteUserWorker # rubocop:disable Scalability/IdempotentWorker rescue Gitlab::Access::AccessDeniedError => e Gitlab::AppLogger.warn("User could not be destroyed: #{e}") end + + private + + def skip_own_account_deletion?(user) + return false unless ::Feature.enabled?(:delay_delete_own_user) + + skip = + if user.banned? + true + else + # User is blocked when they delete their own account. Skip record deletion + # when user has been unblocked (e.g. when the user's account is reinstated + # by Trust & Safety) + user.deleted_own_account? && !user.blocked? 
+ end + + if skip + user.custom_attributes.by_key(UserCustomAttribute::DELETED_OWN_ACCOUNT_AT).first&.destroy + UserCustomAttribute.set_skipped_account_deletion_at(user) + + Gitlab::AppLogger.info( + message: 'Skipped own account deletion.', + reason: "User has been #{user.banned? ? 'banned' : 'unblocked'}.", + user_id: user.id, + username: user.username + ) + end + + skip + end end diff --git a/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb b/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb index b378d07d59c..573c73cd7df 100644 --- a/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb +++ b/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb @@ -14,7 +14,11 @@ module Gitlab importer.execute - ImportPullRequestsWorker.perform_async(project.id) + if Feature.enabled?(:bitbucket_server_convert_mentions_to_users, project.creator) + ImportUsersWorker.perform_async(project.id) + else + ImportPullRequestsWorker.perform_async(project.id) + end end def importer_class diff --git a/app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb b/app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb new file mode 100644 index 00000000000..dd18139fc9e --- /dev/null +++ b/app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketServerImport + module Stage + class ImportUsersWorker # rubocop:disable Scalability/IdempotentWorker -- ImportPullRequestsWorker is not idempotent + include StageMethods + + private + + def import(project) + importer = importer_class.new(project) + + importer.execute + + ImportPullRequestsWorker.perform_async(project.id) + end + + def importer_class + Importers::UsersImporter + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb 
b/app/workers/gitlab/github_import/advance_stage_worker.rb index a012241e90c..417b8598547 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -11,12 +11,15 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include ::Gitlab::Import::AdvanceStage - sidekiq_options dead: false - feature_category :importers loggable_arguments 1, 2 + sidekiq_options retry: 6 + + # TODO: Allow this class to include GithubImport::Queue and remove + # the following two lines https://gitlab.com/gitlab-org/gitlab/-/issues/435622 + feature_category :importers + sidekiq_options dead: false # The known importer stages and their corresponding Sidekiq workers. STAGES = { diff --git a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb index 3de4bef053f..dfc581f201b 100644 --- a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb +++ b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb @@ -9,8 +9,10 @@ module Gitlab include GithubImport::Queue + sidekiq_options retry: 5 + # The interval to schedule new instances of this job at. - INTERVAL = 1.minute.to_i + INTERVAL = 5.minutes.to_i def self.perform_in_the_future(*args) perform_in(INTERVAL, *args) @@ -23,9 +25,11 @@ module Gitlab return unless import_state if SidekiqStatus.running?(check_job_id) - # As long as the repository is being cloned we want to keep refreshing - # the import JID status. - import_state.refresh_jid_expiration + # As long as the worker is running we want to keep refreshing + # the worker's JID as well as the import's JID. 
+ Gitlab::SidekiqStatus.expire(check_job_id, Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION) + Gitlab::SidekiqStatus.set(import_state.jid, Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION) + self.class.perform_in_the_future(project_id, check_job_id) end diff --git a/app/workers/gitlab/github_import/stage/finish_import_worker.rb b/app/workers/gitlab/github_import/stage/finish_import_worker.rb index 90445a6d46c..8d5a98136af 100644 --- a/app/workers/gitlab/github_import/stage/finish_import_worker.rb +++ b/app/workers/gitlab/github_import/stage/finish_import_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods # project - An instance of Project. diff --git a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb index a5d085a82c0..bbf762133e1 100644 --- a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! 
diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb index 5bbe14b6528..d965c1ae847 100644 --- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods # These importers are fast enough that we can just run them in the same diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb index 037b529b866..b5b1601e3ed 100644 --- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods # client - An instance of Gitlab::GithubImport::Client. diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb index 35779d7bfc5..27d14a1a108 100644 --- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! 
diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb index 58e1f637b6a..595f0ca44d4 100644 --- a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb index 8d7bd98f303..34c31fea726 100644 --- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods # Importer::LfsObjectsImporter can resume work when interrupted as diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb index 0459545d8e1..8aea27a94d4 100644 --- a/app/workers/gitlab/github_import/stage/import_notes_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! 
diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb index e281e965f94..65b9d85f453 100644 --- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods # client - An instance of Gitlab::GithubImport::Client. diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb index 2f543951bf3..20b2e5ed6af 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb index db76545ae87..1262fc23c6c 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! 
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb index 31b7c57a524..bb4699889da 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb index c68b95b5111..bcc39b169af 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods resumes_work_when_interrupted! diff --git a/app/workers/gitlab/github_import/stage/import_repository_worker.rb b/app/workers/gitlab/github_import/stage/import_repository_worker.rb index 2a62930b5ea..44481b8a75c 100644 --- a/app/workers/gitlab/github_import/stage/import_repository_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_repository_worker.rb @@ -8,18 +8,11 @@ module Gitlab data_consistency :always - include GithubImport::Queue include StageMethods # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) - # In extreme cases it's possible for a clone to take more than the - # import job expiration time. To work around this we schedule a - # separate job that will periodically run and refresh the import - # expiration time. 
- RefreshImportJidWorker.perform_in_the_future(project.id, jid) - info(project.id, message: "starting importer", importer: 'Importer::RepositoryImporter') # If a user creates an issue while the import is in progress, this can lead to an import failure. diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb index 782439894c0..709957556d3 100644 --- a/app/workers/gitlab/import/advance_stage.rb +++ b/app/workers/gitlab/import/advance_stage.rb @@ -37,6 +37,8 @@ module Gitlab if new_job_count != previous_job_count timeout_timer = Time.zone.now previous_job_count = new_job_count + + import_state_jid.refresh_jid_expiration end if new_waiters.empty? diff --git a/app/workers/merge_request_cleanup_refs_worker.rb b/app/workers/merge_request_cleanup_refs_worker.rb index db1a1e96997..36979e843ef 100644 --- a/app/workers/merge_request_cleanup_refs_worker.rb +++ b/app/workers/merge_request_cleanup_refs_worker.rb @@ -19,7 +19,7 @@ class MergeRequestCleanupRefsWorker def perform_work unless merge_request - logger.error('No existing merge request to be cleaned up.') + logger.info('No existing merge request to be cleaned up.') return end diff --git a/app/workers/packages/cleanup_package_registry_worker.rb b/app/workers/packages/cleanup_package_registry_worker.rb index 5b2d8bacd62..50036923e94 100644 --- a/app/workers/packages/cleanup_package_registry_worker.rb +++ b/app/workers/packages/cleanup_package_registry_worker.rb @@ -14,6 +14,7 @@ module Packages enqueue_package_file_cleanup_job if Packages::PackageFile.pending_destruction.exists? enqueue_cleanup_policy_jobs if Packages::Cleanup::Policy.runnable.exists? enqueue_cleanup_stale_npm_metadata_cache_job if Packages::Npm::MetadataCache.pending_destruction.exists? + enqueue_cleanup_stale_nuget_symbols_job if Packages::Nuget::Symbol.pending_destruction.exists? 
log_counts end @@ -32,6 +33,10 @@ module Packages Packages::Npm::CleanupStaleMetadataCacheWorker.perform_with_capacity end + def enqueue_cleanup_stale_nuget_symbols_job + Packages::Nuget::CleanupStaleSymbolsWorker.perform_with_capacity + end + def log_counts use_replica_if_available do pending_destruction_package_files_count = Packages::PackageFile.pending_destruction.count diff --git a/app/workers/packages/npm/create_metadata_cache_worker.rb b/app/workers/packages/npm/create_metadata_cache_worker.rb index 0b6e34b13eb..cff7871dab7 100644 --- a/app/workers/packages/npm/create_metadata_cache_worker.rb +++ b/app/workers/packages/npm/create_metadata_cache_worker.rb @@ -16,7 +16,7 @@ module Packages def perform(project_id, package_name) project = Project.find_by_id(project_id) - return unless project && Feature.enabled?(:npm_metadata_cache, project) + return unless project ::Packages::Npm::CreateMetadataCacheService .new(project, package_name) diff --git a/app/workers/packages/nuget/cleanup_stale_symbols_worker.rb b/app/workers/packages/nuget/cleanup_stale_symbols_worker.rb new file mode 100644 index 00000000000..be90b86604c --- /dev/null +++ b/app/workers/packages/nuget/cleanup_stale_symbols_worker.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Packages + module Nuget + class CleanupStaleSymbolsWorker + include ApplicationWorker + include ::Packages::CleanupArtifactWorker + + MAX_CAPACITY = 2 + + data_consistency :sticky + + queue_namespace :package_cleanup + feature_category :package_registry + + deduplicate :until_executed + idempotent! 
+ + def max_running_jobs + MAX_CAPACITY + end + + private + + def model + Packages::Nuget::Symbol + end + + def next_item + model.next_pending_destruction(order_by: nil) + end + + def log_metadata(nuget_symbol) + log_extra_metadata_on_done(:nuget_symbol_id, nuget_symbol.id) + end + + def log_cleanup_item(nuget_symbol) + logger.info( + structured_payload( + nuget_symbol_id: nuget_symbol.id + ) + ) + end + end + end +end diff --git a/app/workers/pages/deactivate_mr_deployments_worker.rb b/app/workers/pages/deactivate_mr_deployments_worker.rb new file mode 100644 index 00000000000..910cae72d12 --- /dev/null +++ b/app/workers/pages/deactivate_mr_deployments_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Pages + class DeactivateMrDeploymentsWorker + include ApplicationWorker + + idempotent! + data_consistency :always # rubocop: disable SidekiqLoadBalancing/WorkerDataConsistency -- performing writes only + urgency :low + + feature_category :pages + + def perform(merge_request_id) + build_ids = Ci::Build.ids_in_merge_request(merge_request_id) + deactivate_deployments_with_build_ids(build_ids) + end + + private + + def deactivate_deployments_with_build_ids(build_ids) + PagesDeployment + .versioned + .ci_build_id_in(build_ids) + .each_batch do |batch| + batch.deactivate + end + end + end +end diff --git a/app/workers/pages/deactivated_deployments_delete_cron_worker.rb b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb index 75905759761..eeafed446c8 100644 --- a/app/workers/pages/deactivated_deployments_delete_cron_worker.rb +++ b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb @@ -11,7 +11,7 @@ module Pages feature_category :pages def perform - PagesDeployment.deactivated.each_batch do |deployments| # rubocop: disable Style/SymbolProc + PagesDeployment.deactivated.each_batch do |deployments| deployments.each { |deployment| deployment.file.remove! 
} deployments.delete_all end diff --git a/app/workers/pipeline_metrics_worker.rb b/app/workers/pipeline_metrics_worker.rb index 4e98c7268ac..b45a1c33d5c 100644 --- a/app/workers/pipeline_metrics_worker.rb +++ b/app/workers/pipeline_metrics_worker.rb @@ -8,7 +8,7 @@ class PipelineMetricsWorker # rubocop:disable Scalability/IdempotentWorker sidekiq_options retry: 3 include PipelineQueue - urgency :high + urgency :low def perform(pipeline_id) Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb index ca589acf26c..6237f64fa86 100644 --- a/app/workers/pipeline_schedule_worker.rb +++ b/app/workers/pipeline_schedule_worker.rb @@ -10,6 +10,8 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker LOCK_RETRY = 3 LOCK_TTL = 5.minutes + DELAY = 7.seconds + BATCH_SIZE = 500 feature_category :continuous_integration worker_resource_boundary :cpu @@ -20,12 +22,8 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker .select(:id, :owner_id, :project_id) # Minimize the selected columns .runnable_schedules .preloaded - .find_in_batches do |schedules| - RunPipelineScheduleWorker.bulk_perform_async_with_contexts( - schedules, - arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id, { scheduling: true }] }, - context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } } - ) + .find_in_batches(batch_size: BATCH_SIZE).with_index do |schedules, index| # rubocop: disable CodeReuse/ActiveRecord -- activates because of batch_size + enqueue_run_pipeline_schedule_worker(schedules, index) end end end @@ -42,4 +40,21 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker retries: LOCK_RETRY } end + + def enqueue_run_pipeline_schedule_worker(schedules, index) + if ::Feature.enabled?(:run_pipeline_schedule_worker_with_delay) + RunPipelineScheduleWorker.bulk_perform_in_with_contexts( + [1, index * 
DELAY].max, + schedules, + arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id, { scheduling: true }] }, + context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } } + ) + else + RunPipelineScheduleWorker.bulk_perform_async_with_contexts( + schedules, + arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id, { scheduling: true }] }, + context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } } + ) + end + end end diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb index cc72704d8c9..30e394a95cf 100644 --- a/app/workers/process_commit_worker.rb +++ b/app/workers/process_commit_worker.rb @@ -42,6 +42,8 @@ class ProcessCommitWorker update_issue_metrics(commit, author) end + private + def process_commit_message(project, commit, user, author, default = false) # Ignore closing references from GitLab-generated commit messages. find_closing_issues = default && !commit.merged_merge_request?(user) diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb index 61ef7494d38..52d825e5421 100644 --- a/app/workers/run_pipeline_schedule_worker.rb +++ b/app/workers/run_pipeline_schedule_worker.rb @@ -10,7 +10,7 @@ class RunPipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker queue_namespace :pipeline_creation feature_category :continuous_integration - deduplicate :until_executed + deduplicate :until_executed, including_scheduled: true idempotent! def perform(schedule_id, user_id, options = {}) |