diff options
Diffstat (limited to 'workhorse/internal/redis/keywatcher.go')
-rw-r--r-- | workhorse/internal/redis/keywatcher.go | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go new file mode 100644 index 00000000000..96e33a64b85 --- /dev/null +++ b/workhorse/internal/redis/keywatcher.go @@ -0,0 +1,198 @@ +package redis + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/gomodule/redigo/redis" + "github.com/jpillora/backoff" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" +) + +var ( + keyWatcher = make(map[string][]chan string) + keyWatcherMutex sync.Mutex + redisReconnectTimeout = backoff.Backoff{ + //These are the defaults + Min: 100 * time.Millisecond, + Max: 60 * time.Second, + Factor: 2, + Jitter: true, + } + keyWatchers = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "gitlab_workhorse_keywatcher_keywatchers", + Help: "The number of keys that is being watched by gitlab-workhorse", + }, + ) + totalMessages = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_keywatcher_total_messages", + Help: "How many messages gitlab-workhorse has received in total on pubsub.", + }, + ) +) + +const ( + keySubChannel = "workhorse:notifications" +) + +// KeyChan holds a key and a channel +type KeyChan struct { + Key string + Chan chan string +} + +func processInner(conn redis.Conn) error { + defer conn.Close() + psc := redis.PubSubConn{Conn: conn} + if err := psc.Subscribe(keySubChannel); err != nil { + return err + } + defer psc.Unsubscribe(keySubChannel) + + for { + switch v := psc.Receive().(type) { + case redis.Message: + totalMessages.Inc() + dataStr := string(v.Data) + msg := strings.SplitN(dataStr, "=", 2) + if len(msg) != 2 { + helper.LogError(nil, fmt.Errorf("keywatcher: invalid notification: %q", dataStr)) + continue + } + key, value := msg[0], msg[1] + notifyChanWatchers(key, value) + case error: + helper.LogError(nil, fmt.Errorf("keywatcher: pubsub receive: %v", v)) + // Intermittent error, return nil so that it doesn't wait before reconnect + return nil + } + } +} + +func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) { + conn, err := dialer() + if err != nil { + return nil, err + } + + // Make sure Redis is actually connected + conn.Do("PING") + if err := conn.Err(); err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + +// Process redis subscriptions +// +// NOTE: There Can Only Be One! +func Process() { + log.Info("keywatcher: starting process loop") + for { + conn, err := dialPubSub(workerDialFunc) + if err != nil { + helper.LogError(nil, fmt.Errorf("keywatcher: %v", err)) + time.Sleep(redisReconnectTimeout.Duration()) + continue + } + redisReconnectTimeout.Reset() + + if err = processInner(conn); err != nil { + helper.LogError(nil, fmt.Errorf("keywatcher: process loop: %v", err)) + } + } +} + +func notifyChanWatchers(key, value string) { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + if chanList, ok := keyWatcher[key]; ok { + for _, c := range chanList { + c <- value + keyWatchers.Dec() + } + delete(keyWatcher, key) + } +} + +func addKeyChan(kc *KeyChan) { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + keyWatcher[kc.Key] = append(keyWatcher[kc.Key], kc.Chan) + keyWatchers.Inc() +} + +func delKeyChan(kc *KeyChan) { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + if chans, ok := keyWatcher[kc.Key]; ok { + for i, c := range chans { + if kc.Chan == c { + keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...) + keyWatchers.Dec() + break + } + } + if len(keyWatcher[kc.Key]) == 0 { + delete(keyWatcher, kc.Key) + } + } +} + +// WatchKeyStatus is used to tell how WatchKey returned +type WatchKeyStatus int + +const ( + // WatchKeyStatusTimeout is returned when the watch timeout provided by the caller was exceeded + WatchKeyStatusTimeout WatchKeyStatus = iota + // WatchKeyStatusAlreadyChanged is returned when the value passed by the caller was never observed + WatchKeyStatusAlreadyChanged + // WatchKeyStatusSeenChange is returned when we have seen the value passed by the caller get changed + WatchKeyStatusSeenChange + // WatchKeyStatusNoChange is returned when the function had to return before observing a change. + // Also returned on errors. + WatchKeyStatusNoChange +) + +// WatchKey waits for a key to be updated or expired +func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) { + kw := &KeyChan{ + Key: key, + Chan: make(chan string, 1), + } + + addKeyChan(kw) + defer delKeyChan(kw) + + currentValue, err := GetString(key) + if err != nil { + return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET: %v", err) + } + if currentValue != value { + return WatchKeyStatusAlreadyChanged, nil + } + + select { + case currentValue := <-kw.Chan: + if currentValue == "" { + return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed") + } + if currentValue == value { + return WatchKeyStatusNoChange, nil + } + return WatchKeyStatusSeenChange, nil + + case <-time.After(timeout): + return WatchKeyStatusTimeout, nil + } +} |