diff options
Diffstat (limited to 'app/workers')
70 files changed, 757 insertions, 490 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 6ef7447b9da..e5b860ba525 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -174,15 +174,6 @@ :weight: 1 :idempotent: true :tags: [] -- :name: container_repository:delete_container_repository - :worker_name: DeleteContainerRepositoryWorker - :feature_category: :container_registry - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: false - :tags: [] - :name: container_repository_delete:container_registry_delete_container_repository :worker_name: ContainerRegistry::DeleteContainerRepositoryWorker :feature_category: :container_registry @@ -300,6 +291,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:ci_schedule_unlock_pipelines_in_queue_cron + :worker_name: Ci::ScheduleUnlockPipelinesInQueueCronWorker + :feature_category: :build_artifacts + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:ci_stuck_builds_drop_running :worker_name: Ci::StuckBuilds::DropRunningWorker :feature_category: :continuous_integration @@ -327,6 +327,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:click_house_events_sync + :worker_name: ClickHouse::EventsSyncWorker + :feature_category: :value_stream_management + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:container_expiration_policy :worker_name: ContainerExpirationPolicyWorker :feature_category: :container_registry @@ -642,6 +651,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:pages_deactivated_deployments_delete_cron + :worker_name: Pages::DeactivatedDeploymentsDeleteCronWorker + :feature_category: :pages + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:pages_domain_removal_cron :worker_name: PagesDomainRemovalCronWorker :feature_category: :pages @@ -1920,6 +1938,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: pipeline_background:ci_refs_unlock_previous_pipelines + :worker_name: Ci::Refs::UnlockPreviousPipelinesWorker + :feature_category: :continuous_integration + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: pipeline_background:ci_test_failure_history :worker_name: Ci::TestFailureHistoryWorker :feature_category: :continuous_integration @@ -2352,6 +2379,33 @@ :weight: 1 :idempotent: false :tags: [] +- :name: bitbucket_import_import_issue + :worker_name: Gitlab::BitbucketImport::ImportIssueWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] +- :name: bitbucket_import_import_issue_notes + :worker_name: Gitlab::BitbucketImport::ImportIssueNotesWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] +- :name: bitbucket_import_import_lfs_object + :worker_name: Gitlab::BitbucketImport::ImportLfsObjectWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: bitbucket_import_import_pull_request :worker_name: Gitlab::BitbucketImport::ImportPullRequestWorker :feature_category: :importers @@ -2361,6 +2415,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: bitbucket_import_import_pull_request_notes + :worker_name: Gitlab::BitbucketImport::ImportPullRequestNotesWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: bitbucket_import_stage_finish_import :worker_name: Gitlab::BitbucketImport::Stage::FinishImportWorker :feature_category: :importers @@ -2370,6 +2433,33 @@ :weight: 1 :idempotent: false :tags: [] +- :name: bitbucket_import_stage_import_issues + :worker_name: Gitlab::BitbucketImport::Stage::ImportIssuesWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] +- :name: bitbucket_import_stage_import_issues_notes + :worker_name: Gitlab::BitbucketImport::Stage::ImportIssuesNotesWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] +- :name: bitbucket_import_stage_import_lfs_objects + :worker_name: Gitlab::BitbucketImport::Stage::ImportLfsObjectsWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: bitbucket_import_stage_import_pull_requests :worker_name: Gitlab::BitbucketImport::Stage::ImportPullRequestsWorker :feature_category: :importers @@ -2379,6 +2469,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: bitbucket_import_stage_import_pull_requests_notes + :worker_name: Gitlab::BitbucketImport::Stage::ImportPullRequestsNotesWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: bitbucket_import_stage_import_repository :worker_name: Gitlab::BitbucketImport::Stage::ImportRepositoryWorker :feature_category: :importers @@ -2631,10 +2730,10 @@ :weight: 1 :idempotent: true :tags: [] -- :name: click_house_events_sync - :worker_name: ClickHouse::EventsSyncWorker - :feature_category: :value_stream_management - :has_external_dependencies: true +- :name: ci_unlock_pipelines_in_queue + :worker_name: Ci::UnlockPipelinesInQueueWorker + :feature_category: :build_artifacts + :has_external_dependencies: false :urgency: :low :resource_boundary: :unknown :weight: 1 @@ -2811,6 +2910,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: environments_stop_job_failed + :worker_name: Environments::StopJobFailedWorker + :feature_category: :continuous_delivery + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: environments_stop_job_success :worker_name: Environments::StopJobSuccessWorker :feature_category: :continuous_delivery @@ -2883,15 +2991,6 @@ :weight: 1 :idempotent: true :tags: [] -- :name: gitlab_shell - :worker_name: GitlabShellWorker - :feature_category: :source_code_management - :has_external_dependencies: false - :urgency: :high - :resource_boundary: :unknown - :weight: 2 - :idempotent: false - :tags: [] - :name: google_cloud_create_cloudsql_instance :worker_name: GoogleCloud::CreateCloudsqlInstanceWorker :feature_category: :not_owned @@ -3045,6 +3144,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: issuable_related_links_create + :worker_name: Issuable::RelatedLinksCreateWorker + :feature_category: :portfolio_management + :has_external_dependencies: false + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: issuables_clear_groups_issue_counter :worker_name: Issuables::ClearGroupsIssueCounterWorker :feature_category: :team_planning @@ -3524,7 +3632,7 @@ :tags: [] - :name: projects_record_target_platforms :worker_name: Projects::RecordTargetPlatformsWorker - :feature_category: :experimentation_activation + :feature_category: :activation :has_external_dependencies: false :urgency: :low :resource_boundary: :unknown diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index 83b881ee525..5b9b46081cc 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -3,124 +3,14 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker - PERFORM_DELAY = 5.seconds - DEFAULT_BATCH_SIZE = 5 - data_consistency :always feature_category :importers sidekiq_options retry: false, dead: false def perform(bulk_import_id) - @bulk_import = BulkImport.find_by_id(bulk_import_id) - - return unless @bulk_import - return if @bulk_import.finished? || @bulk_import.failed? - return @bulk_import.fail_op! if all_entities_failed? - return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? - return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running - - @bulk_import.start! if @bulk_import.created? - - created_entities.first(next_batch_size).each do |entity| - create_tracker(entity) - - entity.start! - - BulkImports::ExportRequestWorker.perform_async(entity.id) - end - - re_enqueue - rescue StandardError => e - Gitlab::ErrorTracking.track_exception(e, bulk_import_id: @bulk_import&.id) - - @bulk_import&.fail_op - end - - private - - def entities - @entities ||= @bulk_import.entities - end - - def created_entities - entities.with_status(:created) - end - - def all_entities_processed? - entities.all? { |entity| entity.finished? || entity.failed? } - end - - def all_entities_failed? - entities.all?(&:failed?) - end - - # A new BulkImportWorker job is enqueued to either - # - Process the new BulkImports::Entity created during import (e.g. for the subgroups) - # - Or to mark the `bulk_import` as finished - def re_enqueue - BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id) - end - - def started_entities - entities.with_status(:started) - end - - def max_batch_size_exceeded? - started_entities.count >= DEFAULT_BATCH_SIZE - end - - def next_batch_size - [DEFAULT_BATCH_SIZE - started_entities.count, 0].max - end - - def create_tracker(entity) - entity.class.transaction do - entity.pipelines.each do |pipeline| - status = skip_pipeline?(pipeline, entity) ? :skipped : :created - - entity.trackers.create!( - stage: pipeline[:stage], - pipeline_name: pipeline[:pipeline], - status: BulkImports::Tracker.state_machine.states[status].value - ) - end - end - end - - def skip_pipeline?(pipeline, entity) - return false unless entity.source_version.valid? - - minimum_version, maximum_version = pipeline.values_at(:minimum_source_version, :maximum_source_version) - - if source_version_out_of_range?(minimum_version, maximum_version, entity.source_version.without_patch) - log_skipped_pipeline(pipeline, entity, minimum_version, maximum_version) - return true - end - - false - end - - def source_version_out_of_range?(minimum_version, maximum_version, non_patch_source_version) - (minimum_version && non_patch_source_version < Gitlab::VersionInfo.parse(minimum_version)) || - (maximum_version && non_patch_source_version > Gitlab::VersionInfo.parse(maximum_version)) - end - - def log_skipped_pipeline(pipeline, entity, minimum_version, maximum_version) - logger.info( - message: 'Pipeline skipped as source instance version not compatible with pipeline', - bulk_import_entity_id: entity.id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - pipeline_name: pipeline[:pipeline], - minimum_source_version: minimum_version, - maximum_source_version: maximum_version, - source_version: entity.source_version.to_s, - importer: 'gitlab_migration' - ) - end + bulk_import = BulkImport.find_by_id(bulk_import_id) + return unless bulk_import - def logger - @logger ||= Gitlab::Import::Logger.build + BulkImports::ProcessService.new(bulk_import).execute end end diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index fb99d63d06e..9b60dcdeb8a 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -1,97 +1,68 @@ # frozen_string_literal: true module BulkImports - class EntityWorker # rubocop:disable Scalability/IdempotentWorker + class EntityWorker include ApplicationWorker idempotent! - deduplicate :until_executing + deduplicate :until_executed data_consistency :always feature_category :importers sidekiq_options retry: false, dead: false worker_has_external_dependencies! - def perform(entity_id, current_stage = nil) + PERFORM_DELAY = 5.seconds + + # Keep `_current_stage` parameter for backwards compatibility. + # The parameter will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426311 + def perform(entity_id, _current_stage = nil) @entity = ::BulkImports::Entity.find(entity_id) - if stage_running?(entity_id, current_stage) - logger.info( - structured_payload( - bulk_import_entity_id: entity_id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - current_stage: current_stage, - message: 'Stage running', - source_version: source_version, - importer: 'gitlab_migration' - ) - ) + return unless @entity.started? - return + if running_tracker.present? + log_info(message: 'Stage running', entity_stage: running_tracker.stage) + else + start_next_stage end - logger.info( - structured_payload( - bulk_import_entity_id: entity_id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - current_stage: current_stage, - message: 'Stage starting', - source_version: source_version, - importer: 'gitlab_migration' - ) - ) - - next_pipeline_trackers_for(entity_id).each do |pipeline_tracker| - BulkImports::PipelineWorker.perform_async( - pipeline_tracker.id, - pipeline_tracker.stage, - entity_id - ) - end + re_enqueue rescue StandardError => e - log_exception(e, - { - bulk_import_entity_id: entity_id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - current_stage: current_stage, - message: 'Entity failed', - source_version: source_version, - importer: 'gitlab_migration' - } - ) - - Gitlab::ErrorTracking.track_exception( - e, - bulk_import_entity_id: entity_id, - bulk_import_id: entity.bulk_import_id, - bulk_import_entity_type: entity.source_type, - source_full_path: entity.source_full_path, - source_version: source_version, - importer: 'gitlab_migration' - ) + Gitlab::ErrorTracking.track_exception(e, log_params(message: 'Entity failed')) - entity.fail_op! + @entity.fail_op! end private attr_reader :entity - def stage_running?(entity_id, stage) - return unless stage + def re_enqueue + BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id) + end - BulkImports::Tracker.stage_running?(entity_id, stage) + def running_tracker + @running_tracker ||= BulkImports::Tracker.running_trackers(entity.id).first end def next_pipeline_trackers_for(entity_id) BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue') end + def start_next_stage + next_pipeline_trackers = next_pipeline_trackers_for(entity.id) + + next_pipeline_trackers.each_with_index do |pipeline_tracker, index| + log_info(message: 'Stage starting', entity_stage: pipeline_tracker.stage) if index == 0 + + BulkImports::PipelineWorker.perform_async( + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + end + end + def source_version entity.bulk_import.source_version_info.to_s end @@ -105,5 +76,22 @@ module BulkImports logger.error(structured_payload(payload)) end + + def log_info(payload) + logger.info(structured_payload(log_params(payload))) + end + + def log_params(extra) + defaults = { + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, + source_version: source_version, + importer: 'gitlab_migration' + } + + defaults.merge(extra) + end end end diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb index 4200d0e4a0f..b1f3757e058 100644 --- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb +++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb @@ -12,6 +12,8 @@ module BulkImports data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers + version 2 + def perform(pipeline_tracker_id) @tracker = Tracker.find(pipeline_tracker_id) @@ -27,7 +29,9 @@ module BulkImports end ensure - ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage) + # This is needed for in-flight migrations. + # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299 + ::BulkImports::EntityWorker.perform_async(tracker.entity.id) if job_version.nil? end private @@ -39,7 +43,7 @@ module BulkImports end def import_in_progress? - tracker.batches.any?(&:started?) + tracker.batches.any? { |b| b.started? || b.created? } end end end diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 634d7ed3c87..6230d517641 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -14,15 +14,16 @@ module BulkImports def perform(batch_id) @batch = ::BulkImports::BatchTracker.find(batch_id) @tracker = @batch.tracker + @pending_retry = false try_obtain_lease { run } ensure - ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) + ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry end private - attr_reader :batch, :tracker + attr_reader :batch, :tracker, :pending_retry def run return batch.skip! if tracker.failed? || tracker.finished? @@ -31,6 +32,7 @@ module BulkImports tracker.pipeline_class.new(context).run batch.finish! rescue BulkImports::RetryPipelineError => e + @pending_retry = true retry_batch(e) rescue StandardError => e fail_batch(e) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 098e167ac29..24185f43795 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -14,7 +14,10 @@ module BulkImports deduplicate :until_executing worker_resource_boundary :memory - def perform(pipeline_tracker_id, stage, entity_id) + version 2 + + # Keep _stage parameter for backwards compatibility. + def perform(pipeline_tracker_id, _stage, entity_id) @entity = ::BulkImports::Entity.find(entity_id) @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id) @@ -32,7 +35,9 @@ module BulkImports end end ensure - ::BulkImports::EntityWorker.perform_async(entity_id, stage) + # This is needed for in-flight migrations. + # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299 + ::BulkImports::EntityWorker.perform_async(entity_id) if job_version.nil? end private diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb index 067dbb7492f..703cae8bf88 100644 --- a/app/workers/ci/initial_pipeline_process_worker.rb +++ b/app/workers/ci/initial_pipeline_process_worker.rb @@ -28,6 +28,8 @@ module Ci private def create_deployments!(pipeline) + return if Feature.enabled?(:create_deployment_only_for_processable_jobs, pipeline.project) + pipeline.stages.flat_map(&:statuses).each { |build| create_deployment(build) } end diff --git a/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb index 98bb259db0a..8bcbe9d6c9f 100644 --- a/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb +++ b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + module Ci module MergeRequests class AddTodoWhenBuildFailsWorker diff --git a/app/workers/ci/ref_delete_unlock_artifacts_worker.rb b/app/workers/ci/ref_delete_unlock_artifacts_worker.rb index aeadf111bfb..e343c0aedd4 100644 --- a/app/workers/ci/ref_delete_unlock_artifacts_worker.rb +++ b/app/workers/ci/ref_delete_unlock_artifacts_worker.rb @@ -13,17 +13,21 @@ module Ci def perform(project_id, user_id, ref_path) ::Project.find_by_id(project_id).try do |project| - ::User.find_by_id(user_id).try do |user| + ::User.find_by_id(user_id).try do |_| project.ci_refs.find_by_ref_path(ref_path).try do |ci_ref| - results = ::Ci::UnlockArtifactsService - .new(project, user) - .execute(ci_ref) - - log_extra_metadata_on_done(:unlocked_pipelines, results[:unlocked_pipelines]) - log_extra_metadata_on_done(:unlocked_job_artifacts, results[:unlocked_job_artifacts]) + enqueue_pipelines_to_unlock(ci_ref) end end end end + + private + + def enqueue_pipelines_to_unlock(ci_ref) + result = ::Ci::Refs::EnqueuePipelinesToUnlockService.new.execute(ci_ref) + + log_extra_metadata_on_done(:total_pending_entries, result[:total_pending_entries]) + log_extra_metadata_on_done(:total_new_entries, result[:total_new_entries]) + end end end diff --git a/app/workers/ci/refs/unlock_previous_pipelines_worker.rb b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb new file mode 100644 index 00000000000..bf595590cb1 --- /dev/null +++ b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Ci + module Refs + class UnlockPreviousPipelinesWorker + include ApplicationWorker + + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + + sidekiq_options retry: 3 + include PipelineBackgroundQueue + + idempotent! + + def perform(ref_id) + ::Ci::Ref.find_by_id(ref_id).try do |ref| + pipeline = ref.last_finished_pipeline + result = ::Ci::Refs::EnqueuePipelinesToUnlockService.new.execute(ref, before_pipeline: pipeline) + + log_extra_metadata_on_done(:total_pending_entries, result[:total_pending_entries]) + log_extra_metadata_on_done(:total_new_entries, result[:total_new_entries]) + end + end + end + end +end diff --git a/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb b/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb new file mode 100644 index 00000000000..1a593326120 --- /dev/null +++ b/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Ci + class ScheduleUnlockPipelinesInQueueCronWorker + include ApplicationWorker + + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + + # rubocop:disable Scalability/CronWorkerContext + # This worker does not perform work scoped to a context + include CronjobQueue + # rubocop:enable Scalability/CronWorkerContext + + feature_category :build_artifacts + idempotent! + + def perform(...) + Ci::UnlockPipelinesInQueueWorker.perform_with_capacity(...) + end + end +end diff --git a/app/workers/ci/unlock_pipelines_in_queue_worker.rb b/app/workers/ci/unlock_pipelines_in_queue_worker.rb new file mode 100644 index 00000000000..de579504711 --- /dev/null +++ b/app/workers/ci/unlock_pipelines_in_queue_worker.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +module Ci + class UnlockPipelinesInQueueWorker + include ApplicationWorker + + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + + include LimitedCapacity::Worker + + feature_category :build_artifacts + idempotent! + + MAX_RUNNING_LOW = 50 + MAX_RUNNING_MEDIUM = 500 + MAX_RUNNING_HIGH = 1500 + + def perform_work(*_) + pipeline_id, enqueue_timestamp = Ci::UnlockPipelineRequest.next! + return log_extra_metadata_on_done(:remaining_pending, 0) unless pipeline_id + + Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| + log_extra_metadata_on_done(:pipeline_id, pipeline.id) + log_extra_metadata_on_done(:project, pipeline.project.full_path) + + result = Ci::UnlockPipelineService.new(pipeline).execute + + log_extra_metadata_on_done(:unlock_wait_time, Time.current.utc.to_i - enqueue_timestamp) + log_extra_metadata_on_done(:remaining_pending, Ci::UnlockPipelineRequest.total_pending) + log_extra_metadata_on_done(:skipped_already_leased, result[:skipped_already_leased]) + log_extra_metadata_on_done(:skipped_already_unlocked, result[:skipped_already_unlocked]) + log_extra_metadata_on_done(:exec_timeout, result[:exec_timeout]) + log_extra_metadata_on_done(:unlocked_job_artifacts, result[:unlocked_job_artifacts]) + log_extra_metadata_on_done(:unlocked_pipeline_artifacts, result[:unlocked_pipeline_artifacts]) + end + end + + def remaining_work_count(*_) + Ci::UnlockPipelineRequest.total_pending + end + + def max_running_jobs + if ::Feature.enabled?(:ci_unlock_pipelines_high, type: :ops) + MAX_RUNNING_HIGH + elsif ::Feature.enabled?(:ci_unlock_pipelines_medium, type: :ops) + MAX_RUNNING_MEDIUM + elsif ::Feature.enabled?(:ci_unlock_pipelines, type: :ops) + # This is the default enabled flag + MAX_RUNNING_LOW + else + 0 + end + end + end +end diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb index 5b7398cb071..e884a43b1e3 100644 --- a/app/workers/click_house/events_sync_worker.rb +++ b/app/workers/click_house/events_sync_worker.rb @@ -6,6 +6,7 @@ module ClickHouse include Gitlab::ExclusiveLeaseHelpers idempotent! + queue_namespace :cronjob data_consistency :delayed worker_has_external_dependencies! # the worker interacts with a ClickHouse database feature_category :value_stream_management diff --git a/app/workers/concerns/auto_devops_queue.rb b/app/workers/concerns/auto_devops_queue.rb index 61e3c1544bd..cdf429a8be5 100644 --- a/app/workers/concerns/auto_devops_queue.rb +++ b/app/workers/concerns/auto_devops_queue.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true -# + module AutoDevopsQueue extend ActiveSupport::Concern diff --git a/app/workers/concerns/chaos_queue.rb b/app/workers/concerns/chaos_queue.rb index 23e58b5182b..9a3d518dda8 100644 --- a/app/workers/concerns/chaos_queue.rb +++ b/app/workers/concerns/chaos_queue.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true -# + module ChaosQueue extend ActiveSupport::Concern diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb index e190ced5073..fcc7a96fa2b 100644 --- a/app/workers/concerns/gitlab/github_import/object_importer.rb +++ b/app/workers/concerns/gitlab/github_import/object_importer.rb @@ -10,7 +10,6 @@ module Gitlab included do include ApplicationWorker - sidekiq_options retry: 3 include GithubImport::Queue include ReschedulingMethods @@ -19,11 +18,8 @@ module Gitlab sidekiq_retries_exhausted do |msg| args = msg['args'] - correlation_id = msg['correlation_id'] jid = msg['jid'] - new.perform_failure(args[0], args[1], correlation_id) - # If a job is being exhausted we still want to notify the # Gitlab::Import::AdvanceStageWorker to prevent the entire import from getting stuck if args.length == 3 && (key = args.last) && key.is_a?(String) @@ -64,29 +60,15 @@ module Gitlab rescue NoMethodError => e # This exception will be more useful in development when a new # Representation is created but the developer forgot to add a - # `:github_identifiers` field. + # `#github_identifiers` method. track_and_raise_exception(project, e, fail_import: true) rescue ActiveRecord::RecordInvalid, NotRetriableError => e # We do not raise exception to prevent job retry - failure = track_exception(project, e) - add_identifiers_to_failure(failure, object.github_identifiers) + track_exception(project, e) rescue StandardError => e track_and_raise_exception(project, e) end - # hash - A Hash containing the details of the object to import. - def perform_failure(project_id, hash, correlation_id) - project = Project.find_by_id(project_id) - return unless project - - failure = project.import_failures.failures_by_correlation_id(correlation_id).first - return unless failure - - object = representation_class.from_json_hash(hash) - - add_identifiers_to_failure(failure, object.github_identifiers) - end - def increment_object_counter?(_object) true end @@ -118,16 +100,20 @@ module Gitlab extra.merge( project_id: project_id, importer: importer_class.name, - github_identifiers: github_identifiers + external_identifiers: github_identifiers ) end def track_exception(project, exception, fail_import: false) + external_identifiers = github_identifiers || {} + external_identifiers[:object_type] ||= object_type&.to_s + Gitlab::Import::ImportFailureService.track( project_id: project.id, error_source: importer_class.name, exception: exception, - fail_import: fail_import + fail_import: fail_import, + external_identifiers: external_identifiers ) end @@ -136,12 +122,6 @@ module Gitlab raise(exception) end - - def add_identifiers_to_failure(failure, external_identifiers) - external_identifiers[:object_type] = object_type - - failure.update_column(:external_identifiers, external_identifiers) - end end end end diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb index e7156ac12f8..7cc23dd7c0b 100644 --- a/app/workers/concerns/gitlab/github_import/queue.rb +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -15,14 +15,6 @@ module Gitlab # this is better than a project being stuck in the "import" state # forever. sidekiq_options dead: false, retry: 5 - - sidekiq_retries_exhausted do |msg, e| - Gitlab::Import::ImportFailureService.track( - project_id: msg['args'][0], - exception: e, - fail_import: true - ) - end end end end diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb index 75db5589415..80013ff3cd9 100644 --- a/app/workers/concerns/gitlab/github_import/stage_methods.rb +++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb @@ -3,6 +3,21 @@ module Gitlab module GithubImport module StageMethods + extend ActiveSupport::Concern + + included do + include ApplicationWorker + + sidekiq_retries_exhausted do |msg, e| + Gitlab::Import::ImportFailureService.track( + project_id: msg['args'][0], + exception: e, + error_source: self.class.name, + fail_import: true + ) + end + end + # project_id - The ID of the GitLab project to import the data into. def perform(project_id) info(project_id, message: 'starting stage') @@ -29,7 +44,8 @@ module Gitlab project_id: project_id, exception: e, error_source: self.class.name, - fail_import: abort_on_failure + fail_import: false, + metrics: true ) raise(e) @@ -51,10 +67,6 @@ module Gitlab # rubocop: enable CodeReuse/ActiveRecord end - def abort_on_failure - false - end - private def info(project_id, extra = {}) diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb index 4b5ce8a01f6..b4d884f914d 100644 --- a/app/workers/concerns/limited_capacity/job_tracker.rb +++ b/app/workers/concerns/limited_capacity/job_tracker.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + module LimitedCapacity class JobTracker # rubocop:disable Scalability/IdempotentWorker include Gitlab::Utils::StrongMemoize diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb index af66d80b3e9..0a79c5c46d5 100644 --- a/app/workers/concerns/limited_capacity/worker.rb +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -1,41 +1,5 @@ # frozen_string_literal: true -# Usage: -# -# Worker that performs the tasks: -# -# class DummyWorker -# include ApplicationWorker -# include LimitedCapacity::Worker -# -# # For each job that raises any error, a worker instance will be disabled -# # until the next schedule-run. -# # If you wish to get around this, exceptions must by handled by the implementer. -# # -# def perform_work(*args) -# end -# -# def remaining_work_count(*args) -# 5 -# end -# -# def max_running_jobs -# 25 -# end -# end -# -# Cron worker to fill the pool of regular workers: -# -# class ScheduleDummyCronWorker -# include ApplicationWorker -# include CronjobQueue -# -# def perform(*args) -# DummyWorker.perform_with_capacity(*args) -# end -# end -# - module LimitedCapacity module Worker extend ActiveSupport::Concern diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 02eda924b71..cb09aaf1a6a 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -155,6 +155,10 @@ module WorkerAttributes ::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap.set_strategy_for(strategy: value, worker: self) end + def get_pause_control + ::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap.strategy_for(worker: self) + end + def get_weight get_class_attribute(:weight) || NAMESPACE_WEIGHTS[queue_namespace] || diff --git a/app/workers/database/batched_background_migration/ci_database_worker.rb b/app/workers/database/batched_background_migration/ci_database_worker.rb index 58b0f5496f4..417af4c7172 100644 --- a/app/workers/database/batched_background_migration/ci_database_worker.rb +++ b/app/workers/database/batched_background_migration/ci_database_worker.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + module Database module BatchedBackgroundMigration class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker diff --git a/app/workers/delete_container_repository_worker.rb b/app/workers/delete_container_repository_worker.rb deleted file mode 100644 index d0552dce9fc..00000000000 --- a/app/workers/delete_container_repository_worker.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -class DeleteContainerRepositoryWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - include ExclusiveLeaseGuard - - data_consistency :always - - sidekiq_options retry: 3 - - queue_namespace :container_repository - feature_category :container_registry - - def perform(current_user_id, container_repository_id); end -end diff --git a/app/workers/environments/stop_job_failed_worker.rb b/app/workers/environments/stop_job_failed_worker.rb new file mode 100644 index 00000000000..c04601e0428 --- /dev/null +++ b/app/workers/environments/stop_job_failed_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Environments + class StopJobFailedWorker + include ApplicationWorker + + data_consistency :delayed + idempotent! + feature_category :continuous_delivery + + def perform(job_id, _params = {}) + Ci::Processable.find_by_id(job_id).try do |job| + revert_environment(job) if job.stops_environment? && job.failed? + end + end + + private + + def revert_environment(job) + return if job.persisted_environment.nil? + + job.persisted_environment.fire_state_event(:recover_stuck_stopping) + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb b/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb index 7f281352a1b..ed89f332652 100644 --- a/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb +++ b/app/workers/gitlab/bitbucket_import/advance_stage_worker.rb @@ -20,13 +20,23 @@ module Gitlab # The known importer stages and their corresponding Sidekiq workers. STAGES = { + repository: Stage::ImportRepositoryWorker, + pull_requests: Stage::ImportPullRequestsWorker, + pull_requests_notes: Stage::ImportPullRequestsNotesWorker, + issues: Stage::ImportIssuesWorker, + issues_notes: Stage::ImportIssuesNotesWorker, + lfs_objects: Stage::ImportLfsObjectsWorker, finish: Stage::FinishImportWorker }.freeze - def find_import_state(project_id) + def find_import_state_jid(project_id) ProjectImportState.jid_by(project_id: project_id, status: :started) end + def find_import_state(id) + ProjectImportState.find(id) + end + private def next_stage_worker(next_stage) diff --git a/app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb b/app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb new file mode 100644 index 00000000000..de8239f30d9 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/import_issue_notes_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + class ImportIssueNotesWorker # rubocop:disable Scalability/IdempotentWorker + include ObjectImporter + + def importer_class + Importers::IssueNotesImporter + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/import_issue_worker.rb b/app/workers/gitlab/bitbucket_import/import_issue_worker.rb new file mode 100644 index 00000000000..7df3f6d4a62 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/import_issue_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + class ImportIssueWorker # rubocop:disable Scalability/IdempotentWorker + include ObjectImporter + + def importer_class + Importers::IssueImporter + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb b/app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb new file mode 100644 index 00000000000..39b66684026 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/import_lfs_object_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + class ImportLfsObjectWorker # rubocop:disable Scalability/IdempotentWorker + include ObjectImporter + + def importer_class + Importers::LfsObjectImporter + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb b/app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb new file mode 100644 index 00000000000..8c9f84c97a5 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/import_pull_request_notes_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + class ImportPullRequestNotesWorker # rubocop:disable Scalability/IdempotentWorker + include ObjectImporter + + def importer_class + Importers::PullRequestNotesImporter + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb new file mode 100644 index 00000000000..cbd67099086 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/stage/import_issues_notes_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + module Stage + class ImportIssuesNotesWorker # rubocop:disable Scalability/IdempotentWorker + include StageMethods + + private + + # project - An instance of Project. + def import(project) + waiter = importer_class.new(project).execute + + project.import_state.refresh_jid_expiration + + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + :lfs_objects + ) + end + + def importer_class + Importers::IssuesNotesImporter + end + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb new file mode 100644 index 00000000000..31a11d802c7 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/stage/import_issues_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + module Stage + class ImportIssuesWorker # rubocop:disable Scalability/IdempotentWorker + include StageMethods + + private + + # project - An instance of Project. + def import(project) + waiter = importer_class.new(project).execute + + project.import_state.refresh_jid_expiration + + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + :issues_notes + ) + end + + def importer_class + Importers::IssuesImporter + end + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb new file mode 100644 index 00000000000..c88a1be3446 --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/stage/import_lfs_objects_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + module Stage + class ImportLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker + include StageMethods + + private + + # project - An instance of Project. + def import(project) + waiter = importer_class.new(project).execute + + project.import_state.refresh_jid_expiration + + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + :finish + ) + end + + def importer_class + Importers::LfsObjectsImporter + end + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb new file mode 100644 index 00000000000..36d60c7246c --- /dev/null +++ b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_notes_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Gitlab + module BitbucketImport + module Stage + class ImportPullRequestsNotesWorker # rubocop:disable Scalability/IdempotentWorker + include StageMethods + + private + + # project - An instance of Project. + def import(project) + waiter = importer_class.new(project).execute + + project.import_state.refresh_jid_expiration + + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + :issues + ) + end + + def importer_class + Importers::PullRequestsNotesImporter + end + end + end + end +end diff --git a/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb index e1f3b5ab79a..3f85c832d50 100644 --- a/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb +++ b/app/workers/gitlab/bitbucket_import/stage/import_pull_requests_worker.rb @@ -17,7 +17,7 @@ module Gitlab AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :finish + :pull_requests_notes ) end diff --git a/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb b/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb index 2c8db639725..1fc35725c9f 100644 --- a/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb +++ b/app/workers/gitlab/bitbucket_server_import/advance_stage_worker.rb @@ -25,10 +25,14 @@ module Gitlab finish: Stage::FinishImportWorker }.freeze - def find_import_state(project_id) + def find_import_state_jid(project_id) ProjectImportState.jid_by(project_id: project_id, status: :started) end + def find_import_state(id) + ProjectImportState.find(id) + end + private def next_stage_worker(next_stage) diff --git a/app/workers/gitlab/github_gists_import/import_gist_worker.rb b/app/workers/gitlab/github_gists_import/import_gist_worker.rb index 60e4c8fdad6..151788150dd 100644 --- a/app/workers/gitlab/github_gists_import/import_gist_worker.rb +++ b/app/workers/gitlab/github_gists_import/import_gist_worker.rb @@ -106,9 +106,9 @@ module Gitlab def error(user_id, error_message, github_identifiers) attributes = { user_id: user_id, - github_identifiers: github_identifiers, + external_identifiers: github_identifiers, message: 'importer failed', - 'error.message': error_message + 'exception.message': error_message } Gitlab::GithubImport::Logger.error(structured_payload(attributes)) @@ -120,7 +120,7 @@ module Gitlab attributes = { user_id: user_id, message: message, - github_identifiers: gist_id + external_identifiers: gist_id } Gitlab::GithubImport::Logger.info(structured_payload(attributes)) diff --git a/app/workers/gitlab/github_gists_import/start_import_worker.rb b/app/workers/gitlab/github_gists_import/start_import_worker.rb index 33c91611719..f7d3eb1d759 100644 --- a/app/workers/gitlab/github_gists_import/start_import_worker.rb +++ b/app/workers/gitlab/github_gists_import/start_import_worker.rb @@ -51,7 +51,7 @@ module Gitlab end def log_error_and_raise!(user_id, error) - logger.error(structured_payload(user_id: user_id, message: 'import failed', 'error.message': error.message)) + logger.error(structured_payload(user_id: user_id, message: 'import failed', 'exception.message': error.message)) raise(error) end diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 45f4bf486d7..a012241e90c 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -33,10 +33,14 @@ module Gitlab finish: Stage::FinishImportWorker }.freeze - def find_import_state(project_id) + def find_import_state_jid(project_id) ProjectImportState.jid_by(project_id: project_id, status: :started) end + def find_import_state(id) + ProjectImportState.find(id) + end + private def next_stage_worker(next_stage) diff --git a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb index 2b9fb26d53a..3de4bef053f 100644 --- a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb +++ b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb @@ -7,7 +7,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue # The interval to schedule new instances of this job at. diff --git a/app/workers/gitlab/github_import/stage/finish_import_worker.rb b/app/workers/gitlab/github_import/stage/finish_import_worker.rb index e716eda5c99..90445a6d46c 100644 --- a/app/workers/gitlab/github_import/stage/finish_import_worker.rb +++ b/app/workers/gitlab/github_import/stage/finish_import_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb index 4045852e3f0..f9952f04e99 100644 --- a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 5 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb index cc6a2255160..94cb3cb6c71 100644 --- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods @@ -31,22 +30,6 @@ module Gitlab project.import_state.refresh_jid_expiration ImportPullRequestsWorker.perform_async(project.id) - rescue StandardError => e - Gitlab::Import::ImportFailureService.track( - project_id: project.id, - error_source: self.class.name, - exception: e, - fail_import: abort_on_failure, - metrics: true - ) - - raise(e) - end - - private - - def abort_on_failure - true end end end diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb index 8f72cc051b3..751ca92388a 100644 --- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods @@ -24,16 +23,6 @@ module Gitlab project.import_state.refresh_jid_expiration move_to_next_stage(project, { waiter.key => waiter.jobs_remaining }) - rescue StandardError => e - Gitlab::Import::ImportFailureService.track( - project_id: project.id, - error_source: self.class.name, - exception: e, - fail_import: abort_on_failure, - metrics: true - ) - - raise(e) end private @@ -58,10 +47,6 @@ module Gitlab project.id, waiters, :pull_requests_merged_by ) end - - def abort_on_failure - true - end end end end diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb index 54ed4c47e78..c80412d941b 100644 --- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb index 3d1a8437da2..592b789cc94 100644 --- a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb index f6f5687130f..e89a850c991 100644 --- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb index 40ca12b130f..c1fdb76d03e 100644 --- a/app/workers/gitlab/github_import/stage/import_notes_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb index 73f4ea580c4..f8448094c28 100644 --- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods @@ -27,15 +26,6 @@ module Gitlab { waiter.key => waiter.jobs_remaining }, :lfs_objects ) - rescue StandardError => e - Gitlab::Import::ImportFailureService.track( - project_id: project.id, - error_source: self.class.name, - exception: e, - metrics: true - ) - - raise(e) end end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb index 329bf8f84b1..2e7cd28578f 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb index bcbf5dd471a..2f860349e25 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb index 33dee47bd03..51730033133 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb index b2dfded0280..029d38d8b93 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods @@ -33,16 +32,6 @@ module Gitlab { waiter.key => waiter.jobs_remaining }, :collaborators ) - rescue StandardError => e - Gitlab::Import::ImportFailureService.track( - project_id: project.id, - error_source: self.class.name, - exception: e, - fail_import: abort_on_failure, - metrics: true - ) - - raise(e) end private @@ -57,10 +46,6 @@ module Gitlab MergeRequest.track_target_project_iid!(project, last_github_pull_request[:number]) end - - def abort_on_failure - true - end end end end diff --git a/app/workers/gitlab/github_import/stage/import_repository_worker.rb b/app/workers/gitlab/github_import/stage/import_repository_worker.rb index d998771b328..2a62930b5ea 100644 --- a/app/workers/gitlab/github_import/stage/import_repository_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_repository_worker.rb @@ -8,7 +8,6 @@ module Gitlab data_consistency :always - sidekiq_options retry: 3 include GithubImport::Queue include StageMethods @@ -34,17 +33,6 @@ module Gitlab counter.increment ImportBaseDataWorker.perform_async(project.id) - - rescue StandardError => e - Gitlab::Import::ImportFailureService.track( - project_id: project.id, - error_source: self.class.name, - exception: e, - fail_import: abort_on_failure, - metrics: true - ) - - raise(e) end def counter @@ -54,10 +42,6 @@ module Gitlab ) end - def abort_on_failure - true - end - private def allocate_issues_internal_id!(project, client) diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb index 5d5abc88388..180c08905ff 100644 --- a/app/workers/gitlab/import/advance_stage.rb +++ b/app/workers/gitlab/import/advance_stage.rb @@ -4,6 +4,9 @@ module Gitlab module Import module AdvanceStage INTERVAL = 30.seconds.to_i + TIMEOUT_DURATION = 2.hours + + AdvanceStageTimeoutError = Class.new(StandardError) # The number of seconds to wait (while blocking the thread) before # continuing to the next waiter. @@ -14,30 +17,35 @@ module Gitlab # remaining jobs. # next_stage - The name of the next stage to start when all jobs have been # completed. - def perform(project_id, waiters, next_stage) - import_state = find_import_state(project_id) + # timeout_timer - Time the sidekiq worker was first initiated with the current job_count + # previous_job_count - Number of jobs remaining on last invocation of this worker + def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil) + import_state_jid = find_import_state_jid(project_id) # If the import state is nil the project may have been deleted or the import # may have failed or been canceled. In this case we tidy up the cache data and no # longer attempt to advance to the next stage. - if import_state.nil? + if import_state_jid.nil? clear_waiter_caches(waiters) return end new_waiters = wait_for_jobs(waiters) + new_job_count = new_waiters.values.sum + + # Reset the timeout timer as some jobs finished processing + if new_job_count != previous_job_count + timeout_timer = Time.zone.now + previous_job_count = new_job_count + end if new_waiters.empty? - # We refresh the import JID here so workers importing individual - # resources (e.g. notes) don't have to do this all the time, reducing - # the pressure on Redis. We _only_ do this once all jobs are done so - # we don't get stuck forever if one or more jobs failed to notify the - # JobWaiter. - import_state.refresh_jid_expiration - - next_stage_worker(next_stage).perform_async(project_id) + proceed_to_next_stage(import_state_jid, next_stage, project_id) + elsif timeout_reached?(timeout_timer) && new_job_count == previous_job_count + + handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count) else - self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage) + self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage, timeout_timer, previous_job_count) end end @@ -55,12 +63,66 @@ module Gitlab end end - def find_import_state(project_id) + def find_import_state_jid(project_id) + raise NotImplementedError + end + + def find_import_state(id) raise NotImplementedError end private + def proceed_to_next_stage(import_state_jid, next_stage, project_id) + # We refresh the import JID here so workers importing individual + # resources (e.g. notes) don't have to do this all the time, reducing + # the pressure on Redis. We _only_ do this once all jobs are done so + # we don't get stuck forever if one or more jobs failed to notify the + # JobWaiter. + import_state_jid.refresh_jid_expiration + + next_stage_worker(next_stage).perform_async(project_id) + end + + def handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count) + project = Project.find_by_id(project_id) + strategy = project.import_data&.data&.dig("timeout_strategy") || ProjectImportData::PESSIMISTIC_TIMEOUT + + Gitlab::Import::Logger.info( + message: 'Timeout reached, no longer retrying', + project_id: project_id, + jobs_remaining: new_job_count, + waiters: new_waiters, + timeout_strategy: strategy + ) + + clear_waiter_caches(new_waiters) + + case strategy + when ProjectImportData::OPTIMISTIC_TIMEOUT + proceed_to_next_stage(import_state_jid, next_stage, project_id) + when ProjectImportData::PESSIMISTIC_TIMEOUT + import_state = find_import_state(import_state_jid.id) + fail_import_and_log_status(import_state) + end + end + + def fail_import_and_log_status(import_state) + raise AdvanceStageTimeoutError, "Failing advance stage, timeout reached with pessimistic strategy" + rescue AdvanceStageTimeoutError => e + Gitlab::Import::ImportFailureService.track( + import_state: import_state, + exception: e, + error_source: self.class.name, + fail_import: true + ) + end + + def timeout_reached?(timeout_timer) + timeout_timer = Time.zone.parse(timeout_timer) if timeout_timer.is_a?(String) + Time.zone.now > timeout_timer + TIMEOUT_DURATION + end + def next_stage_worker(next_stage) raise NotImplementedError end diff --git a/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb b/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb index 01979b2029f..93d670e1b8b 100644 --- a/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb +++ b/app/workers/gitlab/import/stuck_project_import_jobs_worker.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + module Gitlab module Import class StuckProjectImportJobsWorker # rubocop:disable Scalability/IdempotentWorker diff --git a/app/workers/gitlab/jira_import/advance_stage_worker.rb b/app/workers/gitlab/jira_import/advance_stage_worker.rb index 5fae7caf791..9641b55a584 100644 --- a/app/workers/gitlab/jira_import/advance_stage_worker.rb +++ b/app/workers/gitlab/jira_import/advance_stage_worker.rb @@ -20,10 +20,14 @@ module Gitlab finish: Gitlab::JiraImport::Stage::FinishImportWorker }.freeze - def find_import_state(project_id) + def find_import_state_jid(project_id) JiraImportState.jid_by(project_id: project_id, status: :started) end + def find_import_state(id) + JiraImportState.find(id) + end + private def next_stage_worker(next_stage) diff --git a/app/workers/gitlab_shell_worker.rb b/app/workers/gitlab_shell_worker.rb deleted file mode 100644 index b3c0fa79658..00000000000 --- a/app/workers/gitlab_shell_worker.rb +++ /dev/null @@ -1,25 +0,0 @@ -# frozen_string_literal: true - -class GitlabShellWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - include Gitlab::ShellAdapter - - feature_category :source_code_management - urgency :high - weight 2 - loggable_arguments 0 - - def perform(action, *arg) - if Gitlab::Shell::PERMITTED_ACTIONS.exclude?(action) - raise(ArgumentError, "#{action} not allowed for #{self.class.name}") - end - - Gitlab::GitalyClient::NamespaceService.allow do - gitlab_shell.public_send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend - end - end -end diff --git a/app/workers/hashed_storage/migrator_worker.rb b/app/workers/hashed_storage/migrator_worker.rb index 5f90b8f1009..a7e7a505681 100644 --- a/app/workers/hashed_storage/migrator_worker.rb +++ b/app/workers/hashed_storage/migrator_worker.rb @@ -13,9 +13,6 @@ module HashedStorage # @param [Integer] start initial ID of the batch # @param [Integer] finish last ID of the batch - def perform(start, finish) - migrator = Gitlab::HashedStorage::Migrator.new - migrator.bulk_migrate(start: start, finish: finish) - end + def perform(start, finish); end end end diff --git a/app/workers/hashed_storage/project_migrate_worker.rb b/app/workers/hashed_storage/project_migrate_worker.rb index 01e2d6307de..e1bf71de179 100644 --- a/app/workers/hashed_storage/project_migrate_worker.rb +++ b/app/workers/hashed_storage/project_migrate_worker.rb @@ -13,17 +13,6 @@ module HashedStorage attr_reader :project_id - def perform(project_id, old_disk_path = nil) - @project_id = project_id # we need to set this in order to create the lease_key - - try_obtain_lease do - project = Project.without_deleted.find_by_id(project_id) - break unless project && project.storage_upgradable? - - old_disk_path ||= Storage::LegacyProject.new(project).disk_path - - ::Projects::HashedStorage::MigrationService.new(project, old_disk_path, logger: logger).execute - end - end + def perform(project_id, old_disk_path = nil); end end end diff --git a/app/workers/hashed_storage/project_rollback_worker.rb b/app/workers/hashed_storage/project_rollback_worker.rb index 2ec323248ab..af4223ff354 100644 --- a/app/workers/hashed_storage/project_rollback_worker.rb +++ b/app/workers/hashed_storage/project_rollback_worker.rb @@ -13,17 +13,6 @@ module HashedStorage attr_reader :project_id - def perform(project_id, old_disk_path = nil) - @project_id = project_id # we need to set this in order to create the lease_key - - try_obtain_lease do - project = Project.without_deleted.find_by_id(project_id) - break unless project - - old_disk_path ||= project.disk_path - - ::Projects::HashedStorage::RollbackService.new(project, old_disk_path, logger: logger).execute - end - end + def perform(project_id, old_disk_path = nil); end end end diff --git a/app/workers/hashed_storage/rollbacker_worker.rb b/app/workers/hashed_storage/rollbacker_worker.rb index c6c4990d799..e659e65a370 100644 --- a/app/workers/hashed_storage/rollbacker_worker.rb +++ b/app/workers/hashed_storage/rollbacker_worker.rb @@ -13,9 +13,6 @@ module HashedStorage # @param [Integer] start initial ID of the batch # @param [Integer] finish last ID of the batch - def perform(start, finish) - migrator = Gitlab::HashedStorage::Migrator.new - migrator.bulk_rollback(start: start, finish: finish) - end + def perform(start, finish); end end end diff --git a/app/workers/integrations/irker_worker.rb b/app/workers/integrations/irker_worker.rb index 3152d68b372..4c1f0df0fc7 100644 --- a/app/workers/integrations/irker_worker.rb +++ b/app/workers/integrations/irker_worker.rb @@ -58,7 +58,7 @@ module Integrations allow_local_network: allow_local_requests?, schemes: ['irc']) @socket = TCPSocket.new ip_address, irker_port - rescue Errno::ECONNREFUSED, Gitlab::UrlBlocker::BlockedUrlError => e + rescue Errno::ECONNREFUSED, Gitlab::HTTP_V2::UrlBlocker::BlockedUrlError => e logger.fatal "Can't connect to Irker daemon: #{e}" return false end diff --git a/app/workers/issuable/related_links_create_worker.rb b/app/workers/issuable/related_links_create_worker.rb new file mode 100644 index 00000000000..7cbf70fd5ab --- /dev/null +++ b/app/workers/issuable/related_links_create_worker.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +module Issuable + class RelatedLinksCreateWorker + include ApplicationWorker + + data_consistency :delayed + + sidekiq_options retry: 3 + + feature_category :portfolio_management + worker_resource_boundary :unknown + urgency :high + idempotent! + + def perform(args) + @params = args.with_indifferent_access + @user = User.find_by_id(params[:user_id]) + @issuable = issuable_class.find_by_id(params[:issuable_id]) + @links = issuable_class.related_link_class&.where(id: params[:link_ids]) + return unless user && issuable && links.present? + + create_issuable_notes! + rescue ArgumentError => error + logger.error( + worker: self.class.name, + message: "Failed to complete job (user_id:#{params[:user_id]}, issuable_id:#{params[:issuable_id]}, " \ + "issuable_class:#{params[:issuable_class]}): #{error.message}" + ) + end + + private + + attr_reader :params, :user, :issuable, :links + + def issuable_class + params[:issuable_class].constantize + rescue NameError + raise ArgumentError, "Unknown class '#{params[:issuable_class]}'" + end + + def create_issuable_notes! + errors = create_notes.compact + return unless errors.any? + + raise ArgumentError, "Could not create notes: #{errors.join(', ')}" + end + + def create_notes + linked_item_notes_errors = links.filter_map { |link| create_system_note(link.target, issuable) } + issuable_note_error = create_system_note(issuable, links.collect(&:target)) + + linked_item_notes_errors << issuable_note_error + end + + def create_system_note(noteable, references, method_name = :relate_issuable) + note = ::SystemNoteService.try(method_name, noteable, references, user) + return if note.present? + + "{noteable_id: #{noteable.id}, reference_ids: #{[references].flatten.collect(&:id)}}" + end + end +end + +Issuable::RelatedLinksCreateWorker.prepend_mod_with('Issuable::RelatedLinksCreateWorker') diff --git a/app/workers/jira_connect/sync_project_worker.rb b/app/workers/jira_connect/sync_project_worker.rb index 40f225ab756..09aa5edc73b 100644 --- a/app/workers/jira_connect/sync_project_worker.rb +++ b/app/workers/jira_connect/sync_project_worker.rb @@ -33,7 +33,10 @@ module JiraConnect # rubocop: disable CodeReuse/ActiveRecord def merge_requests_to_sync(project) - project.merge_requests.with_jira_issue_keys.preload(:author).limit(MAX_RECORDS_LIMIT).order(id: :desc) + project.merge_requests.with_jira_issue_keys + .preload(:author, :approvals, merge_request_reviewers: :reviewer) + .limit(MAX_RECORDS_LIMIT) + .order(id: :desc) end # rubocop: enable CodeReuse/ActiveRecord diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index a0594b15e31..29f0c0bbbf4 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -16,8 +16,6 @@ class MergeWorker # rubocop:disable Scalability/IdempotentWorker deduplicate :until_executed, including_scheduled: true def perform(merge_request_id, current_user_id, params) - params = params.with_indifferent_access - begin current_user = User.find(current_user_id) merge_request = MergeRequest.find(merge_request_id) @@ -25,6 +23,9 @@ class MergeWorker # rubocop:disable Scalability/IdempotentWorker return end + params = params.with_indifferent_access + params[:check_mergeability_retry_lease] = true unless params.has_key?(:check_mergeability_retry_lease) + MergeRequests::MergeService.new(project: merge_request.target_project, current_user: current_user, params: params) .execute(merge_request) end diff --git a/app/workers/pages/deactivated_deployments_delete_cron_worker.rb b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb new file mode 100644 index 00000000000..7ee6327cea7 --- /dev/null +++ b/app/workers/pages/deactivated_deployments_delete_cron_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Pages + class DeactivatedDeploymentsDeleteCronWorker + include ApplicationWorker + include CronjobQueue # rubocop: disable Scalability/CronWorkerContext + + idempotent! + data_consistency :always # rubocop: disable SidekiqLoadBalancing/WorkerDataConsistency + + feature_category :pages + + def perform + PagesDeployment.deactivated.each_batch do |deployments| # rubocop: disable Style/SymbolProc + deployments.delete_all + end + end + end +end diff --git a/app/workers/projects/after_import_worker.rb b/app/workers/projects/after_import_worker.rb index 06211b2d991..47bd07d0850 100644 --- a/app/workers/projects/after_import_worker.rb +++ b/app/workers/projects/after_import_worker.rb @@ -31,7 +31,7 @@ module Projects message: 'Project housekeeping failed', project_full_path: @project.full_path, project_id: @project.id, - 'error.message' => e.message + 'exception.message' => e.message ) end diff --git a/app/workers/projects/record_target_platforms_worker.rb b/app/workers/projects/record_target_platforms_worker.rb index bbe0c63cfd1..d458c9563d0 100644 --- a/app/workers/projects/record_target_platforms_worker.rb +++ b/app/workers/projects/record_target_platforms_worker.rb @@ -8,7 +8,7 @@ module Projects LEASE_TIMEOUT = 1.hour.to_i APPLE_PLATFORM_LANGUAGES = %w[swift objective-c].freeze - feature_category :experimentation_activation + feature_category :activation data_consistency :always deduplicate :until_executed urgency :low diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb index dab92e16ee3..61ef7494d38 100644 --- a/app/workers/run_pipeline_schedule_worker.rb +++ b/app/workers/run_pipeline_schedule_worker.rb @@ -22,7 +22,7 @@ class RunPipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker options.symbolize_keys! if options[:scheduling] - return if schedule.next_run_at > Time.current + return if schedule.next_run_at.future? update_next_run_at_for(schedule) end diff --git a/app/workers/tasks_to_be_done/create_worker.rb b/app/workers/tasks_to_be_done/create_worker.rb index d3824ceb4ae..91046e3cfed 100644 --- a/app/workers/tasks_to_be_done/create_worker.rb +++ b/app/workers/tasks_to_be_done/create_worker.rb @@ -11,21 +11,8 @@ module TasksToBeDone worker_resource_boundary :cpu def perform(member_task_id, current_user_id, assignee_ids = []) - member_task = MemberTask.find(member_task_id) - current_user = User.find(current_user_id) - project = member_task.project - - member_task.tasks_to_be_done.each do |task| - service_class(task) - .new(container: project, current_user: current_user, assignee_ids: assignee_ids) - .execute - end - end - - private - - def service_class(task) - "TasksToBeDone::Create#{task.to_s.camelize}TaskService".constantize + # no-op removing + # https://docs.gitlab.com/ee/development/sidekiq/compatibility_across_updates.html#removing-worker-classes end end end |