diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-23 09:33:55 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-10-07 10:14:27 +0300 |
commit | 45d81b076be3755a6d7d124f9fe4d5ef8ef50c5d (patch) | |
tree | de46f3b303c148956d395f2ee421c2172e175ebd | |
parent | a256b01c0c403adc55418625e49b09a8f1ca6136 (diff) |
streamcache: Make cleanup Goroutines stoppable
The streamcache has two Goroutines which make sure to clean up stale
cache entries, either on disk or in-memory ones. These walkers currently
cannot be stopped at all, leading to Goroutine leakages in our tests.
Fix this by making the `Stop()` functions also signal the Goroutines to
stop and by calling this function in the testserver setup code.
-rw-r--r-- | internal/streamcache/cache.go | 15 | ||||
-rw-r--r-- | internal/streamcache/cache_test.go | 39 | ||||
-rw-r--r-- | internal/streamcache/filestore.go | 15 | ||||
-rw-r--r-- | internal/streamcache/filestore_test.go | 8 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 1 |
5 files changed, 29 insertions, 49 deletions
diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go index 882838206..0ea6ca3f0 100644 --- a/internal/streamcache/cache.go +++ b/internal/streamcache/cache.go @@ -122,18 +122,19 @@ type cache struct { stopOnce sync.Once logger logrus.FieldLogger dir string + sleepLoop *dontpanic.Forever } // New returns a new cache instance. func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache { if cfg.Enabled { - return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.Sleep, logger) + return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.After, logger) } return NullCache{} } -func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) Cache { +func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration) <-chan time.Time, logger logrus.FieldLogger) Cache { fs := newFilestore(dir, maxAge, sleep, logger) c := &cache{ @@ -143,13 +144,15 @@ func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duratio stop: make(chan struct{}), logger: logger, dir: dir, + sleepLoop: dontpanic.NewForever(time.Minute), } - dontpanic.NewForever(time.Minute).Go(func() { + c.sleepLoop.Go(func() { sleepLoop(c.stop, c.maxAge, sleep, c.clean) }) go func() { <-c.stop + c.sleepLoop.Cancel() fs.Stop() }() @@ -353,19 +356,17 @@ func (w *waiter) Wait(ctx context.Context) error { } } -func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration), callback func()) { +func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration) <-chan time.Time, callback func()) { const maxPeriod = time.Minute if period <= 0 || period >= maxPeriod { period = maxPeriod } for { - sleep(period) - select { case <-done: return - default: + case <-sleep(period): } callback() diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go index 708d09fd9..f562dad42 100644 --- a/internal/streamcache/cache_test.go +++ b/internal/streamcache/cache_test.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "strings" - "sync" "testing" "time" @@ -221,35 +220,6 @@ func TestCache_scope(t *testing.T) { } } -type clock struct { - n int - sync.Mutex - *sync.Cond -} - -func newClock() *clock { - cl := &clock{} - cl.Cond = sync.NewCond(cl) - return cl -} - -func (cl *clock) wait() { - cl.Lock() - defer cl.Unlock() - - for old := cl.n; old == cl.n; { - cl.Cond.Wait() - } -} - -func (cl *clock) advance() { - cl.Lock() - defer cl.Unlock() - - cl.n++ - cl.Cond.Broadcast() -} - func TestCache_diskCleanup(t *testing.T) { tmp := testhelper.TempDir(t) @@ -257,8 +227,11 @@ func TestCache_diskCleanup(t *testing.T) { key = "test key" ) - cl := newClock() - c := newCacheWithSleep(tmp, 0, func(time.Duration) { cl.wait() }, log.Default()) + timerCh := make(chan time.Time) + + c := newCacheWithSleep(tmp, 0, func(time.Duration) <-chan time.Time { + return timerCh + }, log.Default()) defer c.Stop() content := func(i int) string { return fmt.Sprintf("content %d", i) } @@ -278,7 +251,7 @@ func TestCache_diskCleanup(t *testing.T) { requireCacheEntries(t, c, 1) // Unblock cleanup goroutines so they run exactly once - cl.advance() + close(timerCh) // Give them time to do their work time.Sleep(10 * time.Millisecond) diff --git a/internal/streamcache/filestore.go b/internal/streamcache/filestore.go index c3d4d99cd..8933bc127 100644 --- a/internal/streamcache/filestore.go +++ b/internal/streamcache/filestore.go @@ -56,16 +56,19 @@ type filestore struct { id []byte counter uint64 stop chan struct{} + + sleepLoop *dontpanic.Forever } -func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) *filestore { +func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration) <-chan time.Time, logger logrus.FieldLogger) *filestore { fs := &filestore{ - dir: dir, - maxAge: maxAge, - stop: make(chan struct{}), + dir: dir, + maxAge: maxAge, + stop: make(chan struct{}), + sleepLoop: dontpanic.NewForever(time.Minute), } - dontpanic.NewForever(time.Minute).Go(func() { + fs.sleepLoop.Go(func() { sleepLoop(fs.stop, fs.maxAge, sleep, func() { diskUsageGauge.WithLabelValues(fs.dir).Set(fs.diskUsage()) @@ -146,6 +149,8 @@ func (fs *filestore) Stop() { default: close(fs.stop) } + + fs.sleepLoop.Cancel() } // cleanWalk removes files but not directories. This is to avoid races diff --git a/internal/streamcache/filestore_test.go b/internal/streamcache/filestore_test.go index b5c1f055e..cf8a29506 100644 --- a/internal/streamcache/filestore_test.go +++ b/internal/streamcache/filestore_test.go @@ -16,7 +16,7 @@ import ( func TestFilestoreCreate(t *testing.T) { tmp := testhelper.TempDir(t) - fs := newFilestore(tmp, 0, time.Sleep, log.Default()) + fs := newFilestore(tmp, 0, time.After, log.Default()) defer fs.Stop() f, err := fs.Create() @@ -39,7 +39,7 @@ func TestFilestoreCreate(t *testing.T) { func TestFilestoreCreate_concurrency(t *testing.T) { tmp := testhelper.TempDir(t) - fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default()) + fs := newFilestore(tmp, time.Hour, time.After, log.Default()) defer fs.Stop() const N = 100 @@ -81,7 +81,7 @@ func TestFilestoreCreate_uniqueness(t *testing.T) { filenames := make(map[string]struct{}) for j := 0; j < M; j++ { - fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default()) + fs := newFilestore(tmp, time.Hour, time.After, log.Default()) defer fs.Stop() for i := 0; i < N; i++ { @@ -101,7 +101,7 @@ func TestFilestoreCreate_uniqueness(t *testing.T) { func TestFilestoreCleanwalk(t *testing.T) { tmp := testhelper.TempDir(t) - fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default()) + fs := newFilestore(tmp, time.Hour, time.After, log.Default()) defer fs.Stop() dir1 := filepath.Join(tmp, "dir1") diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 7dac962f3..cf7bf7b04 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -350,6 +350,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru if gsd.packObjectsCache == nil { gsd.packObjectsCache = streamcache.New(cfg.PackObjectsCache, gsd.logger) + t.Cleanup(gsd.packObjectsCache.Stop) } return &service.Dependencies{ |