Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/bitbucket_server_import/parallel_scheduling.rb')
-rw-r--r--lib/gitlab/bitbucket_server_import/parallel_scheduling.rb101
1 files changed, 101 insertions, 0 deletions
diff --git a/lib/gitlab/bitbucket_server_import/parallel_scheduling.rb b/lib/gitlab/bitbucket_server_import/parallel_scheduling.rb
new file mode 100644
index 00000000000..7fd61bbb206
--- /dev/null
+++ b/lib/gitlab/bitbucket_server_import/parallel_scheduling.rb
@@ -0,0 +1,101 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module BitbucketServerImport
+ module ParallelScheduling
+ include Loggable
+
+ attr_reader :project, :already_processed_cache_key, :job_waiter_cache_key
+
+ # The base cache key to use for tracking already processed objects.
+ ALREADY_PROCESSED_CACHE_KEY =
+ 'bitbucket-server-importer/already-processed/%{project}/%{collection}'
+
+ # The base cache key to use for storing job waiter key
+ JOB_WAITER_CACHE_KEY =
+ 'bitbucket-server-importer/job-waiter/%{project}/%{collection}'
+
+ BATCH_SIZE = 100
+
+ # project - An instance of `Project`.
+ def initialize(project)
+ @project = project
+
+ @already_processed_cache_key =
+ format(ALREADY_PROCESSED_CACHE_KEY, project: project.id, collection: collection_method)
+ @job_waiter_cache_key =
+ format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method)
+ end
+
+ private
+
+ def client
+ @client ||= BitbucketServer::Client.new(project.import_data.credentials)
+ end
+
+ def project_key
+ @project_key ||= project.import_data.data['project_key']
+ end
+
+ def repository_slug
+ @repository_slug ||= project.import_data.data['repo_slug']
+ end
+
+ # Returns the ID to use for the cache used for checking if an object has
+ # already been processed or not.
+ #
+ # object - The object we may want to import.
+ def id_for_already_processed_cache(object)
+ raise NotImplementedError
+ end
+
+ # The Sidekiq worker class used for scheduling the importing of objects in
+ # parallel.
+ def sidekiq_worker_class
+ raise NotImplementedError
+ end
+
+ # The name of the method to call to retrieve the data to import.
+ def collection_method
+ raise NotImplementedError
+ end
+
+ def job_waiter
+ @job_waiter ||= begin
+ key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key)
+ key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key)
+
+ JobWaiter.new(0, key)
+ end
+ end
+
+ def already_processed?(object)
+ id = id_for_already_processed_cache(object)
+
+ Gitlab::Cache::Import::Caching.set_includes?(already_processed_cache_key, id)
+ end
+
+ # Marks the given object as "already processed".
+ def mark_as_processed(object)
+ id = id_for_already_processed_cache(object)
+
+ Gitlab::Cache::Import::Caching.set_add(already_processed_cache_key, id)
+ end
+
+ def calculate_job_delay(job_index)
+ multiplier = (job_index / BATCH_SIZE)
+
+ (multiplier * 1.minute) + 1.second
+ end
+
+ def track_import_failure!(project, exception:, **args)
+ Gitlab::Import::ImportFailureService.track(
+ project_id: project.id,
+ error_source: self.class.name,
+ exception: exception,
+ **args
+ )
+ end
+ end
+ end
+end