From dc889678d1de8c09310b2f8f9742bb6c78a6f1a4 Mon Sep 17 00:00:00 2001 From: GitLab Bot Date: Thu, 12 Mar 2020 15:09:39 +0000 Subject: Add latest changes from gitlab-org/gitlab@master --- .../gitlab/github_import/advance_stage_worker.rb | 51 ++---------------- app/workers/gitlab/import/advance_stage.rb | 61 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 47 deletions(-) create mode 100644 app/workers/gitlab/import/advance_stage.rb (limited to 'app/workers/gitlab') diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 8c379be2ae4..8fbf88a1762 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -8,15 +8,12 @@ module Gitlab # stage. class AdvanceStageWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + include ::Gitlab::Import::AdvanceStage sidekiq_options dead: false feature_category :importers - INTERVAL = 30.seconds.to_i - - # The number of seconds to wait (while blocking the thread) before - # continuing to the next waiter. - BLOCKING_WAIT_TIME = 5 + private # The known importer stages and their corresponding Sidekiq workers. STAGES = { @@ -26,49 +23,9 @@ module Gitlab finish: Stage::FinishImportWorker }.freeze - # project_id - The ID of the project being imported. - # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of - # remaining jobs. - # next_stage - The name of the next stage to start when all jobs have been - # completed. - def perform(project_id, waiters, next_stage) - return unless import_state = find_import_state(project_id) - - new_waiters = wait_for_jobs(waiters) - - if new_waiters.empty? - # We refresh the import JID here so workers importing individual - # resources (e.g. notes) don't have to do this all the time, reducing - # the pressure on Redis. We _only_ do this once all jobs are done so - # we don't get stuck forever if one or more jobs failed to notify the - # JobWaiter. - import_state.refresh_jid_expiration - - STAGES.fetch(next_stage.to_sym).perform_async(project_id) - else - self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage) - end - end - - def wait_for_jobs(waiters) - waiters.each_with_object({}) do |(key, remaining), new_waiters| - waiter = JobWaiter.new(remaining, key) - - # We wait for a brief moment of time so we don't reschedule if we can - # complete the work fast enough. - waiter.wait(BLOCKING_WAIT_TIME) - - next unless waiter.jobs_remaining.positive? - - new_waiters[waiter.key] = waiter.jobs_remaining - end - end - - # rubocop: disable CodeReuse/ActiveRecord - def find_import_state(project_id) - ProjectImportState.select(:jid).with_status(:started).find_by(project_id: project_id) + def next_stage_worker(next_stage) + STAGES.fetch(next_stage.to_sym) end - # rubocop: enable CodeReuse/ActiveRecord end end end diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb new file mode 100644 index 00000000000..5c836413ae3 --- /dev/null +++ b/app/workers/gitlab/import/advance_stage.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +module Gitlab + module Import + module AdvanceStage + INTERVAL = 30.seconds.to_i + + # The number of seconds to wait (while blocking the thread) before + # continuing to the next waiter. + BLOCKING_WAIT_TIME = 5 + + # project_id - The ID of the project being imported. + # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of + # remaining jobs. + # next_stage - The name of the next stage to start when all jobs have been + # completed. + def perform(project_id, waiters, next_stage) + return unless import_state = find_import_state(project_id) + + new_waiters = wait_for_jobs(waiters) + + if new_waiters.empty? + # We refresh the import JID here so workers importing individual + # resources (e.g. notes) don't have to do this all the time, reducing + # the pressure on Redis. We _only_ do this once all jobs are done so + # we don't get stuck forever if one or more jobs failed to notify the + # JobWaiter. + import_state.refresh_jid_expiration + + next_stage_worker(next_stage).perform_async(project_id) + else + self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage) + end + end + + def wait_for_jobs(waiters) + waiters.each_with_object({}) do |(key, remaining), new_waiters| + waiter = JobWaiter.new(remaining, key) + + # We wait for a brief moment of time so we don't reschedule if we can + # complete the work fast enough. + waiter.wait(BLOCKING_WAIT_TIME) + + next unless waiter.jobs_remaining.positive? + + new_waiters[waiter.key] = waiter.jobs_remaining + end + end + + def find_import_state(project_id) + ProjectImportState.jid_by(project_id: project_id, status: :started) + end + + private + + def next_stage_worker(next_stage) + raise NotImplementedError + end + end + end +end -- cgit v1.2.3