Welcome to mirror list, hosted at ThFree Co, Russian Federation.

each_batch.rb « concerns « models « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 945d286a2fd43b437cc22ac67a71607661b8adf7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# frozen_string_literal: true

module EachBatch
  extend ActiveSupport::Concern
  include LooseIndexScan

  class_methods do
    # Iterates over the rows in a relation in batches, similar to Rails'
    # `in_batches` but in a more efficient way.
    #
    # Unlike `in_batches` provided by Rails this method does not support a
    # custom start/end range, nor does it provide support for the `load:`
    # keyword argument.
    #
    # This method will yield an ActiveRecord::Relation to the supplied block.
    # A block is required: no Enumerator is returned when it is omitted.
    #
    # Example:
    #
    #     User.each_batch do |relation|
    #       relation.update_all(updated_at: Time.current)
    #     end
    #
    # The supplied block is also passed an optional batch index:
    #
    #     User.each_batch do |relation, index|
    #       puts index # => 1, 2, 3, ...
    #     end
    #
    # You can also specify an alternative column to use for ordering the rows:
    #
    #     User.each_batch(column: :created_at) do |relation|
    #       ...
    #     end
    #
    # This will produce SQL queries along the lines of:
    #
    #     User Load (0.7ms)  SELECT  "users"."id" FROM "users" WHERE ("users"."id" >= 41654)  ORDER BY "users"."id" ASC LIMIT 1 OFFSET 1000
    #       (0.7ms)  SELECT COUNT(*) FROM "users" WHERE ("users"."id" >= 41654) AND ("users"."id" < 42687)
    #
    # of - The number of rows to retrieve per batch.
    # column - The column to use for ordering the batches.
    # order_hint - An optional column to append to the `ORDER BY id`
    #   clause to help the query planner. PostgreSQL might perform badly
    #   with a LIMIT 1 because the planner guesses that scanning the
    #   index in ID order will reach the desired row in less time than
    #   using another index would take. The
    #   order_hint does not affect the search results. For example,
    #   `ORDER BY id ASC, updated_at ASC` means the same thing as `ORDER
    #   BY id ASC`.
    # order - The direction to iterate in, :asc (the default) or :desc. The
    #   value is compared case-insensitively, so 'desc' and :DESC behave
    #   like :desc.
    #
    # Returns nil once every batch has been yielded (or when there are no rows).
    # Raises ArgumentError when `column` is nil or false.
    def each_batch(of: 1000, column: primary_key, order: :asc, order_hint: nil)
      unless column
        raise ArgumentError,
          'the column: argument must be set to a column name to use for ordering rows'
      end

      # The range conditions below have to agree with the ORDER BY direction.
      # Checking for the :desc symbol only would treat 'desc' or :DESC as
      # ascending while the query itself is sorted descending, which silently
      # yields a single, wrong batch.
      descending = order.to_s.casecmp?('desc')

      start = except(:select)
        .select(column)
        .reorder(column => order)

      start = start.order(order_hint) if order_hint
      start = start.take

      # No rows at all: nothing to yield.
      return unless start

      start_id = start[column]
      arel_table = self.arel_table

      1.step do |index|
        start_cond =
          if descending
            arel_table[column].lteq(start_id)
          else
            arel_table[column].gteq(start_id)
          end

        # Look up the row sitting `of` rows past the current start. It is the
        # first row of the next batch and the exclusive bound of this one.
        stop = except(:select)
          .select(column)
          .where(start_cond)
          .reorder(column => order)

        stop = stop.order(order_hint) if order_hint
        stop = stop
          .offset(of)
          .limit(1)
          .take

        relation = where(start_cond)

        if stop
          stop_id = stop[column]
          start_id = stop_id
          stop_cond =
            if descending
              arel_table[column].gt(stop_id)
            else
              arel_table[column].lt(stop_id)
            end

          relation = relation.where(stop_cond)
        end

        # Any ORDER BYs are useless for this relation and can lead to less
        # efficient UPDATE queries, hence we get rid of it.
        relation = relation.except(:order)

        # Using unscoped is necessary to prevent leaking the current scope used by
        # ActiveRecord to chain `each_batch` method.
        unscoped { yield relation, index }

        # Without a stop row this was the final (possibly partial) batch.
        break unless stop
      end
    end

    # Iterates over the rows in a relation in batches by skipping duplicated values in the column.
    # Example: counting the number of distinct authors in `issues`
    #
    #  - Table size: 100_000
    #  - Column: author_id
    #  - Distinct author_ids in the table: 1000
    #
    #  The query will read at most 1000 rows if we have index coverage on author_id.
    #
    #  > count = 0
    #  > Issue.distinct_each_batch(column: 'author_id', of: 1000) { |r| count += r.count(:author_id) }
    #
    # column - The column whose values are iterated over (required).
    # order - The direction to iterate in, :asc (the default) or :desc. The
    #   value is compared case-insensitively, so 'asc' and :ASC behave
    #   like :asc.
    # of - The offset, within the loose index scan, at which the next batch
    #   starts (presumably the number of distinct values per batch).
    #
    # Yields the `loose_index_scan` result for the batch and the batch index
    # (starting at 1). Returns nil.
    def distinct_each_batch(column:, order: :asc, of: 1000)
      # The range conditions have to agree with the scan direction. Checking
      # for the :asc symbol only would send 'asc' or :ASC down the descending
      # branch and end the iteration after one wrong batch.
      descending = order.to_s.casecmp?('desc')

      start = except(:select)
        .select(column)
        .reorder(column => order)
        .take

      return unless start

      start_id = start[column]
      arel_column = arel_table[column.to_s]

      # Both conditions are built on demand inside the scan blocks, so they
      # always read the value `start_id` holds at that moment.
      from_start = -> { descending ? arel_column.lteq(start_id) : arel_column.gteq(start_id) }
      before_stop = ->(stop_id) { descending ? arel_column.gt(stop_id) : arel_column.lt(stop_id) }

      1.step do |index|
        # The value found `of` entries past the current start opens the next
        # batch and is the exclusive bound of this one.
        stop = loose_index_scan(column: column, order: order) do |cte_query, inner_query|
          [cte_query.where(from_start.call), inner_query]
        end.offset(of).take

        stop_id = stop[column] if stop

        relation = loose_index_scan(column: column, order: order) do |cte_query, inner_query|
          bounded_cte = cte_query.where(from_start.call)
          # Only a non-final batch has a bound; the last one runs to the end.
          bounded_inner = stop ? inner_query.where(before_stop.call(stop_id)) : inner_query

          [bounded_cte, bounded_inner]
        end

        start_id = stop_id if stop

        unscoped { yield relation, index }

        break unless stop
      end
    end

    # Iterates over the relation and counts the rows. The counting
    # logic is combined with the iteration query which saves one query
    # compared to a standard each_batch approach.
    #
    # Basic usage:
    # count, _last_value = Project.each_batch_count
    #
    # The counting can be stopped early by passing a block that returns a truthy value.
    # Example:
    #
    # query_count = 0
    # count, last_value = Project.each_batch_count do
    #   query_count += 1
    #   query_count == 5 # stop counting after 5 loops
    # end
    #
    # Resume where the previous counting has stopped:
    #
    # count, last_value = Project.each_batch_count(last_count: count, last_value: last_value)
    #
    # Another example, counting issues in project:
    #
    # project = Project.find(1)
    # count, _ = project.issues.each_batch_count(column: :iid)
    #
    # of - The number of rows covered by each counting query.
    # column - The column used to order the window and to resume from.
    # last_count - The count to start from; it is added to every row number.
    # last_value - When given, only rows whose `column` is greater than this
    #   value are counted.
    #
    # Yields the running count and the last seen column value after each full
    # batch (only when a block is given).
    #
    # Returns a two-element Array: the count and the last seen column value.
    # The second element is nil when the loop ended because the offset query
    # found no further row.
    def each_batch_count(of: 1000, column: :id, last_count: 0, last_value: nil)
      arel_table = self.arel_table
      # Window ordered by `column`, shared by both window functions below.
      window = Arel::Nodes::Window.new.order(arel_table[column])
      # LAST_VALUE(column) OVER (window), aliased to the column's own name.
      last_value_column = Arel::Nodes::NamedFunction
        .new('LAST_VALUE', [arel_table[column]])
        .over(window)
        .as(column.to_s)

      loop do
        # ROW_NUMBER() OVER (window) + last_count, aliased as "count". Rebuilt
        # on every iteration because last_count changes.
        count_column = Arel::Nodes::Addition
          .new(Arel::Nodes::NamedFunction.new('ROW_NUMBER', []).over(window), last_count)
          .as('count')

        projections = [count_column, last_value_column]
        # Only fetch the row at position `of` of the remaining rows.
        scope = limit(1).offset(of - 1)
        scope = scope.where(arel_table[column].gt(last_value)) if last_value
        # NOTE: this overwrites last_value (with nil when no row is returned),
        # but `scope` above already captured the previous value in its WHERE.
        new_count, last_value = scope.pick(*projections)

        # When reaching the last batch the offset query might return no data, to address this
        # problem, we invoke a specialized query that takes the last row out of the resultset.
        # We could do this for each batch, however it would add unnecessary overhead to all
        # queries.
        if new_count.nil?
          # Same projections over the same scope, but without LIMIT/OFFSET,
          # wrapped as a subquery aliased to the table name.
          inner_query = scope
            .select(*projections)
            .limit(nil)
            .offset(nil)
            .arel
            .as(quoted_table_name)

          # Pick the row with the highest count from the subquery.
          # NOTE(review): `unscope(where: :type)` presumably drops an STI type
          # condition that does not apply to the subquery — confirm.
          new_count, last_value =
            unscoped
            .from(inner_query)
            .unscope(where: :type)
            .order(count: :desc)
            .limit(1)
            .pick(:count, column)

          # Keep the previous count when there were no remaining rows at all.
          last_count = new_count if new_count
          # The returned last_value is nil on this exit path.
          last_value = nil
          break
        end

        last_count = new_count

        if block_given?
          # A truthy block result stops the counting early; the current
          # last_count/last_value pair is then returned.
          should_break = yield(last_count, last_value)
          break if should_break
        end
      end
      [last_count, last_value]
    end
  end
end