Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-08-19 18:11:58 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2022-08-19 18:11:58 +0300
commit4c083c816333ef903fe7c32f412eaa53d7b959d3 (patch)
tree199c0a0034a2620374a92a47762bf4a4c07be7ca /workhorse
parent35d5ae4e3de6444c02725b965ef59774d6256d8e (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r--workhorse/internal/redis/keywatcher.go67
1 files changed, 48 insertions, 19 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 82cb082f5f0..20e86daf5af 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -38,6 +38,19 @@ var (
Help: "How many messages gitlab-workhorse has received in total on pubsub.",
},
)
+ totalActions = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_keywatcher_actions_total",
+ Help: "Counts of various keywatcher actions",
+ },
+ []string{"action"},
+ )
+ receivedBytes = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_keywatcher_received_bytes_total",
+ Help: "How many bytes of messages gitlab-workhorse has received in total on pubsub.",
+ },
+ )
)
const (
@@ -50,6 +63,8 @@ type KeyChan struct {
Chan chan string
}
+func countAction(action string) { totalActions.WithLabelValues(action).Add(1) }
+
func processInner(conn redis.Conn) error {
defer conn.Close()
psc := redis.PubSubConn{Conn: conn}
@@ -63,13 +78,13 @@ func processInner(conn redis.Conn) error {
case redis.Message:
totalMessages.Inc()
dataStr := string(v.Data)
+ receivedBytes.Add(float64(len(dataStr)))
msg := strings.SplitN(dataStr, "=", 2)
if len(msg) != 2 {
log.WithError(fmt.Errorf("keywatcher: invalid notification: %q", dataStr)).Error()
continue
}
- key, value := msg[0], msg[1]
- notifyChanWatchers(key, value)
+ notifyChanWatchers(msg[0], msg[1])
case error:
log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error()
// Intermittent error, return nil so that it doesn't wait before reconnect
@@ -131,37 +146,52 @@ func Shutdown() {
func notifyChanWatchers(key, value string) {
keyWatcherMutex.Lock()
defer keyWatcherMutex.Unlock()
- if chanList, ok := keyWatcher[key]; ok {
- for _, c := range chanList {
- c <- value
- keyWatchers.Dec()
- }
- delete(keyWatcher, key)
+
+ chanList, ok := keyWatcher[key]
+ if !ok {
+ countAction("drop-message")
+ return
+ }
+
+ countAction("deliver-message")
+ for _, c := range chanList {
+ c <- value
+ keyWatchers.Dec()
}
+ delete(keyWatcher, key)
}
func addKeyChan(kc *KeyChan) {
keyWatcherMutex.Lock()
defer keyWatcherMutex.Unlock()
+
keyWatcher[kc.Key] = append(keyWatcher[kc.Key], kc.Chan)
keyWatchers.Inc()
+ if len(keyWatcher[kc.Key]) == 1 {
+ countAction("create-subscription")
+ }
}
func delKeyChan(kc *KeyChan) {
keyWatcherMutex.Lock()
defer keyWatcherMutex.Unlock()
- if chans, ok := keyWatcher[kc.Key]; ok {
- for i, c := range chans {
- if kc.Chan == c {
- keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...)
- keyWatchers.Dec()
- break
- }
- }
- if len(keyWatcher[kc.Key]) == 0 {
- delete(keyWatcher, kc.Key)
+
+ chans, ok := keyWatcher[kc.Key]
+ if !ok {
+ return
+ }
+
+ for i, c := range chans {
+ if kc.Chan == c {
+ keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...)
+ keyWatchers.Dec()
+ break
}
}
+ if len(keyWatcher[kc.Key]) == 0 {
+ delete(keyWatcher, kc.Key)
+ countAction("delete-subscription")
+ }
}
// WatchKeyStatus is used to tell how WatchKey returned
@@ -211,7 +241,6 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
return WatchKeyStatusNoChange, nil
}
return WatchKeyStatusSeenChange, nil
-
case <-time.After(timeout):
return WatchKeyStatusTimeout, nil
}