diff options
Diffstat (limited to 'lib/gitlab/redis')
-rw-r--r-- | lib/gitlab/redis/cache.rb | 40 | ||||
-rw-r--r-- | lib/gitlab/redis/chat.rb | 13 | ||||
-rw-r--r-- | lib/gitlab/redis/cluster_cache.rb | 13 | ||||
-rw-r--r-- | lib/gitlab/redis/cluster_util.rb | 32 | ||||
-rw-r--r-- | lib/gitlab/redis/cross_slot.rb | 141 | ||||
-rw-r--r-- | lib/gitlab/redis/multi_store.rb | 16 | ||||
-rw-r--r-- | lib/gitlab/redis/rate_limiting.rb | 7 |
7 files changed, 242 insertions, 20 deletions
diff --git a/lib/gitlab/redis/cache.rb b/lib/gitlab/redis/cache.rb index ba3af3e7a6f..60944268f91 100644 --- a/lib/gitlab/redis/cache.rb +++ b/lib/gitlab/redis/cache.rb @@ -5,19 +5,35 @@ module Gitlab class Cache < ::Gitlab::Redis::Wrapper CACHE_NAMESPACE = 'cache:gitlab' - # Full list of options: - # https://api.rubyonrails.org/classes/ActiveSupport/Cache/RedisCacheStore.html#method-c-new - def self.active_support_config - { - redis: pool, - compress: Gitlab::Utils.to_boolean(ENV.fetch('ENABLE_REDIS_CACHE_COMPRESSION', '1')), - namespace: CACHE_NAMESPACE, - expires_in: default_ttl_seconds - } - end + class << self + # Full list of options: + # https://api.rubyonrails.org/classes/ActiveSupport/Cache/RedisCacheStore.html#method-c-new + def active_support_config + { + redis: pool, + compress: Gitlab::Utils.to_boolean(ENV.fetch('ENABLE_REDIS_CACHE_COMPRESSION', '1')), + namespace: CACHE_NAMESPACE, + expires_in: default_ttl_seconds + } + end + + def default_ttl_seconds + ENV.fetch('GITLAB_RAILS_CACHE_DEFAULT_TTL_SECONDS', 8.hours).to_i + end + + # Exposes redis for Peek adapter. To be removed after ClusterCache migration. + def multistore_redis + redis + end + + private + + def redis + primary_store = ::Redis.new(Gitlab::Redis::ClusterCache.params) + secondary_store = ::Redis.new(params) - def self.default_ttl_seconds - ENV.fetch('GITLAB_RAILS_CACHE_DEFAULT_TTL_SECONDS', 8.hours).to_i + MultiStore.new(primary_store, secondary_store, store_name) + end end end end diff --git a/lib/gitlab/redis/chat.rb b/lib/gitlab/redis/chat.rb new file mode 100644 index 00000000000..6f320fa6fc6 --- /dev/null +++ b/lib/gitlab/redis/chat.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module Redis + class Chat < ::Gitlab::Redis::Wrapper + class << self + def config_fallback + Cache + end + end + end + end +end diff --git a/lib/gitlab/redis/cluster_cache.rb b/lib/gitlab/redis/cluster_cache.rb new file mode 100644 index 00000000000..15a87739c6d --- /dev/null +++ b/lib/gitlab/redis/cluster_cache.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module Redis + class ClusterCache < ::Gitlab::Redis::Wrapper + class << self + def config_fallback + Cache + end + end + end + end +end diff --git a/lib/gitlab/redis/cluster_util.rb b/lib/gitlab/redis/cluster_util.rb new file mode 100644 index 00000000000..5f1f39b5237 --- /dev/null +++ b/lib/gitlab/redis/cluster_util.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Gitlab + module Redis + module ClusterUtil + class << self + # clusters? is used to select Redis command types, on `true`, the subsequent + # commands should be compatible with Redis Cluster. + # + # When working with MultiStore, if even 1 of 2 stores is a Redis::Cluster, + # we should err on the side of caution and return `true `, + def cluster?(obj) + if obj.is_a?(MultiStore) + cluster?(obj.primary_store) || cluster?(obj.secondary_store) + else + obj.respond_to?(:_client) && obj._client.is_a?(::Redis::Cluster) + end + end + + def batch_unlink(keys, redis) + expired_count = 0 + keys.each_slice(1000) do |subset| + expired_count += Gitlab::Redis::CrossSlot::Pipeline.new(redis).pipelined do |pipeline| + subset.each { |key| pipeline.unlink(key) } + end.sum + end + expired_count + end + end + end + end +end diff --git a/lib/gitlab/redis/cross_slot.rb b/lib/gitlab/redis/cross_slot.rb new file mode 100644 index 00000000000..e5aa6d9ce72 --- /dev/null +++ b/lib/gitlab/redis/cross_slot.rb @@ -0,0 +1,141 @@ +# frozen_string_literal: true + +module Gitlab + module Redis + module CrossSlot + class Router + attr_reader :node_mapping, :futures, :node_sequence, :cmd_queue + + delegate :respond_to_missing?, to: :@redis + + # This map contains redis-rb methods which does not map directly + # to a standard Redis command. It is used transform unsupported commands to standard commands + # to find the node key for unsupported commands. + # + # Redis::Cluster::Command only contains details of commands which the Redis Server + # returns. Hence, commands like mapped_hmget and hscan_each internally will call the + # base command, hmget and hscan respectively. + # + # See https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis/cluster/command.rb + UNSUPPORTED_CMD_MAPPING = { + # Internally, redis-rb calls the supported Redis command and transforms the output. + # See https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis/commands/hashes.rb#L104 + mapped_hmget: :hmget + }.freeze + + # Initializes the CrossSlot::Router + # @param {::Redis} + def initialize(redis) + @redis = redis + @node_mapping = {} + @futures = {} + @node_sequence = [] + @cmd_queue = [] + end + + # For now we intercept every redis.call and return a Gitlab-Future object. + # This method groups every commands to a node for fan-out. Commands are grouped using the first key. + # + # rubocop:disable Style/MissingRespondToMissing + def method_missing(cmd, *args, **kwargs, &blk) + # Note that we can re-map the command without affecting execution as it is + # solely for finding the node key. The original cmd will be executed. + node = @redis._client._find_node_key([UNSUPPORTED_CMD_MAPPING.fetch(cmd, cmd)] + args) + + @node_mapping[node] ||= [] + @futures[node] ||= [] + + @node_sequence << node + @node_mapping[node] << [cmd, args, kwargs || {}, blk] + f = Future.new + @futures[node] << f + @cmd_queue << [f, cmd, args, kwargs || {}, blk] + f + end + # rubocop:enable Style/MissingRespondToMissing + end + + # Wraps over redis-rb's Future in + # https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis/pipeline.rb#L244 + class Future + def set(future, is_val = false) + @redis_future = future + @is_val = is_val + end + + def value + return @redis_val if @is_val + + @redis_future.value + end + end + + # Pipeline allows cross-slot pipelined to be called. The fan-out logic is implemented in + # https://github.com/redis-rb/redis-cluster-client/blob/master/lib/redis_client/cluster/pipeline.rb + # which is available in redis-rb v5.0. + # + # This file can be deprecated after redis-rb v4.8.0 is upgraded to v5.0 + class Pipeline + # Initializes the CrossSlot::Pipeline + # @param {::Redis} + def initialize(redis) + @redis = redis + end + + # pipelined is used in place of ::Redis `.pipelined` when running in a cluster context + # where cross-slot operations may happen. + def pipelined(&block) + # Directly call .pipelined and defer the pipeline execution to MultiStore. + # MultiStore could wrap over 0, 1, or 2 Redis Cluster clients, handling it here + # will not work for 2 clients since the key-slot topology can differ. + if use_cross_slot_pipelining? + router = Router.new(@redis) + yield router + execute_commands(router) + else + # use redis-rb's pipelined method + @redis.pipelined(&block) + end + end + + private + + def use_cross_slot_pipelining? + !@redis.instance_of?(::Gitlab::Redis::MultiStore) && @redis._client.instance_of?(::Redis::Cluster) + end + + def execute_commands(router) + router.node_mapping.each do |node_key, commands| + # TODO possibly use Threads to speed up but for now `n` is 3-5 which is small. + @redis.pipelined do |p| + commands.each_with_index do |command, idx| + future = router.futures[node_key][idx] + cmd, args, kwargs, blk = command + future.set(p.public_send(cmd, *args, **kwargs, &blk)) # rubocop:disable GitlabSecurity/PublicSend + end + end + end + + router.node_sequence.map do |node_key| + router.futures[node_key].shift.value + end + rescue ::Redis::CommandError => err + if err.message.start_with?('MOVED', 'ASK') + Gitlab::ErrorTracking.log_exception(err) + return execute_commands_sequentially(router) + end + + raise + end + + def execute_commands_sequentially(router) + router.cmd_queue.map do |command| + future, cmd, args, kwargs, blk = command + future.set(@redis.public_send(cmd, *args, **kwargs, &blk), true) # rubocop:disable GitlabSecurity/PublicSend + future.value + end + end + end + end + end +end diff --git a/lib/gitlab/redis/multi_store.rb b/lib/gitlab/redis/multi_store.rb index 9571e2f92e6..d36ef6b99ee 100644 --- a/lib/gitlab/redis/multi_store.rb +++ b/lib/gitlab/redis/multi_store.rb @@ -44,6 +44,7 @@ module Gitlab hscan_each mapped_hmget mget + scan scan_each scard sismember @@ -66,11 +67,14 @@ module Gitlab mapped_hmset rpush sadd + sadd? set setex setnx srem unlink + + memory ].freeze PIPELINED_COMMANDS = %i[ @@ -122,7 +126,7 @@ module Gitlab if use_primary_and_secondary_stores? pipelined_both(name, *args, **kwargs, &block) else - default_store.send(name, *args, **kwargs, &block) + send_command(default_store, name, *args, **kwargs, &block) end end end @@ -289,6 +293,16 @@ module Gitlab # rubocop:disable GitlabSecurity/PublicSend def send_command(redis_instance, command_name, *args, **kwargs, &block) + # Run wrapped pipeline for each instance individually so that the fan-out is distinct. + # If both primary and secondary are Redis Clusters, the slot-node distribution could + # be different. + # + # We ignore args and kwargs since `pipelined` does not accept arguments + # See https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis.rb#L164 + if command_name.to_s == 'pipelined' && redis_instance._client.instance_of?(::Redis::Cluster) + return Gitlab::Redis::CrossSlot::Pipeline.new(redis_instance).pipelined(&block) + end + if block # Make sure that block is wrapped and executed only on the redis instance that is executing the block redis_instance.send(command_name, *args, **kwargs) do |*params| diff --git a/lib/gitlab/redis/rate_limiting.rb b/lib/gitlab/redis/rate_limiting.rb index 74b4ca12d18..30ec44b748d 100644 --- a/lib/gitlab/redis/rate_limiting.rb +++ b/lib/gitlab/redis/rate_limiting.rb @@ -3,18 +3,11 @@ module Gitlab module Redis class RateLimiting < ::Gitlab::Redis::Wrapper - # We create a subclass only for the purpose of differentiating between different stores in cache metrics - RateLimitingStore = Class.new(ActiveSupport::Cache::RedisCacheStore) - class << self # The data we store on RateLimiting used to be stored on Cache. def config_fallback Cache end - - def cache_store - @cache_store ||= RateLimitingStore.new(redis: pool, namespace: Cache::CACHE_NAMESPACE) - end end end end |