Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r--workhorse/internal/redis/keywatcher_test.go162
1 files changed, 162 insertions, 0 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
new file mode 100644
index 00000000000..f1ee77e2194
--- /dev/null
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -0,0 +1,162 @@
+package redis
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/rafaeljusto/redigomock"
+ "github.com/stretchr/testify/require"
+)
+
+const (
+ runnerKey = "runner:build_queue:10"
+)
+
+func createSubscriptionMessage(key, data string) []interface{} {
+ return []interface{}{
+ []byte("message"),
+ []byte(key),
+ []byte(data),
+ }
+}
+
+func createSubscribeMessage(key string) []interface{} {
+ return []interface{}{
+ []byte("subscribe"),
+ []byte(key),
+ []byte("1"),
+ }
+}
+func createUnsubscribeMessage(key string) []interface{} {
+ return []interface{}{
+ []byte("unsubscribe"),
+ []byte(key),
+ []byte("1"),
+ }
+}
+
+func countWatchers(key string) int {
+ keyWatcherMutex.Lock()
+ defer keyWatcherMutex.Unlock()
+ return len(keyWatcher[key])
+}
+
+func deleteWatchers(key string) {
+ keyWatcherMutex.Lock()
+ defer keyWatcherMutex.Unlock()
+ delete(keyWatcher, key)
+}
+
+// Forces a run of the `Process` loop against a mock PubSubConn.
+func processMessages(numWatchers int, value string) {
+ psc := redigomock.NewConn()
+
+ // Setup the initial subscription message
+ psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel))
+ psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel))
+ psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value))
+
+ // Wait for all the `WatchKey` calls to be registered
+ for countWatchers(runnerKey) != numWatchers {
+ time.Sleep(time.Millisecond)
+ }
+
+ processInner(psc)
+}
+
+func TestWatchKeySeenChange(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
+
+ conn.Command("GET", runnerKey).Expect("something")
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ go func() {
+ val, err := WatchKey(runnerKey, "something", time.Second)
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
+ wg.Done()
+ }()
+
+ processMessages(1, "somethingelse")
+ wg.Wait()
+}
+
+func TestWatchKeyNoChange(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
+
+ conn.Command("GET", runnerKey).Expect("something")
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ go func() {
+ val, err := WatchKey(runnerKey, "something", time.Second)
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value")
+ wg.Done()
+ }()
+
+ processMessages(1, "something")
+ wg.Wait()
+}
+
+func TestWatchKeyTimeout(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
+
+ conn.Command("GET", runnerKey).Expect("something")
+
+ val, err := WatchKey(runnerKey, "something", time.Millisecond)
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change")
+
+ // Clean up watchers since Process isn't doing that for us (not running)
+ deleteWatchers(runnerKey)
+}
+
+func TestWatchKeyAlreadyChanged(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
+
+ conn.Command("GET", runnerKey).Expect("somethingelse")
+
+ val, err := WatchKey(runnerKey, "something", time.Second)
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed")
+
+ // Clean up watchers since Process isn't doing that for us (not running)
+ deleteWatchers(runnerKey)
+}
+
+func TestWatchKeyMassivelyParallel(t *testing.T) {
+ runTimes := 100 // 100 parallel watchers
+
+ conn, td := setupMockPool()
+ defer td()
+
+ wg := &sync.WaitGroup{}
+ wg.Add(runTimes)
+
+ getCmd := conn.Command("GET", runnerKey)
+
+ for i := 0; i < runTimes; i++ {
+ getCmd = getCmd.Expect("something")
+ }
+
+ for i := 0; i < runTimes; i++ {
+ go func() {
+ val, err := WatchKey(runnerKey, "something", time.Second)
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
+ wg.Done()
+ }()
+ }
+
+ processMessages(runTimes, "somethingelse")
+ wg.Wait()
+}