1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# frozen_string_literal: true
module Gitlab
module Redis
module CrossSlot
class Router
attr_reader :node_mapping, :futures, :node_sequence, :cmd_queue
delegate :respond_to_missing?, to: :@redis
# This map contains redis-rb methods which does not map directly
# to a standard Redis command. It is used transform unsupported commands to standard commands
# to find the node key for unsupported commands.
#
# Redis::Cluster::Command only contains details of commands which the Redis Server
# returns. Hence, commands like mapped_hmget and hscan_each internally will call the
# base command, hmget and hscan respectively.
#
# See https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis/cluster/command.rb
UNSUPPORTED_CMD_MAPPING = {
# Internally, redis-rb calls the supported Redis command and transforms the output.
# See https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis/commands/hashes.rb#L104
mapped_hmget: :hmget
}.freeze
# Initializes the CrossSlot::Router
# @param {::Redis}
def initialize(redis)
@redis = redis
@node_mapping = {}
@futures = {}
@node_sequence = []
@cmd_queue = []
end
# For now we intercept every redis.call and return a Gitlab-Future object.
# This method groups every commands to a node for fan-out. Commands are grouped using the first key.
#
# rubocop:disable Style/MissingRespondToMissing
def method_missing(cmd, *args, **kwargs, &blk)
# Note that we can re-map the command without affecting execution as it is
# solely for finding the node key. The original cmd will be executed.
node = @redis._client._find_node_key([UNSUPPORTED_CMD_MAPPING.fetch(cmd, cmd)] + args)
@node_mapping[node] ||= []
@futures[node] ||= []
@node_sequence << node
@node_mapping[node] << [cmd, args, kwargs || {}, blk]
f = Future.new
@futures[node] << f
@cmd_queue << [f, cmd, args, kwargs || {}, blk]
f
end
# rubocop:enable Style/MissingRespondToMissing
end
# Wraps over redis-rb's Future in
# https://github.com/redis/redis-rb/blob/v4.8.0/lib/redis/pipeline.rb#L244
class Future
def set(future, is_val = false)
@redis_future = future
@is_val = is_val
end
def value
return @redis_val if @is_val
@redis_future.value
end
end
# Pipeline allows cross-slot pipelined to be called. The fan-out logic is implemented in
# https://github.com/redis-rb/redis-cluster-client/blob/master/lib/redis_client/cluster/pipeline.rb
# which is available in redis-rb v5.0.
#
# This file can be deprecated after redis-rb v4.8.0 is upgraded to v5.0
class Pipeline
# Initializes the CrossSlot::Pipeline
# @param {::Redis}
def initialize(redis)
@redis = redis
end
# pipelined is used in place of ::Redis `.pipelined` when running in a cluster context
# where cross-slot operations may happen.
def pipelined(&block)
# Directly call .pipelined and defer the pipeline execution to MultiStore.
# MultiStore could wrap over 0, 1, or 2 Redis Cluster clients, handling it here
# will not work for 2 clients since the key-slot topology can differ.
if use_cross_slot_pipelining?
router = Router.new(@redis)
yield router
execute_commands(router)
else
# use redis-rb's pipelined method
@redis.pipelined(&block)
end
end
private
def use_cross_slot_pipelining?
!@redis.instance_of?(::Gitlab::Redis::MultiStore) && @redis._client.instance_of?(::Redis::Cluster)
end
def execute_commands(router)
router.node_mapping.each do |node_key, commands|
# TODO possibly use Threads to speed up but for now `n` is 3-5 which is small.
@redis.pipelined do |p|
commands.each_with_index do |command, idx|
future = router.futures[node_key][idx]
cmd, args, kwargs, blk = command
future.set(p.public_send(cmd, *args, **kwargs, &blk)) # rubocop:disable GitlabSecurity/PublicSend
end
end
end
router.node_sequence.map do |node_key|
router.futures[node_key].shift.value
end
rescue ::Redis::CommandError => err
if err.message.start_with?('MOVED', 'ASK')
Gitlab::ErrorTracking.log_exception(err)
return execute_commands_sequentially(router)
end
raise
end
def execute_commands_sequentially(router)
router.cmd_queue.map do |command|
future, cmd, args, kwargs, blk = command
future.set(@redis.public_send(cmd, *args, **kwargs, &blk), true) # rubocop:disable GitlabSecurity/PublicSend
future.value
end
end
end
end
end
end
|