diff options
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r-- | workhorse/internal/redis/keywatcher_test.go | 103 |
1 files changed, 55 insertions, 48 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index 29041226b14..bae49d81bb1 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -1,7 +1,6 @@ package redis import ( - "errors" "sync" "testing" "time" @@ -38,33 +37,38 @@ func createUnsubscribeMessage(key string) []interface{} { } } -func countWatchers(key string) int { - keyWatcherMutex.Lock() - defer keyWatcherMutex.Unlock() - return len(keyWatcher[key]) -} - -func deleteWatchers(key string) { - keyWatcherMutex.Lock() - defer keyWatcherMutex.Unlock() - delete(keyWatcher, key) +func (kw *KeyWatcher) countSubscribers(key string) int { + kw.mu.Lock() + defer kw.mu.Unlock() + return len(kw.subscribers[key]) } // Forces a run of the `Process` loop against a mock PubSubConn. -func processMessages(numWatchers int, value string) { +func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) { psc := redigomock.NewConn() + psc.ReceiveWait = true - // Setup the initial subscription message - psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel)) - psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel)) - psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value)) + channel := channelPrefix + runnerKey + psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel)) + psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel)) + psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value)) - // Wait for all the `WatchKey` calls to be registered - for countWatchers(runnerKey) != numWatchers { - time.Sleep(time.Millisecond) - } + errC := make(chan error) + go func() { errC <- kw.receivePubSubStream(psc) }() + + require.Eventually(t, func() bool { + kw.mu.Lock() + defer kw.mu.Unlock() + return kw.conn != nil + }, time.Second, time.Millisecond) + close(ready) + + require.Eventually(t, func() bool { + return kw.countSubscribers(runnerKey) == numWatchers + }, time.Second, time.Millisecond) + close(psc.ReceiveNow) - processInner(psc) + require.NoError(t, <-errC) } type keyChangeTestCase struct { @@ -77,18 +81,6 @@ type keyChangeTestCase struct { timeout time.Duration } -func TestKeyChangesBubblesUpError(t *testing.T) { - conn, td := setupMockPool() - defer td() - - conn.Command("GET", runnerKey).ExpectError(errors.New("test error")) - - _, err := WatchKey(runnerKey, "something", time.Second) - require.Error(t, err, "Expected error") - - deleteWatchers(runnerKey) -} - func TestKeyChangesInstantReturn(t *testing.T) { testCases := []keyChangeTestCase{ // WatchKeyStatusAlreadyChanged @@ -135,12 +127,14 @@ func TestKeyChangesInstantReturn(t *testing.T) { conn.Command("GET", runnerKey).Expect(tc.returnValue) } - val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout) + kw := NewKeyWatcher() + defer kw.Shutdown() + kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} + + val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") - - deleteWatchers(runnerKey) }) } } @@ -183,18 +177,23 @@ func TestKeyChangesWhenWatching(t *testing.T) { conn.Command("GET", runnerKey).Expect(tc.returnValue) } + kw := NewKeyWatcher() + defer kw.Shutdown() + wg := &sync.WaitGroup{} wg.Add(1) + ready := make(chan struct{}) go func() { defer wg.Done() - val, err := WatchKey(runnerKey, tc.watchValue, time.Second) + <-ready + val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") }() - processMessages(1, tc.processedValue) + kw.processMessages(t, 1, tc.processedValue, ready) wg.Wait() }) } @@ -237,18 +236,23 @@ func TestKeyChangesParallel(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(runTimes) + ready := make(chan struct{}) + + kw := NewKeyWatcher() + defer kw.Shutdown() for i := 0; i < runTimes; i++ { go func() { defer wg.Done() - val, err := WatchKey(runnerKey, tc.watchValue, time.Second) + <-ready + val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") }() } - processMessages(runTimes, tc.processedValue) + kw.processMessages(t, runTimes, tc.processedValue, ready) wg.Wait() }) } @@ -257,7 +261,10 @@ func TestKeyChangesParallel(t *testing.T) { func TestShutdown(t *testing.T) { conn, td := setupMockPool() defer td() - defer func() { shutdown = make(chan struct{}) }() + + kw := NewKeyWatcher() + kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} + defer kw.Shutdown() conn.Command("GET", runnerKey).Expect("something") @@ -265,30 +272,30 @@ func TestShutdown(t *testing.T) { wg.Add(2) go func() { - val, err := WatchKey(runnerKey, "something", 10*time.Second) + defer wg.Done() + val, err := kw.WatchKey(runnerKey, "something", 10*time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change") - wg.Done() }() go func() { - require.Eventually(t, func() bool { return countWatchers(runnerKey) == 1 }, 10*time.Second, time.Millisecond) + defer wg.Done() + require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond) - Shutdown() - wg.Done() + kw.Shutdown() }() wg.Wait() - require.Eventually(t, func() bool { return countWatchers(runnerKey) == 0 }, 10*time.Second, time.Millisecond) + require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond) // Adding a key after the shutdown should result in an immediate response var val WatchKeyStatus var err error done := make(chan struct{}) go func() { - val, err = WatchKey(runnerKey, "something", 10*time.Second) + val, err = kw.WatchKey(runnerKey, "something", 10*time.Second) close(done) }() |