Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2021-06-04 15:10:17 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2021-06-04 15:10:17 +0300
commitc80a1141e306596202f694b101bfb1aab1864de9 (patch)
tree46aaee47523ecd57fa6396dae224c3f1cc4079eb /workhorse/internal
parent57f8f3552ca37f38f19a6520737ae1ce0009efb3 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse/internal')
-rw-r--r--workhorse/internal/config/config.go3
-rw-r--r--workhorse/internal/redis/keywatcher.go18
-rw-r--r--workhorse/internal/redis/keywatcher_test.go55
3 files changed, 75 insertions, 1 deletions
diff --git a/workhorse/internal/config/config.go b/workhorse/internal/config/config.go
index d019a7710bf..9f214385f81 100644
--- a/workhorse/internal/config/config.go
+++ b/workhorse/internal/config/config.go
@@ -28,7 +28,7 @@ type TomlDuration struct {
time.Duration
}
-func (d *TomlDuration) UnmarshalTest(text []byte) error {
+func (d *TomlDuration) UnmarshalText(text []byte) error {
temp, err := time.ParseDuration(string(text))
d.Duration = temp
return err
@@ -103,6 +103,7 @@ type Config struct {
PropagateCorrelationID bool `toml:"-"`
ImageResizerConfig ImageResizerConfig `toml:"image_resizer"`
AltDocumentRoot string `toml:"alt_document_root"`
+ ShutdownTimeout TomlDuration `toml:"shutdown_timeout"`
}
var DefaultImageResizerConfig = ImageResizerConfig{
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 8f3e61b5e9f..10d80d13d22 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -17,6 +17,7 @@ import (
var (
keyWatcher = make(map[string][]chan string)
keyWatcherMutex sync.Mutex
+ shutdown = make(chan struct{})
redisReconnectTimeout = backoff.Backoff{
//These are the defaults
Min: 100 * time.Millisecond,
@@ -112,6 +113,20 @@ func Process() {
}
}
+func Shutdown() {
+ log.Info("keywatcher: shutting down")
+
+ keyWatcherMutex.Lock()
+ defer keyWatcherMutex.Unlock()
+
+ select {
+ case <-shutdown:
+ // already closed
+ default:
+ close(shutdown)
+ }
+}
+
func notifyChanWatchers(key, value string) {
keyWatcherMutex.Lock()
defer keyWatcherMutex.Unlock()
@@ -182,6 +197,9 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
}
select {
+ case <-shutdown:
+ log.WithFields(log.Fields{"key": key}).Info("stopping watch due to shutdown")
+ return WatchKeyStatusNoChange, nil
case currentValue := <-kw.Chan:
if currentValue == "" {
return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed")
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index f1ee77e2194..99892bc64b8 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -160,3 +160,58 @@ func TestWatchKeyMassivelyParallel(t *testing.T) {
processMessages(runTimes, "somethingelse")
wg.Wait()
}
+
+func TestShutdown(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
+ defer func() { shutdown = make(chan struct{}) }()
+
+ conn.Command("GET", runnerKey).Expect("something")
+
+ wg := &sync.WaitGroup{}
+ wg.Add(2)
+
+ go func() {
+ val, err := WatchKey(runnerKey, "something", 10*time.Second)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
+ wg.Done()
+ }()
+
+ go func() {
+ for countWatchers(runnerKey) == 0 {
+ time.Sleep(time.Millisecond)
+ }
+
+ require.Equal(t, 1, countWatchers(runnerKey))
+
+ Shutdown()
+ wg.Done()
+ }()
+
+ wg.Wait()
+
+ for countWatchers(runnerKey) == 1 {
+ time.Sleep(time.Millisecond)
+ }
+
+ require.Equal(t, 0, countWatchers(runnerKey))
+
+ // Adding a key after the shutdown should result in an immediate response
+ var val WatchKeyStatus
+ var err error
+ done := make(chan struct{})
+ go func() {
+ val, err = WatchKey(runnerKey, "something", 10*time.Second)
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
+ case <-time.After(100 * time.Millisecond):
+ t.Fatal("timeout waiting for WatchKey")
+ }
+}