Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r--workhorse/internal/redis/keywatcher_test.go134
1 files changed, 60 insertions, 74 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index 3abc1bf1107..0e3278f2d26 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -2,13 +2,16 @@ package redis
import (
"context"
+ "os"
"sync"
"testing"
"time"
- "github.com/gomodule/redigo/redis"
- "github.com/rafaeljusto/redigomock/v3"
+ "github.com/redis/go-redis/v9"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
)
var ctx = context.Background()
@@ -17,47 +20,31 @@ const (
runnerKey = "runner:build_queue:10"
)
-func createSubscriptionMessage(key, data string) []interface{} {
- return []interface{}{
- []byte("message"),
- []byte(key),
- []byte(data),
- }
+func initRdb(t *testing.T) *redis.Client {
+ buf, err := os.ReadFile("../../config.toml")
+ require.NoError(t, err)
+ cfg, err := config.LoadConfig(string(buf))
+ require.NoError(t, err)
+ rdb, err := Configure(cfg.Redis)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, rdb.Close())
+ })
+ return rdb
}
-func createSubscribeMessage(key string) []interface{} {
- return []interface{}{
- []byte("subscribe"),
- []byte(key),
- []byte("1"),
- }
-}
-func createUnsubscribeMessage(key string) []interface{} {
- return []interface{}{
- []byte("unsubscribe"),
- []byte(key),
- []byte("1"),
- }
-}
-
-func (kw *KeyWatcher) countSubscribers(key string) int {
+func countSubscribers(kw *KeyWatcher, key string) int {
kw.mu.Lock()
defer kw.mu.Unlock()
return len(kw.subscribers[key])
}
// Forces a run of the `Process` loop against a mock PubSubConn.
-func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) {
- psc := redigomock.NewConn()
- psc.ReceiveWait = true
-
- channel := channelPrefix + runnerKey
- psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
- psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
- psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))
+func processMessages(t *testing.T, kw *KeyWatcher, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) {
+ psc := kw.redisConn.Subscribe(ctx, []string{}...)
errC := make(chan error)
- go func() { errC <- kw.receivePubSubStream(psc) }()
+ go func() { errC <- kw.receivePubSubStream(ctx, psc) }()
require.Eventually(t, func() bool {
kw.mu.Lock()
@@ -67,9 +54,17 @@ func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value strin
close(ready)
require.Eventually(t, func() bool {
- return kw.countSubscribers(runnerKey) == numWatchers
+ return countSubscribers(kw, runnerKey) == numWatchers
}, time.Second, time.Millisecond)
- close(psc.ReceiveNow)
+
+ // send message after listeners are ready
+ kw.redisConn.Publish(ctx, channelPrefix+runnerKey, value)
+
+ // close subscription after all workers are done
+ wg.Wait()
+ kw.mu.Lock()
+ kw.conn.Close()
+ kw.mu.Unlock()
require.NoError(t, <-errC)
}
@@ -85,6 +80,8 @@ type keyChangeTestCase struct {
}
func TestKeyChangesInstantReturn(t *testing.T) {
+ rdb := initRdb(t)
+
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
{
@@ -121,18 +118,17 @@ func TestKeyChangesInstantReturn(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
- if tc.isKeyMissing {
- conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
- } else {
- conn.Command("GET", runnerKey).Expect(tc.returnValue)
+ // setup
+ if !tc.isKeyMissing {
+ rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
- kw := NewKeyWatcher()
+ defer rdb.FlushDB(ctx)
+
+ kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
- kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
+ kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout)
@@ -143,6 +139,8 @@ func TestKeyChangesInstantReturn(t *testing.T) {
}
func TestKeyChangesWhenWatching(t *testing.T) {
+ rdb := initRdb(t)
+
testCases := []keyChangeTestCase{
// WatchKeyStatusSeenChange
{
@@ -171,17 +169,13 @@ func TestKeyChangesWhenWatching(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
-
- if tc.isKeyMissing {
- conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
- } else {
- conn.Command("GET", runnerKey).Expect(tc.returnValue)
+ if !tc.isKeyMissing {
+ rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
- kw := NewKeyWatcher()
+ kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
+ defer rdb.FlushDB(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
@@ -196,13 +190,14 @@ func TestKeyChangesWhenWatching(t *testing.T) {
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
- kw.processMessages(t, 1, tc.processedValue, ready)
- wg.Wait()
+ processMessages(t, kw, 1, tc.processedValue, ready, wg)
})
}
}
func TestKeyChangesParallel(t *testing.T) {
+ rdb := initRdb(t)
+
testCases := []keyChangeTestCase{
{
desc: "massively parallel, sees change with key existing",
@@ -224,24 +219,17 @@ func TestKeyChangesParallel(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
runTimes := 100
- conn, td := setupMockPool()
- defer td()
-
- getCmd := conn.Command("GET", runnerKey)
-
- for i := 0; i < runTimes; i++ {
- if tc.isKeyMissing {
- getCmd = getCmd.ExpectError(redis.ErrNil)
- } else {
- getCmd = getCmd.Expect(tc.returnValue)
- }
+ if !tc.isKeyMissing {
+ rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
+ defer rdb.FlushDB(ctx)
+
wg := &sync.WaitGroup{}
wg.Add(runTimes)
ready := make(chan struct{})
- kw := NewKeyWatcher()
+ kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
for i := 0; i < runTimes; i++ {
@@ -255,21 +243,19 @@ func TestKeyChangesParallel(t *testing.T) {
}()
}
- kw.processMessages(t, runTimes, tc.processedValue, ready)
- wg.Wait()
+ processMessages(t, kw, runTimes, tc.processedValue, ready, wg)
})
}
}
func TestShutdown(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+ rdb := initRdb(t)
- kw := NewKeyWatcher()
- kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
+ kw := NewKeyWatcher(rdb)
+ kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
defer kw.Shutdown()
- conn.Command("GET", runnerKey).Expect("something")
+ rdb.Set(ctx, runnerKey, "something", 0)
wg := &sync.WaitGroup{}
wg.Add(2)
@@ -284,14 +270,14 @@ func TestShutdown(t *testing.T) {
go func() {
defer wg.Done()
- require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
+ require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 1 }, 10*time.Second, time.Millisecond)
kw.Shutdown()
}()
wg.Wait()
- require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
+ require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 0 }, 10*time.Second, time.Millisecond)
// Adding a key after the shutdown should result in an immediate response
var val WatchKeyStatus