diff options
Diffstat (limited to 'app/workers')
27 files changed, 444 insertions, 46 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 6f6fd9ddb65..1664add1ac9 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -219,6 +219,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:batched_git_ref_updates_cleanup_scheduler + :worker_name: BatchedGitRefUpdates::CleanupSchedulerWorker + :feature_category: :gitaly + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:bulk_imports_stuck_import :worker_name: BulkImports::StuckImportWorker :feature_category: :importers @@ -552,6 +561,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:members_expiring + :worker_name: Members::ExpiringWorker + :feature_category: :system_access + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: cronjob:metrics_global_metrics_update :worker_name: Metrics::GlobalMetricsUpdateWorker :feature_category: :metrics @@ -660,6 +678,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: cronjob:pause_control_resume + :worker_name: PauseControl::ResumeWorker + :feature_category: :global_search + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:personal_access_tokens_expired_notification :worker_name: PersonalAccessTokens::ExpiredNotificationWorker :feature_category: :system_access @@ -795,6 +822,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:service_desk_custom_email_verification_cleanup + :worker_name: ServiceDesk::CustomEmailVerificationCleanupWorker + :feature_category: :service_desk + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:ssh_keys_expired_notification :worker_name: SshKeys::ExpiredNotificationWorker :feature_category: :compliance_management @@ -2298,6 +2334,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: batched_git_ref_updates_project_cleanup + :worker_name: BatchedGitRefUpdates::ProjectCleanupWorker + :feature_category: :gitaly + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: bitbucket_server_import_advance_stage :worker_name: Gitlab::BitbucketServerImport::AdvanceStageWorker :feature_category: :importers @@ -2438,7 +2483,7 @@ :feature_category: :importers :has_external_dependencies: true :urgency: :low - :resource_boundary: :unknown + :resource_boundary: :memory :weight: 1 :idempotent: false :tags: [] @@ -2523,6 +2568,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: click_house_events_sync + :worker_name: ClickHouse::EventsSyncWorker + :feature_category: :value_stream_management + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: counters_cleanup_refresh :worker_name: Counters::CleanupRefreshWorker :feature_category: :not_owned @@ -2685,6 +2739,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: environments_stop_job_success + :worker_name: Environments::StopJobSuccessWorker + :feature_category: :continuous_delivery + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: error_tracking_issue_link :worker_name: ErrorTrackingIssueLinkWorker :feature_category: :error_tracking @@ -2955,6 +3018,15 @@ :weight: 2 :idempotent: :tags: [] +- :name: members_expiring_email_notification + :worker_name: Members::ExpiringEmailNotificationWorker + :feature_category: :system_access + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: merge :worker_name: MergeWorker :feature_category: :source_code_management diff --git a/app/workers/batched_git_ref_updates/cleanup_scheduler_worker.rb b/app/workers/batched_git_ref_updates/cleanup_scheduler_worker.rb new file mode 100644 index 00000000000..9c50e319be0 --- /dev/null +++ b/app/workers/batched_git_ref_updates/cleanup_scheduler_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module BatchedGitRefUpdates + class CleanupSchedulerWorker + include ApplicationWorker + # Ignore RuboCop as the context is added in the service + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + idempotent! + data_consistency :sticky + + feature_category :gitaly + + def perform + stats = CleanupSchedulerService.new.execute + + log_extra_metadata_on_done(:stats, stats) + end + end +end diff --git a/app/workers/batched_git_ref_updates/project_cleanup_worker.rb b/app/workers/batched_git_ref_updates/project_cleanup_worker.rb new file mode 100644 index 00000000000..b2b1df29430 --- /dev/null +++ b/app/workers/batched_git_ref_updates/project_cleanup_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module BatchedGitRefUpdates + class ProjectCleanupWorker + include ApplicationWorker + + idempotent! + data_consistency :delayed + + feature_category :gitaly + + def perform(project_id) + stats = ProjectCleanupService.new(project_id).execute + + log_extra_metadata_on_done(:stats, stats) + end + end +end diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index 247105d2a1a..f5baa220715 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +# Deprecated and will be removed in 17.0. +# Use `Environments::StopJobSuccessWorker` instead. class BuildSuccessWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 378eff99b52..634d7ed3c87 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -9,6 +9,7 @@ module BulkImports feature_category :importers sidekiq_options retry: false, dead: false worker_has_external_dependencies! + worker_resource_boundary :memory def perform(batch_id) @batch = ::BulkImports::BatchTracker.find(batch_id) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index e0db18cb987..098e167ac29 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -42,7 +42,6 @@ module BulkImports def run return skip_tracker if entity.failed? - raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout? raise(Pipeline::FailedError, "Export from source instance failed: #{export_status.error}") if export_failed? raise(Pipeline::ExpiredError, 'Empty export status on source instance') if empty_export_timeout? @@ -181,12 +180,6 @@ module BulkImports "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}" end - def job_timeout? - return false unless file_extraction_pipeline? - - time_since_tracker_created > Pipeline::NDJSON_EXPORT_TIMEOUT - end - def enqueue_batches 1.upto(export_status.batches_count) do |batch_number| batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb index 52a4f075cf0..067dbb7492f 100644 --- a/app/workers/ci/initial_pipeline_process_worker.rb +++ b/app/workers/ci/initial_pipeline_process_worker.rb @@ -32,7 +32,7 @@ module Ci end def create_deployment(build) - ::Deployments::CreateForBuildService.new.execute(build) + ::Deployments::CreateForJobService.new.execute(build) end end end diff --git a/app/workers/ci/pipeline_success_unlock_artifacts_worker.rb b/app/workers/ci/pipeline_success_unlock_artifacts_worker.rb index 2a1f492cacb..2bebfdf9114 100644 --- a/app/workers/ci/pipeline_success_unlock_artifacts_worker.rb +++ b/app/workers/ci/pipeline_success_unlock_artifacts_worker.rb @@ -3,14 +3,16 @@ module Ci class PipelineSuccessUnlockArtifactsWorker include ApplicationWorker + include PipelineBackgroundQueue data_consistency :always sidekiq_options retry: 3 - include PipelineBackgroundQueue idempotent! + defer_on_database_health_signal :gitlab_ci, [:ci_job_artifacts] + def perform(pipeline_id) ::Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| # TODO: Move this check inside the Ci::UnlockArtifactsService diff --git a/app/workers/click_house/events_sync_worker.rb b/app/workers/click_house/events_sync_worker.rb new file mode 100644 index 00000000000..054e7763297 --- /dev/null +++ b/app/workers/click_house/events_sync_worker.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +module ClickHouse + class EventsSyncWorker + include ApplicationWorker + include Gitlab::ExclusiveLeaseHelpers + + idempotent! + data_consistency :delayed + worker_has_external_dependencies! # the worker interacts with a ClickHouse database + feature_category :value_stream_management + + # the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime + MAX_TTL = 2.5.minutes.to_i + + def perform + unless enabled? + log_extra_metadata_on_done(:result, { status: :disabled }) + + return + end + + metadata = { status: :processed } + + # Prevent parallel jobs + begin + in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do + true + end + + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError + # Skip retrying, just let the next worker to start after a few minutes + metadata = { status: :skipped } + end + + log_extra_metadata_on_done(:result, metadata) + end + + private + + def enabled? + ClickHouse::Client.configuration.databases[:main].present? && Feature.enabled?(:event_sync_worker_for_click_house) + end + end +end diff --git a/app/workers/clusters/agents/notify_git_push_worker.rb b/app/workers/clusters/agents/notify_git_push_worker.rb index d2994bb9144..db1de0b3518 100644 --- a/app/workers/clusters/agents/notify_git_push_worker.rb +++ b/app/workers/clusters/agents/notify_git_push_worker.rb @@ -14,7 +14,6 @@ module Clusters def perform(project_id) return unless project = ::Project.find_by_id(project_id) - return unless Feature.enabled?(:notify_kas_on_git_push, project) Gitlab::Kas::Client.new.send_git_push_event(project: project) end diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb index 772388ffc9e..b40914770b5 100644 --- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb +++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb @@ -5,10 +5,15 @@ module Gitlab # Module that provides methods shared by the various workers used for # importing GitHub projects. module ReschedulingMethods + extend ActiveSupport::Concern include JobDelayCalculator ENQUEUED_JOB_COUNT = 'github-importer/enqueued_job_count/%{project}/%{collection}' + included do + loggable_arguments 2 + end + # project_id - The ID of the GitLab project to import the note into. # hash - A Hash containing the details of the GitHub object to import. # notify_key - The Redis key to notify upon completion, if any. diff --git a/app/workers/concerns/packages/error_handling.rb b/app/workers/concerns/packages/error_handling.rb new file mode 100644 index 00000000000..26948d39912 --- /dev/null +++ b/app/workers/concerns/packages/error_handling.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module Packages + module ErrorHandling + extend ActiveSupport::Concern + + DEFAULT_STATUS_MESSAGE = 'Unexpected error' + + CONTROLLED_ERRORS = [ + ArgumentError, + ActiveRecord::RecordInvalid, + ::Packages::Helm::ExtractFileMetadataService::ExtractionError, + ::Packages::Nuget::ExtractMetadataFileService::ExtractionError, + ::Packages::Nuget::UpdatePackageFromMetadataService::InvalidMetadataError, + ::Packages::Nuget::UpdatePackageFromMetadataService::ZipError, + ::Packages::Rubygems::ProcessGemService::ExtractionError, + ::Packages::Rubygems::ProcessGemService::InvalidMetadataError + ].freeze + + def process_package_file_error(package_file:, exception:, extra_log_payload: {}) + log_payload = { + project_id: package_file.project_id, + package_file_id: package_file.id + }.merge(extra_log_payload) + Gitlab::ErrorTracking.log_exception(exception, **log_payload) + + package_file.package.update_columns( + status: :error, + status_message: truncated_status_message(exception) + ) + end + + private + + def controlled_error?(exception) + CONTROLLED_ERRORS.include?(exception.class) + end + + def truncated_status_message(exception) + status_message = exception.message if controlled_error?(exception) + + # Do not save the exception message in case it contains confidential data + status_message ||= "#{DEFAULT_STATUS_MESSAGE}: #{exception.class}" + + status_message.truncate(::Packages::Package::STATUS_MESSAGE_MAX_LENGTH) + end + end +end diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index c260e06607c..02eda924b71 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -151,6 +151,10 @@ module WorkerAttributes set_class_attribute(:weight, value) end + def pause_control(value) + ::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap.set_strategy_for(strategy: value, worker: self) + end + def get_weight get_class_attribute(:weight) || NAMESPACE_WEIGHTS[queue_namespace] || @@ -193,10 +197,10 @@ module WorkerAttributes !!get_class_attribute(:big_payload) end - def defer_on_database_health_signal(gitlab_schema, delay_by = DEFAULT_DEFER_DELAY, tables = []) + def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY) set_class_attribute( :database_health_check_attrs, - { gitlab_schema: gitlab_schema, delay_by: delay_by, tables: tables } + { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by } ) end diff --git a/app/workers/environments/stop_job_success_worker.rb b/app/workers/environments/stop_job_success_worker.rb new file mode 100644 index 00000000000..cc7d83512f3 --- /dev/null +++ b/app/workers/environments/stop_job_success_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module Environments + class StopJobSuccessWorker + include ApplicationWorker + + data_consistency :delayed + idempotent! + feature_category :continuous_delivery + + def perform(job_id, _params = {}) + Ci::Build.find_by_id(job_id).try do |build| + stop_environment(build) if build.stops_environment? && build.stop_action_successful? + end + end + + private + + def stop_environment(build) + build.persisted_environment.fire_state_event(:stop_complete) + end + end +end diff --git a/app/workers/integrations/group_mention_worker.rb b/app/workers/integrations/group_mention_worker.rb index 6cde1657ccd..cbf70dc5c6a 100644 --- a/app/workers/integrations/group_mention_worker.rb +++ b/app/workers/integrations/group_mention_worker.rb @@ -22,19 +22,19 @@ module Integrations mentionable = case mentionable_type when 'Issue' - Issue.find(mentionable_id) + Issue.find_by_id(mentionable_id) when 'MergeRequest' - MergeRequest.find(mentionable_id) + MergeRequest.find_by_id(mentionable_id) + else + Sidekiq.logger.error( + message: 'Integrations::GroupMentionWorker: mentionable not supported', + mentionable_type: mentionable_type, + mentionable_id: mentionable_id + ) + nil end - if mentionable.nil? - Sidekiq.logger.error( - message: 'Integrations::GroupMentionWorker: mentionable not supported', - mentionable_type: mentionable_type, - mentionable_id: mentionable_id - ) - return - end + return if mentionable.nil? Integrations::GroupMentionService.new(mentionable, hook_data: hook_data, is_confidential: is_confidential).execute end diff --git a/app/workers/members/expiring_email_notification_worker.rb b/app/workers/members/expiring_email_notification_worker.rb new file mode 100644 index 00000000000..1d0a6eb254a --- /dev/null +++ b/app/workers/members/expiring_email_notification_worker.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Members + class ExpiringEmailNotificationWorker # rubocop:disable Scalability/CronWorkerContext + include ApplicationWorker + + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + feature_category :system_access + urgency :low + idempotent! + + def perform(member_id) + notification_service = NotificationService.new + member = ::Member.find_by_id(member_id) + + return unless member + return unless Feature.enabled?(:member_expiring_email_notification, member.source.root_ancestor) + return if member.expiry_notified_at.present? + + with_context(user: member.user) do + notification_service.member_about_to_expire(member) + Gitlab::AppLogger.info(message: "Notifying user about expiring membership", member_id: member.id) + + member.update(expiry_notified_at: Time.current) + end + end + end +end diff --git a/app/workers/members/expiring_worker.rb b/app/workers/members/expiring_worker.rb new file mode 100644 index 00000000000..0d631af3a7c --- /dev/null +++ b/app/workers/members/expiring_worker.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Members + class ExpiringWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + # rubocop:disable Scalability/CronWorkerContext + # This worker does not perform work scoped to a context + include CronjobQueue + # rubocop:enable Scalability/CronWorkerContext + + data_consistency :sticky + feature_category :system_access + urgency :low + + BATCH_LIMIT = 500 + + def perform + return unless Feature.enabled?(:member_expiring_email_notification) + + limit_date = Member::DAYS_TO_EXPIRE.days.from_now.to_date + + expiring_members = Member.active.where(users: { user_type: :human }).expiring_and_not_notified(limit_date) # rubocop: disable CodeReuse/ActiveRecord + + expiring_members.each_batch(of: BATCH_LIMIT) do |members| + members.pluck_primary_key.each do |member_id| + Members::ExpiringEmailNotificationWorker.perform_async(member_id) + end + end + end + end +end diff --git a/app/workers/merge_requests/mergeability_check_batch_worker.rb b/app/workers/merge_requests/mergeability_check_batch_worker.rb index f48e9c234ab..e95c3952c8c 100644 --- a/app/workers/merge_requests/mergeability_check_batch_worker.rb +++ b/app/workers/merge_requests/mergeability_check_batch_worker.rb @@ -40,8 +40,7 @@ module MergeRequests private def merge_status_recheck_not_allowed?(merge_request, user) - ::Feature.enabled?(:restrict_merge_status_recheck, merge_request.project) && - !Ability.allowed?(user, :update_merge_request, merge_request.project) + !Ability.allowed?(user, :update_merge_request, merge_request.project) end end end diff --git a/app/workers/packages/debian/process_package_file_worker.rb b/app/workers/packages/debian/process_package_file_worker.rb index 0e21e98d182..843560d4334 100644 --- a/app/workers/packages/debian/process_package_file_worker.rb +++ b/app/workers/packages/debian/process_package_file_worker.rb @@ -5,6 +5,7 @@ module Packages class ProcessPackageFileWorker include ApplicationWorker include Gitlab::Utils::StrongMemoize + include ::Packages::ErrorHandling data_consistency :always @@ -24,11 +25,16 @@ module Packages return unless package_file.debian_file_metadatum&.unknown? ::Packages::Debian::ProcessPackageFileService.new(package_file, distribution_name, component_name).execute - rescue StandardError => e - Gitlab::ErrorTracking.log_exception(e, package_file_id: @package_file_id, - distribution_name: @distribution_name, component_name: @component_name) + rescue StandardError => exception package_file.update_column(:status, :error) - package_file.package.update_column(:status, :error) + process_package_file_error( + package_file: package_file, + exception: exception, + extra_log_payload: { + distribution_name: @distribution_name, + component_name: @component_name + } + ) end private diff --git a/app/workers/packages/helm/extraction_worker.rb b/app/workers/packages/helm/extraction_worker.rb index 0ba2d149f77..ca043c5c8c7 100644 --- a/app/workers/packages/helm/extraction_worker.rb +++ b/app/workers/packages/helm/extraction_worker.rb @@ -4,6 +4,7 @@ module Packages module Helm class ExtractionWorker include ApplicationWorker + include ::Packages::ErrorHandling data_consistency :always @@ -19,10 +20,11 @@ module Packages return unless package_file && !package_file.package.default? ::Packages::Helm::ProcessFileService.new(channel, package_file).execute - - rescue StandardError => e - Gitlab::ErrorTracking.log_exception(e, project_id: package_file.project_id) - package_file.package.update_column(:status, :error) + rescue StandardError => exception + process_package_file_error( + package_file: package_file, + exception: exception + ) end end end diff --git a/app/workers/packages/nuget/extraction_worker.rb b/app/workers/packages/nuget/extraction_worker.rb index b8e00621aa1..55aca0beb03 100644 --- a/app/workers/packages/nuget/extraction_worker.rb +++ b/app/workers/packages/nuget/extraction_worker.rb @@ -4,6 +4,7 @@ module Packages module Nuget class ExtractionWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + include ::Packages::ErrorHandling data_consistency :always @@ -18,10 +19,11 @@ module Packages return unless package_file ::Packages::Nuget::UpdatePackageFromMetadataService.new(package_file).execute - - rescue StandardError => e - Gitlab::ErrorTracking.log_exception(e, project_id: package_file.project_id) - package_file.package.update_column(:status, :error) + rescue StandardError => exception + process_package_file_error( + package_file: package_file, + exception: exception + ) end end end diff --git a/app/workers/packages/rubygems/extraction_worker.rb b/app/workers/packages/rubygems/extraction_worker.rb index dbaf9bc35a9..7076fdb3b90 100644 --- a/app/workers/packages/rubygems/extraction_worker.rb +++ b/app/workers/packages/rubygems/extraction_worker.rb @@ -4,6 +4,7 @@ module Packages module Rubygems class ExtractionWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + include ::Packages::ErrorHandling data_consistency :always @@ -19,10 +20,11 @@ module Packages return unless package_file ::Packages::Rubygems::ProcessGemService.new(package_file).execute - - rescue StandardError => e - Gitlab::ErrorTracking.log_exception(e, project_id: package_file.project_id) - package_file.package.update_column(:status, :error) + rescue StandardError => exception + process_package_file_error( + package_file: package_file, + exception: exception + ) end end end diff --git a/app/workers/pause_control/resume_worker.rb b/app/workers/pause_control/resume_worker.rb new file mode 100644 index 00000000000..98725c0b6f2 --- /dev/null +++ b/app/workers/pause_control/resume_worker.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module PauseControl + class ResumeWorker + include ApplicationWorker + # There is no onward scheduling and this cron handles work from across the + # application, so there's no useful context to add. + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + RESCHEDULE_DELAY = 1.second + + feature_category :global_search + data_consistency :sticky + idempotent! + urgency :low + + def perform + reschedule_job = false + + pause_strategies_workers.each do |strategy, workers| + strategy_klass = Gitlab::SidekiqMiddleware::PauseControl.for(strategy) + + next if strategy_klass.should_pause? + + workers.each do |worker| + next unless jobs_in_the_queue?(worker) + + queue_size = resume_processing!(worker) + reschedule_job = true if queue_size.to_i > 0 + end + end + + self.class.perform_in(RESCHEDULE_DELAY) if reschedule_job + end + + private + + def jobs_in_the_queue?(worker) + Gitlab::SidekiqMiddleware::PauseControl::PauseControlService.has_jobs_in_waiting_queue?(worker.to_s) + end + + def resume_processing!(worker) + Gitlab::SidekiqMiddleware::PauseControl::PauseControlService.resume_processing!(worker.to_s) + end + + def pause_strategies_workers + Gitlab::SidekiqMiddleware::PauseControl::WorkersMap.workers || [] + end + end +end diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb index 708dd3433cb..cc72704d8c9 100644 --- a/app/workers/process_commit_worker.rb +++ b/app/workers/process_commit_worker.rb @@ -19,6 +19,7 @@ class ProcessCommitWorker weight 3 idempotent! loggable_arguments 2, 3 + deduplicate :until_executed, feature_flag: :deduplicate_process_commit_worker # project_id - The ID of the project this commit belongs to. # user_id - The ID of the user that pushed the commit. diff --git a/app/workers/service_desk/custom_email_verification_cleanup_worker.rb b/app/workers/service_desk/custom_email_verification_cleanup_worker.rb new file mode 100644 index 00000000000..6434b9b09bb --- /dev/null +++ b/app/workers/service_desk/custom_email_verification_cleanup_worker.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module ServiceDesk + # Marks custom email verifications as failed when + # verification has started and timeframe to ingest + # the verification email has closed. + # + # This ensures we can finish the verification process and send verification result emails + # even when we did not receive any verification email. + class CustomEmailVerificationCleanupWorker + include ApplicationWorker + include CronjobQueue + + idempotent! + + data_consistency :sticky + feature_category :service_desk + + def perform + # Limit ensures we have 50ms per verification before another job gets scheduled. + collection = CustomEmailVerification.started.overdue.limit(2_400) + + collection.find_each do |verification| + with_context(project: verification.project) do + CustomEmailVerifications::UpdateService.new( + project: verification.project, + current_user: nil, + params: { + mail: nil + } + ).execute + end + end + end + end +end diff --git a/app/workers/users/deactivate_dormant_users_worker.rb b/app/workers/users/deactivate_dormant_users_worker.rb index d024109e754..87566bff467 100644 --- a/app/workers/users/deactivate_dormant_users_worker.rb +++ b/app/workers/users/deactivate_dormant_users_worker.rb @@ -15,16 +15,21 @@ module Users return unless ::Gitlab::CurrentSettings.current_application_settings.deactivate_dormant_users - deactivate_users(User.dormant) - deactivate_users(User.with_no_activity) + admin_bot = User.admin_bot + return unless admin_bot + + deactivate_users(User.dormant, admin_bot) + deactivate_users(User.with_no_activity, admin_bot) end private - def deactivate_users(scope) + def deactivate_users(scope, admin_bot) with_context(caller_id: self.class.name.to_s) do scope.each_batch do |batch| - batch.each(&:deactivate) + batch.each do |user| + Users::DeactivateService.new(admin_bot).execute(user) + end end end end diff --git a/app/workers/web_hook_worker.rb b/app/workers/web_hook_worker.rb index 043a16e3527..cea0816f5a6 100644 --- a/app/workers/web_hook_worker.rb +++ b/app/workers/web_hook_worker.rb @@ -24,7 +24,10 @@ class WebHookWorker # present in the request header so the hook can pass this same header value in its request. Gitlab::WebHooks::RecursionDetection.set_request_uuid(params[:recursion_detection_request_uuid]) - WebHookService.new(hook, data, hook_name, jid).execute + WebHookService.new(hook, data, hook_name, jid).execute.tap do |response| + log_extra_metadata_on_done(:response_status, response.status) + log_extra_metadata_on_done(:http_status, response[:http_status]) + end end end # rubocop:enable Scalability/IdempotentWorker |