From b7f892134e79cc5a5b9b16872d2ab8f55cb980bc Mon Sep 17 00:00:00 2001
From: John Cai
Date: Tue, 29 Mar 2022 12:55:28 -0400
Subject: repocleaner: Add clean action

Eventually we want the repocleaner to not just write log messages that a
repository is unused, but actually clean it up. This commit adds a
CleanAction that does just that.
---
Reviewer amendment: CleanAction.Perform now emits the "cleaned repositories"
log entry only after the CleanRepos call has returned without error. The blob
id in the index line of action_clean.go still refers to the pre-amendment
content.

 internal/praefect/repocleaner/action_clean.go    | 50 ++++++++++++++++
 internal/praefect/repocleaner/action_log_test.go | 72 ++++++++++++++++++++++++
 internal/praefect/repocleaner/repository.go      |  6 +-
 3 files changed, 125 insertions(+), 3 deletions(-)
 create mode 100644 internal/praefect/repocleaner/action_clean.go

diff --git a/internal/praefect/repocleaner/action_clean.go b/internal/praefect/repocleaner/action_clean.go
new file mode 100644
index 000000000..aeb8d5099
--- /dev/null
+++ b/internal/praefect/repocleaner/action_clean.go
@@ -0,0 +1,50 @@
+package repocleaner
+
+import (
+	"context"
+
+	"github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// CleanAction is an implementation of the Action interface that moves
+// repositories to a lost+found folder in the root storage directory.
+type CleanAction struct {
+	logger logrus.FieldLogger
+	conns  praefect.Connections
+}
+
+// NewCleanAction returns a new instance of CleanAction.
+func NewCleanAction(logger logrus.FieldLogger, conns praefect.Connections) *CleanAction {
+	return &CleanAction{
+		logger: logger.WithField("component", "repocleaner.remove_repo_action"),
+		conns:  conns,
+	}
+}
+
+// Perform performs the action of moving repositories not known to Praefect to a
+// lost+found folder in the storage root folder. The "cleaned" log entry is only
+// written after the CleanRepos call has returned without error, so a failed
+// connection lookup or RPC never reports repositories as cleaned.
+func (r *CleanAction) Perform(ctx context.Context, virtualStorage, storage string, replicaPaths []string) error {
+	client, err := getInternalGitalyClient(r.conns, virtualStorage, storage)
+	if err != nil {
+		return err
+	}
+
+	if _, err := client.CleanRepos(ctx, &gitalypb.CleanReposRequest{
+		StorageName:   storage,
+		RelativePaths: replicaPaths,
+	}); err != nil {
+		return err
+	}
+
+	r.logger.WithFields(logrus.Fields{
+		"virtual_storage": virtualStorage,
+		"storage":         storage,
+		"cleaned_repos":   replicaPaths,
+	}).Warn("cleaned repositories not managed by praefect")
+
+	return nil
+}
diff --git a/internal/praefect/repocleaner/action_log_test.go b/internal/praefect/repocleaner/action_log_test.go
index 00239d99f..c57e6b515 100644
--- a/internal/praefect/repocleaner/action_log_test.go
+++ b/internal/praefect/repocleaner/action_log_test.go
@@ -1,12 +1,26 @@
 package repocleaner
 
 import (
+	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus/hooks/test"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
 )
 
 func TestLogWarnAction_Perform(t *testing.T) {
@@ -44,3 +58,61 @@ func TestLogWarnAction_Perform(t *testing.T) {
 		"Message": hook.AllEntries()[1].Message,
 	}})
 }
+
+func TestCleanAction_Perform(t *testing.T) {
+	t.Parallel()
+
+	const (
+		repo1RelPath   = "repo-1.git"
+		storage1       = "gitaly-3"
+		virtualStorage = "praefect"
+	)
+
+	g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+
+	g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+	db := testdb.New(t)
+	dbConf := testdb.GetConfig(t, db.Name)
+
+	conf := config.Config{
+		SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+		VirtualStorages: []*config.VirtualStorage{
+			{
+				Name: virtualStorage,
+				Nodes: []*config.Node{
+					{Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+				},
+			},
+		},
+		DB: dbConf,
+	}
+
+	gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+
+	repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+
+	ctx := testhelper.Context(t)
+	require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repo1RelPath, repo1RelPath, storage1, []string{}, nil, false, false))
+
+	logger, hook := test.NewNullLogger()
+
+	entry := logger.WithContext(ctx)
+	clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil))
+	nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil)
+	require.NoError(t, err)
+	defer nodeSet.Close()
+
+	cleanAction := NewCleanAction(logger, nodeSet.Connections())
+
+	require.NoError(t, cleanAction.Perform(ctx, virtualStorage, storage1, []string{repo1RelPath}))
+	resultLogEntry := hook.LastEntry()
+
+	assert.Equal(t, []string{repo1RelPath}, resultLogEntry.Data["cleaned_repos"])
+	assert.Equal(t, "cleaned repositories not managed by praefect", resultLogEntry.Message)
+
+	storagePath, found := g1Cfg.StoragePath(storage1)
+	require.True(t, found)
+	assert.DirExists(t, filepath.Join(storagePath, "+gitaly", "lost+found", time.Now().Format("2006-01-02")))
+	assert.NoDirExists(t, filepath.Join(storagePath, repo1RelPath))
+}
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
index afc8fd317..5df5b2b08 100644
--- a/internal/praefect/repocleaner/repository.go
+++ b/internal/praefect/repocleaner/repository.go
@@ -168,7 +168,7 @@ func NewWalker(conns praefect.Connections, batchSize int, gracePeriod time.Durat
 // ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action.
 // The processing is done in batches to reduce cost of operations.
 func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func(string, string, []string) error) error {
-	gclient, err := wr.getInternalGitalyClient(virtualStorage, storage)
+	gclient, err := getInternalGitalyClient(wr.conns, virtualStorage, storage)
 	if err != nil {
 		return fmt.Errorf("setup gitaly client: %w", err)
 	}
@@ -211,8 +211,8 @@ func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storag
 	return nil
 }
 
-func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
-	conn, found := wr.conns[virtualStorage][storage]
+func getInternalGitalyClient(conns praefect.Connections, virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+	conn, found := conns[virtualStorage][storage]
 	if !found {
 		return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
 	}
-- 
cgit v1.2.3