diff options
Diffstat (limited to 'app/workers/concerns')
7 files changed, 96 insertions, 23 deletions
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 3399a4f9b57..03a0b5fae00 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -14,6 +14,7 @@ module ApplicationWorker LOGGING_EXTRA_KEY = 'extra' DEFAULT_DELAY_INTERVAL = 1 + SAFE_PUSH_BULK_LIMIT = 1000 included do set_queue @@ -54,6 +55,12 @@ module ApplicationWorker subclass.after_set_class_attribute { subclass.set_queue } end + def with_status + status_from_class = self.sidekiq_options_hash['status_expiration'] + + set(status_expiration: status_from_class || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION) + end + def generated_queue_name Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self) end @@ -130,29 +137,62 @@ module ApplicationWorker end end + def log_bulk_perform_async? + @log_bulk_perform_async + end + + def log_bulk_perform_async! + @log_bulk_perform_async = true + end + def queue_size Sidekiq::Queue.new(queue).size end def bulk_perform_async(args_list) - Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) + if log_bulk_perform_async? + Sidekiq.logger.info('class' => self.name, 'args_list' => args_list, 'args_list_count' => args_list.length, 'message' => 'Inserting multiple jobs') + end + + do_push_bulk(args_list).tap do |job_ids| + if log_bulk_perform_async? + Sidekiq.logger.info('class' => self.name, 'jid_list' => job_ids, 'jid_list_count' => job_ids.length, 'message' => 'Completed JID insertion') + end + end end def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil) now = Time.now.to_i - schedule = now + delay.to_i + base_schedule_at = now + delay.to_i - if schedule <= now - raise ArgumentError, _('The schedule time must be in the future!') + if base_schedule_at <= now + raise ArgumentError, 'The schedule time must be in the future!' 
end + schedule_at = base_schedule_at + if batch_size && batch_delay - args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx| - batch_schedule = schedule + idx * batch_delay.to_i - Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule) + batch_size = batch_size.to_i + batch_delay = batch_delay.to_i + + raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0 + raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0 + + # build an array of schedules corresponding to each item in `args_list` + bulk_schedule_at = Array.new(args_list.size) do |index| + batch_number = index / batch_size + base_schedule_at + (batch_number * batch_delay) + end + + schedule_at = bulk_schedule_at + end + + if Feature.enabled?(:sidekiq_push_bulk_in_batches) + in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch| + Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch) end else - Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) + Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule_at) end end @@ -161,5 +201,44 @@ module ApplicationWorker def delay_interval DEFAULT_DELAY_INTERVAL.seconds end + + private + + def do_push_bulk(args_list) + if Feature.enabled?(:sidekiq_push_bulk_in_batches) + in_safe_limit_batches(args_list) do |args_batch, _| + Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch) + end + else + Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) + end + end + + def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT) + # `schedule_at` could be one of + # - nil. + # - a single Numeric that represents time, like `30.minutes.from_now.to_i`. + # - an array, where each element is a Numeric that represents time. 
+ # - Each element in this array would correspond to the time at which + # - the job in `args_list` at the corresponding index needs to be scheduled. + + # In the case where `schedule_at` is an array of Numeric, it needs to be sliced + # in the same manner as the `args_list`, with each slice containing `safe_limit` + # number of elements. + schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array) + + args_list.each_slice(safe_limit).with_index.flat_map do |args_batch, index| + schedule_at_for_batch = process_schedule_at_for_batch(schedule_at, index) + + yield(args_batch, schedule_at_for_batch) + end + end + + def process_schedule_at_for_batch(schedule_at, index) + return unless schedule_at + return schedule_at[index] if schedule_at.is_a?(Array) && schedule_at.all?(Array) + + schedule_at + end end end diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb index eb1af0869bd..0a43a0fc4d2 100644 --- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb +++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb @@ -8,9 +8,8 @@ module Gitlab # project_id - The ID of the GitLab project to import the note into. # hash - A Hash containing the details of the GitHub object to import. # notify_key - The Redis key to notify upon completion, if any. 
- # rubocop: disable CodeReuse/ActiveRecord def perform(project_id, hash, notify_key = nil) - project = Project.find_by(id: project_id) + project = Project.find_by_id(project_id) return notify_waiter(notify_key) unless project @@ -25,7 +24,6 @@ module Gitlab .perform_in(client.rate_limit_resets_in, project.id, hash, notify_key) end end - # rubocop: enable CodeReuse/ActiveRecord def try_import(*args) import(*args) diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb index d7b4578af63..225716f6bf3 100644 --- a/app/workers/concerns/gitlab/github_import/stage_methods.rb +++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb @@ -33,13 +33,13 @@ module Gitlab self.class.perform_in(client.rate_limit_resets_in, project.id) end - # rubocop: disable CodeReuse/ActiveRecord def find_project(id) # If the project has been marked as failed we want to bail out # automatically. - Project.joins_import_state.where(import_state: { status: :started }).find_by(id: id) + # rubocop: disable CodeReuse/ActiveRecord + Project.joins_import_state.where(import_state: { status: :started }).find_by_id(id) + # rubocop: enable CodeReuse/ActiveRecord end - # rubocop: enable CodeReuse/ActiveRecord def abort_on_failure false diff --git a/app/workers/concerns/gitlab/jira_import/import_worker.rb b/app/workers/concerns/gitlab/jira_import/import_worker.rb index 107b6e2e9be..d18b9ff023b 100644 --- a/app/workers/concerns/gitlab/jira_import/import_worker.rb +++ b/app/workers/concerns/gitlab/jira_import/import_worker.rb @@ -14,7 +14,7 @@ module Gitlab end def perform(project_id) - project = Project.find_by(id: project_id) # rubocop: disable CodeReuse/ActiveRecord + project = Project.find_by_id(project_id) return unless can_import?(project) diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb index b4cdfda680f..bcedb4efcc0 100644 --- 
a/app/workers/concerns/limited_capacity/worker.rb +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -47,7 +47,7 @@ module LimitedCapacity # would be occupied by a job that will be performed in the distant future. # We let the cron worker enqueue new jobs, this could be seen as our retry and # back off mechanism because the job might fail again if executed immediately. - sidekiq_options retry: 0 + sidekiq_options retry: 0, status_expiration: Gitlab::SidekiqStatus::DEFAULT_EXPIRATION deduplicate :none end diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb index 482a74f49f7..d761f023cad 100644 --- a/app/workers/concerns/new_issuable.rb +++ b/app/workers/concerns/new_issuable.rb @@ -10,21 +10,17 @@ module NewIssuable user && issuable end - # rubocop: disable CodeReuse/ActiveRecord def set_user(user_id) - @user = User.find_by(id: user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables + @user = User.find_by_id(user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables log_error(User, user_id) unless @user # rubocop:disable Gitlab/ModuleWithInstanceVariables end - # rubocop: enable CodeReuse/ActiveRecord - # rubocop: disable CodeReuse/ActiveRecord def set_issuable(issuable_id) - @issuable = issuable_class.find_by(id: issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables + @issuable = issuable_class.find_by_id(issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables log_error(issuable_class, issuable_id) unless @issuable # rubocop:disable Gitlab/ModuleWithInstanceVariables end - # rubocop: enable CodeReuse/ActiveRecord def log_error(record_class, record_id) Gitlab::AppLogger.error("#{self.class}: couldn't find #{record_class} with ID=#{record_id}, skipping job") diff --git a/app/workers/concerns/todos_destroyer_queue.rb b/app/workers/concerns/todos_destroyer_queue.rb index 1bbccbfb1f9..1c31b64ad97 100644 --- a/app/workers/concerns/todos_destroyer_queue.rb +++ 
b/app/workers/concerns/todos_destroyer_queue.rb @@ -8,6 +8,6 @@ module TodosDestroyerQueue included do queue_namespace :todos_destroyer - feature_category :issue_tracking + feature_category :team_planning end end |