package redis import ( "sync" "testing" "time" "github.com/rafaeljusto/redigomock" "github.com/stretchr/testify/require" ) const ( runnerKey = "runner:build_queue:10" ) func createSubscriptionMessage(key, data string) []interface{} { return []interface{}{ []byte("message"), []byte(key), []byte(data), } } func createSubscribeMessage(key string) []interface{} { return []interface{}{ []byte("subscribe"), []byte(key), []byte("1"), } } func createUnsubscribeMessage(key string) []interface{} { return []interface{}{ []byte("unsubscribe"), []byte(key), []byte("1"), } } func countWatchers(key string) int { keyWatcherMutex.Lock() defer keyWatcherMutex.Unlock() return len(keyWatcher[key]) } func deleteWatchers(key string) { keyWatcherMutex.Lock() defer keyWatcherMutex.Unlock() delete(keyWatcher, key) } // Forces a run of the `Process` loop against a mock PubSubConn. func processMessages(numWatchers int, value string) { psc := redigomock.NewConn() // Setup the initial subscription message psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel)) psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel)) psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value)) // Wait for all the `WatchKey` calls to be registered for countWatchers(runnerKey) != numWatchers { time.Sleep(time.Millisecond) } processInner(psc) } func TestWatchKeySeenChange(t *testing.T) { conn, td := setupMockPool() defer td() conn.Command("GET", runnerKey).Expect("something") wg := &sync.WaitGroup{} wg.Add(1) go func() { val, err := WatchKey(runnerKey, "something", time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") wg.Done() }() processMessages(1, "somethingelse") wg.Wait() } func TestWatchKeyNoChange(t *testing.T) { conn, td := setupMockPool() defer td() conn.Command("GET", runnerKey).Expect("something") wg := &sync.WaitGroup{} wg.Add(1) go func() { val, err := WatchKey(runnerKey, "something", time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value") wg.Done() }() processMessages(1, "something") wg.Wait() } func TestWatchKeyTimeout(t *testing.T) { conn, td := setupMockPool() defer td() conn.Command("GET", runnerKey).Expect("something") val, err := WatchKey(runnerKey, "something", time.Millisecond) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change") // Clean up watchers since Process isn't doing that for us (not running) deleteWatchers(runnerKey) } func TestWatchKeyAlreadyChanged(t *testing.T) { conn, td := setupMockPool() defer td() conn.Command("GET", runnerKey).Expect("somethingelse") val, err := WatchKey(runnerKey, "something", time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed") // Clean up watchers since Process isn't doing that for us (not running) deleteWatchers(runnerKey) } func TestWatchKeyMassivelyParallel(t *testing.T) { runTimes := 100 // 100 parallel watchers conn, td := setupMockPool() defer td() wg := &sync.WaitGroup{} wg.Add(runTimes) getCmd := conn.Command("GET", runnerKey) for i := 0; i < runTimes; i++ { getCmd = getCmd.Expect("something") } for i := 0; i < runTimes; i++ { go func() { val, err := WatchKey(runnerKey, "something", time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") wg.Done() }() } processMessages(runTimes, "somethingelse") wg.Wait() }