From 91e9c2e1e0c3f26db2b3a8b2acf19082c0b79bd8 Mon Sep 17 00:00:00 2001 From: James Liu Date: Wed, 13 Dec 2023 17:03:58 +1100 Subject: backup: Add ListRepositories to the strategy Adds a new method to the Strategy interface used by regular and server-side backups for performing repository backups and restores. This new method calls the internal WalkRepos() RPC to fetch a list of repos in a given storage. --- internal/backup/backup.go | 40 +++++++++++++++++ internal/backup/backup_test.go | 89 +++++++++++++++++++++++++++++++++++++ internal/backup/pipeline.go | 7 +++ internal/backup/pipeline_test.go | 8 ++++ internal/backup/server_side.go | 42 +++++++++++++++++ internal/backup/server_side_test.go | 87 ++++++++++++++++++++++++++++++++++++ 6 files changed, 273 insertions(+) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index bb6bad6a4..afdf63d48 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -249,6 +249,37 @@ func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryR return nil } +// ListRepositories returns a list of repositories found in the given storage. +func (mgr *Manager) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) { + if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { + return nil, fmt.Errorf("list repos: set context: %w", err) + } + + internalClient, err := mgr.newInternalClient(ctx, req.Server) + if err != nil { + return nil, fmt.Errorf("list repos: create client: %w", err) + } + + stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName}) + if err != nil { + return nil, fmt.Errorf("list repos: walk: %w", err) + } + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("list repos: receiving messages: %w", err) + } + + repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName}) + } + + return repos, nil +} + // Create creates a repository backup. func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error { if req.VanityRepository == nil { @@ -592,3 +623,12 @@ func (mgr *Manager) newRepoClient(ctx context.Context, server storage.ServerInfo return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (mgr *Manager) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) { + conn, err := mgr.conns.Dial(ctx, server.Address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index ca92ad546..fa764cc52 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -108,6 +108,95 @@ func TestManager_RemoveRepository(t *testing.T) { require.EqualError(t, err, "remove repo: remove: rpc error: code = InvalidArgument desc = storage name not found") } +func TestManager_ListRepositories(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repos map[string][]*gitalypb.Repository + }{ + { + desc: "no repos", + repos: make(map[string][]*gitalypb.Repository), + }, + { + desc: "repos in a single storage", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + }, + }, + { + desc: "repos in multiple storages", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + "storage-2": { + {RelativePath: "c", StorageName: "storage-2"}, + {RelativePath: "d", StorageName: "storage-2"}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if testhelper.IsPraefectEnabled() { + t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") + } + + var storages []string + for storageName := range tc.repos { + storages = append(storages, storageName) + } + + // We don't really need a "default" storage, but this makes initialisation cleaner since + // WithStorages() takes at least one argument. + cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + for storageName, repos := range tc.repos { + for _, repo := range repos { + storagePath, ok := cfg.StoragePath(storageName) + require.True(t, ok) + + _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: repo.RelativePath, + Storage: config.Storage{Name: storageName, Path: storagePath}, + }) + } + } + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + backupRoot := testhelper.TempDir(t) + sink := backup.NewFilesystemSink(backupRoot) + defer testhelper.MustClose(t, sink) + + locator, err := backup.ResolveLocator("pointer", sink) + require.NoError(t, err) + + fsBackup := backup.NewManager(sink, locator, pool) + + for storageName, repos := range tc.repos { + actualRepos, err := fsBackup.ListRepositories(ctx, &backup.ListRepositoriesRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + StorageName: storageName, + }) + + require.NoError(t, err) + require.EqualValues(t, repos, actualRepos) + } + }) + } +} + func TestManager_Create(t *testing.T) { t.Parallel() diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 1b969ccac..72e5d9740 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -16,6 +16,7 @@ import ( type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error + ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) RemoveRepository(context.Context, *RemoveRepositoryRequest) error RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } @@ -66,6 +67,12 @@ type RemoveAllRepositoriesRequest struct { StorageName string } +// ListRepositoriesRequest is the request to list repositories in a given storage. +type ListRepositoriesRequest struct { + Server storage.ServerInfo + StorageName string +} + // Command handles a specific backup operation type Command interface { Repository() *gitalypb.Repository diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index cd77fb5a7..22dd65a83 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -126,6 +126,7 @@ type MockStrategy struct { RestoreFunc func(context.Context, *RestoreRequest) error RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error + ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -156,6 +157,13 @@ func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositor return nil } +func (s MockStrategy) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) ([]*gitalypb.Repository, error) { + if s.ListRepositoriesFunc != nil { + return s.ListRepositoriesFunc(ctx, req) + } + return nil, nil +} + func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 17382e66e..1b2319d73 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -2,7 +2,9 @@ package backup import ( "context" + "errors" "fmt" + "io" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" @@ -131,6 +133,37 @@ func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRep return nil } +// ListRepositories returns a list of repositories found in the given storage. +func (ss ServerSideAdapter) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) { + if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { + return nil, fmt.Errorf("server-side list repos: set context: %w", err) + } + + internalClient, err := ss.newInternalClient(ctx, req.Server) + if err != nil { + return nil, fmt.Errorf("server-side list repos: create client: %w", err) + } + + stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName}) + if err != nil { + return nil, fmt.Errorf("server-side list repos: walk: %w", err) + } + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + + repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName}) + } + + return repos, nil +} + func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) { conn, err := ss.pool.Dial(ctx, server.Address, server.Token) if err != nil { @@ -139,3 +172,12 @@ func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.Se return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (ss ServerSideAdapter) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) { + conn, err := ss.pool.Dial(ctx, server.Address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index a7957bcd8..e4f1383c3 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -13,9 +13,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -330,3 +332,88 @@ func TestServerSideAdapter_RemoveRepository(t *testing.T) { require.EqualError(t, err, "server-side remove repo: remove: rpc error: code = NotFound desc = repository does not exist") } + +func TestServerSideAdapter_ListRepositories(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repos map[string][]*gitalypb.Repository + }{ + { + desc: "no repos", + repos: make(map[string][]*gitalypb.Repository), + }, + { + desc: "repos in a single storage", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + }, + }, + { + desc: "repos in multiple storages", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + "storage-2": { + {RelativePath: "c", StorageName: "storage-2"}, + {RelativePath: "d", StorageName: "storage-2"}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var storages []string + for storageName := range tc.repos { + storages = append(storages, storageName) + } + + db := testdb.New(t) + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": storages}) + + // We don't really need a "default" storage, but this makes initialisation cleaner since + // WithStorages() takes at least one argument. + cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repoID := 1 + for storageName, repos := range tc.repos { + for _, repo := range repos { + storagePath, ok := cfg.StoragePath(storageName) + require.True(t, ok) + + _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: repo.RelativePath, + Storage: config.Storage{Name: storageName, Path: storagePath}, + }) + + require.NoError(t, rs.CreateRepository(ctx, int64(repoID), "virtual-storage", repo.RelativePath, repo.RelativePath, storageName, nil, nil, false, false)) + + repoID++ + } + } + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + adapter := backup.NewServerSideAdapter(pool) + + for storageName, repos := range tc.repos { + actualRepos, err := adapter.ListRepositories(ctx, &backup.ListRepositoriesRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + StorageName: storageName, + }) + require.NoError(t, err) + require.EqualValues(t, repos, actualRepos) + } + }) + } +} -- cgit v1.2.3