diff options
Diffstat (limited to 'app/workers')
21 files changed, 411 insertions, 233 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index f8aa06943ee..6f6fd9ddb65 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -318,15 +318,6 @@ :weight: 1 :idempotent: false :tags: [] -- :name: cronjob:clusters_integrations_check_prometheus_health - :worker_name: Clusters::Integrations::CheckPrometheusHealthWorker - :feature_category: :incident_management - :has_external_dependencies: true - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: cronjob:container_expiration_policy :worker_name: ContainerExpirationPolicyWorker :feature_category: :container_registry @@ -561,15 +552,6 @@ :weight: 1 :idempotent: false :tags: [] -- :name: cronjob:metrics_dashboard_schedule_annotations_prune - :worker_name: Metrics::Dashboard::ScheduleAnnotationsPruneWorker - :feature_category: :metrics - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: cronjob:metrics_global_metrics_update :worker_name: Metrics::GlobalMetricsUpdateWorker :feature_category: :metrics @@ -1740,15 +1722,6 @@ :weight: 1 :idempotent: true :tags: [] -- :name: package_repositories:packages_debian_process_changes - :worker_name: Packages::Debian::ProcessChangesWorker - :feature_category: :package_registry - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: package_repositories:packages_debian_process_package_file :worker_name: Packages::Debian::ProcessPackageFileWorker :feature_category: :package_registry @@ -2001,6 +1974,15 @@ :weight: 3 :idempotent: false :tags: [] +- :name: pipeline_default:ci_pipeline_cleanup_ref + :worker_name: Ci::PipelineCleanupRefWorker + :feature_category: :continuous_integration + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 3 + :idempotent: true + :tags: [] - :name: pipeline_default:ci_retry_pipeline :worker_name: Ci::RetryPipelineWorker :feature_category: :continuous_integration @@ -2424,6 +2406,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: bulk_imports_finish_batched_pipeline + :worker_name: BulkImports::FinishBatchedPipelineWorker + :feature_category: :importers + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: bulk_imports_finish_batched_relation_export :worker_name: BulkImports::FinishBatchedRelationExportWorker :feature_category: :importers @@ -2442,6 +2433,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: bulk_imports_pipeline_batch + :worker_name: BulkImports::PipelineBatchWorker + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: false + :tags: [] - :name: bulk_imports_relation_batch_export :worker_name: BulkImports::RelationBatchExportWorker :feature_category: :importers @@ -2856,6 +2856,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: integrations_group_mention + :worker_name: Integrations::GroupMentionWorker + :feature_category: :integrations + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: integrations_irker :worker_name: Integrations::IrkerWorker :feature_category: :integrations @@ -2973,6 +2982,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: merge_requests_cleanup_ref + :worker_name: MergeRequests::CleanupRefWorker + :feature_category: :code_review_workflow + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: merge_requests_close_issue :worker_name: MergeRequests::CloseIssueWorker :feature_category: :code_review_workflow @@ -3072,24 +3090,6 @@ :weight: 1 :idempotent: true :tags: [] -- :name: metrics_dashboard_prune_old_annotations - :worker_name: Metrics::Dashboard::PruneOldAnnotationsWorker - :feature_category: :metrics - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] -- :name: metrics_dashboard_sync_dashboards - :worker_name: Metrics::Dashboard::SyncDashboardsWorker - :feature_category: :metrics - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: migrate_external_diffs :worker_name: MigrateExternalDiffsWorker :feature_category: :code_review_workflow @@ -3486,6 +3486,15 @@ :weight: 2 :idempotent: false :tags: [] +- :name: redis_migration + :worker_name: RedisMigrationWorker + :feature_category: :redis + :has_external_dependencies: false + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: releases_create_evidence :worker_name: Releases::CreateEvidenceWorker :feature_category: :release_evidence diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index 530419dac26..44759916f99 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -15,35 +15,40 @@ module BulkImports end def perform(entity_id) - entity = BulkImports::Entity.find(entity_id) + @entity = BulkImports::Entity.find(entity_id) - entity.update!(source_xid: entity_source_xid(entity)) if entity.source_xid.nil? - - request_export(entity) + set_source_xid + request_export BulkImports::EntityWorker.perform_async(entity_id) end def perform_failure(exception, entity_id) - entity = BulkImports::Entity.find(entity_id) + @entity = BulkImports::Entity.find(entity_id) - log_and_fail(exception, entity) + log_and_fail(exception) end private - def request_export(entity) - http_client(entity).post(entity.export_relations_url_path) + attr_reader :entity + + def set_source_xid + entity.update!(source_xid: entity_source_xid) if entity.source_xid.nil? + end + + def request_export + http_client.post(export_url) end - def http_client(entity) + def http_client @client ||= Clients::HTTP.new( url: entity.bulk_import.configuration.url, token: entity.bulk_import.configuration.access_token ) end - def failure_attributes(exception, entity) + def failure_attributes(exception) { bulk_import_entity_id: entity.id, pipeline_class: 'ExportRequestWorker', @@ -53,23 +58,20 @@ module BulkImports } end - def graphql_client(entity) + def graphql_client @graphql_client ||= BulkImports::Clients::Graphql.new( url: entity.bulk_import.configuration.url, token: entity.bulk_import.configuration.access_token ) end - def entity_source_xid(entity) - query = entity_query(entity) - client = graphql_client(entity) - - response = client.execute( - client.parse(query.to_s), + def entity_source_xid + response = graphql_client.execute( + graphql_client.parse(entity_query.to_s), { full_path: entity.source_full_path } ).original_hash - ::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id + ::GlobalID.parse(response.dig(*entity_query.data_path, 'id')).model_id rescue StandardError => e log_exception(e, { @@ -86,12 +88,12 @@ module BulkImports nil end - def entity_query(entity) - if entity.group? - BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil) - else - BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil) - end + def entity_query + @entity_query ||= if entity.group? + BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil) + else + BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil) + end end def logger @@ -104,7 +106,7 @@ module BulkImports logger.error(structured_payload(payload)) end - def log_and_fail(exception, entity) + def log_and_fail(exception) log_exception(exception, { bulk_import_entity_id: entity.id, @@ -117,9 +119,13 @@ module BulkImports } ) - BulkImports::Failure.create(failure_attributes(exception, entity)) + BulkImports::Failure.create(failure_attributes(exception)) entity.fail_op! end + + def export_url + entity.export_relations_url_path(batched: Feature.enabled?(:bulk_imports_batched_import_export)) + end end end diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb new file mode 100644 index 00000000000..4200d0e4a0f --- /dev/null +++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +module BulkImports + class FinishBatchedPipelineWorker + include ApplicationWorker + include ExceptionBacktrace + + REQUEUE_DELAY = 5.seconds + + idempotent! + deduplicate :until_executing + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + feature_category :importers + + def perform(pipeline_tracker_id) + @tracker = Tracker.find(pipeline_tracker_id) + + return unless tracker.batched? + return unless tracker.started? + return re_enqueue if import_in_progress? + + if tracker.stale? + tracker.batches.map(&:fail_op!) + tracker.fail_op! + else + tracker.finish! + end + + ensure + ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage) + end + + private + + attr_reader :tracker + + def re_enqueue + self.class.perform_in(REQUEUE_DELAY, tracker.id) + end + + def import_in_progress? + tracker.batches.any?(&:started?) + end + end +end diff --git a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb index aa7bbffa732..92a33a971e7 100644 --- a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb +++ b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb @@ -5,7 +5,7 @@ module BulkImports include ApplicationWorker idempotent! - data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + data_consistency :sticky feature_category :importers REENQUEUE_DELAY = 5.seconds diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb new file mode 100644 index 00000000000..378eff99b52 --- /dev/null +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +module BulkImports + class PipelineBatchWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + include ExclusiveLeaseGuard + + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + feature_category :importers + sidekiq_options retry: false, dead: false + worker_has_external_dependencies! + + def perform(batch_id) + @batch = ::BulkImports::BatchTracker.find(batch_id) + @tracker = @batch.tracker + + try_obtain_lease { run } + ensure + ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) + end + + private + + attr_reader :batch, :tracker + + def run + return batch.skip! if tracker.failed? || tracker.finished? + + batch.start! + tracker.pipeline_class.new(context).run + batch.finish! + rescue BulkImports::RetryPipelineError => e + retry_batch(e) + rescue StandardError => e + fail_batch(e) + end + + def fail_batch(exception) + batch.fail_op! + + Gitlab::ErrorTracking.track_exception( + exception, + batch_id: batch.id, + tracker_id: tracker.id, + pipeline_class: tracker.pipeline_name, + pipeline_step: 'pipeline_batch_worker_run' + ) + + BulkImports::Failure.create( + bulk_import_entity_id: batch.tracker.entity.id, + pipeline_class: tracker.pipeline_name, + pipeline_step: 'pipeline_batch_worker_run', + exception_class: exception.class.to_s, + exception_message: exception.message.truncate(255), + correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id + ) + end + + def context + @context ||= ::BulkImports::Pipeline::Context.new(tracker, batch_number: batch.batch_number) + end + + def retry_batch(exception) + batch.retry! + + re_enqueue(exception.retry_delay) + end + + def lease_timeout + 30 + end + + def lease_key + "gitlab:bulk_imports:pipeline_batch_worker:#{batch.id}" + end + + def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) + self.class.perform_in(delay, batch.id) + end + end +end diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index f03e0bc0656..e0db18cb987 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -31,7 +31,6 @@ module BulkImports fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped? end end - ensure ::BulkImports::EntityWorker.perform_async(entity_id, stage) end @@ -49,9 +48,17 @@ module BulkImports return re_enqueue if export_empty? || export_started? - pipeline_tracker.update!(status_event: 'start', jid: jid) - pipeline_tracker.pipeline_class.new(context).run - pipeline_tracker.finish! + if file_extraction_pipeline? && export_status.batched? + pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true) + + return pipeline_tracker.finish! if export_status.batches_count < 1 + + enqueue_batches + else + pipeline_tracker.update!(status_event: 'start', jid: jid) + pipeline_tracker.pipeline_class.new(context).run + pipeline_tracker.finish! + end rescue BulkImports::RetryPipelineError => e retry_tracker(e) rescue StandardError => e @@ -179,5 +186,13 @@ module BulkImports time_since_tracker_created > Pipeline::NDJSON_EXPORT_TIMEOUT end + + def enqueue_batches + 1.upto(export_status.batches_count) do |batch_number| + batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord + + ::BulkImports::PipelineBatchWorker.perform_async(batch.id) + end + end end end diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb index b6693f0b07d..531edc6c7a7 100644 --- a/app/workers/bulk_imports/relation_export_worker.rb +++ b/app/workers/bulk_imports/relation_export_worker.rb @@ -18,9 +18,7 @@ module BulkImports portable = portable(portable_id, portable_class) config = BulkImports::FileTransfer.config_for(portable) - if Feature.enabled?(:bulk_imports_batched_import_export) && - Gitlab::Utils.to_boolean(batched) && - config.batchable_relation?(relation) + if Gitlab::Utils.to_boolean(batched) && config.batchable_relation?(relation) BatchedRelationExportService.new(user, portable, relation, jid).execute else RelationExportService.new(user, portable, relation, jid).execute diff --git a/app/workers/ci/pipeline_cleanup_ref_worker.rb b/app/workers/ci/pipeline_cleanup_ref_worker.rb new file mode 100644 index 00000000000..291e1090c18 --- /dev/null +++ b/app/workers/ci/pipeline_cleanup_ref_worker.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module Ci + class PipelineCleanupRefWorker + include ApplicationWorker + include Projects::RemoveRefs + + sidekiq_options retry: 3 + include PipelineQueue + + idempotent! + deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency + + urgency :low + + # Even though this worker is de-duplicated we need to acquire lock + # on a project to avoid running many concurrent refs removals + # + # TODO: Once underlying fix is done we can remove `in_lock` + # + # Related to: + # - https://gitlab.com/gitlab-org/gitaly/-/issues/5368 + # - https://gitlab.com/gitlab-org/gitaly/-/issues/5369 + def perform(pipeline_id) + pipeline = Ci::Pipeline.find_by_id(pipeline_id) + return unless pipeline + return unless pipeline.persistent_ref.should_delete? + + serialized_remove_refs(pipeline.project_id) do + pipeline.reset.persistent_ref.delete + end + end + end +end diff --git a/app/workers/clusters/integrations/check_prometheus_health_worker.rb b/app/workers/clusters/integrations/check_prometheus_health_worker.rb deleted file mode 100644 index b65b3424c3a..00000000000 --- a/app/workers/clusters/integrations/check_prometheus_health_worker.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -module Clusters - module Integrations - class CheckPrometheusHealthWorker - include ApplicationWorker - - data_consistency :always - - # rubocop:disable Scalability/CronWorkerContext - # This worker does not perform work scoped to a context - include CronjobQueue - # rubocop:enable Scalability/CronWorkerContext - - feature_category :incident_management - urgency :low - - idempotent! - worker_has_external_dependencies! - - def perform; end - end - end -end diff --git a/app/workers/container_registry/cleanup_worker.rb b/app/workers/container_registry/cleanup_worker.rb index 448a16ad309..9ec02dd613e 100644 --- a/app/workers/container_registry/cleanup_worker.rb +++ b/app/workers/container_registry/cleanup_worker.rb @@ -16,8 +16,6 @@ module ContainerRegistry BATCH_SIZE = 200 def perform - log_counts - reset_stale_deletes delete_stale_ongoing_repair_details @@ -54,26 +52,13 @@ module ContainerRegistry end def should_enqueue_record_detail_jobs? - return false unless Gitlab.com? + return false unless Gitlab.com_except_jh? return false unless Feature.enabled?(:registry_data_repair_worker) return false unless ContainerRegistry::GitlabApiClient.supports_gitlab_api? Project.pending_data_repair_analysis.exists? end - def log_counts - ::Gitlab::Database::LoadBalancing::Session.current.use_replicas_for_read_queries do - log_extra_metadata_on_done( - :delete_scheduled_container_repositories_count, - ContainerRepository.delete_scheduled.count - ) - log_extra_metadata_on_done( - :stale_delete_container_repositories_count, - stale_delete_container_repositories.count - ) - end - end - def stale_delete_container_repositories ContainerRepository.delete_ongoing.with_stale_delete_at(STALE_DELETE_THRESHOLD.ago) end diff --git a/app/workers/container_registry/record_data_repair_detail_worker.rb b/app/workers/container_registry/record_data_repair_detail_worker.rb index 390481f8e01..3e40dbbb99a 100644 --- a/app/workers/container_registry/record_data_repair_detail_worker.rb +++ b/app/workers/container_registry/record_data_repair_detail_worker.rb @@ -17,7 +17,7 @@ module ContainerRegistry LEASE_TIMEOUT = 1.hour.to_i def perform_work - return unless Gitlab.com? + return unless Gitlab.com_except_jh? return unless next_project return if next_project.container_registry_data_repair_detail @@ -51,7 +51,7 @@ module ContainerRegistry end def remaining_work_count - return 0 unless Gitlab.com? + return 0 unless Gitlab.com_except_jh? return 0 unless Feature.enabled?(:registry_data_repair_worker) return 0 unless ContainerRegistry::GitlabApiClient.supports_gitlab_api? @@ -69,7 +69,7 @@ module ContainerRegistry end def next_project - Project.pending_data_repair_analysis.first + Project.pending_data_repair_analysis.limit(max_running_jobs * 2).sample end strong_memoize_attr :next_project diff --git a/app/workers/integrations/execute_worker.rb b/app/workers/integrations/execute_worker.rb index 443f1d9fe8e..6fe1937a222 100644 --- a/app/workers/integrations/execute_worker.rb +++ b/app/workers/integrations/execute_worker.rb @@ -13,6 +13,8 @@ module Integrations worker_has_external_dependencies! def perform(hook_id, data) + return if ::Gitlab::SilentMode.enabled? + data = data.with_indifferent_access integration = Integration.find_by_id(hook_id) return unless integration diff --git a/app/workers/integrations/group_mention_worker.rb b/app/workers/integrations/group_mention_worker.rb new file mode 100644 index 00000000000..6cde1657ccd --- /dev/null +++ b/app/workers/integrations/group_mention_worker.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Integrations + class GroupMentionWorker + include ApplicationWorker + + idempotent! + feature_category :integrations + deduplicate :until_executed + data_consistency :delayed + urgency :low + + worker_has_external_dependencies! + + def perform(args) + args = args.with_indifferent_access + + mentionable_type = args[:mentionable_type] + mentionable_id = args[:mentionable_id] + hook_data = args[:hook_data] + is_confidential = args[:is_confidential] + + mentionable = case mentionable_type + when 'Issue' + Issue.find(mentionable_id) + when 'MergeRequest' + MergeRequest.find(mentionable_id) + end + + if mentionable.nil? + Sidekiq.logger.error( + message: 'Integrations::GroupMentionWorker: mentionable not supported', + mentionable_type: mentionable_type, + mentionable_id: mentionable_id + ) + return + end + + Integrations::GroupMentionService.new(mentionable, hook_data: hook_data, is_confidential: is_confidential).execute + end + end +end diff --git a/app/workers/merge_requests/cleanup_ref_worker.rb b/app/workers/merge_requests/cleanup_ref_worker.rb new file mode 100644 index 00000000000..c714b976a2b --- /dev/null +++ b/app/workers/merge_requests/cleanup_ref_worker.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module MergeRequests + class CleanupRefWorker + include ApplicationWorker + include Projects::RemoveRefs + + sidekiq_options retry: 3 + loggable_arguments 2 + feature_category :code_review_workflow + + idempotent! + deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: 1.minute + data_consistency :delayed + + urgency :low + + # Even though this worker is de-duplicated we need to acquire lock + # on a project to avoid running many concurrent refs removals + # + # TODO: Once underlying fix is done we can remove `in_lock` + # + # Related to: + # - https://gitlab.com/gitlab-org/gitaly/-/issues/5368 + # - https://gitlab.com/gitlab-org/gitaly/-/issues/5369 + def perform(merge_request_id, only) + merge_request = MergeRequest.find_by_id(merge_request_id) + return unless merge_request + + serialized_remove_refs(merge_request.target_project_id) do + merge_request.cleanup_refs(only: only.to_sym) + end + end + end +end diff --git a/app/workers/merge_requests/mergeability_check_batch_worker.rb b/app/workers/merge_requests/mergeability_check_batch_worker.rb index cbe34ac3790..f48e9c234ab 100644 --- a/app/workers/merge_requests/mergeability_check_batch_worker.rb +++ b/app/workers/merge_requests/mergeability_check_batch_worker.rb @@ -15,10 +15,16 @@ module MergeRequests @logger ||= Sidekiq.logger end - def perform(merge_request_ids) + def perform(merge_request_ids, user_id) merge_requests = MergeRequest.id_in(merge_request_ids) + user = User.find_by_id(user_id) merge_requests.each do |merge_request| + # Skip projects that user doesn't have update_merge_request access + next if merge_status_recheck_not_allowed?(merge_request, user) + + merge_request.mark_as_checking + result = merge_request.check_mergeability next unless result&.error? @@ -30,5 +36,12 @@ module MergeRequests ) end end + + private + + def merge_status_recheck_not_allowed?(merge_request, user) + ::Feature.enabled?(:restrict_merge_status_recheck, merge_request.project) && + !Ability.allowed?(user, :update_merge_request, merge_request.project) + end end end diff --git a/app/workers/metrics/dashboard/prune_old_annotations_worker.rb b/app/workers/metrics/dashboard/prune_old_annotations_worker.rb deleted file mode 100644 index 5b34f85606d..00000000000 --- a/app/workers/metrics/dashboard/prune_old_annotations_worker.rb +++ /dev/null @@ -1,22 +0,0 @@ -# frozen_string_literal: true - -module Metrics - module Dashboard - class PruneOldAnnotationsWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - - DELETE_LIMIT = 10_000 - DEFAULT_CUT_OFF_PERIOD = 2.weeks - - feature_category :metrics - - idempotent! # in the scope of 24 hours - - def perform; end - end - end -end diff --git a/app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb b/app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb deleted file mode 100644 index fe002ffa4a0..00000000000 --- a/app/workers/metrics/dashboard/schedule_annotations_prune_worker.rb +++ /dev/null @@ -1,22 +0,0 @@ -# frozen_string_literal: true - -module Metrics - module Dashboard - class ScheduleAnnotationsPruneWorker - include ApplicationWorker - - data_consistency :always - - # rubocop:disable Scalability/CronWorkerContext - # This worker does not perform work scoped to a context - include CronjobQueue - # rubocop:enable Scalability/CronWorkerContext - - feature_category :metrics - - idempotent! # PruneOldAnnotationsWorker worker is idempotent in the scope of 24 hours - - def perform; end - end - end -end diff --git a/app/workers/metrics/dashboard/sync_dashboards_worker.rb b/app/workers/metrics/dashboard/sync_dashboards_worker.rb deleted file mode 100644 index 668542e51a5..00000000000 --- a/app/workers/metrics/dashboard/sync_dashboards_worker.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -module Metrics - module Dashboard - class SyncDashboardsWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - - feature_category :metrics - - idempotent! - - def perform(project_id); end - end - end -end diff --git a/app/workers/packages/debian/process_changes_worker.rb b/app/workers/packages/debian/process_changes_worker.rb deleted file mode 100644 index 0a716c61203..00000000000 --- a/app/workers/packages/debian/process_changes_worker.rb +++ /dev/null @@ -1,46 +0,0 @@ -# frozen_string_literal: true - -module Packages - module Debian - class ProcessChangesWorker - include ApplicationWorker - - data_consistency :always - include Gitlab::Utils::StrongMemoize - - deduplicate :until_executed - idempotent! - - queue_namespace :package_repositories - feature_category :package_registry - - def perform(package_file_id, user_id) - @package_file_id = package_file_id - @user_id = user_id - - return unless package_file && user - - ::Packages::Debian::ProcessChangesService.new(package_file, user).execute - rescue StandardError => e - Gitlab::ErrorTracking.log_exception(e, package_file_id: @package_file_id, user_id: @user_id) - package_file.destroy! - end - - private - - attr_reader :package_file_id, :user_id - - def package_file - strong_memoize(:package_file) do - ::Packages::PackageFile.find_by_id(package_file_id) - end - end - - def user - strong_memoize(:user) do - ::User.find_by_id(user_id) - end - end - end - end -end diff --git a/app/workers/redis_migration_worker.rb b/app/workers/redis_migration_worker.rb new file mode 100644 index 00000000000..bad9baeac70 --- /dev/null +++ b/app/workers/redis_migration_worker.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +class RedisMigrationWorker + include ApplicationWorker + + idempotent! + data_consistency :delayed + feature_category :redis + urgency :throttled + loggable_arguments 0 + + SCAN_START_STOP = '0' + + def perform(job_class_name, cursor, options = {}) + migrator = self.class.fetch_migrator!(job_class_name) + + scan_size = options[:scan_size] || 1000 + deadline = Time.now.utc + 3.minutes + + while Time.now.utc < deadline + cursor, keys = migrator.redis.scan(cursor, match: migrator.scan_match_pattern, count: scan_size) + + migrator.perform(keys) if keys.any? + + sleep(0.01) + break if cursor == SCAN_START_STOP + end + + self.class.perform_async(job_class_name, cursor, options) unless cursor == SCAN_START_STOP + end + + class << self + def fetch_migrator!(job_class_name) + job_class = "Gitlab::BackgroundMigration::Redis::#{job_class_name}".safe_constantize + raise NotImplementedError, "#{job_class_name} does not exist" if job_class.nil? + + job_class.new + end + end +end diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb index 4ca366efcad..dab92e16ee3 100644 --- a/app/workers/run_pipeline_schedule_worker.rb +++ b/app/workers/run_pipeline_schedule_worker.rb @@ -33,10 +33,15 @@ class RunPipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker def run_pipeline_schedule(schedule, user) response = Ci::CreatePipelineService .new(schedule.project, user, ref: schedule.ref) - .execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule) + .execute( + :schedule, + save_on_errors: Feature.enabled?(:persist_failed_pipelines_from_schedules, schedule.project), + ignore_skip_ci: true, schedule: schedule + ) return response if response.payload.persisted? + # Remove with FF persist_failed_pipelines_from_schedules enabled, as corrupted yml is not longer logged # This is a user operation error such as corrupted .gitlab-ci.yml. Log the error for debugging purpose. log_extra_metadata_on_done(:pipeline_creation_error, response.message) rescue StandardError => e |