diff options
author | John Cai <jcai@gitlab.com> | 2022-03-19 00:29:19 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-03-19 00:29:19 +0300 |
commit | c9d197fd96f1b6baa8b190f165716c17f60a2223 (patch) | |
tree | 0aff17c9e203240685a36645696e034d67fdedd6 | |
parent | d84345d4f45748297ddea390158db33412f591e6 (diff) | |
parent | c83f00059e6873fadfc83dd389bd3fcb868594af (diff) |
Merge branch 'jc-limit-optimize-repository-concurrency' into 'master'
housekeeping: Limit concurrency of OptimizeRepository to 1
Closes #4105
See merge request gitlab-org/gitaly!4411
-rw-r--r-- | internal/git/housekeeping/manager.go | 4 |
-rw-r--r-- | internal/git/housekeeping/optimize_repository.go | 17 |
-rw-r--r-- | internal/git/housekeeping/optimize_repository_test.go | 125 |
3 files changed, 146 insertions, 0 deletions
diff --git a/internal/git/housekeeping/manager.go b/internal/git/housekeeping/manager.go index a40e1c102..552c153c9 100644 --- a/internal/git/housekeeping/manager.go +++ b/internal/git/housekeeping/manager.go @@ -2,6 +2,7 @@ package housekeeping import ( "context" + "sync" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" @@ -26,6 +27,8 @@ type RepositoryManager struct { tasksTotal *prometheus.CounterVec tasksLatency *prometheus.HistogramVec prunedFilesTotal *prometheus.CounterVec + optimizeFunc func(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error + reposInProgress sync.Map } // NewManager creates a new RepositoryManager. @@ -55,6 +58,7 @@ func NewManager(promCfg gitalycfgprom.Config, txManager transaction.Manager) *Re }, []string{"filetype"}, ), + optimizeFunc: optimizeRepository, } } diff --git a/internal/git/housekeeping/optimize_repository.go b/internal/git/housekeeping/optimize_repository.go index fc38753f4..e44f2ccf5 100644 --- a/internal/git/housekeeping/optimize_repository.go +++ b/internal/git/housekeeping/optimize_repository.go @@ -22,6 +22,23 @@ import ( // OptimizeRepository performs optimizations on the repository. Whether optimizations are performed // or not depends on a set of heuristics. 
func (m *RepositoryManager) OptimizeRepository(ctx context.Context, repo *localrepo.Repo) error { + path, err := repo.Path() + if err != nil { + return err + } + + if _, ok := m.reposInProgress.LoadOrStore(path, struct{}{}); ok { + return nil + } + + defer func() { + m.reposInProgress.Delete(path) + }() + + return m.optimizeFunc(ctx, m, repo) +} + +func optimizeRepository(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error { totalTimer := prometheus.NewTimer(m.tasksLatency.WithLabelValues("total")) optimizations := struct { diff --git a/internal/git/housekeeping/optimize_repository_test.go b/internal/git/housekeeping/optimize_repository_test.go index 2f5a2d8fd..4762220d4 100644 --- a/internal/git/housekeeping/optimize_repository_test.go +++ b/internal/git/housekeeping/optimize_repository_test.go @@ -1,15 +1,18 @@ package housekeeping import ( + "context" "fmt" "io" "os" "path/filepath" "strings" + "sync" "testing" "time" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/git" @@ -581,6 +584,128 @@ func TestOptimizeRepository(t *testing.T) { } } +func TestOptimizeRepository_ConcurrencyLimit(t *testing.T) { + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + t.Run("subsequent calls get skipped", func(t *testing.T) { + reqReceivedCh, ch := make(chan struct{}), make(chan struct{}) + + repoProto, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0]) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + manager := &RepositoryManager{ + optimizeFunc: func(_ context.Context, _ *RepositoryManager, _ *localrepo.Repo) error { + reqReceivedCh <- struct{}{} + ch <- struct{}{} + + return nil + }, + } + + go func() { + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + }() + + <-reqReceivedCh + // When repository optimizations are performed for a specific repository 
already, + // then any subsequent calls to the same repository should just return immediately + // without doing any optimizations at all. + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + + <-ch + }) + + t.Run("multiple repositories concurrently", func(t *testing.T) { + reqReceivedCh, ch := make(chan struct{}), make(chan struct{}) + + repoProtoFirst, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0]) + repoFirst := localrepo.NewTestRepo(t, cfg, repoProtoFirst) + repoProtoSecond, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0]) + repoSecond := localrepo.NewTestRepo(t, cfg, repoProtoSecond) + + reposOptimized := make(map[string]struct{}) + + manager := &RepositoryManager{ + optimizeFunc: func(_ context.Context, _ *RepositoryManager, repo *localrepo.Repo) error { + reposOptimized[repo.GetRelativePath()] = struct{}{} + + if repo.GitRepo.GetRelativePath() == repoFirst.GetRelativePath() { + reqReceivedCh <- struct{}{} + ch <- struct{}{} + } + + return nil + }, + } + + // We block in the first call so that we can assert that a second call + // to a different repository performs the optimization regardless without blocking. + go func() { + require.NoError(t, manager.OptimizeRepository(ctx, repoFirst)) + }() + + <-reqReceivedCh + + // Because this optimizes a different repository this call shouldn't block. 
+ require.NoError(t, manager.OptimizeRepository(ctx, repoSecond)) + + <-ch + + assert.Contains(t, reposOptimized, repoFirst.GetRelativePath()) + assert.Contains(t, reposOptimized, repoSecond.GetRelativePath()) + }) + + t.Run("serialized optimizations", func(t *testing.T) { + reqReceivedCh, ch := make(chan struct{}), make(chan struct{}) + repoProto, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0]) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + var optimizations int + + manager := &RepositoryManager{ + optimizeFunc: func(_ context.Context, _ *RepositoryManager, _ *localrepo.Repo) error { + optimizations++ + + if optimizations == 1 { + reqReceivedCh <- struct{}{} + ch <- struct{}{} + } + + return nil + }, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + }() + + <-reqReceivedCh + + // Because we already have a concurrent call which optimizes the repository we expect + // that all subsequent calls which try to optimize the same repository return immediately. + // Furthermore, we expect to see only a single call to the optimizing function because we + // don't want to optimize the same repository concurrently. + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + assert.Equal(t, 1, optimizations) + + <-ch + wg.Wait() + + // When performing optimizations sequentially though the repository + // should be unlocked after every call, and consequentially we should + // also see multiple calls to the optimizing function. + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + require.NoError(t, manager.OptimizeRepository(ctx, repo)) + assert.Equal(t, 4, optimizations) + }) +} + func TestPruneIfNeeded(t *testing.T) { ctx := testhelper.Context(t) cfg := testcfg.Build(t) |