From 95bbcbcca4a2f344a171dba71f30071850f0fba6 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 12 Dec 2023 14:12:04 +1100 Subject: backup: Track repos that have been processed Adds a map to the Pipeline to track repos that have been restored or backed up. A mutex is used to synchronise access to the map, as entries are appended by goroutines operating in the workers. The signature of Done() is modified to return the map, and is intentionally ignored in the actual backup and restore logic for now. A subsequent commit will utilise the map for restore operations. --- internal/backup/pipeline.go | 17 +++++++++++++---- internal/backup/pipeline_test.go | 37 ++++++++++++++++++++++++++++++++---- internal/cli/gitalybackup/create.go | 2 +- internal/cli/gitalybackup/restore.go | 3 ++- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 5672493ac..8000f61af 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -181,6 +181,9 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors + + processedRepos map[string][]*gitalypb.Repository + processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, + processedRepos: make(map[string][]*gitalypb.Repository), } for _, opt := range opts { @@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() error { +func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return fmt.Errorf("pipeline: %w", p.pipelineError) + return nil, fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.cmdErrors) + return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) } - return nil + return p.processedRepos, nil } // getWorker finds the channel associated with a storage. When no channel is @@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } + storageName := cmd.Repository().StorageName + p.processedReposMu.Lock() + p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + p.processedReposMu.Unlock() + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 04c539a5f..8545c905e 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}})) p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}})) } - require.NoError(t, p.Done()) + _, err = p.Done() + require.NoError(t, err) }) } }) @@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}})) - require.EqualError(t, p.Done(), "pipeline: context canceled") + _, err = p.Done() + require.EqualError(t, err, "pipeline: context canceled") }) } @@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { require.Equal(t, tc.level, logEntry.Level) } - err := p.Done() + _, err := p.Done() if tc.level == logrus.ErrorLevel { require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") @@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { for _, cmd := range commands { p.Handle(ctx, cmd) } - err := p.Done() + _, err := p.Done() require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") }) } @@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) { }) } } + +func TestPipelineProcessedRepos(t *testing.T) { + strategy := MockStrategy{} + + repos := map[string][]*gitalypb.Repository{ + "storage1": { + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + }, + "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, + "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, + } + + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) + + ctx := testhelper.Context(t) + for _, v := range repos { + for _, repo := range v { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) + } + } + + processedRepos, err := p.Done() + require.NoError(t, err) + require.EqualValues(t, repos, processedRepos) +} diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 4ad653490..be7e75cdb 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("create: %w", err) } return nil diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index de9e2cd3d..7a304f2ed 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { + var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("restore: %w", err) } return nil -- cgit v1.2.3 From ada54fc25380ad5b6844f46df793a62075b4bb31 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 12 Dec 2023 14:54:25 +1100 Subject: backup: Add RemoveRepository to the strategy Adds a new method to the Strategy interface used by regular and server-side backups for performing repository backups and restores. This new method removes a single repository from its storage, and will eventually replace the existing RemoveAllRepositories method. --- internal/backup/backup.go | 19 ++++++++++++++++ internal/backup/backup_test.go | 43 +++++++++++++++++++++++++++++++++++++ internal/backup/pipeline.go | 7 ++++++ internal/backup/pipeline_test.go | 8 +++++++ internal/backup/server_side.go | 19 ++++++++++++++++ internal/backup/server_side_test.go | 35 ++++++++++++++++++++++++++++++ 6 files changed, 131 insertions(+) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index 5eed0724a..bb6bad6a4 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -230,6 +230,25 @@ func (mgr *Manager) RemoveAllRepositories(ctx context.Context, req *RemoveAllRep return nil } +// RemoveRepository removes the specified repository from its storage. +func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { + if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { + return fmt.Errorf("remove repo: set context: %w", err) + } + + repoClient, err := mgr.newRepoClient(ctx, req.Server) + if err != nil { + return fmt.Errorf("remove repo: create client: %w", err) + } + + _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: req.Repo}) + if err != nil { + return fmt.Errorf("remove repo: remove: %w", err) + } + + return nil +} + // Create creates a repository backup. func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error { if req.VanityRepository == nil { diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index 14341ec6a..ca92ad546 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -65,6 +65,49 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.NoError(t, err) } +func TestManager_RemoveRepository(t *testing.T) { + if testhelper.IsPraefectEnabled() { + t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") + } + + t.Parallel() + + cfg := testcfg.Build(t) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg) + commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) + gittest.WriteTag(t, cfg, repoPath, "v1.0.0", commitID.Revision()) + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + backupRoot := testhelper.TempDir(t) + sink := backup.NewFilesystemSink(backupRoot) + defer testhelper.MustClose(t, sink) + + locator, err := backup.ResolveLocator("pointer", sink) + require.NoError(t, err) + + fsBackup := backup.NewManager(sink, locator, pool) + err = fsBackup.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: repo, + }) + require.NoError(t, err) + require.NoDirExists(t, repoPath) + + // With an invalid repository + err = fsBackup.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: &gitalypb.Repository{StorageName: "nonexistent", RelativePath: "nonexistent"}, + }) + + require.EqualError(t, err, "remove repo: remove: rpc error: code = InvalidArgument desc = storage name not found") +} + func TestManager_Create(t *testing.T) { t.Parallel() diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 8000f61af..1b969ccac 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -16,6 +16,7 @@ import ( type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error + RemoveRepository(context.Context, *RemoveRepositoryRequest) error RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } @@ -52,6 +53,12 @@ type RestoreRequest struct { BackupID string } +// RemoveRepositoryRequest is a request to remove an individual repository from its storage. +type RemoveRepositoryRequest struct { + Server storage.ServerInfo + Repo *gitalypb.Repository +} + // RemoveAllRepositoriesRequest is the request to remove all repositories in the specified // storage name. type RemoveAllRepositoriesRequest struct { diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 8545c905e..cd77fb5a7 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -125,6 +125,7 @@ type MockStrategy struct { CreateFunc func(context.Context, *CreateRequest) error RestoreFunc func(context.Context, *RestoreRequest) error RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error + RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -148,6 +149,13 @@ func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllR return nil } +func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { + if s.RemoveRepositoryFunc != nil { + return s.RemoveRepositoryFunc(ctx, req) + } + return nil +} + func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 35654f215..17382e66e 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -112,6 +112,25 @@ func (ss ServerSideAdapter) RemoveAllRepositories(ctx context.Context, req *Remo return nil } +// RemoveRepository removes the specified repository from its storage. +func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { + if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { + return fmt.Errorf("server-side remove repo: set context: %w", err) + } + + repoClient, err := ss.newRepoClient(ctx, req.Server) + if err != nil { + return fmt.Errorf("server-side remove repo: create client: %w", err) + } + + _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: req.Repo}) + if err != nil { + return fmt.Errorf("server-side remove repo: remove: %w", err) + } + + return nil +} + func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) { conn, err := ss.pool.Dial(ctx, server.Address, server.Token) if err != nil { diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index 2acc547c5..a7957bcd8 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -295,3 +295,38 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) }) require.NoError(t, err) } + +func TestServerSideAdapter_RemoveRepository(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + db.TruncateAll(t) + datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"default"}}) + + cfg := testcfg.Build(t) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + adapter := backup.NewServerSideAdapter(pool) + err := adapter.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: repo, + }) + require.NoError(t, err) + require.NoDirExists(t, repoPath) + + // With an invalid repository + err = adapter.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: &gitalypb.Repository{StorageName: "default", RelativePath: "nonexistent"}, + }) + + require.EqualError(t, err, "server-side remove repo: remove: rpc error: code = NotFound desc = repository does not exist") +} -- cgit v1.2.3 From 91e9c2e1e0c3f26db2b3a8b2acf19082c0b79bd8 Mon Sep 17 00:00:00 2001 From: James Liu Date: Wed, 13 Dec 2023 17:03:58 +1100 Subject: backup: Add ListRepositories to the strategy Adds a new method to the Strategy interface used by regular and server-side backups for performing repository backups and restores. This new method calls the internal WalkRepos() RPC to fetch a list of repos in a given storage. --- internal/backup/backup.go | 40 +++++++++++++++++ internal/backup/backup_test.go | 89 +++++++++++++++++++++++++++++++++++++ internal/backup/pipeline.go | 7 +++ internal/backup/pipeline_test.go | 8 ++++ internal/backup/server_side.go | 42 +++++++++++++++++ internal/backup/server_side_test.go | 87 ++++++++++++++++++++++++++++++++++++ 6 files changed, 273 insertions(+) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index bb6bad6a4..afdf63d48 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -249,6 +249,37 @@ func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryR return nil } +// ListRepositories returns a list of repositories found in the given storage. +func (mgr *Manager) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) { + if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { + return nil, fmt.Errorf("list repos: set context: %w", err) + } + + internalClient, err := mgr.newInternalClient(ctx, req.Server) + if err != nil { + return nil, fmt.Errorf("list repos: create client: %w", err) + } + + stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName}) + if err != nil { + return nil, fmt.Errorf("list repos: walk: %w", err) + } + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("list repos: receiving messages: %w", err) + } + + repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName}) + } + + return repos, nil +} + // Create creates a repository backup. func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error { if req.VanityRepository == nil { @@ -592,3 +623,12 @@ func (mgr *Manager) newRepoClient(ctx context.Context, server storage.ServerInfo return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (mgr *Manager) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) { + conn, err := mgr.conns.Dial(ctx, server.Address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index ca92ad546..fa764cc52 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -108,6 +108,95 @@ func TestManager_RemoveRepository(t *testing.T) { require.EqualError(t, err, "remove repo: remove: rpc error: code = InvalidArgument desc = storage name not found") } +func TestManager_ListRepositories(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repos map[string][]*gitalypb.Repository + }{ + { + desc: "no repos", + repos: make(map[string][]*gitalypb.Repository), + }, + { + desc: "repos in a single storage", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + }, + }, + { + desc: "repos in multiple storages", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + "storage-2": { + {RelativePath: "c", StorageName: "storage-2"}, + {RelativePath: "d", StorageName: "storage-2"}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if testhelper.IsPraefectEnabled() { + t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") + } + + var storages []string + for storageName := range tc.repos { + storages = append(storages, storageName) + } + + // We don't really need a "default" storage, but this makes initialisation cleaner since + // WithStorages() takes at least one argument. + cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + for storageName, repos := range tc.repos { + for _, repo := range repos { + storagePath, ok := cfg.StoragePath(storageName) + require.True(t, ok) + + _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: repo.RelativePath, + Storage: config.Storage{Name: storageName, Path: storagePath}, + }) + } + } + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + backupRoot := testhelper.TempDir(t) + sink := backup.NewFilesystemSink(backupRoot) + defer testhelper.MustClose(t, sink) + + locator, err := backup.ResolveLocator("pointer", sink) + require.NoError(t, err) + + fsBackup := backup.NewManager(sink, locator, pool) + + for storageName, repos := range tc.repos { + actualRepos, err := fsBackup.ListRepositories(ctx, &backup.ListRepositoriesRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + StorageName: storageName, + }) + + require.NoError(t, err) + require.EqualValues(t, repos, actualRepos) + } + }) + } +} + func TestManager_Create(t *testing.T) { t.Parallel() diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 1b969ccac..72e5d9740 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -16,6 +16,7 @@ import ( type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error + ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) RemoveRepository(context.Context, *RemoveRepositoryRequest) error RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } @@ -66,6 +67,12 @@ type RemoveAllRepositoriesRequest struct { StorageName string } +// ListRepositoriesRequest is the request to list repositories in a given storage. +type ListRepositoriesRequest struct { + Server storage.ServerInfo + StorageName string +} + // Command handles a specific backup operation type Command interface { Repository() *gitalypb.Repository diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index cd77fb5a7..22dd65a83 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -126,6 +126,7 @@ type MockStrategy struct { RestoreFunc func(context.Context, *RestoreRequest) error RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error + ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -156,6 +157,13 @@ func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositor return nil } +func (s MockStrategy) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) ([]*gitalypb.Repository, error) { + if s.ListRepositoriesFunc != nil { + return s.ListRepositoriesFunc(ctx, req) + } + return nil, nil +} + func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 17382e66e..1b2319d73 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -2,7 +2,9 @@ package backup import ( "context" + "errors" "fmt" + "io" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" @@ -131,6 +133,37 @@ func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRep return nil } +// ListRepositories returns a list of repositories found in the given storage. +func (ss ServerSideAdapter) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) { + if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { + return nil, fmt.Errorf("server-side list repos: set context: %w", err) + } + + internalClient, err := ss.newInternalClient(ctx, req.Server) + if err != nil { + return nil, fmt.Errorf("server-side list repos: create client: %w", err) + } + + stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName}) + if err != nil { + return nil, fmt.Errorf("server-side list repos: walk: %w", err) + } + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + + repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName}) + } + + return repos, nil +} + func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) { conn, err := ss.pool.Dial(ctx, server.Address, server.Token) if err != nil { @@ -139,3 +172,12 @@ func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.Se return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (ss ServerSideAdapter) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) { + conn, err := ss.pool.Dial(ctx, server.Address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index a7957bcd8..e4f1383c3 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -13,9 +13,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -330,3 +332,88 @@ func TestServerSideAdapter_RemoveRepository(t *testing.T) { require.EqualError(t, err, "server-side remove repo: remove: rpc error: code = NotFound desc = repository does not exist") } + +func TestServerSideAdapter_ListRepositories(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repos map[string][]*gitalypb.Repository + }{ + { + desc: "no repos", + repos: make(map[string][]*gitalypb.Repository), + }, + { + desc: "repos in a single storage", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + }, + }, + { + desc: "repos in multiple storages", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + "storage-2": { + {RelativePath: "c", StorageName: "storage-2"}, + {RelativePath: "d", StorageName: "storage-2"}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var storages []string + for storageName := range tc.repos { + storages = append(storages, storageName) + } + + db := testdb.New(t) + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": storages}) + + // We don't really need a "default" storage, but this makes initialisation cleaner since + // WithStorages() takes at least one argument. + cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repoID := 1 + for storageName, repos := range tc.repos { + for _, repo := range repos { + storagePath, ok := cfg.StoragePath(storageName) + require.True(t, ok) + + _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: repo.RelativePath, + Storage: config.Storage{Name: storageName, Path: storagePath}, + }) + + require.NoError(t, rs.CreateRepository(ctx, int64(repoID), "virtual-storage", repo.RelativePath, repo.RelativePath, storageName, nil, nil, false, false)) + + repoID++ + } + } + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + adapter := backup.NewServerSideAdapter(pool) + + for storageName, repos := range tc.repos { + actualRepos, err := adapter.ListRepositories(ctx, &backup.ListRepositoriesRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + StorageName: storageName, + }) + require.NoError(t, err) + require.EqualValues(t, repos, actualRepos) + } + }) + } +} -- cgit v1.2.3 From 07c7b8b56b241e98f6a3dc3a1b47070b3a7602a9 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:48:00 +1100 Subject: backup: Stop sending invalid requests Modifies the Restore CLI tests so we stop sending an invalid restore command as the final JSON object into stdin. This is required as a subsequent commit will move the repository removal logic to execute after the Pipeline completes successfully. If the tests purposely cause the pipeline to fail, the restore logic will never execute. Coverage of the Pipeline's error messaging is covered separately in the Pipeline's unit tests. --- internal/cli/gitalybackup/restore_test.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go index 4eae52e4c..6d235525c 100644 --- a/internal/cli/gitalybackup/restore_test.go +++ b/internal/cli/gitalybackup/restore_test.go @@ -75,12 +75,6 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) })) } - require.NoError(t, encoder.Encode(map[string]string{ - "address": "invalid", - "token": "invalid", - "relative_path": "invalid", - })) - ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) args := []string{ @@ -103,9 +97,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.DirExists(t, existRepoPath) - require.EqualError(t, - cmd.RunContext(ctx, args), - "restore: pipeline: 1 failures encountered:\n - invalid: manager: could not dial source: invalid connection string: \"invalid\"\n") + require.NoError(t, cmd.RunContext(ctx, args)) require.NoDirExists(t, existRepoPath) @@ -179,12 +171,6 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) })) } - require.NoError(t, encoder.Encode(map[string]string{ - "address": "invalid", - "token": "invalid", - "relative_path": "invalid", - })) - ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) args := []string{ @@ -207,9 +193,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.DirExists(t, existRepoPath) - require.EqualError(t, - cmd.RunContext(ctx, args), - "restore: pipeline: 1 failures encountered:\n - invalid: server-side restore: could not dial source: invalid connection string: \"invalid\"\n") + require.NoError(t, cmd.RunContext(ctx, args)) require.NoDirExists(t, existRepoPath) -- cgit v1.2.3 From 592cd8528e30f1e4c0810b1889854aa38b208418 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:51:00 +1100 Subject: backup: Only remove "dangling" repositories Instead of invoking the RemoveAll() RPC prior to the restore, we instead compare the set of existing repositories to the set of repositories contained in the backup being restored. The difference between the two sets -- the set of "dangling" repositories -- are removed individually. This is done to ensure the state of repos in Gitaly matches the worldview held by the Rails DB after a GitLab instance restore. Also fixes the existing tests so that all repositories are created with `gittest.CreateRepository` and are thus visible when we later query `ListRepositories` in the restore logic. --- internal/backup/pipeline.go | 20 +++++++++++--- internal/backup/pipeline_test.go | 29 +++++++++++++------- internal/cli/gitalybackup/restore.go | 35 +++++++++++++++++++----- internal/cli/gitalybackup/restore_test.go | 45 ++++++++++++++++++++----------- 4 files changed, 93 insertions(+), 36 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 72e5d9740..bfb298350 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -12,6 +12,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +// repositoryKey uniquely identifies a repository, and is used here to key the +// map of processed repos. +type repositoryKey string + // Strategy used to create/restore backups type Strategy interface { Create(context.Context, *CreateRequest) error @@ -196,7 +200,7 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - processedRepos map[string][]*gitalypb.Repository + processedRepos map[string]map[repositoryKey]struct{} processedReposMu sync.Mutex } @@ -212,7 +216,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string][]*gitalypb.Repository), + processedRepos: make(map[string]map[repositoryKey]struct{}), } for _, opt := range opts { @@ -270,7 +274,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { +func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) { close(p.done) p.workerWg.Wait() @@ -345,7 +349,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { storageName := cmd.Repository().StorageName p.processedReposMu.Lock() - p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + if _, ok := p.processedRepos[storageName]; !ok { + p.processedRepos[storageName] = make(map[repositoryKey]struct{}) + } + p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{} p.processedReposMu.Unlock() log.Info(fmt.Sprintf("completed %s", cmd.Name())) @@ -386,3 +393,8 @@ func (p *Pipeline) releaseWorkerSlot() { } <-p.totalWorkers } + +// NewRepositoryKey returns a unique identifier for the provided repo. +func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey { + return repositoryKey(repo.StorageName + "-" + repo.RelativePath) +} diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 22dd65a83..06d071f4b 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -331,26 +331,35 @@ func TestPipelineError(t *testing.T) { func TestPipelineProcessedRepos(t *testing.T) { strategy := MockStrategy{} - repos := map[string][]*gitalypb.Repository{ + repos := []*gitalypb.Repository{ + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + {RelativePath: "c.git", StorageName: "storage2"}, + {RelativePath: "d.git", StorageName: "storage3"}, + } + + expectedProcessedRepos := map[string]map[repositoryKey]struct{}{ "storage1": { - {RelativePath: "a.git", StorageName: "storage1"}, - {RelativePath: "b.git", StorageName: "storage1"}, + "storage1-a.git": {}, + "storage1-b.git": {}, + }, + "storage2": { + "storage2-c.git": {}, + }, + "storage3": { + "storage3-d.git": {}, }, - "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, - "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, } p, err := NewPipeline(testhelper.SharedLogger(t)) require.NoError(t, err) ctx := testhelper.Context(t) - for _, v := range repos { - for _, repo := range v { - p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) - } + for _, repo := range repos { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) } processedRepos, err := p.Done() require.NoError(t, err) - require.EqualValues(t, repos, processedRepos) + require.EqualValues(t, expectedProcessedRepos, processedRepos) } diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index 7a304f2ed..7a3833f80 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -135,15 +135,18 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin manager = backup.NewManager(sink, locator, pool) } + // Get the set of existing repositories keyed by storage. We'll later use this to determine any + // dangling repos that should be removed. + existingRepos := make(map[string][]*gitalypb.Repository) for _, storageName := range cmd.removeAllRepositories { - err := manager.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ + repos, err := manager.ListRepositories(ctx, &backup.ListRepositoriesRequest{ StorageName: storageName, }) if err != nil { - // Treat RemoveAll failures as soft failures until we can determine - // how often it fails. - logger.WithError(err).WithField("storage_name", storageName).Warn("failed to remove all repositories") + logger.WithError(err).WithField("storage_name", storageName).Warn("failed to list repositories") } + + existingRepos[storageName] = repos } var opts []backup.PipelineOption @@ -157,7 +160,6 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { - var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -179,8 +181,29 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if _, err := pipeline.Done(); err != nil { + restoredRepos, err := pipeline.Done() + if err != nil { return fmt.Errorf("restore: %w", err) } + + var removalErrors []error + for storageName, repos := range existingRepos { + for _, repo := range repos { + if _, ok := restoredRepos[storageName][backup.NewRepositoryKey(repo)]; !ok { + // If we have dangling repos (those which exist in the storage but + // weren't part of the restore), they need to be deleted so the + // state of repos in Gitaly matches that in the Rails DB. + if err := manager.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{Repo: repo}); err != nil { + removalErrors = append(removalErrors, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, repo.RelativePath, err)) + } + } + } + } + + if len(removalErrors) > 0 { + return fmt.Errorf("remove dangling repositories: %d failures encountered: %w", + len(removalErrors), errors.Join(removalErrors...)) + } + return nil } diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go index 6d235525c..033420d8f 100644 --- a/internal/cli/gitalybackup/restore_test.go +++ b/internal/cli/gitalybackup/restore_test.go @@ -40,24 +40,32 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + // This is an example of a "dangling" repository (one that was created after a backup was taken) that should be + // removed after the backup is restored. existingRepo, existRepoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ RelativePath: "existing_repo", }) gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch)) - path := testhelper.TempDir(t) - existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle") - existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs") + // The backupDir contains the artifacts that would've been created as part of a backup. + backupDir := testhelper.TempDir(t) + existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle") + existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs") gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all") require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile)) + // These repos are the ones being restored, and should exist after the restore. var repos []*gitalypb.Repository for i := 0; i < 2; i++ { - repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i)) - repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle") - repoRefPath := filepath.Join(path, repo.RelativePath+".refs") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: fmt.Sprintf("repo-%d", i), + Storage: cfg.Storages[0], + }) + + repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath) + repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs") testhelper.CopyFile(t, existingRepoRefPath, repoRefPath) repos = append(repos, repo) } @@ -81,7 +89,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) progname, "restore", "--path", - path, + backupDir, "--parallel", strconv.Itoa(runtime.NumCPU()), "--parallel-storage", @@ -101,9 +109,10 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.NoDirExists(t, existRepoPath) + // Ensure the repos were restored correctly. for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo)) - bundlePath := filepath.Join(path, repo.RelativePath+".bundle") + bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath) require.Contains(t, string(output), "The bundle records a complete history") @@ -122,8 +131,8 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) ctx := testhelper.Context(t) - path := testhelper.TempDir(t) - backupSink, err := backup.ResolveSink(ctx, path) + backupDir := testhelper.TempDir(t) + backupSink, err := backup.ResolveSink(ctx, backupDir) require.NoError(t, err) backupLocator, err := backup.ResolveLocator("pointer", backupSink) @@ -142,18 +151,22 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) }) gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch)) - existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle") - existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs") + existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle") + existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs") gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all") require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile)) var repos []*gitalypb.Repository for i := 0; i < 2; i++ { - repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i)) - repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle") - repoRefPath := filepath.Join(path, repo.RelativePath+".refs") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: fmt.Sprintf("repo-%d", i), + Storage: cfg.Storages[0], + }) + + repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath) + repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs") testhelper.CopyFile(t, existingRepoRefPath, repoRefPath) repos = append(repos, repo) } @@ -199,7 +212,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo)) - bundlePath := filepath.Join(path, repo.RelativePath+".bundle") + bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath) require.Contains(t, string(output), "The bundle records a complete history") -- cgit v1.2.3 From bdb0fb1e75db19747c4e9fd84f5fa4bec6d9faa7 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:51:56 +1100 Subject: backup: Delete RemoveAllRepositories from Strategy This is now deprecated in favour of removing individual repos. --- internal/backup/backup.go | 19 ------------------ internal/backup/backup_test.go | 37 ----------------------------------- internal/backup/pipeline.go | 8 -------- internal/backup/pipeline_test.go | 16 ++++----------- internal/backup/server_side.go | 19 ------------------ internal/backup/server_side_test.go | 39 ------------------------------------- 6 files changed, 4 insertions(+), 134 deletions(-) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index afdf63d48..a15f47abe 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -211,25 +211,6 @@ func NewManagerLocal( } } -// RemoveAllRepositories removes all repositories in the specified storage name. -func (mgr *Manager) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error { - if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { - return fmt.Errorf("manager: %w", err) - } - - repoClient, err := mgr.newRepoClient(ctx, req.Server) - if err != nil { - return fmt.Errorf("manager: %w", err) - } - - _, err = repoClient.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: req.StorageName}) - if err != nil { - return fmt.Errorf("manager: %w", err) - } - - return nil -} - // RemoveRepository removes the specified repository from its storage. func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index fa764cc52..81e87a33a 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -28,43 +28,6 @@ import ( "google.golang.org/protobuf/proto" ) -func TestManager_RemoveAllRepositories(t *testing.T) { - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - - t.Parallel() - - cfg := testcfg.Build(t) - cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) - - ctx := testhelper.Context(t) - - repo, repoPath := gittest.CreateRepository(t, ctx, cfg) - commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) - gittest.WriteTag(t, cfg, repoPath, "v1.0.0", commitID.Revision()) - - pool := client.NewPool() - defer testhelper.MustClose(t, pool) - - backupRoot := testhelper.TempDir(t) - sink := backup.NewFilesystemSink(backupRoot) - defer testhelper.MustClose(t, sink) - - locator, err := backup.ResolveLocator("pointer", sink) - require.NoError(t, err) - - fsBackup := backup.NewManager(sink, locator, pool) - err = fsBackup.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ - Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, - StorageName: repo.StorageName, - }) - require.NoError(t, err) -} - func TestManager_RemoveRepository(t *testing.T) { if testhelper.IsPraefectEnabled() { t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index bfb298350..feb0eaf96 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -22,7 +22,6 @@ type Strategy interface { Restore(context.Context, *RestoreRequest) error ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) RemoveRepository(context.Context, *RemoveRepositoryRequest) error - RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } // CreateRequest is the request to create a backup @@ -64,13 +63,6 @@ type RemoveRepositoryRequest struct { Repo *gitalypb.Repository } -// RemoveAllRepositoriesRequest is the request to remove all repositories in the specified -// storage name. -type RemoveAllRepositoriesRequest struct { - Server storage.ServerInfo - StorageName string -} - // ListRepositoriesRequest is the request to list repositories in a given storage. type ListRepositoriesRequest struct { Server storage.ServerInfo diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 06d071f4b..9d05eae74 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -122,11 +122,10 @@ func TestPipeline(t *testing.T) { } type MockStrategy struct { - CreateFunc func(context.Context, *CreateRequest) error - RestoreFunc func(context.Context, *RestoreRequest) error - RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error - RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error - ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) + CreateFunc func(context.Context, *CreateRequest) error + RestoreFunc func(context.Context, *RestoreRequest) error + RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error + ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -143,13 +142,6 @@ func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error { return nil } -func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error { - if s.RemoveAllRepositoriesFunc != nil { - return s.RemoveAllRepositoriesFunc(ctx, req) - } - return nil -} - func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { if s.RemoveRepositoryFunc != nil { return s.RemoveRepositoryFunc(ctx, req) diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 1b2319d73..a1a9a37eb 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -95,25 +95,6 @@ func (ss ServerSideAdapter) Restore(ctx context.Context, req *RestoreRequest) er return nil } -// RemoveAllRepositories removes all repositories in the specified storage name. -func (ss ServerSideAdapter) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error { - if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { - return fmt.Errorf("server-side remove all: %w", err) - } - - repoClient, err := ss.newRepoClient(ctx, req.Server) - if err != nil { - return fmt.Errorf("server-side remove all: %w", err) - } - - _, err = repoClient.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: req.StorageName}) - if err != nil { - return fmt.Errorf("server-side remove all: %w", err) - } - - return nil -} - // RemoveRepository removes the specified repository from its storage. func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index e4f1383c3..69669df60 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -259,45 +259,6 @@ func TestServerSideAdapter_Restore(t *testing.T) { } } -func TestServerSideAdapter_RemoveAllRepositories(t *testing.T) { - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - - t.Parallel() - - backupRoot := testhelper.TempDir(t) - sink := backup.NewFilesystemSink(backupRoot) - defer testhelper.MustClose(t, sink) - - locator, err := backup.ResolveLocator("pointer", sink) - require.NoError(t, err) - - cfg := testcfg.Build(t) - cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll, - testserver.WithBackupSink(sink), - testserver.WithBackupLocator(locator), - ) - - ctx := testhelper.Context(t) - - repo, repoPath := gittest.CreateRepository(t, ctx, cfg) - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) - - pool := client.NewPool() - defer testhelper.MustClose(t, pool) - - adapter := backup.NewServerSideAdapter(pool) - err = adapter.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ - Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, - StorageName: repo.StorageName, - }) - require.NoError(t, err) -} - func TestServerSideAdapter_RemoveRepository(t *testing.T) { t.Parallel() -- cgit v1.2.3 From a9b8db1b0894b1d613c77b2736409563199d5e62 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:55:36 +1100 Subject: proto: Deprecate RemoveAll Now that we've adjusted the restore mechanism to delete individual repos as needed, this RPC is no longer required. See the following issues for more context: - https://gitlab.com/gitlab-org/gitaly/-/issues/5357 - https://gitlab.com/gitlab-org/gitaly/-/issues/5269 Changelog: deprecated --- .../gitaly/service/repository/remove_all_test.go | 1 + internal/praefect/remove_all.go | 1 + internal/praefect/remove_all_test.go | 1 + proto/go/gitalypb/repository.pb.go | 52 +++++++++++----------- proto/go/gitalypb/repository_grpc.pb.go | 5 +++ proto/repository.proto | 2 + 6 files changed, 36 insertions(+), 26 deletions(-) diff --git a/internal/gitaly/service/repository/remove_all_test.go b/internal/gitaly/service/repository/remove_all_test.go index 785f54b01..fd913523a 100644 --- a/internal/gitaly/service/repository/remove_all_test.go +++ b/internal/gitaly/service/repository/remove_all_test.go @@ -28,6 +28,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.DirExists(t, repo1Path) require.DirExists(t, repo1Path) + //nolint:staticcheck _, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{ StorageName: cfg.Storages[0].Name, }) diff --git a/internal/praefect/remove_all.go b/internal/praefect/remove_all.go index 9fa08206f..9c383cd0e 100644 --- a/internal/praefect/remove_all.go +++ b/internal/praefect/remove_all.go @@ -32,6 +32,7 @@ func RemoveAllHandler(rs datastore.RepositoryStore, conns Connections) grpc.Stre conn := conn group.Go(func() error { + //nolint:staticcheck _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveAll(ctx, &gitalypb.RemoveAllRequest{ StorageName: rewrittenStorage, }) diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go index 965ffb95b..6b5e6f848 100644 --- a/internal/praefect/remove_all_test.go +++ b/internal/praefect/remove_all_test.go @@ -96,6 +96,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: &gitalypb.Repository{}}) testhelper.RequireGrpcError(t, errServedByGitaly, err) + //nolint:staticcheck resp, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: virtualStorage}) require.NoError(t, err) diff --git a/proto/go/gitalypb/repository.pb.go b/proto/go/gitalypb/repository.pb.go index ca005e2b5..bdfd812ed 100644 --- a/proto/go/gitalypb/repository.pb.go +++ b/proto/go/gitalypb/repository.pb.go @@ -6163,7 +6163,7 @@ var file_repository_proto_rawDesc = []byte{ 0x1c, 0x0a, 0x09, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x32, 0xaf, 0x20, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, + 0x6c, 0x75, 0x65, 0x32, 0xb2, 0x20, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5d, 0x0a, 0x10, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, @@ -6399,34 +6399,34 @@ var file_repository_proto_rawDesc = []byte{ 0x79, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x09, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x02, 0x88, 0x02, 0x01, 0x12, 0x4a, 0x0a, 0x09, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, + 0x02, 0x08, 0x02, 0x88, 0x02, 0x01, 0x12, 0x4d, 0x0a, 0x09, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, 0x12, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, - 0x10, 0x02, 0x12, 0x5d, 0x0a, 0x10, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, - 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, - 0x02, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, - 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, - 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x01, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, - 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, - 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, - 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, - 0x97, 0x28, 0x02, 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0b, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, + 0x10, 0x02, 0x88, 0x02, 0x01, 0x12, 0x5d, 0x0a, 0x10, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, + 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x02, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, + 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, + 0xfa, 0x97, 0x28, 0x02, 0x08, 0x01, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, + 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x20, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, + 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, + 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, + 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/go/gitalypb/repository_grpc.pb.go b/proto/go/gitalypb/repository_grpc.pb.go index d5af55b94..b407b948c 100644 --- a/proto/go/gitalypb/repository_grpc.pb.go +++ b/proto/go/gitalypb/repository_grpc.pb.go @@ -192,7 +192,9 @@ type RepositoryServiceClient interface { // FullPath reads the "gitlab.fullpath" configuration from the repository's // gitconfig. Returns an error in case the full path has not been configured. FullPath(ctx context.Context, in *FullPathRequest, opts ...grpc.CallOption) (*FullPathResponse, error) + // Deprecated: Do not use. // RemoveAll deletes all repositories on a specified storage. + // Deprecated in favour of individually removing repositories with RemoveRepository. RemoveAll(ctx context.Context, in *RemoveAllRequest, opts ...grpc.CallOption) (*RemoveAllResponse, error) // BackupRepository creates a full or incremental backup streamed directly to // object-storage. The backup is created synchronously. The destination must @@ -957,6 +959,7 @@ func (c *repositoryServiceClient) FullPath(ctx context.Context, in *FullPathRequ return out, nil } +// Deprecated: Do not use. func (c *repositoryServiceClient) RemoveAll(ctx context.Context, in *RemoveAllRequest, opts ...grpc.CallOption) (*RemoveAllResponse, error) { out := new(RemoveAllResponse) err := c.cc.Invoke(ctx, "/gitaly.RepositoryService/RemoveAll", in, out, opts...) @@ -1167,7 +1170,9 @@ type RepositoryServiceServer interface { // FullPath reads the "gitlab.fullpath" configuration from the repository's // gitconfig. Returns an error in case the full path has not been configured. FullPath(context.Context, *FullPathRequest) (*FullPathResponse, error) + // Deprecated: Do not use. // RemoveAll deletes all repositories on a specified storage. + // Deprecated in favour of individually removing repositories with RemoveRepository. RemoveAll(context.Context, *RemoveAllRequest) (*RemoveAllResponse, error) // BackupRepository creates a full or incremental backup streamed directly to // object-storage. The backup is created synchronously. The destination must diff --git a/proto/repository.proto b/proto/repository.proto index 25d446f61..4a5d8dbf8 100644 --- a/proto/repository.proto +++ b/proto/repository.proto @@ -381,11 +381,13 @@ service RepositoryService { } // RemoveAll deletes all repositories on a specified storage. + // Deprecated in favour of individually removing repositories with RemoveRepository. rpc RemoveAll(RemoveAllRequest) returns (RemoveAllResponse) { option (op_type) = { op: MUTATOR scope_level: STORAGE }; + option deprecated = true; } // BackupRepository creates a full or incremental backup streamed directly to -- cgit v1.2.3 From e1f6da778cae5991314762474607b5bf3b228a9a Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 18 Dec 2023 17:49:48 +1100 Subject: praefect: Intercept WalkRepos RPC Adds a handler to Praefect to intercept calls to the WalkRepos RPC. The handler provides an alternate implementation of listing repositories in a storage, which queries the Praefect DB rather than walking the filesystem on disk. This is required so when the RPC is invoked via Praefect, the DB is used as the source of truth rather than a random Gitaly node. The only user-facing difference between this and the original implementation is that the `modification_time` attribute of the response message is left empty, as this cannot be determined via the DB. --- internal/praefect/datastore/repository_store.go | 27 ++++++++ internal/praefect/server.go | 3 + internal/praefect/walkrepos.go | 47 +++++++++++++ internal/praefect/walkrepos_test.go | 88 +++++++++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 internal/praefect/walkrepos.go create mode 100644 internal/praefect/walkrepos_test.go diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 77dce6ba0..4925fcb50 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -111,6 +111,8 @@ type RepositoryStore interface { MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) // MarkStorageUnverified marsk all replicas on the storage as unverified. MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) + // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. + ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor return replicaPath, nil } + +// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. +func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) { + rows, err := rs.db.QueryContext(ctx, ` +SELECT relative_path +FROM repositories +WHERE virtual_storage = $1 +`, virtualStorage) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + + var relativePaths []string + for rows.Next() { + var relativePath string + if err := rows.Scan(&relativePath); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + relativePaths = append(relativePaths, relativePath) + } + + return relativePaths, rows.Err() +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 54cbee079..c52b160f5 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -195,6 +195,9 @@ func NewGRPCServer( "DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns), "GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router), }) + proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{ + "WalkRepos": WalkReposHandler(deps.RepositoryStore), + }) } return srv diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go new file mode 100644 index 000000000..1f321a4c7 --- /dev/null +++ b/internal/praefect/walkrepos.go @@ -0,0 +1,47 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" +) + +// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling +// through Praefect. Instead of walking the storage directory in the filesystem, this Praefect +// implementation queries the database for all known repositories in the given virtual storage. +// As a consequence, the modification_time parameter can't be populated in the response. +func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler { + return func(srv interface{}, stream grpc.ServerStream) error { + sendRepo := func(relPath string) error { + return stream.SendMsg(&gitalypb.WalkReposResponse{ + RelativePath: relPath, + }) + } + + var req gitalypb.WalkReposRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + if req.StorageName == "" { + return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) + } + + repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName) + if err != nil { + return structerr.NewInternal("list repository paths: %w", err) + } + + for _, repo := range repos { + if err := sendRepo(repo); err != nil { + return structerr.NewInternal("send repository path: %w", err) + } + } + + return nil + } +} diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go new file mode 100644 index 000000000..63301caf5 --- /dev/null +++ b/internal/praefect/walkrepos_test.go @@ -0,0 +1,88 @@ +package praefect + +import ( + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestWalkReposHandler(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + for _, tc := range []struct { + desc string + request *gitalypb.WalkReposRequest + responses []*gitalypb.WalkReposResponse + expectedErr error + }{ + { + desc: "missing storage name", + request: &gitalypb.WalkReposRequest{}, + expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet), + }, + { + desc: "repositories found", + request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"}, + responses: []*gitalypb.WalkReposResponse{ + {RelativePath: "relative-path"}, + {RelativePath: "relative-path-2"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + ctx := testhelper.Context(t) + + require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false)) + + tmp := testhelper.TempDir(t) + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + srv := NewGRPCServer(&Dependencies{ + Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, + Logger: testhelper.SharedLogger(t), + RepositoryStore: rs, + Registry: protoregistry.GitalyProtoPreregistered, + }, nil) + defer srv.Stop() + + go testhelper.MustServe(t, srv, ln) + + clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer testhelper.MustClose(t, clientConn) + + client := gitalypb.NewInternalGitalyClient(clientConn) + + stream, err := client.WalkRepos(ctx, tc.request) + if tc.expectedErr != nil { + // Consume the first message and test for errors only if we're expecting an error. + _, err = stream.Recv() + testhelper.RequireGrpcError(t, tc.expectedErr, err) + return + } + require.NoError(t, err) + + actualRepos, err := testhelper.Receive(stream.Recv) + require.NoError(t, err) + testhelper.ProtoEqual(t, tc.responses, actualRepos) + }) + } +} -- cgit v1.2.3 From 9195adfc06551f312c04b2f21f82c8c3658b9f3c Mon Sep 17 00:00:00 2001 From: James Liu Date: Wed, 10 Jan 2024 13:18:57 +1100 Subject: backup: Remove test exceptions for WAL Now that backups no longer invoke the RemoveAll RPC, we can remove the exemption for these tests when the WAL is enabled. --- internal/cli/gitalybackup/restore_test.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go index 033420d8f..4b1eee9aa 100644 --- a/internal/cli/gitalybackup/restore_test.go +++ b/internal/cli/gitalybackup/restore_test.go @@ -26,13 +26,6 @@ import ( func TestRestoreSubcommand(t *testing.T) { gittest.SkipWithSHA256(t) - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - ctx := testhelper.Context(t) cfg := testcfg.Build(t) @@ -122,13 +115,6 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) func TestRestoreSubcommand_serverSide(t *testing.T) { gittest.SkipWithSHA256(t) - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - ctx := testhelper.Context(t) backupDir := testhelper.TempDir(t) -- cgit v1.2.3