Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2024-01-17 03:36:41 +0300
committerGitLab <noreply@gitlab.com>2024-01-17 03:36:41 +0300
commit6a9aca19bd90424e5ca436c5e43a6f9593443342 (patch)
tree7790bed2e8cc30326955f328f07b8244fa02480c
parent7eb79ebcb084d4e881777f44ca5055cce6e60ccf (diff)
parent9195adfc06551f312c04b2f21f82c8c3658b9f3c (diff)
Merge branch 'jliu-track-restored-repos-second-attempt' into 'master'
backup: Track repos that have been processed (re-attempt) See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6614 Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Co-authored-by: James Liu <jliu@gitlab.com>
-rw-r--r--internal/backup/backup.go54
-rw-r--r--internal/backup/backup_test.go115
-rw-r--r--internal/backup/pipeline.go43
-rw-r--r--internal/backup/pipeline_test.go66
-rw-r--r--internal/backup/server_side.go56
-rw-r--r--internal/backup/server_side_test.go125
-rw-r--r--internal/cli/gitalybackup/create.go2
-rw-r--r--internal/cli/gitalybackup/restore.go34
-rw-r--r--internal/cli/gitalybackup/restore_test.go79
-rw-r--r--internal/gitaly/service/repository/remove_all_test.go1
-rw-r--r--internal/praefect/datastore/repository_store.go27
-rw-r--r--internal/praefect/remove_all.go1
-rw-r--r--internal/praefect/remove_all_test.go1
-rw-r--r--internal/praefect/server.go3
-rw-r--r--internal/praefect/walkrepos.go47
-rw-r--r--internal/praefect/walkrepos_test.go88
-rw-r--r--proto/go/gitalypb/repository.pb.go52
-rw-r--r--proto/go/gitalypb/repository_grpc.pb.go5
-rw-r--r--proto/repository.proto2
19 files changed, 658 insertions, 143 deletions
diff --git a/internal/backup/backup.go b/internal/backup/backup.go
index 5eed0724a..a15f47abe 100644
--- a/internal/backup/backup.go
+++ b/internal/backup/backup.go
@@ -211,25 +211,56 @@ func NewManagerLocal(
}
}
-// RemoveAllRepositories removes all repositories in the specified storage name.
-func (mgr *Manager) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error {
- if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil {
- return fmt.Errorf("manager: %w", err)
+// RemoveRepository removes the specified repository from its storage.
+func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error {
+ if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil {
+ return fmt.Errorf("remove repo: set context: %w", err)
}
repoClient, err := mgr.newRepoClient(ctx, req.Server)
if err != nil {
- return fmt.Errorf("manager: %w", err)
+ return fmt.Errorf("remove repo: create client: %w", err)
}
- _, err = repoClient.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: req.StorageName})
+ _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: req.Repo})
if err != nil {
- return fmt.Errorf("manager: %w", err)
+ return fmt.Errorf("remove repo: remove: %w", err)
}
return nil
}
+// ListRepositories returns a list of repositories found in the given storage.
+func (mgr *Manager) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) {
+ if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil {
+ return nil, fmt.Errorf("list repos: set context: %w", err)
+ }
+
+ internalClient, err := mgr.newInternalClient(ctx, req.Server)
+ if err != nil {
+ return nil, fmt.Errorf("list repos: create client: %w", err)
+ }
+
+ stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName})
+ if err != nil {
+ return nil, fmt.Errorf("list repos: walk: %w", err)
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("list repos: receiving messages: %w", err)
+ }
+
+ repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName})
+ }
+
+ return repos, nil
+}
+
// Create creates a repository backup.
func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error {
if req.VanityRepository == nil {
@@ -573,3 +604,12 @@ func (mgr *Manager) newRepoClient(ctx context.Context, server storage.ServerInfo
return gitalypb.NewRepositoryServiceClient(conn), nil
}
+
+func (mgr *Manager) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) {
+ conn, err := mgr.conns.Dial(ctx, server.Address, server.Token)
+ if err != nil {
+ return nil, err
+ }
+
+ return gitalypb.NewInternalGitalyClient(conn), nil
+}
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index 14341ec6a..81e87a33a 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -28,13 +28,10 @@ import (
"google.golang.org/protobuf/proto"
)
-func TestManager_RemoveAllRepositories(t *testing.T) {
- testhelper.SkipWithWAL(t, `
-RemoveAll is removing the entire content of the storage. This would also remove the database's and
-the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
-the database and only then perform the removal.
-
-Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+func TestManager_RemoveRepository(t *testing.T) {
+ if testhelper.IsPraefectEnabled() {
+ t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect")
+ }
t.Parallel()
@@ -58,11 +55,109 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
require.NoError(t, err)
fsBackup := backup.NewManager(sink, locator, pool)
- err = fsBackup.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{
- Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
- StorageName: repo.StorageName,
+ err = fsBackup.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ Repo: repo,
})
require.NoError(t, err)
+ require.NoDirExists(t, repoPath)
+
+ // With an invalid repository
+ err = fsBackup.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ Repo: &gitalypb.Repository{StorageName: "nonexistent", RelativePath: "nonexistent"},
+ })
+
+ require.EqualError(t, err, "remove repo: remove: rpc error: code = InvalidArgument desc = storage name not found")
+}
+
+func TestManager_ListRepositories(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ repos map[string][]*gitalypb.Repository
+ }{
+ {
+ desc: "no repos",
+ repos: make(map[string][]*gitalypb.Repository),
+ },
+ {
+ desc: "repos in a single storage",
+ repos: map[string][]*gitalypb.Repository{
+ "storage-1": {
+ {RelativePath: "a", StorageName: "storage-1"},
+ {RelativePath: "b", StorageName: "storage-1"},
+ },
+ },
+ },
+ {
+ desc: "repos in multiple storages",
+ repos: map[string][]*gitalypb.Repository{
+ "storage-1": {
+ {RelativePath: "a", StorageName: "storage-1"},
+ {RelativePath: "b", StorageName: "storage-1"},
+ },
+ "storage-2": {
+ {RelativePath: "c", StorageName: "storage-2"},
+ {RelativePath: "d", StorageName: "storage-2"},
+ },
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ if testhelper.IsPraefectEnabled() {
+ t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect")
+ }
+
+ var storages []string
+ for storageName := range tc.repos {
+ storages = append(storages, storageName)
+ }
+
+ // We don't really need a "default" storage, but this makes initialisation cleaner since
+ // WithStorages() takes at least one argument.
+ cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...))
+ cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll)
+
+ ctx := testhelper.Context(t)
+
+ for storageName, repos := range tc.repos {
+ for _, repo := range repos {
+ storagePath, ok := cfg.StoragePath(storageName)
+ require.True(t, ok)
+
+ _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: repo.RelativePath,
+ Storage: config.Storage{Name: storageName, Path: storagePath},
+ })
+ }
+ }
+
+ pool := client.NewPool()
+ defer testhelper.MustClose(t, pool)
+
+ backupRoot := testhelper.TempDir(t)
+ sink := backup.NewFilesystemSink(backupRoot)
+ defer testhelper.MustClose(t, sink)
+
+ locator, err := backup.ResolveLocator("pointer", sink)
+ require.NoError(t, err)
+
+ fsBackup := backup.NewManager(sink, locator, pool)
+
+ for storageName, repos := range tc.repos {
+ actualRepos, err := fsBackup.ListRepositories(ctx, &backup.ListRepositoriesRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ StorageName: storageName,
+ })
+
+ require.NoError(t, err)
+ require.EqualValues(t, repos, actualRepos)
+ }
+ })
+ }
}
func TestManager_Create(t *testing.T) {
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 5672493ac..feb0eaf96 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -12,11 +12,16 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
+// repositoryKey uniquely identifies a repository, and is used here to key the
+// map of processed repos.
+type repositoryKey string
+
// Strategy used to create/restore backups
type Strategy interface {
Create(context.Context, *CreateRequest) error
Restore(context.Context, *RestoreRequest) error
- RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error
+ ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error)
+ RemoveRepository(context.Context, *RemoveRepositoryRequest) error
}
// CreateRequest is the request to create a backup
@@ -52,9 +57,14 @@ type RestoreRequest struct {
BackupID string
}
-// RemoveAllRepositoriesRequest is the request to remove all repositories in the specified
-// storage name.
-type RemoveAllRepositoriesRequest struct {
+// RemoveRepositoryRequest is a request to remove an individual repository from its storage.
+type RemoveRepositoryRequest struct {
+ Server storage.ServerInfo
+ Repo *gitalypb.Repository
+}
+
+// ListRepositoriesRequest is the request to list repositories in a given storage.
+type ListRepositoriesRequest struct {
Server storage.ServerInfo
StorageName string
}
@@ -181,6 +191,9 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
+
+ processedRepos map[string]map[repositoryKey]struct{}
+ processedReposMu sync.Mutex
}
// NewPipeline creates a pipeline that executes backup and restore jobs.
@@ -195,6 +208,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
+ processedRepos: make(map[string]map[repositoryKey]struct{}),
}
for _, opt := range opts {
@@ -252,19 +266,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() error {
+func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) {
close(p.done)
p.workerWg.Wait()
if p.pipelineError != nil {
- return fmt.Errorf("pipeline: %w", p.pipelineError)
+ return nil, fmt.Errorf("pipeline: %w", p.pipelineError)
}
if len(p.cmdErrors.errs) > 0 {
- return fmt.Errorf("pipeline: %w", p.cmdErrors)
+ return nil, fmt.Errorf("pipeline: %w", p.cmdErrors)
}
- return nil
+ return p.processedRepos, nil
}
// getWorker finds the channel associated with a storage. When no channel is
@@ -325,6 +339,14 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
return
}
+ storageName := cmd.Repository().StorageName
+ p.processedReposMu.Lock()
+ if _, ok := p.processedRepos[storageName]; !ok {
+ p.processedRepos[storageName] = make(map[repositoryKey]struct{})
+ }
+ p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{}
+ p.processedReposMu.Unlock()
+
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
}
@@ -363,3 +385,8 @@ func (p *Pipeline) releaseWorkerSlot() {
}
<-p.totalWorkers
}
+
+// NewRepositoryKey returns a unique identifier for the provided repo.
+func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey {
+ return repositoryKey(repo.StorageName + "-" + repo.RelativePath)
+}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 04c539a5f..9d05eae74 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}}))
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}}))
}
- require.NoError(t, p.Done())
+ _, err = p.Done()
+ require.NoError(t, err)
})
}
})
@@ -115,14 +116,16 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}}))
- require.EqualError(t, p.Done(), "pipeline: context canceled")
+ _, err = p.Done()
+ require.EqualError(t, err, "pipeline: context canceled")
})
}
type MockStrategy struct {
- CreateFunc func(context.Context, *CreateRequest) error
- RestoreFunc func(context.Context, *RestoreRequest) error
- RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error
+ CreateFunc func(context.Context, *CreateRequest) error
+ RestoreFunc func(context.Context, *RestoreRequest) error
+ RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error
+ ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error)
}
func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error {
@@ -139,13 +142,20 @@ func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error {
return nil
}
-func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error {
- if s.RemoveAllRepositoriesFunc != nil {
- return s.RemoveAllRepositoriesFunc(ctx, req)
+func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error {
+ if s.RemoveRepositoryFunc != nil {
+ return s.RemoveRepositoryFunc(ctx, req)
}
return nil
}
+func (s MockStrategy) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) ([]*gitalypb.Repository, error) {
+ if s.ListRepositoriesFunc != nil {
+ return s.ListRepositoriesFunc(ctx, req)
+ }
+ return nil, nil
+}
+
func testPipeline(t *testing.T, init func() *Pipeline) {
strategy := MockStrategy{
CreateFunc: func(_ context.Context, req *CreateRequest) error {
@@ -222,7 +232,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
require.Equal(t, tc.level, logEntry.Level)
}
- err := p.Done()
+ _, err := p.Done()
if tc.level == logrus.ErrorLevel {
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
@@ -258,7 +268,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
for _, cmd := range commands {
p.Handle(ctx, cmd)
}
- err := p.Done()
+ _, err := p.Done()
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
})
}
@@ -309,3 +319,39 @@ func TestPipelineError(t *testing.T) {
})
}
}
+
+func TestPipelineProcessedRepos(t *testing.T) {
+ strategy := MockStrategy{}
+
+ repos := []*gitalypb.Repository{
+ {RelativePath: "a.git", StorageName: "storage1"},
+ {RelativePath: "b.git", StorageName: "storage1"},
+ {RelativePath: "c.git", StorageName: "storage2"},
+ {RelativePath: "d.git", StorageName: "storage3"},
+ }
+
+ expectedProcessedRepos := map[string]map[repositoryKey]struct{}{
+ "storage1": {
+ "storage1-a.git": {},
+ "storage1-b.git": {},
+ },
+ "storage2": {
+ "storage2-c.git": {},
+ },
+ "storage3": {
+ "storage3-d.git": {},
+ },
+ }
+
+ p, err := NewPipeline(testhelper.SharedLogger(t))
+ require.NoError(t, err)
+
+ ctx := testhelper.Context(t)
+ for _, repo := range repos {
+ p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
+ }
+
+ processedRepos, err := p.Done()
+ require.NoError(t, err)
+ require.EqualValues(t, expectedProcessedRepos, processedRepos)
+}
diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go
index 35654f215..a1a9a37eb 100644
--- a/internal/backup/server_side.go
+++ b/internal/backup/server_side.go
@@ -2,7 +2,9 @@ package backup
import (
"context"
+ "errors"
"fmt"
+ "io"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
@@ -93,25 +95,56 @@ func (ss ServerSideAdapter) Restore(ctx context.Context, req *RestoreRequest) er
return nil
}
-// RemoveAllRepositories removes all repositories in the specified storage name.
-func (ss ServerSideAdapter) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error {
- if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil {
- return fmt.Errorf("server-side remove all: %w", err)
+// RemoveRepository removes the specified repository from its storage.
+func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error {
+ if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil {
+ return fmt.Errorf("server-side remove repo: set context: %w", err)
}
repoClient, err := ss.newRepoClient(ctx, req.Server)
if err != nil {
- return fmt.Errorf("server-side remove all: %w", err)
+ return fmt.Errorf("server-side remove repo: create client: %w", err)
}
- _, err = repoClient.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: req.StorageName})
+ _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: req.Repo})
if err != nil {
- return fmt.Errorf("server-side remove all: %w", err)
+ return fmt.Errorf("server-side remove repo: remove: %w", err)
}
return nil
}
+// ListRepositories returns a list of repositories found in the given storage.
+func (ss ServerSideAdapter) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) {
+ if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil {
+ return nil, fmt.Errorf("server-side list repos: set context: %w", err)
+ }
+
+ internalClient, err := ss.newInternalClient(ctx, req.Server)
+ if err != nil {
+ return nil, fmt.Errorf("server-side list repos: create client: %w", err)
+ }
+
+ stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName})
+ if err != nil {
+ return nil, fmt.Errorf("server-side list repos: walk: %w", err)
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName})
+ }
+
+ return repos, nil
+}
+
func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) {
conn, err := ss.pool.Dial(ctx, server.Address, server.Token)
if err != nil {
@@ -120,3 +153,12 @@ func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.Se
return gitalypb.NewRepositoryServiceClient(conn), nil
}
+
+func (ss ServerSideAdapter) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) {
+ conn, err := ss.pool.Dial(ctx, server.Address, server.Token)
+ if err != nil {
+ return nil, err
+ }
+
+ return gitalypb.NewInternalGitalyClient(conn), nil
+}
diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go
index 2acc547c5..69669df60 100644
--- a/internal/backup/server_side_test.go
+++ b/internal/backup/server_side_test.go
@@ -13,9 +13,11 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
@@ -257,28 +259,15 @@ func TestServerSideAdapter_Restore(t *testing.T) {
}
}
-func TestServerSideAdapter_RemoveAllRepositories(t *testing.T) {
- testhelper.SkipWithWAL(t, `
-RemoveAll is removing the entire content of the storage. This would also remove the database's and
-the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
-the database and only then perform the removal.
-
-Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
-
+func TestServerSideAdapter_RemoveRepository(t *testing.T) {
t.Parallel()
- backupRoot := testhelper.TempDir(t)
- sink := backup.NewFilesystemSink(backupRoot)
- defer testhelper.MustClose(t, sink)
-
- locator, err := backup.ResolveLocator("pointer", sink)
- require.NoError(t, err)
+ db := testdb.New(t)
+ db.TruncateAll(t)
+ datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"default"}})
cfg := testcfg.Build(t)
- cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll,
- testserver.WithBackupSink(sink),
- testserver.WithBackupLocator(locator),
- )
+ cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll)
ctx := testhelper.Context(t)
@@ -289,9 +278,103 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
defer testhelper.MustClose(t, pool)
adapter := backup.NewServerSideAdapter(pool)
- err = adapter.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{
- Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
- StorageName: repo.StorageName,
+ err := adapter.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ Repo: repo,
})
require.NoError(t, err)
+ require.NoDirExists(t, repoPath)
+
+ // With an invalid repository
+ err = adapter.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ Repo: &gitalypb.Repository{StorageName: "default", RelativePath: "nonexistent"},
+ })
+
+ require.EqualError(t, err, "server-side remove repo: remove: rpc error: code = NotFound desc = repository does not exist")
+}
+
+func TestServerSideAdapter_ListRepositories(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ repos map[string][]*gitalypb.Repository
+ }{
+ {
+ desc: "no repos",
+ repos: make(map[string][]*gitalypb.Repository),
+ },
+ {
+ desc: "repos in a single storage",
+ repos: map[string][]*gitalypb.Repository{
+ "storage-1": {
+ {RelativePath: "a", StorageName: "storage-1"},
+ {RelativePath: "b", StorageName: "storage-1"},
+ },
+ },
+ },
+ {
+ desc: "repos in multiple storages",
+ repos: map[string][]*gitalypb.Repository{
+ "storage-1": {
+ {RelativePath: "a", StorageName: "storage-1"},
+ {RelativePath: "b", StorageName: "storage-1"},
+ },
+ "storage-2": {
+ {RelativePath: "c", StorageName: "storage-2"},
+ {RelativePath: "d", StorageName: "storage-2"},
+ },
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var storages []string
+ for storageName := range tc.repos {
+ storages = append(storages, storageName)
+ }
+
+ db := testdb.New(t)
+ db.TruncateAll(t)
+ rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": storages})
+
+ // We don't really need a "default" storage, but this makes initialisation cleaner since
+ // WithStorages() takes at least one argument.
+ cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...))
+ cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll)
+
+ ctx := testhelper.Context(t)
+
+ repoID := 1
+ for storageName, repos := range tc.repos {
+ for _, repo := range repos {
+ storagePath, ok := cfg.StoragePath(storageName)
+ require.True(t, ok)
+
+ _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ RelativePath: repo.RelativePath,
+ Storage: config.Storage{Name: storageName, Path: storagePath},
+ })
+
+ require.NoError(t, rs.CreateRepository(ctx, int64(repoID), "virtual-storage", repo.RelativePath, repo.RelativePath, storageName, nil, nil, false, false))
+
+ repoID++
+ }
+ }
+
+ pool := client.NewPool()
+ defer testhelper.MustClose(t, pool)
+
+ adapter := backup.NewServerSideAdapter(pool)
+
+ for storageName, repos := range tc.repos {
+ actualRepos, err := adapter.ListRepositories(ctx, &backup.ListRepositoriesRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ StorageName: storageName,
+ })
+ require.NoError(t, err)
+ require.EqualValues(t, repos, actualRepos)
+ }
+ })
+ }
}
diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go
index 4ad653490..be7e75cdb 100644
--- a/internal/cli/gitalybackup/create.go
+++ b/internal/cli/gitalybackup/create.go
@@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("create: %w", err)
}
return nil
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index de9e2cd3d..7a3833f80 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -135,15 +135,18 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
manager = backup.NewManager(sink, locator, pool)
}
+ // Get the set of existing repositories keyed by storage. We'll later use this to determine any
+ // dangling repos that should be removed.
+ existingRepos := make(map[string][]*gitalypb.Repository)
for _, storageName := range cmd.removeAllRepositories {
- err := manager.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{
+ repos, err := manager.ListRepositories(ctx, &backup.ListRepositoriesRequest{
StorageName: storageName,
})
if err != nil {
- // Treat RemoveAll failures as soft failures until we can determine
- // how often it fails.
- logger.WithError(err).WithField("storage_name", storageName).Warn("failed to remove all repositories")
+ logger.WithError(err).WithField("storage_name", storageName).Warn("failed to list repositories")
}
+
+ existingRepos[storageName] = repos
}
var opts []backup.PipelineOption
@@ -178,8 +181,29 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
}))
}
- if err := pipeline.Done(); err != nil {
+ restoredRepos, err := pipeline.Done()
+ if err != nil {
return fmt.Errorf("restore: %w", err)
}
+
+ var removalErrors []error
+ for storageName, repos := range existingRepos {
+ for _, repo := range repos {
+ if _, ok := restoredRepos[storageName][backup.NewRepositoryKey(repo)]; !ok {
+ // If we have dangling repos (those which exist in the storage but
+ // weren't part of the restore), they need to be deleted so the
+ // state of repos in Gitaly matches that in the Rails DB.
+ if err := manager.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{Repo: repo}); err != nil {
+ removalErrors = append(removalErrors, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, repo.RelativePath, err))
+ }
+ }
+ }
+ }
+
+ if len(removalErrors) > 0 {
+ return fmt.Errorf("remove dangling repositories: %d failures encountered: %w",
+ len(removalErrors), errors.Join(removalErrors...))
+ }
+
return nil
}
diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go
index 4eae52e4c..4b1eee9aa 100644
--- a/internal/cli/gitalybackup/restore_test.go
+++ b/internal/cli/gitalybackup/restore_test.go
@@ -26,13 +26,6 @@ import (
func TestRestoreSubcommand(t *testing.T) {
gittest.SkipWithSHA256(t)
- testhelper.SkipWithWAL(t, `
-RemoveAll is removing the entire content of the storage. This would also remove the database's and
-the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
-the database and only then perform the removal.
-
-Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
-
ctx := testhelper.Context(t)
cfg := testcfg.Build(t)
@@ -40,24 +33,32 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll)
+ // This is an example of a "dangling" repository (one that was created after a backup was taken) that should be
+ // removed after the backup is restored.
existingRepo, existRepoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
RelativePath: "existing_repo",
})
gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch))
- path := testhelper.TempDir(t)
- existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle")
- existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs")
+ // The backupDir contains the artifacts that would've been created as part of a backup.
+ backupDir := testhelper.TempDir(t)
+ existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle")
+ existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs")
gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all")
require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile))
+ // These repos are the ones being restored, and should exist after the restore.
var repos []*gitalypb.Repository
for i := 0; i < 2; i++ {
- repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i))
- repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle")
- repoRefPath := filepath.Join(path, repo.RelativePath+".refs")
+ repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ RelativePath: fmt.Sprintf("repo-%d", i),
+ Storage: cfg.Storages[0],
+ })
+
+ repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle")
testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath)
+ repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs")
testhelper.CopyFile(t, existingRepoRefPath, repoRefPath)
repos = append(repos, repo)
}
@@ -75,19 +76,13 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
}))
}
- require.NoError(t, encoder.Encode(map[string]string{
- "address": "invalid",
- "token": "invalid",
- "relative_path": "invalid",
- }))
-
ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
args := []string{
progname,
"restore",
"--path",
- path,
+ backupDir,
"--parallel",
strconv.Itoa(runtime.NumCPU()),
"--parallel-storage",
@@ -103,15 +98,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
require.DirExists(t, existRepoPath)
- require.EqualError(t,
- cmd.RunContext(ctx, args),
- "restore: pipeline: 1 failures encountered:\n - invalid: manager: could not dial source: invalid connection string: \"invalid\"\n")
+ require.NoError(t, cmd.RunContext(ctx, args))
require.NoDirExists(t, existRepoPath)
+ // Ensure the repos were restored correctly.
for _, repo := range repos {
repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo))
- bundlePath := filepath.Join(path, repo.RelativePath+".bundle")
+ bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle")
output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath)
require.Contains(t, string(output), "The bundle records a complete history")
@@ -121,17 +115,10 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
func TestRestoreSubcommand_serverSide(t *testing.T) {
gittest.SkipWithSHA256(t)
- testhelper.SkipWithWAL(t, `
-RemoveAll is removing the entire content of the storage. This would also remove the database's and
-the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
-the database and only then perform the removal.
-
-Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
-
ctx := testhelper.Context(t)
- path := testhelper.TempDir(t)
- backupSink, err := backup.ResolveSink(ctx, path)
+ backupDir := testhelper.TempDir(t)
+ backupSink, err := backup.ResolveSink(ctx, backupDir)
require.NoError(t, err)
backupLocator, err := backup.ResolveLocator("pointer", backupSink)
@@ -150,18 +137,22 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
})
gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch))
- existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle")
- existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs")
+ existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle")
+ existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs")
gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all")
require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile))
var repos []*gitalypb.Repository
for i := 0; i < 2; i++ {
- repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i))
- repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle")
- repoRefPath := filepath.Join(path, repo.RelativePath+".refs")
+ repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ RelativePath: fmt.Sprintf("repo-%d", i),
+ Storage: cfg.Storages[0],
+ })
+
+ repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle")
testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath)
+ repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs")
testhelper.CopyFile(t, existingRepoRefPath, repoRefPath)
repos = append(repos, repo)
}
@@ -179,12 +170,6 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
}))
}
- require.NoError(t, encoder.Encode(map[string]string{
- "address": "invalid",
- "token": "invalid",
- "relative_path": "invalid",
- }))
-
ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
args := []string{
@@ -207,15 +192,13 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
require.DirExists(t, existRepoPath)
- require.EqualError(t,
- cmd.RunContext(ctx, args),
- "restore: pipeline: 1 failures encountered:\n - invalid: server-side restore: could not dial source: invalid connection string: \"invalid\"\n")
+ require.NoError(t, cmd.RunContext(ctx, args))
require.NoDirExists(t, existRepoPath)
for _, repo := range repos {
repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo))
- bundlePath := filepath.Join(path, repo.RelativePath+".bundle")
+ bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle")
output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath)
require.Contains(t, string(output), "The bundle records a complete history")
diff --git a/internal/gitaly/service/repository/remove_all_test.go b/internal/gitaly/service/repository/remove_all_test.go
index 785f54b01..fd913523a 100644
--- a/internal/gitaly/service/repository/remove_all_test.go
+++ b/internal/gitaly/service/repository/remove_all_test.go
@@ -28,6 +28,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
require.DirExists(t, repo1Path)
require.DirExists(t, repo1Path)
+ //nolint:staticcheck
_, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{
StorageName: cfg.Storages[0].Name,
})
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 77dce6ba0..4925fcb50 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -111,6 +111,8 @@ type RepositoryStore interface {
MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
// MarkStorageUnverified marsk all replicas on the storage as unverified.
MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
+ // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
+ ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error)
}
// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
@@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor
return replicaPath, nil
}
+
+// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
+func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) {
+ rows, err := rs.db.QueryContext(ctx, `
+SELECT relative_path
+FROM repositories
+WHERE virtual_storage = $1
+`, virtualStorage)
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+
+ var relativePaths []string
+ for rows.Next() {
+ var relativePath string
+ if err := rows.Scan(&relativePath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ relativePaths = append(relativePaths, relativePath)
+ }
+
+ return relativePaths, rows.Err()
+}
diff --git a/internal/praefect/remove_all.go b/internal/praefect/remove_all.go
index 9fa08206f..9c383cd0e 100644
--- a/internal/praefect/remove_all.go
+++ b/internal/praefect/remove_all.go
@@ -32,6 +32,7 @@ func RemoveAllHandler(rs datastore.RepositoryStore, conns Connections) grpc.Stre
conn := conn
group.Go(func() error {
+ //nolint:staticcheck
_, err := gitalypb.NewRepositoryServiceClient(conn).RemoveAll(ctx, &gitalypb.RemoveAllRequest{
StorageName: rewrittenStorage,
})
diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go
index 965ffb95b..6b5e6f848 100644
--- a/internal/praefect/remove_all_test.go
+++ b/internal/praefect/remove_all_test.go
@@ -96,6 +96,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
_, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: &gitalypb.Repository{}})
testhelper.RequireGrpcError(t, errServedByGitaly, err)
+ //nolint:staticcheck
resp, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: virtualStorage})
require.NoError(t, err)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 54cbee079..c52b160f5 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -195,6 +195,9 @@ func NewGRPCServer(
"DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns),
"GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router),
})
+ proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{
+ "WalkRepos": WalkReposHandler(deps.RepositoryStore),
+ })
}
return srv
diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go
new file mode 100644
index 000000000..1f321a4c7
--- /dev/null
+++ b/internal/praefect/walkrepos.go
@@ -0,0 +1,47 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling
+// through Praefect. Instead of walking the storage directory in the filesystem, this Praefect
+// implementation queries the database for all known repositories in the given virtual storage.
+// As a consequence, the modification_time parameter can't be populated in the response.
+func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler {
+ return func(srv interface{}, stream grpc.ServerStream) error {
+ sendRepo := func(relPath string) error {
+ return stream.SendMsg(&gitalypb.WalkReposResponse{
+ RelativePath: relPath,
+ })
+ }
+
+ var req gitalypb.WalkReposRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ if req.StorageName == "" {
+ return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet)
+ }
+
+ repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName)
+ if err != nil {
+ return structerr.NewInternal("list repository paths: %w", err)
+ }
+
+ for _, repo := range repos {
+ if err := sendRepo(repo); err != nil {
+ return structerr.NewInternal("send repository path: %w", err)
+ }
+ }
+
+ return nil
+ }
+}
diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go
new file mode 100644
index 000000000..63301caf5
--- /dev/null
+++ b/internal/praefect/walkrepos_test.go
@@ -0,0 +1,88 @@
+package praefect
+
+import (
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestWalkReposHandler(t *testing.T) {
+ t.Parallel()
+
+ db := testdb.New(t)
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.WalkReposRequest
+ responses []*gitalypb.WalkReposResponse
+ expectedErr error
+ }{
+ {
+ desc: "missing storage name",
+ request: &gitalypb.WalkReposRequest{},
+ expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet),
+ },
+ {
+ desc: "repositories found",
+ request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"},
+ responses: []*gitalypb.WalkReposResponse{
+ {RelativePath: "relative-path"},
+ {RelativePath: "relative-path-2"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ db.TruncateAll(t)
+ rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}})
+ ctx := testhelper.Context(t)
+
+ require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false))
+
+ tmp := testhelper.TempDir(t)
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ srv := NewGRPCServer(&Dependencies{
+ Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
+ Logger: testhelper.SharedLogger(t),
+ RepositoryStore: rs,
+ Registry: protoregistry.GitalyProtoPreregistered,
+ }, nil)
+ defer srv.Stop()
+
+ go testhelper.MustServe(t, srv, ln)
+
+ clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, clientConn)
+
+ client := gitalypb.NewInternalGitalyClient(clientConn)
+
+ stream, err := client.WalkRepos(ctx, tc.request)
+ if tc.expectedErr != nil {
+ // Consume the first message and test for errors only if we're expecting an error.
+ _, err = stream.Recv()
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ return
+ }
+ require.NoError(t, err)
+
+ actualRepos, err := testhelper.Receive(stream.Recv)
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, tc.responses, actualRepos)
+ })
+ }
+}
diff --git a/proto/go/gitalypb/repository.pb.go b/proto/go/gitalypb/repository.pb.go
index ca005e2b5..bdfd812ed 100644
--- a/proto/go/gitalypb/repository.pb.go
+++ b/proto/go/gitalypb/repository.pb.go
@@ -6163,7 +6163,7 @@ var file_repository_proto_rawDesc = []byte{
0x1c, 0x0a, 0x09, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x09, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x12, 0x14, 0x0a,
0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61,
- 0x6c, 0x75, 0x65, 0x32, 0xaf, 0x20, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f,
+ 0x6c, 0x75, 0x65, 0x32, 0xb2, 0x20, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f,
0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5d, 0x0a, 0x10, 0x52, 0x65, 0x70,
0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x1f, 0x2e,
0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72,
@@ -6399,34 +6399,34 @@ var file_repository_proto_rawDesc = []byte{
0x79, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x50,
0x61, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x09, 0xfa, 0x97, 0x28,
- 0x02, 0x08, 0x02, 0x88, 0x02, 0x01, 0x12, 0x4a, 0x0a, 0x09, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65,
+ 0x02, 0x08, 0x02, 0x88, 0x02, 0x01, 0x12, 0x4d, 0x0a, 0x09, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65,
0x41, 0x6c, 0x6c, 0x12, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x6d,
0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e,
0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01,
- 0x10, 0x02, 0x12, 0x5d, 0x0a, 0x10, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f,
- 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e,
- 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79,
- 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
- 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72,
- 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08,
- 0x02, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f,
- 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e,
- 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72,
- 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c,
- 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74,
- 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28,
- 0x02, 0x08, 0x01, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74,
- 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c,
- 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75,
- 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, 0x74,
- 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, 0x69,
- 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa,
- 0x97, 0x28, 0x02, 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e,
- 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67,
- 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
- 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
- 0x74, 0x6f, 0x33,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0b, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01,
+ 0x10, 0x02, 0x88, 0x02, 0x01, 0x12, 0x5d, 0x0a, 0x10, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52,
+ 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61,
+ 0x6c, 0x79, 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74,
+ 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74,
+ 0x61, 0x6c, 0x79, 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69,
+ 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97,
+ 0x28, 0x02, 0x08, 0x02, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52,
+ 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61,
+ 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69,
+ 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69,
+ 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f,
+ 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06,
+ 0xfa, 0x97, 0x28, 0x02, 0x08, 0x01, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c,
+ 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x20, 0x2e, 0x67, 0x69,
+ 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72,
+ 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e,
+ 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74,
+ 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+ 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c,
+ 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72,
+ 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/proto/go/gitalypb/repository_grpc.pb.go b/proto/go/gitalypb/repository_grpc.pb.go
index d5af55b94..b407b948c 100644
--- a/proto/go/gitalypb/repository_grpc.pb.go
+++ b/proto/go/gitalypb/repository_grpc.pb.go
@@ -192,7 +192,9 @@ type RepositoryServiceClient interface {
// FullPath reads the "gitlab.fullpath" configuration from the repository's
// gitconfig. Returns an error in case the full path has not been configured.
FullPath(ctx context.Context, in *FullPathRequest, opts ...grpc.CallOption) (*FullPathResponse, error)
+ // Deprecated: Do not use.
// RemoveAll deletes all repositories on a specified storage.
+ // Deprecated in favour of individually removing repositories with RemoveRepository.
RemoveAll(ctx context.Context, in *RemoveAllRequest, opts ...grpc.CallOption) (*RemoveAllResponse, error)
// BackupRepository creates a full or incremental backup streamed directly to
// object-storage. The backup is created synchronously. The destination must
@@ -957,6 +959,7 @@ func (c *repositoryServiceClient) FullPath(ctx context.Context, in *FullPathRequ
return out, nil
}
+// Deprecated: Do not use.
func (c *repositoryServiceClient) RemoveAll(ctx context.Context, in *RemoveAllRequest, opts ...grpc.CallOption) (*RemoveAllResponse, error) {
out := new(RemoveAllResponse)
err := c.cc.Invoke(ctx, "/gitaly.RepositoryService/RemoveAll", in, out, opts...)
@@ -1167,7 +1170,9 @@ type RepositoryServiceServer interface {
// FullPath reads the "gitlab.fullpath" configuration from the repository's
// gitconfig. Returns an error in case the full path has not been configured.
FullPath(context.Context, *FullPathRequest) (*FullPathResponse, error)
+ // Deprecated: Do not use.
// RemoveAll deletes all repositories on a specified storage.
+ // Deprecated in favour of individually removing repositories with RemoveRepository.
RemoveAll(context.Context, *RemoveAllRequest) (*RemoveAllResponse, error)
// BackupRepository creates a full or incremental backup streamed directly to
// object-storage. The backup is created synchronously. The destination must
diff --git a/proto/repository.proto b/proto/repository.proto
index 25d446f61..4a5d8dbf8 100644
--- a/proto/repository.proto
+++ b/proto/repository.proto
@@ -381,11 +381,13 @@ service RepositoryService {
}
// RemoveAll deletes all repositories on a specified storage.
+ // Deprecated in favour of individually removing repositories with RemoveRepository.
rpc RemoveAll(RemoveAllRequest) returns (RemoveAllResponse) {
option (op_type) = {
op: MUTATOR
scope_level: STORAGE
};
+ option deprecated = true;
}
// BackupRepository creates a full or incremental backup streamed directly to