diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2023-11-14 11:41:52 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2023-11-14 11:41:52 +0300 |
commit | 585826cb22ecea5998a2c2a4675735c94bdeedac (patch) | |
tree | 5b05f0b30d33cef48963609e8a18a4dff260eab3 /app/workers | |
parent | df221d036e5d0c6c0ee4d55b9c97f481ee05dee8 (diff) |
Add latest changes from gitlab-org/gitlab@16-6-stable-eev16.6.0-rc42
Diffstat (limited to 'app/workers')
49 files changed, 576 insertions, 290 deletions
diff --git a/app/workers/abuse/spam_abuse_events_worker.rb b/app/workers/abuse/spam_abuse_events_worker.rb new file mode 100644 index 00000000000..7d86e994ae4 --- /dev/null +++ b/app/workers/abuse/spam_abuse_events_worker.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +module Abuse + class SpamAbuseEventsWorker + include ApplicationWorker + + data_consistency :delayed + + idempotent! + feature_category :instance_resiliency + urgency :low + + def perform(params) + params = params.with_indifferent_access + + @user = User.find_by_id(params[:user_id]) + unless @user + logger.info(structured_payload(message: "User not found.", user_id: params[:user_id])) + return + end + + report_user(params) + end + + private + + attr_reader :user + + def report_user(params) + category = 'spam' + reporter = Users::Internal.security_bot + report_params = { user_id: params[:user_id], + reporter: reporter, + category: category, + message: 'User reported for abuse based on spam verdict' } + + abuse_report = AbuseReport.by_category(category).by_reporter_id(reporter.id).by_user_id(params[:user_id]).first + + abuse_report = AbuseReport.create!(report_params) if abuse_report.nil? + + create_abuse_event(abuse_report.id, params) + end + + # Associate the abuse report with an abuse event + def create_abuse_event(abuse_report_id, params) + Abuse::Event.create!( + abuse_report_id: abuse_report_id, + category: :spam, + metadata: { noteable_type: params[:noteable_type], + title: params[:title], + description: params[:description], + source_ip: params[:source_ip], + user_agent: params[:user_agent], + verdict: params[:verdict] }, + source: :spamcheck, + user: user + ) + end + end +end diff --git a/app/workers/activity_pub/projects/releases_subscription_worker.rb b/app/workers/activity_pub/projects/releases_subscription_worker.rb new file mode 100644 index 00000000000..c392726a469 --- /dev/null +++ b/app/workers/activity_pub/projects/releases_subscription_worker.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +module ActivityPub + module Projects + class ReleasesSubscriptionWorker + include ApplicationWorker + include Gitlab::Routing.url_helpers + + idempotent! + worker_has_external_dependencies! + feature_category :release_orchestration + data_consistency :delayed + queue_namespace :activity_pub + + sidekiq_retries_exhausted do |msg, _ex| + subscription_id = msg['args'].second + subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id) + subscription&.destroy + end + + def perform(subscription_id) + subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id) + return if subscription.nil? + + unless subscription.project.public? + subscription.destroy + return + end + + InboxResolverService.new(subscription).execute if needs_resolving?(subscription) + AcceptFollowService.new(subscription, project_releases_url(subscription.project)).execute + end + + def needs_resolving?(subscription) + subscription.subscriber_inbox_url.blank? || subscription.shared_inbox_url.blank? + end + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index e5b860ba525..0bb88efe183 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3,6 +3,15 @@ # # Do not edit it manually! --- +- :name: activity_pub:activity_pub_projects_releases_subscription + :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker + :feature_category: :release_orchestration + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: authorized_project_update:authorized_project_update_project_recalculate :worker_name: AuthorizedProjectUpdate::ProjectRecalculateWorker :feature_category: :system_access @@ -1461,42 +1470,6 @@ :weight: 1 :idempotent: false :tags: [] -- :name: hashed_storage:hashed_storage_migrator - :worker_name: HashedStorage::MigratorWorker - :feature_category: :source_code_management - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: false - :tags: [] -- :name: hashed_storage:hashed_storage_project_migrate - :worker_name: HashedStorage::ProjectMigrateWorker - :feature_category: :source_code_management - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: false - :tags: [] -- :name: hashed_storage:hashed_storage_project_rollback - :worker_name: HashedStorage::ProjectRollbackWorker - :feature_category: :source_code_management - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: false - :tags: [] -- :name: hashed_storage:hashed_storage_rollbacker - :worker_name: HashedStorage::RollbackerWorker - :feature_category: :source_code_management - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: false - :tags: [] - :name: incident_management:incident_management_add_severity_system_note :worker_name: IncidentManagement::AddSeveritySystemNoteWorker :feature_category: :incident_management @@ -1767,6 +1740,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: package_cleanup:packages_npm_cleanup_stale_metadata_cache + :worker_name: Packages::Npm::CleanupStaleMetadataCacheWorker + :feature_category: :package_registry + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: package_repositories:packages_debian_generate_distribution :worker_name: Packages::Debian::GenerateDistributionWorker :feature_category: :package_registry @@ -2307,6 +2289,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: abuse_spam_abuse_events + :worker_name: Abuse::SpamAbuseEventsWorker + :feature_category: :instance_resiliency + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: analytics_usage_trends_counter_job :worker_name: Analytics::UsageTrends::CounterJobWorker :feature_category: :devops_reports @@ -2575,7 +2566,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: false + :idempotent: true :tags: [] - :name: bulk_imports_entity :worker_name: BulkImports::EntityWorker @@ -2629,7 +2620,7 @@ :urgency: :low :resource_boundary: :memory :weight: 1 - :idempotent: false + :idempotent: true :tags: [] - :name: bulk_imports_pipeline_batch :worker_name: BulkImports::PipelineBatchWorker @@ -2638,7 +2629,7 @@ :urgency: :low :resource_boundary: :memory :weight: 1 - :idempotent: false + :idempotent: true :tags: [] - :name: bulk_imports_relation_batch_export :worker_name: BulkImports::RelationBatchExportWorker @@ -2892,6 +2883,15 @@ :weight: 2 :idempotent: false :tags: [] +- :name: environments_auto_recover + :worker_name: Environments::AutoRecoverWorker + :feature_category: :continuous_delivery + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: environments_auto_stop :worker_name: Environments::AutoStopWorker :feature_category: :continuous_delivery @@ -3567,6 +3567,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: projects_import_export_after_import_merge_requests + :worker_name: Projects::ImportExport::AfterImportMergeRequestsWorker + :feature_category: :importers + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: projects_import_export_create_relation_exports :worker_name: Projects::ImportExport::CreateRelationExportsWorker :feature_category: :importers @@ -3837,15 +3846,6 @@ :weight: 1 :idempotent: false :tags: [] -- :name: tasks_to_be_done_create - :worker_name: TasksToBeDone::CreateWorker - :feature_category: :onboarding - :has_external_dependencies: false - :urgency: :low - :resource_boundary: :cpu - :weight: 1 - :idempotent: true - :tags: [] - :name: update_external_pull_requests :worker_name: UpdateExternalPullRequestsWorker :feature_category: :continuous_integration diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index 5b9b46081cc..70e7d82741f 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -1,11 +1,16 @@ # frozen_string_literal: true -class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker +class BulkImportWorker include ApplicationWorker data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false + sidekiq_options retry: 3, dead: false + idempotent! + + sidekiq_retries_exhausted do |msg, exception| + new.perform_failure(exception, msg['args'].first) + end def perform(bulk_import_id) bulk_import = BulkImport.find_by_id(bulk_import_id) @@ -13,4 +18,12 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker BulkImports::ProcessService.new(bulk_import).execute end + + def perform_failure(exception, bulk_import_id) + bulk_import = BulkImport.find_by_id(bulk_import_id) + + Gitlab::ErrorTracking.track_exception(exception, bulk_import_id: bulk_import.id) + + bulk_import.fail_op + end end diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index 9b60dcdeb8a..e510a8c0d06 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -5,12 +5,16 @@ module BulkImports include ApplicationWorker idempotent! - deduplicate :until_executed + deduplicate :until_executed, if_deduplicated: :reschedule_once data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false + sidekiq_options retry: 3, dead: false worker_has_external_dependencies! + sidekiq_retries_exhausted do |msg, exception| + new.perform_failure(exception, msg['args'].first) + end + PERFORM_DELAY = 5.seconds # Keep `_current_stage` parameter for backwards compatibility. @@ -27,10 +31,17 @@ module BulkImports end re_enqueue - rescue StandardError => e - Gitlab::ErrorTracking.track_exception(e, log_params(message: 'Entity failed')) + end + + def perform_failure(exception, entity_id) + @entity = ::BulkImports::Entity.find(entity_id) + + Gitlab::ErrorTracking.track_exception( + exception, + log_params(message: "Request to export #{entity.source_type} failed") + ) - @entity.fail_op! + entity.fail_op! end private @@ -68,7 +79,7 @@ module BulkImports end def logger - @logger ||= Gitlab::Import::Logger.build + @logger ||= Logger.build end def log_exception(exception, payload) @@ -88,7 +99,7 @@ module BulkImports bulk_import_entity_type: entity.source_type, source_full_path: entity.source_full_path, source_version: source_version, - importer: 'gitlab_migration' + importer: Logger::IMPORTER_NAME } defaults.merge(extra) diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index 44759916f99..f7456ddccb1 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -80,8 +80,7 @@ module BulkImports bulk_import_id: entity.bulk_import_id, bulk_import_entity_type: entity.source_type, source_full_path: entity.source_full_path, - source_version: entity.bulk_import.source_version_info.to_s, - importer: 'gitlab_migration' + source_version: entity.bulk_import.source_version_info.to_s } ) @@ -97,7 +96,7 @@ module BulkImports end def logger - @logger ||= Gitlab::Import::Logger.build + @logger ||= Logger.build end def log_exception(exception, payload) @@ -114,8 +113,7 @@ module BulkImports bulk_import_entity_type: entity.source_type, source_full_path: entity.source_full_path, message: "Request to export #{entity.source_type} failed", - source_version: entity.bulk_import.source_version_info.to_s, - importer: 'gitlab_migration' + source_version: entity.bulk_import.source_version_info.to_s } ) diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb index b1f3757e058..40d26e14dc1 100644 --- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb +++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb @@ -16,22 +16,21 @@ module BulkImports def perform(pipeline_tracker_id) @tracker = Tracker.find(pipeline_tracker_id) + @context = ::BulkImports::Pipeline::Context.new(tracker) return unless tracker.batched? return unless tracker.started? return re_enqueue if import_in_progress? if tracker.stale? + logger.error(log_attributes(message: 'Tracker stale. Failing batches and tracker')) tracker.batches.map(&:fail_op!) tracker.fail_op! else + tracker.pipeline_class.new(@context).on_finish + logger.info(log_attributes(message: 'Tracker finished')) tracker.finish! end - - ensure - # This is needed for in-flight migrations. - # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299 - ::BulkImports::EntityWorker.perform_async(tracker.entity.id) if job_version.nil? end private @@ -45,5 +44,20 @@ module BulkImports def import_in_progress? tracker.batches.any? { |b| b.started? || b.created? } end + + def logger + @logger ||= Logger.build + end + + def log_attributes(extra = {}) + structured_payload( + { + tracker_id: tracker.id, + bulk_import_id: tracker.entity.id, + bulk_import_entity_id: tracker.entity.bulk_import_id, + pipeline_class: tracker.pipeline_name + }.merge(extra) + ) + end end end diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 6230d517641..1485275e616 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -1,26 +1,65 @@ # frozen_string_literal: true module BulkImports - class PipelineBatchWorker # rubocop:disable Scalability/IdempotentWorker + class PipelineBatchWorker include ApplicationWorker include ExclusiveLeaseGuard + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers - sidekiq_options retry: false, dead: false + sidekiq_options dead: false, retry: 3 worker_has_external_dependencies! worker_resource_boundary :memory + idempotent! + + sidekiq_retries_exhausted do |msg, exception| + new.perform_failure(msg['args'].first, exception) + end + + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + batch = ::BulkImports::BatchTracker.find(job_args.first) + pipeline_tracker = batch.tracker + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end def perform(batch_id) @batch = ::BulkImports::BatchTracker.find(batch_id) + @tracker = @batch.tracker @pending_retry = false + return unless process_batch? + + log_extra_metadata_on_done(:pipeline_class, @tracker.pipeline_name) + try_obtain_lease { run } ensure ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) unless pending_retry end + def perform_failure(batch_id, exception) + @batch = ::BulkImports::BatchTracker.find(batch_id) + @tracker = @batch.tracker + + fail_batch(exception) + end + private attr_reader :batch, :tracker, :pending_retry @@ -28,35 +67,31 @@ module BulkImports def run return batch.skip! if tracker.failed? || tracker.finished? + logger.info(log_attributes(message: 'Batch tracker started')) batch.start! tracker.pipeline_class.new(context).run batch.finish! + logger.info(log_attributes(message: 'Batch tracker finished')) rescue BulkImports::RetryPipelineError => e @pending_retry = true retry_batch(e) - rescue StandardError => e - fail_batch(e) end def fail_batch(exception) batch.fail_op! - Gitlab::ErrorTracking.track_exception( - exception, - batch_id: batch.id, - tracker_id: tracker.id, - pipeline_class: tracker.pipeline_name, - pipeline_step: 'pipeline_batch_worker_run' - ) + Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed')) BulkImports::Failure.create( bulk_import_entity_id: batch.tracker.entity.id, pipeline_class: tracker.pipeline_name, pipeline_step: 'pipeline_batch_worker_run', exception_class: exception.class.to_s, - exception_message: exception.message.truncate(255), + exception_message: exception.message, correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id ) + + ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id) end def context @@ -78,7 +113,32 @@ module BulkImports end def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) + log_extra_metadata_on_done(:re_enqueue, true) + self.class.perform_in(delay, batch.id) end + + def process_batch? + batch.created? || batch.started? + end + + def logger + @logger ||= Logger.build + end + + def log_attributes(extra = {}) + structured_payload( + { + batch_id: batch.id, + batch_number: batch.batch_number, + tracker_id: tracker.id, + bulk_import_id: tracker.entity.bulk_import_id, + bulk_import_entity_id: tracker.entity.id, + pipeline_class: tracker.pipeline_name, + pipeline_step: 'pipeline_batch_worker_run', + importer: Logger::IMPORTER_NAME + }.merge(extra) + ) + end end end diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 24185f43795..2c1d28b33c5 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -1,43 +1,68 @@ # frozen_string_literal: true module BulkImports - class PipelineWorker # rubocop:disable Scalability/IdempotentWorker + class PipelineWorker include ApplicationWorker include ExclusiveLeaseGuard FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false + sidekiq_options dead: false, retry: 3 worker_has_external_dependencies! deduplicate :until_executing worker_resource_boundary :memory + idempotent! version 2 + sidekiq_retries_exhausted do |msg, exception| + new.perform_failure(msg['args'][0], msg['args'][2], exception) + end + + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + pipeline_tracker = ::BulkImports::Tracker.find(job_args.first) + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end + # Keep _stage parameter for backwards compatibility. def perform(pipeline_tracker_id, _stage, entity_id) @entity = ::BulkImports::Entity.find(entity_id) @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id) + log_extra_metadata_on_done(:pipeline_class, @pipeline_tracker.pipeline_name) + try_obtain_lease do - if pipeline_tracker.enqueued? + if pipeline_tracker.enqueued? || pipeline_tracker.started? logger.info(log_attributes(message: 'Pipeline starting')) run - else - message = "Pipeline in #{pipeline_tracker.human_status_name} state instead of expected enqueued state" - - logger.error(log_attributes(message: message)) - - fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped? end end - ensure - # This is needed for in-flight migrations. - # It will be remove in https://gitlab.com/gitlab-org/gitlab/-/issues/426299 - ::BulkImports::EntityWorker.perform_async(entity_id) if job_version.nil? + end + + def perform_failure(pipeline_tracker_id, entity_id, exception) + @entity = ::BulkImports::Entity.find(entity_id) + @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id) + + fail_tracker(exception) end private @@ -53,20 +78,22 @@ module BulkImports return re_enqueue if export_empty? || export_started? if file_extraction_pipeline? && export_status.batched? + log_extra_metadata_on_done(:batched, true) + pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true) return pipeline_tracker.finish! if export_status.batches_count < 1 enqueue_batches else + log_extra_metadata_on_done(:batched, false) + pipeline_tracker.update!(status_event: 'start', jid: jid) pipeline_tracker.pipeline_class.new(context).run pipeline_tracker.finish! end rescue BulkImports::RetryPipelineError => e retry_tracker(e) - rescue StandardError => e - fail_tracker(e) end def source_version @@ -85,16 +112,18 @@ module BulkImports pipeline_class: pipeline_tracker.pipeline_name, pipeline_step: 'pipeline_worker_run', exception_class: exception.class.to_s, - exception_message: exception.message.truncate(255), + exception_message: exception.message, correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id ) end def logger - @logger ||= Gitlab::Import::Logger.build + @logger ||= Logger.build end def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) + log_extra_metadata_on_done(:re_enqueue, true) + self.class.perform_in( delay, pipeline_tracker.id, @@ -159,10 +188,10 @@ module BulkImports bulk_import_entity_type: entity.source_type, source_full_path: entity.source_full_path, pipeline_tracker_id: pipeline_tracker.id, - pipeline_name: pipeline_tracker.pipeline_name, + pipeline_class: pipeline_tracker.pipeline_name, pipeline_tracker_state: pipeline_tracker.human_status_name, source_version: source_version, - importer: 'gitlab_migration' + importer: Logger::IMPORTER_NAME }.merge(extra) ) end diff --git a/app/workers/bulk_imports/relation_batch_export_worker.rb b/app/workers/bulk_imports/relation_batch_export_worker.rb index 4ce36929e15..87ceb775075 100644 --- a/app/workers/bulk_imports/relation_batch_export_worker.rb +++ b/app/workers/bulk_imports/relation_batch_export_worker.rb @@ -7,10 +7,25 @@ module BulkImports idempotent! data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers - sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION + sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3 + + sidekiq_retries_exhausted do |job, exception| + batch = BulkImports::ExportBatch.find(job['args'][1]) + portable = batch.export.portable + + Gitlab::ErrorTracking.track_exception(exception, portable_id: portable.id, portable_type: portable.class.name) + + batch.update!(status_event: 'fail_op', error: exception.message.truncate(255)) + end def perform(user_id, batch_id) - RelationBatchExportService.new(user_id, batch_id).execute + @user = User.find(user_id) + @batch = BulkImports::ExportBatch.find(batch_id) + + log_extra_metadata_on_done(:relation, @batch.export.relation) + log_extra_metadata_on_done(:objects_count, @batch.objects_count) + + RelationBatchExportService.new(@user, @batch).execute end end end diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb index 531edc6c7a7..168626fee85 100644 --- a/app/workers/bulk_imports/relation_export_worker.rb +++ b/app/workers/bulk_imports/relation_export_worker.rb @@ -10,25 +10,37 @@ module BulkImports loggable_arguments 2, 3 data_consistency :always feature_category :importers - sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION + sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION, retry: 3 worker_resource_boundary :memory + sidekiq_retries_exhausted do |job, exception| + _user_id, portable_id, portable_type, relation, batched = job['args'] + portable = portable(portable_id, portable_type) + + export = portable.bulk_import_exports.find_by_relation(relation) + + Gitlab::ErrorTracking.track_exception(exception, portable_id: portable_id, portable_type: portable.class.name) + + export.update!(status_event: 'fail_op', error: exception.message.truncate(255), batched: batched) + end + + def self.portable(portable_id, portable_class) + portable_class.classify.constantize.find(portable_id) + end + def perform(user_id, portable_id, portable_class, relation, batched = false) user = User.find(user_id) - portable = portable(portable_id, portable_class) + portable = self.class.portable(portable_id, portable_class) config = BulkImports::FileTransfer.config_for(portable) + log_extra_metadata_on_done(:relation, relation) if Gitlab::Utils.to_boolean(batched) && config.batchable_relation?(relation) + log_extra_metadata_on_done(:batched, true) BatchedRelationExportService.new(user, portable, relation, jid).execute else + log_extra_metadata_on_done(:batched, false) RelationExportService.new(user, portable, relation, jid).execute end end - - private - - def portable(portable_id, portable_class) - portable_class.classify.constantize.find(portable_id) - end end end diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb index 3fa4221728b..6c8569b0aa0 100644 --- a/app/workers/bulk_imports/stuck_import_worker.rb +++ b/app/workers/bulk_imports/stuck_import_worker.rb @@ -14,18 +14,29 @@ module BulkImports def perform BulkImport.stale.find_each do |import| + logger.error(message: 'BulkImport stale', bulk_import_id: import.id) import.cleanup_stale end - BulkImports::Entity.includes(:trackers).stale.find_each do |import| # rubocop: disable CodeReuse/ActiveRecord + BulkImports::Entity.includes(:trackers).stale.find_each do |entity| # rubocop: disable CodeReuse/ActiveRecord ApplicationRecord.transaction do - import.cleanup_stale + logger.error( + message: 'BulkImports::Entity stale', + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_id: entity.id + ) - import.trackers.find_each do |tracker| + entity.cleanup_stale + + entity.trackers.find_each do |tracker| tracker.cleanup_stale end end end end + + def logger + @logger ||= Logger.build + end end end diff --git a/app/workers/ci/cancel_pipeline_worker.rb b/app/workers/ci/cancel_pipeline_worker.rb index 0b2c96e7ace..f099e185629 100644 --- a/app/workers/ci/cancel_pipeline_worker.rb +++ b/app/workers/ci/cancel_pipeline_worker.rb @@ -20,7 +20,7 @@ module Ci pipeline: pipeline, current_user: nil, cascade_to_children: false, - auto_canceled_by_pipeline_id: auto_canceled_by_pipeline_id + auto_canceled_by_pipeline: ::Ci::Pipeline.find_by_id(auto_canceled_by_pipeline_id) ).force_execute end end diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb index 703cae8bf88..8d7a62e5b09 100644 --- a/app/workers/ci/initial_pipeline_process_worker.rb +++ b/app/workers/ci/initial_pipeline_process_worker.rb @@ -17,24 +17,10 @@ module Ci def perform(pipeline_id) Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| - create_deployments!(pipeline) - Ci::PipelineCreation::StartPipelineService .new(pipeline) .execute end end - - private - - def create_deployments!(pipeline) - return if Feature.enabled?(:create_deployment_only_for_processable_jobs, pipeline.project) - - pipeline.stages.flat_map(&:statuses).each { |build| create_deployment(build) } - end - - def create_deployment(build) - ::Deployments::CreateForJobService.new.execute(build) - end end end diff --git a/app/workers/ci/refs/unlock_previous_pipelines_worker.rb b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb index bf595590cb1..588ec4ce1f0 100644 --- a/app/workers/ci/refs/unlock_previous_pipelines_worker.rb +++ b/app/workers/ci/refs/unlock_previous_pipelines_worker.rb @@ -14,7 +14,9 @@ module Ci def perform(ref_id) ::Ci::Ref.find_by_id(ref_id).try do |ref| - pipeline = ref.last_finished_pipeline + next unless ref.artifacts_locked? + + pipeline = ref.last_unlockable_ci_source_pipeline result = ::Ci::Refs::EnqueuePipelinesToUnlockService.new.execute(ref, before_pipeline: pipeline) log_extra_metadata_on_done(:total_pending_entries, result[:total_pending_entries]) diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb index f6feb6d1598..316d30d94da 100644 --- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb +++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb @@ -52,8 +52,7 @@ module Gitlab job_delay = client.rate_limit_resets_in + calculate_job_delay(enqueued_job_counter) - self.class - .perform_in(job_delay, project.id, hash, notify_key) + self.class.perform_in(job_delay, project.id, hash.deep_stringify_keys, notify_key.to_s) end end end diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb index 80013ff3cd9..5c63c667a03 100644 --- a/app/workers/concerns/gitlab/github_import/stage_methods.rb +++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb @@ -5,6 +5,8 @@ module Gitlab module StageMethods extend ActiveSupport::Concern + MAX_RETRIES_AFTER_INTERRUPTION = 20 + included do include ApplicationWorker @@ -18,6 +20,29 @@ module Gitlab end end + class_methods do + # We can increase the number of times a GitHubImport::Stage worker is retried + # after being interrupted if the importer it executes can restart exactly + # from where it left off. + # + # It is not safe to call this method if the importer loops over its data from + # the beginning when restarted, even if it skips data that is already imported + # inside the loop, as there is a possibility the importer will never reach + # the end of the loop. + # + # Examples of stage workers that call this method are ones that execute services that: + # + # - Continue paging an endpoint from where it left off: + # https://gitlab.com/gitlab-org/gitlab/-/blob/487521cc/lib/gitlab/github_import/parallel_scheduling.rb#L114-117 + # - Continue their loop from where it left off: + # https://gitlab.com/gitlab-org/gitlab/-/blob/024235ec/lib/gitlab/github_import/importer/pull_requests/review_requests_importer.rb#L15 + def resumes_work_when_interrupted! + return unless Feature.enabled?(:github_importer_raise_max_interruptions) + + sidekiq_options max_retries_after_interruption: MAX_RETRIES_AFTER_INTERRUPTION + end + end + # project_id - The ID of the GitLab project to import the data into. def perform(project_id) info(project_id, message: 'starting stage') @@ -54,6 +79,8 @@ module Gitlab # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def try_import(client, project) + project.import_state.refresh_jid_expiration + import(client, project) rescue RateLimitError self.class.perform_in(client.rate_limit_resets_in, project.id) diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index cb09aaf1a6a..28c82a5a38e 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -201,10 +201,10 @@ module WorkerAttributes !!get_class_attribute(:big_payload) end - def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY) + def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY, &block) set_class_attribute( :database_health_check_attrs, - { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by } + { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by, block: block } ) end diff --git a/app/workers/environments/auto_recover_worker.rb b/app/workers/environments/auto_recover_worker.rb new file mode 100644 index 00000000000..75e86e38f1a --- /dev/null +++ b/app/workers/environments/auto_recover_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module Environments + class AutoRecoverWorker + include ApplicationWorker + + deduplicate :until_executed + data_consistency :delayed + idempotent! + feature_category :continuous_delivery + + def perform(environment_id, _params = {}) + Environment.find_by_id(environment_id).try do |environment| + next unless environment.long_stopping? + + next unless environment.stop_actions.all?(&:complete?) + + environment.recover_stuck_stopping + end + end + end +end diff --git a/app/workers/environments/auto_stop_cron_worker.rb b/app/workers/environments/auto_stop_cron_worker.rb index 4d6453a85e7..26b18c406e5 100644 --- a/app/workers/environments/auto_stop_cron_worker.rb +++ b/app/workers/environments/auto_stop_cron_worker.rb @@ -13,6 +13,7 @@ module Environments def perform AutoStopService.new.execute + AutoRecoverService.new.execute end end end diff --git a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb index f9952f04e99..a5d085a82c0 100644 --- a/app/workers/gitlab/github_import/stage/import_attachments_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_attachments_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -48,8 +50,8 @@ module Gitlab def move_to_next_stage(project, waiters = {}) AdvanceStageWorker.perform_async( project.id, - waiters, - :protected_branches + waiters.deep_stringify_keys, + 'protected_branches' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb index 94cb3cb6c71..5bbe14b6528 100644 --- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb @@ -27,8 +27,6 @@ module Gitlab klass.new(project, client).execute end - project.import_state.refresh_jid_expiration - ImportPullRequestsWorker.perform_async(project.id) end end diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb index 751ca92388a..037b529b866 100644 --- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb @@ -20,7 +20,6 @@ module Gitlab info(project.id, message: 'starting importer', importer: 'Importer::CollaboratorsImporter') waiter = Importer::CollaboratorsImporter.new(project, client).execute - project.import_state.refresh_jid_expiration move_to_next_stage(project, { waiter.key => waiter.jobs_remaining }) end @@ -44,7 +43,7 @@ module Gitlab def move_to_next_stage(project, waiters = {}) AdvanceStageWorker.perform_async( - project.id, waiters, :pull_requests_merged_by + project.id, waiters.deep_stringify_keys, 'pull_requests_merged_by' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb index c80412d941b..35779d7bfc5 100644 --- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -30,7 +32,7 @@ module Gitlab end def move_to_next_stage(project, waiters = {}) - AdvanceStageWorker.perform_async(project.id, waiters, :notes) + AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, 'notes') end end end diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb index 592b789cc94..58e1f637b6a 100644 --- a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -20,7 +22,7 @@ module Gitlab hash[waiter.key] = waiter.jobs_remaining end - AdvanceStageWorker.perform_async(project.id, waiters, :issue_events) + AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, 'issue_events') end # The importers to run in this stage. Issues can't be imported earlier diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb index e89a850c991..8d7bd98f303 100644 --- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb @@ -11,6 +11,11 @@ module Gitlab include GithubImport::Queue include StageMethods + # Importer::LfsObjectsImporter can resume work when interrupted as + # it uses Projects::LfsPointers::LfsObjectDownloadListService which excludes LFS objects that already exist. + # https://gitlab.com/gitlab-org/gitlab/-/blob/eabf0800/app/services/projects/lfs_pointers/lfs_object_download_list_service.rb#L69-71 + resumes_work_when_interrupted! + def perform(project_id) return unless (project = find_project(project_id)) @@ -28,7 +33,7 @@ module Gitlab AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :finish + 'finish' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb index c1fdb76d03e..0459545d8e1 100644 --- a/app/workers/gitlab/github_import/stage/import_notes_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -20,7 +22,7 @@ module Gitlab hash[waiter.key] = waiter.jobs_remaining end - AdvanceStageWorker.perform_async(project.id, waiters, :attachments) + AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, 'attachments') end def importers(project) diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb index f8448094c28..e281e965f94 100644 --- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb @@ -19,12 +19,10 @@ module Gitlab .new(project, client) .execute - project.import_state.refresh_jid_expiration - AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :lfs_objects + 'lfs_objects' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb index 2e7cd28578f..2f543951bf3 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_merged_by_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -18,12 +20,10 @@ module Gitlab .new(project, client) .execute - project.import_state.refresh_jid_expiration - AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :pull_request_review_requests + 'pull_request_review_requests' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb index 2f860349e25..db76545ae87 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_review_requests_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -18,12 +20,10 @@ module Gitlab .new(project, client) .execute - project.import_state.refresh_jid_expiration - AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :pull_request_reviews + 'pull_request_reviews' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb index 51730033133..31b7c57a524 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -18,12 +20,10 @@ module Gitlab .new(project, client) .execute - project.import_state.refresh_jid_expiration - AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :issues_and_diff_notes + 'issues_and_diff_notes' ) end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb index 029d38d8b93..c68b95b5111 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb @@ -11,6 +11,8 @@ module Gitlab include GithubImport::Queue include StageMethods + resumes_work_when_interrupted! + # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. def import(client, project) @@ -25,12 +27,10 @@ module Gitlab .new(project, client) .execute - project.import_state.refresh_jid_expiration - AdvanceStageWorker.perform_async( project.id, { waiter.key => waiter.jobs_remaining }, - :collaborators + 'collaborators' ) end diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb index 180c08905ff..782439894c0 100644 --- a/app/workers/gitlab/import/advance_stage.rb +++ b/app/workers/gitlab/import/advance_stage.rb @@ -19,7 +19,7 @@ module Gitlab # completed. # timeout_timer - Time the sidekiq worker was first initiated with the current job_count # previous_job_count - Number of jobs remaining on last invocation of this worker - def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil) + def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now.to_s, previous_job_count = nil) import_state_jid = find_import_state_jid(project_id) # If the import state is nil the project may have been deleted or the import @@ -45,7 +45,9 @@ module Gitlab handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count) else - self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage, timeout_timer, previous_job_count) + self.class.perform_in(INTERVAL, + project_id, new_waiters.deep_stringify_keys, next_stage.to_s, timeout_timer.to_s, previous_job_count + ) end end diff --git a/app/workers/gitlab/jira_import/stage/import_issues_worker.rb b/app/workers/gitlab/jira_import/stage/import_issues_worker.rb index 7a5eb6c1e3a..5d890ecfe13 100644 --- a/app/workers/gitlab/jira_import/stage/import_issues_worker.rb +++ b/app/workers/gitlab/jira_import/stage/import_issues_worker.rb @@ -9,7 +9,14 @@ module Gitlab private def import(project) - jobs_waiter = Gitlab::JiraImport::IssuesImporter.new(project).execute + jira_client = if Feature.enabled?(:increase_jira_import_issues_timeout) + project.jira_integration.client(read_timeout: 2.minutes) + end + + jobs_waiter = Gitlab::JiraImport::IssuesImporter.new( + project, + jira_client + ).execute project.latest_jira_import.refresh_jid_expiration diff --git a/app/workers/hashed_storage/base_worker.rb b/app/workers/hashed_storage/base_worker.rb deleted file mode 100644 index 372440996d9..00000000000 --- a/app/workers/hashed_storage/base_worker.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -module HashedStorage - class BaseWorker # rubocop:disable Scalability/IdempotentWorker - include ExclusiveLeaseGuard - include WorkerAttributes - - feature_category :source_code_management - - LEASE_TIMEOUT = 30.seconds.to_i - LEASE_KEY_SEGMENT = 'project_migrate_hashed_storage_worker' - - protected - - def lease_key - # we share the same lease key for both migration and rollback so they don't run simultaneously - "#{LEASE_KEY_SEGMENT}:#{project_id}" - end - - def lease_timeout - LEASE_TIMEOUT - end - end -end diff --git a/app/workers/hashed_storage/migrator_worker.rb b/app/workers/hashed_storage/migrator_worker.rb deleted file mode 100644 index a7e7a505681..00000000000 --- a/app/workers/hashed_storage/migrator_worker.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -module HashedStorage - class MigratorWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - - queue_namespace :hashed_storage - feature_category :source_code_management - - # @param [Integer] start initial ID of the batch - # @param [Integer] finish last ID of the batch - def perform(start, finish); end - end -end diff --git a/app/workers/hashed_storage/project_migrate_worker.rb b/app/workers/hashed_storage/project_migrate_worker.rb deleted file mode 100644 index e1bf71de179..00000000000 --- a/app/workers/hashed_storage/project_migrate_worker.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -module HashedStorage - class ProjectMigrateWorker < BaseWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - - queue_namespace :hashed_storage - loggable_arguments 1 - - attr_reader :project_id - - def perform(project_id, old_disk_path = nil); end - end -end diff --git a/app/workers/hashed_storage/project_rollback_worker.rb b/app/workers/hashed_storage/project_rollback_worker.rb deleted file mode 100644 index af4223ff354..00000000000 --- a/app/workers/hashed_storage/project_rollback_worker.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -module HashedStorage - class ProjectRollbackWorker < BaseWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - - queue_namespace :hashed_storage - loggable_arguments 1 - - attr_reader :project_id - - def perform(project_id, old_disk_path = nil); end - end -end diff --git a/app/workers/hashed_storage/rollbacker_worker.rb b/app/workers/hashed_storage/rollbacker_worker.rb deleted file mode 100644 index e659e65a370..00000000000 --- a/app/workers/hashed_storage/rollbacker_worker.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -module HashedStorage - class RollbackerWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - data_consistency :always - - sidekiq_options retry: 3 - - queue_namespace :hashed_storage - feature_category :source_code_management - - # @param [Integer] start initial ID of the batch - # @param [Integer] finish last ID of the batch - def perform(start, finish); end - end -end diff --git a/app/workers/merge_request_cleanup_refs_worker.rb b/app/workers/merge_request_cleanup_refs_worker.rb index 92dfe8a8cb0..db1a1e96997 100644 --- a/app/workers/merge_request_cleanup_refs_worker.rb +++ b/app/workers/merge_request_cleanup_refs_worker.rb @@ -18,8 +18,6 @@ class MergeRequestCleanupRefsWorker FAILURE_THRESHOLD = 3 def perform_work - return unless Feature.enabled?(:merge_request_refs_cleanup) - unless merge_request logger.error('No existing merge request to be cleaned up.') return diff --git a/app/workers/merge_requests/set_reviewer_reviewed_worker.rb b/app/workers/merge_requests/set_reviewer_reviewed_worker.rb index 2f15bf3b879..7e8bc60f6e1 100644 --- a/app/workers/merge_requests/set_reviewer_reviewed_worker.rb +++ b/app/workers/merge_requests/set_reviewer_reviewed_worker.rb @@ -13,18 +13,23 @@ module MergeRequests current_user_id = event.data[:current_user_id] merge_request_id = event.data[:merge_request_id] current_user = User.find_by_id(current_user_id) - merge_request = MergeRequest.find_by_id(merge_request_id) - if !current_user + unless current_user logger.info(structured_payload(message: 'Current user not found.', current_user_id: current_user_id)) - elsif !merge_request - logger.info(structured_payload(message: 'Merge request not found.', merge_request_id: merge_request_id)) - else - project = merge_request.source_project + return + end + + merge_request = MergeRequest.find_by_id(merge_request_id) - ::MergeRequests::MarkReviewerReviewedService.new(project: project, current_user: current_user) - .execute(merge_request) + unless merge_request + logger.info(structured_payload(message: 'Merge request not found.', merge_request_id: merge_request_id)) + return end + + project = merge_request.source_project + + ::MergeRequests::UpdateReviewerStateService.new(project: project, current_user: current_user) + .execute(merge_request, "reviewed") end end end diff --git a/app/workers/packages/cleanup_package_registry_worker.rb b/app/workers/packages/cleanup_package_registry_worker.rb index 5f14102b5a1..5b2d8bacd62 100644 --- a/app/workers/packages/cleanup_package_registry_worker.rb +++ b/app/workers/packages/cleanup_package_registry_worker.rb @@ -13,6 +13,7 @@ module Packages def perform enqueue_package_file_cleanup_job if Packages::PackageFile.pending_destruction.exists? enqueue_cleanup_policy_jobs if Packages::Cleanup::Policy.runnable.exists? + enqueue_cleanup_stale_npm_metadata_cache_job if Packages::Npm::MetadataCache.pending_destruction.exists? log_counts end @@ -27,6 +28,10 @@ module Packages Packages::Cleanup::ExecutePolicyWorker.perform_with_capacity end + def enqueue_cleanup_stale_npm_metadata_cache_job + Packages::Npm::CleanupStaleMetadataCacheWorker.perform_with_capacity + end + def log_counts use_replica_if_available do pending_destruction_package_files_count = Packages::PackageFile.pending_destruction.count diff --git a/app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb b/app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb new file mode 100644 index 00000000000..158209c28fd --- /dev/null +++ b/app/workers/packages/npm/cleanup_stale_metadata_cache_worker.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Packages + module Npm + class CleanupStaleMetadataCacheWorker + include ApplicationWorker + include ::Packages::CleanupArtifactWorker + + MAX_CAPACITY = 2 + + data_consistency :sticky + + queue_namespace :package_cleanup + feature_category :package_registry + + deduplicate :until_executed + idempotent! + + def max_running_jobs + MAX_CAPACITY + end + + private + + def model + Packages::Npm::MetadataCache + end + + def log_metadata(npm_metadata_cache) + log_extra_metadata_on_done(:npm_metadata_cache_id, npm_metadata_cache.id) + end + + def log_cleanup_item(npm_metadata_cache) + logger.info( + structured_payload( + npm_metadata_cache_id: npm_metadata_cache.id + ) + ) + end + end + end +end diff --git a/app/workers/packages/nuget/extraction_worker.rb b/app/workers/packages/nuget/extraction_worker.rb index 55aca0beb03..33fc98cf95b 100644 --- a/app/workers/packages/nuget/extraction_worker.rb +++ b/app/workers/packages/nuget/extraction_worker.rb @@ -18,7 +18,7 @@ module Packages return unless package_file - ::Packages::Nuget::UpdatePackageFromMetadataService.new(package_file).execute + ::Packages::Nuget::ProcessPackageFileService.new(package_file).execute rescue StandardError => exception process_package_file_error( package_file: package_file, diff --git a/app/workers/projects/import_export/after_import_merge_requests_worker.rb b/app/workers/projects/import_export/after_import_merge_requests_worker.rb new file mode 100644 index 00000000000..b40e0ca5f09 --- /dev/null +++ b/app/workers/projects/import_export/after_import_merge_requests_worker.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Projects + module ImportExport + class AfterImportMergeRequestsWorker + include ApplicationWorker + + idempotent! + data_consistency :delayed + urgency :low + feature_category :importers + + def perform(project_id) + project = Project.find_by_id(project_id) + return unless project + + project.merge_requests.set_latest_merge_request_diff_ids! + end + end + end +end diff --git a/app/workers/remove_expired_group_links_worker.rb b/app/workers/remove_expired_group_links_worker.rb index f1da5f37945..0bac595f0c4 100644 --- a/app/workers/remove_expired_group_links_worker.rb +++ b/app/workers/remove_expired_group_links_worker.rb @@ -11,7 +11,7 @@ class RemoveExpiredGroupLinksWorker # rubocop:disable Scalability/IdempotentWork def perform ProjectGroupLink.expired.find_each do |link| - Projects::GroupLinks::DestroyService.new(link.project, nil).execute(link) + Projects::GroupLinks::DestroyService.new(link.project, nil).execute(link, skip_authorization: true) end GroupGroupLink.expired.find_in_batches do |link_batch| diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index 5ec9ceaf004..f4a507246ac 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -2,6 +2,7 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + include Gitlab::Utils::StrongMemoize data_consistency :always @@ -12,10 +13,8 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker feature_category :source_code_management def perform(*args) - target_project_id = args.shift - target_project = Project.find(target_project_id) + @target_project_id = args.shift - source_project = target_project.forked_from_project unless source_project return target_project.import_state.mark_as_failed(_('Source project cannot be found.')) end @@ -25,6 +24,21 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker private + def target_project + Project.find(@target_project_id) + end + strong_memoize_attr :target_project + + def source_project + @source_project ||= target_project.forked_from_project + end + + def branch + return unless target_project.import_data&.data + + target_project.import_data.data['fork_branch'] + end + def fork_repository(target_project, source_project) return unless start_fork(target_project) @@ -46,7 +60,7 @@ class RepositoryForkWorker # rubocop:disable Scalability/IdempotentWorker source_repo = source_project.repository.raw target_repo = target_project.repository.raw - ::Gitlab::GitalyClient::RepositoryService.new(target_repo).fork_repository(source_repo) + ::Gitlab::GitalyClient::RepositoryService.new(target_repo).fork_repository(source_repo, branch) rescue GRPC::BadStatus => e Gitlab::ErrorTracking.track_exception(e, source_project_id: source_project.id, target_project_id: target_project.id) diff --git a/app/workers/schedule_merge_request_cleanup_refs_worker.rb b/app/workers/schedule_merge_request_cleanup_refs_worker.rb index ced1f443ea6..2ecc95335e2 100644 --- a/app/workers/schedule_merge_request_cleanup_refs_worker.rb +++ b/app/workers/schedule_merge_request_cleanup_refs_worker.rb @@ -12,7 +12,6 @@ class ScheduleMergeRequestCleanupRefsWorker def perform return if Gitlab::Database.read_only? - return unless Feature.enabled?(:merge_request_refs_cleanup) MergeRequest::CleanupSchedule.stuck_retry! MergeRequestCleanupRefsWorker.perform_with_capacity diff --git a/app/workers/tasks_to_be_done/create_worker.rb b/app/workers/tasks_to_be_done/create_worker.rb deleted file mode 100644 index 91046e3cfed..00000000000 --- a/app/workers/tasks_to_be_done/create_worker.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -module TasksToBeDone - class CreateWorker - include ApplicationWorker - - data_consistency :always - idempotent! - feature_category :onboarding - urgency :low - worker_resource_boundary :cpu - - def perform(member_task_id, current_user_id, assignee_ids = []) - # no-op removing - # https://docs.gitlab.com/ee/development/sidekiq/compatibility_across_updates.html#removing-worker-classes - end - end -end |