gitlab.com/gitlab-org/gitlab-foss.git

Diffstat (limited to 'app/workers')
-rw-r--r--  app/workers/all_queues.yml                                           | 99
-rw-r--r--  app/workers/bulk_imports/export_request_worker.rb                    | 58
-rw-r--r--  app/workers/bulk_imports/finish_batched_pipeline_worker.rb          | 45
-rw-r--r--  app/workers/bulk_imports/finish_batched_relation_export_worker.rb   |  2
-rw-r--r--  app/workers/bulk_imports/pipeline_batch_worker.rb                    | 81
-rw-r--r--  app/workers/bulk_imports/pipeline_worker.rb                          | 23
-rw-r--r--  app/workers/bulk_imports/relation_export_worker.rb                   |  4
-rw-r--r--  app/workers/ci/pipeline_cleanup_ref_worker.rb                        | 35
-rw-r--r--  app/workers/clusters/integrations/check_prometheus_health_worker.rb | 24
-rw-r--r--  app/workers/container_registry/cleanup_worker.rb                     | 17
-rw-r--r--  app/workers/container_registry/record_data_repair_detail_worker.rb  |  6
-rw-r--r--  app/workers/integrations/execute_worker.rb                           |  2
-rw-r--r--  app/workers/integrations/group_mention_worker.rb                     | 42
-rw-r--r--  app/workers/merge_requests/cleanup_ref_worker.rb                     | 35
-rw-r--r--  app/workers/merge_requests/mergeability_check_batch_worker.rb        | 15
-rw-r--r--  app/workers/metrics/dashboard/prune_old_annotations_worker.rb        | 22
-rw-r--r--  app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb  | 22
-rw-r--r--  app/workers/metrics/dashboard/sync_dashboards_worker.rb              | 19
-rw-r--r--  app/workers/packages/debian/process_changes_worker.rb                | 46
-rw-r--r--  app/workers/redis_migration_worker.rb                                | 40
-rw-r--r--  app/workers/run_pipeline_schedule_worker.rb                          |  7
21 files changed, 411 insertions(+), 233 deletions(-)
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index f8aa06943ee..6f6fd9ddb65 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -318,15 +318,6 @@
:weight: 1
:idempotent: false
:tags: []
-- :name: cronjob:clusters_integrations_check_prometheus_health
- :worker_name: Clusters::Integrations::CheckPrometheusHealthWorker
- :feature_category: :incident_management
- :has_external_dependencies: true
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: cronjob:container_expiration_policy
:worker_name: ContainerExpirationPolicyWorker
:feature_category: :container_registry
@@ -561,15 +552,6 @@
:weight: 1
:idempotent: false
:tags: []
-- :name: cronjob:metrics_dashboard_schedule_annotations_prune
- :worker_name: Metrics::Dashboard::ScheduleAnnotationsPruneWorker
- :feature_category: :metrics
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: cronjob:metrics_global_metrics_update
:worker_name: Metrics::GlobalMetricsUpdateWorker
:feature_category: :metrics
@@ -1740,15 +1722,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: package_repositories:packages_debian_process_changes
- :worker_name: Packages::Debian::ProcessChangesWorker
- :feature_category: :package_registry
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: package_repositories:packages_debian_process_package_file
:worker_name: Packages::Debian::ProcessPackageFileWorker
:feature_category: :package_registry
@@ -2001,6 +1974,15 @@
:weight: 3
:idempotent: false
:tags: []
+- :name: pipeline_default:ci_pipeline_cleanup_ref
+ :worker_name: Ci::PipelineCleanupRefWorker
+ :feature_category: :continuous_integration
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 3
+ :idempotent: true
+ :tags: []
- :name: pipeline_default:ci_retry_pipeline
:worker_name: Ci::RetryPipelineWorker
:feature_category: :continuous_integration
@@ -2424,6 +2406,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: bulk_imports_finish_batched_pipeline
+ :worker_name: BulkImports::FinishBatchedPipelineWorker
+ :feature_category: :importers
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: bulk_imports_finish_batched_relation_export
:worker_name: BulkImports::FinishBatchedRelationExportWorker
:feature_category: :importers
@@ -2442,6 +2433,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bulk_imports_pipeline_batch
+ :worker_name: BulkImports::PipelineBatchWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
- :name: bulk_imports_relation_batch_export
:worker_name: BulkImports::RelationBatchExportWorker
:feature_category: :importers
@@ -2856,6 +2856,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: integrations_group_mention
+ :worker_name: Integrations::GroupMentionWorker
+ :feature_category: :integrations
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: integrations_irker
:worker_name: Integrations::IrkerWorker
:feature_category: :integrations
@@ -2973,6 +2982,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: merge_requests_cleanup_ref
+ :worker_name: MergeRequests::CleanupRefWorker
+ :feature_category: :code_review_workflow
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: merge_requests_close_issue
:worker_name: MergeRequests::CloseIssueWorker
:feature_category: :code_review_workflow
@@ -3072,24 +3090,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: metrics_dashboard_prune_old_annotations
- :worker_name: Metrics::Dashboard::PruneOldAnnotationsWorker
- :feature_category: :metrics
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
-- :name: metrics_dashboard_sync_dashboards
- :worker_name: Metrics::Dashboard::SyncDashboardsWorker
- :feature_category: :metrics
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: migrate_external_diffs
:worker_name: MigrateExternalDiffsWorker
:feature_category: :code_review_workflow
@@ -3486,6 +3486,15 @@
:weight: 2
:idempotent: false
:tags: []
+- :name: redis_migration
+ :worker_name: RedisMigrationWorker
+ :feature_category: :redis
+ :has_external_dependencies: false
+ :urgency: :throttled
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: releases_create_evidence
:worker_name: Releases::CreateEvidenceWorker
:feature_category: :release_evidence
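For orientation: all_queues.yml is a generated inventory, not a hand-maintained list. Each entry mirrors the Sidekiq declarations made inside the corresponding worker class, so the additions and removals above simply track the worker files changed later in this diff. A minimal sketch of the correspondence (the worker name here is illustrative, not part of this change):

    # Illustrative worker showing which declarations feed the YAML fields.
    class ExampleWorker # hypothetical, for illustration only
      include ApplicationWorker

      feature_category :importers        # => :feature_category: :importers
      urgency :low                       # => :urgency: :low
      idempotent!                        # => :idempotent: true
      worker_has_external_dependencies!  # => :has_external_dependencies: true
    end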
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 530419dac26..44759916f99 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -15,35 +15,40 @@ module BulkImports
end
def perform(entity_id)
- entity = BulkImports::Entity.find(entity_id)
+ @entity = BulkImports::Entity.find(entity_id)
- entity.update!(source_xid: entity_source_xid(entity)) if entity.source_xid.nil?
-
- request_export(entity)
+ set_source_xid
+ request_export
BulkImports::EntityWorker.perform_async(entity_id)
end
def perform_failure(exception, entity_id)
- entity = BulkImports::Entity.find(entity_id)
+ @entity = BulkImports::Entity.find(entity_id)
- log_and_fail(exception, entity)
+ log_and_fail(exception)
end
private
- def request_export(entity)
- http_client(entity).post(entity.export_relations_url_path)
+ attr_reader :entity
+
+ def set_source_xid
+ entity.update!(source_xid: entity_source_xid) if entity.source_xid.nil?
+ end
+
+ def request_export
+ http_client.post(export_url)
end
- def http_client(entity)
+ def http_client
@client ||= Clients::HTTP.new(
url: entity.bulk_import.configuration.url,
token: entity.bulk_import.configuration.access_token
)
end
- def failure_attributes(exception, entity)
+ def failure_attributes(exception)
{
bulk_import_entity_id: entity.id,
pipeline_class: 'ExportRequestWorker',
@@ -53,23 +58,20 @@ module BulkImports
}
end
- def graphql_client(entity)
+ def graphql_client
@graphql_client ||= BulkImports::Clients::Graphql.new(
url: entity.bulk_import.configuration.url,
token: entity.bulk_import.configuration.access_token
)
end
- def entity_source_xid(entity)
- query = entity_query(entity)
- client = graphql_client(entity)
-
- response = client.execute(
- client.parse(query.to_s),
+ def entity_source_xid
+ response = graphql_client.execute(
+ graphql_client.parse(entity_query.to_s),
{ full_path: entity.source_full_path }
).original_hash
- ::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id
+ ::GlobalID.parse(response.dig(*entity_query.data_path, 'id')).model_id
rescue StandardError => e
log_exception(e,
{
@@ -86,12 +88,12 @@ module BulkImports
nil
end
- def entity_query(entity)
- if entity.group?
- BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil)
- else
- BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil)
- end
+ def entity_query
+ @entity_query ||= if entity.group?
+ BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil)
+ else
+ BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil)
+ end
end
def logger
@@ -104,7 +106,7 @@ module BulkImports
logger.error(structured_payload(payload))
end
- def log_and_fail(exception, entity)
+ def log_and_fail(exception)
log_exception(exception,
{
bulk_import_entity_id: entity.id,
@@ -117,9 +119,13 @@ module BulkImports
}
)
- BulkImports::Failure.create(failure_attributes(exception, entity))
+ BulkImports::Failure.create(failure_attributes(exception))
entity.fail_op!
end
+
+ def export_url
+ entity.export_relations_url_path(batched: Feature.enabled?(:bulk_imports_batched_import_export))
+ end
end
end
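The functional change in this refactor is the new export_url helper: the worker now asks the source instance for a batched export whenever the bulk_imports_batched_import_export feature flag is enabled. A rough sketch of the resulting request (the endpoint path shape is an assumption for illustration, not taken from this diff):

    # What request_export now effectively does:
    batched = Feature.enabled?(:bulk_imports_batched_import_export)
    http_client.post(entity.export_relations_url_path(batched: batched))
    # e.g. POST /groups/:id/export_relations?batched=true (assumed path shape)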
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
new file mode 100644
index 00000000000..4200d0e4a0f
--- /dev/null
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -0,0 +1,45 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class FinishBatchedPipelineWorker
+ include ApplicationWorker
+ include ExceptionBacktrace
+
+ REQUEUE_DELAY = 5.seconds
+
+ idempotent!
+ deduplicate :until_executing
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ feature_category :importers
+
+ def perform(pipeline_tracker_id)
+ @tracker = Tracker.find(pipeline_tracker_id)
+
+ return unless tracker.batched?
+ return unless tracker.started?
+ return re_enqueue if import_in_progress?
+
+ if tracker.stale?
+ tracker.batches.map(&:fail_op!)
+ tracker.fail_op!
+ else
+ tracker.finish!
+ end
+
+ ensure
+ ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage)
+ end
+
+ private
+
+ attr_reader :tracker
+
+ def re_enqueue
+ self.class.perform_in(REQUEUE_DELAY, tracker.id)
+ end
+
+ def import_in_progress?
+ tracker.batches.any?(&:started?)
+ end
+ end
+end
diff --git a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
index aa7bbffa732..92a33a971e7 100644
--- a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
@@ -5,7 +5,7 @@ module BulkImports
include ApplicationWorker
idempotent!
- data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ data_consistency :sticky
feature_category :importers
REENQUEUE_DELAY = 5.seconds
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
new file mode 100644
index 00000000000..378eff99b52
--- /dev/null
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -0,0 +1,81 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class PipelineBatchWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+ include ExclusiveLeaseGuard
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ feature_category :importers
+ sidekiq_options retry: false, dead: false
+ worker_has_external_dependencies!
+
+ def perform(batch_id)
+ @batch = ::BulkImports::BatchTracker.find(batch_id)
+ @tracker = @batch.tracker
+
+ try_obtain_lease { run }
+ ensure
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+ end
+
+ private
+
+ attr_reader :batch, :tracker
+
+ def run
+ return batch.skip! if tracker.failed? || tracker.finished?
+
+ batch.start!
+ tracker.pipeline_class.new(context).run
+ batch.finish!
+ rescue BulkImports::RetryPipelineError => e
+ retry_batch(e)
+ rescue StandardError => e
+ fail_batch(e)
+ end
+
+ def fail_batch(exception)
+ batch.fail_op!
+
+ Gitlab::ErrorTracking.track_exception(
+ exception,
+ batch_id: batch.id,
+ tracker_id: tracker.id,
+ pipeline_class: tracker.pipeline_name,
+ pipeline_step: 'pipeline_batch_worker_run'
+ )
+
+ BulkImports::Failure.create(
+ bulk_import_entity_id: batch.tracker.entity.id,
+ pipeline_class: tracker.pipeline_name,
+ pipeline_step: 'pipeline_batch_worker_run',
+ exception_class: exception.class.to_s,
+ exception_message: exception.message.truncate(255),
+ correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+ )
+ end
+
+ def context
+ @context ||= ::BulkImports::Pipeline::Context.new(tracker, batch_number: batch.batch_number)
+ end
+
+ def retry_batch(exception)
+ batch.retry!
+
+ re_enqueue(exception.retry_delay)
+ end
+
+ def lease_timeout
+ 30
+ end
+
+ def lease_key
+ "gitlab:bulk_imports:pipeline_batch_worker:#{batch.id}"
+ end
+
+ def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+ self.class.perform_in(delay, batch.id)
+ end
+ end
+end
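The batch run is wrapped in try_obtain_lease from ExclusiveLeaseGuard, so at most one job per BatchTracker executes at a time; a job that cannot obtain the lease simply skips the block, and retry: false means Sidekiq will not retry it either. A condensed sketch of that guard, assuming the concern's usual Gitlab::ExclusiveLease semantics:

    # Sketch only: what try_obtain_lease { run } expands to conceptually.
    lease = Gitlab::ExclusiveLease.new(lease_key, timeout: lease_timeout) # 30s here
    if (uuid = lease.try_obtain)
      begin
        run                                   # runs the pipeline for this batch
      ensure
        Gitlab::ExclusiveLease.cancel(lease_key, uuid)
      end
    end
    # If the lease is held elsewhere, the job exits; the ensure block in
    # #perform still notifies FinishBatchedPipelineWorker to re-check progress.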
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index f03e0bc0656..e0db18cb987 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -31,7 +31,6 @@ module BulkImports
fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped?
end
end
-
ensure
::BulkImports::EntityWorker.perform_async(entity_id, stage)
end
@@ -49,9 +48,17 @@ module BulkImports
return re_enqueue if export_empty? || export_started?
- pipeline_tracker.update!(status_event: 'start', jid: jid)
- pipeline_tracker.pipeline_class.new(context).run
- pipeline_tracker.finish!
+ if file_extraction_pipeline? && export_status.batched?
+ pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true)
+
+ return pipeline_tracker.finish! if export_status.batches_count < 1
+
+ enqueue_batches
+ else
+ pipeline_tracker.update!(status_event: 'start', jid: jid)
+ pipeline_tracker.pipeline_class.new(context).run
+ pipeline_tracker.finish!
+ end
rescue BulkImports::RetryPipelineError => e
retry_tracker(e)
rescue StandardError => e
@@ -179,5 +186,13 @@ module BulkImports
time_since_tracker_created > Pipeline::NDJSON_EXPORT_TIMEOUT
end
+
+ def enqueue_batches
+ 1.upto(export_status.batches_count) do |batch_number|
+ batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+
+ ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+ end
+ end
end
end
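Note that enqueue_batches creates batch rows with find_or_create_by! keyed on batch_number, so a retried or duplicated run re-enqueues the existing rows rather than creating duplicates. Sketch of the fan-out for an export reporting three batches (count is illustrative):

    # With export_status.batches_count == 3, the tracker fans out to:
    #   PipelineBatchWorker.perform_async(batch_1.id)
    #   PipelineBatchWorker.perform_async(batch_2.id)
    #   PipelineBatchWorker.perform_async(batch_3.id)
    # Each batch job reports back via FinishBatchedPipelineWorker, which
    # finishes or fails the tracker once no batch is still running.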
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index b6693f0b07d..531edc6c7a7 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -18,9 +18,7 @@ module BulkImports
portable = portable(portable_id, portable_class)
config = BulkImports::FileTransfer.config_for(portable)
- if Feature.enabled?(:bulk_imports_batched_import_export) &&
- Gitlab::Utils.to_boolean(batched) &&
- config.batchable_relation?(relation)
+ if Gitlab::Utils.to_boolean(batched) && config.batchable_relation?(relation)
BatchedRelationExportService.new(user, portable, relation, jid).execute
else
RelationExportService.new(user, portable, relation, jid).execute
diff --git a/app/workers/ci/pipeline_cleanup_ref_worker.rb b/app/workers/ci/pipeline_cleanup_ref_worker.rb
new file mode 100644
index 00000000000..291e1090c18
--- /dev/null
+++ b/app/workers/ci/pipeline_cleanup_ref_worker.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+
+module Ci
+ class PipelineCleanupRefWorker
+ include ApplicationWorker
+ include Projects::RemoveRefs
+
+ sidekiq_options retry: 3
+ include PipelineQueue
+
+ idempotent!
+ deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+
+ urgency :low
+
+ # Even though this worker is de-duplicated we need to acquire a lock
+ # on a project to avoid running many concurrent ref removals
+ #
+ # TODO: Once underlying fix is done we can remove `in_lock`
+ #
+ # Related to:
+ # - https://gitlab.com/gitlab-org/gitaly/-/issues/5368
+ # - https://gitlab.com/gitlab-org/gitaly/-/issues/5369
+ def perform(pipeline_id)
+ pipeline = Ci::Pipeline.find_by_id(pipeline_id)
+ return unless pipeline
+ return unless pipeline.persistent_ref.should_delete?
+
+ serialized_remove_refs(pipeline.project_id) do
+ pipeline.reset.persistent_ref.delete
+ end
+ end
+ end
+end
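serialized_remove_refs comes from the Projects::RemoveRefs concern and serializes ref deletions per project, which is what the TODO about in_lock refers to. A rough sketch of what the concern is assumed to do (lock key and arguments are illustrative, not taken from this diff):

    # Assumed shape of Projects::RemoveRefs#serialized_remove_refs:
    def serialized_remove_refs(project_id, &blk)
      # One ref-removal block per project at a time, via an exclusive lease.
      in_lock("projects:#{project_id}:serialized_remove_refs", ttl: 1.minute, &blk)
    end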
diff --git a/app/workers/clusters/integrations/check_prometheus_health_worker.rb b/app/workers/clusters/integrations/check_prometheus_health_worker.rb
deleted file mode 100644
index b65b3424c3a..00000000000
--- a/app/workers/clusters/integrations/check_prometheus_health_worker.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-# frozen_string_literal: true
-
-module Clusters
- module Integrations
- class CheckPrometheusHealthWorker
- include ApplicationWorker
-
- data_consistency :always
-
- # rubocop:disable Scalability/CronWorkerContext
- # This worker does not perform work scoped to a context
- include CronjobQueue
- # rubocop:enable Scalability/CronWorkerContext
-
- feature_category :incident_management
- urgency :low
-
- idempotent!
- worker_has_external_dependencies!
-
- def perform; end
- end
- end
-end
diff --git a/app/workers/container_registry/cleanup_worker.rb b/app/workers/container_registry/cleanup_worker.rb
index 448a16ad309..9ec02dd613e 100644
--- a/app/workers/container_registry/cleanup_worker.rb
+++ b/app/workers/container_registry/cleanup_worker.rb
@@ -16,8 +16,6 @@ module ContainerRegistry
BATCH_SIZE = 200
def perform
- log_counts
-
reset_stale_deletes
delete_stale_ongoing_repair_details
@@ -54,26 +52,13 @@ module ContainerRegistry
end
def should_enqueue_record_detail_jobs?
- return false unless Gitlab.com?
+ return false unless Gitlab.com_except_jh?
return false unless Feature.enabled?(:registry_data_repair_worker)
return false unless ContainerRegistry::GitlabApiClient.supports_gitlab_api?
Project.pending_data_repair_analysis.exists?
end
- def log_counts
- ::Gitlab::Database::LoadBalancing::Session.current.use_replicas_for_read_queries do
- log_extra_metadata_on_done(
- :delete_scheduled_container_repositories_count,
- ContainerRepository.delete_scheduled.count
- )
- log_extra_metadata_on_done(
- :stale_delete_container_repositories_count,
- stale_delete_container_repositories.count
- )
- end
- end
-
def stale_delete_container_repositories
ContainerRepository.delete_ongoing.with_stale_delete_at(STALE_DELETE_THRESHOLD.ago)
end
diff --git a/app/workers/container_registry/record_data_repair_detail_worker.rb b/app/workers/container_registry/record_data_repair_detail_worker.rb
index 390481f8e01..3e40dbbb99a 100644
--- a/app/workers/container_registry/record_data_repair_detail_worker.rb
+++ b/app/workers/container_registry/record_data_repair_detail_worker.rb
@@ -17,7 +17,7 @@ module ContainerRegistry
LEASE_TIMEOUT = 1.hour.to_i
def perform_work
- return unless Gitlab.com?
+ return unless Gitlab.com_except_jh?
return unless next_project
return if next_project.container_registry_data_repair_detail
@@ -51,7 +51,7 @@ module ContainerRegistry
end
def remaining_work_count
- return 0 unless Gitlab.com?
+ return 0 unless Gitlab.com_except_jh?
return 0 unless Feature.enabled?(:registry_data_repair_worker)
return 0 unless ContainerRegistry::GitlabApiClient.supports_gitlab_api?
@@ -69,7 +69,7 @@ module ContainerRegistry
end
def next_project
- Project.pending_data_repair_analysis.first
+ Project.pending_data_repair_analysis.limit(max_running_jobs * 2).sample
end
strong_memoize_attr :next_project
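The next_project change is a contention fix: previously every concurrent job targeted the same first row, so all but one wasted their run. Picking randomly from a small window spreads concurrent workers across candidates:

    # Before: all concurrent jobs race on one row.
    Project.pending_data_repair_analysis.first
    # After: each job samples from the first max_running_jobs * 2 candidates,
    # so concurrent workers rarely collide on the same project.
    Project.pending_data_repair_analysis.limit(max_running_jobs * 2).sample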
diff --git a/app/workers/integrations/execute_worker.rb b/app/workers/integrations/execute_worker.rb
index 443f1d9fe8e..6fe1937a222 100644
--- a/app/workers/integrations/execute_worker.rb
+++ b/app/workers/integrations/execute_worker.rb
@@ -13,6 +13,8 @@ module Integrations
worker_has_external_dependencies!
def perform(hook_id, data)
+ return if ::Gitlab::SilentMode.enabled?
+
data = data.with_indifferent_access
integration = Integration.find_by_id(hook_id)
return unless integration
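This early return wires integrations into Silent Mode: when ::Gitlab::SilentMode.enabled? is true, the worker drops the job before any outbound call is made, consistent with Silent Mode's purpose of suppressing outward-facing traffic.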
diff --git a/app/workers/integrations/group_mention_worker.rb b/app/workers/integrations/group_mention_worker.rb
new file mode 100644
index 00000000000..6cde1657ccd
--- /dev/null
+++ b/app/workers/integrations/group_mention_worker.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module Integrations
+ class GroupMentionWorker
+ include ApplicationWorker
+
+ idempotent!
+ feature_category :integrations
+ deduplicate :until_executed
+ data_consistency :delayed
+ urgency :low
+
+ worker_has_external_dependencies!
+
+ def perform(args)
+ args = args.with_indifferent_access
+
+ mentionable_type = args[:mentionable_type]
+ mentionable_id = args[:mentionable_id]
+ hook_data = args[:hook_data]
+ is_confidential = args[:is_confidential]
+
+ mentionable = case mentionable_type
+ when 'Issue'
+ Issue.find(mentionable_id)
+ when 'MergeRequest'
+ MergeRequest.find(mentionable_id)
+ end
+
+ if mentionable.nil?
+ Sidekiq.logger.error(
+ message: 'Integrations::GroupMentionWorker: mentionable not supported',
+ mentionable_type: mentionable_type,
+ mentionable_id: mentionable_id
+ )
+ return
+ end
+
+ Integrations::GroupMentionService.new(mentionable, hook_data: hook_data, is_confidential: is_confidential).execute
+ end
+ end
+end
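Because Sidekiq round-trips arguments through JSON, the args hash arrives with string keys, hence the with_indifferent_access call at the top of perform. A sketch of the expected caller shape (assumed, not shown in this diff):

    # Hypothetical enqueue site for this worker:
    Integrations::GroupMentionWorker.perform_async(
      'mentionable_type' => 'Issue',
      'mentionable_id'   => issue.id,
      'hook_data'        => hook_data,
      'is_confidential'  => issue.confidential?
    )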
diff --git a/app/workers/merge_requests/cleanup_ref_worker.rb b/app/workers/merge_requests/cleanup_ref_worker.rb
new file mode 100644
index 00000000000..c714b976a2b
--- /dev/null
+++ b/app/workers/merge_requests/cleanup_ref_worker.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+
+module MergeRequests
+ class CleanupRefWorker
+ include ApplicationWorker
+ include Projects::RemoveRefs
+
+ sidekiq_options retry: 3
+ loggable_arguments 2
+ feature_category :code_review_workflow
+
+ idempotent!
+ deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute
+ data_consistency :delayed
+
+ urgency :low
+
+ # Even though this worker is de-duplicated we need to acquire a lock
+ # on a project to avoid running many concurrent ref removals
+ #
+ # TODO: Once underlying fix is done we can remove `in_lock`
+ #
+ # Related to:
+ # - https://gitlab.com/gitlab-org/gitaly/-/issues/5368
+ # - https://gitlab.com/gitlab-org/gitaly/-/issues/5369
+ def perform(merge_request_id, only)
+ merge_request = MergeRequest.find_by_id(merge_request_id)
+ return unless merge_request
+
+ serialized_remove_refs(merge_request.target_project_id) do
+ merge_request.cleanup_refs(only: only.to_sym)
+ end
+ end
+ end
+end
diff --git a/app/workers/merge_requests/mergeability_check_batch_worker.rb b/app/workers/merge_requests/mergeability_check_batch_worker.rb
index cbe34ac3790..f48e9c234ab 100644
--- a/app/workers/merge_requests/mergeability_check_batch_worker.rb
+++ b/app/workers/merge_requests/mergeability_check_batch_worker.rb
@@ -15,10 +15,16 @@ module MergeRequests
@logger ||= Sidekiq.logger
end
- def perform(merge_request_ids)
+ def perform(merge_request_ids, user_id)
merge_requests = MergeRequest.id_in(merge_request_ids)
+ user = User.find_by_id(user_id)
merge_requests.each do |merge_request|
+ # Skip projects where the user doesn't have update_merge_request access
+ next if merge_status_recheck_not_allowed?(merge_request, user)
+
+ merge_request.mark_as_checking
+
result = merge_request.check_mergeability
next unless result&.error?
@@ -30,5 +36,12 @@ module MergeRequests
)
end
end
+
+ private
+
+ def merge_status_recheck_not_allowed?(merge_request, user)
+ ::Feature.enabled?(:restrict_merge_status_recheck, merge_request.project) &&
+ !Ability.allowed?(user, :update_merge_request, merge_request.project)
+ end
end
end
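The new user_id argument ties the recheck to the user who triggered it: behind the restrict_merge_status_recheck flag, a merge request is only rechecked when that user holds update_merge_request on the project. Callers must move to the two-argument form (sketch, assumed caller shape):

    # Old: MergeRequests::MergeabilityCheckBatchWorker.perform_async(ids)
    MergeRequests::MergeabilityCheckBatchWorker.perform_async(merge_request_ids, current_user.id)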
diff --git a/app/workers/metrics/dashboard/prune_old_annotations_worker.rb b/app/workers/metrics/dashboard/prune_old_annotations_worker.rb
deleted file mode 100644
index 5b34f85606d..00000000000
--- a/app/workers/metrics/dashboard/prune_old_annotations_worker.rb
+++ /dev/null
@@ -1,22 +0,0 @@
-# frozen_string_literal: true
-
-module Metrics
- module Dashboard
- class PruneOldAnnotationsWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- DELETE_LIMIT = 10_000
- DEFAULT_CUT_OFF_PERIOD = 2.weeks
-
- feature_category :metrics
-
- idempotent! # in the scope of 24 hours
-
- def perform; end
- end
- end
-end
diff --git a/app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb b/app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb
deleted file mode 100644
index fe002ffa4a0..00000000000
--- a/app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb
+++ /dev/null
@@ -1,22 +0,0 @@
-# frozen_string_literal: true
-
-module Metrics
- module Dashboard
- class ScheduleAnnotationsPruneWorker
- include ApplicationWorker
-
- data_consistency :always
-
- # rubocop:disable Scalability/CronWorkerContext
- # This worker does not perform work scoped to a context
- include CronjobQueue
- # rubocop:enable Scalability/CronWorkerContext
-
- feature_category :metrics
-
- idempotent! # PruneOldAnnotationsWorker worker is idempotent in the scope of 24 hours
-
- def perform; end
- end
- end
-end
diff --git a/app/workers/metrics/dashboard/sync_dashboards_worker.rb b/app/workers/metrics/dashboard/sync_dashboards_worker.rb
deleted file mode 100644
index 668542e51a5..00000000000
--- a/app/workers/metrics/dashboard/sync_dashboards_worker.rb
+++ /dev/null
@@ -1,19 +0,0 @@
-# frozen_string_literal: true
-
-module Metrics
- module Dashboard
- class SyncDashboardsWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- feature_category :metrics
-
- idempotent!
-
- def perform(project_id); end
- end
- end
-end
diff --git a/app/workers/packages/debian/process_changes_worker.rb b/app/workers/packages/debian/process_changes_worker.rb
deleted file mode 100644
index 0a716c61203..00000000000
--- a/app/workers/packages/debian/process_changes_worker.rb
+++ /dev/null
@@ -1,46 +0,0 @@
-# frozen_string_literal: true
-
-module Packages
- module Debian
- class ProcessChangesWorker
- include ApplicationWorker
-
- data_consistency :always
- include Gitlab::Utils::StrongMemoize
-
- deduplicate :until_executed
- idempotent!
-
- queue_namespace :package_repositories
- feature_category :package_registry
-
- def perform(package_file_id, user_id)
- @package_file_id = package_file_id
- @user_id = user_id
-
- return unless package_file && user
-
- ::Packages::Debian::ProcessChangesService.new(package_file, user).execute
- rescue StandardError => e
- Gitlab::ErrorTracking.log_exception(e, package_file_id: @package_file_id, user_id: @user_id)
- package_file.destroy!
- end
-
- private
-
- attr_reader :package_file_id, :user_id
-
- def package_file
- strong_memoize(:package_file) do
- ::Packages::PackageFile.find_by_id(package_file_id)
- end
- end
-
- def user
- strong_memoize(:user) do
- ::User.find_by_id(user_id)
- end
- end
- end
- end
-end
diff --git a/app/workers/redis_migration_worker.rb b/app/workers/redis_migration_worker.rb
new file mode 100644
index 00000000000..bad9baeac70
--- /dev/null
+++ b/app/workers/redis_migration_worker.rb
@@ -0,0 +1,40 @@
+# frozen_string_literal: true
+
+class RedisMigrationWorker
+ include ApplicationWorker
+
+ idempotent!
+ data_consistency :delayed
+ feature_category :redis
+ urgency :throttled
+ loggable_arguments 0
+
+ SCAN_START_STOP = '0'
+
+ def perform(job_class_name, cursor, options = {})
+ migrator = self.class.fetch_migrator!(job_class_name)
+
+ scan_size = options[:scan_size] || 1000
+ deadline = Time.now.utc + 3.minutes
+
+ while Time.now.utc < deadline
+ cursor, keys = migrator.redis.scan(cursor, match: migrator.scan_match_pattern, count: scan_size)
+
+ migrator.perform(keys) if keys.any?
+
+ sleep(0.01)
+ break if cursor == SCAN_START_STOP
+ end
+
+ self.class.perform_async(job_class_name, cursor, options) unless cursor == SCAN_START_STOP
+ end
+
+ class << self
+ def fetch_migrator!(job_class_name)
+ job_class = "Gitlab::BackgroundMigration::Redis::#{job_class_name}".safe_constantize
+ raise NotImplementedError, "#{job_class_name} does not exist" if job_class.nil?
+
+ job_class.new
+ end
+ end
+end
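The worker leans on Redis SCAN's cursor contract: each call returns a continuation cursor plus a page of keys, and a full pass is complete when the cursor comes back as '0'. The three-minute deadline bounds each Sidekiq job; past it, the worker re-enqueues itself with the saved cursor. The underlying pattern in isolation (match pattern illustrative):

    # The SCAN contract this worker relies on (redis-rb API):
    cursor = '0'
    loop do
      cursor, keys = redis.scan(cursor, match: 'example:*', count: 1000)
      keys.each { |key| migrate(key) }  # stand-in for migrator.perform(keys)
      break if cursor == '0'            # SCAN signals completion with "0"
    end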
diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb
index 4ca366efcad..dab92e16ee3 100644
--- a/app/workers/run_pipeline_schedule_worker.rb
+++ b/app/workers/run_pipeline_schedule_worker.rb
@@ -33,10 +33,15 @@ class RunPipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker
def run_pipeline_schedule(schedule, user)
response = Ci::CreatePipelineService
.new(schedule.project, user, ref: schedule.ref)
- .execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule)
+ .execute(
+ :schedule,
+ save_on_errors: Feature.enabled?(:persist_failed_pipelines_from_schedules, schedule.project),
+ ignore_skip_ci: true, schedule: schedule
+ )
return response if response.payload.persisted?
+ # Remove when FF persist_failed_pipelines_from_schedules is enabled, as corrupted yml is no longer logged
# This is a user operation error such as corrupted .gitlab-ci.yml. Log the error for debugging purposes.
log_extra_metadata_on_done(:pipeline_creation_error, response.message)
rescue StandardError => e
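With persist_failed_pipelines_from_schedules enabled, pipelines whose creation fails (for example, due to a corrupted .gitlab-ci.yml) are saved along with their errors, so the failure surfaces on the pipeline record itself rather than only in this worker's log metadata — which is why the logging above becomes removable.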