Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/models/concerns/counter_attribute.rb')
-rw-r--r--app/models/concerns/counter_attribute.rb201
1 files changed, 57 insertions, 144 deletions
diff --git a/app/models/concerns/counter_attribute.rb b/app/models/concerns/counter_attribute.rb
index 03e062a9855..f1efbba67e1 100644
--- a/app/models/concerns/counter_attribute.rb
+++ b/app/models/concerns/counter_attribute.rb
@@ -17,14 +17,29 @@
# counter_attribute :storage_size
# end
#
+# It's possible to define a conditional counter attribute. You need to pass a proc
+# that must accept a single argument, the object instance on which this concern is
+# included.
+#
+# @example:
+#
+# class ProjectStatistics
+# include CounterAttribute
+#
+# counter_attribute :conditional_one, if: -> { |object| object.use_counter_attribute? }
+# end
+#
# To increment the counter we can use the method:
-# delayed_increment_counter(:commit_count, 3)
+# increment_counter(:commit_count, 3)
+#
+# This method would determine whether it would increment the counter using Redis,
+# or fallback to legacy increment on ActiveRecord counters.
#
# It is possible to register callbacks to be executed after increments have
# been flushed to the database. Callbacks are not executed if there are no increments
# to flush.
#
-# counter_attribute_after_flush do |statistic|
+# counter_attribute_after_commit do |statistic|
# Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id)
# end
#
@@ -32,99 +47,51 @@ module CounterAttribute
extend ActiveSupport::Concern
extend AfterCommitQueue
include Gitlab::ExclusiveLeaseHelpers
-
- LUA_STEAL_INCREMENT_SCRIPT = <<~EOS
- local increment_key, flushed_key = KEYS[1], KEYS[2]
- local increment_value = redis.call("get", increment_key) or 0
- local flushed_value = redis.call("incrby", flushed_key, increment_value)
- if flushed_value == 0 then
- redis.call("del", increment_key, flushed_key)
- else
- redis.call("del", increment_key)
- end
- return flushed_value
- EOS
-
- WORKER_DELAY = 10.minutes
- WORKER_LOCK_TTL = 10.minutes
+ include Gitlab::Utils::StrongMemoize
class_methods do
- def counter_attribute(attribute)
- counter_attributes << attribute
+ def counter_attribute(attribute, if: nil)
+ counter_attributes << {
+ attribute: attribute,
+ if_proc: binding.local_variable_get(:if) # can't read `if` directly
+ }
end
def counter_attributes
- @counter_attributes ||= Set.new
+ @counter_attributes ||= []
end
- def after_flush_callbacks
- @after_flush_callbacks ||= []
+ def after_commit_callbacks
+ @after_commit_callbacks ||= []
end
- # perform registered callbacks after increments have been flushed to the database
- def counter_attribute_after_flush(&callback)
- after_flush_callbacks << callback
- end
-
- def counter_attribute_enabled?(attribute)
- counter_attributes.include?(attribute)
+ # perform registered callbacks after increments have been committed to the database
+ def counter_attribute_after_commit(&callback)
+ after_commit_callbacks << callback
end
end
- # This method must only be called by FlushCounterIncrementsWorker
- # because it should run asynchronously and with exclusive lease.
- # This will
- # 1. temporarily move the pending increment for a given attribute
- # to a relative "flushed" Redis key, delete the increment key and return
- # the value. If new increments are performed at this point, the increment
- # key is recreated as part of `delayed_increment_counter`.
- # The "flushed" key is used to ensure that we can keep incrementing
- # counters in Redis while flushing existing values.
- # 2. then the value is used to update the counter in the database.
- # 3. finally the "flushed" key is deleted.
- def flush_increments_to_database!(attribute)
- lock_key = counter_lock_key(attribute)
-
- with_exclusive_lease(lock_key) do
- previous_db_value = read_attribute(attribute)
- increment_key = counter_key(attribute)
- flushed_key = counter_flushed_key(attribute)
- increment_value = steal_increments(increment_key, flushed_key)
- new_db_value = nil
-
- next if increment_value == 0
-
- transaction do
- update_counters_with_lease({ attribute => increment_value })
- redis_state { |redis| redis.del(flushed_key) }
- new_db_value = reset.read_attribute(attribute)
- end
+ def counter_attribute_enabled?(attribute)
+ counter_attribute = self.class.counter_attributes.find { |registered| registered[:attribute] == attribute }
+ return false unless counter_attribute
+ return true unless counter_attribute[:if_proc]
- execute_after_flush_callbacks
+ counter_attribute[:if_proc].call(self)
+ end
- log_flush_counter(attribute, increment_value, previous_db_value, new_db_value)
+ def counter(attribute)
+ strong_memoize_with(:counter, attribute) do
+ # This needs #to_sym because attribute could come from a Sidekiq param,
+ # which would be a string.
+ build_counter_for(attribute.to_sym)
end
end
- def delayed_increment_counter(attribute, increment)
- raise ArgumentError, "#{attribute} is not a counter attribute" unless counter_attribute_enabled?(attribute)
-
+ def increment_counter(attribute, increment)
return if increment == 0
run_after_commit_or_now do
- increment_counter(attribute, increment)
-
- FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute)
- end
-
- true
- end
-
- def increment_counter(attribute, increment)
- if counter_attribute_enabled?(attribute)
- new_value = redis_state do |redis|
- redis.incrby(counter_key(attribute), increment)
- end
+ new_value = counter(attribute).increment(increment)
log_increment_counter(attribute, increment, new_value)
end
@@ -137,74 +104,33 @@ module CounterAttribute
end
def reset_counter!(attribute)
- if counter_attribute_enabled?(attribute)
- detect_race_on_record(log_fields: { caller: __method__, attributes: attribute }) do
- update!(attribute => 0)
- clear_counter!(attribute)
- end
-
- log_clear_counter(attribute)
+ detect_race_on_record(log_fields: { caller: __method__, attributes: attribute }) do
+ counter(attribute).reset!
end
- end
- def get_counter_value(attribute)
- if counter_attribute_enabled?(attribute)
- redis_state do |redis|
- redis.get(counter_key(attribute)).to_i
- end
- end
+ log_clear_counter(attribute)
end
- def counter_key(attribute)
- "project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}"
- end
-
- def counter_flushed_key(attribute)
- counter_key(attribute) + ':flushed'
- end
-
- def counter_lock_key(attribute)
- counter_key(attribute) + ':lock'
- end
-
- def counter_attribute_enabled?(attribute)
- self.class.counter_attribute_enabled?(attribute)
+ def execute_after_commit_callbacks
+ self.class.after_commit_callbacks.each do |callback|
+ callback.call(self.reset)
+ end
end
private
- def database_lock_key
- "project:{#{project_id}}:#{self.class}:#{id}"
- end
-
- def steal_increments(increment_key, flushed_key)
- redis_state do |redis|
- redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key])
- end
- end
+ def build_counter_for(attribute)
+ raise ArgumentError, %(attribute "#{attribute}" does not exist) unless has_attribute?(attribute)
- def clear_counter!(attribute)
- redis_state do |redis|
- redis.del(counter_key(attribute))
- end
- end
-
- def execute_after_flush_callbacks
- self.class.after_flush_callbacks.each do |callback|
- callback.call(self)
+ if counter_attribute_enabled?(attribute)
+ Gitlab::Counters::BufferedCounter.new(self, attribute)
+ else
+ Gitlab::Counters::LegacyCounter.new(self, attribute)
end
end
- def redis_state(&block)
- Gitlab::Redis::SharedState.with(&block)
- end
-
- def with_exclusive_lease(lock_key)
- in_lock(lock_key, ttl: WORKER_LOCK_TTL) do
- yield
- end
- rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
- # a worker is already updating the counters
+ def database_lock_key
+ "project:{#{project_id}}:#{self.class}:#{id}"
end
# detect_race_on_record uses a lease to monitor access
@@ -258,19 +184,6 @@ module CounterAttribute
Gitlab::AppLogger.info(payload)
end
- def log_flush_counter(attribute, increment, previous_db_value, new_db_value)
- payload = Gitlab::ApplicationContext.current.merge(
- message: 'Flush counter attribute to database',
- attribute: attribute,
- project_id: project_id,
- increment: increment,
- previous_db_value: previous_db_value,
- new_db_value: new_db_value
- )
-
- Gitlab::AppLogger.info(payload)
- end
-
def log_clear_counter(attribute)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Clear counter attribute',