Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml154
-rw-r--r--app/workers/bulk_import_worker.rb116
-rw-r--r--app/workers/bulk_imports/entity_worker.rb114
-rw-r--r--app/workers/bulk_imports/finish_batched_pipeline_worker.rb8
-rw-r--r--app/workers/bulk_imports/pipeline_batch_worker.rb6
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb9
-rw-r--r--app/workers/ci/initial_pipeline_process_worker.rb2
-rw-r--r--app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb1
-rw-r--r--app/workers/ci/ref_delete_unlock_artifacts_worker.rb18
-rw-r--r--app/workers/ci/refs/unlock_previous_pipelines_worker.rb26
-rw-r--r--app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb21
-rw-r--r--app/workers/ci/unlock_pipelines_in_queue_worker.rb55
-rw-r--r--app/workers/click_house/events_sync_worker.rb1
-rw-r--r--app/workers/concerns/auto_devops_queue.rb2
-rw-r--r--app/workers/concerns/chaos_queue.rb2
-rw-r--r--app/workers/concerns/gitlab/github_import/object_importer.rb36
-rw-r--r--app/workers/concerns/gitlab/github_import/queue.rb8
-rw-r--r--app/workers/concerns/gitlab/github_import/stage_methods.rb22
-rw-r--r--app/workers/concerns/limited_capacity/job_tracker.rb1
-rw-r--r--app/workers/concerns/limited_capacity/worker.rb36
-rw-r--r--app/workers/concerns/worker_attributes.rb4
-rw-r--r--app/workers/database/batched_background_migration/ci_database_worker.rb1
-rw-r--r--app/workers/delete_container_repository_worker.rb15
-rw-r--r--app/workers/environments/stop_job_failed_worker.rb25
-rw-r--r--app/workers/gitlab/bitbucket_import/advance_stage_worker.rb12
-rw-r--r--app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb13
-rw-r--r--app/workers/gitlab/bitbucket_import/import_issue_worker.rb13
-rw-r--r--app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb13
-rw-r--r--app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb13
-rw-r--r--app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb30
-rw-r--r--app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb30
-rw-r--r--app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb30
-rw-r--r--app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb30
-rw-r--r--app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb2
-rw-r--r--app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb6
-rw-r--r--app/workers/gitlab/github_gists_import/import_gist_worker.rb6
-rw-r--r--app/workers/gitlab/github_gists_import/start_import_worker.rb2
-rw-r--r--app/workers/gitlab/github_import/advance_stage_worker.rb6
-rw-r--r--app/workers/gitlab/github_import/refresh_import_jid_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/finish_import_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_attachments_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_base_data_worker.rb17
-rw-r--r--app/workers/gitlab/github_import/stage/import_collaborators_worker.rb15
-rw-r--r--app/workers/gitlab/github_import/stage/import_issue_events_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_notes_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb10
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb1
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb15
-rw-r--r--app/workers/gitlab/github_import/stage/import_repository_worker.rb16
-rw-r--r--app/workers/gitlab/import/advance_stage.rb88
-rw-r--r--app/workers/gitlab/import/stuck_project_import_jobs_worker.rb1
-rw-r--r--app/workers/gitlab/jira_import/advance_stage_worker.rb6
-rw-r--r--app/workers/gitlab_shell_worker.rb25
-rw-r--r--app/workers/hashed_storage/migrator_worker.rb5
-rw-r--r--app/workers/hashed_storage/project_migrate_worker.rb13
-rw-r--r--app/workers/hashed_storage/project_rollback_worker.rb13
-rw-r--r--app/workers/hashed_storage/rollbacker_worker.rb5
-rw-r--r--app/workers/integrations/irker_worker.rb2
-rw-r--r--app/workers/issuable/related_links_create_worker.rb65
-rw-r--r--app/workers/jira_connect/sync_project_worker.rb5
-rw-r--r--app/workers/merge_worker.rb5
-rw-r--r--app/workers/pages/deactivated_deployments_delete_cron_worker.rb19
-rw-r--r--app/workers/projects/after_import_worker.rb2
-rw-r--r--app/workers/projects/record_target_platforms_worker.rb2
-rw-r--r--app/workers/run_pipeline_schedule_worker.rb2
-rw-r--r--app/workers/tasks_to_be_done/create_worker.rb17
70 files changed, 757 insertions, 490 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 6ef7447b9da..e5b860ba525 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -174,15 +174,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: container_repository:delete_container_repository
- :worker_name: DeleteContainerRepositoryWorker
- :feature_category: :container_registry
- :has_external_dependencies: false
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: false
- :tags: []
- :name: container_repository_delete:container_registry_delete_container_repository
:worker_name: ContainerRegistry::DeleteContainerRepositoryWorker
:feature_category: :container_registry
@@ -300,6 +291,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: cronjob:ci_schedule_unlock_pipelines_in_queue_cron
+ :worker_name: Ci::ScheduleUnlockPipelinesInQueueCronWorker
+ :feature_category: :build_artifacts
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:ci_stuck_builds_drop_running
:worker_name: Ci::StuckBuilds::DropRunningWorker
:feature_category: :continuous_integration
@@ -327,6 +327,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: cronjob:click_house_events_sync
+ :worker_name: ClickHouse::EventsSyncWorker
+ :feature_category: :value_stream_management
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:container_expiration_policy
:worker_name: ContainerExpirationPolicyWorker
:feature_category: :container_registry
@@ -642,6 +651,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: cronjob:pages_deactivated_deployments_delete_cron
+ :worker_name: Pages::DeactivatedDeploymentsDeleteCronWorker
+ :feature_category: :pages
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:pages_domain_removal_cron
:worker_name: PagesDomainRemovalCronWorker
:feature_category: :pages
@@ -1920,6 +1938,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: pipeline_background:ci_refs_unlock_previous_pipelines
+ :worker_name: Ci::Refs::UnlockPreviousPipelinesWorker
+ :feature_category: :continuous_integration
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: pipeline_background:ci_test_failure_history
:worker_name: Ci::TestFailureHistoryWorker
:feature_category: :continuous_integration
@@ -2352,6 +2379,33 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bitbucket_import_import_issue
+ :worker_name: Gitlab::BitbucketImport::ImportIssueWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
+- :name: bitbucket_import_import_issue_notes
+ :worker_name: Gitlab::BitbucketImport::ImportIssueNotesWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
+- :name: bitbucket_import_import_lfs_object
+ :worker_name: Gitlab::BitbucketImport::ImportLfsObjectWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
- :name: bitbucket_import_import_pull_request
:worker_name: Gitlab::BitbucketImport::ImportPullRequestWorker
:feature_category: :importers
@@ -2361,6 +2415,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bitbucket_import_import_pull_request_notes
+ :worker_name: Gitlab::BitbucketImport::ImportPullRequestNotesWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
- :name: bitbucket_import_stage_finish_import
:worker_name: Gitlab::BitbucketImport::Stage::FinishImportWorker
:feature_category: :importers
@@ -2370,6 +2433,33 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bitbucket_import_stage_import_issues
+ :worker_name: Gitlab::BitbucketImport::Stage::ImportIssuesWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
+- :name: bitbucket_import_stage_import_issues_notes
+ :worker_name: Gitlab::BitbucketImport::Stage::ImportIssuesNotesWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
+- :name: bitbucket_import_stage_import_lfs_objects
+ :worker_name: Gitlab::BitbucketImport::Stage::ImportLfsObjectsWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
- :name: bitbucket_import_stage_import_pull_requests
:worker_name: Gitlab::BitbucketImport::Stage::ImportPullRequestsWorker
:feature_category: :importers
@@ -2379,6 +2469,15 @@
:weight: 1
:idempotent: false
:tags: []
+- :name: bitbucket_import_stage_import_pull_requests_notes
+ :worker_name: Gitlab::BitbucketImport::Stage::ImportPullRequestsNotesWorker
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: false
+ :tags: []
- :name: bitbucket_import_stage_import_repository
:worker_name: Gitlab::BitbucketImport::Stage::ImportRepositoryWorker
:feature_category: :importers
@@ -2631,10 +2730,10 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: click_house_events_sync
- :worker_name: ClickHouse::EventsSyncWorker
- :feature_category: :value_stream_management
- :has_external_dependencies: true
+- :name: ci_unlock_pipelines_in_queue
+ :worker_name: Ci::UnlockPipelinesInQueueWorker
+ :feature_category: :build_artifacts
+ :has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
:weight: 1
@@ -2811,6 +2910,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: environments_stop_job_failed
+ :worker_name: Environments::StopJobFailedWorker
+ :feature_category: :continuous_delivery
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: environments_stop_job_success
:worker_name: Environments::StopJobSuccessWorker
:feature_category: :continuous_delivery
@@ -2883,15 +2991,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: gitlab_shell
- :worker_name: GitlabShellWorker
- :feature_category: :source_code_management
- :has_external_dependencies: false
- :urgency: :high
- :resource_boundary: :unknown
- :weight: 2
- :idempotent: false
- :tags: []
- :name: google_cloud_create_cloudsql_instance
:worker_name: GoogleCloud::CreateCloudsqlInstanceWorker
:feature_category: :not_owned
@@ -3045,6 +3144,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: issuable_related_links_create
+ :worker_name: Issuable::RelatedLinksCreateWorker
+ :feature_category: :portfolio_management
+ :has_external_dependencies: false
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: issuables_clear_groups_issue_counter
:worker_name: Issuables::ClearGroupsIssueCounterWorker
:feature_category: :team_planning
@@ -3524,7 +3632,7 @@
:tags: []
- :name: projects_record_target_platforms
:worker_name: Projects::RecordTargetPlatformsWorker
- :feature_category: :experimentation_activation
+ :feature_category: :activation
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
index 83b881ee525..5b9b46081cc 100644
--- a/app/workers/bulk_import_worker.rb
+++ b/app/workers/bulk_import_worker.rb
@@ -3,124 +3,14 @@
class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
- PERFORM_DELAY = 5.seconds
- DEFAULT_BATCH_SIZE = 5
-
data_consistency :always
feature_category :importers
sidekiq_options retry: false, dead: false
def perform(bulk_import_id)
- @bulk_import = BulkImport.find_by_id(bulk_import_id)
-
- return unless @bulk_import
- return if @bulk_import.finished? || @bulk_import.failed?
- return @bulk_import.fail_op! if all_entities_failed?
- return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
- return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
-
- @bulk_import.start! if @bulk_import.created?
-
- created_entities.first(next_batch_size).each do |entity|
- create_tracker(entity)
-
- entity.start!
-
- BulkImports::ExportRequestWorker.perform_async(entity.id)
- end
-
- re_enqueue
- rescue StandardError => e
- Gitlab::ErrorTracking.track_exception(e, bulk_import_id: @bulk_import&.id)
-
- @bulk_import&.fail_op
- end
-
- private
-
- def entities
- @entities ||= @bulk_import.entities
- end
-
- def created_entities
- entities.with_status(:created)
- end
-
- def all_entities_processed?
- entities.all? { |entity| entity.finished? || entity.failed? }
- end
-
- def all_entities_failed?
- entities.all?(&:failed?)
- end
-
- # A new BulkImportWorker job is enqueued to either
- # - Process the new BulkImports::Entity created during import (e.g. for the subgroups)
- # - Or to mark the `bulk_import` as finished
- def re_enqueue
- BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id)
- end
-
- def started_entities
- entities.with_status(:started)
- end
-
- def max_batch_size_exceeded?
- started_entities.count >= DEFAULT_BATCH_SIZE
- end
-
- def next_batch_size
- [DEFAULT_BATCH_SIZE - started_entities.count, 0].max
- end
-
- def create_tracker(entity)
- entity.class.transaction do
- entity.pipelines.each do |pipeline|
- status = skip_pipeline?(pipeline, entity) ? :skipped : :created
-
- entity.trackers.create!(
- stage: pipeline[:stage],
- pipeline_name: pipeline[:pipeline],
- status: BulkImports::Tracker.state_machine.states[status].value
- )
- end
- end
- end
-
- def skip_pipeline?(pipeline, entity)
- return false unless entity.source_version.valid?
-
- minimum_version, maximum_version = pipeline.values_at(:minimum_source_version, :maximum_source_version)
-
- if source_version_out_of_range?(minimum_version, maximum_version, entity.source_version.without_patch)
- log_skipped_pipeline(pipeline, entity, minimum_version, maximum_version)
- return true
- end
-
- false
- end
-
- def source_version_out_of_range?(minimum_version, maximum_version, non_patch_source_version)
- (minimum_version && non_patch_source_version < Gitlab::VersionInfo.parse(minimum_version)) ||
- (maximum_version && non_patch_source_version > Gitlab::VersionInfo.parse(maximum_version))
- end
-
- def log_skipped_pipeline(pipeline, entity, minimum_version, maximum_version)
- logger.info(
- message: 'Pipeline skipped as source instance version not compatible with pipeline',
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_name: pipeline[:pipeline],
- minimum_source_version: minimum_version,
- maximum_source_version: maximum_version,
- source_version: entity.source_version.to_s,
- importer: 'gitlab_migration'
- )
- end
+ bulk_import = BulkImport.find_by_id(bulk_import_id)
+ return unless bulk_import
- def logger
- @logger ||= Gitlab::Import::Logger.build
+ BulkImports::ProcessService.new(bulk_import).execute
end
end
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index fb99d63d06e..9b60dcdeb8a 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -1,97 +1,68 @@
# frozen_string_literal: true
module BulkImports
- class EntityWorker # rubocop:disable Scalability/IdempotentWorker
+ class EntityWorker
include ApplicationWorker
idempotent!
- deduplicate :until_executing
+ deduplicate :until_executed
data_consistency :always
feature_category :importers
sidekiq_options retry: false, dead: false
worker_has_external_dependencies!
- def perform(entity_id, current_stage = nil)
+ PERFORM_DELAY = 5.seconds
+
+ # Keep `_current_stage` parameter for backwards compatibility.
+ # The parameter will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426311
+ def perform(entity_id, _current_stage = nil)
@entity = ::BulkImports::Entity.find(entity_id)
- if stage_running?(entity_id, current_stage)
- logger.info(
- structured_payload(
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- current_stage: current_stage,
- message: 'Stage running',
- source_version: source_version,
- importer: 'gitlab_migration'
- )
- )
+ return unless @entity.started?
- return
+ if running_tracker.present?
+ log_info(message: 'Stage running', entity_stage: running_tracker.stage)
+ else
+ start_next_stage
end
- logger.info(
- structured_payload(
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- current_stage: current_stage,
- message: 'Stage starting',
- source_version: source_version,
- importer: 'gitlab_migration'
- )
- )
-
- next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
- BulkImports::PipelineWorker.perform_async(
- pipeline_tracker.id,
- pipeline_tracker.stage,
- entity_id
- )
- end
+ re_enqueue
rescue StandardError => e
- log_exception(e,
- {
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- current_stage: current_stage,
- message: 'Entity failed',
- source_version: source_version,
- importer: 'gitlab_migration'
- }
- )
-
- Gitlab::ErrorTracking.track_exception(
- e,
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- source_version: source_version,
- importer: 'gitlab_migration'
- )
+ Gitlab::ErrorTracking.track_exception(e, log_params(message: 'Entity failed'))
- entity.fail_op!
+ @entity.fail_op!
end
private
attr_reader :entity
- def stage_running?(entity_id, stage)
- return unless stage
+ def re_enqueue
+ BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
+ end
- BulkImports::Tracker.stage_running?(entity_id, stage)
+ def running_tracker
+ @running_tracker ||= BulkImports::Tracker.running_trackers(entity.id).first
end
def next_pipeline_trackers_for(entity_id)
BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end
+ def start_next_stage
+ next_pipeline_trackers = next_pipeline_trackers_for(entity.id)
+
+ next_pipeline_trackers.each_with_index do |pipeline_tracker, index|
+ log_info(message: 'Stage starting', entity_stage: pipeline_tracker.stage) if index == 0
+
+ BulkImports::PipelineWorker.perform_async(
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity.id
+ )
+ end
+ end
+
def source_version
entity.bulk_import.source_version_info.to_s
end
@@ -105,5 +76,22 @@ module BulkImports
logger.error(structured_payload(payload))
end
+
+ def log_info(payload)
+ logger.info(structured_payload(log_params(payload)))
+ end
+
+ def log_params(extra)
+ defaults = {
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ source_version: source_version,
+ importer: 'gitlab_migration'
+ }
+
+ defaults.merge(extra)
+ end
end
end
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 4200d0e4a0f..b1f3757e058 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -12,6 +12,8 @@ module BulkImports
data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
feature_category :importers
+ version 2
+
def perform(pipeline_tracker_id)
@tracker = Tracker.find(pipeline_tracker_id)
@@ -27,7 +29,9 @@ module BulkImports
end
ensure
- ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage)
+ # This is needed for in-flight migrations.
+ # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299
+ ::BulkImports::EntityWorker.perform_async(tracker.entity.id) if job_version.nil?
end
private
@@ -39,7 +43,7 @@ module BulkImports
end
def import_in_progress?
- tracker.batches.any?(&:started?)
+ tracker.batches.any? { |b| b.started? || b.created? }
end
end
end
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
index 634d7ed3c87..6230d517641 100644
--- a/app/workers/bulk_imports/pipeline_batch_worker.rb
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -14,15 +14,16 @@ module BulkImports
def perform(batch_id)
@batch = ::BulkImports::BatchTracker.find(batch_id)
@tracker = @batch.tracker
+ @pending_retry = false
try_obtain_lease { run }
ensure
- ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry
end
private
- attr_reader :batch, :tracker
+ attr_reader :batch, :tracker, :pending_retry
def run
return batch.skip! if tracker.failed? || tracker.finished?
@@ -31,6 +32,7 @@ module BulkImports
tracker.pipeline_class.new(context).run
batch.finish!
rescue BulkImports::RetryPipelineError => e
+ @pending_retry = true
retry_batch(e)
rescue StandardError => e
fail_batch(e)
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 098e167ac29..24185f43795 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -14,7 +14,10 @@ module BulkImports
deduplicate :until_executing
worker_resource_boundary :memory
- def perform(pipeline_tracker_id, stage, entity_id)
+ version 2
+
+ # Keep _stage parameter for backwards compatibility.
+ def perform(pipeline_tracker_id, _stage, entity_id)
@entity = ::BulkImports::Entity.find(entity_id)
@pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
@@ -32,7 +35,9 @@ module BulkImports
end
end
ensure
- ::BulkImports::EntityWorker.perform_async(entity_id, stage)
+ # This is needed for in-flight migrations.
+ # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299
+ ::BulkImports::EntityWorker.perform_async(entity_id) if job_version.nil?
end
private
diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb
index 067dbb7492f..703cae8bf88 100644
--- a/app/workers/ci/initial_pipeline_process_worker.rb
+++ b/app/workers/ci/initial_pipeline_process_worker.rb
@@ -28,6 +28,8 @@ module Ci
private
def create_deployments!(pipeline)
+ return if Feature.enabled?(:create_deployment_only_for_processable_jobs, pipeline.project)
+
pipeline.stages.flat_map(&:statuses).each { |build| create_deployment(build) }
end
diff --git a/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb
index 98bb259db0a..8bcbe9d6c9f 100644
--- a/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb
+++ b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true
+
module Ci
module MergeRequests
class AddTodoWhenBuildFailsWorker
diff --git a/app/workers/ci/ref_delete_unlock_artifacts_worker.rb b/app/workers/ci/ref_delete_unlock_artifacts_worker.rb
index aeadf111bfb..e343c0aedd4 100644
--- a/app/workers/ci/ref_delete_unlock_artifacts_worker.rb
+++ b/app/workers/ci/ref_delete_unlock_artifacts_worker.rb
@@ -13,17 +13,21 @@ module Ci
def perform(project_id, user_id, ref_path)
::Project.find_by_id(project_id).try do |project|
- ::User.find_by_id(user_id).try do |user|
+ ::User.find_by_id(user_id).try do |_|
project.ci_refs.find_by_ref_path(ref_path).try do |ci_ref|
- results = ::Ci::UnlockArtifactsService
- .new(project, user)
- .execute(ci_ref)
-
- log_extra_metadata_on_done(:unlocked_pipelines, results[:unlocked_pipelines])
- log_extra_metadata_on_done(:unlocked_job_artifacts, results[:unlocked_job_artifacts])
+ enqueue_pipelines_to_unlock(ci_ref)
end
end
end
end
+
+ private
+
+ def enqueue_pipelines_to_unlock(ci_ref)
+ result = ::Ci::Refs::EnqueuePipelinesToUnlockService.new.execute(ci_ref)
+
+ log_extra_metadata_on_done(:total_pending_entries, result[:total_pending_entries])
+ log_extra_metadata_on_done(:total_new_entries, result[:total_new_entries])
+ end
end
end
diff --git a/app/workers/ci/refs/unlock_previous_pipelines_worker.rb b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb
new file mode 100644
index 00000000000..bf595590cb1
--- /dev/null
+++ b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+module Ci
+ module Refs
+ class UnlockPreviousPipelinesWorker
+ include ApplicationWorker
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+
+ sidekiq_options retry: 3
+ include PipelineBackgroundQueue
+
+ idempotent!
+
+ def perform(ref_id)
+ ::Ci::Ref.find_by_id(ref_id).try do |ref|
+ pipeline = ref.last_finished_pipeline
+ result = ::Ci::Refs::EnqueuePipelinesToUnlockService.new.execute(ref, before_pipeline: pipeline)
+
+ log_extra_metadata_on_done(:total_pending_entries, result[:total_pending_entries])
+ log_extra_metadata_on_done(:total_new_entries, result[:total_new_entries])
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb b/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb
new file mode 100644
index 00000000000..1a593326120
--- /dev/null
+++ b/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Ci
+ class ScheduleUnlockPipelinesInQueueCronWorker
+ include ApplicationWorker
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+
+ # rubocop:disable Scalability/CronWorkerContext
+ # This worker does not perform work scoped to a context
+ include CronjobQueue
+ # rubocop:enable Scalability/CronWorkerContext
+
+ feature_category :build_artifacts
+ idempotent!
+
+ def perform(...)
+ Ci::UnlockPipelinesInQueueWorker.perform_with_capacity(...)
+ end
+ end
+end
diff --git a/app/workers/ci/unlock_pipelines_in_queue_worker.rb b/app/workers/ci/unlock_pipelines_in_queue_worker.rb
new file mode 100644
index 00000000000..de579504711
--- /dev/null
+++ b/app/workers/ci/unlock_pipelines_in_queue_worker.rb
@@ -0,0 +1,55 @@
+# frozen_string_literal: true
+
+module Ci
+ class UnlockPipelinesInQueueWorker
+ include ApplicationWorker
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+
+ include LimitedCapacity::Worker
+
+ feature_category :build_artifacts
+ idempotent!
+
+ MAX_RUNNING_LOW = 50
+ MAX_RUNNING_MEDIUM = 500
+ MAX_RUNNING_HIGH = 1500
+
+ def perform_work(*_)
+ pipeline_id, enqueue_timestamp = Ci::UnlockPipelineRequest.next!
+ return log_extra_metadata_on_done(:remaining_pending, 0) unless pipeline_id
+
+ Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
+ log_extra_metadata_on_done(:pipeline_id, pipeline.id)
+ log_extra_metadata_on_done(:project, pipeline.project.full_path)
+
+ result = Ci::UnlockPipelineService.new(pipeline).execute
+
+ log_extra_metadata_on_done(:unlock_wait_time, Time.current.utc.to_i - enqueue_timestamp)
+ log_extra_metadata_on_done(:remaining_pending, Ci::UnlockPipelineRequest.total_pending)
+ log_extra_metadata_on_done(:skipped_already_leased, result[:skipped_already_leased])
+ log_extra_metadata_on_done(:skipped_already_unlocked, result[:skipped_already_unlocked])
+ log_extra_metadata_on_done(:exec_timeout, result[:exec_timeout])
+ log_extra_metadata_on_done(:unlocked_job_artifacts, result[:unlocked_job_artifacts])
+ log_extra_metadata_on_done(:unlocked_pipeline_artifacts, result[:unlocked_pipeline_artifacts])
+ end
+ end
+
+ def remaining_work_count(*_)
+ Ci::UnlockPipelineRequest.total_pending
+ end
+
+ def max_running_jobs
+ if ::Feature.enabled?(:ci_unlock_pipelines_high, type: :ops)
+ MAX_RUNNING_HIGH
+ elsif ::Feature.enabled?(:ci_unlock_pipelines_medium, type: :ops)
+ MAX_RUNNING_MEDIUM
+ elsif ::Feature.enabled?(:ci_unlock_pipelines, type: :ops)
+ # This is the default enabled flag
+ MAX_RUNNING_LOW
+ else
+ 0
+ end
+ end
+ end
+end
diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb
index 5b7398cb071..e884a43b1e3 100644
--- a/app/workers/click_house/events_sync_worker.rb
+++ b/app/workers/click_house/events_sync_worker.rb
@@ -6,6 +6,7 @@ module ClickHouse
include Gitlab::ExclusiveLeaseHelpers
idempotent!
+ queue_namespace :cronjob
data_consistency :delayed
worker_has_external_dependencies! # the worker interacts with a ClickHouse database
feature_category :value_stream_management
diff --git a/app/workers/concerns/auto_devops_queue.rb b/app/workers/concerns/auto_devops_queue.rb
index 61e3c1544bd..cdf429a8be5 100644
--- a/app/workers/concerns/auto_devops_queue.rb
+++ b/app/workers/concerns/auto_devops_queue.rb
@@ -1,5 +1,5 @@
# frozen_string_literal: true
-#
+
module AutoDevopsQueue
extend ActiveSupport::Concern
diff --git a/app/workers/concerns/chaos_queue.rb b/app/workers/concerns/chaos_queue.rb
index 23e58b5182b..9a3d518dda8 100644
--- a/app/workers/concerns/chaos_queue.rb
+++ b/app/workers/concerns/chaos_queue.rb
@@ -1,5 +1,5 @@
# frozen_string_literal: true
-#
+
module ChaosQueue
extend ActiveSupport::Concern
diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb
index e190ced5073..fcc7a96fa2b 100644
--- a/app/workers/concerns/gitlab/github_import/object_importer.rb
+++ b/app/workers/concerns/gitlab/github_import/object_importer.rb
@@ -10,7 +10,6 @@ module Gitlab
included do
include ApplicationWorker
- sidekiq_options retry: 3
include GithubImport::Queue
include ReschedulingMethods
@@ -19,11 +18,8 @@ module Gitlab
sidekiq_retries_exhausted do |msg|
args = msg['args']
- correlation_id = msg['correlation_id']
jid = msg['jid']
- new.perform_failure(args[0], args[1], correlation_id)
-
# If a job is being exhausted we still want to notify the
# Gitlab::Import::AdvanceStageWorker to prevent the entire import from getting stuck
if args.length == 3 && (key = args.last) && key.is_a?(String)
@@ -64,29 +60,15 @@ module Gitlab
rescue NoMethodError => e
# This exception will be more useful in development when a new
# Representation is created but the developer forgot to add a
- # `:github_identifiers` field.
+ # `#github_identifiers` method.
track_and_raise_exception(project, e, fail_import: true)
rescue ActiveRecord::RecordInvalid, NotRetriableError => e
# We do not raise exception to prevent job retry
- failure = track_exception(project, e)
- add_identifiers_to_failure(failure, object.github_identifiers)
+ track_exception(project, e)
rescue StandardError => e
track_and_raise_exception(project, e)
end
- # hash - A Hash containing the details of the object to import.
- def perform_failure(project_id, hash, correlation_id)
- project = Project.find_by_id(project_id)
- return unless project
-
- failure = project.import_failures.failures_by_correlation_id(correlation_id).first
- return unless failure
-
- object = representation_class.from_json_hash(hash)
-
- add_identifiers_to_failure(failure, object.github_identifiers)
- end
-
def increment_object_counter?(_object)
true
end
@@ -118,16 +100,20 @@ module Gitlab
extra.merge(
project_id: project_id,
importer: importer_class.name,
- github_identifiers: github_identifiers
+ external_identifiers: github_identifiers
)
end
def track_exception(project, exception, fail_import: false)
+ external_identifiers = github_identifiers || {}
+ external_identifiers[:object_type] ||= object_type&.to_s
+
Gitlab::Import::ImportFailureService.track(
project_id: project.id,
error_source: importer_class.name,
exception: exception,
- fail_import: fail_import
+ fail_import: fail_import,
+ external_identifiers: external_identifiers
)
end
@@ -136,12 +122,6 @@ module Gitlab
raise(exception)
end
-
- def add_identifiers_to_failure(failure, external_identifiers)
- external_identifiers[:object_type] = object_type
-
- failure.update_column(:external_identifiers, external_identifiers)
- end
end
end
end
diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb
index e7156ac12f8..7cc23dd7c0b 100644
--- a/app/workers/concerns/gitlab/github_import/queue.rb
+++ b/app/workers/concerns/gitlab/github_import/queue.rb
@@ -15,14 +15,6 @@ module Gitlab
# this is better than a project being stuck in the "import" state
# forever.
sidekiq_options dead: false, retry: 5
-
- sidekiq_retries_exhausted do |msg, e|
- Gitlab::Import::ImportFailureService.track(
- project_id: msg['args'][0],
- exception: e,
- fail_import: true
- )
- end
end
end
end
diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb
index 75db5589415..80013ff3cd9 100644
--- a/app/workers/concerns/gitlab/github_import/stage_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb
@@ -3,6 +3,21 @@
module Gitlab
module GithubImport
module StageMethods
+ extend ActiveSupport::Concern
+
+ included do
+ include ApplicationWorker
+
+ sidekiq_retries_exhausted do |msg, e|
+ Gitlab::Import::ImportFailureService.track(
+ project_id: msg['args'][0],
+ exception: e,
+ error_source: self.class.name,
+ fail_import: true
+ )
+ end
+ end
+
# project_id - The ID of the GitLab project to import the data into.
def perform(project_id)
info(project_id, message: 'starting stage')
@@ -29,7 +44,8 @@ module Gitlab
project_id: project_id,
exception: e,
error_source: self.class.name,
- fail_import: abort_on_failure
+ fail_import: false,
+ metrics: true
)
raise(e)
@@ -51,10 +67,6 @@ module Gitlab
# rubocop: enable CodeReuse/ActiveRecord
end
- def abort_on_failure
- false
- end
-
private
def info(project_id, extra = {})
diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb
index 4b5ce8a01f6..b4d884f914d 100644
--- a/app/workers/concerns/limited_capacity/job_tracker.rb
+++ b/app/workers/concerns/limited_capacity/job_tracker.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true
+
module LimitedCapacity
class JobTracker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Utils::StrongMemoize
diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb
index af66d80b3e9..0a79c5c46d5 100644
--- a/app/workers/concerns/limited_capacity/worker.rb
+++ b/app/workers/concerns/limited_capacity/worker.rb
@@ -1,41 +1,5 @@
# frozen_string_literal: true
-# Usage:
-#
-# Worker that performs the tasks:
-#
-# class DummyWorker
-# include ApplicationWorker
-# include LimitedCapacity::Worker
-#
-# # For each job that raises any error, a worker instance will be disabled
-# # until the next schedule-run.
-# # If you wish to get around this, exceptions must by handled by the implementer.
-# #
-# def perform_work(*args)
-# end
-#
-# def remaining_work_count(*args)
-# 5
-# end
-#
-# def max_running_jobs
-# 25
-# end
-# end
-#
-# Cron worker to fill the pool of regular workers:
-#
-# class ScheduleDummyCronWorker
-# include ApplicationWorker
-# include CronjobQueue
-#
-# def perform(*args)
-# DummyWorker.perform_with_capacity(*args)
-# end
-# end
-#
-
module LimitedCapacity
module Worker
extend ActiveSupport::Concern
diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb
index 02eda924b71..cb09aaf1a6a 100644
--- a/app/workers/concerns/worker_attributes.rb
+++ b/app/workers/concerns/worker_attributes.rb
@@ -155,6 +155,10 @@ module WorkerAttributes
::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap.set_strategy_for(strategy: value, worker: self)
end
+ def get_pause_control
+ ::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap.strategy_for(worker: self)
+ end
+
def get_weight
get_class_attribute(:weight) ||
NAMESPACE_WEIGHTS[queue_namespace] ||
diff --git a/app/workers/database/batched_background_migration/ci_database_worker.rb b/app/workers/database/batched_background_migration/ci_database_worker.rb
index 58b0f5496f4..417af4c7172 100644
--- a/app/workers/database/batched_background_migration/ci_database_worker.rb
+++ b/app/workers/database/batched_background_migration/ci_database_worker.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true
+
module Database
module BatchedBackgroundMigration
class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
diff --git a/app/workers/delete_container_repository_worker.rb b/app/workers/delete_container_repository_worker.rb
deleted file mode 100644
index d0552dce9fc..00000000000
--- a/app/workers/delete_container_repository_worker.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# frozen_string_literal: true
-
-class DeleteContainerRepositoryWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
- include ExclusiveLeaseGuard
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- queue_namespace :container_repository
- feature_category :container_registry
-
- def perform(current_user_id, container_repository_id); end
-end
diff --git a/app/workers/environments/stop_job_failed_worker.rb b/app/workers/environments/stop_job_failed_worker.rb
new file mode 100644
index 00000000000..c04601e0428
--- /dev/null
+++ b/app/workers/environments/stop_job_failed_worker.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module Environments
+ class StopJobFailedWorker
+ include ApplicationWorker
+
+ data_consistency :delayed
+ idempotent!
+ feature_category :continuous_delivery
+
+ def perform(job_id, _params = {})
+ Ci::Processable.find_by_id(job_id).try do |job|
+ revert_environment(job) if job.stops_environment? && job.failed?
+ end
+ end
+
+ private
+
+ def revert_environment(job)
+ return if job.persisted_environment.nil?
+
+ job.persisted_environment.fire_state_event(:recover_stuck_stopping)
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb b/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb
index 7f281352a1b..ed89f332652 100644
--- a/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb
@@ -20,13 +20,23 @@ module Gitlab
# The known importer stages and their corresponding Sidekiq workers.
STAGES = {
+ repository: Stage::ImportRepositoryWorker,
+ pull_requests: Stage::ImportPullRequestsWorker,
+ pull_requests_notes: Stage::ImportPullRequestsNotesWorker,
+ issues: Stage::ImportIssuesWorker,
+ issues_notes: Stage::ImportIssuesNotesWorker,
+ lfs_objects: Stage::ImportLfsObjectsWorker,
finish: Stage::FinishImportWorker
}.freeze
- def find_import_state(project_id)
+ def find_import_state_jid(project_id)
ProjectImportState.jid_by(project_id: project_id, status: :started)
end
+ def find_import_state(id)
+ ProjectImportState.find(id)
+ end
+
private
def next_stage_worker(next_stage)
diff --git a/app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb b/app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb
new file mode 100644
index 00000000000..de8239f30d9
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ class ImportIssueNotesWorker # rubocop:disable Scalability/IdempotentWorker
+ include ObjectImporter
+
+ def importer_class
+ Importers::IssueNotesImporter
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/import_issue_worker.rb b/app/workers/gitlab/bitbucket_import/import_issue_worker.rb
new file mode 100644
index 00000000000..7df3f6d4a62
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/import_issue_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ class ImportIssueWorker # rubocop:disable Scalability/IdempotentWorker
+ include ObjectImporter
+
+ def importer_class
+ Importers::IssueImporter
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb b/app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb
new file mode 100644
index 00000000000..39b66684026
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ class ImportLfsObjectWorker # rubocop:disable Scalability/IdempotentWorker
+ include ObjectImporter
+
+ def importer_class
+ Importers::LfsObjectImporter
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb b/app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb
new file mode 100644
index 00000000000..8c9f84c97a5
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ class ImportPullRequestNotesWorker # rubocop:disable Scalability/IdempotentWorker
+ include ObjectImporter
+
+ def importer_class
+ Importers::PullRequestNotesImporter
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb
new file mode 100644
index 00000000000..cbd67099086
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ module Stage
+ class ImportIssuesNotesWorker # rubocop:disable Scalability/IdempotentWorker
+ include StageMethods
+
+ private
+
+ # project - An instance of Project.
+ def import(project)
+ waiter = importer_class.new(project).execute
+
+ project.import_state.refresh_jid_expiration
+
+ AdvanceStageWorker.perform_async(
+ project.id,
+ { waiter.key => waiter.jobs_remaining },
+ :lfs_objects
+ )
+ end
+
+ def importer_class
+ Importers::IssuesNotesImporter
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb
new file mode 100644
index 00000000000..31a11d802c7
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ module Stage
+ class ImportIssuesWorker # rubocop:disable Scalability/IdempotentWorker
+ include StageMethods
+
+ private
+
+ # project - An instance of Project.
+ def import(project)
+ waiter = importer_class.new(project).execute
+
+ project.import_state.refresh_jid_expiration
+
+ AdvanceStageWorker.perform_async(
+ project.id,
+ { waiter.key => waiter.jobs_remaining },
+ :issues_notes
+ )
+ end
+
+ def importer_class
+ Importers::IssuesImporter
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb
new file mode 100644
index 00000000000..c88a1be3446
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ module Stage
+ class ImportLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker
+ include StageMethods
+
+ private
+
+ # project - An instance of Project.
+ def import(project)
+ waiter = importer_class.new(project).execute
+
+ project.import_state.refresh_jid_expiration
+
+ AdvanceStageWorker.perform_async(
+ project.id,
+ { waiter.key => waiter.jobs_remaining },
+ :finish
+ )
+ end
+
+ def importer_class
+ Importers::LfsObjectsImporter
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb
new file mode 100644
index 00000000000..36d60c7246c
--- /dev/null
+++ b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketImport
+ module Stage
+ class ImportPullRequestsNotesWorker # rubocop:disable Scalability/IdempotentWorker
+ include StageMethods
+
+ private
+
+ # project - An instance of Project.
+ def import(project)
+ waiter = importer_class.new(project).execute
+
+ project.import_state.refresh_jid_expiration
+
+ AdvanceStageWorker.perform_async(
+ project.id,
+ { waiter.key => waiter.jobs_remaining },
+ :issues
+ )
+ end
+
+ def importer_class
+ Importers::PullRequestsNotesImporter
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb
index e1f3b5ab79a..3f85c832d50 100644
--- a/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb
+++ b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb
@@ -17,7 +17,7 @@ module Gitlab
AdvanceStageWorker.perform_async(
project.id,
{ waiter.key => waiter.jobs_remaining },
- :finish
+ :pull_requests_notes
)
end
diff --git a/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb b/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb
index 2c8db639725..1fc35725c9f 100644
--- a/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb
@@ -25,10 +25,14 @@ module Gitlab
finish: Stage::FinishImportWorker
}.freeze
- def find_import_state(project_id)
+ def find_import_state_jid(project_id)
ProjectImportState.jid_by(project_id: project_id, status: :started)
end
+ def find_import_state(id)
+ ProjectImportState.find(id)
+ end
+
private
def next_stage_worker(next_stage)
diff --git a/app/workers/gitlab/github_gists_import/import_gist_worker.rb b/app/workers/gitlab/github_gists_import/import_gist_worker.rb
index 60e4c8fdad6..151788150dd 100644
--- a/app/workers/gitlab/github_gists_import/import_gist_worker.rb
+++ b/app/workers/gitlab/github_gists_import/import_gist_worker.rb
@@ -106,9 +106,9 @@ module Gitlab
def error(user_id, error_message, github_identifiers)
attributes = {
user_id: user_id,
- github_identifiers: github_identifiers,
+ external_identifiers: github_identifiers,
message: 'importer failed',
- 'error.message': error_message
+ 'exception.message': error_message
}
Gitlab::GithubImport::Logger.error(structured_payload(attributes))
@@ -120,7 +120,7 @@ module Gitlab
attributes = {
user_id: user_id,
message: message,
- github_identifiers: gist_id
+ external_identifiers: gist_id
}
Gitlab::GithubImport::Logger.info(structured_payload(attributes))
diff --git a/app/workers/gitlab/github_gists_import/start_import_worker.rb b/app/workers/gitlab/github_gists_import/start_import_worker.rb
index 33c91611719..f7d3eb1d759 100644
--- a/app/workers/gitlab/github_gists_import/start_import_worker.rb
+++ b/app/workers/gitlab/github_gists_import/start_import_worker.rb
@@ -51,7 +51,7 @@ module Gitlab
end
def log_error_and_raise!(user_id, error)
- logger.error(structured_payload(user_id: user_id, message: 'import failed', 'error.message': error.message))
+ logger.error(structured_payload(user_id: user_id, message: 'import failed', 'exception.message': error.message))
raise(error)
end
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index 45f4bf486d7..a012241e90c 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -33,10 +33,14 @@ module Gitlab
finish: Stage::FinishImportWorker
}.freeze
- def find_import_state(project_id)
+ def find_import_state_jid(project_id)
ProjectImportState.jid_by(project_id: project_id, status: :started)
end
+ def find_import_state(id)
+ ProjectImportState.find(id)
+ end
+
private
def next_stage_worker(next_stage)
diff --git a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb
index 2b9fb26d53a..3de4bef053f 100644
--- a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb
+++ b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb
@@ -7,7 +7,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
# The interval to schedule new instances of this job at.
diff --git a/app/workers/gitlab/github_import/stage/finish_import_worker.rb b/app/workers/gitlab/github_import/stage/finish_import_worker.rb
index e716eda5c99..90445a6d46c 100644
--- a/app/workers/gitlab/github_import/stage/finish_import_worker.rb
+++ b/app/workers/gitlab/github_import/stage/finish_import_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
index 4045852e3f0..f9952f04e99 100644
--- a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 5
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
index cc6a2255160..94cb3cb6c71 100644
--- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
@@ -31,22 +30,6 @@ module Gitlab
project.import_state.refresh_jid_expiration
ImportPullRequestsWorker.perform_async(project.id)
- rescue StandardError => e
- Gitlab::Import::ImportFailureService.track(
- project_id: project.id,
- error_source: self.class.name,
- exception: e,
- fail_import: abort_on_failure,
- metrics: true
- )
-
- raise(e)
- end
-
- private
-
- def abort_on_failure
- true
end
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
index 8f72cc051b3..751ca92388a 100644
--- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
@@ -24,16 +23,6 @@ module Gitlab
project.import_state.refresh_jid_expiration
move_to_next_stage(project, { waiter.key => waiter.jobs_remaining })
- rescue StandardError => e
- Gitlab::Import::ImportFailureService.track(
- project_id: project.id,
- error_source: self.class.name,
- exception: e,
- fail_import: abort_on_failure,
- metrics: true
- )
-
- raise(e)
end
private
@@ -58,10 +47,6 @@ module Gitlab
project.id, waiters, :pull_requests_merged_by
)
end
-
- def abort_on_failure
- true
- end
end
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
index 54ed4c47e78..c80412d941b 100644
--- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
index 3d1a8437da2..592b789cc94 100644
--- a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
index f6f5687130f..e89a850c991 100644
--- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb
index 40ca12b130f..c1fdb76d03e 100644
--- a/app/workers/gitlab/github_import/stage/import_notes_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
index 73f4ea580c4..f8448094c28 100644
--- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
@@ -27,15 +26,6 @@ module Gitlab
{ waiter.key => waiter.jobs_remaining },
:lfs_objects
)
- rescue StandardError => e
- Gitlab::Import::ImportFailureService.track(
- project_id: project.id,
- error_source: self.class.name,
- exception: e,
- metrics: true
- )
-
- raise(e)
end
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
index 329bf8f84b1..2e7cd28578f 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
index bcbf5dd471a..2f860349e25 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
index 33dee47bd03..51730033133 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
index b2dfded0280..029d38d8b93 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
@@ -33,16 +32,6 @@ module Gitlab
{ waiter.key => waiter.jobs_remaining },
:collaborators
)
- rescue StandardError => e
- Gitlab::Import::ImportFailureService.track(
- project_id: project.id,
- error_source: self.class.name,
- exception: e,
- fail_import: abort_on_failure,
- metrics: true
- )
-
- raise(e)
end
private
@@ -57,10 +46,6 @@ module Gitlab
MergeRequest.track_target_project_iid!(project, last_github_pull_request[:number])
end
-
- def abort_on_failure
- true
- end
end
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_repository_worker.rb b/app/workers/gitlab/github_import/stage/import_repository_worker.rb
index d998771b328..2a62930b5ea 100644
--- a/app/workers/gitlab/github_import/stage/import_repository_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_repository_worker.rb
@@ -8,7 +8,6 @@ module Gitlab
data_consistency :always
- sidekiq_options retry: 3
include GithubImport::Queue
include StageMethods
@@ -34,17 +33,6 @@ module Gitlab
counter.increment
ImportBaseDataWorker.perform_async(project.id)
-
- rescue StandardError => e
- Gitlab::Import::ImportFailureService.track(
- project_id: project.id,
- error_source: self.class.name,
- exception: e,
- fail_import: abort_on_failure,
- metrics: true
- )
-
- raise(e)
end
def counter
@@ -54,10 +42,6 @@ module Gitlab
)
end
- def abort_on_failure
- true
- end
-
private
def allocate_issues_internal_id!(project, client)
diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb
index 5d5abc88388..180c08905ff 100644
--- a/app/workers/gitlab/import/advance_stage.rb
+++ b/app/workers/gitlab/import/advance_stage.rb
@@ -4,6 +4,9 @@ module Gitlab
module Import
module AdvanceStage
INTERVAL = 30.seconds.to_i
+ TIMEOUT_DURATION = 2.hours
+
+ AdvanceStageTimeoutError = Class.new(StandardError)
# The number of seconds to wait (while blocking the thread) before
# continuing to the next waiter.
@@ -14,30 +17,35 @@ module Gitlab
# remaining jobs.
# next_stage - The name of the next stage to start when all jobs have been
# completed.
- def perform(project_id, waiters, next_stage)
- import_state = find_import_state(project_id)
+ # timeout_timer - Time the sidekiq worker was first initiated with the current job_count
+ # previous_job_count - Number of jobs remaining on last invocation of this worker
+ def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil)
+ import_state_jid = find_import_state_jid(project_id)
# If the import state is nil the project may have been deleted or the import
# may have failed or been canceled. In this case we tidy up the cache data and no
# longer attempt to advance to the next stage.
- if import_state.nil?
+ if import_state_jid.nil?
clear_waiter_caches(waiters)
return
end
new_waiters = wait_for_jobs(waiters)
+ new_job_count = new_waiters.values.sum
+
+ # Reset the timeout timer as some jobs finished processing
+ if new_job_count != previous_job_count
+ timeout_timer = Time.zone.now
+ previous_job_count = new_job_count
+ end
if new_waiters.empty?
- # We refresh the import JID here so workers importing individual
- # resources (e.g. notes) don't have to do this all the time, reducing
- # the pressure on Redis. We _only_ do this once all jobs are done so
- # we don't get stuck forever if one or more jobs failed to notify the
- # JobWaiter.
- import_state.refresh_jid_expiration
-
- next_stage_worker(next_stage).perform_async(project_id)
+ proceed_to_next_stage(import_state_jid, next_stage, project_id)
+ elsif timeout_reached?(timeout_timer) && new_job_count == previous_job_count
+
+ handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count)
else
- self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage)
+ self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage, timeout_timer, previous_job_count)
end
end
@@ -55,12 +63,66 @@ module Gitlab
end
end
- def find_import_state(project_id)
+ def find_import_state_jid(project_id)
+ raise NotImplementedError
+ end
+
+ def find_import_state(id)
raise NotImplementedError
end
private
+ def proceed_to_next_stage(import_state_jid, next_stage, project_id)
+ # We refresh the import JID here so workers importing individual
+ # resources (e.g. notes) don't have to do this all the time, reducing
+ # the pressure on Redis. We _only_ do this once all jobs are done so
+ # we don't get stuck forever if one or more jobs failed to notify the
+ # JobWaiter.
+ import_state_jid.refresh_jid_expiration
+
+ next_stage_worker(next_stage).perform_async(project_id)
+ end
+
+ def handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count)
+ project = Project.find_by_id(project_id)
+ strategy = project.import_data&.data&.dig("timeout_strategy") || ProjectImportData::PESSIMISTIC_TIMEOUT
+
+ Gitlab::Import::Logger.info(
+ message: 'Timeout reached, no longer retrying',
+ project_id: project_id,
+ jobs_remaining: new_job_count,
+ waiters: new_waiters,
+ timeout_strategy: strategy
+ )
+
+ clear_waiter_caches(new_waiters)
+
+ case strategy
+ when ProjectImportData::OPTIMISTIC_TIMEOUT
+ proceed_to_next_stage(import_state_jid, next_stage, project_id)
+ when ProjectImportData::PESSIMISTIC_TIMEOUT
+ import_state = find_import_state(import_state_jid.id)
+ fail_import_and_log_status(import_state)
+ end
+ end
+
+ def fail_import_and_log_status(import_state)
+ raise AdvanceStageTimeoutError, "Failing advance stage, timeout reached with pessimistic strategy"
+ rescue AdvanceStageTimeoutError => e
+ Gitlab::Import::ImportFailureService.track(
+ import_state: import_state,
+ exception: e,
+ error_source: self.class.name,
+ fail_import: true
+ )
+ end
+
+ def timeout_reached?(timeout_timer)
+ timeout_timer = Time.zone.parse(timeout_timer) if timeout_timer.is_a?(String)
+ Time.zone.now > timeout_timer + TIMEOUT_DURATION
+ end
+
def next_stage_worker(next_stage)
raise NotImplementedError
end
diff --git a/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb b/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb
index 01979b2029f..93d670e1b8b 100644
--- a/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb
+++ b/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true
+
module Gitlab
module Import
class StuckProjectImportJobsWorker # rubocop:disable Scalability/IdempotentWorker
diff --git a/app/workers/gitlab/jira_import/advance_stage_worker.rb b/app/workers/gitlab/jira_import/advance_stage_worker.rb
index 5fae7caf791..9641b55a584 100644
--- a/app/workers/gitlab/jira_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/jira_import/advance_stage_worker.rb
@@ -20,10 +20,14 @@ module Gitlab
finish: Gitlab::JiraImport::Stage::FinishImportWorker
}.freeze
- def find_import_state(project_id)
+ def find_import_state_jid(project_id)
JiraImportState.jid_by(project_id: project_id, status: :started)
end
+ def find_import_state(id)
+ JiraImportState.find(id)
+ end
+
private
def next_stage_worker(next_stage)
diff --git a/app/workers/gitlab_shell_worker.rb b/app/workers/gitlab_shell_worker.rb
deleted file mode 100644
index b3c0fa79658..00000000000
--- a/app/workers/gitlab_shell_worker.rb
+++ /dev/null
@@ -1,25 +0,0 @@
-# frozen_string_literal: true
-
-class GitlabShellWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
- include Gitlab::ShellAdapter
-
- feature_category :source_code_management
- urgency :high
- weight 2
- loggable_arguments 0
-
- def perform(action, *arg)
- if Gitlab::Shell::PERMITTED_ACTIONS.exclude?(action)
- raise(ArgumentError, "#{action} not allowed for #{self.class.name}")
- end
-
- Gitlab::GitalyClient::NamespaceService.allow do
- gitlab_shell.public_send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
- end
- end
-end
diff --git a/app/workers/hashed_storage/migrator_worker.rb b/app/workers/hashed_storage/migrator_worker.rb
index 5f90b8f1009..a7e7a505681 100644
--- a/app/workers/hashed_storage/migrator_worker.rb
+++ b/app/workers/hashed_storage/migrator_worker.rb
@@ -13,9 +13,6 @@ module HashedStorage
# @param [Integer] start initial ID of the batch
# @param [Integer] finish last ID of the batch
- def perform(start, finish)
- migrator = Gitlab::HashedStorage::Migrator.new
- migrator.bulk_migrate(start: start, finish: finish)
- end
+ def perform(start, finish); end
end
end
diff --git a/app/workers/hashed_storage/project_migrate_worker.rb b/app/workers/hashed_storage/project_migrate_worker.rb
index 01e2d6307de..e1bf71de179 100644
--- a/app/workers/hashed_storage/project_migrate_worker.rb
+++ b/app/workers/hashed_storage/project_migrate_worker.rb
@@ -13,17 +13,6 @@ module HashedStorage
attr_reader :project_id
- def perform(project_id, old_disk_path = nil)
- @project_id = project_id # we need to set this in order to create the lease_key
-
- try_obtain_lease do
- project = Project.without_deleted.find_by_id(project_id)
- break unless project && project.storage_upgradable?
-
- old_disk_path ||= Storage::LegacyProject.new(project).disk_path
-
- ::Projects::HashedStorage::MigrationService.new(project, old_disk_path, logger: logger).execute
- end
- end
+ def perform(project_id, old_disk_path = nil); end
end
end
diff --git a/app/workers/hashed_storage/project_rollback_worker.rb b/app/workers/hashed_storage/project_rollback_worker.rb
index 2ec323248ab..af4223ff354 100644
--- a/app/workers/hashed_storage/project_rollback_worker.rb
+++ b/app/workers/hashed_storage/project_rollback_worker.rb
@@ -13,17 +13,6 @@ module HashedStorage
attr_reader :project_id
- def perform(project_id, old_disk_path = nil)
- @project_id = project_id # we need to set this in order to create the lease_key
-
- try_obtain_lease do
- project = Project.without_deleted.find_by_id(project_id)
- break unless project
-
- old_disk_path ||= project.disk_path
-
- ::Projects::HashedStorage::RollbackService.new(project, old_disk_path, logger: logger).execute
- end
- end
+ def perform(project_id, old_disk_path = nil); end
end
end
diff --git a/app/workers/hashed_storage/rollbacker_worker.rb b/app/workers/hashed_storage/rollbacker_worker.rb
index c6c4990d799..e659e65a370 100644
--- a/app/workers/hashed_storage/rollbacker_worker.rb
+++ b/app/workers/hashed_storage/rollbacker_worker.rb
@@ -13,9 +13,6 @@ module HashedStorage
# @param [Integer] start initial ID of the batch
# @param [Integer] finish last ID of the batch
- def perform(start, finish)
- migrator = Gitlab::HashedStorage::Migrator.new
- migrator.bulk_rollback(start: start, finish: finish)
- end
+ def perform(start, finish); end
end
end
diff --git a/app/workers/integrations/irker_worker.rb b/app/workers/integrations/irker_worker.rb
index 3152d68b372..4c1f0df0fc7 100644
--- a/app/workers/integrations/irker_worker.rb
+++ b/app/workers/integrations/irker_worker.rb
@@ -58,7 +58,7 @@ module Integrations
allow_local_network: allow_local_requests?,
schemes: ['irc'])
@socket = TCPSocket.new ip_address, irker_port
- rescue Errno::ECONNREFUSED, Gitlab::UrlBlocker::BlockedUrlError => e
+ rescue Errno::ECONNREFUSED, Gitlab::HTTP_V2::UrlBlocker::BlockedUrlError => e
logger.fatal "Can't connect to Irker daemon: #{e}"
return false
end
diff --git a/app/workers/issuable/related_links_create_worker.rb b/app/workers/issuable/related_links_create_worker.rb
new file mode 100644
index 00000000000..7cbf70fd5ab
--- /dev/null
+++ b/app/workers/issuable/related_links_create_worker.rb
@@ -0,0 +1,65 @@
+# frozen_string_literal: true
+
+module Issuable
+ class RelatedLinksCreateWorker
+ include ApplicationWorker
+
+ data_consistency :delayed
+
+ sidekiq_options retry: 3
+
+ feature_category :portfolio_management
+ worker_resource_boundary :unknown
+ urgency :high
+ idempotent!
+
+ def perform(args)
+ @params = args.with_indifferent_access
+ @user = User.find_by_id(params[:user_id])
+ @issuable = issuable_class.find_by_id(params[:issuable_id])
+ @links = issuable_class.related_link_class&.where(id: params[:link_ids])
+ return unless user && issuable && links.present?
+
+ create_issuable_notes!
+ rescue ArgumentError => error
+ logger.error(
+ worker: self.class.name,
+ message: "Failed to complete job (user_id:#{params[:user_id]}, issuable_id:#{params[:issuable_id]}, " \
+ "issuable_class:#{params[:issuable_class]}): #{error.message}"
+ )
+ end
+
+ private
+
+ attr_reader :params, :user, :issuable, :links
+
+ def issuable_class
+ params[:issuable_class].constantize
+ rescue NameError
+ raise ArgumentError, "Unknown class '#{params[:issuable_class]}'"
+ end
+
+ def create_issuable_notes!
+ errors = create_notes.compact
+ return unless errors.any?
+
+ raise ArgumentError, "Could not create notes: #{errors.join(', ')}"
+ end
+
+ def create_notes
+ linked_item_notes_errors = links.filter_map { |link| create_system_note(link.target, issuable) }
+ issuable_note_error = create_system_note(issuable, links.collect(&:target))
+
+ linked_item_notes_errors << issuable_note_error
+ end
+
+ def create_system_note(noteable, references, method_name = :relate_issuable)
+ note = ::SystemNoteService.try(method_name, noteable, references, user)
+ return if note.present?
+
+ "{noteable_id: #{noteable.id}, reference_ids: #{[references].flatten.collect(&:id)}}"
+ end
+ end
+end
+
+Issuable::RelatedLinksCreateWorker.prepend_mod_with('Issuable::RelatedLinksCreateWorker')
diff --git a/app/workers/jira_connect/sync_project_worker.rb b/app/workers/jira_connect/sync_project_worker.rb
index 40f225ab756..09aa5edc73b 100644
--- a/app/workers/jira_connect/sync_project_worker.rb
+++ b/app/workers/jira_connect/sync_project_worker.rb
@@ -33,7 +33,10 @@ module JiraConnect
# rubocop: disable CodeReuse/ActiveRecord
def merge_requests_to_sync(project)
- project.merge_requests.with_jira_issue_keys.preload(:author).limit(MAX_RECORDS_LIMIT).order(id: :desc)
+ project.merge_requests.with_jira_issue_keys
+ .preload(:author, :approvals, merge_request_reviewers: :reviewer)
+ .limit(MAX_RECORDS_LIMIT)
+ .order(id: :desc)
end
# rubocop: enable CodeReuse/ActiveRecord
diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb
index a0594b15e31..29f0c0bbbf4 100644
--- a/app/workers/merge_worker.rb
+++ b/app/workers/merge_worker.rb
@@ -16,8 +16,6 @@ class MergeWorker # rubocop:disable Scalability/IdempotentWorker
deduplicate :until_executed, including_scheduled: true
def perform(merge_request_id, current_user_id, params)
- params = params.with_indifferent_access
-
begin
current_user = User.find(current_user_id)
merge_request = MergeRequest.find(merge_request_id)
@@ -25,6 +23,9 @@ class MergeWorker # rubocop:disable Scalability/IdempotentWorker
return
end
+ params = params.with_indifferent_access
+ params[:check_mergeability_retry_lease] = true unless params.has_key?(:check_mergeability_retry_lease)
+
MergeRequests::MergeService.new(project: merge_request.target_project, current_user: current_user, params: params)
.execute(merge_request)
end
diff --git a/app/workers/pages/deactivated_deployments_delete_cron_worker.rb b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb
new file mode 100644
index 00000000000..7ee6327cea7
--- /dev/null
+++ b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module Pages
+ class DeactivatedDeploymentsDeleteCronWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop: disable Scalability/CronWorkerContext
+
+ idempotent!
+ data_consistency :always # rubocop: disable SidekiqLoadBalancing/WorkerDataConsistency
+
+ feature_category :pages
+
+ def perform
+ PagesDeployment.deactivated.each_batch do |deployments| # rubocop: disable Style/SymbolProc
+ deployments.delete_all
+ end
+ end
+ end
+end
diff --git a/app/workers/projects/after_import_worker.rb b/app/workers/projects/after_import_worker.rb
index 06211b2d991..47bd07d0850 100644
--- a/app/workers/projects/after_import_worker.rb
+++ b/app/workers/projects/after_import_worker.rb
@@ -31,7 +31,7 @@ module Projects
message: 'Project housekeeping failed',
project_full_path: @project.full_path,
project_id: @project.id,
- 'error.message' => e.message
+ 'exception.message' => e.message
)
end
diff --git a/app/workers/projects/record_target_platforms_worker.rb b/app/workers/projects/record_target_platforms_worker.rb
index bbe0c63cfd1..d458c9563d0 100644
--- a/app/workers/projects/record_target_platforms_worker.rb
+++ b/app/workers/projects/record_target_platforms_worker.rb
@@ -8,7 +8,7 @@ module Projects
LEASE_TIMEOUT = 1.hour.to_i
APPLE_PLATFORM_LANGUAGES = %w[swift objective-c].freeze
- feature_category :experimentation_activation
+ feature_category :activation
data_consistency :always
deduplicate :until_executed
urgency :low
diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb
index dab92e16ee3..61ef7494d38 100644
--- a/app/workers/run_pipeline_schedule_worker.rb
+++ b/app/workers/run_pipeline_schedule_worker.rb
@@ -22,7 +22,7 @@ class RunPipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker
options.symbolize_keys!
if options[:scheduling]
- return if schedule.next_run_at > Time.current
+ return if schedule.next_run_at.future?
update_next_run_at_for(schedule)
end
diff --git a/app/workers/tasks_to_be_done/create_worker.rb b/app/workers/tasks_to_be_done/create_worker.rb
index d3824ceb4ae..91046e3cfed 100644
--- a/app/workers/tasks_to_be_done/create_worker.rb
+++ b/app/workers/tasks_to_be_done/create_worker.rb
@@ -11,21 +11,8 @@ module TasksToBeDone
worker_resource_boundary :cpu
def perform(member_task_id, current_user_id, assignee_ids = [])
- member_task = MemberTask.find(member_task_id)
- current_user = User.find(current_user_id)
- project = member_task.project
-
- member_task.tasks_to_be_done.each do |task|
- service_class(task)
- .new(container: project, current_user: current_user, assignee_ids: assignee_ids)
- .execute
- end
- end
-
- private
-
- def service_class(task)
- "TasksToBeDone::Create#{task.to_s.camelize}TaskService".constantize
+ # no-op removing
+ # https://docs.gitlab.com/ee/development/sidekiq/compatibility_across_updates.html#removing-worker-classes
end
end
end