From 4676f288e8e4befbaddab3044823470f0c4dc537 Mon Sep 17 00:00:00 2001
From: John Cai
Date: Tue, 15 Mar 2022 16:24:39 -0400
Subject: housekeeping: Limit concurrency of OptimizeRepository to 1

OptimizeRepository should not run concurrently for the same repository:
concurrent runs can race with each other and end up deleting objects
incorrectly. To limit the concurrency to 1 per repository, we track
repositories with an optimization in progress in a sync.Map and return
early from any call for a repository that is already being optimized.
This change also reorganizes the code so that we can test it more easily
by swapping in an implementation of optimizeRepository.

Changelog: changed
---
 internal/git/housekeeping/manager.go              |   4 +
 internal/git/housekeeping/optimize_repository.go  |  17 +++
 .../git/housekeeping/optimize_repository_test.go  | 115 +++++++++++++++++++++
 3 files changed, 136 insertions(+)

diff --git a/internal/git/housekeeping/manager.go b/internal/git/housekeeping/manager.go
index 859d7091e..1113f670f 100644
--- a/internal/git/housekeeping/manager.go
+++ b/internal/git/housekeeping/manager.go
@@ -2,6 +2,7 @@ package housekeeping
 
 import (
 	"context"
+	"sync"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
@@ -25,6 +26,8 @@ type RepositoryManager struct {
 	tasksTotal       *prometheus.CounterVec
 	tasksLatency     *prometheus.HistogramVec
 	prunedFilesTotal *prometheus.CounterVec
+	optimizeFunc     func(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error
+	reposInProgress  sync.Map
 }
 
 // NewManager creates a new RepositoryManager.
@@ -53,6 +56,7 @@ func NewManager(txManager transaction.Manager) *RepositoryManager {
 			},
 			[]string{"filetype"},
 		),
+		optimizeFunc: optimizeRepository,
 	}
 }
 
diff --git a/internal/git/housekeeping/optimize_repository.go b/internal/git/housekeeping/optimize_repository.go
index 20c0032eb..86431439f 100644
--- a/internal/git/housekeeping/optimize_repository.go
+++ b/internal/git/housekeeping/optimize_repository.go
@@ -22,6 +22,23 @@ import (
 // OptimizeRepository performs optimizations on the repository. Whether optimizations are performed
 // or not depends on a set of heuristics.
 func (m *RepositoryManager) OptimizeRepository(ctx context.Context, repo *localrepo.Repo) error {
+	path, err := repo.Path()
+	if err != nil {
+		return err
+	}
+
+	if _, ok := m.reposInProgress.LoadOrStore(path, struct{}{}); ok {
+		return nil
+	}
+
+	defer func() {
+		m.reposInProgress.Delete(path)
+	}()
+
+	return m.optimizeFunc(ctx, m, repo)
+}
+
+func optimizeRepository(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error {
 	totalTimer := prometheus.NewTimer(m.tasksLatency.WithLabelValues("total"))
 
 	optimizations := struct {
diff --git a/internal/git/housekeeping/optimize_repository_test.go b/internal/git/housekeeping/optimize_repository_test.go
index 2533b7b99..eaca0f23e 100644
--- a/internal/git/housekeeping/optimize_repository_test.go
+++ b/internal/git/housekeeping/optimize_repository_test.go
@@ -1,15 +1,18 @@
 package housekeeping
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -581,6 +584,118 @@ func TestOptimizeRepository(t *testing.T) {
 	}
 }
 
+func TestOptimizeRepository_ConcurrencyLimit(t *testing.T) {
+	ctx := testhelper.Context(t)
+	cfg := testcfg.Build(t)
+
+	t.Run("subsequent calls get skipped", func(t *testing.T) {
+		reqReceivedCh, ch := make(chan struct{}), make(chan struct{})
+
+		repoProto, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+		repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+		manager := &RepositoryManager{
+			optimizeFunc: func(_ context.Context, _ *RepositoryManager, _ *localrepo.Repo) error {
+				reqReceivedCh <- struct{}{}
+				ch <- struct{}{}
+
+				return nil
+			},
+		}
+
+		go func() {
+			require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		}()
+
+		<-reqReceivedCh
+		// The second call should not block and should return immediately.
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+
+		<-ch
+	})
+
+	t.Run("multiple repositories concurrently", func(t *testing.T) {
+		reqReceivedCh, ch := make(chan struct{}), make(chan struct{})
+
+		repoProtoFirst, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+		repoFirst := localrepo.NewTestRepo(t, cfg, repoProtoFirst)
+		repoProtoSecond, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+		repoSecond := localrepo.NewTestRepo(t, cfg, repoProtoSecond)
+
+		reposOptimized := make(map[string]struct{})
+
+		manager := &RepositoryManager{
+			optimizeFunc: func(_ context.Context, _ *RepositoryManager, repo *localrepo.Repo) error {
+				reposOptimized[repo.GetRelativePath()] = struct{}{}
+
+				if repo.GetRelativePath() == repoFirst.GetRelativePath() {
+					reqReceivedCh <- struct{}{}
+					ch <- struct{}{}
+				}
+
+				return nil
+			},
+		}
+
+		// The first call will block. Calls to optimize any other repositories should not be
+		// affected.
+		go func() {
+			require.NoError(t, manager.OptimizeRepository(ctx, repoFirst))
+		}()
+
+		<-reqReceivedCh
+
+		require.NoError(t, manager.OptimizeRepository(ctx, repoSecond))
+
+		<-ch
+
+		assert.Contains(t, reposOptimized, repoFirst.GetRelativePath())
+		assert.Contains(t, reposOptimized, repoSecond.GetRelativePath())
+	})
+
+	t.Run("serialized optimizations", func(t *testing.T) {
+		reqReceivedCh, ch := make(chan struct{}), make(chan struct{})
+		repoProto, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+		repo := localrepo.NewTestRepo(t, cfg, repoProto)
+		var optimizations int
+
+		manager := &RepositoryManager{
+			optimizeFunc: func(_ context.Context, _ *RepositoryManager, _ *localrepo.Repo) error {
+				optimizations++
+
+				if optimizations == 1 {
+					reqReceivedCh <- struct{}{}
+					ch <- struct{}{}
+				}
+
+				return nil
+			},
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			require.NoError(t, manager.OptimizeRepository(ctx, repo))
+			wg.Done()
+		}()
+
+		<-reqReceivedCh
+
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		assert.Equal(t, 1, optimizations)
+
+		<-ch
+		wg.Wait()
+
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		require.NoError(t, manager.OptimizeRepository(ctx, repo))
+		assert.Equal(t, 4, optimizations)
+	})
+}
+
 func TestPruneIfNeeded(t *testing.T) {
 	ctx := testhelper.Context(t)
 	cfg := testcfg.Build(t)
-- 
cgit v1.2.3
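
For readers unfamiliar with the pattern this patch relies on, below is a minimal, standalone sketch (not part of the patch; all names such as worker and runOnce are hypothetical) of the per-key deduplication that OptimizeRepository now performs: sync.Map.LoadOrStore atomically marks a key as in flight, overlapping callers for the same key bail out early, and the deferred Delete releases the key once the work finishes.

```go
// Illustrative sketch of per-key deduplication via sync.Map, assuming
// hypothetical names; this is not the Gitaly implementation itself.
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	inProgress sync.Map // key: repository path, value: struct{}{}
}

// runOnce runs fn for the given key unless another goroutine already has a
// run in flight for that key, in which case it returns nil immediately.
func (w *worker) runOnce(key string, fn func() error) error {
	if _, alreadyRunning := w.inProgress.LoadOrStore(key, struct{}{}); alreadyRunning {
		return nil
	}
	// Release the key once the work finishes so later calls can run again.
	defer w.inProgress.Delete(key)

	return fn()
}

func main() {
	w := &worker{}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Calls that overlap with an in-flight run for "repo-a.git" are
			// skipped; only non-overlapping calls execute the function.
			_ = w.runOnce("repo-a.git", func() error {
				fmt.Println("optimizing repo-a.git")
				return nil
			})
		}()
	}
	wg.Wait()
}
```

The same shape explains the test in the patch: because overlapping calls return early, the "serialized optimizations" subtest sees exactly one optimization while the first call is blocked, and three more only after it completes.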