diff options
Diffstat (limited to 'app/models/concerns/counter_attribute.rb')
-rw-r--r-- | app/models/concerns/counter_attribute.rb | 201 |
1 files changed, 57 insertions, 144 deletions
diff --git a/app/models/concerns/counter_attribute.rb b/app/models/concerns/counter_attribute.rb index 03e062a9855..f1efbba67e1 100644 --- a/app/models/concerns/counter_attribute.rb +++ b/app/models/concerns/counter_attribute.rb @@ -17,14 +17,29 @@ # counter_attribute :storage_size # end # +# It's possible to define a conditional counter attribute. You need to pass a proc +# that must accept a single argument, the object instance on which this concern is +# included. +# +# @example: +# +# class ProjectStatistics +# include CounterAttribute +# +# counter_attribute :conditional_one, if: -> { |object| object.use_counter_attribute? } +# end +# # To increment the counter we can use the method: -# delayed_increment_counter(:commit_count, 3) +# increment_counter(:commit_count, 3) +# +# This method would determine whether it would increment the counter using Redis, +# or fallback to legacy increment on ActiveRecord counters. # # It is possible to register callbacks to be executed after increments have # been flushed to the database. Callbacks are not executed if there are no increments # to flush. # -# counter_attribute_after_flush do |statistic| +# counter_attribute_after_commit do |statistic| # Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id) # end # @@ -32,99 +47,51 @@ module CounterAttribute extend ActiveSupport::Concern extend AfterCommitQueue include Gitlab::ExclusiveLeaseHelpers - - LUA_STEAL_INCREMENT_SCRIPT = <<~EOS - local increment_key, flushed_key = KEYS[1], KEYS[2] - local increment_value = redis.call("get", increment_key) or 0 - local flushed_value = redis.call("incrby", flushed_key, increment_value) - if flushed_value == 0 then - redis.call("del", increment_key, flushed_key) - else - redis.call("del", increment_key) - end - return flushed_value - EOS - - WORKER_DELAY = 10.minutes - WORKER_LOCK_TTL = 10.minutes + include Gitlab::Utils::StrongMemoize class_methods do - def counter_attribute(attribute) - counter_attributes << attribute + def counter_attribute(attribute, if: nil) + counter_attributes << { + attribute: attribute, + if_proc: binding.local_variable_get(:if) # can't read `if` directly + } end def counter_attributes - @counter_attributes ||= Set.new + @counter_attributes ||= [] end - def after_flush_callbacks - @after_flush_callbacks ||= [] + def after_commit_callbacks + @after_commit_callbacks ||= [] end - # perform registered callbacks after increments have been flushed to the database - def counter_attribute_after_flush(&callback) - after_flush_callbacks << callback - end - - def counter_attribute_enabled?(attribute) - counter_attributes.include?(attribute) + # perform registered callbacks after increments have been committed to the database + def counter_attribute_after_commit(&callback) + after_commit_callbacks << callback end end - # This method must only be called by FlushCounterIncrementsWorker - # because it should run asynchronously and with exclusive lease. - # This will - # 1. temporarily move the pending increment for a given attribute - # to a relative "flushed" Redis key, delete the increment key and return - # the value. If new increments are performed at this point, the increment - # key is recreated as part of `delayed_increment_counter`. - # The "flushed" key is used to ensure that we can keep incrementing - # counters in Redis while flushing existing values. - # 2. then the value is used to update the counter in the database. - # 3. finally the "flushed" key is deleted. - def flush_increments_to_database!(attribute) - lock_key = counter_lock_key(attribute) - - with_exclusive_lease(lock_key) do - previous_db_value = read_attribute(attribute) - increment_key = counter_key(attribute) - flushed_key = counter_flushed_key(attribute) - increment_value = steal_increments(increment_key, flushed_key) - new_db_value = nil - - next if increment_value == 0 - - transaction do - update_counters_with_lease({ attribute => increment_value }) - redis_state { |redis| redis.del(flushed_key) } - new_db_value = reset.read_attribute(attribute) - end + def counter_attribute_enabled?(attribute) + counter_attribute = self.class.counter_attributes.find { |registered| registered[:attribute] == attribute } + return false unless counter_attribute + return true unless counter_attribute[:if_proc] - execute_after_flush_callbacks + counter_attribute[:if_proc].call(self) + end - log_flush_counter(attribute, increment_value, previous_db_value, new_db_value) + def counter(attribute) + strong_memoize_with(:counter, attribute) do + # This needs #to_sym because attribute could come from a Sidekiq param, + # which would be a string. + build_counter_for(attribute.to_sym) end end - def delayed_increment_counter(attribute, increment) - raise ArgumentError, "#{attribute} is not a counter attribute" unless counter_attribute_enabled?(attribute) - + def increment_counter(attribute, increment) return if increment == 0 run_after_commit_or_now do - increment_counter(attribute, increment) - - FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute) - end - - true - end - - def increment_counter(attribute, increment) - if counter_attribute_enabled?(attribute) - new_value = redis_state do |redis| - redis.incrby(counter_key(attribute), increment) - end + new_value = counter(attribute).increment(increment) log_increment_counter(attribute, increment, new_value) end @@ -137,74 +104,33 @@ module CounterAttribute end def reset_counter!(attribute) - if counter_attribute_enabled?(attribute) - detect_race_on_record(log_fields: { caller: __method__, attributes: attribute }) do - update!(attribute => 0) - clear_counter!(attribute) - end - - log_clear_counter(attribute) + detect_race_on_record(log_fields: { caller: __method__, attributes: attribute }) do + counter(attribute).reset! end - end - def get_counter_value(attribute) - if counter_attribute_enabled?(attribute) - redis_state do |redis| - redis.get(counter_key(attribute)).to_i - end - end + log_clear_counter(attribute) end - def counter_key(attribute) - "project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}" - end - - def counter_flushed_key(attribute) - counter_key(attribute) + ':flushed' - end - - def counter_lock_key(attribute) - counter_key(attribute) + ':lock' - end - - def counter_attribute_enabled?(attribute) - self.class.counter_attribute_enabled?(attribute) + def execute_after_commit_callbacks + self.class.after_commit_callbacks.each do |callback| + callback.call(self.reset) + end end private - def database_lock_key - "project:{#{project_id}}:#{self.class}:#{id}" - end - - def steal_increments(increment_key, flushed_key) - redis_state do |redis| - redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key]) - end - end + def build_counter_for(attribute) + raise ArgumentError, %(attribute "#{attribute}" does not exist) unless has_attribute?(attribute) - def clear_counter!(attribute) - redis_state do |redis| - redis.del(counter_key(attribute)) - end - end - - def execute_after_flush_callbacks - self.class.after_flush_callbacks.each do |callback| - callback.call(self) + if counter_attribute_enabled?(attribute) + Gitlab::Counters::BufferedCounter.new(self, attribute) + else + Gitlab::Counters::LegacyCounter.new(self, attribute) end end - def redis_state(&block) - Gitlab::Redis::SharedState.with(&block) - end - - def with_exclusive_lease(lock_key) - in_lock(lock_key, ttl: WORKER_LOCK_TTL) do - yield - end - rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError - # a worker is already updating the counters + def database_lock_key + "project:{#{project_id}}:#{self.class}:#{id}" end # detect_race_on_record uses a lease to monitor access @@ -258,19 +184,6 @@ module CounterAttribute Gitlab::AppLogger.info(payload) end - def log_flush_counter(attribute, increment, previous_db_value, new_db_value) - payload = Gitlab::ApplicationContext.current.merge( - message: 'Flush counter attribute to database', - attribute: attribute, - project_id: project_id, - increment: increment, - previous_db_value: previous_db_value, - new_db_value: new_db_value - ) - - Gitlab::AppLogger.info(payload) - end - def log_clear_counter(attribute) payload = Gitlab::ApplicationContext.current.merge( message: 'Clear counter attribute', |