Welcome to mirror list, hosted at ThFree Co, Russian Federation.

relative_position_rebalancing_service.rb « issues « services « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 23bb409f3cd78fced31bfeb61dd0167770207fcc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# frozen_string_literal: true

module Issues
  # Re-spaces the `relative_position` of every issue that has a non-null position, across a set of
  # projects sharing a root namespace. Consecutive issues end up `RelativePositioning::MAX_GAP` apart,
  # with the sequence centred on `RelativePositioning::START_POSITION`.
  #
  # Progress is stored in `Gitlab::Issues::Rebalancing::State`:
  # - the cached issue ids,
  # - the next index to update,
  # - the project currently being preloaded.
  #
  # `execute` reads the current index from that state when it starts, so a later call continues from
  # wherever an earlier, interrupted call stopped.
  class RelativePositionRebalancingService
    # Number of (id, index) pairs written by a single UPDATE statement (before any retry shrinking).
    UPDATE_BATCH_SIZE = 100
    # Page size used both for reading ids back from the cache and for keyset batches while preloading.
    PREFETCH_ISSUES_BATCH_SIZE = 10_000
    # Once an UPDATE batch has been halved below this size, the next timeout is re-raised (see #with_retry).
    SMALLEST_BATCH_SIZE = 5
    # Consecutive timeouts tolerated at one batch size before that batch size is halved.
    RETRIES_LIMIT = 3

    TooManyConcurrentRebalances = Class.new(StandardError)

    # @param projects [Array<Integer>, ActiveRecord::Relation] project ids, or a relation of projects.
    #   The root namespace is read from the lowest-id project only.
    #   NOTE(review): all projects are assumed to share that root namespace.
    #   NOTE(review): an empty collection makes `take` return nil, so this raises NoMethodError.
    def initialize(projects)
      @projects_collection = (projects.is_a?(Array) ? Project.id_in(projects) : projects).projects_order_id_asc
      @root_namespace = @projects_collection.take.root_namespace # rubocop:disable CodeReuse/ActiveRecord
      @caching = ::Gitlab::Issues::Rebalancing::State.new(@root_namespace, @projects_collection)
    end

    # Rebalances the positions of all cached issues, starting at the index currently stored in the cache.
    #
    # This is a no-op when the `rebalance_issues` feature flag is disabled for the root namespace.
    #
    # @raise [TooManyConcurrentRebalances] when the cache reports that no new rebalance may start
    def execute
      return unless Feature.enabled?(:rebalance_issues, root_namespace)

      # Given can_start_rebalance? and track_new_running_rebalance are not atomic
      # it can happen that we end up with more than Rebalancing::State::MAX_NUMBER_OF_CONCURRENT_REBALANCES running.
      # Considering the number of allowed Rebalancing::State::MAX_NUMBER_OF_CONCURRENT_REBALANCES is small we should be ok,
      # but should be something to consider if we'd want to scale this up.
      unless caching.can_start_rebalance?
        # The running count is only needed for the error message, so it is read only on this path.
        raise TooManyConcurrentRebalances,
          "#{caching.concurrent_running_rebalances_count} concurrent re-balances currently running"
      end

      # `cleanup_cache` and `unblock_issue_repositioning!` below are reached only after the loop finishes.
      # If anything raises, repositioning stays blocked, and the next `execute` starts again from
      # `caching.get_current_index`.
      block_issue_repositioning! unless root_namespace.issue_repositioning_disabled?
      caching.track_new_running_rebalance
      index = caching.get_current_index

      loop do
        issue_ids = get_issue_ids(index, PREFETCH_ISSUES_BATCH_SIZE)
        pairs_with_index = assign_indexes(issue_ids, index)

        pairs_with_index.each_slice(UPDATE_BATCH_SIZE) do |pairs_batch|
          update_positions_with_retry(pairs_batch, 're-balance issue positions in batches ordered by position')
        end

        # update_positions_with_retry stores "last updated index + 1" after every successful batch.
        index = caching.get_current_index

        break if index >= caching.issue_count - 1
      end

      caching.cleanup_cache
      unblock_issue_repositioning!
    end

    private

    attr_reader :root_namespace, :projects_collection, :caching

    def block_issue_repositioning!
      Feature.enable(:block_issue_repositioning, root_namespace)
    end

    def unblock_issue_repositioning!
      Feature.disable(:block_issue_repositioning, root_namespace)
    end

    # Returns up to `limit` cached issue ids starting at `index`.
    # If the cache is not yet fully populated, the remaining projects are preloaded first.
    def get_issue_ids(index, limit)
      issue_ids = caching.get_cached_issue_ids(index, limit)

      # if we have a list of cached issues and no current project id cached,
      # then we successfully cached issues for all projects
      return issue_ids if issue_ids.any? && caching.get_current_project_id.blank?

      # Either nothing has been cached yet, or a previous preload stopped part-way through a project
      # (its id is still cached), so (re)load the ids before reading again.
      preload_issue_ids

      caching.get_cached_issue_ids(index, limit)
    end

    # Caches, project by project, the ids (ordered by relative position) of all issues with a non-null position.
    # Each project's id is cached while that project is in progress.
    # When an earlier run left a project id cached, loading restarts from that project (inclusive).
    # rubocop: disable CodeReuse/ActiveRecord
    def preload_issue_ids
      cached_project_id = caching.get_current_project_id

      collection = projects_collection
      collection = projects_collection.where(Project.arel_table[:id].gteq(cached_project_id.to_i)) if cached_project_id.present?

      collection.each do |project|
        caching.cache_current_project_id(project.id)
        scope = Issue.in_projects(project).order_by_relative_position.with_non_null_relative_position.select(:id, :relative_position)

        # Keyset batches start at PREFETCH_ISSUES_BATCH_SIZE and shrink on timeouts.
        # A timeout is re-raised once the batch size has dropped below 100.
        with_retry(PREFETCH_ISSUES_BATCH_SIZE, 100) do |batch_size|
          Gitlab::Pagination::Keyset::Iterator.new(scope: scope).each_batch(of: batch_size) do |batch|
            caching.cache_issue_ids(batch)
          end
        end
      end

      caching.remove_current_project_id_cache
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Pairs each id with its absolute index, counting from `start_index`.
    #
    # @example
    #   assign_indexes([7, 8], 3) #=> [[7, 3], [8, 4]]
    def assign_indexes(ids, start_index)
      ids.map.with_index(start_index) { |id, absolute_index| [id, absolute_index] }
    end

    # Writes the given (id, index) pairs in batches, retrying on statement timeouts.
    #
    # How batching works:
    # - The first batch size is the number of pairs passed in. Each batch is always taken from the front
    #   of the remaining pairs.
    # - After a successful batch, those pairs are dropped and `last index + 1` is cached as the resume point.
    # - Processing continues with the batch size that last succeeded.
    # - A batch that times out RETRIES_LIMIT (3) times in a row has its size halved (see #with_retry).
    # - Once the size has been halved below SMALLEST_BATCH_SIZE (5), the next timeout is re-raised.
    #
    # e.g.
    # step | items left | batch size tried | next batch size | comment
    # 1.   | 100        | 100              | 50              | 3 failures -> halve the batch size
    # 2.   | 100        | 50               | 25              | 3 failures -> halve again
    # 3.   | 100        | 25               | 25              | 3 successes -> 75 items dropped, 25 left
    # 4.   | 25         | 25               | 12              | 3 failures -> halve again
    # 5.   | 25         | 12               | 6               | 3 failures -> halve again
    # 6.   | 25         | 6                | 3               | 3 failures -> halve to 3, below SMALLEST_BATCH_SIZE;
    #      |            |                  |                 | one more attempt is made with 3, and if it also
    #      |            |                  |                 | fails the error is re-raised
    def update_positions_with_retry(pairs_with_index, query_name)
      retry_batch_size = pairs_with_index.size

      until pairs_with_index.empty?
        with_retry(retry_batch_size, SMALLEST_BATCH_SIZE) do |batch_size|
          # Remember the size being attempted, so the next batch starts from a size known to work.
          retry_batch_size = batch_size
          update_positions(pairs_with_index.first(batch_size), query_name)
          # pairs_with_index[batch_size - 1] is nil when the final batch is smaller than batch_size;
          # in that case the last pair is the last one in the batch.
          last_pair_in_batch = pairs_with_index[batch_size - 1] || pairs_with_index.last
          caching.cache_current_index(last_pair_in_batch.last + 1)
          pairs_with_index = pairs_with_index.drop(batch_size)
        end
      end
    end

    # Builds the VALUES list `(id, new_position), ...` and runs the bulk UPDATE.
    # The new position is `start_position + index * gap_size`.
    #
    # Ids come from the rebalancing cache (`caching.get_cached_issue_ids`) and are interpolated into raw
    # SQL. They are therefore coerced to base-10 integers first; a non-integer id raises ArgumentError
    # instead of being spliced into the query.
    def update_positions(pairs_with_position, query_name)
      values = pairs_with_position.map do |id, index|
        "(#{Integer(id.to_s, 10)}, #{start_position + (index * gap_size)})"
      end.join(', ')

      run_update_query(values, query_name)
    end

    def run_update_query(values, query_name)
      Issue.connection.exec_query(<<~SQL, query_name)
        WITH cte(cte_id, new_pos) AS #{Gitlab::Database::AsWithMaterialized.materialized_if_supported} (
         SELECT *
         FROM (VALUES #{values}) as t (id, pos)
        )
        UPDATE #{Issue.table_name}
        SET relative_position = cte.new_pos
        FROM cte
        WHERE cte_id = id
      SQL
    end

    def gaps
      caching.issue_count - 1
    end

    def gap_size
      RelativePositioning::MAX_GAP
    end

    # Position assigned to index 0. It is chosen so that the middle index lands on
    # RelativePositioning::START_POSITION. Memoized for the lifetime of the service instance.
    def start_position
      @start_position ||= (RelativePositioning::START_POSITION - (gaps / 2) * gap_size).to_i
    end

    # Yields a batch size to the block and retries it on statement timeouts and cancelled queries.
    #
    # Retry rules:
    # - After RETRIES_LIMIT consecutive failures at one size, the size is halved (integer division)
    #   and the retry counter is reset.
    # - A failure at a size smaller than `exit_batch_size` is re-raised.
    # - Any other exception propagates immediately.
    #
    # @param initial_batch_size [Integer] batch size yielded on the first attempt
    # @param exit_batch_size [Integer] failures at batch sizes below this are not retried
    def with_retry(initial_batch_size, exit_batch_size)
      retries = 0
      batch_size = initial_batch_size

      begin
        yield batch_size
      rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled
        raise if batch_size < exit_batch_size

        retries += 1
        if retries == RETRIES_LIMIT
          # Repeated failures suggest the batch itself is too big, so halve it and start counting again.
          batch_size /= 2
          retries = 0
        end

        retry
      end
    end
  end
end