diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/authorized_projects_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/build_coverage_worker.rb | 3 | ||||
-rw-r--r-- | app/workers/build_email_worker.rb | 20 | ||||
-rw-r--r-- | app/workers/gitlab_usage_ping_worker.rb | 31 | ||||
-rw-r--r-- | app/workers/post_receive.rb | 12 | ||||
-rw-r--r-- | app/workers/process_commit_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/repository_import_worker.rb | 5 | ||||
-rw-r--r-- | app/workers/schedule_update_user_activity_worker.rb | 10 | ||||
-rw-r--r-- | app/workers/stuck_ci_builds_worker.rb | 19 | ||||
-rw-r--r-- | app/workers/stuck_ci_jobs_worker.rb | 59 | ||||
-rw-r--r-- | app/workers/stuck_import_jobs_worker.rb | 37 | ||||
-rw-r--r-- | app/workers/system_hook_push_worker.rb | 8 | ||||
-rw-r--r-- | app/workers/trigger_schedule_worker.rb | 18 | ||||
-rw-r--r-- | app/workers/update_merge_requests_worker.rb | 3 | ||||
-rw-r--r-- | app/workers/update_user_activity_worker.rb | 26 | ||||
-rw-r--r-- | app/workers/upload_checksum_worker.rb | 12 |
16 files changed, 212 insertions, 55 deletions
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb index 0e20df506a3..13207a8bc71 100644 --- a/app/workers/authorized_projects_worker.rb +++ b/app/workers/authorized_projects_worker.rb @@ -10,7 +10,7 @@ class AuthorizedProjectsWorker end def self.bulk_perform_async(args_list) - Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) + Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list) end def perform(user_id) diff --git a/app/workers/build_coverage_worker.rb b/app/workers/build_coverage_worker.rb index def0ab1dde1..f7ae996bb17 100644 --- a/app/workers/build_coverage_worker.rb +++ b/app/workers/build_coverage_worker.rb @@ -3,7 +3,6 @@ class BuildCoverageWorker include BuildQueue def perform(build_id) - Ci::Build.find_by(id: build_id) - .try(:update_coverage) + Ci::Build.find_by(id: build_id)&.update_coverage end end diff --git a/app/workers/build_email_worker.rb b/app/workers/build_email_worker.rb deleted file mode 100644 index 5fdb1f2baa0..00000000000 --- a/app/workers/build_email_worker.rb +++ /dev/null @@ -1,20 +0,0 @@ -class BuildEmailWorker - include Sidekiq::Worker - include BuildQueue - - def perform(build_id, recipients, push_data) - recipients.each do |recipient| - begin - case push_data['build_status'] - when 'success' - Notify.build_success_email(build_id, recipient).deliver_now - when 'failed' - Notify.build_fail_email(build_id, recipient).deliver_now - end - # These are input errors and won't be corrected even if Sidekiq retries - rescue Net::SMTPFatalError, Net::SMTPSyntaxError => e - logger.info("Failed to send e-mail for project '#{push_data['project_name']}' to #{recipient}: #{e}") - end - end - end -end diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb new file mode 100644 index 00000000000..2f02235b0ac --- /dev/null +++ b/app/workers/gitlab_usage_ping_worker.rb @@ -0,0 +1,31 @@ +class GitlabUsagePingWorker + LEASE_TIMEOUT = 86400 + + include Sidekiq::Worker + include CronjobQueue + include HTTParty + + def perform + return unless current_application_settings.usage_ping_enabled + + # Multiple Sidekiq workers could run this. We should only do this at most once a day. + return unless try_obtain_lease + + begin + HTTParty.post(url, + body: Gitlab::UsageData.to_json(force_refresh: true), + headers: { 'Content-type' => 'application/json' } + ) + rescue HTTParty::Error => e + Rails.logger.info "Unable to contact GitLab, Inc.: #{e}" + end + end + + def try_obtain_lease + Gitlab::ExclusiveLease.new('gitlab_usage_ping_worker:ping', timeout: LEASE_TIMEOUT).try_obtain + end + + def url + 'https://version.gitlab.com/usage_data' + end +end diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb index 2fff6b0105d..015a41b6e82 100644 --- a/app/workers/post_receive.rb +++ b/app/workers/post_receive.rb @@ -3,20 +3,16 @@ class PostReceive include DedicatedSidekiqQueue def perform(repo_path, identifier, changes) - if path = Gitlab.config.repositories.storages.find { |p| repo_path.start_with?(p[1].to_s) } - repo_path.gsub!(path[1].to_s, "") - else - log("Check gitlab.yml config for correct repositories.storages values. No repository storage path matches \"#{repo_path}\"") - end + repo_relative_path = Gitlab::RepoPath.strip_storage_path(repo_path) changes = Base64.decode64(changes) unless changes.include?(' ') # Use Sidekiq.logger so arguments can be correlated with execution # time and thread ID's. Sidekiq.logger.info "changes: #{changes.inspect}" if ENV['SIDEKIQ_LOG_ARGUMENTS'] - post_received = Gitlab::GitPostReceive.new(repo_path, identifier, changes) + post_received = Gitlab::GitPostReceive.new(repo_relative_path, identifier, changes) if post_received.project.nil? - log("Triggered hook for non-existing project with full path \"#{repo_path}\"") + log("Triggered hook for non-existing project with full path \"#{repo_relative_path}\"") return false end @@ -25,7 +21,7 @@ class PostReceive elsif post_received.regular_project? process_project_changes(post_received) else - log("Triggered hook for unidentifiable repository type with full path \"#{repo_path}\"") + log("Triggered hook for unidentifiable repository type with full path \"#{repo_relative_path}\"") false end end diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb index e9a5bd7f24e..2f7967cf531 100644 --- a/app/workers/process_commit_worker.rb +++ b/app/workers/process_commit_worker.rb @@ -53,6 +53,8 @@ class ProcessCommitWorker def update_issue_metrics(commit, author) mentioned_issues = commit.all_references(author).issues + return if mentioned_issues.empty? + Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil). update_all(first_mentioned_in_commit_at: commit.committed_date) end diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index c8a77e21c12..b33ba2ed7c1 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -1,8 +1,9 @@ class RepositoryImportWorker include Sidekiq::Worker - include Gitlab::ShellAdapter include DedicatedSidekiqQueue + sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION + attr_accessor :project, :current_user def perform(project_id) @@ -13,7 +14,7 @@ class RepositoryImportWorker import_url: @project.import_url, path: @project.path_with_namespace) - project.update_column(:import_error, nil) + project.update_columns(import_jid: self.jid, import_error: nil) result = Projects::ImportService.new(project, current_user).execute diff --git a/app/workers/schedule_update_user_activity_worker.rb b/app/workers/schedule_update_user_activity_worker.rb new file mode 100644 index 00000000000..6c2c3e437f3 --- /dev/null +++ b/app/workers/schedule_update_user_activity_worker.rb @@ -0,0 +1,10 @@ +class ScheduleUpdateUserActivityWorker + include Sidekiq::Worker + include CronjobQueue + + def perform(batch_size = 500) + Gitlab::UserActivities.new.each_slice(batch_size) do |batch| + UpdateUserActivityWorker.perform_async(Hash[batch]) + end + end +end diff --git a/app/workers/stuck_ci_builds_worker.rb b/app/workers/stuck_ci_builds_worker.rb deleted file mode 100644 index b70df5a1afa..00000000000 --- a/app/workers/stuck_ci_builds_worker.rb +++ /dev/null @@ -1,19 +0,0 @@ -class StuckCiBuildsWorker - include Sidekiq::Worker - include CronjobQueue - - BUILD_STUCK_TIMEOUT = 1.day - - def perform - Rails.logger.info 'Cleaning stuck builds' - - builds = Ci::Build.joins(:project).running_or_pending.where('ci_builds.updated_at < ?', BUILD_STUCK_TIMEOUT.ago) - builds.find_each(batch_size: 50).each do |build| - Rails.logger.debug "Dropping stuck #{build.status} build #{build.id} for runner #{build.runner_id}" - build.drop - end - - # Update builds that failed to drop - builds.update_all(status: 'failed') - end -end diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb new file mode 100644 index 00000000000..ae8c980c9e4 --- /dev/null +++ b/app/workers/stuck_ci_jobs_worker.rb @@ -0,0 +1,59 @@ +class StuckCiJobsWorker + include Sidekiq::Worker + include CronjobQueue + + EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze + + BUILD_RUNNING_OUTDATED_TIMEOUT = 1.hour + BUILD_PENDING_OUTDATED_TIMEOUT = 1.day + BUILD_PENDING_STUCK_TIMEOUT = 1.hour + + def perform + return unless try_obtain_lease + + Rails.logger.info "#{self.class}: Cleaning stuck builds" + + drop :running, BUILD_RUNNING_OUTDATED_TIMEOUT + drop :pending, BUILD_PENDING_OUTDATED_TIMEOUT + drop_stuck :pending, BUILD_PENDING_STUCK_TIMEOUT + + remove_lease + end + + private + + def try_obtain_lease + @uuid = Gitlab::ExclusiveLease.new(EXCLUSIVE_LEASE_KEY, timeout: 30.minutes).try_obtain + end + + def remove_lease + Gitlab::ExclusiveLease.cancel(EXCLUSIVE_LEASE_KEY, @uuid) + end + + def drop(status, timeout) + search(status, timeout) do |build| + drop_build :outdated, build, status, timeout + end + end + + def drop_stuck(status, timeout) + search(status, timeout) do |build| + return unless build.stuck? + drop_build :stuck, build, status, timeout + end + end + + def search(status, timeout) + builds = Ci::Build.where(status: status).where('ci_builds.updated_at < ?', timeout.ago) + builds.joins(:project).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build| + yield(build) + end + end + + def drop_build(type, build, status, timeout) + Rails.logger.info "#{self.class}: Dropping #{type} build #{build.id} for runner #{build.runner_id} (status: #{status}, timeout: #{timeout})" + Gitlab::OptimisticLocking.retry_lock(build, 3) do |b| + b.drop + end + end +end diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb new file mode 100644 index 00000000000..bfc5e667bb6 --- /dev/null +++ b/app/workers/stuck_import_jobs_worker.rb @@ -0,0 +1,37 @@ +class StuckImportJobsWorker + include Sidekiq::Worker + include CronjobQueue + + IMPORT_EXPIRATION = 15.hours.to_i + + def perform + stuck_projects.find_in_batches(batch_size: 500) do |group| + jids = group.map(&:import_jid) + + # Find the jobs that aren't currently running or that exceeded the threshold. + completed_jids = Gitlab::SidekiqStatus.completed_jids(jids) + + if completed_jids.any? + completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id) + + fail_batch!(completed_jids, completed_ids) + end + end + end + + private + + def stuck_projects + Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil) + end + + def fail_batch!(completed_jids, completed_ids) + Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message) + + Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}") + end + + def error_message + "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds" + end +end diff --git a/app/workers/system_hook_push_worker.rb b/app/workers/system_hook_push_worker.rb new file mode 100644 index 00000000000..e43bbe35de9 --- /dev/null +++ b/app/workers/system_hook_push_worker.rb @@ -0,0 +1,8 @@ +class SystemHookPushWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def perform(push_data, hook_id) + SystemHooksService.new.execute_hooks(push_data, hook_id) + end +end diff --git a/app/workers/trigger_schedule_worker.rb b/app/workers/trigger_schedule_worker.rb new file mode 100644 index 00000000000..9c1baf7e6c5 --- /dev/null +++ b/app/workers/trigger_schedule_worker.rb @@ -0,0 +1,18 @@ +class TriggerScheduleWorker + include Sidekiq::Worker + include CronjobQueue + + def perform + Ci::TriggerSchedule.active.where("next_run_at < ?", Time.now).find_each do |trigger_schedule| + begin + Ci::CreateTriggerRequestService.new.execute(trigger_schedule.project, + trigger_schedule.trigger, + trigger_schedule.ref) + rescue => e + Rails.logger.error "#{trigger_schedule.id}: Failed to trigger_schedule job: #{e.message}" + ensure + trigger_schedule.schedule_next_run! + end + end + end +end diff --git a/app/workers/update_merge_requests_worker.rb b/app/workers/update_merge_requests_worker.rb index acc4d858136..89ae17cef37 100644 --- a/app/workers/update_merge_requests_worker.rb +++ b/app/workers/update_merge_requests_worker.rb @@ -10,8 +10,5 @@ class UpdateMergeRequestsWorker return unless user MergeRequests::RefreshService.new(project, user).execute(oldrev, newrev, ref) - - push_data = Gitlab::DataBuilder::Push.build(project, user, oldrev, newrev, ref, []) - SystemHooksService.new.execute_hooks(push_data, :push_hooks) end end diff --git a/app/workers/update_user_activity_worker.rb b/app/workers/update_user_activity_worker.rb new file mode 100644 index 00000000000..b3c2f13aa33 --- /dev/null +++ b/app/workers/update_user_activity_worker.rb @@ -0,0 +1,26 @@ +class UpdateUserActivityWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def perform(pairs) + pairs = cast_data(pairs) + ids = pairs.keys + conditions = 'WHEN id = ? THEN ? ' * ids.length + + User.where(id: ids). + update_all([ + "last_activity_on = CASE #{conditions} ELSE last_activity_on END", + *pairs.to_a.flatten + ]) + + Gitlab::UserActivities.new.delete(*ids) + end + + private + + def cast_data(pairs) + pairs.each_with_object({}) do |(key, value), new_pairs| + new_pairs[key.to_i] = Time.at(value.to_i).to_s(:db) + end + end +end diff --git a/app/workers/upload_checksum_worker.rb b/app/workers/upload_checksum_worker.rb new file mode 100644 index 00000000000..78931f1258f --- /dev/null +++ b/app/workers/upload_checksum_worker.rb @@ -0,0 +1,12 @@ +class UploadChecksumWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def perform(upload_id) + upload = Upload.find(upload_id) + upload.calculate_checksum + upload.save! + rescue ActiveRecord::RecordNotFound + Rails.logger.error("UploadChecksumWorker: couldn't find upload #{upload_id}, skipping") + end +end |