diff options
author | Toon Claes <toon@gitlab.com> | 2021-03-29 17:12:50 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2021-03-29 17:12:50 +0300 |
commit | 6774facd910ff4b3d800efb384f926dad8f7c5d2 (patch) | |
tree | 9558900782c94d6565abd624f40f0e31c829c19a | |
parent | a134e059dafa23f29255c1b626887ae16201a819 (diff) | |
parent | 74882d5778003bcdf8ef30a1ddfd3902170747fa (diff) |
Merge branch 'pks-maintenance-resumable-walk' into 'master'
maintenance: Introduce rate limiting for `OptimizeRepo`
See merge request gitlab-org/gitaly!3296
-rw-r--r-- | internal/gitaly/maintenance/optimize.go | 83 | ||||
-rw-r--r-- | internal/gitaly/maintenance/optimize_test.go | 80 | ||||
-rw-r--r-- | internal/gitaly/maintenance/randomwalker.go | 95 | ||||
-rw-r--r-- | internal/gitaly/maintenance/randomwalker_test.go | 225 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory.go | 4 |
5 files changed, 424 insertions, 63 deletions
diff --git a/internal/gitaly/maintenance/optimize.go b/internal/gitaly/maintenance/optimize.go index 3cbb4692e..854d68a04 100644 --- a/internal/gitaly/maintenance/optimize.go +++ b/internal/gitaly/maintenance/optimize.go @@ -2,7 +2,7 @@ package maintenance import ( "context" - "io/ioutil" + "errors" "math/rand" "os" "path/filepath" @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" @@ -35,10 +36,6 @@ func shuffledStoragesCopy(randSrc *rand.Rand, storages []config.Storage) []confi return shuffled } -func shuffleFileInfos(randSrc *rand.Rand, s []os.FileInfo) { - randSrc.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] }) -} - // Optimizer knows how to optimize a repository type Optimizer interface { OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest, ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error) @@ -72,51 +69,57 @@ func optimizeRepoAtPath(ctx context.Context, l logrus.FieldLogger, s config.Stor return nil } -func walkReposShuffled(ctx context.Context, randSrc *rand.Rand, l logrus.FieldLogger, path string, s config.Storage, o Optimizer) error { - entries, err := ioutil.ReadDir(path) - switch { - case os.IsNotExist(err): - return nil // race condition: someone deleted it - case err != nil: - return err - } - - shuffleFileInfos(randSrc, entries) - - for _, e := range entries { - if err := ctx.Err(); err != nil { +func walkReposShuffled( + ctx context.Context, + walker *randomWalker, + l logrus.FieldLogger, + s config.Storage, + o Optimizer, + ticker helper.Ticker, +) error { + for { + fi, path, err := walker.next() + switch { + case errors.Is(err, errIterOver): + return nil + case os.IsNotExist(err): + continue // race condition: someone deleted it + case err != nil: return err } - if !e.IsDir() { + if !fi.IsDir() || !storage.IsGitDirectory(path) { continue } + walker.skipDir() - absPath := filepath.Join(path, e.Name()) - if !storage.IsGitDirectory(absPath) { - if err := walkReposShuffled(ctx, randSrc, l, absPath, s, o); err != nil { - return err - } - continue + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C(): } - if err := optimizeRepoAtPath(ctx, l, s, absPath, o); err != nil { + // Reset the ticker before doing the optimization such that we essentially limit + // ourselves to doing optimizations once per tick, not once per tick plus the time + // it takes to do the optimization. It's best effort given that traversing the + // directory hierarchy takes some time, too, but it should be good enough for now. + ticker.Reset() + + if err := optimizeRepoAtPath(ctx, l, s, path, o); err != nil { return err } } - - return nil } -// OptimizeReposRandomly returns a function to walk through each storage and -// attempt to optimize any repos encountered. +// OptimizeReposRandomly returns a function to walk through each storage and attempts to optimize +// any repos encountered. The ticker is used to rate-limit optimizations. // -// Only storage paths that map to an enabled storage name will be walked. -// Any storage paths shared by multiple storages will only be walked once. +// Only storage paths that map to an enabled storage name will be walked. Any storage paths shared +// by multiple storages will only be walked once. // -// Any errors during the optimization will be logged. Any other errors will be -// returned and cause the walk to end prematurely. -func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) StoragesJob { +// Any errors during the optimization will be logged. Any other errors will be returned and cause +// the walk to end prematurely. +func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer, ticker helper.Ticker, rand *rand.Rand) StoragesJob { return func(ctx context.Context, l logrus.FieldLogger, enabledStorageNames []string) error { enabledNames := map[string]struct{}{} for _, sName := range enabledStorageNames { @@ -125,8 +128,10 @@ func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) Stora visitedPaths := map[string]bool{} - randSrc := rand.New(rand.NewSource(time.Now().UnixNano())) - for _, storage := range shuffledStoragesCopy(randSrc, storages) { + ticker.Reset() + defer ticker.Stop() + + for _, storage := range shuffledStoragesCopy(rand, storages) { if _, ok := enabledNames[storage.Name]; !ok { continue // storage not enabled } @@ -138,7 +143,9 @@ func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) Stora l.WithField("storage_path", storage.Path). Info("maintenance: optimizing repos in storage") - if err := walkReposShuffled(ctx, randSrc, l, storage.Path, storage, optimizer); err != nil { + walker := newRandomWalker(storage.Path, rand) + + if err := walkReposShuffled(ctx, walker, l, storage, optimizer, ticker); err != nil { l.WithError(err). WithField("storage_path", storage.Path). Errorf("maintenance: unable to completely walk storage") diff --git a/internal/gitaly/maintenance/optimize_test.go b/internal/gitaly/maintenance/optimize_test.go index 68b32651d..90d1c9f9a 100644 --- a/internal/gitaly/maintenance/optimize_test.go +++ b/internal/gitaly/maintenance/optimize_test.go @@ -2,6 +2,7 @@ package maintenance import ( "context" + "math/rand" "path/filepath" "testing" @@ -11,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -42,37 +44,65 @@ func TestOptimizeReposRandomly(t *testing.T) { testhelper.MustRunCommand(t, nil, "git", "init", "--bare", filepath.Join(storage.Path, "b")) } - mo := &mockOptimizer{ - t: t, - cfg: cfg, - } - walker := OptimizeReposRandomly(cfg.Storages, mo) + cfg.Storages = append(cfg.Storages, config.Storage{ + Name: "duplicate", + Path: cfg.Storages[0].Path, + }) ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), []string{"0", "1"})) + for _, tc := range []struct { + desc string + storages []string + expected []*gitalypb.Repository + }{ + { + desc: "two storages", + storages: []string{"0", "1"}, + expected: []*gitalypb.Repository{ + {RelativePath: "a", StorageName: "0"}, + {RelativePath: "a", StorageName: "1"}, + {RelativePath: "b", StorageName: "0"}, + {RelativePath: "b", StorageName: "1"}, + }, + }, + { + desc: "duplicate storages", + storages: []string{"0", "1", "duplicate"}, + expected: []*gitalypb.Repository{ + {RelativePath: "a", StorageName: "0"}, + {RelativePath: "a", StorageName: "1"}, + {RelativePath: "b", StorageName: "0"}, + {RelativePath: "b", StorageName: "1"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + tickerDone := false + tickerCount := 0 - expect := []*gitalypb.Repository{ - {RelativePath: "a", StorageName: cfg.Storages[0].Name}, - {RelativePath: "a", StorageName: cfg.Storages[1].Name}, - {RelativePath: "b", StorageName: cfg.Storages[0].Name}, - {RelativePath: "b", StorageName: cfg.Storages[1].Name}, - } - require.ElementsMatch(t, expect, mo.actual) + ticker := helper.NewManualTicker() + ticker.ResetFunc = func() { + tickerCount++ + ticker.Tick() + } + ticker.StopFunc = func() { + tickerDone = true + } - // repeat storage paths should not impact repos visited - cfg.Storages = append(cfg.Storages, config.Storage{ - Name: "duplicate", - Path: cfg.Storages[0].Path, - }) + mo := &mockOptimizer{ + t: t, + cfg: cfg, + } + walker := OptimizeReposRandomly(cfg.Storages, mo, ticker, rand.New(rand.NewSource(1))) - mo = &mockOptimizer{ - t: t, - cfg: cfg, + require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), tc.storages)) + require.ElementsMatch(t, tc.expected, mo.actual) + require.True(t, tickerDone) + // We expect one more tick than optimized repositories because of the + // initial tick up front to re-start the timer. + require.Equal(t, len(tc.expected)+1, tickerCount) + }) } - - walker = OptimizeReposRandomly(cfg.Storages, mo) - require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), []string{"0", "1", "duplicate"})) - require.Equal(t, len(expect), len(mo.actual)) } diff --git a/internal/gitaly/maintenance/randomwalker.go b/internal/gitaly/maintenance/randomwalker.go new file mode 100644 index 000000000..8faefe659 --- /dev/null +++ b/internal/gitaly/maintenance/randomwalker.go @@ -0,0 +1,95 @@ +package maintenance + +import ( + "errors" + "io/ioutil" + "math/rand" + "os" + "path/filepath" +) + +var ( + errIterOver = errors.New("random walker at end") +) + +type stackFrame struct { + name string + entries []os.FileInfo +} + +// randomWalker is a filesystem walker which traverses a directory hierarchy in depth-first order, +// randomizing the order of each directory's entries. +type randomWalker struct { + stack []stackFrame + root string + pendingDir string + rand *rand.Rand +} + +// newRandomWalker creates a new random walker starting at `root`. +func newRandomWalker(root string, r *rand.Rand) *randomWalker { + return &randomWalker{ + pendingDir: root, + root: root, + rand: r, + } +} + +// next returns the next file. Traversal happens in depth-first order, where each directory's +// entities are traversed in random order. If there are no more files to iterate, `errIterOver` is +// returned. +func (r *randomWalker) next() (os.FileInfo, string, error) { + if r.pendingDir != "" { + // Reset pendingDir before returning the error such that the caller can continue if + // he doesn't care e.g. for the directory not existing. + pendingDir := r.pendingDir + r.pendingDir = "" + + entries, err := ioutil.ReadDir(pendingDir) + if err != nil { + return nil, pendingDir, err + } + + shuffleFileInfos(r.rand, entries) + r.stack = append(r.stack, stackFrame{ + name: pendingDir, + entries: entries, + }) + } + + // Iterate over all stack frames depth-first and search for the first non-empty + // one. If there are none, then it means that we've finished the depth-first search + // and return `errIterOver`. + for { + if len(r.stack) == 0 { + return nil, "", errIterOver + } + + // Retrieve the current bottom-most stack frame. If the stack frame is empty, we pop + // it and retry its parent frame. + stackFrame := &r.stack[len(r.stack)-1] + if len(stackFrame.entries) == 0 { + r.stack = r.stack[:len(r.stack)-1] + continue + } + + fi := stackFrame.entries[0] + stackFrame.entries = stackFrame.entries[1:] + + path := filepath.Join(stackFrame.name, fi.Name()) + if fi.IsDir() { + r.pendingDir = path + } + + return fi, path, nil + } +} + +func shuffleFileInfos(randSrc *rand.Rand, s []os.FileInfo) { + randSrc.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] }) +} + +// skipDir marks the current directory such that it does not get descended into. +func (r *randomWalker) skipDir() { + r.pendingDir = "" +} diff --git a/internal/gitaly/maintenance/randomwalker_test.go b/internal/gitaly/maintenance/randomwalker_test.go new file mode 100644 index 000000000..9213197c9 --- /dev/null +++ b/internal/gitaly/maintenance/randomwalker_test.go @@ -0,0 +1,225 @@ +package maintenance + +import ( + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestRandomWalk(t *testing.T) { + for _, tc := range []struct { + desc string + dirs []string + files []string + skipPaths []string + expectedPaths []string + }{ + { + desc: "single directory", + dirs: []string{ + "foo", + }, + expectedPaths: []string{ + "foo", + }, + }, + { + desc: "multiple directories", + dirs: []string{ + "foo/bar/baz", + "foo/bar/qux", + "foo/bar/qux/qax", + "intermittent", + "other/dir", + "last", + }, + expectedPaths: []string{ + "foo", + "foo/bar", + "foo/bar/qux", + "foo/bar/qux/qax", + "foo/bar/baz", + "intermittent", + "other", + "other/dir", + "last", + }, + }, + { + desc: "single file", + files: []string{ + "file", + }, + expectedPaths: []string{ + "file", + }, + }, + { + desc: "mixed files and directories", + dirs: []string{ + "foo/bar/qux", + }, + files: []string{ + "file1", + "file2", + "file3", + "foo/file1", + "foo/file2", + "foo/file3", + "foo/bar/qux/file1", + "foo/bar/qux/file2", + "foo/bar/qux/file3", + }, + expectedPaths: []string{ + "file1", + "file2", + "foo", + "foo/bar", + "foo/bar/qux", + "foo/bar/qux/file2", + "foo/bar/qux/file3", + "foo/bar/qux/file1", + "foo/file2", + "foo/file3", + "foo/file1", + "file3", + }, + }, + { + desc: "single skipped dir", + dirs: []string{ + "foo", + }, + skipPaths: []string{ + "foo", + }, + expectedPaths: []string{ + "foo", + }, + }, + { + desc: "single skipped dir with nested contents", + dirs: []string{ + "foo", + "foo/subdir", + }, + files: []string{ + "foo/file", + }, + skipPaths: []string{ + "foo", + }, + expectedPaths: []string{ + "foo", + }, + }, + { + desc: "mixed files and directories with skipping", + dirs: []string{ + "dir1/subdir/subsubdir", + "dir2/foo/bar/qux", + "dir2/foo/baz", + "dir3", + }, + files: []string{ + "file", + "dir2/foo/file", + }, + skipPaths: []string{ + "dir1", + "dir2/foo/bar", + }, + expectedPaths: []string{ + "dir1", + "dir2", + "dir2/foo", + "dir2/foo/file", + "dir2/foo/bar", + "dir2/foo/baz", + "file", + "dir3", + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + root, cleanup := testhelper.TempDir(t) + defer cleanup() + + for _, dir := range tc.dirs { + require.NoError(t, os.MkdirAll(filepath.Join(root, dir), 0777)) + } + + for _, file := range tc.files { + require.NoError(t, ioutil.WriteFile(filepath.Join(root, file), []byte{}, 0777)) + } + + walker := newRandomWalker(root, rand.New(rand.NewSource(1))) + + skipPaths := make(map[string]bool) + for _, skipPath := range tc.skipPaths { + skipPaths[filepath.Join(root, skipPath)] = true + } + + actualPaths := []string{} + for { + fi, path, err := walker.next() + if err == errIterOver { + break + } + require.NoError(t, err) + + if skipPaths[path] { + walker.skipDir() + } + + require.Equal(t, filepath.Base(path), fi.Name()) + actualPaths = append(actualPaths, path) + } + + expectedPaths := make([]string, len(tc.expectedPaths)) + for i, expectedPath := range tc.expectedPaths { + expectedPaths[i] = filepath.Join(root, expectedPath) + } + + require.Equal(t, expectedPaths, actualPaths) + }) + } +} + +func TestRandomWalk_withRemovedDirs(t *testing.T) { + root, cleanup := testhelper.TempDir(t) + defer cleanup() + + for _, dir := range []string{"foo/bar", "foo/bar/deleteme", "foo/baz/qux", "foo/baz/other"} { + require.NoError(t, os.MkdirAll(filepath.Join(root, dir), 0777)) + } + + walker := newRandomWalker(root, rand.New(rand.NewSource(1))) + + for _, expectedPath := range []string{"foo", "foo/bar"} { + _, path, err := walker.next() + require.NoError(t, err) + require.Equal(t, filepath.Join(root, expectedPath), path) + } + + require.NoError(t, os.RemoveAll(filepath.Join(root, "foo/bar"))) + + _, path, err := walker.next() + require.Error(t, err, "expected ENOENT") + require.True(t, os.IsNotExist(err)) + require.Equal(t, filepath.Join(root, "foo/bar"), path) + + for _, expectedPath := range []string{"foo/baz", "foo/baz/other", "foo/baz/qux"} { + _, path, err := walker.next() + require.NoError(t, err) + require.Equal(t, filepath.Join(root, expectedPath), path) + } + + _, _, err = walker.next() + require.Equal(t, err, errIterOver) +} diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index ea902c3fa..4de443d69 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "math/rand" "sync" "time" @@ -11,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/gitaly/maintenance" + "gitlab.com/gitlab-org/gitaly/internal/helper" gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" @@ -55,6 +57,8 @@ func (s *GitalyServerFactory) StartWorkers(ctx context.Context, l logrus.FieldLo maintenance.OptimizeReposRandomly( cfg.Storages, gitalypb.NewRepositoryServiceClient(cc), + helper.NewTimerTicker(1*time.Second), + rand.New(rand.NewSource(time.Now().UnixNano())), ), ) }() |