Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2023-12-01 03:11:54 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2023-12-01 03:11:54 +0300
commit08775893a80e4024d9b30bd1a17caff7ecb274f9 (patch)
tree1a28b448a4361ac8ebf7cfebdf8af5645c2b3b4c /workhorse
parentab37c8f6370868a8316992745589167517d422b7 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r--workhorse/internal/redis/keywatcher.go12
-rw-r--r--workhorse/internal/redis/keywatcher_test.go64
-rw-r--r--workhorse/internal/redis/redis.go12
-rw-r--r--workhorse/internal/redis/redis_test.go20
-rw-r--r--workhorse/main.go9
5 files changed, 55 insertions, 62 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index ddb838121b7..efc14f1cb5e 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -21,11 +21,11 @@ type KeyWatcher struct {
subscribers map[string][]chan string
shutdown chan struct{}
reconnectBackoff backoff.Backoff
- redisConn *redis.Client
+ redisConn *redis.Client // can be nil
conn *redis.PubSub
}
-func NewKeyWatcher() *KeyWatcher {
+func NewKeyWatcher(redisConn *redis.Client) *KeyWatcher {
return &KeyWatcher{
shutdown: make(chan struct{}),
reconnectBackoff: backoff.Backoff{
@@ -34,6 +34,7 @@ func NewKeyWatcher() *KeyWatcher {
Factor: 2,
Jitter: true,
},
+ redisConn: redisConn,
}
}
@@ -125,16 +126,13 @@ func (kw *KeyWatcher) receivePubSubStream(ctx context.Context, pubsub *redis.Pub
}
}
-func (kw *KeyWatcher) Process(client *redis.Client) {
+func (kw *KeyWatcher) Process() {
log.Info("keywatcher: starting process loop")
ctx := context.Background() // lint:allow context.Background
- kw.mu.Lock()
- kw.redisConn = client
- kw.mu.Unlock()
for {
- pubsub := client.Subscribe(ctx, []string{}...)
+ pubsub := kw.redisConn.Subscribe(ctx, []string{}...)
if err := pubsub.Ping(ctx); err != nil {
log.WithError(fmt.Errorf("keywatcher: %v", err)).Error()
time.Sleep(kw.reconnectBackoff.Duration())
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index bca4ca43a64..0e3278f2d26 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -7,6 +7,8 @@ import (
"testing"
"time"
+ "github.com/redis/go-redis/v9"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
@@ -18,24 +20,28 @@ const (
runnerKey = "runner:build_queue:10"
)
-func initRdb() {
- buf, _ := os.ReadFile("../../config.toml")
- cfg, _ := config.LoadConfig(string(buf))
- Configure(cfg.Redis)
+func initRdb(t *testing.T) *redis.Client {
+ buf, err := os.ReadFile("../../config.toml")
+ require.NoError(t, err)
+ cfg, err := config.LoadConfig(string(buf))
+ require.NoError(t, err)
+ rdb, err := Configure(cfg.Redis)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, rdb.Close())
+ })
+ return rdb
}
-func (kw *KeyWatcher) countSubscribers(key string) int {
+func countSubscribers(kw *KeyWatcher, key string) int {
kw.mu.Lock()
defer kw.mu.Unlock()
return len(kw.subscribers[key])
}
// Forces a run of the `Process` loop against a mock PubSubConn.
-func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) {
- kw.mu.Lock()
- kw.redisConn = rdb
+func processMessages(t *testing.T, kw *KeyWatcher, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) {
psc := kw.redisConn.Subscribe(ctx, []string{}...)
- kw.mu.Unlock()
errC := make(chan error)
go func() { errC <- kw.receivePubSubStream(ctx, psc) }()
@@ -48,7 +54,7 @@ func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value strin
close(ready)
require.Eventually(t, func() bool {
- return kw.countSubscribers(runnerKey) == numWatchers
+ return countSubscribers(kw, runnerKey) == numWatchers
}, time.Second, time.Millisecond)
// send message after listeners are ready
@@ -74,7 +80,7 @@ type keyChangeTestCase struct {
}
func TestKeyChangesInstantReturn(t *testing.T) {
- initRdb()
+ rdb := initRdb(t)
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
@@ -118,13 +124,10 @@ func TestKeyChangesInstantReturn(t *testing.T) {
rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
- defer func() {
- rdb.FlushDB(ctx)
- }()
+ defer rdb.FlushDB(ctx)
- kw := NewKeyWatcher()
+ kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
- kw.redisConn = rdb
kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout)
@@ -136,7 +139,7 @@ func TestKeyChangesInstantReturn(t *testing.T) {
}
func TestKeyChangesWhenWatching(t *testing.T) {
- initRdb()
+ rdb := initRdb(t)
testCases := []keyChangeTestCase{
// WatchKeyStatusSeenChange
@@ -170,11 +173,9 @@ func TestKeyChangesWhenWatching(t *testing.T) {
rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
- kw := NewKeyWatcher()
+ kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
- defer func() {
- rdb.FlushDB(ctx)
- }()
+ defer rdb.FlushDB(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
@@ -189,13 +190,13 @@ func TestKeyChangesWhenWatching(t *testing.T) {
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
- kw.processMessages(t, 1, tc.processedValue, ready, wg)
+ processMessages(t, kw, 1, tc.processedValue, ready, wg)
})
}
}
func TestKeyChangesParallel(t *testing.T) {
- initRdb()
+ rdb := initRdb(t)
testCases := []keyChangeTestCase{
{
@@ -222,15 +223,13 @@ func TestKeyChangesParallel(t *testing.T) {
rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
- defer func() {
- rdb.FlushDB(ctx)
- }()
+ defer rdb.FlushDB(ctx)
wg := &sync.WaitGroup{}
wg.Add(runTimes)
ready := make(chan struct{})
- kw := NewKeyWatcher()
+ kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
for i := 0; i < runTimes; i++ {
@@ -244,16 +243,15 @@ func TestKeyChangesParallel(t *testing.T) {
}()
}
- kw.processMessages(t, runTimes, tc.processedValue, ready, wg)
+ processMessages(t, kw, runTimes, tc.processedValue, ready, wg)
})
}
}
func TestShutdown(t *testing.T) {
- initRdb()
+ rdb := initRdb(t)
- kw := NewKeyWatcher()
- kw.redisConn = rdb
+ kw := NewKeyWatcher(rdb)
kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
defer kw.Shutdown()
@@ -272,14 +270,14 @@ func TestShutdown(t *testing.T) {
go func() {
defer wg.Done()
- require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
+ require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 1 }, 10*time.Second, time.Millisecond)
kw.Shutdown()
}()
wg.Wait()
- require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
+ require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 0 }, 10*time.Second, time.Millisecond)
// Adding a key after the shutdown should result in an immediate response
var val WatchKeyStatus
diff --git a/workhorse/internal/redis/redis.go b/workhorse/internal/redis/redis.go
index e21dae916e4..1fd30b05de5 100644
--- a/workhorse/internal/redis/redis.go
+++ b/workhorse/internal/redis/redis.go
@@ -17,7 +17,6 @@ import (
)
var (
- rdb *redis.Client
// found in https://github.com/redis/go-redis/blob/c7399b6a17d7d3e2a57654528af91349f2468529/sentinel.go#L626
errSentinelMasterAddr error = errors.New("redis: all sentinels specified in configuration are unreachable")
@@ -129,16 +128,13 @@ func (s sentinelInstrumentationHook) ProcessPipelineHook(next redis.ProcessPipel
}
}
-func GetRedisClient() *redis.Client {
- return rdb
-}
-
// Configure redis-connection
-func Configure(cfg *config.RedisConfig) error {
+func Configure(cfg *config.RedisConfig) (*redis.Client, error) {
if cfg == nil {
- return nil
+ return nil, nil
}
+ var rdb *redis.Client
var err error
if len(cfg.Sentinel) > 0 {
@@ -147,7 +143,7 @@ func Configure(cfg *config.RedisConfig) error {
rdb, err = configureRedis(cfg)
}
- return err
+ return rdb, err
}
func configureRedis(cfg *config.RedisConfig) (*redis.Client, error) {
diff --git a/workhorse/internal/redis/redis_test.go b/workhorse/internal/redis/redis_test.go
index d16a7a02761..cbceb7e6183 100644
--- a/workhorse/internal/redis/redis_test.go
+++ b/workhorse/internal/redis/redis_test.go
@@ -29,8 +29,8 @@ func mockRedisServer(t *testing.T, connectReceived *atomic.Value) string {
}
func TestConfigureNoConfig(t *testing.T) {
- rdb = nil
- Configure(nil)
+ rdb, err := Configure(nil)
+ require.NoError(t, err)
require.Nil(t, rdb, "rdb client should be nil")
}
@@ -57,15 +57,15 @@ func TestConfigureValidConfigX(t *testing.T) {
parsedURL := helper.URLMustParse(tc.scheme + "://" + a)
cfg := &config.RedisConfig{URL: config.TomlURL{URL: *parsedURL}}
- Configure(cfg)
+ rdb, err := Configure(cfg)
+ require.NoError(t, err)
+ defer rdb.Close()
- require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil")
+ require.NotNil(t, rdb.Conn(), "Pool should not be nil")
// goredis initialise connections lazily
rdb.Ping(context.Background())
require.True(t, connectReceived.Load().(bool))
-
- rdb = nil
})
}
}
@@ -96,15 +96,15 @@ func TestConnectToSentinel(t *testing.T) {
}
cfg := &config.RedisConfig{Sentinel: sentinelUrls}
- Configure(cfg)
+ rdb, err := Configure(cfg)
+ require.NoError(t, err)
+ defer rdb.Close()
- require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil")
+ require.NotNil(t, rdb.Conn(), "Pool should not be nil")
// goredis initialise connections lazily
rdb.Ping(context.Background())
require.True(t, connectReceived.Load().(bool))
-
- rdb = nil
})
}
}
diff --git a/workhorse/main.go b/workhorse/main.go
index 3043ae50a22..5743355594f 100644
--- a/workhorse/main.go
+++ b/workhorse/main.go
@@ -225,13 +225,14 @@ func run(boot bootConfig, cfg config.Config) error {
log.Info("Using redis/go-redis")
- redisKeyWatcher := redis.NewKeyWatcher()
- if err := redis.Configure(cfg.Redis); err != nil {
+ rdb, err := redis.Configure(cfg.Redis)
+ if err != nil {
log.WithError(err).Error("unable to configure redis client")
}
+ redisKeyWatcher := redis.NewKeyWatcher(rdb)
- if rdb := redis.GetRedisClient(); rdb != nil {
- go redisKeyWatcher.Process(rdb)
+ if rdb != nil {
+ go redisKeyWatcher.Process()
}
watchKeyFn := redisKeyWatcher.WatchKey