Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2020-03-12 18:09:39 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2020-03-12 18:09:39 +0300
commitdc889678d1de8c09310b2f8f9742bb6c78a6f1a4 (patch)
tree70945aa6721a271fc8057efa13c3216a03fbac45 /app/workers/gitlab
parentcd52759ee33051b8ad7b88b02ba7954e4fad7018 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'app/workers/gitlab')
-rw-r--r--app/workers/gitlab/github_import/advance_stage_worker.rb51
-rw-r--r--app/workers/gitlab/import/advance_stage.rb61
2 files changed, 65 insertions, 47 deletions
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index 8c379be2ae4..8fbf88a1762 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -8,15 +8,12 @@ module Gitlab
# stage.
class AdvanceStageWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ include ::Gitlab::Import::AdvanceStage
sidekiq_options dead: false
feature_category :importers
- INTERVAL = 30.seconds.to_i
-
- # The number of seconds to wait (while blocking the thread) before
- # continuing to the next waiter.
- BLOCKING_WAIT_TIME = 5
+ private
# The known importer stages and their corresponding Sidekiq workers.
STAGES = {
@@ -26,49 +23,9 @@ module Gitlab
finish: Stage::FinishImportWorker
}.freeze
- # project_id - The ID of the project being imported.
- # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of
- # remaining jobs.
- # next_stage - The name of the next stage to start when all jobs have been
- # completed.
- def perform(project_id, waiters, next_stage)
- return unless import_state = find_import_state(project_id)
-
- new_waiters = wait_for_jobs(waiters)
-
- if new_waiters.empty?
- # We refresh the import JID here so workers importing individual
- # resources (e.g. notes) don't have to do this all the time, reducing
- # the pressure on Redis. We _only_ do this once all jobs are done so
- # we don't get stuck forever if one or more jobs failed to notify the
- # JobWaiter.
- import_state.refresh_jid_expiration
-
- STAGES.fetch(next_stage.to_sym).perform_async(project_id)
- else
- self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage)
- end
- end
-
- def wait_for_jobs(waiters)
- waiters.each_with_object({}) do |(key, remaining), new_waiters|
- waiter = JobWaiter.new(remaining, key)
-
- # We wait for a brief moment of time so we don't reschedule if we can
- # complete the work fast enough.
- waiter.wait(BLOCKING_WAIT_TIME)
-
- next unless waiter.jobs_remaining.positive?
-
- new_waiters[waiter.key] = waiter.jobs_remaining
- end
- end
-
- # rubocop: disable CodeReuse/ActiveRecord
- def find_import_state(project_id)
- ProjectImportState.select(:jid).with_status(:started).find_by(project_id: project_id)
+ def next_stage_worker(next_stage)
+ STAGES.fetch(next_stage.to_sym)
end
- # rubocop: enable CodeReuse/ActiveRecord
end
end
end
diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb
new file mode 100644
index 00000000000..5c836413ae3
--- /dev/null
+++ b/app/workers/gitlab/import/advance_stage.rb
@@ -0,0 +1,61 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Import
+ module AdvanceStage
+ INTERVAL = 30.seconds.to_i
+
+ # The number of seconds to wait (while blocking the thread) before
+ # continuing to the next waiter.
+ BLOCKING_WAIT_TIME = 5
+
+ # project_id - The ID of the project being imported.
+ # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of
+ # remaining jobs.
+ # next_stage - The name of the next stage to start when all jobs have been
+ # completed.
+ def perform(project_id, waiters, next_stage)
+ return unless import_state = find_import_state(project_id)
+
+ new_waiters = wait_for_jobs(waiters)
+
+ if new_waiters.empty?
+ # We refresh the import JID here so workers importing individual
+ # resources (e.g. notes) don't have to do this all the time, reducing
+ # the pressure on Redis. We _only_ do this once all jobs are done so
+ # we don't get stuck forever if one or more jobs failed to notify the
+ # JobWaiter.
+ import_state.refresh_jid_expiration
+
+ next_stage_worker(next_stage).perform_async(project_id)
+ else
+ self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage)
+ end
+ end
+
+ def wait_for_jobs(waiters)
+ waiters.each_with_object({}) do |(key, remaining), new_waiters|
+ waiter = JobWaiter.new(remaining, key)
+
+ # We wait for a brief moment of time so we don't reschedule if we can
+ # complete the work fast enough.
+ waiter.wait(BLOCKING_WAIT_TIME)
+
+ next unless waiter.jobs_remaining.positive?
+
+ new_waiters[waiter.key] = waiter.jobs_remaining
+ end
+ end
+
+ def find_import_state(project_id)
+ ProjectImportState.jid_by(project_id: project_id, status: :started)
+ end
+
+ private
+
+ def next_stage_worker(next_stage)
+ raise NotImplementedError
+ end
+ end
+ end
+end