Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: John Cai <jcai@gitlab.com> 2022-03-15 23:24:39 +0300
committer: John Cai <jcai@gitlab.com> 2022-03-18 05:24:47 +0300
commit: 4676f288e8e4befbaddab3044823470f0c4dc537 (patch)
tree: 1b896a74846eb51d01520d43270b3fe8fde27ed7
parent: db8b5d43582182fd743ef2a1fdf84b07efd8f786 (diff)
housekeeping: Limit concurrency of OptimizeRepository to 1 (branch: jc-limit-optimize-repository-concurrency)
OptimizeRepository should not run concurrently. Concurrent runs could lead to a data race that ends up deleting objects incorrectly. In order to limit the concurrency to 1, we can use a simple mutex. This change also reorganizes the code so that we can test it more easily by swapping in an implementation of optimizeRepository.

Changelog: changed
-rw-r--r-- internal/git/housekeeping/manager.go 4
-rw-r--r-- internal/git/housekeeping/optimize_repository.go 17
-rw-r--r-- internal/git/housekeeping/optimize_repository_test.go 115
3 files changed, 136 insertions, 0 deletions
diff --git a/internal/git/housekeeping/manager.go b/internal/git/housekeeping/manager.go
index 859d7091e..1113f670f 100644
--- a/internal/git/housekeeping/manager.go
+++ b/internal/git/housekeeping/manager.go
@@ -2,6 +2,7 @@ package housekeeping
import (
"context"
+ "sync"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
@@ -25,6 +26,8 @@ type RepositoryManager struct {
tasksTotal *prometheus.CounterVec
tasksLatency *prometheus.HistogramVec
prunedFilesTotal *prometheus.CounterVec
+ optimizeFunc func(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error
+ reposInProgress sync.Map
}
// NewManager creates a new RepositoryManager.
@@ -53,6 +56,7 @@ func NewManager(txManager transaction.Manager) *RepositoryManager {
},
[]string{"filetype"},
),
+ optimizeFunc: optimizeRepository,
}
}
diff --git a/internal/git/housekeeping/optimize_repository.go b/internal/git/housekeeping/optimize_repository.go
index 20c0032eb..86431439f 100644
--- a/internal/git/housekeeping/optimize_repository.go
+++ b/internal/git/housekeeping/optimize_repository.go
@@ -22,6 +22,23 @@ import (
// OptimizeRepository performs optimizations on the repository. Whether optimizations are performed
// or not depends on a set of heuristics.
func (m *RepositoryManager) OptimizeRepository(ctx context.Context, repo *localrepo.Repo) error {
+ path, err := repo.Path() // the repository path is the key used in reposInProgress
+ if err != nil {
+ return err
+ }
+
+ if _, ok := m.reposInProgress.LoadOrStore(path, struct{}{}); ok { // ok is true when the path was already stored, i.e. a run for it is in flight
+ return nil // the overlapping call is skipped: it neither waits nor reports an error
+ }
+
+ defer func() {
+ m.reposInProgress.Delete(path) // release the path once optimizeFunc has returned, whether or not it failed
+ }()
+
+ return m.optimizeFunc(ctx, m, repo) // indirection through the optimizeFunc field lets tests substitute the implementation
+}
+
+func optimizeRepository(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error {
totalTimer := prometheus.NewTimer(m.tasksLatency.WithLabelValues("total"))
optimizations := struct {
diff --git a/internal/git/housekeeping/optimize_repository_test.go b/internal/git/housekeeping/optimize_repository_test.go
index 2533b7b99..eaca0f23e 100644
--- a/internal/git/housekeeping/optimize_repository_test.go
+++ b/internal/git/housekeeping/optimize_repository_test.go
@@ -1,15 +1,18 @@
package housekeeping
import (
+ "context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
+ "sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -581,6 +584,118 @@ func TestOptimizeRepository(t *testing.T) {
}
}
+func TestOptimizeRepository_ConcurrencyLimit(t *testing.T) { // exercises the per-repository in-progress guard with stubbed optimizeFunc implementations
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+
+ t.Run("subsequent calls get skipped", func(t *testing.T) {
+ reqReceivedCh, ch := make(chan struct{}), make(chan struct{}) // both unbuffered: each send in the stub blocks until the test receives
+
+ repoProto, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ manager := &RepositoryManager{
+ optimizeFunc: func(_ context.Context, _ *RepositoryManager, _ *localrepo.Repo) error {
+ reqReceivedCh <- struct{}{} // signals that the first call is now inside optimizeFunc
+ ch <- struct{}{} // keeps the first call in flight until the test receives from ch
+
+ return nil
+ },
+ }
+
+ go func() {
+ require.NoError(t, manager.OptimizeRepository(ctx, repo)) // NOTE(review): require ends in t.FailNow, which the testing docs say must be called from the test goroutine — consider assert or an error channel
+ }()
+
+ <-reqReceivedCh
+ // second call will not block, but should be returned immediately
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+
+ <-ch // unblocks the first call; NOTE(review): nothing waits for the goroutine to finish before this subtest returns
+ })
+
+ t.Run("multiple repositories concurrently", func(t *testing.T) {
+ reqReceivedCh, ch := make(chan struct{}), make(chan struct{})
+
+ repoProtoFirst, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+ repoFirst := localrepo.NewTestRepo(t, cfg, repoProtoFirst)
+ repoProtoSecond, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+ repoSecond := localrepo.NewTestRepo(t, cfg, repoProtoSecond)
+
+ reposOptimized := make(map[string]struct{})
+
+ manager := &RepositoryManager{
+ optimizeFunc: func(_ context.Context, _ *RepositoryManager, repo *localrepo.Repo) error {
+ reposOptimized[repo.GetRelativePath()] = struct{}{} // record every repository the stub was invoked for
+
+ if repo.GitRepo.GetRelativePath() == repoFirst.GetRelativePath() { // only the first repository's call is held open; NOTE(review): repo.GitRepo.GetRelativePath() here vs repo.GetRelativePath() above — presumably equivalent, confirm
+ reqReceivedCh <- struct{}{}
+ ch <- struct{}{}
+ }
+
+ return nil
+ },
+ }
+
+ // The first call will block. Calls to optimize any other repositories should not be
+ // affected.
+ go func() {
+ require.NoError(t, manager.OptimizeRepository(ctx, repoFirst)) // NOTE(review): require inside a spawned goroutine — see testing.T.FailNow restrictions
+ }()
+
+ <-reqReceivedCh
+
+ require.NoError(t, manager.OptimizeRepository(ctx, repoSecond)) // runs while the first repository's call is still blocked on ch
+
+ <-ch
+
+ assert.Contains(t, reposOptimized, repoFirst.GetRelativePath())
+ assert.Contains(t, reposOptimized, repoSecond.GetRelativePath())
+ })
+
+ t.Run("serialized optimizations", func(t *testing.T) {
+ reqReceivedCh, ch := make(chan struct{}), make(chan struct{})
+ repoProto, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+ var optimizations int
+
+ manager := &RepositoryManager{
+ optimizeFunc: func(_ context.Context, _ *RepositoryManager, _ *localrepo.Repo) error {
+ optimizations++ // counts invocations of the stub; skipped calls never reach it
+
+ if optimizations == 1 { // only the very first invocation blocks
+ reqReceivedCh <- struct{}{}
+ ch <- struct{}{}
+ }
+
+ return nil
+ },
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ require.NoError(t, manager.OptimizeRepository(ctx, repo)) // NOTE(review): require inside a spawned goroutine — see testing.T.FailNow restrictions
+ wg.Done()
+ }()
+
+ <-reqReceivedCh
+
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+ assert.Equal(t, 1, optimizations) // the three calls above were skipped because the first is still in flight
+
+ <-ch
+ wg.Wait() // the goroutine's OptimizeRepository call, including its deferred Delete, has returned before the next calls
+
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+ require.NoError(t, manager.OptimizeRepository(ctx, repo))
+ assert.Equal(t, 4, optimizations) // 1 blocked call plus the 3 sequential calls made after the path was released
+ })
+}
+
func TestPruneIfNeeded(t *testing.T) {
ctx := testhelper.Context(t)
cfg := testcfg.Build(t)