diff options
Diffstat (limited to 'lib/gitlab/redis/multi_store.rb')
-rw-r--r-- | lib/gitlab/redis/multi_store.rb | 300 |
1 files changed, 300 insertions, 0 deletions
diff --git a/lib/gitlab/redis/multi_store.rb b/lib/gitlab/redis/multi_store.rb new file mode 100644 index 00000000000..24c540eea47 --- /dev/null +++ b/lib/gitlab/redis/multi_store.rb @@ -0,0 +1,300 @@ +# frozen_string_literal: true + +module Gitlab + module Redis + class MultiStore + include Gitlab::Utils::StrongMemoize + + class ReadFromPrimaryError < StandardError + def message + 'Value not found on the redis primary store. Read from the redis secondary store successful.' + end + end + class PipelinedDiffError < StandardError + def message + 'Pipelined command executed on both stores successfully but results differ between them.' + end + end + class MethodMissingError < StandardError + def message + 'Method missing. Falling back to execute method on the redis secondary store.' + end + end + + attr_reader :primary_store, :secondary_store, :instance_name + + FAILED_TO_READ_ERROR_MESSAGE = 'Failed to read from the redis primary_store.' + FAILED_TO_WRITE_ERROR_MESSAGE = 'Failed to write to the redis primary_store.' + FAILED_TO_RUN_PIPELINE = 'Failed to execute pipeline on the redis primary_store.' + + SKIP_LOG_METHOD_MISSING_FOR_COMMANDS = %i(info).freeze + + READ_COMMANDS = %i( + get + mget + smembers + scard + ).freeze + + WRITE_COMMANDS = %i( + set + setnx + setex + sadd + srem + del + flushdb + rpush + ).freeze + + PIPELINED_COMMANDS = %i( + pipelined + multi + ).freeze + + # To transition between two Redis store, `primary_store` should be the target store, + # and `secondary_store` should be the current store. Transition is controlled with feature flags: + # + # - At the default state, all read and write operations are executed in the secondary instance. + # - Turning use_primary_and_secondary_stores_for_<instance_name> on: The store writes to both instances. + # The read commands are executed in primary, but fallback to secondary. + # Other commands are executed in the the default instance (Secondary). + # - Turning use_primary_store_as_default_for_<instance_name> on: The behavior is the same as above, + # but other commands are executed in the primary now. + # - Turning use_primary_and_secondary_stores_for_<instance_name> off: commands are executed in the primary store. + def initialize(primary_store, secondary_store, instance_name) + @primary_store = primary_store + @secondary_store = secondary_store + @instance_name = instance_name + + validate_stores! + end + + # rubocop:disable GitlabSecurity/PublicSend + READ_COMMANDS.each do |name| + define_method(name) do |*args, &block| + if use_primary_and_secondary_stores? + read_command(name, *args, &block) + else + default_store.send(name, *args, &block) + end + end + end + + WRITE_COMMANDS.each do |name| + define_method(name) do |*args, **kwargs, &block| + if use_primary_and_secondary_stores? + write_command(name, *args, **kwargs, &block) + else + default_store.send(name, *args, **kwargs, &block) + end + end + end + + PIPELINED_COMMANDS.each do |name| + define_method(name) do |*args, **kwargs, &block| + if use_primary_and_secondary_stores? + pipelined_both(name, *args, **kwargs, &block) + else + default_store.send(name, *args, **kwargs, &block) + end + end + end + + def method_missing(...) + return @instance.send(...) if @instance + + log_method_missing(...) + + default_store.send(...) + end + # rubocop:enable GitlabSecurity/PublicSend + + def respond_to_missing?(command_name, include_private = false) + true + end + + # This is needed because of Redis::Rack::Connection is requiring Redis::Store + # https://github.com/redis-store/redis-rack/blob/a833086ba494083b6a384a1a4e58b36573a9165d/lib/redis/rack/connection.rb#L15 + # Done similarly in https://github.com/lsegal/yard/blob/main/lib/yard/templates/template.rb#L122 + def is_a?(klass) + return true if klass == default_store.class + + super(klass) + end + alias_method :kind_of?, :is_a? + + def to_s + use_primary_and_secondary_stores? ? primary_store.to_s : default_store.to_s + end + + def use_primary_and_secondary_stores? + feature_enabled?("use_primary_and_secondary_stores_for") + end + + def use_primary_store_as_default? + feature_enabled?("use_primary_store_as_default_for") + end + + def increment_pipelined_command_error_count(command_name) + @pipelined_command_error ||= Gitlab::Metrics.counter(:gitlab_redis_multi_store_pipelined_diff_error_total, + 'Redis MultiStore pipelined command diff between stores') + @pipelined_command_error.increment(command: command_name, instance_name: instance_name) + end + + def increment_read_fallback_count(command_name) + @read_fallback_counter ||= Gitlab::Metrics.counter(:gitlab_redis_multi_store_read_fallback_total, + 'Client side Redis MultiStore reading fallback') + @read_fallback_counter.increment(command: command_name, instance_name: instance_name) + end + + def increment_method_missing_count(command_name) + @method_missing_counter ||= Gitlab::Metrics.counter(:gitlab_redis_multi_store_method_missing_total, + 'Client side Redis MultiStore method missing') + @method_missing_counter.increment(command: command_name, instance_name: instance_name) + end + + def log_error(exception, command_name, extra = {}) + Gitlab::ErrorTracking.log_exception( + exception, + extra.merge(command_name: command_name, instance_name: instance_name)) + end + + private + + # @return [Boolean] + def feature_enabled?(prefix) + feature_table_exists? && + Feature.enabled?("#{prefix}_#{instance_name.underscore}") && + !same_redis_store? + end + + # @return [Boolean] + def feature_table_exists? + Feature::FlipperFeature.table_exists? + rescue StandardError + false + end + + def default_store + use_primary_store_as_default? ? primary_store : secondary_store + end + + def log_method_missing(command_name, *_args) + return if SKIP_LOG_METHOD_MISSING_FOR_COMMANDS.include?(command_name) + + log_error(MethodMissingError.new, command_name) + increment_method_missing_count(command_name) + end + + def read_command(command_name, *args, &block) + if @instance + send_command(@instance, command_name, *args, &block) + else + read_one_with_fallback(command_name, *args, &block) + end + end + + def write_command(command_name, *args, **kwargs, &block) + if @instance + send_command(@instance, command_name, *args, **kwargs, &block) + else + write_both(command_name, *args, **kwargs, &block) + end + end + + def read_one_with_fallback(command_name, *args, &block) + begin + value = send_command(primary_store, command_name, *args, &block) + rescue StandardError => e + log_error(e, command_name, + multi_store_error_message: FAILED_TO_READ_ERROR_MESSAGE) + end + + value || fallback_read(command_name, *args, &block) + end + + def fallback_read(command_name, *args, &block) + value = send_command(secondary_store, command_name, *args, &block) + + if value + log_error(ReadFromPrimaryError.new, command_name) + increment_read_fallback_count(command_name) + end + + value + end + + def write_both(command_name, *args, **kwargs, &block) + begin + send_command(primary_store, command_name, *args, **kwargs, &block) + rescue StandardError => e + log_error(e, command_name, + multi_store_error_message: FAILED_TO_WRITE_ERROR_MESSAGE) + end + + send_command(secondary_store, command_name, *args, **kwargs, &block) + end + + # Run the entire pipeline on both stores. We assume that `&block` is idempotent. + def pipelined_both(command_name, *args, **kwargs, &block) + begin + result_primary = send_command(primary_store, command_name, *args, **kwargs, &block) + rescue StandardError => e + log_error(e, command_name, multi_store_error_message: FAILED_TO_RUN_PIPELINE) + end + + result_secondary = send_command(secondary_store, command_name, *args, **kwargs, &block) + + # Pipelined commands return an array with all results. If they differ, + # log an error + if result_primary != result_secondary + log_error(PipelinedDiffError.new, command_name) + increment_pipelined_command_error_count(command_name) + end + + result_secondary + end + + def same_redis_store? + strong_memoize(:same_redis_store) do + # <Redis client v4.4.0 for redis:///path_to/redis/redis.socket/5>" + primary_store.inspect == secondary_store.inspect + end + end + + # rubocop:disable GitlabSecurity/PublicSend + def send_command(redis_instance, command_name, *args, **kwargs, &block) + if block_given? + # Make sure that block is wrapped and executed only on the redis instance that is executing the block + redis_instance.send(command_name, *args, **kwargs) do |*params| + with_instance(redis_instance, *params, &block) + end + else + redis_instance.send(command_name, *args, **kwargs) + end + end + # rubocop:enable GitlabSecurity/PublicSend + + def with_instance(instance, *params) + @instance = instance + + yield(*params) + ensure + @instance = nil + end + + def redis_store?(store) + store.is_a?(::Redis) || store.is_a?(::Redis::Namespace) + end + + def validate_stores! + raise ArgumentError, 'primary_store is required' unless primary_store + raise ArgumentError, 'secondary_store is required' unless secondary_store + raise ArgumentError, 'instance_name is required' unless instance_name + raise ArgumentError, 'invalid primary_store' unless redis_store?(primary_store) + raise ArgumentError, 'invalid secondary_store' unless redis_store?(secondary_store) + end + end + end +end |