diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-23 09:19:22 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-10-07 10:14:27 +0300 |
commit | fc7ed4ea0182cf3a345f0e939e70fdfc17216bd1 (patch) | |
tree | 04f3575781d0375ffbecff2123cc1f34df1ba674 | |
parent | 125f313fbae3aa0f509a9953cb4f3934b703cd1d (diff) |
dontpanic: Refactor code to allow stopping of `GoForever()`
The dontpanic package provides a `GoForever()` function which allows the
caller to run a function forever even if it panics. While this is a
useful thing to have, the current design also causes leakage of
Goroutines because we do not have a way to stop these functions even in
the context of tests.
Introduce a new `Forever` structure which now hosts the `Go()` function.
Like this, we can also easily introduce a `Cancel()` function which
causes us to abort execution of the function.
Callers are not yet adjusted to make use of the `Cancel()` function.
-rw-r--r-- | internal/cache/walker.go | 2 | ||||
-rw-r--r-- | internal/dontpanic/retry.go | 56 | ||||
-rw-r--r-- | internal/dontpanic/retry_test.go | 4 | ||||
-rw-r--r-- | internal/streamcache/cache.go | 2 | ||||
-rw-r--r-- | internal/streamcache/filestore.go | 2 |
5 files changed, 54 insertions, 12 deletions
diff --git a/internal/cache/walker.go b/internal/cache/walker.go index bfd5fba01..25403ad08 100644 --- a/internal/cache/walker.go +++ b/internal/cache/walker.go @@ -98,7 +98,7 @@ func (c *DiskCache) walkLoop(walkPath string) { logger.Infof("Starting file walker for %s", walkPath) walkTick := time.NewTicker(cleanWalkFrequency) - dontpanic.GoForever(time.Minute, func() { + dontpanic.NewForever(time.Minute).Go(func() { if err := c.cleanWalk(walkPath); err != nil { logger.Error(err) } diff --git a/internal/dontpanic/retry.go b/internal/dontpanic/retry.go index bb8fa93bb..83eb007f5 100644 --- a/internal/dontpanic/retry.go +++ b/internal/dontpanic/retry.go @@ -8,6 +8,7 @@ package dontpanic import ( + "sync" "time" sentry "github.com/getsentry/sentry-go" @@ -56,24 +57,63 @@ func catchAndLog(fn func()) bool { return normal } -// GoForever will keep retrying a function fn in a goroutine forever in the -// background (until the process exits) while recovering from panics. Each -// time a closure panics, the recovered value will be sent to Sentry and -// logged at level error. The provided backoff will delay retries to enable +// Forever encapsulates logic to run a function forever. +type Forever struct { + backoff time.Duration + + cancelOnce sync.Once + cancelCh chan struct{} + doneCh chan struct{} +} + +// NewForever creates a new Forever struct. The given duration controls how long retry of a +// function should be delayed if the function were to thrown an error. +func NewForever(backoff time.Duration) *Forever { + return &Forever{ + backoff: backoff, + cancelCh: make(chan struct{}), + doneCh: make(chan struct{}), + } +} + +// Go will keep retrying a function fn in a goroutine forever in the background (until the process +// exits) while recovering from panics. Each time a closure panics, the recovered value will be +// sent to Sentry and logged at level error. The provided backoff will delay retries to enable // "breathing" room to prevent potentially worsening the situation. -func GoForever(backoff time.Duration, fn func()) { +func (f *Forever) Go(fn func()) { go func() { + defer close(f.doneCh) + for { + select { + case <-f.cancelCh: + return + default: + } + if Try(fn) { continue } - if backoff <= 0 { + if f.backoff <= 0 { continue } - logger.Infof("dontpanic: backing off %s before retrying", backoff) - time.Sleep(backoff) + logger.Infof("dontpanic: backing off %s before retrying", f.backoff) + + select { + case <-f.cancelCh: + return + case <-time.After(f.backoff): + } } }() } + +// Cancel cancels the walking loop. +func (f *Forever) Cancel() { + f.cancelOnce.Do(func() { + close(f.cancelCh) + <-f.doneCh + }) +} diff --git a/internal/dontpanic/retry_test.go b/internal/dontpanic/retry_test.go index d94ce3801..dcb683161 100644 --- a/internal/dontpanic/retry_test.go +++ b/internal/dontpanic/retry_test.go @@ -49,11 +49,13 @@ func TestGoForever(t *testing.T) { panic("don't panic") } - dontpanic.GoForever(time.Microsecond, fn) + forever := dontpanic.NewForever(time.Microsecond) + forever.Go(fn) var actualPanics int for range recoveredQ { actualPanics++ } require.Equal(t, expectPanics, actualPanics) + forever.Cancel() } diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go index ffeca32ce..882838206 100644 --- a/internal/streamcache/cache.go +++ b/internal/streamcache/cache.go @@ -145,7 +145,7 @@ func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duratio dir: dir, } - dontpanic.GoForever(1*time.Minute, func() { + dontpanic.NewForever(time.Minute).Go(func() { sleepLoop(c.stop, c.maxAge, sleep, c.clean) }) go func() { diff --git a/internal/streamcache/filestore.go b/internal/streamcache/filestore.go index 67337c95f..c3d4d99cd 100644 --- a/internal/streamcache/filestore.go +++ b/internal/streamcache/filestore.go @@ -65,7 +65,7 @@ func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration), l stop: make(chan struct{}), } - dontpanic.GoForever(1*time.Minute, func() { + dontpanic.NewForever(time.Minute).Go(func() { sleepLoop(fs.stop, fs.maxAge, sleep, func() { diskUsageGauge.WithLabelValues(fs.dir).Set(fs.diskUsage()) |