Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2021-05-26 13:07:28 +0300
committerToon Claes <toon@gitlab.com>2021-05-26 13:07:28 +0300
commit6e58da454633a53c86d14a59587ee7a6a9d11031 (patch)
treef753d96ccfb4096476b9ce5691cafa1127a76d84 /internal
parent4089a5bbf05ace205c7acd5970c90c0c33540ede (diff)
parent5eced00e6a636116c52ac6ed7628fa98dfe4045d (diff)
Merge branch 'pks-coordinator-goroutine-leaks' into 'master'
coordinator: Fix Goroutine leaks See merge request gitlab-org/gitaly!3522
Diffstat (limited to 'internal')
-rw-r--r--internal/git/catfile/batch_cache.go54
-rw-r--r--internal/git/catfile/batch_cache_test.go8
-rw-r--r--internal/git/catfile/batch_test.go4
-rw-r--r--internal/praefect/coordinator_test.go1
-rw-r--r--internal/praefect/helper_test.go2
-rw-r--r--internal/testhelper/testserver/gitaly.go4
6 files changed, 47 insertions, 26 deletions
diff --git a/internal/git/catfile/batch_cache.go b/internal/git/catfile/batch_cache.go
index 08af66b4d..53f6e79d8 100644
--- a/internal/git/catfile/batch_cache.go
+++ b/internal/git/catfile/batch_cache.go
@@ -61,24 +61,25 @@ type entry struct {
// an entry gets added it gets an expiry time based on a fixed TTL. A
// monitor goroutine periodically evicts expired entries.
type BatchCache struct {
- entries []*entry
- sync.Mutex
-
// maxLen is the maximum number of keys in the cache
maxLen int
-
// ttl is the fixed ttl for cache entries
ttl time.Duration
-
// injectSpawnErrors is used for testing purposes only. If set to true, then spawned batch
// processes will simulate spawn errors.
injectSpawnErrors bool
+ // monitorTicker is the ticker used for the monitoring Goroutine.
+ monitorTicker *time.Ticker
+ monitorDone chan interface{}
catfileCacheCounter *prometheus.CounterVec
currentCatfileProcesses prometheus.Gauge
totalCatfileProcesses prometheus.Counter
catfileLookupCounter *prometheus.CounterVec
catfileCacheMembers prometheus.Gauge
+
+ entriesMutex sync.Mutex
+ entries []*entry
}
// NewCache creates a new catfile process cache.
@@ -86,6 +87,15 @@ func NewCache(cfg config.Cfg) *BatchCache {
return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, defaultEvictionInterval)
}
+// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
+// once.
+func (bc *BatchCache) Stop() {
+ bc.monitorTicker.Stop()
+ bc.monitorDone <- struct{}{}
+ <-bc.monitorDone
+ bc.Evict()
+}
+
func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *BatchCache {
if maxLen <= 0 {
maxLen = defaultMaxLen
@@ -126,9 +136,11 @@ func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *Bat
Help: "Gauge of catfile cache members",
},
),
+ monitorTicker: time.NewTicker(refreshInterval),
+ monitorDone: make(chan interface{}),
}
- go bc.monitor(refreshInterval)
+ go bc.monitor()
return bc
}
@@ -146,11 +158,15 @@ func (bc *BatchCache) Collect(metrics chan<- prometheus.Metric) {
bc.catfileCacheMembers.Collect(metrics)
}
-func (bc *BatchCache) monitor(refreshInterval time.Duration) {
- ticker := time.NewTicker(refreshInterval)
-
- for range ticker.C {
- bc.enforceTTL(time.Now())
+func (bc *BatchCache) monitor() {
+ for {
+ select {
+ case <-bc.monitorTicker.C:
+ bc.enforceTTL(time.Now())
+ case <-bc.monitorDone:
+ close(bc.monitorDone)
+ return
+ }
}
}
@@ -211,8 +227,8 @@ func (bc *BatchCache) returnWhenDone(done <-chan struct{}, cacheKey key, c *batc
// add adds a key, value pair to bc. If there are too many keys in bc
// already add will evict old keys until the length is OK again.
func (bc *BatchCache) add(k key, b *batch) {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
if i, ok := bc.lookup(k); ok {
bc.catfileCacheCounter.WithLabelValues("duplicate").Inc()
@@ -235,8 +251,8 @@ func (bc *BatchCache) len() int { return len(bc.entries) }
// checkout removes a value from bc. After use the caller can re-add the value with bc.Add.
func (bc *BatchCache) checkout(k key) (*batch, bool) {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
i, ok := bc.lookup(k)
if !ok {
@@ -254,8 +270,8 @@ func (bc *BatchCache) checkout(k key) (*batch, bool) {
// enforceTTL evicts all entries older than now, assuming the entry
// expiry times are increasing.
func (bc *BatchCache) enforceTTL(now time.Time) {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
for bc.len() > 0 && now.After(bc.head().expiry) {
bc.evictHead()
@@ -264,8 +280,8 @@ func (bc *BatchCache) enforceTTL(now time.Time) {
// Evict evicts all cached processes from the cache.
func (bc *BatchCache) Evict() {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
for bc.len() > 0 {
bc.evictHead()
diff --git a/internal/git/catfile/batch_cache_test.go b/internal/git/catfile/batch_cache_test.go
index c797e396a..885978a7c 100644
--- a/internal/git/catfile/batch_cache_test.go
+++ b/internal/git/catfile/batch_cache_test.go
@@ -158,8 +158,8 @@ func TestAutoExpiry(t *testing.T) {
}
func requireCacheValid(t *testing.T, bc *BatchCache) {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
for _, ent := range bc.entries {
v := ent.value
@@ -172,8 +172,8 @@ func testValue() *batch { return &batch{} }
func testKey(i int) key { return key{sessionID: fmt.Sprintf("key-%d", i)} }
func keys(bc *BatchCache) []key {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
var result []key
for _, ent := range bc.entries {
diff --git a/internal/git/catfile/batch_test.go b/internal/git/catfile/batch_test.go
index d0a2766d8..a7df2d846 100644
--- a/internal/git/catfile/batch_test.go
+++ b/internal/git/catfile/batch_test.go
@@ -443,7 +443,7 @@ func numGitChildren(t *testing.T) int {
}
func cacheSize(bc *BatchCache) int {
- bc.Lock()
- defer bc.Unlock()
+ bc.entriesMutex.Lock()
+ defer bc.entriesMutex.Unlock()
return bc.len()
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 0b3c28b7a..1355ceb64 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1480,6 +1480,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
})
require.NoError(t, err)
+ defer conn.Close()
gitalies[gitaly] = gitalyNode{
mock: &nodes.MockNode{
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index b9ee12476..caa15a98e 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -208,6 +208,8 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
cc := dialLocalPort(t, port, false)
cleanup := func() {
+ cc.Close()
+
for _, cu := range cleanups {
cu()
}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index a25413088..0735636d0 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -320,7 +320,9 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru
}
if gsd.catfileCache == nil {
- gsd.catfileCache = catfile.NewCache(cfg)
+ cache := catfile.NewCache(cfg)
+ gsd.catfileCache = cache
+ t.Cleanup(cache.Stop)
}
if gsd.diskCache == nil {