Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2023-11-14 11:41:52 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2023-11-14 11:41:52 +0300
commit585826cb22ecea5998a2c2a4675735c94bdeedac (patch)
tree5b05f0b30d33cef48963609e8a18a4dff260eab3 /app/workers
parentdf221d036e5d0c6c0ee4d55b9c97f481ee05dee8 (diff)
Add latest changes from gitlab-org/gitlab@16-6-stable-eev16.6.0-rc42
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/abuse/spam_abuse_events_worker.rb60
-rw-r--r--app/workers/activity_pub/projects/releases_subscription_worker.rb39
-rw-r--r--app/workers/all_queues.yml96
-rw-r--r--app/workers/bulk_import_worker.rb17
-rw-r--r--app/workers/bulk_imports/entity_worker.rb25
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb8
-rw-r--r--app/workers/bulk_imports/finish_batched_pipeline_worker.rb24
-rw-r--r--app/workers/bulk_imports/pipeline_batch_worker.rb84
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb67
-rw-r--r--app/workers/bulk_imports/relation_batch_export_worker.rb19
-rw-r--r--app/workers/bulk_imports/relation_export_worker.rb28
-rw-r--r--app/workers/bulk_imports/stuck_import_worker.rb17
-rw-r--r--app/workers/ci/cancel_pipeline_worker.rb2
-rw-r--r--app/workers/ci/initial_pipeline_process_worker.rb14
-rw-r--r--app/workers/ci/refs/unlock_previous_pipelines_worker.rb4
-rw-r--r--app/workers/concerns/gitlab/github_import/rescheduling_methods.rb3
-rw-r--r--app/workers/concerns/gitlab/github_import/stage_methods.rb27
-rw-r--r--app/workers/concerns/worker_attributes.rb4
-rw-r--r--app/workers/environments/auto_recover_worker.rb22
-rw-r--r--app/workers/environments/auto_stop_cron_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_attachments_worker.rb6
-rw-r--r--app/workers/gitlab/github_import/stage/import_base_data_worker.rb2
-rw-r--r--app/workers/gitlab/github_import/stage/import_collaborators_worker.rb3
-rw-r--r--app/workers/gitlab/github_import/stage/import_issue_events_worker.rb4
-rw-r--r--app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb4
-rw-r--r--app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb7
-rw-r--r--app/workers/gitlab/github_import/stage/import_notes_worker.rb4
-rw-r--r--app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb4
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb6
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb6
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb6
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb6
-rw-r--r--app/workers/gitlab/import/advance_stage.rb6
-rw-r--r--app/workers/gitlab/jira_import/stage/import_issues_worker.rb9
-rw-r--r--app/workers/hashed_storage/base_worker.rb24
-rw-r--r--app/workers/hashed_storage/migrator_worker.rb18
-rw-r--r--app/workers/hashed_storage/project_migrate_worker.rb18
-rw-r--r--app/workers/hashed_storage/project_rollback_worker.rb18
-rw-r--r--app/workers/hashed_storage/rollbacker_worker.rb18
-rw-r--r--app/workers/merge_request_cleanup_refs_worker.rb2
-rw-r--r--app/workers/merge_requests/set_reviewer_reviewed_worker.rb21
-rw-r--r--app/workers/packages/cleanup_package_registry_worker.rb5
-rw-r--r--app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb42
-rw-r--r--app/workers/packages/nuget/extraction_worker.rb2
-rw-r--r--app/workers/projects/import_export/after_import_merge_requests_worker.rb21
-rw-r--r--app/workers/remove_expired_group_links_worker.rb2
-rw-r--r--app/workers/repository_fork_worker.rb22
-rw-r--r--app/workers/schedule_merge_request_cleanup_refs_worker.rb1
-rw-r--r--app/workers/tasks_to_be_done/create_worker.rb18
49 files changed, 576 insertions, 290 deletions
diff --git a/app/workers/abuse/spam_abuse_events_worker.rb b/app/workers/abuse/spam_abuse_events_worker.rb
new file mode 100644
index 00000000000..7d86e994ae4
--- /dev/null
+++ b/app/workers/abuse/spam_abuse_events_worker.rb
@@ -0,0 +1,60 @@
+# frozen_string_literal: true
+
+module Abuse
+ class SpamAbuseEventsWorker
+ include ApplicationWorker
+
+ data_consistency :delayed
+
+ idempotent!
+ feature_category :instance_resiliency
+ urgency :low
+
+ def perform(params)
+ params = params.with_indifferent_access
+
+ @user = User.find_by_id(params[:user_id])
+ unless @user
+ logger.info(structured_payload(message: "User not found.", user_id: params[:user_id]))
+ return
+ end
+
+ report_user(params)
+ end
+
+ private
+
+ attr_reader :user
+
+ def report_user(params)
+ category = 'spam'
+ reporter = Users::Internal.security_bot
+ report_params = { user_id: params[:user_id],
+ reporter: reporter,
+ category: category,
+ message: 'User reported for abuse based on spam verdict' }
+
+ abuse_report = AbuseReport.by_category(category).by_reporter_id(reporter.id).by_user_id(params[:user_id]).first
+
+ abuse_report = AbuseReport.create!(report_params) if abuse_report.nil?
+
+ create_abuse_event(abuse_report.id, params)
+ end
+
+ # Associate the abuse report with an abuse event
+ def create_abuse_event(abuse_report_id, params)
+ Abuse::Event.create!(
+ abuse_report_id: abuse_report_id,
+ category: :spam,
+ metadata: { noteable_type: params[:noteable_type],
+ title: params[:title],
+ description: params[:description],
+ source_ip: params[:source_ip],
+ user_agent: params[:user_agent],
+ verdict: params[:verdict] },
+ source: :spamcheck,
+ user: user
+ )
+ end
+ end
+end
diff --git a/app/workers/activity_pub/projects/releases_subscription_worker.rb b/app/workers/activity_pub/projects/releases_subscription_worker.rb
new file mode 100644
index 00000000000..c392726a469
--- /dev/null
+++ b/app/workers/activity_pub/projects/releases_subscription_worker.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+module ActivityPub
+ module Projects
+ class ReleasesSubscriptionWorker
+ include ApplicationWorker
+ include Gitlab::Routing.url_helpers
+
+ idempotent!
+ worker_has_external_dependencies!
+ feature_category :release_orchestration
+ data_consistency :delayed
+ queue_namespace :activity_pub
+
+ sidekiq_retries_exhausted do |msg, _ex|
+ subscription_id = msg['args'].second
+ subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id)
+ subscription&.destroy
+ end
+
+ def perform(subscription_id)
+ subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id)
+ return if subscription.nil?
+
+ unless subscription.project.public?
+ subscription.destroy
+ return
+ end
+
+ InboxResolverService.new(subscription).execute if needs_resolving?(subscription)
+ AcceptFollowService.new(subscription, project_releases_url(subscription.project)).execute
+ end
+
+ def needs_resolving?(subscription)
+ subscription.subscriber_inbox_url.blank? || subscription.shared_inbox_url.blank?
+ end
+ end
+ end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index e5b860ba525..0bb88efe183 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -3,6 +3,15 @@
#
# Do not edit it manually!
---
+- :name: activity_pub:activity_pub_projects_releases_subscription
+ :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker
+ :feature_category: :release_orchestration
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: authorized_project_update:authorized_project_update_project_recalculate
:worker_name: AuthorizedProjectUpdate::ProjectRecalculateWorker
:feature_category: :system_access
@@ -1461,42 +1470,6 @@
:weight: 1
:idempotent: false
:tags: []
-- :name: hashed_storage:hashed_storage_migrator
- :worker_name: HashedStorage::MigratorWorker
- :feature_category: :source_code_management
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: false
- :tags: []
-- :name: hashed_storage:hashed_storage_project_migrate
- :worker_name: HashedStorage::ProjectMigrateWorker
- :feature_category: :source_code_management
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: false
- :tags: []
-- :name: hashed_storage:hashed_storage_project_rollback
- :worker_name: HashedStorage::ProjectRollbackWorker
- :feature_category: :source_code_management
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: false
- :tags: []
-- :name: hashed_storage:hashed_storage_rollbacker
- :worker_name: HashedStorage::RollbackerWorker
- :feature_category: :source_code_management
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: false
- :tags: []
- :name: incident_management:incident_management_add_severity_system_note
:worker_name: IncidentManagement::AddSeveritySystemNoteWorker
:feature_category: :incident_management
@@ -1767,6 +1740,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: package_cleanup:packages_npm_cleanup_stale_metadata_cache
+ :worker_name: Packages::Npm::CleanupStaleMetadataCacheWorker
+ :feature_category: :package_registry
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: package_repositories:packages_debian_generate_distribution
:worker_name: Packages::Debian::GenerateDistributionWorker
:feature_category: :package_registry
@@ -2307,6 +2289,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: abuse_spam_abuse_events
+ :worker_name: Abuse::SpamAbuseEventsWorker
+ :feature_category: :instance_resiliency
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: analytics_usage_trends_counter_job
:worker_name: Analytics::UsageTrends::CounterJobWorker
:feature_category: :devops_reports
@@ -2575,7 +2566,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
- :idempotent: false
+ :idempotent: true
:tags: []
- :name: bulk_imports_entity
:worker_name: BulkImports::EntityWorker
@@ -2629,7 +2620,7 @@
:urgency: :low
:resource_boundary: :memory
:weight: 1
- :idempotent: false
+ :idempotent: true
:tags: []
- :name: bulk_imports_pipeline_batch
:worker_name: BulkImports::PipelineBatchWorker
@@ -2638,7 +2629,7 @@
:urgency: :low
:resource_boundary: :memory
:weight: 1
- :idempotent: false
+ :idempotent: true
:tags: []
- :name: bulk_imports_relation_batch_export
:worker_name: BulkImports::RelationBatchExportWorker
@@ -2892,6 +2883,15 @@
:weight: 2
:idempotent: false
:tags: []
+- :name: environments_auto_recover
+ :worker_name: Environments::AutoRecoverWorker
+ :feature_category: :continuous_delivery
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: environments_auto_stop
:worker_name: Environments::AutoStopWorker
:feature_category: :continuous_delivery
@@ -3567,6 +3567,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: projects_import_export_after_import_merge_requests
+ :worker_name: Projects::ImportExport::AfterImportMergeRequestsWorker
+ :feature_category: :importers
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: projects_import_export_create_relation_exports
:worker_name: Projects::ImportExport::CreateRelationExportsWorker
:feature_category: :importers
@@ -3837,15 +3846,6 @@
:weight: 1
:idempotent: false
:tags: []
-- :name: tasks_to_be_done_create
- :worker_name: TasksToBeDone::CreateWorker
- :feature_category: :onboarding
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :cpu
- :weight: 1
- :idempotent: true
- :tags: []
- :name: update_external_pull_requests
:worker_name: UpdateExternalPullRequestsWorker
:feature_category: :continuous_integration
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
index 5b9b46081cc..70e7d82741f 100644
--- a/app/workers/bulk_import_worker.rb
+++ b/app/workers/bulk_import_worker.rb
@@ -1,11 +1,16 @@
# frozen_string_literal: true
-class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
+class BulkImportWorker
include ApplicationWorker
data_consistency :always
feature_category :importers
- sidekiq_options retry: false, dead: false
+ sidekiq_options retry: 3, dead: false
+ idempotent!
+
+ sidekiq_retries_exhausted do |msg, exception|
+ new.perform_failure(exception, msg['args'].first)
+ end
def perform(bulk_import_id)
bulk_import = BulkImport.find_by_id(bulk_import_id)
@@ -13,4 +18,12 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
BulkImports::ProcessService.new(bulk_import).execute
end
+
+ def perform_failure(exception, bulk_import_id)
+ bulk_import = BulkImport.find_by_id(bulk_import_id)
+
+ Gitlab::ErrorTracking.track_exception(exception, bulk_import_id: bulk_import.id)
+
+ bulk_import.fail_op
+ end
end
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index 9b60dcdeb8a..e510a8c0d06 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -5,12 +5,16 @@ module BulkImports
include ApplicationWorker
idempotent!
- deduplicate :until_executed
+ deduplicate :until_executed, if_deduplicated: :reschedule_once
data_consistency :always
feature_category :importers
- sidekiq_options retry: false, dead: false
+ sidekiq_options retry: 3, dead: false
worker_has_external_dependencies!
+ sidekiq_retries_exhausted do |msg, exception|
+ new.perform_failure(exception, msg['args'].first)
+ end
+
PERFORM_DELAY = 5.seconds
# Keep `_current_stage` parameter for backwards compatibility.
@@ -27,10 +31,17 @@ module BulkImports
end
re_enqueue
- rescue StandardError => e
- Gitlab::ErrorTracking.track_exception(e, log_params(message: 'Entity failed'))
+ end
+
+ def perform_failure(exception, entity_id)
+ @entity = ::BulkImports::Entity.find(entity_id)
+
+ Gitlab::ErrorTracking.track_exception(
+ exception,
+ log_params(message: "Request to export #{entity.source_type} failed")
+ )
- @entity.fail_op!
+ entity.fail_op!
end
private
@@ -68,7 +79,7 @@ module BulkImports
end
def logger
- @logger ||= Gitlab::Import::Logger.build
+ @logger ||= Logger.build
end
def log_exception(exception, payload)
@@ -88,7 +99,7 @@ module BulkImports
bulk_import_entity_type: entity.source_type,
source_full_path: entity.source_full_path,
source_version: source_version,
- importer: 'gitlab_migration'
+ importer: Logger::IMPORTER_NAME
}
defaults.merge(extra)
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 44759916f99..f7456ddccb1 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -80,8 +80,7 @@ module BulkImports
bulk_import_id: entity.bulk_import_id,
bulk_import_entity_type: entity.source_type,
source_full_path: entity.source_full_path,
- source_version: entity.bulk_import.source_version_info.to_s,
- importer: 'gitlab_migration'
+ source_version: entity.bulk_import.source_version_info.to_s
}
)
@@ -97,7 +96,7 @@ module BulkImports
end
def logger
- @logger ||= Gitlab::Import::Logger.build
+ @logger ||= Logger.build
end
def log_exception(exception, payload)
@@ -114,8 +113,7 @@ module BulkImports
bulk_import_entity_type: entity.source_type,
source_full_path: entity.source_full_path,
message: "Request to export #{entity.source_type} failed",
- source_version: entity.bulk_import.source_version_info.to_s,
- importer: 'gitlab_migration'
+ source_version: entity.bulk_import.source_version_info.to_s
}
)
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index b1f3757e058..40d26e14dc1 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -16,22 +16,21 @@ module BulkImports
def perform(pipeline_tracker_id)
@tracker = Tracker.find(pipeline_tracker_id)
+ @context = ::BulkImports::Pipeline::Context.new(tracker)
return unless tracker.batched?
return unless tracker.started?
return re_enqueue if import_in_progress?
if tracker.stale?
+ logger.error(log_attributes(message: 'Tracker stale. Failing batches and tracker'))
tracker.batches.map(&:fail_op!)
tracker.fail_op!
else
+ tracker.pipeline_class.new(@context).on_finish
+ logger.info(log_attributes(message: 'Tracker finished'))
tracker.finish!
end
-
- ensure
- # This is needed for in-flight migrations.
- # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299
- ::BulkImports::EntityWorker.perform_async(tracker.entity.id) if job_version.nil?
end
private
@@ -45,5 +44,20 @@ module BulkImports
def import_in_progress?
tracker.batches.any? { |b| b.started? || b.created? }
end
+
+ def logger
+ @logger ||= Logger.build
+ end
+
+ def log_attributes(extra = {})
+ structured_payload(
+ {
+ tracker_id: tracker.id,
+ bulk_import_id: tracker.entity.id,
+ bulk_import_entity_id: tracker.entity.bulk_import_id,
+ pipeline_class: tracker.pipeline_name
+ }.merge(extra)
+ )
+ end
end
end
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
index 6230d517641..1485275e616 100644
--- a/app/workers/bulk_imports/pipeline_batch_worker.rb
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -1,26 +1,65 @@
# frozen_string_literal: true
module BulkImports
- class PipelineBatchWorker # rubocop:disable Scalability/IdempotentWorker
+ class PipelineBatchWorker
include ApplicationWorker
include ExclusiveLeaseGuard
+ DEFER_ON_HEALTH_DELAY = 5.minutes
+
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
feature_category :importers
- sidekiq_options retry: false, dead: false
+ sidekiq_options dead: false, retry: 3
worker_has_external_dependencies!
worker_resource_boundary :memory
+ idempotent!
+
+ sidekiq_retries_exhausted do |msg, exception|
+ new.perform_failure(msg['args'].first, exception)
+ end
+
+ defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables|
+ batch = ::BulkImports::BatchTracker.find(job_args.first)
+ pipeline_tracker = batch.tracker
+ pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(
+ pipeline_tracker.pipeline_class,
+ pipeline_tracker.entity.portable_class
+ )
+
+ if pipeline_schema.db_schema && pipeline_schema.db_table
+ schema = pipeline_schema.db_schema
+ tables = [pipeline_schema.db_table]
+ end
+
+ [schema, tables]
+ end
+
+ def self.defer_on_database_health_signal?
+ Feature.enabled?(:bulk_import_deferred_workers)
+ end
def perform(batch_id)
@batch = ::BulkImports::BatchTracker.find(batch_id)
+
@tracker = @batch.tracker
@pending_retry = false
+ return unless process_batch?
+
+ log_extra_metadata_on_done(:pipeline_class, @tracker.pipeline_name)
+
try_obtain_lease { run }
ensure
::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry
end
+ def perform_failure(batch_id, exception)
+ @batch = ::BulkImports::BatchTracker.find(batch_id)
+ @tracker = @batch.tracker
+
+ fail_batch(exception)
+ end
+
private
attr_reader :batch, :tracker, :pending_retry
@@ -28,35 +67,31 @@ module BulkImports
def run
return batch.skip! if tracker.failed? || tracker.finished?
+ logger.info(log_attributes(message: 'Batch tracker started'))
batch.start!
tracker.pipeline_class.new(context).run
batch.finish!
+ logger.info(log_attributes(message: 'Batch tracker finished'))
rescue BulkImports::RetryPipelineError => e
@pending_retry = true
retry_batch(e)
- rescue StandardError => e
- fail_batch(e)
end
def fail_batch(exception)
batch.fail_op!
- Gitlab::ErrorTracking.track_exception(
- exception,
- batch_id: batch.id,
- tracker_id: tracker.id,
- pipeline_class: tracker.pipeline_name,
- pipeline_step: 'pipeline_batch_worker_run'
- )
+ Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed'))
BulkImports::Failure.create(
bulk_import_entity_id: batch.tracker.entity.id,
pipeline_class: tracker.pipeline_name,
pipeline_step: 'pipeline_batch_worker_run',
exception_class: exception.class.to_s,
- exception_message: exception.message.truncate(255),
+ exception_message: exception.message,
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
)
+
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
end
def context
@@ -78,7 +113,32 @@ module BulkImports
end
def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+ log_extra_metadata_on_done(:re_enqueue, true)
+
self.class.perform_in(delay, batch.id)
end
+
+ def process_batch?
+ batch.created? || batch.started?
+ end
+
+ def logger
+ @logger ||= Logger.build
+ end
+
+ def log_attributes(extra = {})
+ structured_payload(
+ {
+ batch_id: batch.id,
+ batch_number: batch.batch_number,
+ tracker_id: tracker.id,
+ bulk_import_id: tracker.entity.bulk_import_id,
+ bulk_import_entity_id: tracker.entity.id,
+ pipeline_class: tracker.pipeline_name,
+ pipeline_step: 'pipeline_batch_worker_run',
+ importer: Logger::IMPORTER_NAME
+ }.merge(extra)
+ )
+ end
end
end
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 24185f43795..2c1d28b33c5 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -1,43 +1,68 @@
# frozen_string_literal: true
module BulkImports
- class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
+ class PipelineWorker
include ApplicationWorker
include ExclusiveLeaseGuard
FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
+ DEFER_ON_HEALTH_DELAY = 5.minutes
+
data_consistency :always
feature_category :importers
- sidekiq_options retry: false, dead: false
+ sidekiq_options dead: false, retry: 3
worker_has_external_dependencies!
deduplicate :until_executing
worker_resource_boundary :memory
+ idempotent!
version 2
+ sidekiq_retries_exhausted do |msg, exception|
+ new.perform_failure(msg['args'][0], msg['args'][2], exception)
+ end
+
+ defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables|
+ pipeline_tracker = ::BulkImports::Tracker.find(job_args.first)
+ pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(
+ pipeline_tracker.pipeline_class,
+ pipeline_tracker.entity.portable_class
+ )
+
+ if pipeline_schema.db_schema && pipeline_schema.db_table
+ schema = pipeline_schema.db_schema
+ tables = [pipeline_schema.db_table]
+ end
+
+ [schema, tables]
+ end
+
+ def self.defer_on_database_health_signal?
+ Feature.enabled?(:bulk_import_deferred_workers)
+ end
+
# Keep _stage parameter for backwards compatibility.
def perform(pipeline_tracker_id, _stage, entity_id)
@entity = ::BulkImports::Entity.find(entity_id)
@pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
+ log_extra_metadata_on_done(:pipeline_class, @pipeline_tracker.pipeline_name)
+
try_obtain_lease do
- if pipeline_tracker.enqueued?
+ if pipeline_tracker.enqueued? || pipeline_tracker.started?
logger.info(log_attributes(message: 'Pipeline starting'))
run
- else
- message = "Pipeline in #{pipeline_tracker.human_status_name} state instead of expected enqueued state"
-
- logger.error(log_attributes(message: message))
-
- fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped?
end
end
- ensure
- # This is needed for in-flight migrations.
- # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299
- ::BulkImports::EntityWorker.perform_async(entity_id) if job_version.nil?
+ end
+
+ def perform_failure(pipeline_tracker_id, entity_id, exception)
+ @entity = ::BulkImports::Entity.find(entity_id)
+ @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
+
+ fail_tracker(exception)
end
private
@@ -53,20 +78,22 @@ module BulkImports
return re_enqueue if export_empty? || export_started?
if file_extraction_pipeline? && export_status.batched?
+ log_extra_metadata_on_done(:batched, true)
+
pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true)
return pipeline_tracker.finish! if export_status.batches_count < 1
enqueue_batches
else
+ log_extra_metadata_on_done(:batched, false)
+
pipeline_tracker.update!(status_event: 'start', jid: jid)
pipeline_tracker.pipeline_class.new(context).run
pipeline_tracker.finish!
end
rescue BulkImports::RetryPipelineError => e
retry_tracker(e)
- rescue StandardError => e
- fail_tracker(e)
end
def source_version
@@ -85,16 +112,18 @@ module BulkImports
pipeline_class: pipeline_tracker.pipeline_name,
pipeline_step: 'pipeline_worker_run',
exception_class: exception.class.to_s,
- exception_message: exception.message.truncate(255),
+ exception_message: exception.message,
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
)
end
def logger
- @logger ||= Gitlab::Import::Logger.build
+ @logger ||= Logger.build
end
def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+ log_extra_metadata_on_done(:re_enqueue, true)
+
self.class.perform_in(
delay,
pipeline_tracker.id,
@@ -159,10 +188,10 @@ module BulkImports
bulk_import_entity_type: entity.source_type,
source_full_path: entity.source_full_path,
pipeline_tracker_id: pipeline_tracker.id,
- pipeline_name: pipeline_tracker.pipeline_name,
+ pipeline_class: pipeline_tracker.pipeline_name,
pipeline_tracker_state: pipeline_tracker.human_status_name,
source_version: source_version,
- importer: 'gitlab_migration'
+ importer: Logger::IMPORTER_NAME
}.merge(extra)
)
end
diff --git a/app/workers/bulk_imports/relation_batch_export_worker.rb b/app/workers/bulk_imports/relation_batch_export_worker.rb
index 4ce36929e15..87ceb775075 100644
--- a/app/workers/bulk_imports/relation_batch_export_worker.rb
+++ b/app/workers/bulk_imports/relation_batch_export_worker.rb
@@ -7,10 +7,25 @@ module BulkImports
idempotent!
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
feature_category :importers
- sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
+ sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3
+
+ sidekiq_retries_exhausted do |job, exception|
+ batch = BulkImports::ExportBatch.find(job['args'][1])
+ portable = batch.export.portable
+
+ Gitlab::ErrorTracking.track_exception(exception, portable_id: portable.id, portable_type: portable.class.name)
+
+ batch.update!(status_event: 'fail_op', error: exception.message.truncate(255))
+ end
def perform(user_id, batch_id)
- RelationBatchExportService.new(user_id, batch_id).execute
+ @user = User.find(user_id)
+ @batch = BulkImports::ExportBatch.find(batch_id)
+
+ log_extra_metadata_on_done(:relation, @batch.export.relation)
+ log_extra_metadata_on_done(:objects_count, @batch.objects_count)
+
+ RelationBatchExportService.new(@user, @batch).execute
end
end
end
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index 531edc6c7a7..168626fee85 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -10,25 +10,37 @@ module BulkImports
loggable_arguments 2, 3
data_consistency :always
feature_category :importers
- sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
+ sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3
worker_resource_boundary :memory
+ sidekiq_retries_exhausted do |job, exception|
+ _user_id, portable_id, portable_type, relation, batched = job['args']
+ portable = portable(portable_id, portable_type)
+
+ export = portable.bulk_import_exports.find_by_relation(relation)
+
+ Gitlab::ErrorTracking.track_exception(exception, portable_id: portable_id, portable_type: portable.class.name)
+
+ export.update!(status_event: 'fail_op', error: exception.message.truncate(255), batched: batched)
+ end
+
+ def self.portable(portable_id, portable_class)
+ portable_class.classify.constantize.find(portable_id)
+ end
+
def perform(user_id, portable_id, portable_class, relation, batched = false)
user = User.find(user_id)
- portable = portable(portable_id, portable_class)
+ portable = self.class.portable(portable_id, portable_class)
config = BulkImports::FileTransfer.config_for(portable)
+ log_extra_metadata_on_done(:relation, relation)
if Gitlab::Utils.to_boolean(batched) && config.batchable_relation?(relation)
+ log_extra_metadata_on_done(:batched, true)
BatchedRelationExportService.new(user, portable, relation, jid).execute
else
+ log_extra_metadata_on_done(:batched, false)
RelationExportService.new(user, portable, relation, jid).execute
end
end
-
- private
-
- def portable(portable_id, portable_class)
- portable_class.classify.constantize.find(portable_id)
- end
end
end
diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb
index 3fa4221728b..6c8569b0aa0 100644
--- a/app/workers/bulk_imports/stuck_import_worker.rb
+++ b/app/workers/bulk_imports/stuck_import_worker.rb
@@ -14,18 +14,29 @@ module BulkImports
def perform
BulkImport.stale.find_each do |import|
+ logger.error(message: 'BulkImport stale', bulk_import_id: import.id)
import.cleanup_stale
end
- BulkImports::Entity.includes(:trackers).stale.find_each do |import| # rubocop: disable CodeReuse/ActiveRecord
+ BulkImports::Entity.includes(:trackers).stale.find_each do |entity| # rubocop: disable CodeReuse/ActiveRecord
ApplicationRecord.transaction do
- import.cleanup_stale
+ logger.error(
+ message: 'BulkImports::Entity stale',
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_id: entity.id
+ )
- import.trackers.find_each do |tracker|
+ entity.cleanup_stale
+
+ entity.trackers.find_each do |tracker|
tracker.cleanup_stale
end
end
end
end
+
+ def logger
+ @logger ||= Logger.build
+ end
end
end
diff --git a/app/workers/ci/cancel_pipeline_worker.rb b/app/workers/ci/cancel_pipeline_worker.rb
index 0b2c96e7ace..f099e185629 100644
--- a/app/workers/ci/cancel_pipeline_worker.rb
+++ b/app/workers/ci/cancel_pipeline_worker.rb
@@ -20,7 +20,7 @@ module Ci
pipeline: pipeline,
current_user: nil,
cascade_to_children: false,
- auto_canceled_by_pipeline_id: auto_canceled_by_pipeline_id
+ auto_canceled_by_pipeline: ::Ci::Pipeline.find_by_id(auto_canceled_by_pipeline_id)
).force_execute
end
end
diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb
index 703cae8bf88..8d7a62e5b09 100644
--- a/app/workers/ci/initial_pipeline_process_worker.rb
+++ b/app/workers/ci/initial_pipeline_process_worker.rb
@@ -17,24 +17,10 @@ module Ci
def perform(pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
- create_deployments!(pipeline)
-
Ci::PipelineCreation::StartPipelineService
.new(pipeline)
.execute
end
end
-
- private
-
- def create_deployments!(pipeline)
- return if Feature.enabled?(:create_deployment_only_for_processable_jobs, pipeline.project)
-
- pipeline.stages.flat_map(&:statuses).each { |build| create_deployment(build) }
- end
-
- def create_deployment(build)
- ::Deployments::CreateForJobService.new.execute(build)
- end
end
end
diff --git a/app/workers/ci/refs/unlock_previous_pipelines_worker.rb b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb
index bf595590cb1..588ec4ce1f0 100644
--- a/app/workers/ci/refs/unlock_previous_pipelines_worker.rb
+++ b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb
@@ -14,7 +14,9 @@ module Ci
def perform(ref_id)
::Ci::Ref.find_by_id(ref_id).try do |ref|
- pipeline = ref.last_finished_pipeline
+ next unless ref.artifacts_locked?
+
+ pipeline = ref.last_unlockable_ci_source_pipeline
result = ::Ci::Refs::EnqueuePipelinesToUnlockService.new.execute(ref, before_pipeline: pipeline)
log_extra_metadata_on_done(:total_pending_entries, result[:total_pending_entries])
diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
index f6feb6d1598..316d30d94da 100644
--- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
@@ -52,8 +52,7 @@ module Gitlab
job_delay = client.rate_limit_resets_in + calculate_job_delay(enqueued_job_counter)
- self.class
- .perform_in(job_delay, project.id, hash, notify_key)
+ self.class.perform_in(job_delay, project.id, hash.deep_stringify_keys, notify_key.to_s)
end
end
end
diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb
index 80013ff3cd9..5c63c667a03 100644
--- a/app/workers/concerns/gitlab/github_import/stage_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb
@@ -5,6 +5,8 @@ module Gitlab
module StageMethods
extend ActiveSupport::Concern
+ MAX_RETRIES_AFTER_INTERRUPTION = 20
+
included do
include ApplicationWorker
@@ -18,6 +20,29 @@ module Gitlab
end
end
+ class_methods do
+ # We can increase the number of times a GitHubImport::Stage worker is retried
+ # after being interrupted if the importer it executes can restart exactly
+ # from where it left off.
+ #
+ # It is not safe to call this method if the importer loops over its data from
+ # the beginning when restarted, even if it skips data that is already imported
+ # inside the loop, as there is a possibility the importer will never reach
+ # the end of the loop.
+ #
+ # Examples of stage workers that call this method are ones that execute services that:
+ #
+ # - Continue paging an endpoint from where it left off:
+ # https://gitlab.com/gitlab-org/gitlab/-/blob/487521cc/lib/gitlab/github_import/parallel_scheduling.rb#L114-117
+ # - Continue their loop from where it left off:
+ # https://gitlab.com/gitlab-org/gitlab/-/blob/024235ec/lib/gitlab/github_import/importer/pull_requests/review_requests_importer.rb#L15
+ def resumes_work_when_interrupted!
+ return unless Feature.enabled?(:github_importer_raise_max_interruptions)
+
+ sidekiq_options max_retries_after_interruption: MAX_RETRIES_AFTER_INTERRUPTION
+ end
+ end
+
# project_id - The ID of the GitLab project to import the data into.
def perform(project_id)
info(project_id, message: 'starting stage')
@@ -54,6 +79,8 @@ module Gitlab
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def try_import(client, project)
+ project.import_state.refresh_jid_expiration
+
import(client, project)
rescue RateLimitError
self.class.perform_in(client.rate_limit_resets_in, project.id)
diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb
index cb09aaf1a6a..28c82a5a38e 100644
--- a/app/workers/concerns/worker_attributes.rb
+++ b/app/workers/concerns/worker_attributes.rb
@@ -201,10 +201,10 @@ module WorkerAttributes
!!get_class_attribute(:big_payload)
end
- def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY)
+ def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY, &block)
set_class_attribute(
:database_health_check_attrs,
- { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by }
+ { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by, block: block }
)
end
diff --git a/app/workers/environments/auto_recover_worker.rb b/app/workers/environments/auto_recover_worker.rb
new file mode 100644
index 00000000000..75e86e38f1a
--- /dev/null
+++ b/app/workers/environments/auto_recover_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+module Environments
+ class AutoRecoverWorker
+ include ApplicationWorker
+
+ deduplicate :until_executed
+ data_consistency :delayed
+ idempotent!
+ feature_category :continuous_delivery
+
+ def perform(environment_id, _params = {})
+ Environment.find_by_id(environment_id).try do |environment|
+ next unless environment.long_stopping?
+
+ next unless environment.stop_actions.all?(&:complete?)
+
+ environment.recover_stuck_stopping
+ end
+ end
+ end
+end
diff --git a/app/workers/environments/auto_stop_cron_worker.rb b/app/workers/environments/auto_stop_cron_worker.rb
index 4d6453a85e7..26b18c406e5 100644
--- a/app/workers/environments/auto_stop_cron_worker.rb
+++ b/app/workers/environments/auto_stop_cron_worker.rb
@@ -13,6 +13,7 @@ module Environments
def perform
AutoStopService.new.execute
+ AutoRecoverService.new.execute
end
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
index f9952f04e99..a5d085a82c0 100644
--- a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -48,8 +50,8 @@ module Gitlab
def move_to_next_stage(project, waiters = {})
AdvanceStageWorker.perform_async(
project.id,
- waiters,
- :protected_branches
+ waiters.deep_stringify_keys,
+ 'protected_branches'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
index 94cb3cb6c71..5bbe14b6528 100644
--- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
@@ -27,8 +27,6 @@ module Gitlab
klass.new(project, client).execute
end
- project.import_state.refresh_jid_expiration
-
ImportPullRequestsWorker.perform_async(project.id)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
index 751ca92388a..037b529b866 100644
--- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
@@ -20,7 +20,6 @@ module Gitlab
info(project.id, message: 'starting importer', importer: 'Importer::CollaboratorsImporter')
waiter = Importer::CollaboratorsImporter.new(project, client).execute
- project.import_state.refresh_jid_expiration
move_to_next_stage(project, { waiter.key => waiter.jobs_remaining })
end
@@ -44,7 +43,7 @@ module Gitlab
def move_to_next_stage(project, waiters = {})
AdvanceStageWorker.perform_async(
- project.id, waiters, :pull_requests_merged_by
+ project.id, waiters.deep_stringify_keys, 'pull_requests_merged_by'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
index c80412d941b..35779d7bfc5 100644
--- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -30,7 +32,7 @@ module Gitlab
end
def move_to_next_stage(project, waiters = {})
- AdvanceStageWorker.perform_async(project.id, waiters, :notes)
+ AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, 'notes')
end
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
index 592b789cc94..58e1f637b6a 100644
--- a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -20,7 +22,7 @@ module Gitlab
hash[waiter.key] = waiter.jobs_remaining
end
- AdvanceStageWorker.perform_async(project.id, waiters, :issue_events)
+ AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, 'issue_events')
end
# The importers to run in this stage. Issues can't be imported earlier
diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
index e89a850c991..8d7bd98f303 100644
--- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
@@ -11,6 +11,11 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ # Importer::LfsObjectsImporter can resume work when interrupted as
+ # it uses Projects::LfsPointers::LfsObjectDownloadListService which excludes LFS objects that already exist.
+ # https://gitlab.com/gitlab-org/gitlab/-/blob/eabf0800/app/services/projects/lfs_pointers/lfs_object_download_list_service.rb#L69-71
+ resumes_work_when_interrupted!
+
def perform(project_id)
return unless (project = find_project(project_id))
@@ -28,7 +33,7 @@ module Gitlab
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :finish
+ 'finish'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb
index c1fdb76d03e..0459545d8e1 100644
--- a/app/workers/gitlab/github_import/stage/import_notes_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -20,7 +22,7 @@ module Gitlab
hash[waiter.key] = waiter.jobs_remaining
end
- AdvanceStageWorker.perform_async(project.id, waiters, :attachments)
+ AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, 'attachments')
end
def importers(project)
diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
index f8448094c28..e281e965f94 100644
--- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
@@ -19,12 +19,10 @@ module Gitlab
.new(project, client)
.execute
- project.import_state.refresh_jid_expiration
-
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :lfs_objects
+ 'lfs_objects'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
index 2e7cd28578f..2f543951bf3 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -18,12 +20,10 @@ module Gitlab
.new(project, client)
.execute
- project.import_state.refresh_jid_expiration
-
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :pull_request_review_requests
+ 'pull_request_review_requests'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
index 2f860349e25..db76545ae87 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -18,12 +20,10 @@ module Gitlab
.new(project, client)
.execute
- project.import_state.refresh_jid_expiration
-
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :pull_request_reviews
+ 'pull_request_reviews'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
index 51730033133..31b7c57a524 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -18,12 +20,10 @@ module Gitlab
.new(project, client)
.execute
- project.import_state.refresh_jid_expiration
-
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :issues_and_diff_notes
+ 'issues_and_diff_notes'
)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
index 029d38d8b93..c68b95b5111 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
@@ -11,6 +11,8 @@ module Gitlab
include GithubImport::Queue
include StageMethods
+ resumes_work_when_interrupted!
+
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
@@ -25,12 +27,10 @@ module Gitlab
.new(project, client)
.execute
- project.import_state.refresh_jid_expiration
-
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :collaborators
+ 'collaborators'
)
end
diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb
index 180c08905ff..782439894c0 100644
--- a/app/workers/gitlab/import/advance_stage.rb
+++ b/app/workers/gitlab/import/advance_stage.rb
@@ -19,7 +19,7 @@ module Gitlab
# completed.
# timeout_timer - Time the sidekiq worker was first initiated with the current job_count
# previous_job_count - Number of jobs remaining on last invocation of this worker
- def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil)
+ def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now.to_s, previous_job_count = nil)
import_state_jid = find_import_state_jid(project_id)
# If the import state is nil the project may have been deleted or the import
@@ -45,7 +45,9 @@ module Gitlab
handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count)
else
- self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage, timeout_timer, previous_job_count)
+ self.class.perform_in(INTERVAL,
+ project_id, new_waiters.deep_stringify_keys, next_stage.to_s, timeout_timer.to_s, previous_job_count
+ )
end
end
diff --git a/app/workers/gitlab/jira_import/stage/import_issues_worker.rb b/app/workers/gitlab/jira_import/stage/import_issues_worker.rb
index 7a5eb6c1e3a..5d890ecfe13 100644
--- a/app/workers/gitlab/jira_import/stage/import_issues_worker.rb
+++ b/app/workers/gitlab/jira_import/stage/import_issues_worker.rb
@@ -9,7 +9,14 @@ module Gitlab
private
def import(project)
- jobs_waiter = Gitlab::JiraImport::IssuesImporter.new(project).execute
+ jira_client = if Feature.enabled?(:increase_jira_import_issues_timeout)
+ project.jira_integration.client(read_timeout: 2.minutes)
+ end
+
+ jobs_waiter = Gitlab::JiraImport::IssuesImporter.new(
+ project,
+ jira_client
+ ).execute
project.latest_jira_import.refresh_jid_expiration
diff --git a/app/workers/hashed_storage/base_worker.rb b/app/workers/hashed_storage/base_worker.rb
deleted file mode 100644
index 372440996d9..00000000000
--- a/app/workers/hashed_storage/base_worker.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-# frozen_string_literal: true
-
-module HashedStorage
- class BaseWorker # rubocop:disable Scalability/IdempotentWorker
- include ExclusiveLeaseGuard
- include WorkerAttributes
-
- feature_category :source_code_management
-
- LEASE_TIMEOUT = 30.seconds.to_i
- LEASE_KEY_SEGMENT = 'project_migrate_hashed_storage_worker'
-
- protected
-
- def lease_key
- # we share the same lease key for both migration and rollback so they don't run simultaneously
- "#{LEASE_KEY_SEGMENT}:#{project_id}"
- end
-
- def lease_timeout
- LEASE_TIMEOUT
- end
- end
-end
diff --git a/app/workers/hashed_storage/migrator_worker.rb b/app/workers/hashed_storage/migrator_worker.rb
deleted file mode 100644
index a7e7a505681..00000000000
--- a/app/workers/hashed_storage/migrator_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-module HashedStorage
- class MigratorWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- queue_namespace :hashed_storage
- feature_category :source_code_management
-
- # @param [Integer] start initial ID of the batch
- # @param [Integer] finish last ID of the batch
- def perform(start, finish); end
- end
-end
diff --git a/app/workers/hashed_storage/project_migrate_worker.rb b/app/workers/hashed_storage/project_migrate_worker.rb
deleted file mode 100644
index e1bf71de179..00000000000
--- a/app/workers/hashed_storage/project_migrate_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-module HashedStorage
- class ProjectMigrateWorker < BaseWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- queue_namespace :hashed_storage
- loggable_arguments 1
-
- attr_reader :project_id
-
- def perform(project_id, old_disk_path = nil); end
- end
-end
diff --git a/app/workers/hashed_storage/project_rollback_worker.rb b/app/workers/hashed_storage/project_rollback_worker.rb
deleted file mode 100644
index af4223ff354..00000000000
--- a/app/workers/hashed_storage/project_rollback_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-module HashedStorage
- class ProjectRollbackWorker < BaseWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- queue_namespace :hashed_storage
- loggable_arguments 1
-
- attr_reader :project_id
-
- def perform(project_id, old_disk_path = nil); end
- end
-end
diff --git a/app/workers/hashed_storage/rollbacker_worker.rb b/app/workers/hashed_storage/rollbacker_worker.rb
deleted file mode 100644
index e659e65a370..00000000000
--- a/app/workers/hashed_storage/rollbacker_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-module HashedStorage
- class RollbackerWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- queue_namespace :hashed_storage
- feature_category :source_code_management
-
- # @param [Integer] start initial ID of the batch
- # @param [Integer] finish last ID of the batch
- def perform(start, finish); end
- end
-end
diff --git a/app/workers/merge_request_cleanup_refs_worker.rb b/app/workers/merge_request_cleanup_refs_worker.rb
index 92dfe8a8cb0..db1a1e96997 100644
--- a/app/workers/merge_request_cleanup_refs_worker.rb
+++ b/app/workers/merge_request_cleanup_refs_worker.rb
@@ -18,8 +18,6 @@ class MergeRequestCleanupRefsWorker
FAILURE_THRESHOLD = 3
def perform_work
- return unless Feature.enabled?(:merge_request_refs_cleanup)
-
unless merge_request
logger.error('No existing merge request to be cleaned up.')
return
diff --git a/app/workers/merge_requests/set_reviewer_reviewed_worker.rb b/app/workers/merge_requests/set_reviewer_reviewed_worker.rb
index 2f15bf3b879..7e8bc60f6e1 100644
--- a/app/workers/merge_requests/set_reviewer_reviewed_worker.rb
+++ b/app/workers/merge_requests/set_reviewer_reviewed_worker.rb
@@ -13,18 +13,23 @@ module MergeRequests
current_user_id = event.data[:current_user_id]
merge_request_id = event.data[:merge_request_id]
current_user = User.find_by_id(current_user_id)
- merge_request = MergeRequest.find_by_id(merge_request_id)
- if !current_user
+ unless current_user
logger.info(structured_payload(message: 'Current user not found.', current_user_id: current_user_id))
- elsif !merge_request
- logger.info(structured_payload(message: 'Merge request not found.', merge_request_id: merge_request_id))
- else
- project = merge_request.source_project
+ return
+ end
+
+ merge_request = MergeRequest.find_by_id(merge_request_id)
- ::MergeRequests::MarkReviewerReviewedService.new(project: project, current_user: current_user)
- .execute(merge_request)
+ unless merge_request
+ logger.info(structured_payload(message: 'Merge request not found.', merge_request_id: merge_request_id))
+ return
end
+
+ project = merge_request.source_project
+
+ ::MergeRequests::UpdateReviewerStateService.new(project: project, current_user: current_user)
+ .execute(merge_request, "reviewed")
end
end
end
diff --git a/app/workers/packages/cleanup_package_registry_worker.rb b/app/workers/packages/cleanup_package_registry_worker.rb
index 5f14102b5a1..5b2d8bacd62 100644
--- a/app/workers/packages/cleanup_package_registry_worker.rb
+++ b/app/workers/packages/cleanup_package_registry_worker.rb
@@ -13,6 +13,7 @@ module Packages
def perform
enqueue_package_file_cleanup_job if Packages::PackageFile.pending_destruction.exists?
enqueue_cleanup_policy_jobs if Packages::Cleanup::Policy.runnable.exists?
+ enqueue_cleanup_stale_npm_metadata_cache_job if Packages::Npm::MetadataCache.pending_destruction.exists?
log_counts
end
@@ -27,6 +28,10 @@ module Packages
Packages::Cleanup::ExecutePolicyWorker.perform_with_capacity
end
+ def enqueue_cleanup_stale_npm_metadata_cache_job
+ Packages::Npm::CleanupStaleMetadataCacheWorker.perform_with_capacity
+ end
+
def log_counts
use_replica_if_available do
pending_destruction_package_files_count = Packages::PackageFile.pending_destruction.count
diff --git a/app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb b/app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb
new file mode 100644
index 00000000000..158209c28fd
--- /dev/null
+++ b/app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module Packages
+ module Npm
+ class CleanupStaleMetadataCacheWorker
+ include ApplicationWorker
+ include ::Packages::CleanupArtifactWorker
+
+ MAX_CAPACITY = 2
+
+ data_consistency :sticky
+
+ queue_namespace :package_cleanup
+ feature_category :package_registry
+
+ deduplicate :until_executed
+ idempotent!
+
+ def max_running_jobs
+ MAX_CAPACITY
+ end
+
+ private
+
+ def model
+ Packages::Npm::MetadataCache
+ end
+
+ def log_metadata(npm_metadata_cache)
+ log_extra_metadata_on_done(:npm_metadata_cache_id, npm_metadata_cache.id)
+ end
+
+ def log_cleanup_item(npm_metadata_cache)
+ logger.info(
+ structured_payload(
+ npm_metadata_cache_id: npm_metadata_cache.id
+ )
+ )
+ end
+ end
+ end
+end
diff --git a/app/workers/packages/nuget/extraction_worker.rb b/app/workers/packages/nuget/extraction_worker.rb
index 55aca0beb03..33fc98cf95b 100644
--- a/app/workers/packages/nuget/extraction_worker.rb
+++ b/app/workers/packages/nuget/extraction_worker.rb
@@ -18,7 +18,7 @@ module Packages
return unless package_file
- ::Packages::Nuget::UpdatePackageFromMetadataService.new(package_file).execute
+ ::Packages::Nuget::ProcessPackageFileService.new(package_file).execute
rescue StandardError => exception
process_package_file_error(
package_file: package_file,
diff --git a/app/workers/projects/import_export/after_import_merge_requests_worker.rb b/app/workers/projects/import_export/after_import_merge_requests_worker.rb
new file mode 100644
index 00000000000..b40e0ca5f09
--- /dev/null
+++ b/app/workers/projects/import_export/after_import_merge_requests_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Projects
+ module ImportExport
+ class AfterImportMergeRequestsWorker
+ include ApplicationWorker
+
+ idempotent!
+ data_consistency :delayed
+ urgency :low
+ feature_category :importers
+
+ def perform(project_id)
+ project = Project.find_by_id(project_id)
+ return unless project
+
+ project.merge_requests.set_latest_merge_request_diff_ids!
+ end
+ end
+ end
+end
diff --git a/app/workers/remove_expired_group_links_worker.rb b/app/workers/remove_expired_group_links_worker.rb
index f1da5f37945..0bac595f0c4 100644
--- a/app/workers/remove_expired_group_links_worker.rb
+++ b/app/workers/remove_expired_group_links_worker.rb
@@ -11,7 +11,7 @@ class RemoveExpiredGroupLinksWorker # rubocop:disable Scalability/IdempotentWork
def perform
ProjectGroupLink.expired.find_each do |link|
- Projects::GroupLinks::DestroyService.new(link.project, nil).execute(link)
+ Projects::GroupLinks::DestroyService.new(link.project, nil).execute(link, skip_authorization: true)
end
GroupGroupLink.expired.find_in_batches do |link_batch|
diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb
index 5ec9ceaf004..f4a507246ac 100644
--- a/app/workers/repository_fork_worker.rb
+++ b/app/workers/repository_fork_worker.rb
@@ -2,6 +2,7 @@
class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ include Gitlab::Utils::StrongMemoize
data_consistency :always
@@ -12,10 +13,8 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker
feature_category :source_code_management
def perform(*args)
- target_project_id = args.shift
- target_project = Project.find(target_project_id)
+ @target_project_id = args.shift
- source_project = target_project.forked_from_project
unless source_project
return target_project.import_state.mark_as_failed(_('Source project cannot be found.'))
end
@@ -25,6 +24,21 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker
private
+ def target_project
+ Project.find(@target_project_id)
+ end
+ strong_memoize_attr :target_project
+
+ def source_project
+ @source_project ||= target_project.forked_from_project
+ end
+
+ def branch
+ return unless target_project.import_data&.data
+
+ target_project.import_data.data['fork_branch']
+ end
+
def fork_repository(target_project, source_project)
return unless start_fork(target_project)
@@ -46,7 +60,7 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker
source_repo = source_project.repository.raw
target_repo = target_project.repository.raw
- ::Gitlab::GitalyClient::RepositoryService.new(target_repo).fork_repository(source_repo)
+ ::Gitlab::GitalyClient::RepositoryService.new(target_repo).fork_repository(source_repo, branch)
rescue GRPC::BadStatus => e
Gitlab::ErrorTracking.track_exception(e, source_project_id: source_project.id, target_project_id: target_project.id)
diff --git a/app/workers/schedule_merge_request_cleanup_refs_worker.rb b/app/workers/schedule_merge_request_cleanup_refs_worker.rb
index ced1f443ea6..2ecc95335e2 100644
--- a/app/workers/schedule_merge_request_cleanup_refs_worker.rb
+++ b/app/workers/schedule_merge_request_cleanup_refs_worker.rb
@@ -12,7 +12,6 @@ class ScheduleMergeRequestCleanupRefsWorker
def perform
return if Gitlab::Database.read_only?
- return unless Feature.enabled?(:merge_request_refs_cleanup)
MergeRequest::CleanupSchedule.stuck_retry!
MergeRequestCleanupRefsWorker.perform_with_capacity
diff --git a/app/workers/tasks_to_be_done/create_worker.rb b/app/workers/tasks_to_be_done/create_worker.rb
deleted file mode 100644
index 91046e3cfed..00000000000
--- a/app/workers/tasks_to_be_done/create_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-module TasksToBeDone
- class CreateWorker
- include ApplicationWorker
-
- data_consistency :always
- idempotent!
- feature_category :onboarding
- urgency :low
- worker_resource_boundary :cpu
-
- def perform(member_task_id, current_user_id, assignee_ids = [])
- # no-op removing
- # https://docs.gitlab.com/ee/development/sidekiq/compatibility_across_updates.html#removing-worker-classes
- end
- end
-end