Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/redis/keywatcher.go')
-rw-r--r--workhorse/internal/redis/keywatcher.go83
1 files changed, 41 insertions, 42 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 8f1772a9195..ddb838121b7 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -8,10 +8,10 @@ import (
"sync"
"time"
- "github.com/gomodule/redigo/redis"
"github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/redis/go-redis/v9"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
)
@@ -21,7 +21,8 @@ type KeyWatcher struct {
subscribers map[string][]chan string
shutdown chan struct{}
reconnectBackoff backoff.Backoff
- conn *redis.PubSubConn
+ redisConn *redis.Client
+ conn *redis.PubSub
}
func NewKeyWatcher() *KeyWatcher {
@@ -74,12 +75,12 @@ const channelPrefix = "workhorse:notifications:"
func countAction(action string) { TotalActions.WithLabelValues(action).Add(1) }
-func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
+func (kw *KeyWatcher) receivePubSubStream(ctx context.Context, pubsub *redis.PubSub) error {
kw.mu.Lock()
// We must share kw.conn with the goroutines that call SUBSCRIBE and
// UNSUBSCRIBE because Redis pubsub subscriptions are tied to the
// connection.
- kw.conn = &redis.PubSubConn{Conn: conn}
+ kw.conn = pubsub
kw.mu.Unlock()
defer func() {
@@ -100,51 +101,49 @@ func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
}()
for {
- switch v := kw.conn.Receive().(type) {
- case redis.Message:
+ msg, err := kw.conn.Receive(ctx)
+ if err != nil {
+ log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", err)).Error()
+ return nil
+ }
+
+ switch msg := msg.(type) {
+ case *redis.Subscription:
+ RedisSubscriptions.Set(float64(msg.Count))
+ case *redis.Pong:
+ // Ignore.
+ case *redis.Message:
TotalMessages.Inc()
- ReceivedBytes.Add(float64(len(v.Data)))
- if strings.HasPrefix(v.Channel, channelPrefix) {
- kw.notifySubscribers(v.Channel[len(channelPrefix):], string(v.Data))
+ ReceivedBytes.Add(float64(len(msg.Payload)))
+ if strings.HasPrefix(msg.Channel, channelPrefix) {
+ kw.notifySubscribers(msg.Channel[len(channelPrefix):], string(msg.Payload))
}
- case redis.Subscription:
- RedisSubscriptions.Set(float64(v.Count))
- case error:
- log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error()
- // Intermittent error, return nil so that it doesn't wait before reconnect
+ default:
+ log.WithError(fmt.Errorf("keywatcher: unknown: %T", msg)).Error()
return nil
}
}
}
-func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) {
- conn, err := dialer()
- if err != nil {
- return nil, err
- }
-
- // Make sure Redis is actually connected
- conn.Do("PING")
- if err := conn.Err(); err != nil {
- conn.Close()
- return nil, err
- }
+func (kw *KeyWatcher) Process(client *redis.Client) {
+ log.Info("keywatcher: starting process loop")
- return conn, nil
-}
+ ctx := context.Background() // lint:allow context.Background
+ kw.mu.Lock()
+ kw.redisConn = client
+ kw.mu.Unlock()
-func (kw *KeyWatcher) Process() {
- log.Info("keywatcher: starting process loop")
for {
- conn, err := dialPubSub(workerDialFunc)
- if err != nil {
+ pubsub := client.Subscribe(ctx, []string{}...)
+ if err := pubsub.Ping(ctx); err != nil {
log.WithError(fmt.Errorf("keywatcher: %v", err)).Error()
time.Sleep(kw.reconnectBackoff.Duration())
continue
}
+
kw.reconnectBackoff.Reset()
- if err = kw.receivePubSubStream(conn); err != nil {
+ if err := kw.receivePubSubStream(ctx, pubsub); err != nil {
log.WithError(fmt.Errorf("keywatcher: receivePubSubStream: %v", err)).Error()
}
}
@@ -183,7 +182,7 @@ func (kw *KeyWatcher) notifySubscribers(key, value string) {
}
}
-func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
+func (kw *KeyWatcher) addSubscription(ctx context.Context, key string, notify chan string) error {
kw.mu.Lock()
defer kw.mu.Unlock()
@@ -196,7 +195,7 @@ func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
if len(kw.subscribers[key]) == 0 {
countAction("create-subscription")
- if err := kw.conn.Subscribe(channelPrefix + key); err != nil {
+ if err := kw.conn.Subscribe(ctx, channelPrefix+key); err != nil {
return err
}
}
@@ -210,7 +209,7 @@ func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
return nil
}
-func (kw *KeyWatcher) delSubscription(key string, notify chan string) {
+func (kw *KeyWatcher) delSubscription(ctx context.Context, key string, notify chan string) {
kw.mu.Lock()
defer kw.mu.Unlock()
@@ -232,7 +231,7 @@ func (kw *KeyWatcher) delSubscription(key string, notify chan string) {
delete(kw.subscribers, key)
countAction("delete-subscription")
if kw.conn != nil {
- kw.conn.Unsubscribe(channelPrefix + key)
+ kw.conn.Unsubscribe(ctx, channelPrefix+key)
}
}
}
@@ -252,15 +251,15 @@ const (
WatchKeyStatusNoChange
)
-func (kw *KeyWatcher) WatchKey(_ context.Context, key, value string, timeout time.Duration) (WatchKeyStatus, error) {
+func (kw *KeyWatcher) WatchKey(ctx context.Context, key, value string, timeout time.Duration) (WatchKeyStatus, error) {
notify := make(chan string, 1)
- if err := kw.addSubscription(key, notify); err != nil {
+ if err := kw.addSubscription(ctx, key, notify); err != nil {
return WatchKeyStatusNoChange, err
}
- defer kw.delSubscription(key, notify)
+ defer kw.delSubscription(ctx, key, notify)
- currentValue, err := GetString(key)
- if errors.Is(err, redis.ErrNil) {
+ currentValue, err := kw.redisConn.Get(ctx, key).Result()
+ if errors.Is(err, redis.Nil) {
currentValue = ""
} else if err != nil {
return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET: %v", err)