Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-23 09:33:55 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-10-07 10:14:27 +0300
commit45d81b076be3755a6d7d124f9fe4d5ef8ef50c5d (patch)
treede46f3b303c148956d395f2ee421c2172e175ebd
parenta256b01c0c403adc55418625e49b09a8f1ca6136 (diff)
streamcache: Make cleanup Goroutines stoppable
The streamcache has two Goroutines which make sure to clean up stale cache entries, either on disk or in-memory ones. These walkers currently cannot be stopped at all, leading to Goroutine leakages in our tests. Fix this by making the `Stop()` functions also signal the Goroutines to stop and by calling this function in the testserver setup code.
-rw-r--r--internal/streamcache/cache.go15
-rw-r--r--internal/streamcache/cache_test.go39
-rw-r--r--internal/streamcache/filestore.go15
-rw-r--r--internal/streamcache/filestore_test.go8
-rw-r--r--internal/testhelper/testserver/gitaly.go1
5 files changed, 29 insertions, 49 deletions
diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go
index 882838206..0ea6ca3f0 100644
--- a/internal/streamcache/cache.go
+++ b/internal/streamcache/cache.go
@@ -122,18 +122,19 @@ type cache struct {
stopOnce sync.Once
logger logrus.FieldLogger
dir string
+ sleepLoop *dontpanic.Forever
}
// New returns a new cache instance.
func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache {
if cfg.Enabled {
- return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.Sleep, logger)
+ return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.After, logger)
}
return NullCache{}
}
-func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) Cache {
+func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration) <-chan time.Time, logger logrus.FieldLogger) Cache {
fs := newFilestore(dir, maxAge, sleep, logger)
c := &cache{
@@ -143,13 +144,15 @@ func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duratio
stop: make(chan struct{}),
logger: logger,
dir: dir,
+ sleepLoop: dontpanic.NewForever(time.Minute),
}
- dontpanic.NewForever(time.Minute).Go(func() {
+ c.sleepLoop.Go(func() {
sleepLoop(c.stop, c.maxAge, sleep, c.clean)
})
go func() {
<-c.stop
+ c.sleepLoop.Cancel()
fs.Stop()
}()
@@ -353,19 +356,17 @@ func (w *waiter) Wait(ctx context.Context) error {
}
}
-func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration), callback func()) {
+func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration) <-chan time.Time, callback func()) {
const maxPeriod = time.Minute
if period <= 0 || period >= maxPeriod {
period = maxPeriod
}
for {
- sleep(period)
-
select {
case <-done:
return
- default:
+ case <-sleep(period):
}
callback()
diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go
index 708d09fd9..f562dad42 100644
--- a/internal/streamcache/cache_test.go
+++ b/internal/streamcache/cache_test.go
@@ -10,7 +10,6 @@ import (
"os"
"path/filepath"
"strings"
- "sync"
"testing"
"time"
@@ -221,35 +220,6 @@ func TestCache_scope(t *testing.T) {
}
}
-type clock struct {
- n int
- sync.Mutex
- *sync.Cond
-}
-
-func newClock() *clock {
- cl := &clock{}
- cl.Cond = sync.NewCond(cl)
- return cl
-}
-
-func (cl *clock) wait() {
- cl.Lock()
- defer cl.Unlock()
-
- for old := cl.n; old == cl.n; {
- cl.Cond.Wait()
- }
-}
-
-func (cl *clock) advance() {
- cl.Lock()
- defer cl.Unlock()
-
- cl.n++
- cl.Cond.Broadcast()
-}
-
func TestCache_diskCleanup(t *testing.T) {
tmp := testhelper.TempDir(t)
@@ -257,8 +227,11 @@ func TestCache_diskCleanup(t *testing.T) {
key = "test key"
)
- cl := newClock()
- c := newCacheWithSleep(tmp, 0, func(time.Duration) { cl.wait() }, log.Default())
+ timerCh := make(chan time.Time)
+
+ c := newCacheWithSleep(tmp, 0, func(time.Duration) <-chan time.Time {
+ return timerCh
+ }, log.Default())
defer c.Stop()
content := func(i int) string { return fmt.Sprintf("content %d", i) }
@@ -278,7 +251,7 @@ func TestCache_diskCleanup(t *testing.T) {
requireCacheEntries(t, c, 1)
// Unblock cleanup goroutines so they run exactly once
- cl.advance()
+ close(timerCh)
// Give them time to do their work
time.Sleep(10 * time.Millisecond)
diff --git a/internal/streamcache/filestore.go b/internal/streamcache/filestore.go
index c3d4d99cd..8933bc127 100644
--- a/internal/streamcache/filestore.go
+++ b/internal/streamcache/filestore.go
@@ -56,16 +56,19 @@ type filestore struct {
id []byte
counter uint64
stop chan struct{}
+
+ sleepLoop *dontpanic.Forever
}
-func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) *filestore {
+func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration) <-chan time.Time, logger logrus.FieldLogger) *filestore {
fs := &filestore{
- dir: dir,
- maxAge: maxAge,
- stop: make(chan struct{}),
+ dir: dir,
+ maxAge: maxAge,
+ stop: make(chan struct{}),
+ sleepLoop: dontpanic.NewForever(time.Minute),
}
- dontpanic.NewForever(time.Minute).Go(func() {
+ fs.sleepLoop.Go(func() {
sleepLoop(fs.stop, fs.maxAge, sleep, func() {
diskUsageGauge.WithLabelValues(fs.dir).Set(fs.diskUsage())
@@ -146,6 +149,8 @@ func (fs *filestore) Stop() {
default:
close(fs.stop)
}
+
+ fs.sleepLoop.Cancel()
}
// cleanWalk removes files but not directories. This is to avoid races
diff --git a/internal/streamcache/filestore_test.go b/internal/streamcache/filestore_test.go
index b5c1f055e..cf8a29506 100644
--- a/internal/streamcache/filestore_test.go
+++ b/internal/streamcache/filestore_test.go
@@ -16,7 +16,7 @@ import (
func TestFilestoreCreate(t *testing.T) {
tmp := testhelper.TempDir(t)
- fs := newFilestore(tmp, 0, time.Sleep, log.Default())
+ fs := newFilestore(tmp, 0, time.After, log.Default())
defer fs.Stop()
f, err := fs.Create()
@@ -39,7 +39,7 @@ func TestFilestoreCreate(t *testing.T) {
func TestFilestoreCreate_concurrency(t *testing.T) {
tmp := testhelper.TempDir(t)
- fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
+ fs := newFilestore(tmp, time.Hour, time.After, log.Default())
defer fs.Stop()
const N = 100
@@ -81,7 +81,7 @@ func TestFilestoreCreate_uniqueness(t *testing.T) {
filenames := make(map[string]struct{})
for j := 0; j < M; j++ {
- fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
+ fs := newFilestore(tmp, time.Hour, time.After, log.Default())
defer fs.Stop()
for i := 0; i < N; i++ {
@@ -101,7 +101,7 @@ func TestFilestoreCreate_uniqueness(t *testing.T) {
func TestFilestoreCleanwalk(t *testing.T) {
tmp := testhelper.TempDir(t)
- fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
+ fs := newFilestore(tmp, time.Hour, time.After, log.Default())
defer fs.Stop()
dir1 := filepath.Join(tmp, "dir1")
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 7dac962f3..cf7bf7b04 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -350,6 +350,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru
if gsd.packObjectsCache == nil {
gsd.packObjectsCache = streamcache.New(cfg.PackObjectsCache, gsd.logger)
+ t.Cleanup(gsd.packObjectsCache.Stop)
}
return &service.Dependencies{