Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    GitLab Bot <gitlab-bot@gitlab.com> 2023-12-19 14:01:45 +0300
committer GitLab Bot <gitlab-bot@gitlab.com> 2023-12-19 14:01:45 +0300
commit    9297025d0b7ddf095eb618dfaaab2ff8f2018d8b (patch)
tree      865198c01d1824a9b098127baa3ab980c9cd2c06 /app/workers
parent    6372471f43ee03c05a7c1f8b0c6ac6b8a7431dbe (diff)
Add latest changes from gitlab-org/gitlab@16-7-stable-ee (tag: v16.7.0-rc42)
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/abuse/trust_score_worker.rb23
-rw-r--r--app/workers/all_queues.yml73
-rw-r--r--app/workers/bulk_imports/entity_worker.rb59
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb28
-rw-r--r--app/workers/bulk_imports/finish_batched_pipeline_worker.rb51
-rw-r--r--app/workers/bulk_imports/finish_project_import_worker.rb2
-rw-r--r--app/workers/bulk_imports/pipeline_batch_worker.rb21
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb105
-rw-r--r--app/workers/bulk_imports/relation_batch_export_worker.rb3
-rw-r--r--app/workers/bulk_imports/relation_export_worker.rb2
-rw-r--r--app/workers/bulk_imports/stuck_import_worker.rb37
-rw-r--r--app/workers/bulk_imports/transform_references_worker.rb147
-rw-r--r--app/workers/ci/catalog/resources/process_sync_events_worker.rb41
-rw-r--r--app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb10
-rw-r--r--app/workers/ci/pipeline_artifacts/coverage_report_worker.rb1
-rw-r--r--app/workers/ci/runners/process_runner_version_update_worker.rb2
-rw-r--r--app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb2
-rw-r--r--app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb2
-rw-r--r--app/workers/click_house/events_sync_worker.rb37
-rw-r--r--app/workers/concerns/click_house_worker.rb30
-rw-r--r--app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb8
-rw-r--r--app/workers/concerns/gitlab/github_import/object_importer.rb8
-rw-r--r--app/workers/concerns/gitlab/github_import/queue.rb2
-rw-r--r--app/workers/concerns/gitlab/github_import/rescheduling_methods.rb6
-rw-r--r--app/workers/concerns/gitlab/github_import/stage_methods.rb9
-rw-r--r--app/workers/concerns/update_repository_storage_worker.rb44
-rw-r--r--app/workers/container_registry/cleanup_worker.rb2
-rw-r--r--app/workers/delete_user_worker.rb32
-rw-r--r--app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb6
-rw-r--r--app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb25
-rw-r--r--app/workers/gitlab/github_import/advance_stage_worker.rb9
-rw-r--r--app/workers/gitlab/github_import/refresh_import_jid_worker.rb12
-rw-r--r--app/workers/gitlab/github_import/stage/finish_import_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_attachments_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_base_data_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_collaborators_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_issue_events_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_notes_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_repository_worker.rb7
-rw-r--r--app/workers/gitlab/import/advance_stage.rb2
-rw-r--r--app/workers/merge_request_cleanup_refs_worker.rb2
-rw-r--r--app/workers/packages/cleanup_package_registry_worker.rb5
-rw-r--r--app/workers/packages/npm/create_metadata_cache_worker.rb2
-rw-r--r--app/workers/packages/nuget/cleanup_stale_symbols_worker.rb46
-rw-r--r--app/workers/pages/deactivate_mr_deployments_worker.rb29
-rw-r--r--app/workers/pages/deactivated_deployments_delete_cron_worker.rb2
-rw-r--r--app/workers/pipeline_metrics_worker.rb2
-rw-r--r--app/workers/pipeline_schedule_worker.rb27
-rw-r--r--app/workers/process_commit_worker.rb2
-rw-r--r--app/workers/run_pipeline_schedule_worker.rb2
57 files changed, 772 insertions, 208 deletions
diff --git a/app/workers/abuse/trust_score_worker.rb b/app/workers/abuse/trust_score_worker.rb
new file mode 100644
index 00000000000..061042ffa8a
--- /dev/null
+++ b/app/workers/abuse/trust_score_worker.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module Abuse
+ class TrustScoreWorker
+ include ApplicationWorker
+
+ data_consistency :delayed
+
+ idempotent!
+ feature_category :instance_resiliency
+ urgency :low
+
+ def perform(user_id, source, score, correlation_id = '')
+ user = User.find_by_id(user_id)
+ unless user
+ logger.info(structured_payload(message: "User not found.", user_id: user_id))
+ return
+ end
+
+ Abuse::TrustScore.create!(user: user, source: source, score: score.to_f, correlation_id_value: correlation_id)
+ end
+ end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 0bb88efe183..ec5156bb1d0 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -246,6 +246,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: cronjob:ci_catalog_resources_process_sync_events
+ :worker_name: Ci::Catalog::Resources::ProcessSyncEventsWorker
+ :feature_category: :pipeline_composition
+ :has_external_dependencies: false
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:ci_delete_unit_tests
:worker_name: Ci::DeleteUnitTestsWorker
:feature_category: :code_testing
@@ -275,7 +284,7 @@
:tags: []
- :name: cronjob:ci_runners_reconcile_existing_runner_versions_cron
:worker_name: Ci::Runners::ReconcileExistingRunnerVersionsCronWorker
- :feature_category: :runner_fleet
+ :feature_category: :fleet_visibility
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
@@ -284,7 +293,7 @@
:tags: []
- :name: cronjob:ci_runners_stale_machines_cleanup_cron
:worker_name: Ci::Runners::StaleMachinesCleanupCronWorker
- :feature_category: :runner_fleet
+ :feature_category: :fleet_visibility
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
@@ -1749,6 +1758,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: package_cleanup:packages_nuget_cleanup_stale_symbols
+ :worker_name: Packages::Nuget::CleanupStaleSymbolsWorker
+ :feature_category: :package_registry
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: package_repositories:packages_debian_generate_distribution
:worker_name: Packages::Debian::GenerateDistributionWorker
:feature_category: :package_registry
@@ -2041,7 +2059,7 @@
:worker_name: PipelineMetricsWorker
:feature_category: :continuous_integration
:has_external_dependencies: false
- :urgency: :high
+ :urgency: :low
:resource_boundary: :unknown
:weight: 3
:idempotent: false
@@ -2298,6 +2316,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: abuse_trust_score
+ :worker_name: Abuse::TrustScoreWorker
+ :feature_category: :instance_resiliency
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: analytics_usage_trends_counter_job
:worker_name: Analytics::UsageTrends::CounterJobWorker
:feature_category: :devops_reports
@@ -2559,6 +2586,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bitbucket_server_import_stage_import_users
+ :worker_name: Gitlab::BitbucketServerImport::Stage::ImportUsersWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
- :name: bulk_import
:worker_name: BulkImportWorker
:feature_category: :importers
@@ -2636,7 +2672,7 @@
:feature_category: :importers
:has_external_dependencies: false
:urgency: :low
- :resource_boundary: :unknown
+ :resource_boundary: :memory
:weight: 1
:idempotent: true
:tags: []
@@ -2649,6 +2685,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: bulk_imports_transform_references
+ :worker_name: BulkImports::TransformReferencesWorker
+ :feature_category: :importers
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: chat_notification
:worker_name: ChatNotificationWorker
:feature_category: :integrations
@@ -2703,6 +2748,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: ci_low_urgency_cancel_redundant_pipelines
+ :worker_name: Ci::LowUrgencyCancelRedundantPipelinesWorker
+ :feature_category: :continuous_integration
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: ci_parse_secure_file_metadata
:worker_name: Ci::ParseSecureFileMetadataWorker
:feature_category: :mobile_devops
@@ -2714,7 +2768,7 @@
:tags: []
- :name: ci_runners_process_runner_version_update
:worker_name: Ci::Runners::ProcessRunnerVersionUpdateWorker
- :feature_category: :runner_fleet
+ :feature_category: :fleet_visibility
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
@@ -3459,6 +3513,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: pages_deactivate_mr_deployments
+ :worker_name: Pages::DeactivateMrDeploymentsWorker
+ :feature_category: :pages
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: pages_domain_ssl_renewal
:worker_name: PagesDomainSslRenewalWorker
:feature_category: :pages
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index e510a8c0d06..258ccea1f63 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -3,9 +3,10 @@
module BulkImports
class EntityWorker
include ApplicationWorker
+ include ExclusiveLeaseGuard
idempotent!
- deduplicate :until_executed, if_deduplicated: :reschedule_once
+ deduplicate :until_executing
data_consistency :always
feature_category :importers
sidekiq_options retry: 3, dead: false
@@ -27,7 +28,10 @@ module BulkImports
if running_tracker.present?
log_info(message: 'Stage running', entity_stage: running_tracker.stage)
else
- start_next_stage
+ # Use lease guard to prevent duplicated workers from starting multiple stages
+ try_obtain_lease do
+ start_next_stage
+ end
end
re_enqueue
@@ -38,7 +42,9 @@ module BulkImports
Gitlab::ErrorTracking.track_exception(
exception,
- log_params(message: "Request to export #{entity.source_type} failed")
+ {
+ message: "Request to export #{entity.source_type} failed"
+ }.merge(logger.default_attributes)
)
entity.fail_op!
@@ -49,7 +55,9 @@ module BulkImports
attr_reader :entity
def re_enqueue
- BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
+ with_context(bulk_import_entity_id: entity.id) do
+ BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
+ end
end
def running_tracker
@@ -66,43 +74,34 @@ module BulkImports
next_pipeline_trackers.each_with_index do |pipeline_tracker, index|
log_info(message: 'Stage starting', entity_stage: pipeline_tracker.stage) if index == 0
- BulkImports::PipelineWorker.perform_async(
- pipeline_tracker.id,
- pipeline_tracker.stage,
- entity.id
- )
+ with_context(bulk_import_entity_id: entity.id) do
+ BulkImports::PipelineWorker.perform_async(
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
+ end
end
end
- def source_version
- entity.bulk_import.source_version_info.to_s
+ def lease_timeout
+ PERFORM_DELAY
end
- def logger
- @logger ||= Logger.build
+ def lease_key
+ "gitlab:bulk_imports:entity_worker:#{entity.id}"
end
- def log_exception(exception, payload)
- Gitlab::ExceptionLogFormatter.format!(exception, payload)
-
- logger.error(structured_payload(payload))
+ def log_lease_taken
+ log_info(message: lease_taken_message)
end
- def log_info(payload)
- logger.info(structured_payload(log_params(payload)))
+ def logger
+ @logger ||= Logger.build.with_entity(entity)
end
- def log_params(extra)
- defaults = {
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- source_version: source_version,
- importer: Logger::IMPORTER_NAME
- }
-
- defaults.merge(extra)
+ def log_info(payload)
+ logger.info(structured_payload(payload))
end
end
end
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index f7456ddccb1..bfe561cca5c 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -20,7 +20,9 @@ module BulkImports
set_source_xid
request_export
- BulkImports::EntityWorker.perform_async(entity_id)
+ with_context(bulk_import_entity_id: entity_id) do
+ BulkImports::EntityWorker.perform_async(entity_id)
+ end
end
def perform_failure(exception, entity_id)
@@ -73,16 +75,7 @@ module BulkImports
::GlobalID.parse(response.dig(*entity_query.data_path, 'id')).model_id
rescue StandardError => e
- log_exception(e,
- {
- message: 'Failed to fetch source entity id',
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- source_version: entity.bulk_import.source_version_info.to_s
- }
- )
+ log_exception(e, message: 'Failed to fetch source entity id')
nil
end
@@ -96,7 +89,7 @@ module BulkImports
end
def logger
- @logger ||= Logger.build
+ @logger ||= Logger.build.with_entity(entity)
end
def log_exception(exception, payload)
@@ -106,16 +99,7 @@ module BulkImports
end
def log_and_fail(exception)
- log_exception(exception,
- {
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- message: "Request to export #{entity.source_type} failed",
- source_version: entity.bulk_import.source_version_info.to_s
- }
- )
+ log_exception(exception, message: "Request to export #{entity.source_type} failed")
BulkImports::Failure.create(failure_attributes(exception))
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 40d26e14dc1..2670dc5438d 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -6,6 +6,7 @@ module BulkImports
include ExceptionBacktrace
REQUEUE_DELAY = 5.seconds
+ STALE_AFTER = 4.hours
idempotent!
deduplicate :until_executing
@@ -18,46 +19,50 @@ module BulkImports
@tracker = Tracker.find(pipeline_tracker_id)
@context = ::BulkImports::Pipeline::Context.new(tracker)
- return unless tracker.batched?
- return unless tracker.started?
+ return unless tracker.batched? && tracker.started?
+
+ @sorted_batches = tracker.batches.by_last_updated
+ return fail_stale_tracker_and_batches if most_recent_batch_stale?
+
return re_enqueue if import_in_progress?
- if tracker.stale?
- logger.error(log_attributes(message: 'Tracker stale. Failing batches and tracker'))
- tracker.batches.map(&:fail_op!)
- tracker.fail_op!
- else
- tracker.pipeline_class.new(@context).on_finish
- logger.info(log_attributes(message: 'Tracker finished'))
- tracker.finish!
- end
+ tracker.pipeline_class.new(@context).on_finish
+ logger.info(log_attributes(message: 'Tracker finished'))
+ tracker.finish!
end
private
- attr_reader :tracker
+ attr_reader :tracker, :sorted_batches
def re_enqueue
- self.class.perform_in(REQUEUE_DELAY, tracker.id)
+ with_context(bulk_import_entity_id: tracker.entity.id) do
+ self.class.perform_in(REQUEUE_DELAY, tracker.id)
+ end
end
def import_in_progress?
- tracker.batches.any? { |b| b.started? || b.created? }
+ sorted_batches.in_progress.any?
+ end
+
+ def most_recent_batch_stale?
+ return false unless sorted_batches.any?
+
+ sorted_batches.first.updated_at < STALE_AFTER.ago
+ end
+
+ def fail_stale_tracker_and_batches
+ logger.error(log_attributes(message: 'Batch stale. Failing batches and tracker'))
+ sorted_batches.map(&:fail_op!)
+ tracker.fail_op!
end
def logger
- @logger ||= Logger.build
+ @logger ||= Logger.build.with_tracker(tracker)
end
def log_attributes(extra = {})
- structured_payload(
- {
- tracker_id: tracker.id,
- bulk_import_id: tracker.entity.id,
- bulk_import_entity_id: tracker.entity.bulk_import_id,
- pipeline_class: tracker.pipeline_name
- }.merge(extra)
- )
+ structured_payload(extra)
end
end
end
diff --git a/app/workers/bulk_imports/finish_project_import_worker.rb b/app/workers/bulk_imports/finish_project_import_worker.rb
index 815101c89f3..18b8c016493 100644
--- a/app/workers/bulk_imports/finish_project_import_worker.rb
+++ b/app/workers/bulk_imports/finish_project_import_worker.rb
@@ -5,7 +5,7 @@ module BulkImports
include ApplicationWorker
feature_category :importers
- sidekiq_options retry: 5
+ sidekiq_options retry: 3
data_consistency :sticky
idempotent!
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
index 1485275e616..c24cc64e5c0 100644
--- a/app/workers/bulk_imports/pipeline_batch_worker.rb
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -9,7 +9,7 @@ module BulkImports
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
feature_category :importers
- sidekiq_options dead: false, retry: 3
+ sidekiq_options dead: false, retry: 6
worker_has_external_dependencies!
worker_resource_boundary :memory
idempotent!
@@ -42,6 +42,7 @@ module BulkImports
@batch = ::BulkImports::BatchTracker.find(batch_id)
@tracker = @batch.tracker
+ @entity = @tracker.entity
@pending_retry = false
return unless process_batch?
@@ -50,7 +51,11 @@ module BulkImports
try_obtain_lease { run }
ensure
- ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry
+ unless pending_retry
+ with_context(bulk_import_entity_id: entity.id) do
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+ end
+ end
end
def perform_failure(batch_id, exception)
@@ -62,7 +67,7 @@ module BulkImports
private
- attr_reader :batch, :tracker, :pending_retry
+ attr_reader :batch, :tracker, :pending_retry, :entity
def run
return batch.skip! if tracker.failed? || tracker.finished?
@@ -83,7 +88,7 @@ module BulkImports
Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed'))
BulkImports::Failure.create(
- bulk_import_entity_id: batch.tracker.entity.id,
+ bulk_import_entity_id: tracker.entity.id,
pipeline_class: tracker.pipeline_name,
pipeline_step: 'pipeline_batch_worker_run',
exception_class: exception.class.to_s,
@@ -91,7 +96,9 @@ module BulkImports
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
)
- ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+ with_context(bulk_import_entity_id: tracker.entity.id) do
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+ end
end
def context
@@ -115,7 +122,9 @@ module BulkImports
def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
log_extra_metadata_on_done(:re_enqueue, true)
- self.class.perform_in(delay, batch.id)
+ with_context(bulk_import_entity_id: entity.id) do
+ self.class.perform_in(delay, batch.id)
+ end
end
def process_batch?
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 2c1d28b33c5..0bb9464c6de 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,14 +4,17 @@ module BulkImports
class PipelineWorker
include ApplicationWorker
include ExclusiveLeaseGuard
+ include Gitlab::Utils::StrongMemoize
FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
+ LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true).freeze
+
DEFER_ON_HEALTH_DELAY = 5.minutes
data_consistency :always
feature_category :importers
- sidekiq_options dead: false, retry: 3
+ sidekiq_options dead: false, retry: 6
worker_has_external_dependencies!
deduplicate :until_executing
worker_resource_boundary :memory
@@ -52,7 +55,6 @@ module BulkImports
try_obtain_lease do
if pipeline_tracker.enqueued? || pipeline_tracker.started?
logger.info(log_attributes(message: 'Pipeline starting'))
-
run
end
end
@@ -62,7 +64,7 @@ module BulkImports
@entity = ::BulkImports::Entity.find(entity_id)
@pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
- fail_tracker(exception)
+ fail_pipeline(exception)
end
private
@@ -84,7 +86,8 @@ module BulkImports
return pipeline_tracker.finish! if export_status.batches_count < 1
- enqueue_batches
+ enqueue_limited_batches
+ re_enqueue unless all_batches_enqueued?
else
log_extra_metadata_on_done(:batched, false)
@@ -96,13 +99,11 @@ module BulkImports
retry_tracker(e)
end
- def source_version
- entity.bulk_import.source_version_info.to_s
- end
-
- def fail_tracker(exception)
+ def fail_pipeline(exception)
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
+ entity.fail_op! if pipeline_tracker.abort_on_failure?
+
log_exception(exception, log_attributes(message: 'Pipeline failed'))
Gitlab::ErrorTracking.track_exception(exception, log_attributes)
@@ -118,18 +119,20 @@ module BulkImports
end
def logger
- @logger ||= Logger.build
+ @logger ||= Logger.build.with_tracker(pipeline_tracker)
end
def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
log_extra_metadata_on_done(:re_enqueue, true)
- self.class.perform_in(
- delay,
- pipeline_tracker.id,
- pipeline_tracker.stage,
- entity.id
- )
+ with_context(bulk_import_entity_id: entity.id) do
+ self.class.perform_in(
+ delay,
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
+ end
end
def context
@@ -181,19 +184,7 @@ module BulkImports
end
def log_attributes(extra = {})
- structured_payload(
- {
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_tracker_id: pipeline_tracker.id,
- pipeline_class: pipeline_tracker.pipeline_name,
- pipeline_tracker_state: pipeline_tracker.human_status_name,
- source_version: source_version,
- importer: Logger::IMPORTER_NAME
- }.merge(extra)
- )
+ logger.default_attributes.merge(extra)
end
def log_exception(exception, payload)
@@ -206,20 +197,60 @@ module BulkImports
Time.zone.now - (pipeline_tracker.created_at || entity.created_at)
end
- def lease_timeout
- 30
+ def enqueue_limited_batches
+ next_batch.numbers.each do |batch_number|
+ batch = pipeline_tracker.batches.create!(batch_number: batch_number)
+
+ with_context(bulk_import_entity_id: entity.id) do
+ ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+ end
+ end
+
+ log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, next_batch.numbers)
+ log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, next_batch.final?)
end
- def lease_key
- "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
+ def all_batches_enqueued?
+ next_batch.final?
end
- def enqueue_batches
- 1.upto(export_status.batches_count) do |batch_number|
- batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+ def next_batch
+ all_batch_numbers = (1..export_status.batches_count).to_a
+
+ created_batch_numbers = pipeline_tracker.batches.pluck_batch_numbers
- ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+ remaining_batch_numbers = all_batch_numbers - created_batch_numbers
+
+ if Feature.disabled?(:bulk_import_limit_concurrent_batches, context.portable)
+ return LimitedBatches.new(numbers: remaining_batch_numbers, final?: true)
end
+
+ limit = next_batch_count
+
+ LimitedBatches.new(
+ numbers: remaining_batch_numbers.first(limit),
+ final?: remaining_batch_numbers.count <= limit
+ )
+ end
+ strong_memoize_attr :next_batch
+
+ # Calculate the number of batches, up to `batch_limit`, to process in the
+ # next round.
+ def next_batch_count
+ limit = batch_limit - pipeline_tracker.batches.in_progress.limit(batch_limit).count
+ [limit, 0].max
+ end
+
+ def batch_limit
+ ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit
+ end
+
+ def lease_timeout
+ 30
+ end
+
+ def lease_key
+ "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
end
end
end
diff --git a/app/workers/bulk_imports/relation_batch_export_worker.rb b/app/workers/bulk_imports/relation_batch_export_worker.rb
index 87ceb775075..08c5fb81460 100644
--- a/app/workers/bulk_imports/relation_batch_export_worker.rb
+++ b/app/workers/bulk_imports/relation_batch_export_worker.rb
@@ -7,7 +7,8 @@ module BulkImports
idempotent!
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
feature_category :importers
- sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3
+ sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 6
+ worker_resource_boundary :memory
sidekiq_retries_exhausted do |job, exception|
batch = BulkImports::ExportBatch.find(job['args'][1])
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index 168626fee85..90941e7583b 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -10,7 +10,7 @@ module BulkImports
loggable_arguments 2, 3
data_consistency :always
feature_category :importers
- sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3
+ sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 6
worker_resource_boundary :memory
sidekiq_retries_exhausted do |job, exception|
diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb
index 6c8569b0aa0..cb4e10a29b2 100644
--- a/app/workers/bulk_imports/stuck_import_worker.rb
+++ b/app/workers/bulk_imports/stuck_import_worker.rb
@@ -12,24 +12,27 @@ module BulkImports
feature_category :importers
+ # Using Keyset pagination for scopes that involve timestamp indexes
def perform
- BulkImport.stale.find_each do |import|
- logger.error(message: 'BulkImport stale', bulk_import_id: import.id)
- import.cleanup_stale
+ Gitlab::Pagination::Keyset::Iterator.new(scope: bulk_import_scope).each_batch do |imports|
+ imports.each do |import|
+ logger.error(message: 'BulkImport stale', bulk_import_id: import.id)
+ import.cleanup_stale
+ end
end
- BulkImports::Entity.includes(:trackers).stale.find_each do |entity| # rubocop: disable CodeReuse/ActiveRecord
- ApplicationRecord.transaction do
- logger.error(
- message: 'BulkImports::Entity stale',
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_id: entity.id
- )
+ Gitlab::Pagination::Keyset::Iterator.new(scope: entity_scope).each_batch do |entities|
+ entities.each do |entity|
+ ApplicationRecord.transaction do
+ logger.with_entity(entity).error(
+ message: 'BulkImports::Entity stale'
+ )
- entity.cleanup_stale
+ entity.cleanup_stale
- entity.trackers.find_each do |tracker|
- tracker.cleanup_stale
+ entity.trackers.find_each do |tracker|
+ tracker.cleanup_stale
+ end
end
end
end
@@ -38,5 +41,13 @@ module BulkImports
def logger
@logger ||= Logger.build
end
+
+ def bulk_import_scope
+ BulkImport.stale.order_by_updated_at_and_id(:asc)
+ end
+
+ def entity_scope
+ BulkImports::Entity.with_trackers.stale.order_by_updated_at_and_id(:asc)
+ end
end
end
diff --git a/app/workers/bulk_imports/transform_references_worker.rb b/app/workers/bulk_imports/transform_references_worker.rb
new file mode 100644
index 00000000000..383ad2fd733
--- /dev/null
+++ b/app/workers/bulk_imports/transform_references_worker.rb
@@ -0,0 +1,147 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class TransformReferencesWorker
+ include ApplicationWorker
+
+ idempotent!
+ data_consistency :delayed
+ sidekiq_options retry: 3, dead: false
+ feature_category :importers
+
+ # rubocop: disable CodeReuse/ActiveRecord
+ def perform(object_ids, klass, tracker_id)
+ @tracker = BulkImports::Tracker.find_by_id(tracker_id)
+
+ return unless tracker
+
+ project = tracker.entity.project
+
+ klass.constantize.where(id: object_ids, project: project).find_each do |object|
+ transform_and_save(object)
+ end
+ end
+ # rubocop: enable CodeReuse/ActiveRecord
+
+ attr_reader :tracker
+
+ private
+
+ def transform_and_save(object)
+ body = object_body(object).dup
+
+ return if body.blank?
+
+ object.refresh_markdown_cache!
+
+ body.gsub!(username_regex(mapped_usernames), mapped_usernames)
+
+ if object_has_reference?(body)
+ matching_urls(object).each do |old_url, new_url|
+ body.gsub!(old_url, new_url) if body.include?(old_url)
+ end
+ end
+
+ object.assign_attributes(body_field(object) => body)
+ object.save!(touch: false) if object_body_changed?(object)
+
+ object
+ rescue StandardError => e
+ log_and_fail(e)
+ end
+
+ def object_body(object)
+ call_object_method(object)
+ end
+
+ def object_body_changed?(object)
+ call_object_method(object, suffix: '_changed?')
+ end
+
+ def call_object_method(object, suffix: nil)
+ method = body_field(object)
+ method = "#{method}#{suffix}" if suffix.present?
+
+ object.public_send(method) # rubocop:disable GitlabSecurity/PublicSend -- the method being called is dependent on several factors
+ end
+
+ def body_field(object)
+ object.is_a?(Note) ? 'note' : 'description'
+ end
+
+ def mapped_usernames
+ @mapped_usernames ||= ::BulkImports::UsersMapper.new(context: context)
+ .map_usernames.transform_keys { |key| "@#{key}" }
+ .transform_values { |value| "@#{value}" }
+ end
+
+ def username_regex(mapped_usernames)
+ @username_regex ||= Regexp.new(mapped_usernames.keys.sort_by(&:length)
+ .reverse.map { |x| Regexp.escape(x) }.join('|'))
+ end
+
+ def matching_urls(object)
+ URI.extract(object_body(object), %w[http https]).each_with_object([]) do |url, array|
+ parsed_url = URI.parse(url)
+
+ next unless source_host == parsed_url.host
+ next unless parsed_url.path&.start_with?("/#{source_full_path}")
+
+ array << [url, new_url(object, parsed_url)]
+ end
+ end
+
+ def new_url(object, parsed_old_url)
+ parsed_old_url.host = ::Gitlab.config.gitlab.host
+ parsed_old_url.port = ::Gitlab.config.gitlab.port
+ parsed_old_url.scheme = ::Gitlab.config.gitlab.https ? 'https' : 'http'
+ parsed_old_url.to_s.gsub!(source_full_path, full_path(object))
+ end
+
+ def source_host
+ @source_host ||= URI.parse(context.configuration.url).host
+ end
+
+ def source_full_path
+ @source_full_path ||= context.entity.source_full_path
+ end
+
+ def full_path(object)
+ object.project.full_path
+ end
+
+ def object_has_reference?(body)
+ body.include?(source_full_path)
+ end
+
+ def log_and_fail(exception)
+ Gitlab::ErrorTracking.track_exception(exception, log_params)
+ BulkImports::Failure.create(failure_attributes(exception))
+ end
+
+ def log_params
+ {
+ message: 'Failed to update references',
+ bulk_import_id: context.bulk_import_id,
+ bulk_import_entity_id: tracker.bulk_import_entity_id,
+ source_full_path: context.entity.source_full_path,
+ source_version: context.bulk_import.source_version,
+ importer: 'gitlab_migration'
+ }
+ end
+
+ def failure_attributes(exception)
+ {
+ bulk_import_entity_id: context.entity.id,
+ pipeline_class: 'ReferencesPipeline',
+ exception_class: exception.class.to_s,
+ exception_message: exception.message.truncate(255),
+ correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+ }
+ end
+
+ def context
+ @context ||= BulkImports::Pipeline::Context.new(tracker)
+ end
+ end
+end
diff --git a/app/workers/ci/catalog/resources/process_sync_events_worker.rb b/app/workers/ci/catalog/resources/process_sync_events_worker.rb
new file mode 100644
index 00000000000..15e06393aff
--- /dev/null
+++ b/app/workers/ci/catalog/resources/process_sync_events_worker.rb
@@ -0,0 +1,41 @@
+# frozen_string_literal: true
+
+module Ci
+ module Catalog
+ module Resources
+ # This worker can be called multiple times simultaneously but only one can process events
+ # at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`.
+ #
+ # This worker is enqueued in 3 ways:
+ # 1. By Project model callback after updating one of the columns referenced in
+ # `Ci::Catalog::Resource#sync_with_project`.
+ # 2. Every minute by cron job. This ensures we process SyncEvents from direct/bulk
+ # database updates that do not use the Project AR model.
+ # 3. By `Ci::ProcessSyncEventsService` if there are any remaining pending
+ # SyncEvents after processing.
+ #
+ class ProcessSyncEventsWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required
+
+ feature_category :pipeline_composition
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- We should not sync stale data
+ urgency :high
+
+ idempotent!
+ deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute
+
+ def perform
+ results = ::Ci::ProcessSyncEventsService.new(
+ ::Ci::Catalog::Resources::SyncEvent, ::Ci::Catalog::Resource
+ ).execute
+
+ results.each do |key, value|
+ log_extra_metadata_on_done(key, value)
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb b/app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb
new file mode 100644
index 00000000000..4eb55a9ecd4
--- /dev/null
+++ b/app/workers/ci/low_urgency_cancel_redundant_pipelines_worker.rb
@@ -0,0 +1,10 @@
+# frozen_string_literal: true
+
+module Ci
+ # Scheduled pipelines rarely cancel other pipelines and we don't need to
+ # use high urgency
+ class LowUrgencyCancelRedundantPipelinesWorker < CancelRedundantPipelinesWorker
+ urgency :low
+ idempotent!
+ end
+end
diff --git a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
index 53bed0fa9da..3184fee2071 100644
--- a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
+++ b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
@@ -13,6 +13,7 @@ module Ci
feature_category :code_testing
idempotent!
+ deduplicate :until_executed
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by_id(pipeline_id)
diff --git a/app/workers/ci/runners/process_runner_version_update_worker.rb b/app/workers/ci/runners/process_runner_version_update_worker.rb
index f1ad0c8563e..acb1aac78a4 100644
--- a/app/workers/ci/runners/process_runner_version_update_worker.rb
+++ b/app/workers/ci/runners/process_runner_version_update_worker.rb
@@ -7,7 +7,7 @@ module Ci
data_consistency :always
- feature_category :runner_fleet
+ feature_category :fleet_visibility
urgency :low
idempotent!
diff --git a/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb b/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb
index 722c513a4bb..7bcfed1580f 100644
--- a/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb
+++ b/app/workers/ci/runners/reconcile_existing_runner_versions_cron_worker.rb
@@ -9,7 +9,7 @@ module Ci
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
data_consistency :sticky
- feature_category :runner_fleet
+ feature_category :fleet_visibility
urgency :low
deduplicate :until_executed
diff --git a/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb b/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb
index 9407e7c0e0a..9831e3e98b7 100644
--- a/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb
+++ b/app/workers/ci/runners/stale_machines_cleanup_cron_worker.rb
@@ -9,7 +9,7 @@ module Ci
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
data_consistency :sticky
- feature_category :runner_fleet
+ feature_category :fleet_visibility
urgency :low
idempotent!
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index e884a43b1e3..21c10566a67 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -3,7 +3,9 @@
module ClickHouse
class EventsSyncWorker
include ApplicationWorker
+ include ClickHouseWorker
include Gitlab::ExclusiveLeaseHelpers
+ include Gitlab::Utils::StrongMemoize
idempotent!
queue_namespace :cronjob
@@ -91,8 +93,13 @@ module ClickHouse
)
end
+ def last_event_id_in_postgresql
+ Event.maximum(:id)
+ end
+ strong_memoize_attr :last_event_id_in_postgresql
+
def enabled?
- ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house)
+ ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house)
end
def next_batch
@@ -110,24 +117,34 @@ module ClickHouse
def process_batch(context)
Enumerator.new do |yielder|
- has_data = false
- # rubocop: disable CodeReuse/ActiveRecord
- Event.where(Event.arel_table[:id].gt(context.last_record_id)).each_batch(of: BATCH_SIZE) do |relation|
- has_data = true
-
- relation.select(*EVENT_PROJECTIONS).each do |row|
+ has_more_data = false
+ batching_scope.each_batch(of: BATCH_SIZE) do |relation|
+ records = relation.select(*EVENT_PROJECTIONS).to_a
+ has_more_data = records.size == BATCH_SIZE
+ records.each do |row|
yielder << row
context.last_processed_id = row.id
break if context.record_limit_reached?
end
- break if context.over_time? || context.record_limit_reached?
+ break if context.over_time? || context.record_limit_reached? || !has_more_data
end
- context.no_more_records! if has_data == false
- # rubocop: enable CodeReuse/ActiveRecord
+ context.no_more_records! unless has_more_data
end
end
+
+ # rubocop: disable CodeReuse/ActiveRecord
+ def batching_scope
+ return Event.none unless last_event_id_in_postgresql
+
+ table = Event.arel_table
+
+ Event
+ .where(table[:id].gt(context.last_record_id))
+ .where(table[:id].lteq(last_event_id_in_postgresql))
+ end
+ # rubocop: enable CodeReuse/ActiveRecord
end
end
diff --git a/app/workers/concerns/click_house_worker.rb b/app/workers/concerns/click_house_worker.rb
new file mode 100644
index 00000000000..6399796f6df
--- /dev/null
+++ b/app/workers/concerns/click_house_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module ClickHouseWorker
+ extend ActiveSupport::Concern
+
+ class_methods do
+ def register_click_house_worker?
+ click_house_worker_attrs.present?
+ end
+
+ def click_house_worker_attrs
+ get_class_attribute(:click_house_worker_attrs)
+ end
+
+ def click_house_migration_lock(ttl)
+ raise ArgumentError unless ttl.is_a?(ActiveSupport::Duration)
+
+ set_class_attribute(
+ :click_house_worker_attrs,
+ (click_house_worker_attrs || {}).merge(migration_lock_ttl: ttl)
+ )
+ end
+ end
+
+ included do
+ click_house_migration_lock(ClickHouse::MigrationSupport::ExclusiveLock::DEFAULT_CLICKHOUSE_WORKER_TTL)
+
+ pause_control :click_house_migration
+ end
+end
diff --git a/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb b/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb
index 1090d82c922..fbcb5d81c8a 100644
--- a/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb
+++ b/app/workers/concerns/gitlab/bitbucket_server_import/object_importer.rb
@@ -7,6 +7,8 @@ module Gitlab
module ObjectImporter
extend ActiveSupport::Concern
+ FAILED_IMPORT_STATES = %w[canceled failed].freeze
+
included do
include ApplicationWorker
@@ -33,8 +35,10 @@ module Gitlab
return unless project
- if project.import_state&.canceled?
- info(project.id, message: 'project import canceled')
+ import_state = project.import_status
+
+ if FAILED_IMPORT_STATES.include?(import_state)
+ info(project.id, message: "project import #{import_state}")
return
end
diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb
index fcc7a96fa2b..15156e1deef 100644
--- a/app/workers/concerns/gitlab/github_import/object_importer.rb
+++ b/app/workers/concerns/gitlab/github_import/object_importer.rb
@@ -16,6 +16,7 @@ module Gitlab
feature_category :importers
worker_has_external_dependencies!
+ sidekiq_options retry: 5
sidekiq_retries_exhausted do |msg|
args = msg['args']
jid = msg['jid']
@@ -57,12 +58,7 @@ module Gitlab
end
info(project.id, message: 'importer finished')
- rescue NoMethodError => e
- # This exception will be more useful in development when a new
- # Representation is created but the developer forgot to add a
- # `#github_identifiers` method.
- track_and_raise_exception(project, e, fail_import: true)
- rescue ActiveRecord::RecordInvalid, NotRetriableError => e
+ rescue ActiveRecord::RecordInvalid, NotRetriableError, NoMethodError => e
# We do not raise exception to prevent job retry
track_exception(project, e)
rescue StandardError => e
diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb
index 7cc23dd7c0b..5aabc74a3d5 100644
--- a/app/workers/concerns/gitlab/github_import/queue.rb
+++ b/app/workers/concerns/gitlab/github_import/queue.rb
@@ -14,7 +14,7 @@ module Gitlab
# the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state
# forever.
- sidekiq_options dead: false, retry: 5
+ sidekiq_options dead: false
end
end
end
diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
index 316d30d94da..e2808f45821 100644
--- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
@@ -8,6 +8,8 @@ module Gitlab
extend ActiveSupport::Concern
include JobDelayCalculator
+ attr_reader :project
+
ENQUEUED_JOB_COUNT = 'github-importer/enqueued_job_count/%{project}/%{collection}'
included do
@@ -17,8 +19,10 @@ module Gitlab
# project_id - The ID of the GitLab project to import the note into.
# hash - A Hash containing the details of the GitHub object to import.
# notify_key - The Redis key to notify upon completion, if any.
+
def perform(project_id, hash, notify_key = nil)
- project = Project.find_by_id(project_id)
+ @project = Project.find_by_id(project_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables -- GitHub Import
+ # uses modules everywhere. Too big to refactor.
return notify_waiter(notify_key) unless project
diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb
index 5c63c667a03..5f6812ab84f 100644
--- a/app/workers/concerns/gitlab/github_import/stage_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb
@@ -9,6 +9,11 @@ module Gitlab
included do
include ApplicationWorker
+ include GithubImport::Queue
+
+ sidekiq_options retry: 6
+
+ sidekiq_options status_expiration: Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION
sidekiq_retries_exhausted do |msg, e|
Gitlab::Import::ImportFailureService.track(
@@ -37,8 +42,6 @@ module Gitlab
# - Continue their loop from where it left off:
# https://gitlab.com/gitlab-org/gitlab/-/blob/024235ec/lib/gitlab/github_import/importer/pull_requests/review_requests_importer.rb#L15
def resumes_work_when_interrupted!
- return unless Feature.enabled?(:github_importer_raise_max_interruptions)
-
sidekiq_options max_retries_after_interruption: MAX_RETRIES_AFTER_INTERRUPTION
end
end
@@ -79,7 +82,7 @@ module Gitlab
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def try_import(client, project)
- project.import_state.refresh_jid_expiration
+ RefreshImportJidWorker.perform_in_the_future(project.id, jid)
import(client, project)
rescue RateLimitError
diff --git a/app/workers/concerns/update_repository_storage_worker.rb b/app/workers/concerns/update_repository_storage_worker.rb
index 01744d1e57d..fd437ebc158 100644
--- a/app/workers/concerns/update_repository_storage_worker.rb
+++ b/app/workers/concerns/update_repository_storage_worker.rb
@@ -11,7 +11,19 @@ module UpdateRepositoryStorageWorker
urgency :throttled
end
- def perform(container_id, new_repository_storage_key, repository_storage_move_id = nil)
+ LEASE_TIMEOUT = 30.minutes.to_i
+
+ # `container_id` and `new_repository_storage_key` arguments have been deprecated.
+ # `repository_storage_move_id` is now a mandatory argument.
+ # We are using *args for backwards compatibility. Previously defined as:
+ # perform(container_id, new_repository_storage_key, repository_storage_move_id = nil)
+ def perform(*args)
+ if args.length == 1
+ repository_storage_move_id = args[0]
+ else
+ container_id, new_repository_storage_key, repository_storage_move_id = *args
+ end
+
repository_storage_move =
if repository_storage_move_id
find_repository_storage_move(repository_storage_move_id)
@@ -24,7 +36,35 @@ module UpdateRepositoryStorageWorker
)
end
- update_repository_storage(repository_storage_move)
+ container_id ||= repository_storage_move.container_id
+
+ # Use exclusive lock to prevent multiple storage migrations at the same time
+ #
+ # Note: instead of using a randomly generated `uuid`, we provide a worker jid value.
+ # That will allow us to track a worker that requested a lease.
+ lease_key = [self.class.name.underscore, container_id].join(':')
+ exclusive_lease = Gitlab::ExclusiveLease.new(lease_key, uuid: jid, timeout: LEASE_TIMEOUT)
+ lease = exclusive_lease.try_obtain
+
+ if lease
+ begin
+ update_repository_storage(repository_storage_move)
+ ensure
+ exclusive_lease.cancel
+ end
+ else
+ # If there is an ongoing storage migration, then the current one should be marked as failed
+ repository_storage_move.do_fail!
+
+ # A special case
+ # Sidekiq can receive an interrupt signal during the processing.
+ # It kills existing workers and reschedules their jobs using the same jid.
+ # But it can cause a situation when the migration is only half complete (see https://gitlab.com/gitlab-org/gitlab/-/issues/429049#note_1635650597)
+ #
+ # Here we detect this case and release the lock.
+ uuid = Gitlab::ExclusiveLease.get_uuid(lease_key)
+ exclusive_lease.cancel if uuid == jid
+ end
end
private
diff --git a/app/workers/container_registry/cleanup_worker.rb b/app/workers/container_registry/cleanup_worker.rb
index 9ec02dd613e..cd61c5ebcb4 100644
--- a/app/workers/container_registry/cleanup_worker.rb
+++ b/app/workers/container_registry/cleanup_worker.rb
@@ -38,7 +38,7 @@ module ContainerRegistry
# Deleting stale ongoing repair details would put the project back to the analysis pool
ContainerRegistry::DataRepairDetail
.ongoing_since(STALE_REPAIR_DETAIL_THRESHOLD.ago)
- .each_batch(of: BATCH_SIZE) do |batch| # rubocop:disable Style/SymbolProc
+ .each_batch(of: BATCH_SIZE) do |batch|
batch.delete_all
end
end
diff --git a/app/workers/delete_user_worker.rb b/app/workers/delete_user_worker.rb
index 6a375a0cdd4..4634ea8ff4f 100644
--- a/app/workers/delete_user_worker.rb
+++ b/app/workers/delete_user_worker.rb
@@ -14,7 +14,7 @@ class DeleteUserWorker # rubocop:disable Scalability/IdempotentWorker
delete_user = User.find_by_id(delete_user_id)
return unless delete_user.present?
- return if delete_user.banned? && ::Feature.enabled?(:delay_delete_own_user)
+ return if skip_own_account_deletion?(delete_user)
current_user = User.find_by_id(current_user_id)
return unless current_user.present?
@@ -23,4 +23,34 @@ class DeleteUserWorker # rubocop:disable Scalability/IdempotentWorker
rescue Gitlab::Access::AccessDeniedError => e
Gitlab::AppLogger.warn("User could not be destroyed: #{e}")
end
+
+ private
+
+ def skip_own_account_deletion?(user)
+ return false unless ::Feature.enabled?(:delay_delete_own_user)
+
+ skip =
+ if user.banned?
+ true
+ else
+ # User is blocked when they delete their own account. Skip record deletion
+ # when user has been unblocked (e.g. when the user's account is reinstated
+ # by Trust & Safety)
+ user.deleted_own_account? && !user.blocked?
+ end
+
+ if skip
+ user.custom_attributes.by_key(UserCustomAttribute::DELETED_OWN_ACCOUNT_AT).first&.destroy
+ UserCustomAttribute.set_skipped_account_deletion_at(user)
+
+ Gitlab::AppLogger.info(
+ message: 'Skipped own account deletion.',
+ reason: "User has been #{user.banned? ? 'banned' : 'unblocked'}.",
+ user_id: user.id,
+ username: user.username
+ )
+ end
+
+ skip
+ end
end
diff --git a/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb b/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb
index b378d07d59c..573c73cd7df 100644
--- a/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb
+++ b/app/workers/gitlab/bitbucket_server_import/stage/import_repository_worker.rb
@@ -14,7 +14,11 @@ module Gitlab
importer.execute
- ImportPullRequestsWorker.perform_async(project.id)
+ if Feature.enabled?(:bitbucket_server_convert_mentions_to_users, project.creator)
+ ImportUsersWorker.perform_async(project.id)
+ else
+ ImportPullRequestsWorker.perform_async(project.id)
+ end
end
def importer_class
diff --git a/app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb b/app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb
new file mode 100644
index 00000000000..dd18139fc9e
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_server_import/stage/import_users_worker.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketServerImport
+ module Stage
+ class ImportUsersWorker # rubocop:disable Scalability/IdempotentWorker -- ImportPullRequestsWorker is not idempotent
+ include StageMethods
+
+ private
+
+ def import(project)
+ importer = importer_class.new(project)
+
+ importer.execute
+
+ ImportPullRequestsWorker.perform_async(project.id)
+ end
+
+ def importer_class
+ Importers::UsersImporter
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index a012241e90c..417b8598547 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -11,12 +11,15 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include ::Gitlab::Import::AdvanceStage
- sidekiq_options dead: false
- feature_category :importers
loggable_arguments 1, 2
+ sidekiq_options retry: 6
+
+ # TODO: Allow this class to include GithubImport::Queue and remove
+ # the following two lines https://gitlab.com/gitlab-org/gitlab/-/issues/435622
+ feature_category :importers
+ sidekiq_options dead: false
# The known importer stages and their corresponding Sidekiq workers.
STAGES = {
diff --git a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb
index 3de4bef053f..dfc581f201b 100644
--- a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb
+++ b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb
@@ -9,8 +9,10 @@ module Gitlab
include GithubImport::Queue
+ sidekiq_options retry: 5
+
# The interval to schedule new instances of this job at.
- INTERVAL = 1.minute.to_i
+ INTERVAL = 5.minutes.to_i
def self.perform_in_the_future(*args)
perform_in(INTERVAL, *args)
@@ -23,9 +25,11 @@ module Gitlab
return unless import_state
if SidekiqStatus.running?(check_job_id)
- # As long as the repository is being cloned we want to keep refreshing
- # the import JID status.
- import_state.refresh_jid_expiration
+ # As long as the worker is running we want to keep refreshing
+ # the worker's JID as well as the import's JID.
+ Gitlab::SidekiqStatus.expire(check_job_id, Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION)
+ Gitlab::SidekiqStatus.set(import_state.jid, Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION)
+
self.class.perform_in_the_future(project_id, check_job_id)
end
diff --git a/app/workers/gitlab/github_import/stage/finish_import_worker.rb b/app/workers/gitlab/github_import/stage/finish_import_worker.rb
index 90445a6d46c..8d5a98136af 100644
--- a/app/workers/gitlab/github_import/stage/finish_import_worker.rb
+++ b/app/workers/gitlab/github_import/stage/finish_import_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
# project - An instance of Project.
diff --git a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
index a5d085a82c0..bbf762133e1 100644
--- a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
index 5bbe14b6528..d965c1ae847 100644
--- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
# These importers are fast enough that we can just run them in the same
diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
index 037b529b866..b5b1601e3ed 100644
--- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
# client - An instance of Gitlab::GithubImport::Client.
diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
index 35779d7bfc5..27d14a1a108 100644
--- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
index 58e1f637b6a..595f0ca44d4 100644
--- a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
index 8d7bd98f303..34c31fea726 100644
--- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
# Importer::LfsObjectsImporter can resume work when interrupted as
diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb
index 0459545d8e1..8aea27a94d4 100644
--- a/app/workers/gitlab/github_import/stage/import_notes_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
index e281e965f94..65b9d85f453 100644
--- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
# client - An instance of Gitlab::GithubImport::Client.
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
index 2f543951bf3..20b2e5ed6af 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
index db76545ae87..1262fc23c6c 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
index 31b7c57a524..bb4699889da 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
index c68b95b5111..bcc39b169af 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
resumes_work_when_interrupted!
diff --git a/app/workers/gitlab/github_import/stage/import_repository_worker.rb b/app/workers/gitlab/github_import/stage/import_repository_worker.rb
index 2a62930b5ea..44481b8a75c 100644
--- a/app/workers/gitlab/github_import/stage/import_repository_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_repository_worker.rb
@@ -8,18 +8,11 @@ module Gitlab
data_consistency :always
- include GithubImport::Queue
include StageMethods
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
- # In extreme cases it's possible for a clone to take more than the
- # import job expiration time. To work around this we schedule a
- # separate job that will periodically run and refresh the import
- # expiration time.
- RefreshImportJidWorker.perform_in_the_future(project.id, jid)
-
info(project.id, message: "starting importer", importer: 'Importer::RepositoryImporter')
# If a user creates an issue while the import is in progress, this can lead to an import failure.
diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb
index 782439894c0..709957556d3 100644
--- a/app/workers/gitlab/import/advance_stage.rb
+++ b/app/workers/gitlab/import/advance_stage.rb
@@ -37,6 +37,8 @@ module Gitlab
if new_job_count != previous_job_count
timeout_timer = Time.zone.now
previous_job_count = new_job_count
+
+ import_state_jid.refresh_jid_expiration
end
if new_waiters.empty?
diff --git a/app/workers/merge_request_cleanup_refs_worker.rb b/app/workers/merge_request_cleanup_refs_worker.rb
index db1a1e96997..36979e843ef 100644
--- a/app/workers/merge_request_cleanup_refs_worker.rb
+++ b/app/workers/merge_request_cleanup_refs_worker.rb
@@ -19,7 +19,7 @@ class MergeRequestCleanupRefsWorker
def perform_work
unless merge_request
- logger.error('No existing merge request to be cleaned up.')
+ logger.info('No existing merge request to be cleaned up.')
return
end
diff --git a/app/workers/packages/cleanup_package_registry_worker.rb b/app/workers/packages/cleanup_package_registry_worker.rb
index 5b2d8bacd62..50036923e94 100644
--- a/app/workers/packages/cleanup_package_registry_worker.rb
+++ b/app/workers/packages/cleanup_package_registry_worker.rb
@@ -14,6 +14,7 @@ module Packages
enqueue_package_file_cleanup_job if Packages::PackageFile.pending_destruction.exists?
enqueue_cleanup_policy_jobs if Packages::Cleanup::Policy.runnable.exists?
enqueue_cleanup_stale_npm_metadata_cache_job if Packages::Npm::MetadataCache.pending_destruction.exists?
+ enqueue_cleanup_stale_nuget_symbols_job if Packages::Nuget::Symbol.pending_destruction.exists?
log_counts
end
@@ -32,6 +33,10 @@ module Packages
Packages::Npm::CleanupStaleMetadataCacheWorker.perform_with_capacity
end
+ def enqueue_cleanup_stale_nuget_symbols_job
+ Packages::Nuget::CleanupStaleSymbolsWorker.perform_with_capacity
+ end
+
def log_counts
use_replica_if_available do
pending_destruction_package_files_count = Packages::PackageFile.pending_destruction.count
diff --git a/app/workers/packages/npm/create_metadata_cache_worker.rb b/app/workers/packages/npm/create_metadata_cache_worker.rb
index 0b6e34b13eb..cff7871dab7 100644
--- a/app/workers/packages/npm/create_metadata_cache_worker.rb
+++ b/app/workers/packages/npm/create_metadata_cache_worker.rb
@@ -16,7 +16,7 @@ module Packages
def perform(project_id, package_name)
project = Project.find_by_id(project_id)
- return unless project && Feature.enabled?(:npm_metadata_cache, project)
+ return unless project
::Packages::Npm::CreateMetadataCacheService
.new(project, package_name)
diff --git a/app/workers/packages/nuget/cleanup_stale_symbols_worker.rb b/app/workers/packages/nuget/cleanup_stale_symbols_worker.rb
new file mode 100644
index 00000000000..be90b86604c
--- /dev/null
+++ b/app/workers/packages/nuget/cleanup_stale_symbols_worker.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Packages
+ module Nuget
+ class CleanupStaleSymbolsWorker
+ include ApplicationWorker
+ include ::Packages::CleanupArtifactWorker
+
+ MAX_CAPACITY = 2
+
+ data_consistency :sticky
+
+ queue_namespace :package_cleanup
+ feature_category :package_registry
+
+ deduplicate :until_executed
+ idempotent!
+
+ def max_running_jobs
+ MAX_CAPACITY
+ end
+
+ private
+
+ def model
+ Packages::Nuget::Symbol
+ end
+
+ def next_item
+ model.next_pending_destruction(order_by: nil)
+ end
+
+ def log_metadata(nuget_symbol)
+ log_extra_metadata_on_done(:nuget_symbol_id, nuget_symbol.id)
+ end
+
+ def log_cleanup_item(nuget_symbol)
+ logger.info(
+ structured_payload(
+ nuget_symbol_id: nuget_symbol.id
+ )
+ )
+ end
+ end
+ end
+end
diff --git a/app/workers/pages/deactivate_mr_deployments_worker.rb b/app/workers/pages/deactivate_mr_deployments_worker.rb
new file mode 100644
index 00000000000..910cae72d12
--- /dev/null
+++ b/app/workers/pages/deactivate_mr_deployments_worker.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+module Pages
+ class DeactivateMrDeploymentsWorker
+ include ApplicationWorker
+
+ idempotent!
+ data_consistency :always # rubocop: disable SidekiqLoadBalancing/WorkerDataConsistency -- performing writes only
+ urgency :low
+
+ feature_category :pages
+
+ def perform(merge_request_id)
+ build_ids = Ci::Build.ids_in_merge_request(merge_request_id)
+ deactivate_deployments_with_build_ids(build_ids)
+ end
+
+ private
+
+ def deactivate_deployments_with_build_ids(build_ids)
+ PagesDeployment
+ .versioned
+ .ci_build_id_in(build_ids)
+ .each_batch do |batch|
+ batch.deactivate
+ end
+ end
+ end
+end
diff --git a/app/workers/pages/deactivated_deployments_delete_cron_worker.rb b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb
index 75905759761..eeafed446c8 100644
--- a/app/workers/pages/deactivated_deployments_delete_cron_worker.rb
+++ b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb
@@ -11,7 +11,7 @@ module Pages
feature_category :pages
def perform
- PagesDeployment.deactivated.each_batch do |deployments| # rubocop: disable Style/SymbolProc
+ PagesDeployment.deactivated.each_batch do |deployments|
deployments.each { |deployment| deployment.file.remove! }
deployments.delete_all
end
diff --git a/app/workers/pipeline_metrics_worker.rb b/app/workers/pipeline_metrics_worker.rb
index 4e98c7268ac..b45a1c33d5c 100644
--- a/app/workers/pipeline_metrics_worker.rb
+++ b/app/workers/pipeline_metrics_worker.rb
@@ -8,7 +8,7 @@ class PipelineMetricsWorker # rubocop:disable Scalability/IdempotentWorker
sidekiq_options retry: 3
include PipelineQueue
- urgency :high
+ urgency :low
def perform(pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb
index ca589acf26c..6237f64fa86 100644
--- a/app/workers/pipeline_schedule_worker.rb
+++ b/app/workers/pipeline_schedule_worker.rb
@@ -10,6 +10,8 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker
LOCK_RETRY = 3
LOCK_TTL = 5.minutes
+ DELAY = 7.seconds
+ BATCH_SIZE = 500
feature_category :continuous_integration
worker_resource_boundary :cpu
@@ -20,12 +22,8 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker
.select(:id, :owner_id, :project_id) # Minimize the selected columns
.runnable_schedules
.preloaded
- .find_in_batches do |schedules|
- RunPipelineScheduleWorker.bulk_perform_async_with_contexts(
- schedules,
- arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id, { scheduling: true }] },
- context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } }
- )
+ .find_in_batches(batch_size: BATCH_SIZE).with_index do |schedules, index| # rubocop: disable CodeReuse/ActiveRecord -- activates because of batch_size
+ enqueue_run_pipeline_schedule_worker(schedules, index)
end
end
end
@@ -42,4 +40,21 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker
retries: LOCK_RETRY
}
end
+
+ def enqueue_run_pipeline_schedule_worker(schedules, index)
+ if ::Feature.enabled?(:run_pipeline_schedule_worker_with_delay)
+ RunPipelineScheduleWorker.bulk_perform_in_with_contexts(
+ [1, index * DELAY].max,
+ schedules,
+ arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id, { scheduling: true }] },
+ context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } }
+ )
+ else
+ RunPipelineScheduleWorker.bulk_perform_async_with_contexts(
+ schedules,
+ arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id, { scheduling: true }] },
+ context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } }
+ )
+ end
+ end
end
diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb
index cc72704d8c9..30e394a95cf 100644
--- a/app/workers/process_commit_worker.rb
+++ b/app/workers/process_commit_worker.rb
@@ -42,6 +42,8 @@ class ProcessCommitWorker
update_issue_metrics(commit, author)
end
+ private
+
def process_commit_message(project, commit, user, author, default = false)
# Ignore closing references from GitLab-generated commit messages.
find_closing_issues = default && !commit.merged_merge_request?(user)
diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb
index 61ef7494d38..52d825e5421 100644
--- a/app/workers/run_pipeline_schedule_worker.rb
+++ b/app/workers/run_pipeline_schedule_worker.rb
@@ -10,7 +10,7 @@ class RunPipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker
queue_namespace :pipeline_creation
feature_category :continuous_integration
- deduplicate :until_executed
+ deduplicate :until_executed, including_scheduled: true
idempotent!
def perform(schedule_id, user_id, options = {})