diff options
Diffstat (limited to 'workhorse/internal/redis/keywatcher.go')
-rw-r--r-- | workhorse/internal/redis/keywatcher.go | 230 |
1 files changed, 149 insertions, 81 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index 82cb082f5f0..cdf6ccd7e83 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -15,61 +15,99 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" ) -var ( - keyWatcher = make(map[string][]chan string) - keyWatcherMutex sync.Mutex - shutdown = make(chan struct{}) - redisReconnectTimeout = backoff.Backoff{ - //These are the defaults - Min: 100 * time.Millisecond, - Max: 60 * time.Second, - Factor: 2, - Jitter: true, +type KeyWatcher struct { + mu sync.Mutex + subscribers map[string][]chan string + shutdown chan struct{} + reconnectBackoff backoff.Backoff + conn *redis.PubSubConn +} + +func NewKeyWatcher() *KeyWatcher { + return &KeyWatcher{ + shutdown: make(chan struct{}), + reconnectBackoff: backoff.Backoff{ + Min: 100 * time.Millisecond, + Max: 60 * time.Second, + Factor: 2, + Jitter: true, + }, } +} + +var ( keyWatchers = promauto.NewGauge( prometheus.GaugeOpts{ Name: "gitlab_workhorse_keywatcher_keywatchers", Help: "The number of keys that is being watched by gitlab-workhorse", }, ) + redisSubscriptions = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "gitlab_workhorse_keywatcher_redis_subscriptions", + Help: "Current number of keywatcher Redis pubsub subscriptions", + }, + ) totalMessages = promauto.NewCounter( prometheus.CounterOpts{ Name: "gitlab_workhorse_keywatcher_total_messages", Help: "How many messages gitlab-workhorse has received in total on pubsub.", }, ) + totalActions = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_keywatcher_actions_total", + Help: "Counts of various keywatcher actions", + }, + []string{"action"}, + ) + receivedBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_keywatcher_received_bytes_total", + Help: "How many bytes of messages gitlab-workhorse has received in total on pubsub.", + }, + ) ) -const ( - keySubChannel = "workhorse:notifications" -) +const channelPrefix = "workhorse:notifications:" -// KeyChan holds a key and a channel -type KeyChan struct { - Key string - Chan chan string -} +func countAction(action string) { totalActions.WithLabelValues(action).Add(1) } -func processInner(conn redis.Conn) error { - defer conn.Close() - psc := redis.PubSubConn{Conn: conn} - if err := psc.Subscribe(keySubChannel); err != nil { - return err - } - defer psc.Unsubscribe(keySubChannel) +func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error { + kw.mu.Lock() + // We must share kw.conn with the goroutines that call SUBSCRIBE and + // UNSUBSCRIBE because Redis pubsub subscriptions are tied to the + // connection. + kw.conn = &redis.PubSubConn{Conn: conn} + kw.mu.Unlock() + + defer func() { + kw.mu.Lock() + defer kw.mu.Unlock() + kw.conn.Close() + kw.conn = nil + + // Reset kw.subscribers because it is tied to Redis server side state of + // kw.conn and we just closed that connection. + for _, chans := range kw.subscribers { + for _, ch := range chans { + close(ch) + keyWatchers.Dec() + } + } + kw.subscribers = nil + }() for { - switch v := psc.Receive().(type) { + switch v := kw.conn.Receive().(type) { case redis.Message: totalMessages.Inc() - dataStr := string(v.Data) - msg := strings.SplitN(dataStr, "=", 2) - if len(msg) != 2 { - log.WithError(fmt.Errorf("keywatcher: invalid notification: %q", dataStr)).Error() - continue + receivedBytes.Add(float64(len(v.Data))) + if strings.HasPrefix(v.Channel, channelPrefix) { + kw.notifySubscribers(v.Channel[len(channelPrefix):], string(v.Data)) } - key, value := msg[0], msg[1] - notifyChanWatchers(key, value) + case redis.Subscription: + redisSubscriptions.Set(float64(v.Count)) case error: log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error() // Intermittent error, return nil so that it doesn't wait before reconnect @@ -94,72 +132,106 @@ func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) { return conn, nil } -// Process redis subscriptions -// -// NOTE: There Can Only Be One! -func Process() { +func (kw *KeyWatcher) Process() { log.Info("keywatcher: starting process loop") for { conn, err := dialPubSub(workerDialFunc) if err != nil { log.WithError(fmt.Errorf("keywatcher: %v", err)).Error() - time.Sleep(redisReconnectTimeout.Duration()) + time.Sleep(kw.reconnectBackoff.Duration()) continue } - redisReconnectTimeout.Reset() + kw.reconnectBackoff.Reset() - if err = processInner(conn); err != nil { - log.WithError(fmt.Errorf("keywatcher: process loop: %v", err)).Error() + if err = kw.receivePubSubStream(conn); err != nil { + log.WithError(fmt.Errorf("keywatcher: receivePubSubStream: %v", err)).Error() } } } -func Shutdown() { +func (kw *KeyWatcher) Shutdown() { log.Info("keywatcher: shutting down") - keyWatcherMutex.Lock() - defer keyWatcherMutex.Unlock() + kw.mu.Lock() + defer kw.mu.Unlock() select { - case <-shutdown: + case <-kw.shutdown: // already closed default: - close(shutdown) + close(kw.shutdown) } } -func notifyChanWatchers(key, value string) { - keyWatcherMutex.Lock() - defer keyWatcherMutex.Unlock() - if chanList, ok := keyWatcher[key]; ok { - for _, c := range chanList { - c <- value - keyWatchers.Dec() +func (kw *KeyWatcher) notifySubscribers(key, value string) { + kw.mu.Lock() + defer kw.mu.Unlock() + + chanList, ok := kw.subscribers[key] + if !ok { + countAction("drop-message") + return + } + + countAction("deliver-message") + for _, c := range chanList { + select { + case c <- value: + default: } - delete(keyWatcher, key) } } -func addKeyChan(kc *KeyChan) { - keyWatcherMutex.Lock() - defer keyWatcherMutex.Unlock() - keyWatcher[kc.Key] = append(keyWatcher[kc.Key], kc.Chan) +func (kw *KeyWatcher) addSubscription(key string, notify chan string) error { + kw.mu.Lock() + defer kw.mu.Unlock() + + if kw.conn == nil { + // This can happen because CI long polling is disabled in this Workhorse + // process. It can also be that we are waiting for the pubsub connection + // to be established. Either way it is OK to fail fast. + return errors.New("no redis connection") + } + + if len(kw.subscribers[key]) == 0 { + countAction("create-subscription") + if err := kw.conn.Subscribe(channelPrefix + key); err != nil { + return err + } + } + + if kw.subscribers == nil { + kw.subscribers = make(map[string][]chan string) + } + kw.subscribers[key] = append(kw.subscribers[key], notify) keyWatchers.Inc() + + return nil } -func delKeyChan(kc *KeyChan) { - keyWatcherMutex.Lock() - defer keyWatcherMutex.Unlock() - if chans, ok := keyWatcher[kc.Key]; ok { - for i, c := range chans { - if kc.Chan == c { - keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...) - keyWatchers.Dec() - break - } +func (kw *KeyWatcher) delSubscription(key string, notify chan string) { + kw.mu.Lock() + defer kw.mu.Unlock() + + chans, ok := kw.subscribers[key] + if !ok { + // This can happen if the pubsub connection dropped while we were + // waiting. + return + } + + for i, c := range chans { + if notify == c { + kw.subscribers[key] = append(chans[:i], chans[i+1:]...) + keyWatchers.Dec() + break } - if len(keyWatcher[kc.Key]) == 0 { - delete(keyWatcher, kc.Key) + } + if len(kw.subscribers[key]) == 0 { + delete(kw.subscribers, key) + countAction("delete-subscription") + if kw.conn != nil { + kw.conn.Unsubscribe(channelPrefix + key) } } } @@ -179,15 +251,12 @@ const ( WatchKeyStatusNoChange ) -// WatchKey waits for a key to be updated or expired -func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) { - kw := &KeyChan{ - Key: key, - Chan: make(chan string, 1), +func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) { + notify := make(chan string, 1) + if err := kw.addSubscription(key, notify); err != nil { + return WatchKeyStatusNoChange, err } - - addKeyChan(kw) - defer delKeyChan(kw) + defer kw.delSubscription(key, notify) currentValue, err := GetString(key) if errors.Is(err, redis.ErrNil) { @@ -200,10 +269,10 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) } select { - case <-shutdown: + case <-kw.shutdown: log.WithFields(log.Fields{"key": key}).Info("stopping watch due to shutdown") return WatchKeyStatusNoChange, nil - case currentValue := <-kw.Chan: + case currentValue := <-notify: if currentValue == "" { return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed") } @@ -211,7 +280,6 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) return WatchKeyStatusNoChange, nil } return WatchKeyStatusSeenChange, nil - case <-time.After(timeout): return WatchKeyStatusTimeout, nil } |