gitlab.com/gitlab-org/gitlab-foss.git

author    Kamil Trzciński <ayufan@ayufan.eu>  2018-02-28 22:46:53 +0300
committer Kamil Trzciński <ayufan@ayufan.eu>  2018-02-28 22:46:53 +0300
commit    45d2c31643017807cb3fc66c0be6e9cad9964faf
tree      b3564188de7323969c6ba0a7ec4c86a3b6cce2ac /app/workers
parent    87f11d2cf539d9539b439b54355f0dadaf4ebf76
parent    4b92efd90cedaa0aff218d11fdce279701128bea

Merge commit '4b92efd90cedaa0aff218d11fdce279701128bea' into object-storage-ee-to-ce-backport
Diffstat (limited to 'app/workers')

-rw-r--r--  app/workers/all_queues.yml | 101
-rw-r--r--  app/workers/background_migration_worker.rb | 45
-rw-r--r--  app/workers/build_finished_worker.rb | 2
-rw-r--r--  app/workers/build_hooks_worker.rb | 2
-rw-r--r--  app/workers/build_queue_worker.rb | 2
-rw-r--r--  app/workers/build_success_worker.rb | 2
-rw-r--r--  app/workers/check_gcp_project_billing_worker.rb | 59
-rw-r--r--  app/workers/concerns/application_worker.rb | 24
-rw-r--r--  app/workers/concerns/cluster_queue.rb | 2
-rw-r--r--  app/workers/concerns/cronjob_queue.rb | 3
-rw-r--r--  app/workers/concerns/gitlab/github_import/queue.rb | 4
-rw-r--r--  app/workers/concerns/new_issuable.rb | 8
-rw-r--r--  app/workers/concerns/pipeline_queue.rb | 10
-rw-r--r--  app/workers/concerns/project_import_options.rb | 23
-rw-r--r--  app/workers/concerns/project_start_import.rb | 1
-rw-r--r--  app/workers/concerns/repository_check_queue.rb | 4
-rw-r--r--  app/workers/create_pipeline_worker.rb | 2
-rw-r--r--  app/workers/expire_job_cache_worker.rb | 2
-rw-r--r--  app/workers/expire_pipeline_cache_worker.rb | 4
-rw-r--r--  app/workers/gitlab/github_import/advance_stage_worker.rb | 2
-rw-r--r--  app/workers/pages_worker.rb | 2
-rw-r--r--  app/workers/pipeline_hooks_worker.rb | 2
-rw-r--r--  app/workers/pipeline_process_worker.rb | 2
-rw-r--r--  app/workers/pipeline_success_worker.rb | 2
-rw-r--r--  app/workers/pipeline_update_worker.rb | 2
-rw-r--r--  app/workers/rebase_worker.rb | 12
-rw-r--r--  app/workers/repository_fork_worker.rb | 22
-rw-r--r--  app/workers/repository_import_worker.rb | 19
-rw-r--r--  app/workers/run_pipeline_schedule_worker.rb | 22
-rw-r--r--  app/workers/stage_update_worker.rb | 2
-rw-r--r--  app/workers/stuck_merge_jobs_worker.rb | 7
-rw-r--r--  app/workers/update_head_pipeline_for_merge_request_worker.rb | 16

32 files changed, 337 insertions(+), 75 deletions(-)
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
new file mode 100644
index 00000000000..50e876b1d19
--- /dev/null
+++ b/app/workers/all_queues.yml
@@ -0,0 +1,101 @@
+---
+- cronjob:admin_email
+- cronjob:expire_build_artifacts
+- cronjob:gitlab_usage_ping
+- cronjob:import_export_project_cleanup
+- cronjob:pipeline_schedule
+- cronjob:prune_old_events
+- cronjob:remove_expired_group_links
+- cronjob:remove_expired_members
+- cronjob:remove_old_web_hook_logs
+- cronjob:remove_unreferenced_lfs_objects
+- cronjob:repository_archive_cache
+- cronjob:repository_check_batch
+- cronjob:requests_profiles
+- cronjob:schedule_update_user_activity
+- cronjob:stuck_ci_jobs
+- cronjob:stuck_import_jobs
+- cronjob:stuck_merge_jobs
+- cronjob:trending_projects
+
+- gcp_cluster:cluster_install_app
+- gcp_cluster:cluster_provision
+- gcp_cluster:cluster_wait_for_app_installation
+- gcp_cluster:wait_for_cluster_creation
+- gcp_cluster:check_gcp_project_billing
+
+- github_import_advance_stage
+- github_importer:github_import_import_diff_note
+- github_importer:github_import_import_issue
+- github_importer:github_import_import_note
+- github_importer:github_import_import_pull_request
+- github_importer:github_import_refresh_import_jid
+- github_importer:github_import_stage_finish_import
+- github_importer:github_import_stage_import_base_data
+- github_importer:github_import_stage_import_issues_and_diff_notes
+- github_importer:github_import_stage_import_notes
+- github_importer:github_import_stage_import_pull_requests
+- github_importer:github_import_stage_import_repository
+
+- pipeline_cache:expire_job_cache
+- pipeline_cache:expire_pipeline_cache
+- pipeline_creation:create_pipeline
+- pipeline_creation:run_pipeline_schedule
+- pipeline_default:build_coverage
+- pipeline_default:build_trace_sections
+- pipeline_default:pipeline_metrics
+- pipeline_default:pipeline_notification
+- pipeline_default:update_head_pipeline_for_merge_request
+- pipeline_hooks:build_hooks
+- pipeline_hooks:pipeline_hooks
+- pipeline_processing:build_finished
+- pipeline_processing:build_queue
+- pipeline_processing:build_success
+- pipeline_processing:pipeline_process
+- pipeline_processing:pipeline_success
+- pipeline_processing:pipeline_update
+- pipeline_processing:stage_update
+
+- repository_check:repository_check_clear
+- repository_check:repository_check_single_repository
+
+- default
+- mailers # ActionMailer::DeliveryJob.queue_name
+
+- authorized_projects
+- background_migration
+- create_gpg_signature
+- delete_merged_branches
+- delete_user
+- email_receiver
+- emails_on_push
+- expire_build_instance_artifacts
+- git_garbage_collect
+- gitlab_shell
+- group_destroy
+- invalid_gpg_signature_update
+- irker
+- merge
+- namespaceless_project_destroy
+- new_issue
+- new_merge_request
+- new_note
+- pages
+- post_receive
+- process_commit
+- project_cache
+- project_destroy
+- project_export
+- project_migrate_hashed_storage
+- project_service
+- propagate_service_template
+- reactive_caching
+- rebase
+- repository_fork
+- repository_import
+- storage_migrator
+- system_hook_push
+- update_merge_requests
+- update_user_activity
+- upload_checksum
+- web_hook
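
The file above is the new registry of every Sidekiq queue, with namespaced entries written as "namespace:queue". As a minimal sketch (not part of the commit), the list can be inspected like this once checked in:

    require 'yaml'

    queues = YAML.load_file('app/workers/all_queues.yml')

    # Entries such as "pipeline_processing:build_finished" carry a namespace;
    # plain entries such as "post_receive" do not.
    namespaced, plain = queues.partition { |q| q.include?(':') }

    puts "#{queues.size} queues: #{namespaced.size} namespaced, #{plain.size} plain"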
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
index aeb3bc019b9..376703f6319 100644
--- a/app/workers/background_migration_worker.rb
+++ b/app/workers/background_migration_worker.rb
@@ -1,10 +1,53 @@
class BackgroundMigrationWorker
include ApplicationWorker
+ # The minimum amount of time between processing two jobs of the same migration
+ # class.
+ #
+ # This interval is set to 5 minutes so autovacuuming and other maintenance
+ # related tasks have plenty of time to clean up after a migration has been
+ # performed.
+ MIN_INTERVAL = 5.minutes.to_i
+
# Performs the background migration.
#
# See Gitlab::BackgroundMigration.perform for more information.
+ #
+ # class_name - The class name of the background migration to run.
+ # arguments - The arguments to pass to the migration class.
def perform(class_name, arguments = [])
- Gitlab::BackgroundMigration.perform(class_name, arguments)
+ should_perform, ttl = perform_and_ttl(class_name)
+
+ if should_perform
+ Gitlab::BackgroundMigration.perform(class_name, arguments)
+ else
+ # If the lease could not be obtained this means either another process is
+ # running a migration of this class or we ran one recently. In this case
+ # we'll reschedule the job in such a way that it is picked up again around
+ # the time the lease expires.
+ self.class.perform_in(ttl || MIN_INTERVAL, class_name, arguments)
+ end
+ end
+
+ def perform_and_ttl(class_name)
+ if always_perform?
+ # In test environments `perform_in` will run right away. This can then
+ # lead to stack level errors in the above `#perform`. To work around this
+ # we'll just perform the migration right away in the test environment.
+ [true, nil]
+ else
+ lease = lease_for(class_name)
+
+ [lease.try_obtain, lease.ttl]
+ end
+ end
+
+ def lease_for(class_name)
+ Gitlab::ExclusiveLease
+ .new("#{self.class.name}:#{class_name}", timeout: MIN_INTERVAL)
+ end
+
+ def always_perform?
+ Rails.env.test?
end
end
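
Enqueueing still goes through the standard Sidekiq API; the exclusive lease only throttles how often jobs of the same migration class actually run. A hedged sketch of the flow (the migration class name is a placeholder, not one added by this commit):

    # Placeholder migration class name, for illustration only.
    BackgroundMigrationWorker.perform_async('CopyColumnData', [1, 10_000])

    # Inside #perform: if the lease for 'CopyColumnData' is already held,
    # the job re-enqueues itself via
    #   self.class.perform_in(ttl || MIN_INTERVAL, class_name, arguments)
    # so it runs again roughly when the current lease expires.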
diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb
index 5efa9180f5e..97d80305bec 100644
--- a/app/workers/build_finished_worker.rb
+++ b/app/workers/build_finished_worker.rb
@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb
index 6705a1c2709..cbfca8c342c 100644
--- a/app/workers/build_hooks_worker.rb
+++ b/app/workers/build_hooks_worker.rb
@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(build_id)
Ci::Build.find_by(id: build_id)
diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb
index fc775a84dc0..e4f4e6c1d9e 100644
--- a/app/workers/build_queue_worker.rb
+++ b/app/workers/build_queue_worker.rb
@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index ec049821ad7..4b9097bc5e4 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/check_gcp_project_billing_worker.rb b/app/workers/check_gcp_project_billing_worker.rb
new file mode 100644
index 00000000000..557af14ee57
--- /dev/null
+++ b/app/workers/check_gcp_project_billing_worker.rb
@@ -0,0 +1,59 @@
+require 'securerandom'
+
+class CheckGcpProjectBillingWorker
+ include ApplicationWorker
+ include ClusterQueue
+
+ LEASE_TIMEOUT = 15.seconds.to_i
+ SESSION_KEY_TIMEOUT = 5.minutes
+ BILLING_TIMEOUT = 1.hour
+
+ def self.get_session_token(token_key)
+ Gitlab::Redis::SharedState.with do |redis|
+ redis.get(get_redis_session_key(token_key))
+ end
+ end
+
+ def self.store_session_token(token)
+ generate_token_key.tap do |token_key|
+ Gitlab::Redis::SharedState.with do |redis|
+ redis.set(get_redis_session_key(token_key), token, ex: SESSION_KEY_TIMEOUT)
+ end
+ end
+ end
+
+ def self.redis_shared_state_key_for(token)
+ "gitlab:gcp:#{token.hash}:billing_enabled"
+ end
+
+ def perform(token_key)
+ return unless token_key
+
+ token = self.class.get_session_token(token_key)
+ return unless token
+ return unless try_obtain_lease_for(token)
+
+ billing_enabled_projects = CheckGcpProjectBillingService.new.execute(token)
+ Gitlab::Redis::SharedState.with do |redis|
+ redis.set(self.class.redis_shared_state_key_for(token),
+ !billing_enabled_projects.empty?,
+ ex: BILLING_TIMEOUT)
+ end
+ end
+
+ private
+
+ def self.generate_token_key
+ SecureRandom.uuid
+ end
+
+ def self.get_redis_session_key(token_key)
+ "gitlab:gcp:session:#{token_key}"
+ end
+
+ def try_obtain_lease_for(token)
+ Gitlab::ExclusiveLease
+ .new("check_gcp_project_billing_worker:#{token.hash}", timeout: LEASE_TIMEOUT)
+ .try_obtain
+ end
+end
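
Pieced together from the methods above, the intended flow is: cache the OAuth token in Redis under a random key, hand only that key to Sidekiq, and later read back the billing flag the worker wrote. A sketch of that flow (oauth_token is a placeholder; the call sites live outside this diff):

    token_key = CheckGcpProjectBillingWorker.store_session_token(oauth_token)
    CheckGcpProjectBillingWorker.perform_async(token_key)

    # Later, once the worker has run:
    Gitlab::Redis::SharedState.with do |redis|
      redis.get(CheckGcpProjectBillingWorker.redis_shared_state_key_for(oauth_token))
      # => "true" or "false", cached for BILLING_TIMEOUT (1 hour)
    end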
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 9c3bdabc49e..37586e161c9 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
- include Sidekiq::Worker
+ include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
included do
- sidekiq_options queue: base_queue_name
+ set_queue
end
module ClassMethods
+ def inherited(subclass)
+ subclass.set_queue
+ end
+
+ def set_queue
+ queue_name = [queue_namespace, base_queue_name].compact.join(':')
+
+ sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
+ end
+
def base_queue_name
name
.sub(/\AGitlab::/, '')
@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_')
end
+ def queue_namespace(new_namespace = nil)
+ if new_namespace
+ sidekiq_options queue_namespace: new_namespace
+
+ set_queue
+ else
+ get_sidekiq_options['queue_namespace']&.to_s
+ end
+ end
+
def queue
get_sidekiq_options['queue'].to_s
end
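
The net effect of set_queue and queue_namespace is that a worker's queue name becomes "<namespace>:<base name>". Using a worker changed in this commit (the resulting name matches app/workers/all_queues.yml above):

    BuildFinishedWorker.queue_namespace  # => "pipeline_processing"
    BuildFinishedWorker.base_queue_name  # => "build_finished"
    BuildFinishedWorker.queue            # => "pipeline_processing:build_finished"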
diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb
index a5074d13220..24b9f145220 100644
--- a/app/workers/concerns/cluster_queue.rb
+++ b/app/workers/concerns/cluster_queue.rb
@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :gcp_cluster
+ queue_namespace :gcp_cluster
end
end
diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb
index e918bb011e0..b6581779f6a 100644
--- a/app/workers/concerns/cronjob_queue.rb
+++ b/app/workers/concerns/cronjob_queue.rb
@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :cronjob, retry: false
+ queue_namespace :cronjob
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb
index a2bee361b86..22c2ce458e8 100644
--- a/app/workers/concerns/gitlab/github_import/queue.rb
+++ b/app/workers/concerns/gitlab/github_import/queue.rb
@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern
included do
+ queue_namespace :github_importer
+
# If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state
# forever.
- sidekiq_options queue: 'github_importer', dead: false, retry: 5
+ sidekiq_options dead: false, retry: 5
end
end
end
diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb
index eb0d6c9c36c..526ed0bad07 100644
--- a/app/workers/concerns/new_issuable.rb
+++ b/app/workers/concerns/new_issuable.rb
@@ -9,15 +9,15 @@ module NewIssuable
end
def set_user(user_id)
- @user = User.find_by(id: user_id)
+ @user = User.find_by(id: user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
- log_error(User, user_id) unless @user
+ log_error(User, user_id) unless @user # rubocop:disable Gitlab/ModuleWithInstanceVariables
end
def set_issuable(issuable_id)
- @issuable = issuable_class.find_by(id: issuable_id)
+ @issuable = issuable_class.find_by(id: issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
- log_error(issuable_class, issuable_id) unless @issuable
+ log_error(issuable_class, issuable_id) unless @issuable # rubocop:disable Gitlab/ModuleWithInstanceVariables
end
def log_error(record_class, record_id)
diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb
index ddf45b91345..e77093a6902 100644
--- a/app/workers/concerns/pipeline_queue.rb
+++ b/app/workers/concerns/pipeline_queue.rb
@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: 'pipeline_default'
- end
-
- class_methods do
- def enqueue_in(group:)
- raise ArgumentError, 'Unspecified queue group!' if group.empty?
-
- sidekiq_options queue: "pipeline_#{group}"
- end
+ queue_namespace :pipeline_default
end
end
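
For the pipeline workers this means replacing the concern's own enqueue_in helper with the shared queue_namespace API; a before/after sketch using BuildFinishedWorker from this diff:

    # Before: enqueue_in group: :processing   # => queue "pipeline_processing"

    # After:
    class BuildFinishedWorker
      include ApplicationWorker
      include PipelineQueue                   # falls back to :pipeline_default

      queue_namespace :pipeline_processing   # => queue "pipeline_processing:build_finished"
    end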
diff --git a/app/workers/concerns/project_import_options.rb b/app/workers/concerns/project_import_options.rb
new file mode 100644
index 00000000000..10b971344f7
--- /dev/null
+++ b/app/workers/concerns/project_import_options.rb
@@ -0,0 +1,23 @@
+module ProjectImportOptions
+ extend ActiveSupport::Concern
+
+ included do
+ IMPORT_RETRY_COUNT = 5
+
+ sidekiq_options retry: IMPORT_RETRY_COUNT, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
+
+ # We only want to mark the project as failed once we exhausted all retries
+ sidekiq_retries_exhausted do |job|
+ project = Project.find(job['args'].first)
+
+ action = if project.forked?
+ "fork"
+ else
+ "import"
+ end
+
+ project.mark_import_as_failed("Every #{action} attempt has failed: #{job['error_message']}. Please try again.")
+ Sidekiq.logger.warn "Failed #{job['class']} with #{job['args']}: #{job['error_message']}"
+ end
+ end
+end
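
Sidekiq invokes the sidekiq_retries_exhausted block with the job hash after the final retry fails, which is what the hook above relies on. An illustrative payload (values are made up):

    job = {
      'class'         => 'RepositoryImportWorker',
      'args'          => [42],  # project id, used by Project.find
      'error_message' => 'fatal: repository not found'
    }
    # The block then runs roughly:
    #   Project.find(42).mark_import_as_failed(
    #     "Every import attempt has failed: fatal: repository not found. Please try again.")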
diff --git a/app/workers/concerns/project_start_import.rb b/app/workers/concerns/project_start_import.rb
index 0704ebbb0fd..4e55a1ee3d6 100644
--- a/app/workers/concerns/project_start_import.rb
+++ b/app/workers/concerns/project_start_import.rb
@@ -1,3 +1,4 @@
+# Used in EE by mirroring
module ProjectStartImport
def start(project)
if project.import_started? && project.import_jid == self.jid
diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb
index a597321ccf4..43fb66c31b0 100644
--- a/app/workers/concerns/repository_check_queue.rb
+++ b/app/workers/concerns/repository_check_queue.rb
@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :repository_check, retry: false
+ queue_namespace :repository_check
+
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb
index 00cd7b85b9f..c3ac35e54f5 100644
--- a/app/workers/create_pipeline_worker.rb
+++ b/app/workers/create_pipeline_worker.rb
@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :creation
+ queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id)
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
index a591e2da519..7217364a9f2 100644
--- a/app/workers/expire_job_cache_worker.rb
+++ b/app/workers/expire_job_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index a3ac32b437d..db73d37868a 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id)
@@ -13,7 +13,7 @@ class ExpirePipelineCacheWorker
store.touch(project_pipelines_path(project))
store.touch(project_pipeline_path(project, pipeline))
- store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit
+ store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil?
store.touch(new_merge_request_pipelines_path(project))
each_pipelines_merge_request_path(project, pipeline) do |path|
store.touch(path)
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index 400396d5755..f7f498af840 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker
include ApplicationWorker
- sidekiq_options queue: 'github_importer_advance_stage', dead: false
+ sidekiq_options dead: false
INTERVAL = 30.seconds.to_i
diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb
index 62f733c02fc..3ec81d040b4 100644
--- a/app/workers/pages_worker.rb
+++ b/app/workers/pages_worker.rb
@@ -1,7 +1,7 @@
class PagesWorker
include ApplicationWorker
- sidekiq_options queue: :pages, retry: false
+ sidekiq_options retry: false
def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb
index 661c29efe88..c94918ff4ee 100644
--- a/app/workers/pipeline_hooks_worker.rb
+++ b/app/workers/pipeline_hooks_worker.rb
@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index 07dbf6a971e..24424b3f472 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb
index 68c40a259e1..2ab0739a17f 100644
--- a/app/workers/pipeline_success_worker.rb
+++ b/app/workers/pipeline_success_worker.rb
@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb
index 24a8a9fbed5..fc9da2d45b1 100644
--- a/app/workers/pipeline_update_worker.rb
+++ b/app/workers/pipeline_update_worker.rb
@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/rebase_worker.rb b/app/workers/rebase_worker.rb
new file mode 100644
index 00000000000..090987778a2
--- /dev/null
+++ b/app/workers/rebase_worker.rb
@@ -0,0 +1,12 @@
+class RebaseWorker
+ include ApplicationWorker
+
+ def perform(merge_request_id, current_user_id)
+ current_user = User.find(current_user_id)
+ merge_request = MergeRequest.find(merge_request_id)
+
+ MergeRequests::RebaseService
+ .new(merge_request.source_project, current_user)
+ .execute(merge_request)
+ end
+end
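
The new worker takes plain ids, so a caller (outside this diff) would enqueue it through the standard Sidekiq API:

    RebaseWorker.perform_async(merge_request.id, current_user.id)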
diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb
index a07ef1705a1..d1c57b82681 100644
--- a/app/workers/repository_fork_worker.rb
+++ b/app/workers/repository_fork_worker.rb
@@ -1,11 +1,8 @@
class RepositoryForkWorker
- ForkError = Class.new(StandardError)
-
include ApplicationWorker
include Gitlab::ShellAdapter
include ProjectStartImport
-
- sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
+ include ProjectImportOptions
def perform(project_id, forked_from_repository_storage_path, source_disk_path)
project = Project.find(project_id)
@@ -18,20 +15,12 @@ class RepositoryForkWorker
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path,
project.repository_storage_path, project.disk_path)
- raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
+ raise "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
project.repository.after_import
- raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
+ raise "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
project.import_finish
- rescue ForkError => ex
- fail_fork(project, ex.message)
- raise
- rescue => ex
- return unless project
-
- fail_fork(project, ex.message)
- raise ForkError, "#{ex.class} #{ex.message}"
end
private
@@ -42,9 +31,4 @@ class RepositoryForkWorker
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
false
end
-
- def fail_fork(project, message)
- Rails.logger.error(message)
- project.mark_import_as_failed(message)
- end
end
diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb
index 55715c83cb1..31e2798c36b 100644
--- a/app/workers/repository_import_worker.rb
+++ b/app/workers/repository_import_worker.rb
@@ -1,11 +1,8 @@
class RepositoryImportWorker
- ImportError = Class.new(StandardError)
-
include ApplicationWorker
include ExceptionBacktrace
include ProjectStartImport
-
- sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
+ include ProjectImportOptions
def perform(project_id)
project = Project.find(project_id)
@@ -23,17 +20,9 @@ class RepositoryImportWorker
# to those importers to mark the import process as complete.
return if service.async?
- raise ImportError, result[:message] if result[:status] == :error
+ raise result[:message] if result[:status] == :error
project.after_import
- rescue ImportError => ex
- fail_import(project, ex.message)
- raise
- rescue => ex
- return unless project
-
- fail_import(project, ex.message)
- raise ImportError, "#{ex.class} #{ex.message}"
end
private
@@ -44,8 +33,4 @@ class RepositoryImportWorker
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
false
end
-
- def fail_import(project, message)
- project.mark_import_as_failed(message)
- end
end
diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb
new file mode 100644
index 00000000000..8f5138fc873
--- /dev/null
+++ b/app/workers/run_pipeline_schedule_worker.rb
@@ -0,0 +1,22 @@
+class RunPipelineScheduleWorker
+ include ApplicationWorker
+ include PipelineQueue
+
+ queue_namespace :pipeline_creation
+
+ def perform(schedule_id, user_id)
+ schedule = Ci::PipelineSchedule.find_by(id: schedule_id)
+ user = User.find_by(id: user_id)
+
+ return unless schedule && user
+
+ run_pipeline_schedule(schedule, user)
+ end
+
+ def run_pipeline_schedule(schedule, user)
+ Ci::CreatePipelineService.new(schedule.project,
+ user,
+ ref: schedule.ref)
+ .execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule)
+ end
+end
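
Likewise, the scheduled-pipeline worker is enqueued with a schedule id and a user id; a hedged sketch of a call site (not part of this diff), assuming the schedule's owner is the acting user:

    RunPipelineScheduleWorker.perform_async(schedule.id, schedule.owner.id)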
diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb
index 69f2318d83b..e4b683fca33 100644
--- a/app/workers/stage_update_worker.rb
+++ b/app/workers/stage_update_worker.rb
@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb
index 36d2a2e6466..16394293c79 100644
--- a/app/workers/stuck_merge_jobs_worker.rb
+++ b/app/workers/stuck_merge_jobs_worker.rb
@@ -23,7 +23,12 @@ class StuckMergeJobsWorker
merge_requests = MergeRequest.where(id: completed_ids)
merge_requests.where.not(merge_commit_sha: nil).update_all(state: :merged)
- merge_requests.where(merge_commit_sha: nil).update_all(state: :opened, merge_jid: nil)
+
+ merge_requests_to_reopen = merge_requests.where(merge_commit_sha: nil)
+
+ # Do not reopen merge requests using direct queries.
+ # We rely on state machine callbacks to update head_pipeline_id
+ merge_requests_to_reopen.each(&:unlock_mr)
Rails.logger.info("Updated state of locked merge jobs. JIDs: #{completed_jids.join(', ')}")
end
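
The trade-off behind the change above: update_all issues a single SQL UPDATE but skips ActiveRecord callbacks and state machine transitions, while firing unlock_mr per record runs them, which is what keeps head_pipeline_id consistent:

    # One UPDATE, no callbacks, no state machine events:
    merge_requests_to_reopen.update_all(state: :opened, merge_jid: nil)

    # One transition per record, callbacks run:
    merge_requests_to_reopen.each(&:unlock_mr)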
diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb
index 0a2e9b63578..f09d89aa170 100644
--- a/app/workers/update_head_pipeline_for_merge_request_worker.rb
+++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb
@@ -1,15 +1,25 @@
class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker
-
- sidekiq_options queue: 'pipeline_default'
+ include PipelineQueue
def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id)
pipeline = Ci::Pipeline.where(project: merge_request.source_project, ref: merge_request.source_branch).last
return unless pipeline && pipeline.latest?
- raise ArgumentError, 'merge request sha does not equal pipeline sha' if merge_request.diff_head_sha != pipeline.sha
+
+ if merge_request.diff_head_sha != pipeline.sha
+ log_error_message_for(merge_request)
+
+ return
+ end
merge_request.update_attribute(:head_pipeline_id, pipeline.id)
end
+
+ def log_error_message_for(merge_request)
+ Rails.logger.error(
+ "Outdated head pipeline for active merge request: id=#{merge_request.id}, source_branch=#{merge_request.source_branch}, diff_head_sha=#{merge_request.diff_head_sha}"
+ )
+ end
end