Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r--workhorse/internal/redis/keywatcher_test.go103
1 files changed, 55 insertions, 48 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index 29041226b14..bae49d81bb1 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -1,7 +1,6 @@
package redis
import (
- "errors"
"sync"
"testing"
"time"
@@ -38,33 +37,38 @@ func createUnsubscribeMessage(key string) []interface{} {
}
}
-func countWatchers(key string) int {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- return len(keyWatcher[key])
-}
-
-func deleteWatchers(key string) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- delete(keyWatcher, key)
+func (kw *KeyWatcher) countSubscribers(key string) int {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ return len(kw.subscribers[key])
}
// Forces a run of the `Process` loop against a mock PubSubConn.
-func processMessages(numWatchers int, value string) {
+func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) {
psc := redigomock.NewConn()
+ psc.ReceiveWait = true
- // Setup the initial subscription message
- psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel))
- psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel))
- psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value))
+ channel := channelPrefix + runnerKey
+ psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
+ psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
+ psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))
- // Wait for all the `WatchKey` calls to be registered
- for countWatchers(runnerKey) != numWatchers {
- time.Sleep(time.Millisecond)
- }
+ errC := make(chan error)
+ go func() { errC <- kw.receivePubSubStream(psc) }()
+
+ require.Eventually(t, func() bool {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ return kw.conn != nil
+ }, time.Second, time.Millisecond)
+ close(ready)
+
+ require.Eventually(t, func() bool {
+ return kw.countSubscribers(runnerKey) == numWatchers
+ }, time.Second, time.Millisecond)
+ close(psc.ReceiveNow)
- processInner(psc)
+ require.NoError(t, <-errC)
}
type keyChangeTestCase struct {
@@ -77,18 +81,6 @@ type keyChangeTestCase struct {
timeout time.Duration
}
-func TestKeyChangesBubblesUpError(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
-
- conn.Command("GET", runnerKey).ExpectError(errors.New("test error"))
-
- _, err := WatchKey(runnerKey, "something", time.Second)
- require.Error(t, err, "Expected error")
-
- deleteWatchers(runnerKey)
-}
-
func TestKeyChangesInstantReturn(t *testing.T) {
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
@@ -135,12 +127,14 @@ func TestKeyChangesInstantReturn(t *testing.T) {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
- val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout)
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+ kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
+
+ val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
-
- deleteWatchers(runnerKey)
})
}
}
@@ -183,18 +177,23 @@ func TestKeyChangesWhenWatching(t *testing.T) {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+
wg := &sync.WaitGroup{}
wg.Add(1)
+ ready := make(chan struct{})
go func() {
defer wg.Done()
- val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
+ <-ready
+ val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
- processMessages(1, tc.processedValue)
+ kw.processMessages(t, 1, tc.processedValue, ready)
wg.Wait()
})
}
@@ -237,18 +236,23 @@ func TestKeyChangesParallel(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(runTimes)
+ ready := make(chan struct{})
+
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
for i := 0; i < runTimes; i++ {
go func() {
defer wg.Done()
- val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
+ <-ready
+ val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
}
- processMessages(runTimes, tc.processedValue)
+ kw.processMessages(t, runTimes, tc.processedValue, ready)
wg.Wait()
})
}
@@ -257,7 +261,10 @@ func TestKeyChangesParallel(t *testing.T) {
func TestShutdown(t *testing.T) {
conn, td := setupMockPool()
defer td()
- defer func() { shutdown = make(chan struct{}) }()
+
+ kw := NewKeyWatcher()
+ kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
+ defer kw.Shutdown()
conn.Command("GET", runnerKey).Expect("something")
@@ -265,30 +272,30 @@ func TestShutdown(t *testing.T) {
wg.Add(2)
go func() {
- val, err := WatchKey(runnerKey, "something", 10*time.Second)
+ defer wg.Done()
+ val, err := kw.WatchKey(runnerKey, "something", 10*time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
- wg.Done()
}()
go func() {
- require.Eventually(t, func() bool { return countWatchers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
+ defer wg.Done()
+ require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
- Shutdown()
- wg.Done()
+ kw.Shutdown()
}()
wg.Wait()
- require.Eventually(t, func() bool { return countWatchers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
+ require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
// Adding a key after the shutdown should result in an immediate response
var val WatchKeyStatus
var err error
done := make(chan struct{})
go func() {
- val, err = WatchKey(runnerKey, "something", 10*time.Second)
+ val, err = kw.WatchKey(runnerKey, "something", 10*time.Second)
close(done)
}()