diff options
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r-- | workhorse/internal/redis/keywatcher_test.go | 134 |
1 files changed, 60 insertions, 74 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index 3abc1bf1107..0e3278f2d26 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -2,13 +2,16 @@ package redis import ( "context" + "os" "sync" "testing" "time" - "github.com/gomodule/redigo/redis" - "github.com/rafaeljusto/redigomock/v3" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" ) var ctx = context.Background() @@ -17,47 +20,31 @@ const ( runnerKey = "runner:build_queue:10" ) -func createSubscriptionMessage(key, data string) []interface{} { - return []interface{}{ - []byte("message"), - []byte(key), - []byte(data), - } +func initRdb(t *testing.T) *redis.Client { + buf, err := os.ReadFile("../../config.toml") + require.NoError(t, err) + cfg, err := config.LoadConfig(string(buf)) + require.NoError(t, err) + rdb, err := Configure(cfg.Redis) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, rdb.Close()) + }) + return rdb } -func createSubscribeMessage(key string) []interface{} { - return []interface{}{ - []byte("subscribe"), - []byte(key), - []byte("1"), - } -} -func createUnsubscribeMessage(key string) []interface{} { - return []interface{}{ - []byte("unsubscribe"), - []byte(key), - []byte("1"), - } -} - -func (kw *KeyWatcher) countSubscribers(key string) int { +func countSubscribers(kw *KeyWatcher, key string) int { kw.mu.Lock() defer kw.mu.Unlock() return len(kw.subscribers[key]) } // Forces a run of the `Process` loop against a mock PubSubConn. -func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) { - psc := redigomock.NewConn() - psc.ReceiveWait = true - - channel := channelPrefix + runnerKey - psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel)) - psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel)) - psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value)) +func processMessages(t *testing.T, kw *KeyWatcher, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) { + psc := kw.redisConn.Subscribe(ctx, []string{}...) errC := make(chan error) - go func() { errC <- kw.receivePubSubStream(psc) }() + go func() { errC <- kw.receivePubSubStream(ctx, psc) }() require.Eventually(t, func() bool { kw.mu.Lock() @@ -67,9 +54,17 @@ func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value strin close(ready) require.Eventually(t, func() bool { - return kw.countSubscribers(runnerKey) == numWatchers + return countSubscribers(kw, runnerKey) == numWatchers }, time.Second, time.Millisecond) - close(psc.ReceiveNow) + + // send message after listeners are ready + kw.redisConn.Publish(ctx, channelPrefix+runnerKey, value) + + // close subscription after all workers are done + wg.Wait() + kw.mu.Lock() + kw.conn.Close() + kw.mu.Unlock() require.NoError(t, <-errC) } @@ -85,6 +80,8 @@ type keyChangeTestCase struct { } func TestKeyChangesInstantReturn(t *testing.T) { + rdb := initRdb(t) + testCases := []keyChangeTestCase{ // WatchKeyStatusAlreadyChanged { @@ -121,18 +118,17 @@ func TestKeyChangesInstantReturn(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - conn, td := setupMockPool() - defer td() - if tc.isKeyMissing { - conn.Command("GET", runnerKey).ExpectError(redis.ErrNil) - } else { - conn.Command("GET", runnerKey).Expect(tc.returnValue) + // setup + if !tc.isKeyMissing { + rdb.Set(ctx, runnerKey, tc.returnValue, 0) } - kw := NewKeyWatcher() + defer rdb.FlushDB(ctx) + + kw := NewKeyWatcher(rdb) defer kw.Shutdown() - kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} + kw.conn = kw.redisConn.Subscribe(ctx, []string{}...) val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout) @@ -143,6 +139,8 @@ func TestKeyChangesInstantReturn(t *testing.T) { } func TestKeyChangesWhenWatching(t *testing.T) { + rdb := initRdb(t) + testCases := []keyChangeTestCase{ // WatchKeyStatusSeenChange { @@ -171,17 +169,13 @@ func TestKeyChangesWhenWatching(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - conn, td := setupMockPool() - defer td() - - if tc.isKeyMissing { - conn.Command("GET", runnerKey).ExpectError(redis.ErrNil) - } else { - conn.Command("GET", runnerKey).Expect(tc.returnValue) + if !tc.isKeyMissing { + rdb.Set(ctx, runnerKey, tc.returnValue, 0) } - kw := NewKeyWatcher() + kw := NewKeyWatcher(rdb) defer kw.Shutdown() + defer rdb.FlushDB(ctx) wg := &sync.WaitGroup{} wg.Add(1) @@ -196,13 +190,14 @@ func TestKeyChangesWhenWatching(t *testing.T) { require.Equal(t, tc.expectedStatus, val, "Expected value") }() - kw.processMessages(t, 1, tc.processedValue, ready) - wg.Wait() + processMessages(t, kw, 1, tc.processedValue, ready, wg) }) } } func TestKeyChangesParallel(t *testing.T) { + rdb := initRdb(t) + testCases := []keyChangeTestCase{ { desc: "massively parallel, sees change with key existing", @@ -224,24 +219,17 @@ func TestKeyChangesParallel(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { runTimes := 100 - conn, td := setupMockPool() - defer td() - - getCmd := conn.Command("GET", runnerKey) - - for i := 0; i < runTimes; i++ { - if tc.isKeyMissing { - getCmd = getCmd.ExpectError(redis.ErrNil) - } else { - getCmd = getCmd.Expect(tc.returnValue) - } + if !tc.isKeyMissing { + rdb.Set(ctx, runnerKey, tc.returnValue, 0) } + defer rdb.FlushDB(ctx) + wg := &sync.WaitGroup{} wg.Add(runTimes) ready := make(chan struct{}) - kw := NewKeyWatcher() + kw := NewKeyWatcher(rdb) defer kw.Shutdown() for i := 0; i < runTimes; i++ { @@ -255,21 +243,19 @@ func TestKeyChangesParallel(t *testing.T) { }() } - kw.processMessages(t, runTimes, tc.processedValue, ready) - wg.Wait() + processMessages(t, kw, runTimes, tc.processedValue, ready, wg) }) } } func TestShutdown(t *testing.T) { - conn, td := setupMockPool() - defer td() + rdb := initRdb(t) - kw := NewKeyWatcher() - kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} + kw := NewKeyWatcher(rdb) + kw.conn = kw.redisConn.Subscribe(ctx, []string{}...) defer kw.Shutdown() - conn.Command("GET", runnerKey).Expect("something") + rdb.Set(ctx, runnerKey, "something", 0) wg := &sync.WaitGroup{} wg.Add(2) @@ -284,14 +270,14 @@ func TestShutdown(t *testing.T) { go func() { defer wg.Done() - require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond) + require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 1 }, 10*time.Second, time.Millisecond) kw.Shutdown() }() wg.Wait() - require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond) + require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 0 }, 10*time.Second, time.Millisecond) // Adding a key after the shutdown should result in an immediate response var val WatchKeyStatus |