Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2021-03-29 17:12:50 +0300
committerToon Claes <toon@gitlab.com>2021-03-29 17:12:50 +0300
commit6774facd910ff4b3d800efb384f926dad8f7c5d2 (patch)
tree9558900782c94d6565abd624f40f0e31c829c19a
parenta134e059dafa23f29255c1b626887ae16201a819 (diff)
parent74882d5778003bcdf8ef30a1ddfd3902170747fa (diff)
Merge branch 'pks-maintenance-resumable-walk' into 'master'
maintenance: Introduce rate limiting for `OptimizeRepo` See merge request gitlab-org/gitaly!3296
-rw-r--r--internal/gitaly/maintenance/optimize.go83
-rw-r--r--internal/gitaly/maintenance/optimize_test.go80
-rw-r--r--internal/gitaly/maintenance/randomwalker.go95
-rw-r--r--internal/gitaly/maintenance/randomwalker_test.go225
-rw-r--r--internal/gitaly/server/server_factory.go4
5 files changed, 424 insertions, 63 deletions
diff --git a/internal/gitaly/maintenance/optimize.go b/internal/gitaly/maintenance/optimize.go
index 3cbb4692e..854d68a04 100644
--- a/internal/gitaly/maintenance/optimize.go
+++ b/internal/gitaly/maintenance/optimize.go
@@ -2,7 +2,7 @@ package maintenance
import (
"context"
- "io/ioutil"
+ "errors"
"math/rand"
"os"
"path/filepath"
@@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -35,10 +36,6 @@ func shuffledStoragesCopy(randSrc *rand.Rand, storages []config.Storage) []confi
return shuffled
}
-func shuffleFileInfos(randSrc *rand.Rand, s []os.FileInfo) {
- randSrc.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] })
-}
-
// Optimizer knows how to optimize a repository
type Optimizer interface {
OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest, ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error)
@@ -72,51 +69,57 @@ func optimizeRepoAtPath(ctx context.Context, l logrus.FieldLogger, s config.Stor
return nil
}
-func walkReposShuffled(ctx context.Context, randSrc *rand.Rand, l logrus.FieldLogger, path string, s config.Storage, o Optimizer) error {
- entries, err := ioutil.ReadDir(path)
- switch {
- case os.IsNotExist(err):
- return nil // race condition: someone deleted it
- case err != nil:
- return err
- }
-
- shuffleFileInfos(randSrc, entries)
-
- for _, e := range entries {
- if err := ctx.Err(); err != nil {
+func walkReposShuffled(
+ ctx context.Context,
+ walker *randomWalker,
+ l logrus.FieldLogger,
+ s config.Storage,
+ o Optimizer,
+ ticker helper.Ticker,
+) error {
+ for {
+ fi, path, err := walker.next()
+ switch {
+ case errors.Is(err, errIterOver):
+ return nil
+ case os.IsNotExist(err):
+ continue // race condition: someone deleted it
+ case err != nil:
return err
}
- if !e.IsDir() {
+ if !fi.IsDir() || !storage.IsGitDirectory(path) {
continue
}
+ walker.skipDir()
- absPath := filepath.Join(path, e.Name())
- if !storage.IsGitDirectory(absPath) {
- if err := walkReposShuffled(ctx, randSrc, l, absPath, s, o); err != nil {
- return err
- }
- continue
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C():
}
- if err := optimizeRepoAtPath(ctx, l, s, absPath, o); err != nil {
+ // Reset the ticker before doing the optimization such that we essentially limit
+ // ourselves to doing optimizations once per tick, not once per tick plus the time
+ // it takes to do the optimization. It's best effort given that traversing the
+ // directory hierarchy takes some time, too, but it should be good enough for now.
+ ticker.Reset()
+
+ if err := optimizeRepoAtPath(ctx, l, s, path, o); err != nil {
return err
}
}
-
- return nil
}
-// OptimizeReposRandomly returns a function to walk through each storage and
-// attempt to optimize any repos encountered.
+// OptimizeReposRandomly returns a function to walk through each storage and attempts to optimize
+// any repos encountered. The ticker is used to rate-limit optimizations.
//
-// Only storage paths that map to an enabled storage name will be walked.
-// Any storage paths shared by multiple storages will only be walked once.
+// Only storage paths that map to an enabled storage name will be walked. Any storage paths shared
+// by multiple storages will only be walked once.
//
-// Any errors during the optimization will be logged. Any other errors will be
-// returned and cause the walk to end prematurely.
-func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) StoragesJob {
+// Any errors during the optimization will be logged. Any other errors will be returned and cause
+// the walk to end prematurely.
+func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer, ticker helper.Ticker, rand *rand.Rand) StoragesJob {
return func(ctx context.Context, l logrus.FieldLogger, enabledStorageNames []string) error {
enabledNames := map[string]struct{}{}
for _, sName := range enabledStorageNames {
@@ -125,8 +128,10 @@ func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) Stora
visitedPaths := map[string]bool{}
- randSrc := rand.New(rand.NewSource(time.Now().UnixNano()))
- for _, storage := range shuffledStoragesCopy(randSrc, storages) {
+ ticker.Reset()
+ defer ticker.Stop()
+
+ for _, storage := range shuffledStoragesCopy(rand, storages) {
if _, ok := enabledNames[storage.Name]; !ok {
continue // storage not enabled
}
@@ -138,7 +143,9 @@ func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) Stora
l.WithField("storage_path", storage.Path).
Info("maintenance: optimizing repos in storage")
- if err := walkReposShuffled(ctx, randSrc, l, storage.Path, storage, optimizer); err != nil {
+ walker := newRandomWalker(storage.Path, rand)
+
+ if err := walkReposShuffled(ctx, walker, l, storage, optimizer, ticker); err != nil {
l.WithError(err).
WithField("storage_path", storage.Path).
Errorf("maintenance: unable to completely walk storage")
diff --git a/internal/gitaly/maintenance/optimize_test.go b/internal/gitaly/maintenance/optimize_test.go
index 68b32651d..90d1c9f9a 100644
--- a/internal/gitaly/maintenance/optimize_test.go
+++ b/internal/gitaly/maintenance/optimize_test.go
@@ -2,6 +2,7 @@ package maintenance
import (
"context"
+ "math/rand"
"path/filepath"
"testing"
@@ -11,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -42,37 +44,65 @@ func TestOptimizeReposRandomly(t *testing.T) {
testhelper.MustRunCommand(t, nil, "git", "init", "--bare", filepath.Join(storage.Path, "b"))
}
- mo := &mockOptimizer{
- t: t,
- cfg: cfg,
- }
- walker := OptimizeReposRandomly(cfg.Storages, mo)
+ cfg.Storages = append(cfg.Storages, config.Storage{
+ Name: "duplicate",
+ Path: cfg.Storages[0].Path,
+ })
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), []string{"0", "1"}))
+ for _, tc := range []struct {
+ desc string
+ storages []string
+ expected []*gitalypb.Repository
+ }{
+ {
+ desc: "two storages",
+ storages: []string{"0", "1"},
+ expected: []*gitalypb.Repository{
+ {RelativePath: "a", StorageName: "0"},
+ {RelativePath: "a", StorageName: "1"},
+ {RelativePath: "b", StorageName: "0"},
+ {RelativePath: "b", StorageName: "1"},
+ },
+ },
+ {
+ desc: "duplicate storages",
+ storages: []string{"0", "1", "duplicate"},
+ expected: []*gitalypb.Repository{
+ {RelativePath: "a", StorageName: "0"},
+ {RelativePath: "a", StorageName: "1"},
+ {RelativePath: "b", StorageName: "0"},
+ {RelativePath: "b", StorageName: "1"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ tickerDone := false
+ tickerCount := 0
- expect := []*gitalypb.Repository{
- {RelativePath: "a", StorageName: cfg.Storages[0].Name},
- {RelativePath: "a", StorageName: cfg.Storages[1].Name},
- {RelativePath: "b", StorageName: cfg.Storages[0].Name},
- {RelativePath: "b", StorageName: cfg.Storages[1].Name},
- }
- require.ElementsMatch(t, expect, mo.actual)
+ ticker := helper.NewManualTicker()
+ ticker.ResetFunc = func() {
+ tickerCount++
+ ticker.Tick()
+ }
+ ticker.StopFunc = func() {
+ tickerDone = true
+ }
- // repeat storage paths should not impact repos visited
- cfg.Storages = append(cfg.Storages, config.Storage{
- Name: "duplicate",
- Path: cfg.Storages[0].Path,
- })
+ mo := &mockOptimizer{
+ t: t,
+ cfg: cfg,
+ }
+ walker := OptimizeReposRandomly(cfg.Storages, mo, ticker, rand.New(rand.NewSource(1)))
- mo = &mockOptimizer{
- t: t,
- cfg: cfg,
+ require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), tc.storages))
+ require.ElementsMatch(t, tc.expected, mo.actual)
+ require.True(t, tickerDone)
+ // We expect one more tick than optimized repositories because of the
+ // initial tick up front to re-start the timer.
+ require.Equal(t, len(tc.expected)+1, tickerCount)
+ })
}
-
- walker = OptimizeReposRandomly(cfg.Storages, mo)
- require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), []string{"0", "1", "duplicate"}))
- require.Equal(t, len(expect), len(mo.actual))
}
diff --git a/internal/gitaly/maintenance/randomwalker.go b/internal/gitaly/maintenance/randomwalker.go
new file mode 100644
index 000000000..8faefe659
--- /dev/null
+++ b/internal/gitaly/maintenance/randomwalker.go
@@ -0,0 +1,95 @@
+package maintenance
+
+import (
+ "errors"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "path/filepath"
+)
+
+var (
+ errIterOver = errors.New("random walker at end")
+)
+
+type stackFrame struct {
+ name string
+ entries []os.FileInfo
+}
+
+// randomWalker is a filesystem walker which traverses a directory hierarchy in depth-first order,
+// randomizing the order of each directory's entries.
+type randomWalker struct {
+ stack []stackFrame
+ root string
+ pendingDir string
+ rand *rand.Rand
+}
+
+// newRandomWalker creates a new random walker starting at `root`.
+func newRandomWalker(root string, r *rand.Rand) *randomWalker {
+ return &randomWalker{
+ pendingDir: root,
+ root: root,
+ rand: r,
+ }
+}
+
+// next returns the next file. Traversal happens in depth-first order, where each directory's
+// entities are traversed in random order. If there are no more files to iterate, `errIterOver` is
+// returned.
+func (r *randomWalker) next() (os.FileInfo, string, error) {
+ if r.pendingDir != "" {
+ // Reset pendingDir before returning the error such that the caller can continue if
+ // he doesn't care e.g. for the directory not existing.
+ pendingDir := r.pendingDir
+ r.pendingDir = ""
+
+ entries, err := ioutil.ReadDir(pendingDir)
+ if err != nil {
+ return nil, pendingDir, err
+ }
+
+ shuffleFileInfos(r.rand, entries)
+ r.stack = append(r.stack, stackFrame{
+ name: pendingDir,
+ entries: entries,
+ })
+ }
+
+ // Iterate over all stack frames depth-first and search for the first non-empty
+ // one. If there are none, then it means that we've finished the depth-first search
+ // and return `errIterOver`.
+ for {
+ if len(r.stack) == 0 {
+ return nil, "", errIterOver
+ }
+
+ // Retrieve the current bottom-most stack frame. If the stack frame is empty, we pop
+ // it and retry its parent frame.
+ stackFrame := &r.stack[len(r.stack)-1]
+ if len(stackFrame.entries) == 0 {
+ r.stack = r.stack[:len(r.stack)-1]
+ continue
+ }
+
+ fi := stackFrame.entries[0]
+ stackFrame.entries = stackFrame.entries[1:]
+
+ path := filepath.Join(stackFrame.name, fi.Name())
+ if fi.IsDir() {
+ r.pendingDir = path
+ }
+
+ return fi, path, nil
+ }
+}
+
+func shuffleFileInfos(randSrc *rand.Rand, s []os.FileInfo) {
+ randSrc.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] })
+}
+
+// skipDir marks the current directory such that it does not get descended into.
+func (r *randomWalker) skipDir() {
+ r.pendingDir = ""
+}
diff --git a/internal/gitaly/maintenance/randomwalker_test.go b/internal/gitaly/maintenance/randomwalker_test.go
new file mode 100644
index 000000000..9213197c9
--- /dev/null
+++ b/internal/gitaly/maintenance/randomwalker_test.go
@@ -0,0 +1,225 @@
+package maintenance
+
+import (
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestRandomWalk(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ dirs []string
+ files []string
+ skipPaths []string
+ expectedPaths []string
+ }{
+ {
+ desc: "single directory",
+ dirs: []string{
+ "foo",
+ },
+ expectedPaths: []string{
+ "foo",
+ },
+ },
+ {
+ desc: "multiple directories",
+ dirs: []string{
+ "foo/bar/baz",
+ "foo/bar/qux",
+ "foo/bar/qux/qax",
+ "intermittent",
+ "other/dir",
+ "last",
+ },
+ expectedPaths: []string{
+ "foo",
+ "foo/bar",
+ "foo/bar/qux",
+ "foo/bar/qux/qax",
+ "foo/bar/baz",
+ "intermittent",
+ "other",
+ "other/dir",
+ "last",
+ },
+ },
+ {
+ desc: "single file",
+ files: []string{
+ "file",
+ },
+ expectedPaths: []string{
+ "file",
+ },
+ },
+ {
+ desc: "mixed files and directories",
+ dirs: []string{
+ "foo/bar/qux",
+ },
+ files: []string{
+ "file1",
+ "file2",
+ "file3",
+ "foo/file1",
+ "foo/file2",
+ "foo/file3",
+ "foo/bar/qux/file1",
+ "foo/bar/qux/file2",
+ "foo/bar/qux/file3",
+ },
+ expectedPaths: []string{
+ "file1",
+ "file2",
+ "foo",
+ "foo/bar",
+ "foo/bar/qux",
+ "foo/bar/qux/file2",
+ "foo/bar/qux/file3",
+ "foo/bar/qux/file1",
+ "foo/file2",
+ "foo/file3",
+ "foo/file1",
+ "file3",
+ },
+ },
+ {
+ desc: "single skipped dir",
+ dirs: []string{
+ "foo",
+ },
+ skipPaths: []string{
+ "foo",
+ },
+ expectedPaths: []string{
+ "foo",
+ },
+ },
+ {
+ desc: "single skipped dir with nested contents",
+ dirs: []string{
+ "foo",
+ "foo/subdir",
+ },
+ files: []string{
+ "foo/file",
+ },
+ skipPaths: []string{
+ "foo",
+ },
+ expectedPaths: []string{
+ "foo",
+ },
+ },
+ {
+ desc: "mixed files and directories with skipping",
+ dirs: []string{
+ "dir1/subdir/subsubdir",
+ "dir2/foo/bar/qux",
+ "dir2/foo/baz",
+ "dir3",
+ },
+ files: []string{
+ "file",
+ "dir2/foo/file",
+ },
+ skipPaths: []string{
+ "dir1",
+ "dir2/foo/bar",
+ },
+ expectedPaths: []string{
+ "dir1",
+ "dir2",
+ "dir2/foo",
+ "dir2/foo/file",
+ "dir2/foo/bar",
+ "dir2/foo/baz",
+ "file",
+ "dir3",
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ root, cleanup := testhelper.TempDir(t)
+ defer cleanup()
+
+ for _, dir := range tc.dirs {
+ require.NoError(t, os.MkdirAll(filepath.Join(root, dir), 0777))
+ }
+
+ for _, file := range tc.files {
+ require.NoError(t, ioutil.WriteFile(filepath.Join(root, file), []byte{}, 0777))
+ }
+
+ walker := newRandomWalker(root, rand.New(rand.NewSource(1)))
+
+ skipPaths := make(map[string]bool)
+ for _, skipPath := range tc.skipPaths {
+ skipPaths[filepath.Join(root, skipPath)] = true
+ }
+
+ actualPaths := []string{}
+ for {
+ fi, path, err := walker.next()
+ if err == errIterOver {
+ break
+ }
+ require.NoError(t, err)
+
+ if skipPaths[path] {
+ walker.skipDir()
+ }
+
+ require.Equal(t, filepath.Base(path), fi.Name())
+ actualPaths = append(actualPaths, path)
+ }
+
+ expectedPaths := make([]string, len(tc.expectedPaths))
+ for i, expectedPath := range tc.expectedPaths {
+ expectedPaths[i] = filepath.Join(root, expectedPath)
+ }
+
+ require.Equal(t, expectedPaths, actualPaths)
+ })
+ }
+}
+
+func TestRandomWalk_withRemovedDirs(t *testing.T) {
+ root, cleanup := testhelper.TempDir(t)
+ defer cleanup()
+
+ for _, dir := range []string{"foo/bar", "foo/bar/deleteme", "foo/baz/qux", "foo/baz/other"} {
+ require.NoError(t, os.MkdirAll(filepath.Join(root, dir), 0777))
+ }
+
+ walker := newRandomWalker(root, rand.New(rand.NewSource(1)))
+
+ for _, expectedPath := range []string{"foo", "foo/bar"} {
+ _, path, err := walker.next()
+ require.NoError(t, err)
+ require.Equal(t, filepath.Join(root, expectedPath), path)
+ }
+
+ require.NoError(t, os.RemoveAll(filepath.Join(root, "foo/bar")))
+
+ _, path, err := walker.next()
+ require.Error(t, err, "expected ENOENT")
+ require.True(t, os.IsNotExist(err))
+ require.Equal(t, filepath.Join(root, "foo/bar"), path)
+
+ for _, expectedPath := range []string{"foo/baz", "foo/baz/other", "foo/baz/qux"} {
+ _, path, err := walker.next()
+ require.NoError(t, err)
+ require.Equal(t, filepath.Join(root, expectedPath), path)
+ }
+
+ _, _, err = walker.next()
+ require.Equal(t, err, errIterOver)
+}
diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go
index ea902c3fa..4de443d69 100644
--- a/internal/gitaly/server/server_factory.go
+++ b/internal/gitaly/server/server_factory.go
@@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
+ "math/rand"
"sync"
"time"
@@ -11,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/maintenance"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -55,6 +57,8 @@ func (s *GitalyServerFactory) StartWorkers(ctx context.Context, l logrus.FieldLo
maintenance.OptimizeReposRandomly(
cfg.Storages,
gitalypb.NewRepositoryServiceClient(cc),
+ helper.NewTimerTicker(1*time.Second),
+ rand.New(rand.NewSource(time.Now().UnixNano())),
),
)
}()