Welcome to mirror list, hosted at ThFree Co, Russian Federation.

parallel_scheduling.rb « bitbucket_server_import « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 7fd61bbb20663afce092eeba42aac11b7504b33d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # Shared behaviour for importers that fan work out to Sidekiq workers.
    #
    # Includers must implement `collection_method`, `sidekiq_worker_class`
    # and `id_for_already_processed_cache`.
    module ParallelScheduling
      include Loggable

      attr_reader :project, :already_processed_cache_key, :job_waiter_cache_key

      # The base cache key to use for tracking already processed objects.
      ALREADY_PROCESSED_CACHE_KEY =
        'bitbucket-server-importer/already-processed/%{project}/%{collection}'

      # The base cache key to use for storing job waiter key
      JOB_WAITER_CACHE_KEY =
        'bitbucket-server-importer/job-waiter/%{project}/%{collection}'

      # Number of scheduled jobs after which the enforced delay grows by a minute.
      BATCH_SIZE = 100

      # project - An instance of `Project`.
      def initialize(project)
        @project = project

        @already_processed_cache_key = build_cache_key(ALREADY_PROCESSED_CACHE_KEY)
        @job_waiter_cache_key = build_cache_key(JOB_WAITER_CACHE_KEY)
      end

      private

      # Interpolates the project id and collection name into a cache key template.
      def build_cache_key(template)
        format(template, project: project.id, collection: collection_method)
      end

      # Memoized Bitbucket Server API client built from the stored import credentials.
      def client
        @client ||= BitbucketServer::Client.new(project.import_data.credentials)
      end

      # Bitbucket Server project key recorded when the import was configured.
      def project_key
        @project_key ||= project.import_data.data['project_key']
      end

      # Bitbucket Server repository slug recorded when the import was configured.
      def repository_slug
        @repository_slug ||= project.import_data.data['repo_slug']
      end

      # Returns the ID to use for the cache used for checking if an object has
      # already been processed or not.
      #
      # object - The object we may want to import.
      def id_for_already_processed_cache(object)
        raise NotImplementedError
      end

      # The Sidekiq worker class used for scheduling the importing of objects in
      # parallel.
      def sidekiq_worker_class
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the data to import.
      def collection_method
        raise NotImplementedError
      end

      # Lazily builds the JobWaiter used to track the scheduled jobs. The
      # waiter key is persisted in the cache so that a rescheduled import
      # reuses the same key instead of generating a fresh one.
      def job_waiter
        return @job_waiter if @job_waiter

        key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key) ||
          Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key)

        @job_waiter = JobWaiter.new(0, key)
      end

      # Whether the given object was already imported in a previous run.
      def already_processed?(object)
        Gitlab::Cache::Import::Caching.set_includes?(
          already_processed_cache_key,
          id_for_already_processed_cache(object)
        )
      end

      # Marks the given object as "already processed".
      def mark_as_processed(object)
        Gitlab::Cache::Import::Caching.set_add(
          already_processed_cache_key,
          id_for_already_processed_cache(object)
        )
      end

      # Delay applied to the job at the given index: one extra minute per
      # completed batch of BATCH_SIZE jobs, plus a one second base offset.
      def calculate_job_delay(job_index)
        completed_batches = job_index / BATCH_SIZE

        (completed_batches * 1.minute) + 1.second
      end

      # Records an import failure for the project without raising.
      def track_import_failure!(project, exception:, **args)
        Gitlab::Import::ImportFailureService.track(
          project_id: project.id,
          error_source: self.class.name,
          exception: exception,
          **args
        )
      end
    end
  end
end