gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2021-09-03 17:22:41 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2021-09-30 15:05:49 +0300
commit    007ebf28debb3b201ffc7e5d081bbe5e76a8b9fd (patch)
tree      7b2f2d131dd47b933bf02ab8e1157da3c4fc6ab5
parent    4aa02bd441b23818a39cd5966d442d48e88a3b3c (diff)
repoclean: Background task to act on the repositories
A background task runs periodically and scans gitaly nodes for repositories. For each repository it determines whether the repository is known to praefect. Based on this information an action can be performed. In the first implementation the action will be a warning log message for repositories missing from the praefect database (upcoming set of changes). The task queries only healthy gitaly nodes and gracefully handles any errors that occur while communicating with them.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3719
-rw-r--r--  internal/praefect/repocleaner/init_test.go        19
-rw-r--r--  internal/praefect/repocleaner/repository.go       202
-rw-r--r--  internal/praefect/repocleaner/repository_test.go  324
3 files changed, 545 insertions(+), 0 deletions(-)
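The new Runner is constructed with NewRunner and driven by a helper.Ticker. The sketch below illustrates, under assumptions, how it could be wired up: the *sql.DB handle, node connections, health checker and ticker are taken as given from the surrounding praefect setup, datastore.StorageCleanup serves as both StateOwner and Acquirer (as it does in the tests of this commit), and logWarnAction is a hypothetical stand-in for the warning-log action that the commit message defers to an upcoming set of changes.

// Wiring sketch (assumption): not part of this commit.
package repocleanerwiring

import (
	"context"
	"database/sql"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
)

// logWarnAction is a hypothetical Action implementation that only logs the
// repositories unknown to praefect, standing in for the warning-log action
// that the commit message defers to an upcoming set of changes.
type logWarnAction struct {
	logger logrus.FieldLogger
}

// Perform logs one warning per repository that doesn't exist in the praefect database.
func (a logWarnAction) Perform(_ context.Context, notExisting []datastore.RepositoryClusterPath) error {
	for _, path := range notExisting {
		a.logger.WithFields(logrus.Fields{
			"virtual_storage": path.VirtualStorage,
			"storage":         path.Storage,
			"relative_path":   path.RelativeReplicaPath,
		}).Warn("repository is not known to praefect")
	}
	return nil
}

// runRepoCleaner wires the Runner together and blocks until ctx is cancelled.
// The db handle, connections, health checker and ticker are assumed to be
// provided by the surrounding praefect process setup.
func runRepoCleaner(
	ctx context.Context,
	db *sql.DB,
	logger logrus.FieldLogger,
	healthChecker praefect.HealthChecker,
	conns praefect.Connections,
	ticker helper.Ticker,
) error {
	// StorageCleanup implements both StateOwner and Acquirer, as in repository_test.go.
	storageCleanup := datastore.NewStorageCleanup(db)

	runner := repocleaner.NewRunner(
		repocleaner.Cfg{
			// Re-check a storage at most once per hour (illustrative value).
			RunInterval: time.Hour,
			// Signal every 30 seconds that the acquired storage is still being processed.
			LivenessInterval: 30 * time.Second,
			// Number of repositories passed to DoesntExist/Perform per batch.
			RepositoriesInBatch: 16,
		},
		logger,
		healthChecker,
		conns,
		storageCleanup, // StateOwner
		storageCleanup, // Acquirer
		logWarnAction{logger: logger},
	)

	// Run executes one cycle per tick and returns once ctx is cancelled.
	return runner.Run(ctx, ticker)
}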
diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go
new file mode 100644
index 000000000..011d4f2a4
--- /dev/null
+++ b/internal/praefect/repocleaner/init_test.go
@@ -0,0 +1,19 @@
+package repocleaner
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) (code int) {
+ defer testhelper.MustHaveNoChildProcess()
+ cleanup := testhelper.Configure()
+ defer cleanup()
+ return m.Run()
+}
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
new file mode 100644
index 000000000..fbf3b841b
--- /dev/null
+++ b/internal/praefect/repocleaner/repository.go
@@ -0,0 +1,202 @@
+package repocleaner
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// StateOwner checks which repositories exist in the praefect database.
+type StateOwner interface {
+ // DoesntExist returns a RepositoryClusterPath for each repository that doesn't exist in the database,
+ // determined by querying the repositories and storage_repositories tables.
+ DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]datastore.RepositoryClusterPath, error)
+}
+
+// Acquirer acquires a storage for processing so that no other Acquirer can acquire it again until it is released.
+type Acquirer interface {
+ // Populate adds the provided storage to the pool of entries that can be acquired.
+ Populate(ctx context.Context, virtualStorage, storage string) error
+ // AcquireNextStorage acquires the next storage based on its inactive time.
+ AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*datastore.ClusterPath, func() error, error)
+}
+
+// Action is a procedure to be executed on the repositories that don't exist in the praefect database.
+type Action interface {
+ // Perform runs the actual action on the non-existing repositories.
+ Perform(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error
+}
+
+// Runner scans healthy gitaly nodes for repositories, verifies whether the
+// found repositories are known to praefect, and runs a special action.
+type Runner struct {
+ cfg Cfg
+ logger logrus.FieldLogger
+ healthChecker praefect.HealthChecker
+ conns praefect.Connections
+ stateOwner StateOwner
+ acquirer Acquirer
+ action Action
+}
+
+// Cfg contains the set of configuration parameters used to run Runner.
+type Cfg struct {
+ // RunInterval: the check runs only if the previous operation was done at least RunInterval ago.
+ RunInterval time.Duration
+ // LivenessInterval: an update runs on the locked entity with the provided period to signal that the entity is in use.
+ LivenessInterval time.Duration
+ // RepositoriesInBatch is the number of repositories to pass as a batch for processing.
+ RepositoriesInBatch int
+}
+
+// NewRunner returns a new instance of Runner.
+func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.HealthChecker, conns praefect.Connections, stateOwner StateOwner, acquirer Acquirer, action Action) *Runner {
+ return &Runner{
+ cfg: cfg,
+ logger: logger.WithField("component", "repocleaner.repository_existence"),
+ healthChecker: healthChecker,
+ conns: conns,
+ stateOwner: stateOwner,
+ acquirer: acquirer,
+ action: action,
+ }
+}
+
+// Run scans healthy gitaly nodes for repositories, verifies whether the
+// found repositories are known to praefect, and runs a special action.
+// It runs on each tick of the provided ticker and finishes on context cancellation.
+func (gs *Runner) Run(ctx context.Context, ticker helper.Ticker) error {
+ gs.logger.Info("started")
+ defer gs.logger.Info("completed")
+
+ defer ticker.Stop()
+
+ for virtualStorage, connByStorage := range gs.conns {
+ for storage := range connByStorage {
+ if err := gs.acquirer.Populate(ctx, virtualStorage, storage); err != nil {
+ return fmt.Errorf("populate database: %w", err)
+ }
+ }
+ }
+
+ var tick helper.Ticker
+ for {
+ // We use a local tick variable to run the first cycle
+ // without waiting. All other iterations wait for
+ // the next tick or context cancellation.
+ if tick != nil {
+ tick.Reset()
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-tick.C():
+ }
+ } else {
+ tick = ticker
+ }
+
+ gs.run(ctx)
+ }
+}
+
+func (gs *Runner) run(ctx context.Context) {
+ clusterPath, release, err := gs.acquirer.AcquireNextStorage(ctx, gs.cfg.RunInterval, gs.cfg.LivenessInterval)
+ if err != nil {
+ gs.logger.WithError(err).Error("unable to acquire next storage to verify")
+ return
+ }
+
+ logger := gs.logger
+ defer func() {
+ if err := release(); err != nil {
+ logger.WithError(err).Error("failed to release storage acquired to verify")
+ }
+ }()
+
+ if clusterPath == nil {
+ gs.logger.Debug("no storages to verify")
+ return
+ }
+
+ logger = gs.loggerWith(clusterPath.VirtualStorage, clusterPath.Storage)
+ err = gs.execOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) {
+ relativePaths := make([]string, len(paths))
+ for i, path := range paths {
+ relativePaths[i] = path.RelativeReplicaPath
+ }
+ notExisting, err := gs.stateOwner.DoesntExist(ctx, clusterPath.VirtualStorage, clusterPath.Storage, relativePaths)
+ if err != nil {
+ logger.WithError(err).WithField("repositories", paths).Error("failed to check existence")
+ return
+ }
+
+ if err := gs.action.Perform(ctx, notExisting); err != nil {
+ logger.WithError(err).WithField("existence", notExisting).Error("perform action")
+ return
+ }
+ })
+ if err != nil {
+ logger.WithError(err).Error("failed to exec action on repositories")
+ return
+ }
+}
+
+func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger {
+ return gs.logger.WithFields(logrus.Fields{"virtual_storage": virtualStorage, "storage": storage})
+}
+
+func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error {
+ gclient, err := gs.getInternalGitalyClient(virtualStorage, storage)
+ if err != nil {
+ return fmt.Errorf("setup gitaly client: %w", err)
+ }
+
+ resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage})
+ if err != nil {
+ return fmt.Errorf("unable to walk repos: %w", err)
+ }
+
+ batch := make([]datastore.RepositoryClusterPath, 0, gs.cfg.RepositoriesInBatch)
+ for {
+ res, err := resp.Recv()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return fmt.Errorf("failure on walking repos: %w", err)
+ }
+ break
+ }
+
+ batch = append(batch, datastore.RepositoryClusterPath{
+ ClusterPath: datastore.ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativeReplicaPath: res.RelativePath,
+ })
+
+ if len(batch) == cap(batch) {
+ action(batch)
+ batch = batch[:0]
+ }
+ }
+ if len(batch) > 0 {
+ action(batch)
+ }
+ return nil
+}
+
+func (gs *Runner) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+ conn, found := gs.conns[virtualStorage][storage]
+ if !found {
+ return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
+ }
+ return gitalypb.NewInternalGitalyClient(conn), nil
+}
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
new file mode 100644
index 000000000..32e7d7ad5
--- /dev/null
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -0,0 +1,324 @@
+package repocleaner
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+)
+
+func TestRunner_Run(t *testing.T) {
+ t.Parallel()
+
+ const (
+ repo1RelPath = "repo-1.git"
+ repo2RelPath = "repo-2.git"
+ repo3RelPath = "repo-3.git"
+
+ storage1 = "gitaly-1"
+ storage2 = "gitaly-2"
+ storage3 = "gitaly-3"
+
+ virtualStorage = "praefect"
+ )
+
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages(storage2))
+ g3Cfg := testcfg.Build(t, testcfg.WithStorages(storage3))
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g3Addr := testserver.RunGitalyServer(t, g3Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Addr},
+ {Storage: g3Cfg.Storages[0].Name, Address: g3Addr},
+ },
+ },
+ },
+ DB: dbConf,
+ }
+ cfg := Cfg{
+ RunInterval: time.Duration(1),
+ LivenessInterval: time.Duration(1),
+ RepositoriesInBatch: 2,
+ }
+
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath})
+
+ // The second gitaly is missing the repo-3.git repository.
+ gittest.CloneRepo(t, g2Cfg, g2Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, g2Cfg, g2Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+
+ // The third gitaly has an extra repo-4.git repository.
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath})
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: "repo-4.git"})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ for i, set := range []struct {
+ relativePath string
+ primary string
+ secondaries []string
+ }{
+ {
+ relativePath: repo1RelPath,
+ primary: storage1,
+ secondaries: []string{storage3},
+ },
+ {
+ relativePath: repo2RelPath,
+ primary: storage1,
+ secondaries: []string{storage2, storage3},
+ },
+ {
+ relativePath: repo3RelPath,
+ primary: storage1,
+ secondaries: []string{storage2, storage3},
+ },
+ } {
+ require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false))
+ }
+
+ logger, loggerHook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ entry := logger.WithContext(ctx)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ storageCleanup := datastore.NewStorageCleanup(db.DB)
+
+ var iteration int32
+ runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1, storage2, storage3}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{
+ PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ i := atomic.LoadInt32(&iteration)
+ switch i {
+ case 0:
+ assert.ElementsMatch(t, nil, notExisting)
+ case 1:
+ assert.ElementsMatch(t, nil, notExisting)
+ case 2:
+ assert.ElementsMatch(
+ t,
+ []datastore.RepositoryClusterPath{
+ datastore.NewRepositoryClusterPath(virtualStorage, storage2, repo1RelPath),
+ },
+ notExisting,
+ )
+ case 3:
+ assert.ElementsMatch(t, nil, notExisting)
+ case 4:
+ assert.Equal(
+ t,
+ []datastore.RepositoryClusterPath{
+ datastore.NewRepositoryClusterPath(virtualStorage, storage3, "repo-4.git"),
+ },
+ notExisting,
+ )
+ }
+ atomic.AddInt32(&iteration, 1)
+ return nil
+ },
+ })
+
+ ticker := helper.NewManualTicker()
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ assert.Equal(t, context.Canceled, runner.Run(ctx, ticker))
+ }()
+ // We need to trigger it 5 times to make sure the 4th run is fully completed.
+ for i := 0; i < 5; i++ {
+ ticker.Tick()
+ }
+ require.Greater(t, atomic.LoadInt32(&iteration), int32(4))
+ require.Len(t, loggerHook.AllEntries(), 1)
+ require.Equal(
+ t,
+ map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "started"},
+ map[string]interface{}{"Data": loggerHook.AllEntries()[0].Data, "Message": loggerHook.AllEntries()[0].Message},
+ )
+ // Terminates the loop.
+ cancel()
+ <-done
+ require.Equal(
+ t,
+ map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "completed"},
+ map[string]interface{}{"Data": loggerHook.LastEntry().Data, "Message": loggerHook.LastEntry().Message},
+ )
+}
+
+func TestRunner_Run_noAvailableStorages(t *testing.T) {
+ t.Parallel()
+
+ const (
+ repo1RelPath = "repo-1.git"
+ storage1 = "gitaly-1"
+ virtualStorage = "praefect"
+ )
+
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ },
+ },
+ },
+ DB: dbConf,
+ }
+ cfg := Cfg{
+ RunInterval: time.Minute,
+ LivenessInterval: time.Second,
+ RepositoriesInBatch: 2,
+ }
+
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ for i, set := range []struct {
+ relativePath string
+ primary string
+ }{
+ {
+ relativePath: repo1RelPath,
+ primary: storage1,
+ },
+ } {
+ require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, nil, nil, false, false))
+ }
+
+ logger := testhelper.NewTestLogger(t)
+ entry := logger.WithContext(ctx)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ storageCleanup := datastore.NewStorageCleanup(db.DB)
+ startSecond := make(chan struct{})
+ releaseFirst := make(chan struct{})
+ runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{
+ PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ assert.Empty(t, notExisting)
+ // Block execution here until the second instance completes its execution.
+ // It allows us to be sure the acquired storage can't be acquired once again by
+ // another instance, and that the other instance works without problems when there is
+ // nothing to pick up for processing.
+ close(startSecond)
+ <-releaseFirst
+ return nil
+ },
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go func() {
+ logger, loggerHook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{
+ PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ assert.FailNow(t, "should not be triggered as there are no available storages to acquire")
+ return nil
+ },
+ })
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ticker := helper.NewManualTicker()
+
+ <-startSecond
+ go func() {
+ defer wg.Done()
+ assert.Equal(t, context.Canceled, runner.Run(ctx, ticker))
+ }()
+ ticker.Tick()
+ ticker.Tick()
+ close(releaseFirst)
+ cancel()
+ wg.Wait()
+
+ entries := loggerHook.AllEntries()
+ require.Greater(t, len(entries), 2)
+ require.Equal(
+ t,
+ map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "no storages to verify"},
+ map[string]interface{}{"Data": loggerHook.AllEntries()[1].Data, "Message": loggerHook.AllEntries()[1].Message},
+ )
+ }()
+
+ ticker := helper.NewManualTicker()
+ go func() {
+ defer wg.Done()
+ assert.Equal(t, context.Canceled, runner.Run(ctx, ticker))
+ }()
+ ticker.Tick()
+ ticker.Tick() // blocks until first processing cycle is done
+ cancel()
+ wg.Wait()
+}
+
+type actionStub struct {
+ PerformMethod func(ctx context.Context, existence []datastore.RepositoryClusterPath) error
+}
+
+func (as actionStub) Perform(ctx context.Context, existence []datastore.RepositoryClusterPath) error {
+ if as.PerformMethod != nil {
+ return as.PerformMethod(ctx, existence)
+ }
+ return nil
+}