diff options
author | Kamil Trzciński <ayufan@ayufan.eu> | 2018-02-28 22:46:53 +0300 |
---|---|---|
committer | Kamil Trzciński <ayufan@ayufan.eu> | 2018-02-28 22:46:53 +0300 |
commit | 45d2c31643017807cb3fc66c0be6e9cad9964faf (patch) | |
tree | b3564188de7323969c6ba0a7ec4c86a3b6cce2ac /app/workers | |
parent | 87f11d2cf539d9539b439b54355f0dadaf4ebf76 (diff) | |
parent | 4b92efd90cedaa0aff218d11fdce279701128bea (diff) |
Merge commit '4b92efd90cedaa0aff218d11fdce279701128bea' into object-storage-ee-to-ce-backport
Diffstat (limited to 'app/workers')
32 files changed, 337 insertions, 75 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml new file mode 100644 index 00000000000..50e876b1d19 --- /dev/null +++ b/app/workers/all_queues.yml @@ -0,0 +1,101 @@ +--- +- cronjob:admin_email +- cronjob:expire_build_artifacts +- cronjob:gitlab_usage_ping +- cronjob:import_export_project_cleanup +- cronjob:pipeline_schedule +- cronjob:prune_old_events +- cronjob:remove_expired_group_links +- cronjob:remove_expired_members +- cronjob:remove_old_web_hook_logs +- cronjob:remove_unreferenced_lfs_objects +- cronjob:repository_archive_cache +- cronjob:repository_check_batch +- cronjob:requests_profiles +- cronjob:schedule_update_user_activity +- cronjob:stuck_ci_jobs +- cronjob:stuck_import_jobs +- cronjob:stuck_merge_jobs +- cronjob:trending_projects + +- gcp_cluster:cluster_install_app +- gcp_cluster:cluster_provision +- gcp_cluster:cluster_wait_for_app_installation +- gcp_cluster:wait_for_cluster_creation +- gcp_cluster:check_gcp_project_billing + +- github_import_advance_stage +- github_importer:github_import_import_diff_note +- github_importer:github_import_import_issue +- github_importer:github_import_import_note +- github_importer:github_import_import_pull_request +- github_importer:github_import_refresh_import_jid +- github_importer:github_import_stage_finish_import +- github_importer:github_import_stage_import_base_data +- github_importer:github_import_stage_import_issues_and_diff_notes +- github_importer:github_import_stage_import_notes +- github_importer:github_import_stage_import_pull_requests +- github_importer:github_import_stage_import_repository + +- pipeline_cache:expire_job_cache +- pipeline_cache:expire_pipeline_cache +- pipeline_creation:create_pipeline +- pipeline_creation:run_pipeline_schedule +- pipeline_default:build_coverage +- pipeline_default:build_trace_sections +- pipeline_default:pipeline_metrics +- pipeline_default:pipeline_notification +- pipeline_default:update_head_pipeline_for_merge_request +- pipeline_hooks:build_hooks +- pipeline_hooks:pipeline_hooks +- pipeline_processing:build_finished +- pipeline_processing:build_queue +- pipeline_processing:build_success +- pipeline_processing:pipeline_process +- pipeline_processing:pipeline_success +- pipeline_processing:pipeline_update +- pipeline_processing:stage_update + +- repository_check:repository_check_clear +- repository_check:repository_check_single_repository + +- default +- mailers # ActionMailer::DeliveryJob.queue_name + +- authorized_projects +- background_migration +- create_gpg_signature +- delete_merged_branches +- delete_user +- email_receiver +- emails_on_push +- expire_build_instance_artifacts +- git_garbage_collect +- gitlab_shell +- group_destroy +- invalid_gpg_signature_update +- irker +- merge +- namespaceless_project_destroy +- new_issue +- new_merge_request +- new_note +- pages +- post_receive +- process_commit +- project_cache +- project_destroy +- project_export +- project_migrate_hashed_storage +- project_service +- propagate_service_template +- reactive_caching +- rebase +- repository_fork +- repository_import +- storage_migrator +- system_hook_push +- update_merge_requests +- update_user_activity +- upload_checksum +- web_hook diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb index aeb3bc019b9..376703f6319 100644 --- a/app/workers/background_migration_worker.rb +++ b/app/workers/background_migration_worker.rb @@ -1,10 +1,53 @@ class BackgroundMigrationWorker include ApplicationWorker + # The minimum amount of time between processing two jobs of the same migration + # class. + # + # This interval is set to 5 minutes so autovacuuming and other maintenance + # related tasks have plenty of time to clean up after a migration has been + # performed. + MIN_INTERVAL = 5.minutes.to_i + # Performs the background migration. # # See Gitlab::BackgroundMigration.perform for more information. + # + # class_name - The class name of the background migration to run. + # arguments - The arguments to pass to the migration class. def perform(class_name, arguments = []) - Gitlab::BackgroundMigration.perform(class_name, arguments) + should_perform, ttl = perform_and_ttl(class_name) + + if should_perform + Gitlab::BackgroundMigration.perform(class_name, arguments) + else + # If the lease could not be obtained this means either another process is + # running a migration of this class or we ran one recently. In this case + # we'll reschedule the job in such a way that it is picked up again around + # the time the lease expires. + self.class.perform_in(ttl || MIN_INTERVAL, class_name, arguments) + end + end + + def perform_and_ttl(class_name) + if always_perform? + # In test environments `perform_in` will run right away. This can then + # lead to stack level errors in the above `#perform`. To work around this + # we'll just perform the migration right away in the test environment. + [true, nil] + else + lease = lease_for(class_name) + + [lease.try_obtain, lease.ttl] + end + end + + def lease_for(class_name) + Gitlab::ExclusiveLease + .new("#{self.class.name}:#{class_name}", timeout: MIN_INTERVAL) + end + + def always_perform? + Rails.env.test? end end diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 5efa9180f5e..97d80305bec 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -2,7 +2,7 @@ class BuildFinishedWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index 6705a1c2709..cbfca8c342c 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -2,7 +2,7 @@ class BuildHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(build_id) Ci::Build.find_by(id: build_id) diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index fc775a84dc0..e4f4e6c1d9e 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -2,7 +2,7 @@ class BuildQueueWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index ec049821ad7..4b9097bc5e4 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -2,7 +2,7 @@ class BuildSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/check_gcp_project_billing_worker.rb b/app/workers/check_gcp_project_billing_worker.rb new file mode 100644 index 00000000000..557af14ee57 --- /dev/null +++ b/app/workers/check_gcp_project_billing_worker.rb @@ -0,0 +1,59 @@ +require 'securerandom' + +class CheckGcpProjectBillingWorker + include ApplicationWorker + include ClusterQueue + + LEASE_TIMEOUT = 15.seconds.to_i + SESSION_KEY_TIMEOUT = 5.minutes + BILLING_TIMEOUT = 1.hour + + def self.get_session_token(token_key) + Gitlab::Redis::SharedState.with do |redis| + redis.get(get_redis_session_key(token_key)) + end + end + + def self.store_session_token(token) + generate_token_key.tap do |token_key| + Gitlab::Redis::SharedState.with do |redis| + redis.set(get_redis_session_key(token_key), token, ex: SESSION_KEY_TIMEOUT) + end + end + end + + def self.redis_shared_state_key_for(token) + "gitlab:gcp:#{token.hash}:billing_enabled" + end + + def perform(token_key) + return unless token_key + + token = self.get_session_token(token_key) + return unless token + return unless try_obtain_lease_for(token) + + billing_enabled_projects = CheckGcpProjectBillingService.new.execute(token) + Gitlab::Redis::SharedState.with do |redis| + redis.set(self.class.redis_shared_state_key_for(token), + !billing_enabled_projects.empty?, + ex: BILLING_TIMEOUT) + end + end + + private + + def self.generate_token_key + SecureRandom.uuid + end + + def self.get_redis_session_key(token_key) + "gitlab:gcp:session:#{token_key}" + end + + def try_obtain_lease_for(token) + Gitlab::ExclusiveLease + .new("check_gcp_project_billing_worker:#{token.hash}", timeout: LEASE_TIMEOUT) + .try_obtain + end +end diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 9c3bdabc49e..37586e161c9 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern module ApplicationWorker extend ActiveSupport::Concern - include Sidekiq::Worker + include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker included do - sidekiq_options queue: base_queue_name + set_queue end module ClassMethods + def inherited(subclass) + subclass.set_queue + end + + def set_queue + queue_name = [queue_namespace, base_queue_name].compact.join(':') + + sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue + end + def base_queue_name name .sub(/\AGitlab::/, '') @@ -18,6 +28,16 @@ module ApplicationWorker .tr('/', '_') end + def queue_namespace(new_namespace = nil) + if new_namespace + sidekiq_options queue_namespace: new_namespace + + set_queue + else + get_sidekiq_options['queue_namespace']&.to_s + end + end + def queue get_sidekiq_options['queue'].to_s end diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb index a5074d13220..24b9f145220 100644 --- a/app/workers/concerns/cluster_queue.rb +++ b/app/workers/concerns/cluster_queue.rb @@ -5,6 +5,6 @@ module ClusterQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :gcp_cluster + queue_namespace :gcp_cluster end end diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb index e918bb011e0..b6581779f6a 100644 --- a/app/workers/concerns/cronjob_queue.rb +++ b/app/workers/concerns/cronjob_queue.rb @@ -4,6 +4,7 @@ module CronjobQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :cronjob, retry: false + queue_namespace :cronjob + sidekiq_options retry: false end end diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb index a2bee361b86..22c2ce458e8 100644 --- a/app/workers/concerns/gitlab/github_import/queue.rb +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -4,12 +4,14 @@ module Gitlab extend ActiveSupport::Concern included do + queue_namespace :github_importer + # If a job produces an error it may block a stage from advancing # forever. To prevent this from happening we prevent jobs from going to # the dead queue. This does mean some resources may not be imported, but # this is better than a project being stuck in the "import" state # forever. - sidekiq_options queue: 'github_importer', dead: false, retry: 5 + sidekiq_options dead: false, retry: 5 end end end diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb index eb0d6c9c36c..526ed0bad07 100644 --- a/app/workers/concerns/new_issuable.rb +++ b/app/workers/concerns/new_issuable.rb @@ -9,15 +9,15 @@ module NewIssuable end def set_user(user_id) - @user = User.find_by(id: user_id) + @user = User.find_by(id: user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables - log_error(User, user_id) unless @user + log_error(User, user_id) unless @user # rubocop:disable Gitlab/ModuleWithInstanceVariables end def set_issuable(issuable_id) - @issuable = issuable_class.find_by(id: issuable_id) + @issuable = issuable_class.find_by(id: issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables - log_error(issuable_class, issuable_id) unless @issuable + log_error(issuable_class, issuable_id) unless @issuable # rubocop:disable Gitlab/ModuleWithInstanceVariables end def log_error(record_class, record_id) diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb index ddf45b91345..e77093a6902 100644 --- a/app/workers/concerns/pipeline_queue.rb +++ b/app/workers/concerns/pipeline_queue.rb @@ -5,14 +5,6 @@ module PipelineQueue extend ActiveSupport::Concern included do - sidekiq_options queue: 'pipeline_default' - end - - class_methods do - def enqueue_in(group:) - raise ArgumentError, 'Unspecified queue group!' if group.empty? - - sidekiq_options queue: "pipeline_#{group}" - end + queue_namespace :pipeline_default end end diff --git a/app/workers/concerns/project_import_options.rb b/app/workers/concerns/project_import_options.rb new file mode 100644 index 00000000000..10b971344f7 --- /dev/null +++ b/app/workers/concerns/project_import_options.rb @@ -0,0 +1,23 @@ +module ProjectImportOptions + extend ActiveSupport::Concern + + included do + IMPORT_RETRY_COUNT = 5 + + sidekiq_options retry: IMPORT_RETRY_COUNT, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + + # We only want to mark the project as failed once we exhausted all retries + sidekiq_retries_exhausted do |job| + project = Project.find(job['args'].first) + + action = if project.forked? + "fork" + else + "import" + end + + project.mark_import_as_failed("Every #{action} attempt has failed: #{job['error_message']}. Please try again.") + Sidekiq.logger.warn "Failed #{job['class']} with #{job['args']}: #{job['error_message']}" + end + end +end diff --git a/app/workers/concerns/project_start_import.rb b/app/workers/concerns/project_start_import.rb index 0704ebbb0fd..4e55a1ee3d6 100644 --- a/app/workers/concerns/project_start_import.rb +++ b/app/workers/concerns/project_start_import.rb @@ -1,3 +1,4 @@ +# Used in EE by mirroring module ProjectStartImport def start(project) if project.import_started? && project.import_jid == self.jid diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb index a597321ccf4..43fb66c31b0 100644 --- a/app/workers/concerns/repository_check_queue.rb +++ b/app/workers/concerns/repository_check_queue.rb @@ -3,6 +3,8 @@ module RepositoryCheckQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :repository_check, retry: false + queue_namespace :repository_check + + sidekiq_options retry: false end end diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb index 00cd7b85b9f..c3ac35e54f5 100644 --- a/app/workers/create_pipeline_worker.rb +++ b/app/workers/create_pipeline_worker.rb @@ -2,7 +2,7 @@ class CreatePipelineWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :creation + queue_namespace :pipeline_creation def perform(project_id, user_id, ref, source, params = {}) project = Project.find(project_id) diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index a591e2da519..7217364a9f2 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -2,7 +2,7 @@ class ExpireJobCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index a3ac32b437d..db73d37868a 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id) @@ -13,7 +13,7 @@ class ExpirePipelineCacheWorker store.touch(project_pipelines_path(project)) store.touch(project_pipeline_path(project, pipeline)) - store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit + store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil? store.touch(new_merge_request_pipelines_path(project)) each_pipelines_merge_request_path(project, pipeline) do |path| store.touch(path) diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 400396d5755..f7f498af840 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -9,7 +9,7 @@ module Gitlab class AdvanceStageWorker include ApplicationWorker - sidekiq_options queue: 'github_importer_advance_stage', dead: false + sidekiq_options dead: false INTERVAL = 30.seconds.to_i diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb index 62f733c02fc..3ec81d040b4 100644 --- a/app/workers/pages_worker.rb +++ b/app/workers/pages_worker.rb @@ -1,7 +1,7 @@ class PagesWorker include ApplicationWorker - sidekiq_options queue: :pages, retry: false + sidekiq_options retry: false def perform(action, *arg) send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 661c29efe88..c94918ff4ee 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -2,7 +2,7 @@ class PipelineHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 07dbf6a971e..24424b3f472 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -2,7 +2,7 @@ class PipelineProcessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb index 68c40a259e1..2ab0739a17f 100644 --- a/app/workers/pipeline_success_worker.rb +++ b/app/workers/pipeline_success_worker.rb @@ -2,7 +2,7 @@ class PipelineSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 24a8a9fbed5..fc9da2d45b1 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -2,7 +2,7 @@ class PipelineUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/rebase_worker.rb b/app/workers/rebase_worker.rb new file mode 100644 index 00000000000..090987778a2 --- /dev/null +++ b/app/workers/rebase_worker.rb @@ -0,0 +1,12 @@ +class RebaseWorker + include ApplicationWorker + + def perform(merge_request_id, current_user_id) + current_user = User.find(current_user_id) + merge_request = MergeRequest.find(merge_request_id) + + MergeRequests::RebaseService + .new(merge_request.source_project, current_user) + .execute(merge_request) + end +end diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index a07ef1705a1..d1c57b82681 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -1,11 +1,8 @@ class RepositoryForkWorker - ForkError = Class.new(StandardError) - include ApplicationWorker include Gitlab::ShellAdapter include ProjectStartImport - - sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + include ProjectImportOptions def perform(project_id, forked_from_repository_storage_path, source_disk_path) project = Project.find(project_id) @@ -18,20 +15,12 @@ class RepositoryForkWorker result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path, project.repository_storage_path, project.disk_path) - raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result + raise "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result project.repository.after_import - raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? + raise "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? project.import_finish - rescue ForkError => ex - fail_fork(project, ex.message) - raise - rescue => ex - return unless project - - fail_fork(project, ex.message) - raise ForkError, "#{ex.class} #{ex.message}" end private @@ -42,9 +31,4 @@ class RepositoryForkWorker Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.") false end - - def fail_fork(project, message) - Rails.logger.error(message) - project.mark_import_as_failed(message) - end end diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index 55715c83cb1..31e2798c36b 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -1,11 +1,8 @@ class RepositoryImportWorker - ImportError = Class.new(StandardError) - include ApplicationWorker include ExceptionBacktrace include ProjectStartImport - - sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + include ProjectImportOptions def perform(project_id) project = Project.find(project_id) @@ -23,17 +20,9 @@ class RepositoryImportWorker # to those importers to mark the import process as complete. return if service.async? - raise ImportError, result[:message] if result[:status] == :error + raise result[:message] if result[:status] == :error project.after_import - rescue ImportError => ex - fail_import(project, ex.message) - raise - rescue => ex - return unless project - - fail_import(project, ex.message) - raise ImportError, "#{ex.class} #{ex.message}" end private @@ -44,8 +33,4 @@ class RepositoryImportWorker Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.") false end - - def fail_import(project, message) - project.mark_import_as_failed(message) - end end diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb new file mode 100644 index 00000000000..8f5138fc873 --- /dev/null +++ b/app/workers/run_pipeline_schedule_worker.rb @@ -0,0 +1,22 @@ +class RunPipelineScheduleWorker + include ApplicationWorker + include PipelineQueue + + queue_namespace :pipeline_creation + + def perform(schedule_id, user_id) + schedule = Ci::PipelineSchedule.find_by(id: schedule_id) + user = User.find_by(id: user_id) + + return unless schedule && user + + run_pipeline_schedule(schedule, user) + end + + def run_pipeline_schedule(schedule, user) + Ci::CreatePipelineService.new(schedule.project, + user, + ref: schedule.ref) + .execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule) + end +end diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index 69f2318d83b..e4b683fca33 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -2,7 +2,7 @@ class StageUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(stage_id) Ci::Stage.find_by(id: stage_id).try do |stage| diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb index 36d2a2e6466..16394293c79 100644 --- a/app/workers/stuck_merge_jobs_worker.rb +++ b/app/workers/stuck_merge_jobs_worker.rb @@ -23,7 +23,12 @@ class StuckMergeJobsWorker merge_requests = MergeRequest.where(id: completed_ids) merge_requests.where.not(merge_commit_sha: nil).update_all(state: :merged) - merge_requests.where(merge_commit_sha: nil).update_all(state: :opened, merge_jid: nil) + + merge_requests_to_reopen = merge_requests.where(merge_commit_sha: nil) + + # Do not reopen merge requests using direct queries. + # We rely on state machine callbacks to update head_pipeline_id + merge_requests_to_reopen.each(&:unlock_mr) Rails.logger.info("Updated state of locked merge jobs. JIDs: #{completed_jids.join(', ')}") end diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb index 0a2e9b63578..f09d89aa170 100644 --- a/app/workers/update_head_pipeline_for_merge_request_worker.rb +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -1,15 +1,25 @@ class UpdateHeadPipelineForMergeRequestWorker include ApplicationWorker - - sidekiq_options queue: 'pipeline_default' + include PipelineQueue def perform(merge_request_id) merge_request = MergeRequest.find(merge_request_id) pipeline = Ci::Pipeline.where(project: merge_request.source_project, ref: merge_request.source_branch).last return unless pipeline && pipeline.latest? - raise ArgumentError, 'merge request sha does not equal pipeline sha' if merge_request.diff_head_sha != pipeline.sha + + if merge_request.diff_head_sha != pipeline.sha + log_error_message_for(merge_request) + + return + end merge_request.update_attribute(:head_pipeline_id, pipeline.id) end + + def log_error_message_for(merge_request) + Rails.logger.error( + "Outdated head pipeline for active merge request: id=#{merge_request.id}, source_branch=#{merge_request.source_branch}, diff_head_sha=#{merge_request.diff_head_sha}" + ) + end end |