diff options
Diffstat (limited to 'lib/gitlab/github_import/parallel_scheduling.rb')
-rw-r--r-- | lib/gitlab/github_import/parallel_scheduling.rb | 51 |
1 files changed, 49 insertions, 2 deletions
diff --git a/lib/gitlab/github_import/parallel_scheduling.rb b/lib/gitlab/github_import/parallel_scheduling.rb index 03aa02fb659..4b54a77983d 100644 --- a/lib/gitlab/github_import/parallel_scheduling.rb +++ b/lib/gitlab/github_import/parallel_scheduling.rb @@ -3,11 +3,18 @@ module Gitlab module GithubImport module ParallelScheduling - attr_reader :project, :client, :page_counter, :already_imported_cache_key + attr_reader :project, :client, :page_counter, :already_imported_cache_key, + :job_waiter_cache_key, :job_waiter_remaining_cache_key # The base cache key to use for tracking already imported objects. ALREADY_IMPORTED_CACHE_KEY = 'github-importer/already-imported/%{project}/%{collection}' + # The base cache key to use for storing job waiter key + JOB_WAITER_CACHE_KEY = + 'github-importer/job-waiter/%{project}/%{collection}' + # The base cache key to use for storing job waiter remaining jobs + JOB_WAITER_REMAINING_CACHE_KEY = + 'github-importer/job-waiter-remaining/%{project}/%{collection}' # project - An instance of `Project`. # client - An instance of `Gitlab::GithubImport::Client`. @@ -19,6 +26,10 @@ module Gitlab @page_counter = PageCounter.new(project, collection_method) @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY % { project: project.id, collection: collection_method } + @job_waiter_cache_key = JOB_WAITER_CACHE_KEY % + { project: project.id, collection: collection_method } + @job_waiter_remaining_cache_key = JOB_WAITER_REMAINING_CACHE_KEY % + { project: project.id, collection: collection_method } end def parallel? @@ -74,7 +85,27 @@ module Gitlab def parallel_import raise 'Batch settings must be defined for parallel import' if parallel_import_batch.blank? - spread_parallel_import + if Feature.enabled?(:improved_spread_parallel_import) + improved_spread_parallel_import + else + spread_parallel_import + end + end + + def improved_spread_parallel_import + enqueued_job_counter = 0 + + each_object_to_import do |object| + repr = object_representation(object) + + job_delay = calculate_job_delay(enqueued_job_counter) + sidekiq_worker_class.perform_in(job_delay, project.id, repr.to_hash, job_waiter.key) + enqueued_job_counter += 1 + + job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key) + end + + job_waiter end def spread_parallel_import @@ -233,6 +264,22 @@ module Gitlab parallel: parallel? ) end + + def job_waiter + @job_waiter ||= begin + key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key) + key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key) + jobs_remaining = Gitlab::Cache::Import::Caching.read(job_waiter_remaining_cache_key).to_i || 0 + + JobWaiter.new(jobs_remaining, key) + end + end + + def calculate_job_delay(job_index) + multiplier = (job_index / parallel_import_batch[:size]) + + (multiplier * parallel_import_batch[:delay]) + 1.second + end end end end |