Welcome to mirror list, hosted at ThFree Co, Russian Federation.

looping_batcher.rb « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: adf0aeda5060862ce628ff2fd7895747fa80bbf6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# frozen_string_literal: true

module Gitlab
  # Returns an ID range within a table so it can be iterated over. Repeats from
  # the beginning after it reaches the end.
  #
  # Used by Geo in particular to iterate over a replicable and its registry
  # table.
  #
  # Tracks a cursor for each table, by "key". If the table is smaller than
  # batch_size, then a range for the whole table is returned on every call.
  class LoopingBatcher
    # @param [Class] model_class the class of the table to iterate on
    # @param [String] key to identify the cursor. Note, cursor is already unique
    #   per table.
    # @param [Integer] batch_size to limit the number of records in a batch
    def initialize(model_class, key:, batch_size: 1000)
      @model_class = model_class
      @key = key
      @batch_size = batch_size
    end

    # Returns the next range of IDs, starting at the cached cursor, and moves
    # the cursor: past the batch when more rows follow it, or back to 1 when
    # the batch reached the end of the table.
    #
    # @return [Range] a range of IDs. `nil` if the table has no records, or if
    #   there are 0 records at or after the cursor.
    def next_range!
      return unless @model_class.any?

      batch_first_id = cursor_id

      batch_last_id = get_batch_last_id(batch_first_id)
      return unless batch_last_id

      batch_first_id..batch_last_id
    end

    private

    # @private
    #
    # Get the last ID of the batch. Increment the cursor or reset it if at end.
    #
    # @param [Integer] batch_first_id the first ID of the batch
    # @return [Integer] batch_last_id the last ID of the batch (not the table).
    #   `nil` when no rows exist at or after batch_first_id.
    def get_batch_last_id(batch_first_id)
      batch_last_id, more_rows = run_query(@model_class.table_name, @model_class.primary_key, batch_first_id, @batch_size)

      if more_rows
        increment_batch(batch_last_id)
      elsif batch_first_id > 1
        # End of the table reached. The cursor only needs rewriting when it is
        # not already at the start.
        reset
      end

      batch_last_id
    end

    # @private
    #
    # Runs a single query that finds the highest primary key among the first
    # `batch_size` rows at or after `batch_first_id`, and whether any row
    # exists beyond that key.
    #
    # @param [String] table the table name
    # @param [String] primary_key the primary key column name
    # @param [Integer] batch_first_id the first ID of the batch
    # @param [Integer] batch_size the maximum number of rows in the batch
    # @return [Array] `[batch_last_id, more_rows]` as returned by the database
    def run_query(table, primary_key, batch_first_id, batch_size)
      # The derived table `batch` only exposes the `primary_key` column, so the
      # outer query must reference that column rather than a hard-coded `id`.
      sql = <<~SQL
        SELECT MAX(batch.#{primary_key}) AS batch_last_id,
        EXISTS (
          SELECT #{primary_key}
          FROM #{table}
          WHERE #{primary_key} > MAX(batch.#{primary_key})
        ) AS more_rows
        FROM (
          SELECT #{primary_key}
          FROM #{table}
          WHERE #{primary_key} >= #{batch_first_id}
          ORDER BY #{primary_key}
          LIMIT #{batch_size}) AS batch;
      SQL

      # Query through the model's own connection so that the table is looked up
      # in the database the model is bound to.
      result = @model_class.connection.exec_query(sql).first

      [result["batch_last_id"], result["more_rows"]]
    end

    # @private
    #
    # Moves the cursor back to the first ID.
    def reset
      set_cursor_id(1)
    end

    # @private
    #
    # Moves the cursor to the ID right after the batch that was just returned.
    #
    # @param [Integer] batch_last_id the last ID of the batch
    def increment_batch(batch_last_id)
      set_cursor_id(batch_last_id + 1)
    end

    # @private
    #
    # @return [Integer] the cursor ID, or 1 if it is not set
    def cursor_id
      Rails.cache.fetch("#{cache_key}:cursor_id") || 1
    end

    # @private
    #
    # Stores the cursor under the same cache entry that `cursor_id` reads.
    #
    # @param [Integer] id the ID the next batch should start from
    def set_cursor_id(id)
      Rails.cache.write("#{cache_key}:cursor_id", id)
    end

    # @private
    #
    # Builds the cache key prefix from this class, the model class and the key.
    # NOTE: it already ends in ":cursor_id" and the reader/writer append the
    # suffix again; this is kept as-is so existing cached cursors stay valid.
    #
    # @return [String] the memoized cache key prefix
    def cache_key
      @cache_key ||= "#{self.class.name.parameterize}:#{@model_class.name.parameterize}:#{@key}:cursor_id"
    end
  end
end