diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2023-12-01 03:11:54 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2023-12-01 03:11:54 +0300 |
commit | 08775893a80e4024d9b30bd1a17caff7ecb274f9 (patch) | |
tree | 1a28b448a4361ac8ebf7cfebdf8af5645c2b3b4c /workhorse | |
parent | ab37c8f6370868a8316992745589167517d422b7 (diff) |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r-- | workhorse/internal/redis/keywatcher.go | 12 | ||||
-rw-r--r-- | workhorse/internal/redis/keywatcher_test.go | 64 | ||||
-rw-r--r-- | workhorse/internal/redis/redis.go | 12 | ||||
-rw-r--r-- | workhorse/internal/redis/redis_test.go | 20 | ||||
-rw-r--r-- | workhorse/main.go | 9 |
5 files changed, 55 insertions, 62 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index ddb838121b7..efc14f1cb5e 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -21,11 +21,11 @@ type KeyWatcher struct { subscribers map[string][]chan string shutdown chan struct{} reconnectBackoff backoff.Backoff - redisConn *redis.Client + redisConn *redis.Client // can be nil conn *redis.PubSub } -func NewKeyWatcher() *KeyWatcher { +func NewKeyWatcher(redisConn *redis.Client) *KeyWatcher { return &KeyWatcher{ shutdown: make(chan struct{}), reconnectBackoff: backoff.Backoff{ @@ -34,6 +34,7 @@ func NewKeyWatcher() *KeyWatcher { Factor: 2, Jitter: true, }, + redisConn: redisConn, } } @@ -125,16 +126,13 @@ func (kw *KeyWatcher) receivePubSubStream(ctx context.Context, pubsub *redis.Pub } } -func (kw *KeyWatcher) Process(client *redis.Client) { +func (kw *KeyWatcher) Process() { log.Info("keywatcher: starting process loop") ctx := context.Background() // lint:allow context.Background - kw.mu.Lock() - kw.redisConn = client - kw.mu.Unlock() for { - pubsub := client.Subscribe(ctx, []string{}...) + pubsub := kw.redisConn.Subscribe(ctx, []string{}...) if err := pubsub.Ping(ctx); err != nil { log.WithError(fmt.Errorf("keywatcher: %v", err)).Error() time.Sleep(kw.reconnectBackoff.Duration()) diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index bca4ca43a64..0e3278f2d26 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" @@ -18,24 +20,28 @@ const ( runnerKey = "runner:build_queue:10" ) -func initRdb() { - buf, _ := os.ReadFile("../../config.toml") - cfg, _ := config.LoadConfig(string(buf)) - Configure(cfg.Redis) +func initRdb(t *testing.T) *redis.Client { + buf, err := os.ReadFile("../../config.toml") + require.NoError(t, err) + cfg, err := config.LoadConfig(string(buf)) + require.NoError(t, err) + rdb, err := Configure(cfg.Redis) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, rdb.Close()) + }) + return rdb } -func (kw *KeyWatcher) countSubscribers(key string) int { +func countSubscribers(kw *KeyWatcher, key string) int { kw.mu.Lock() defer kw.mu.Unlock() return len(kw.subscribers[key]) } // Forces a run of the `Process` loop against a mock PubSubConn. -func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) { - kw.mu.Lock() - kw.redisConn = rdb +func processMessages(t *testing.T, kw *KeyWatcher, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) { psc := kw.redisConn.Subscribe(ctx, []string{}...) - kw.mu.Unlock() errC := make(chan error) go func() { errC <- kw.receivePubSubStream(ctx, psc) }() @@ -48,7 +54,7 @@ func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value strin close(ready) require.Eventually(t, func() bool { - return kw.countSubscribers(runnerKey) == numWatchers + return countSubscribers(kw, runnerKey) == numWatchers }, time.Second, time.Millisecond) // send message after listeners are ready @@ -74,7 +80,7 @@ type keyChangeTestCase struct { } func TestKeyChangesInstantReturn(t *testing.T) { - initRdb() + rdb := initRdb(t) testCases := []keyChangeTestCase{ // WatchKeyStatusAlreadyChanged @@ -118,13 +124,10 @@ func TestKeyChangesInstantReturn(t *testing.T) { rdb.Set(ctx, runnerKey, tc.returnValue, 0) } - defer func() { - rdb.FlushDB(ctx) - }() + defer rdb.FlushDB(ctx) - kw := NewKeyWatcher() + kw := NewKeyWatcher(rdb) defer kw.Shutdown() - kw.redisConn = rdb kw.conn = kw.redisConn.Subscribe(ctx, []string{}...) val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout) @@ -136,7 +139,7 @@ func TestKeyChangesInstantReturn(t *testing.T) { } func TestKeyChangesWhenWatching(t *testing.T) { - initRdb() + rdb := initRdb(t) testCases := []keyChangeTestCase{ // WatchKeyStatusSeenChange @@ -170,11 +173,9 @@ func TestKeyChangesWhenWatching(t *testing.T) { rdb.Set(ctx, runnerKey, tc.returnValue, 0) } - kw := NewKeyWatcher() + kw := NewKeyWatcher(rdb) defer kw.Shutdown() - defer func() { - rdb.FlushDB(ctx) - }() + defer rdb.FlushDB(ctx) wg := &sync.WaitGroup{} wg.Add(1) @@ -189,13 +190,13 @@ func TestKeyChangesWhenWatching(t *testing.T) { require.Equal(t, tc.expectedStatus, val, "Expected value") }() - kw.processMessages(t, 1, tc.processedValue, ready, wg) + processMessages(t, kw, 1, tc.processedValue, ready, wg) }) } } func TestKeyChangesParallel(t *testing.T) { - initRdb() + rdb := initRdb(t) testCases := []keyChangeTestCase{ { @@ -222,15 +223,13 @@ func TestKeyChangesParallel(t *testing.T) { rdb.Set(ctx, runnerKey, tc.returnValue, 0) } - defer func() { - rdb.FlushDB(ctx) - }() + defer rdb.FlushDB(ctx) wg := &sync.WaitGroup{} wg.Add(runTimes) ready := make(chan struct{}) - kw := NewKeyWatcher() + kw := NewKeyWatcher(rdb) defer kw.Shutdown() for i := 0; i < runTimes; i++ { @@ -244,16 +243,15 @@ func TestKeyChangesParallel(t *testing.T) { }() } - kw.processMessages(t, runTimes, tc.processedValue, ready, wg) + processMessages(t, kw, runTimes, tc.processedValue, ready, wg) }) } } func TestShutdown(t *testing.T) { - initRdb() + rdb := initRdb(t) - kw := NewKeyWatcher() - kw.redisConn = rdb + kw := NewKeyWatcher(rdb) kw.conn = kw.redisConn.Subscribe(ctx, []string{}...) defer kw.Shutdown() @@ -272,14 +270,14 @@ func TestShutdown(t *testing.T) { go func() { defer wg.Done() - require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond) + require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 1 }, 10*time.Second, time.Millisecond) kw.Shutdown() }() wg.Wait() - require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond) + require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 0 }, 10*time.Second, time.Millisecond) // Adding a key after the shutdown should result in an immediate response var val WatchKeyStatus diff --git a/workhorse/internal/redis/redis.go b/workhorse/internal/redis/redis.go index e21dae916e4..1fd30b05de5 100644 --- a/workhorse/internal/redis/redis.go +++ b/workhorse/internal/redis/redis.go @@ -17,7 +17,6 @@ import ( ) var ( - rdb *redis.Client // found in https://github.com/redis/go-redis/blob/c7399b6a17d7d3e2a57654528af91349f2468529/sentinel.go#L626 errSentinelMasterAddr error = errors.New("redis: all sentinels specified in configuration are unreachable") @@ -129,16 +128,13 @@ func (s sentinelInstrumentationHook) ProcessPipelineHook(next redis.ProcessPipel } } -func GetRedisClient() *redis.Client { - return rdb -} - // Configure redis-connection -func Configure(cfg *config.RedisConfig) error { +func Configure(cfg *config.RedisConfig) (*redis.Client, error) { if cfg == nil { - return nil + return nil, nil } + var rdb *redis.Client var err error if len(cfg.Sentinel) > 0 { @@ -147,7 +143,7 @@ func Configure(cfg *config.RedisConfig) error { rdb, err = configureRedis(cfg) } - return err + return rdb, err } func configureRedis(cfg *config.RedisConfig) (*redis.Client, error) { diff --git a/workhorse/internal/redis/redis_test.go b/workhorse/internal/redis/redis_test.go index d16a7a02761..cbceb7e6183 100644 --- a/workhorse/internal/redis/redis_test.go +++ b/workhorse/internal/redis/redis_test.go @@ -29,8 +29,8 @@ func mockRedisServer(t *testing.T, connectReceived *atomic.Value) string { } func TestConfigureNoConfig(t *testing.T) { - rdb = nil - Configure(nil) + rdb, err := Configure(nil) + require.NoError(t, err) require.Nil(t, rdb, "rdb client should be nil") } @@ -57,15 +57,15 @@ func TestConfigureValidConfigX(t *testing.T) { parsedURL := helper.URLMustParse(tc.scheme + "://" + a) cfg := &config.RedisConfig{URL: config.TomlURL{URL: *parsedURL}} - Configure(cfg) + rdb, err := Configure(cfg) + require.NoError(t, err) + defer rdb.Close() - require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil") + require.NotNil(t, rdb.Conn(), "Pool should not be nil") // goredis initialise connections lazily rdb.Ping(context.Background()) require.True(t, connectReceived.Load().(bool)) - - rdb = nil }) } } @@ -96,15 +96,15 @@ func TestConnectToSentinel(t *testing.T) { } cfg := &config.RedisConfig{Sentinel: sentinelUrls} - Configure(cfg) + rdb, err := Configure(cfg) + require.NoError(t, err) + defer rdb.Close() - require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil") + require.NotNil(t, rdb.Conn(), "Pool should not be nil") // goredis initialise connections lazily rdb.Ping(context.Background()) require.True(t, connectReceived.Load().(bool)) - - rdb = nil }) } } diff --git a/workhorse/main.go b/workhorse/main.go index 3043ae50a22..5743355594f 100644 --- a/workhorse/main.go +++ b/workhorse/main.go @@ -225,13 +225,14 @@ func run(boot bootConfig, cfg config.Config) error { log.Info("Using redis/go-redis") - redisKeyWatcher := redis.NewKeyWatcher() - if err := redis.Configure(cfg.Redis); err != nil { + rdb, err := redis.Configure(cfg.Redis) + if err != nil { log.WithError(err).Error("unable to configure redis client") } + redisKeyWatcher := redis.NewKeyWatcher(rdb) - if rdb := redis.GetRedisClient(); rdb != nil { - go redisKeyWatcher.Process(rdb) + if rdb != nil { + go redisKeyWatcher.Process() } watchKeyFn := redisKeyWatcher.WatchKey |