Welcome to mirror list, hosted at ThFree Co, Russian Federation.

buffered_counter.rb « counters « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 258ada864c88bfe1338350e8443384443296b963 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# frozen_string_literal: true

module Gitlab
  module Counters
    class BufferedCounter
      include Gitlab::ExclusiveLeaseHelpers

      # Delay passed to FlushCounterIncrementsWorker.perform_in when a flush is scheduled.
      WORKER_DELAY = 10.minutes
      # TTL of the exclusive lease taken while buffered increments are committed.
      WORKER_LOCK_TTL = 10.minutes

      # Refresh-related keys get a deliberately long expiry: it only exists so
      # that keys which are never deleted, for whatever reason, do not stay in
      # Redis forever. A refresh is not expected to run for longer than this.
      REFRESH_KEYS_TTL = 14.days
      # Number of shard keys popped and deleted per cleanup batch.
      CLEANUP_BATCH_SIZE = 50
      # Pause, in seconds, between two full cleanup batches.
      CLEANUP_INTERVAL_SECONDS = 0.1

      # Caps the size of a single bitmap key at 2^26-1 bits (~8MB).
      MAX_BITMAP_OFFSET = (2**26) - 1

      # Redis script used when flushing buffered increments.
      #
      # KEYS[1] - increment key (the buffered counter value)
      # KEYS[2] - flushed key (amount taken out of the buffer, pending persistence)
      #
      # It adds the value of the increment key (0 when the key is missing) to the
      # flushed key and deletes the increment key. When the resulting total is 0
      # the flushed key is deleted as well. Returns the resulting flushed total.
      LUA_FLUSH_INCREMENT_SCRIPT = <<~LUA
        local increment_key, flushed_key = KEYS[1], KEYS[2]
        local increment_value = redis.call("get", increment_key) or 0
        local flushed_value = redis.call("incrby", flushed_key, increment_value)
        if flushed_value == 0 then
          redis.call("del", increment_key, flushed_key)
        else
          redis.call("del", increment_key)
        end
        return flushed_value
      LUA

      # Binds the counter to one attribute of one record.
      #
      # counter_record - record owning the counter; the visible code reads its
      #                  `project`, `id` and `class` and updates it when flushing
      # attribute      - name of the counter attribute on that record
      def initialize(counter_record, attribute)
        @attribute = attribute
        @counter_record = counter_record
      end

      # Reads the value stored under the counter key and returns it as an Integer.
      # A missing key comes back as nil from the client and therefore reads as 0.
      def get
        redis_state { |redis| redis.get(key).to_i }
      end

      # Redis script that applies a single increment.
      #
      # KEYS[1] - counter key
      # KEYS[2] - refresh key (counter accumulated while a refresh is running)
      # KEYS[3] - refresh indicator key (its existence means "refresh in progress")
      # KEYS[4] - bitmap shard tracking refs already seen for this sign of change
      # KEYS[5] - bitmap shard tracking refs seen for the opposite sign
      # KEYS[6] - set holding the names of all bitmap shards in use
      # ARGV[1] - amount to add (may be negative)
      # ARGV[2] - bit offset of the ref inside its bitmap shard
      #
      # Without a refresh indicator the amount is simply added to the counter key.
      # During a refresh, a ref whose bit is already set in the tracking shard is
      # treated as a duplicate and the refresh key is returned unchanged.
      # Otherwise the bit is set, the shard is registered in the shards set (both
      # with REFRESH_KEYS_TTL expiry) and the amount is applied to the refresh key
      # only when it is an increment with no recorded opposing change, or a
      # decrement with a recorded opposing change; in every other case 0 is added.
      # Returns the value of whichever counter was touched.
      LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT = <<~LUA
        local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
        local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]

        local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])

        -- increment to the counter key when not refreshing
        if redis.call("exists", refresh_indicator_key) == 0 then
          return redis.call("incrby", counter_key, amount)
        end

        -- deduplicate and increment to the refresh counter key while refreshing
        local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset)
        if found_duplicate == 1 then
          return redis.call("get", refresh_key)
        end

        redis.call("setbit", tracking_shard_key, tracking_offset, 1)
        redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds})
        redis.call("sadd", shards_key, tracking_shard_key)
        redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds})

        local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset)
        local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0
        local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1
        local net_change = 0

        if increment_without_previous_decrement or decrement_with_previous_increment then
          net_change = amount
        end

        return redis.call("incrby", refresh_key, net_change)
      LUA

      # Applies one increment through the deduplicating script and schedules a
      # delayed flush of the buffered value.
      #
      # increment - object responding to `amount` and `ref`
      #
      # Returns the script's reply converted to an Integer.
      def increment(increment)
        new_value = redis_state do |redis|
          script_args = increment_args(increment)
          redis.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **script_args).to_i
        end

        FlushCounterIncrementsWorker.perform_in(
          WORKER_DELAY,
          counter_record.class.name,
          counter_record.id,
          attribute
        )

        new_value
      end

      # Applies several increments in one pipelined round trip, then schedules a
      # delayed flush of the buffered value.
      #
      # increments - enumerable of objects responding to `amount` and `ref`
      #
      # Returns the reply of the last pipelined script call as an Integer
      # (0 when there was no reply at all).
      def bulk_increment(increments)
        replies = redis_state do |redis|
          redis.pipelined do |pipeline|
            increments.each do |entry|
              script_args = increment_args(entry)
              pipeline.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **script_args)
            end
          end
        end

        FlushCounterIncrementsWorker.perform_in(
          WORKER_DELAY,
          counter_record.class.name,
          counter_record.id,
          attribute
        )

        replies.last.to_i
      end

      # Redis script that starts a refresh.
      #
      # KEYS[1] - counter key
      # KEYS[2] - refresh indicator key
      #
      # Deletes the counter key and sets the refresh indicator to 1 with an
      # expiry of REFRESH_KEYS_TTL seconds.
      LUA_INITIATE_REFRESH_SCRIPT = <<~LUA
        local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]
        redis.call("del", counter_key)
        redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds})
      LUA

      # Starts a refresh: the persisted attribute is reset to 0 first, then the
      # initiate-refresh script is run against the counter key and the refresh
      # indicator key.
      #
      # Raises whatever `update!` raises when the record cannot be saved.
      def initiate_refresh!
        counter_record.update!(attribute => 0)

        redis_state do |redis|
          script_keys = [key, refresh_indicator_key]
          redis.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: script_keys)
        end
      end

      # Redis script that ends a refresh.
      #
      # KEYS[1] - counter key
      # KEYS[2] - refresh key
      # KEYS[3] - refresh indicator key
      #
      # Adds the value of the refresh key (0 when missing) to the counter key,
      # then deletes both the refresh indicator and the refresh key.
      LUA_FINALIZE_REFRESH_SCRIPT = <<~LUA
        local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
        local refresh_amount = redis.call("get", refresh_key) or 0

        redis.call("incrby", counter_key, refresh_amount)
        redis.call("del", refresh_indicator_key, refresh_key)
      LUA

      # Ends a refresh: runs the finalize script, then schedules a delayed flush
      # and enqueues the cleanup of the refresh tracking keys.
      def finalize_refresh
        redis_state do |redis|
          script_keys = [key, refresh_key, refresh_indicator_key]
          redis.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: script_keys)
        end

        worker_args = [counter_record.class.name, counter_record.id, attribute]

        FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, *worker_args)
        ::Counters::CleanupRefreshWorker.perform_async(*worker_args)
      end

      # Deletes the tracking shard keys registered in the shards set, popping at
      # most CLEANUP_BATCH_SIZE names per round and sleeping
      # CLEANUP_INTERVAL_SECONDS between two full batches.
      #
      # Returns nil.
      def cleanup_refresh
        redis_state do |redis|
          loop do
            shards = redis.spop(shards_key, CLEANUP_BATCH_SIZE)

            # SPOP with a count replies with an empty list once the set is
            # drained (or missing). Stop here instead of issuing a DEL without
            # any key, which the server rejects.
            break if shards.nil? || shards.empty?

            redis.del(*shards)
            break if shards.size < CLEANUP_BATCH_SIZE

            sleep CLEANUP_INTERVAL_SECONDS
          end
        end
      end

      # Persists the buffered amount to the record while holding the exclusive
      # lease. Nothing is written when the amount to flush is 0. When the lease
      # cannot be obtained the flush is skipped (see with_exclusive_lease).
      #
      # Returns the attribute value read from the reloaded record.
      def commit_increment!
        with_exclusive_lease do
          pending = amount_to_be_flushed

          unless pending == 0
            # The flushed key is removed inside the transaction block, right
            # after the counter update has been issued.
            counter_record.transaction do
              counter_record.update_counters_with_lease({ attribute => pending })
              remove_flushed_key
            end

            counter_record.execute_after_commit_callbacks
          end
        end

        counter_record.reset.read_attribute(attribute)
      end

      # Computes the total amount that has to be flushed by running the flush
      # script on the increment key and the flushed key.
      # The returned total is the sum of:
      # - the value currently held in the increment key
      # - any value already present in the flushed key that was not flushed yet
      def amount_to_be_flushed
        redis_state { |redis| redis.eval(LUA_FLUSH_INCREMENT_SCRIPT, keys: [key, flushed_key]) }
      end

      # Builds the Redis key of the counter from the project id, the record's
      # class, the record id and the attribute name.
      # NOTE(review): the braces around the project id look like a Redis Cluster
      # hash tag keeping all keys of a project on one slot - confirm.
      def key
        record = counter_record
        project_id = record.project.id

        "project:{#{project_id}}:counters:#{record.class}:#{record.id}:#{attribute}"
      end

      # Key holding the amount moved out of the counter key by the flush script.
      def flushed_key
        key + ":flushed"
      end

      # Key whose existence tells the increment script that a refresh is running.
      def refresh_indicator_key
        key + ":refresh-in-progress"
      end

      # Key of the counter that accumulates increments while a refresh is running.
      def refresh_key
        key + ":refresh"
      end

      private

      attr_reader :counter_record, :attribute

      # Builds the keyword arguments for the deduplicating increment script.
      # The order of `keys` and `argv` must match KEYS[1..6] and ARGV[1..2] as
      # read by LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT.
      def increment_args(increment)
        script_keys = [
          key,
          refresh_key,
          refresh_indicator_key,
          tracking_shard_key(increment),
          opposing_tracking_shard_key(increment),
          shards_key
        ]
        script_argv = [increment.amount, tracking_offset(increment)]

        { keys: script_keys, argv: script_argv }
      end

      # Bitmap shard recording refs seen for the same sign as this increment:
      # the positive shard for a positive amount, the negative shard otherwise.
      def tracking_shard_key(increment)
        if positive?(increment)
          positive_shard_key(increment.ref.to_i)
        else
          negative_shard_key(increment.ref.to_i)
        end
      end

      # Bitmap shard recording refs seen for the opposite sign of this increment:
      # the negative shard for a positive amount, the positive shard otherwise.
      def opposing_tracking_shard_key(increment)
        if positive?(increment)
          negative_shard_key(increment.ref.to_i)
        else
          positive_shard_key(increment.ref.to_i)
        end
      end

      # Key of the set in which the names of all bitmap shards are registered.
      def shards_key
        refresh_key + ":shards"
      end

      # Key of the bitmap shard tracking positive changes for the given ref.
      def positive_shard_key(ref)
        prefix = refresh_key
        shard = shard_number(ref)

        "#{prefix}:+:#{shard}"
      end

      # Key of the bitmap shard tracking negative changes for the given ref.
      def negative_shard_key(ref)
        prefix = refresh_key
        shard = shard_number(ref)

        "#{prefix}:-:#{shard}"
      end

      # Index of the bitmap shard a ref falls into: each shard covers
      # MAX_BITMAP_OFFSET consecutive refs.
      def shard_number(ref)
        refs_per_shard = MAX_BITMAP_OFFSET
        ref / refs_per_shard
      end

      # Bit position of the increment's ref inside its bitmap shard.
      def tracking_offset(increment)
        increment.ref.to_i.modulo(MAX_BITMAP_OFFSET)
      end

      # True when the increment's amount is strictly greater than zero.
      def positive?(increment)
        increment.amount.positive?
      end

      # Deletes the flushed key; returns the client's reply to DEL.
      def remove_flushed_key
        redis_state { |redis| redis.del(flushed_key) }
      end

      # Forwards the given block to Gitlab::Redis::SharedState.with and returns
      # its result. Every caller in this class uses the block argument as the
      # Redis client for its commands.
      def redis_state(&block)
        Gitlab::Redis::SharedState.with(&block)
      end

      # Runs the block under the lock "<key>:locked" with a TTL of WORKER_LOCK_TTL.
      #
      # Returns whatever in_lock returns, or nil when the lock cannot be obtained.
      def with_exclusive_lease(&block)
        in_lock("#{key}:locked", ttl: WORKER_LOCK_TTL, &block)
      rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
        # Deliberately ignored: another worker already holds the lock and is
        # updating the counters.
      end
    end
  end
end