Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-09-16 12:11:45 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2022-09-16 12:11:45 +0300
commit6f9c158ef16de95f0f0505623e0b73086fadea35 (patch)
treeb962aa56d916c85283c094ba34a80f547d981d77 /workhorse
parent2800e6ea59112f31833f8241a9a2f04ae8f7faa1 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r--workhorse/internal/redis/keywatcher.go41
-rw-r--r--workhorse/internal/redis/keywatcher_test.go41
-rw-r--r--workhorse/main.go4
3 files changed, 16 insertions, 70 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 9eba33a1733..cdf6ccd7e83 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -20,11 +20,10 @@ type KeyWatcher struct {
subscribers map[string][]chan string
shutdown chan struct{}
reconnectBackoff backoff.Backoff
- channelPerKey bool // TODO remove this field https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1902
conn *redis.PubSubConn
}
-func NewKeyWatcher(channelPerKey bool) *KeyWatcher {
+func NewKeyWatcher() *KeyWatcher {
return &KeyWatcher{
shutdown: make(chan struct{}),
reconnectBackoff: backoff.Backoff{
@@ -33,7 +32,6 @@ func NewKeyWatcher(channelPerKey bool) *KeyWatcher {
Factor: 2,
Jitter: true,
},
- channelPerKey: channelPerKey,
}
}
@@ -71,10 +69,7 @@ var (
)
)
-const (
- keySubChannel = "workhorse:notifications"
- channelPrefix = keySubChannel + ":"
-)
+const channelPrefix = "workhorse:notifications:"
func countAction(action string) { totalActions.WithLabelValues(action).Add(1) }
@@ -103,33 +98,13 @@ func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
kw.subscribers = nil
}()
- if kw.channelPerKey {
- // Do not drink from firehose
- } else {
- // Do drink from firehose
- if err := kw.conn.Subscribe(keySubChannel); err != nil {
- return err
- }
- defer kw.conn.Unsubscribe(keySubChannel)
- }
-
for {
switch v := kw.conn.Receive().(type) {
case redis.Message:
totalMessages.Inc()
- dataStr := string(v.Data)
- receivedBytes.Add(float64(len(dataStr)))
+ receivedBytes.Add(float64(len(v.Data)))
if strings.HasPrefix(v.Channel, channelPrefix) {
- // v is a message on a per-key channel
- kw.notifySubscribers(v.Channel[len(channelPrefix):], dataStr)
- } else if v.Channel == keySubChannel {
- // v is a message on the firehose channel
- msg := strings.SplitN(dataStr, "=", 2)
- if len(msg) != 2 {
- log.WithError(fmt.Errorf("keywatcher: invalid notification: %q", dataStr)).Error()
- continue
- }
- kw.notifySubscribers(msg[0], msg[1])
+ kw.notifySubscribers(v.Channel[len(channelPrefix):], string(v.Data))
}
case redis.Subscription:
redisSubscriptions.Set(float64(v.Count))
@@ -220,10 +195,8 @@ func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
if len(kw.subscribers[key]) == 0 {
countAction("create-subscription")
- if kw.channelPerKey {
- if err := kw.conn.Subscribe(channelPrefix + key); err != nil {
- return err
- }
+ if err := kw.conn.Subscribe(channelPrefix + key); err != nil {
+ return err
}
}
@@ -257,7 +230,7 @@ func (kw *KeyWatcher) delSubscription(key string, notify chan string) {
if len(kw.subscribers[key]) == 0 {
delete(kw.subscribers, key)
countAction("delete-subscription")
- if kw.channelPerKey && kw.conn != nil {
+ if kw.conn != nil {
kw.conn.Unsubscribe(channelPrefix + key)
}
}
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index 92865d29417..bae49d81bb1 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -1,7 +1,6 @@
package redis
import (
- "fmt"
"sync"
"testing"
"time"
@@ -49,16 +48,10 @@ func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value strin
psc := redigomock.NewConn()
psc.ReceiveWait = true
- if kw.channelPerKey {
- channel := channelPrefix + runnerKey
- psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
- psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
- psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))
- } else {
- psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel))
- psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel))
- psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value))
- }
+ channel := channelPrefix + runnerKey
+ psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
+ psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
+ psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))
errC := make(chan error)
go func() { errC <- kw.receivePubSubStream(psc) }()
@@ -89,12 +82,6 @@ type keyChangeTestCase struct {
}
func TestKeyChangesInstantReturn(t *testing.T) {
- for _, v := range []bool{false, true} {
- t.Run(fmt.Sprintf("channelPerKey:%v", v), func(t *testing.T) { testKeyChangesInstantReturn(t, v) })
- }
-}
-
-func testKeyChangesInstantReturn(t *testing.T, channelPerKey bool) {
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
{
@@ -140,7 +127,7 @@ func testKeyChangesInstantReturn(t *testing.T, channelPerKey bool) {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
- kw := NewKeyWatcher(channelPerKey)
+ kw := NewKeyWatcher()
defer kw.Shutdown()
kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
@@ -153,12 +140,6 @@ func testKeyChangesInstantReturn(t *testing.T, channelPerKey bool) {
}
func TestKeyChangesWhenWatching(t *testing.T) {
- for _, v := range []bool{false, true} {
- t.Run(fmt.Sprintf("channelPerKey:%v", v), func(t *testing.T) { testKeyChangesWhenWatching(t, v) })
- }
-}
-
-func testKeyChangesWhenWatching(t *testing.T, channelPerKey bool) {
testCases := []keyChangeTestCase{
// WatchKeyStatusSeenChange
{
@@ -196,7 +177,7 @@ func testKeyChangesWhenWatching(t *testing.T, channelPerKey bool) {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
- kw := NewKeyWatcher(channelPerKey)
+ kw := NewKeyWatcher()
defer kw.Shutdown()
wg := &sync.WaitGroup{}
@@ -219,12 +200,6 @@ func testKeyChangesWhenWatching(t *testing.T, channelPerKey bool) {
}
func TestKeyChangesParallel(t *testing.T) {
- for _, v := range []bool{false, true} {
- t.Run(fmt.Sprintf("channelPerKey:%v", v), func(t *testing.T) { testKeyChangesParallel(t, v) })
- }
-}
-
-func testKeyChangesParallel(t *testing.T, channelPerKey bool) {
testCases := []keyChangeTestCase{
{
desc: "massively parallel, sees change with key existing",
@@ -263,7 +238,7 @@ func testKeyChangesParallel(t *testing.T, channelPerKey bool) {
wg.Add(runTimes)
ready := make(chan struct{})
- kw := NewKeyWatcher(channelPerKey)
+ kw := NewKeyWatcher()
defer kw.Shutdown()
for i := 0; i < runTimes; i++ {
@@ -287,7 +262,7 @@ func TestShutdown(t *testing.T) {
conn, td := setupMockPool()
defer td()
- kw := NewKeyWatcher(false)
+ kw := NewKeyWatcher()
kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
defer kw.Shutdown()
diff --git a/workhorse/main.go b/workhorse/main.go
index 027575747f4..b0f9760b0d5 100644
--- a/workhorse/main.go
+++ b/workhorse/main.go
@@ -220,9 +220,7 @@ func run(boot bootConfig, cfg config.Config) error {
secret.SetPath(boot.secretPath)
- keyWatcher := redis.NewKeyWatcher(
- os.Getenv("GITLAB_WORKHORSE_REDIS_SUBSCRIBE_MANY") == "1",
- )
+ keyWatcher := redis.NewKeyWatcher()
if cfg.Redis != nil {
redis.Configure(cfg.Redis, redis.DefaultDialFunc)
go keyWatcher.Process()