Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/gitaly-backup/restore_test.go2
-rw-r--r--internal/backup/backup.go130
-rw-r--r--internal/backup/backup_test.go461
-rw-r--r--internal/backup/repository.go157
-rw-r--r--internal/gitaly/service/internalgitaly/backup_repos.go7
-rw-r--r--internal/gitaly/service/internalgitaly/backup_repos_test.go4
-rw-r--r--internal/gitaly/service/internalgitaly/server.go4
-rw-r--r--internal/gitaly/service/internalgitaly/walkrepos_test.go4
-rw-r--r--internal/gitaly/service/setup/register.go1
9 files changed, 457 insertions, 313 deletions
diff --git a/cmd/gitaly-backup/restore_test.go b/cmd/gitaly-backup/restore_test.go
index 037078310..3492c6c15 100644
--- a/cmd/gitaly-backup/restore_test.go
+++ b/cmd/gitaly-backup/restore_test.go
@@ -76,7 +76,7 @@ func TestRestoreSubcommand(t *testing.T) {
require.NoError(t, fs.Parse([]string{"-path", path, "-remove-all-repositories", existingRepo.StorageName}))
require.EqualError(t,
cmd.Run(ctx, &stdin, io.Discard),
- "restore: pipeline: 1 failures encountered:\n - invalid: manager: remove repository: could not dial source: invalid connection string: \"invalid\"\n")
+ "restore: pipeline: 1 failures encountered:\n - invalid: manager: could not dial source: invalid connection string: \"invalid\"\n")
require.NoDirExists(t, existRepoPath)
diff --git a/internal/backup/backup.go b/internal/backup/backup.go
index 60fa0cd65..85e9a5055 100644
--- a/internal/backup/backup.go
+++ b/internal/backup/backup.go
@@ -12,10 +12,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
- "gitlab.com/gitlab-org/gitaly/v16/streamio"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
@@ -83,6 +81,16 @@ type Repository interface {
GetCustomHooks(ctx context.Context, out io.Writer) error
// CreateBundle fetches a bundle that contains refs matching patterns.
CreateBundle(ctx context.Context, out io.Writer, patterns io.Reader) error
+ // Remove removes the repository. Does not return an error if the
+ // repository cannot be found.
+ Remove(ctx context.Context) error
+ // Create creates the repository.
+ Create(ctx context.Context) error
+ // FetchBundle fetches references from a bundle. Refs will be mirrored to
+ // the repository.
+ FetchBundle(ctx context.Context, reader io.Reader) error
+ // SetCustomHooks updates the custom hooks for the repository.
+ SetCustomHooks(ctx context.Context, reader io.Reader) error
}
// ResolveLocator returns a locator implementation based on a locator identifier.
@@ -125,6 +133,10 @@ func NewManager(sink Sink, locator Locator, pool *client.Pool, backupID string)
locator: locator,
backupID: backupID,
repositoryFactory: func(ctx context.Context, repo *gitalypb.Repository, server storage.ServerInfo) (Repository, error) {
+ if err := setContextServerInfo(ctx, &server, repo.GetStorageName()); err != nil {
+ return nil, err
+ }
+
conn, err := pool.Dial(ctx, server.Address, server.Token)
if err != nil {
return nil, err
@@ -136,7 +148,15 @@ func NewManager(sink Sink, locator Locator, pool *client.Pool, backupID string)
}
// NewManagerLocal creates and returns a *Manager instance for operating on local repositories.
-func NewManagerLocal(sink Sink, locator Locator, storageLocator storage.Locator, gitCmdFactory git.CommandFactory, catfileCache catfile.Cache, backupID string) *Manager {
+func NewManagerLocal(
+ sink Sink,
+ locator Locator,
+ storageLocator storage.Locator,
+ gitCmdFactory git.CommandFactory,
+ catfileCache catfile.Cache,
+ txManager transaction.Manager,
+ backupID string,
+) *Manager {
return &Manager{
sink: sink,
conns: nil, // Will be removed once the restore operations are part of the Repository interface.
@@ -145,7 +165,7 @@ func NewManagerLocal(sink Sink, locator Locator, storageLocator storage.Locator,
repositoryFactory: func(ctx context.Context, repo *gitalypb.Repository, server storage.ServerInfo) (Repository, error) {
localRepo := localrepo.New(storageLocator, gitCmdFactory, catfileCache, repo)
- return newLocalRepository(storageLocator, localRepo), nil
+ return newLocalRepository(storageLocator, gitCmdFactory, txManager, localRepo), nil
},
}
}
@@ -185,10 +205,6 @@ type CreateRequest struct {
// Create creates a repository backup.
func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error {
- if err := setContextServerInfo(ctx, &req.Server, req.Repository.GetStorageName()); err != nil {
- return fmt.Errorf("manager: %w", err)
- }
-
repo, err := mgr.repositoryFactory(ctx, req.Repository, req.Server)
if err != nil {
return fmt.Errorf("manager: %w", err)
@@ -241,11 +257,12 @@ type RestoreRequest struct {
// Restore restores a repository from a backup.
func (mgr *Manager) Restore(ctx context.Context, req *RestoreRequest) error {
- if err := setContextServerInfo(ctx, &req.Server, req.Repository.GetStorageName()); err != nil {
+ repo, err := mgr.repositoryFactory(ctx, req.Repository, req.Server)
+ if err != nil {
return fmt.Errorf("manager: %w", err)
}
- if err := mgr.removeRepository(ctx, req.Server, req.Repository); err != nil {
+ if err := repo.Remove(ctx); err != nil {
return fmt.Errorf("manager: %w", err)
}
@@ -254,12 +271,12 @@ func (mgr *Manager) Restore(ctx context.Context, req *RestoreRequest) error {
return fmt.Errorf("manager: %w", err)
}
- if err := mgr.createRepository(ctx, req.Server, req.Repository); err != nil {
+ if err := repo.Create(ctx); err != nil {
return fmt.Errorf("manager: %w", err)
}
for _, step := range backup.Steps {
- if err := mgr.restoreBundle(ctx, step.BundlePath, req.Server, req.Repository); err != nil {
+ if err := mgr.restoreBundle(ctx, repo, step.BundlePath); err != nil {
if step.SkippableOnNotFound && errors.Is(err, ErrDoesntExist) {
// For compatibility with existing backups we need to make sure the
// repository exists even if there's no bundle for project
@@ -271,14 +288,14 @@ func (mgr *Manager) Restore(ctx context.Context, req *RestoreRequest) error {
return nil
}
- if err := mgr.removeRepository(ctx, req.Server, req.Repository); err != nil {
+ if err := repo.Remove(ctx); err != nil {
return fmt.Errorf("manager: remove on skipped: %w", err)
}
return fmt.Errorf("manager: %w: %s", ErrSkipped, err.Error())
}
}
- if err := mgr.restoreCustomHooks(ctx, step.CustomHooksPath, req.Server, req.Repository); err != nil {
+ if err := mgr.restoreCustomHooks(ctx, repo, step.CustomHooksPath); err != nil {
return fmt.Errorf("manager: %w", err)
}
}
@@ -300,32 +317,6 @@ func setContextServerInfo(ctx context.Context, server *storage.ServerInfo, stora
return nil
}
-func (mgr *Manager) removeRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) error {
- repoClient, err := mgr.newRepoClient(ctx, server)
- if err != nil {
- return fmt.Errorf("remove repository: %w", err)
- }
- _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo})
- switch {
- case status.Code(err) == codes.NotFound:
- return nil
- case err != nil:
- return fmt.Errorf("remove repository: %w", err)
- }
- return nil
-}
-
-func (mgr *Manager) createRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) error {
- repoClient, err := mgr.newRepoClient(ctx, server)
- if err != nil {
- return fmt.Errorf("create repository: %w", err)
- }
- if _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}); err != nil {
- return fmt.Errorf("create repository: %w", err)
- }
- return nil
-}
-
func (mgr *Manager) writeBundle(ctx context.Context, repo Repository, step *Step, refs []git.Reference) (returnErr error) {
negatedRefs, err := mgr.negatedKnownRefs(ctx, step)
if err != nil {
@@ -434,37 +425,14 @@ func (s *createBundleFromRefListSender) Send() error {
return s.stream.Send(&s.chunk)
}
-func (mgr *Manager) restoreBundle(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error {
+func (mgr *Manager) restoreBundle(ctx context.Context, repo Repository, path string) error {
reader, err := mgr.sink.GetReader(ctx, path)
if err != nil {
- return fmt.Errorf("restore bundle: %w", err)
- }
- defer reader.Close()
-
- repoClient, err := mgr.newRepoClient(ctx, server)
- if err != nil {
return fmt.Errorf("restore bundle: %q: %w", path, err)
}
- stream, err := repoClient.FetchBundle(ctx)
- if err != nil {
- return fmt.Errorf("restore bundle: %q: %w", path, err)
- }
- request := &gitalypb.FetchBundleRequest{Repository: repo, UpdateHead: true}
- bundle := streamio.NewWriter(func(p []byte) error {
- request.Data = p
- if err := stream.Send(request); err != nil {
- return err
- }
-
- // Only set `Repository` on the first `Send` of the stream
- request = &gitalypb.FetchBundleRequest{}
+ defer reader.Close()
- return nil
- })
- if _, err := io.Copy(bundle, reader); err != nil {
- return fmt.Errorf("restore bundle: %q: %w", path, err)
- }
- if _, err = stream.CloseAndRecv(); err != nil {
+ if err := repo.FetchBundle(ctx, reader); err != nil {
return fmt.Errorf("restore bundle: %q: %w", path, err)
}
return nil
@@ -485,7 +453,7 @@ func (mgr *Manager) writeCustomHooks(ctx context.Context, repo Repository, path
return nil
}
-func (mgr *Manager) restoreCustomHooks(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error {
+func (mgr *Manager) restoreCustomHooks(ctx context.Context, repo Repository, path string) error {
reader, err := mgr.sink.GetReader(ctx, path)
if err != nil {
if errors.Is(err, ErrDoesntExist) {
@@ -495,31 +463,7 @@ func (mgr *Manager) restoreCustomHooks(ctx context.Context, path string, server
}
defer reader.Close()
- repoClient, err := mgr.newRepoClient(ctx, server)
- if err != nil {
- return fmt.Errorf("restore custom hooks, %q: %w", path, err)
- }
- stream, err := repoClient.SetCustomHooks(ctx)
- if err != nil {
- return fmt.Errorf("restore custom hooks, %q: %w", path, err)
- }
-
- request := &gitalypb.SetCustomHooksRequest{Repository: repo}
- bundle := streamio.NewWriter(func(p []byte) error {
- request.Data = p
- if err := stream.Send(request); err != nil {
- return err
- }
-
- // Only set `Repository` on the first `Send` of the stream
- request = &gitalypb.SetCustomHooksRequest{}
-
- return nil
- })
- if _, err := io.Copy(bundle, reader); err != nil {
- return fmt.Errorf("restore custom hooks, %q: %w", path, err)
- }
- if _, err = stream.CloseAndRecv(); err != nil {
+ if err := repo.SetCustomHooks(ctx, reader); err != nil {
return fmt.Errorf("restore custom hooks, %q: %w", path, err)
}
return nil
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index 1341f767c..2472da0a5 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -20,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
@@ -94,8 +95,10 @@ func TestManager_Create(t *testing.T) {
storageLocator := config.NewLocator(cfg)
gitCmdFactory := gittest.NewCommandFactory(tb, cfg)
catfileCache := catfile.NewCache(cfg)
+ tb.Cleanup(catfileCache.Stop)
+ txManager := transaction.NewTrackingManager()
- return backup.NewManagerLocal(sink, locator, storageLocator, gitCmdFactory, catfileCache, backupID)
+ return backup.NewManagerLocal(sink, locator, storageLocator, gitCmdFactory, catfileCache, txManager, backupID)
},
},
} {
@@ -243,8 +246,10 @@ func TestManager_Create_incremental(t *testing.T) {
storageLocator := config.NewLocator(cfg)
gitCmdFactory := gittest.NewCommandFactory(tb, cfg)
catfileCache := catfile.NewCache(cfg)
+ tb.Cleanup(catfileCache.Stop)
+ txManager := transaction.NewTrackingManager()
- return backup.NewManagerLocal(sink, locator, storageLocator, gitCmdFactory, catfileCache, backupID)
+ return backup.NewManagerLocal(sink, locator, storageLocator, gitCmdFactory, catfileCache, txManager, backupID)
},
},
} {
@@ -355,232 +360,270 @@ func TestManager_Create_incremental(t *testing.T) {
func TestManager_Restore(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ const backupID = "abc123"
+
cfg := testcfg.Build(t)
testcfg.BuildGitalyHooks(t, cfg)
-
cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll)
- cc, err := client.Dial(cfg.SocketPath, nil)
- require.NoError(t, err)
- defer testhelper.MustClose(t, cc)
-
- repoClient := gitalypb.NewRepositoryServiceClient(cc)
-
- _, repoPath := gittest.CreateRepository(t, ctx, cfg)
- commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"))
- gittest.WriteTag(t, cfg, repoPath, "v1.0.0", commitID.Revision())
- repoChecksum := gittest.ChecksumRepo(t, cfg, repoPath)
-
- backupRoot := testhelper.TempDir(t)
-
- for _, tc := range []struct {
- desc string
- locators []string
- setup func(tb testing.TB) (*gitalypb.Repository, *git.Checksum)
- alwaysCreate bool
- expectExists bool
- expectedPaths []string
- expectedErrAs error
+ for _, managerTC := range []struct {
+ desc string
+ setup func(t testing.TB, sink backup.Sink, locator backup.Locator) *backup.Manager
}{
{
- desc: "existing repo, without hooks",
- locators: []string{"legacy", "pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- repo, _ := gittest.CreateRepository(t, ctx, cfg)
-
- relativePath := stripRelativePath(tb, repo)
- require.NoError(tb, os.MkdirAll(filepath.Join(backupRoot, relativePath), perm.PublicDir))
- bundlePath := filepath.Join(backupRoot, relativePath+".bundle")
- gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
+ desc: "RPC manager",
+ setup: func(tb testing.TB, sink backup.Sink, locator backup.Locator) *backup.Manager {
+ pool := client.NewPool()
+ tb.Cleanup(func() {
+ testhelper.MustClose(tb, pool)
+ })
- return repo, repoChecksum
- },
- expectExists: true,
- },
- {
- desc: "existing repo, with hooks",
- locators: []string{"legacy", "pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- repo, _ := gittest.CreateRepository(t, ctx, cfg)
-
- relativePath := stripRelativePath(tb, repo)
- bundlePath := filepath.Join(backupRoot, relativePath+".bundle")
- customHooksPath := filepath.Join(backupRoot, relativePath, "custom_hooks.tar")
- require.NoError(tb, os.MkdirAll(filepath.Join(backupRoot, relativePath), perm.PublicDir))
- gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
- testhelper.CopyFile(tb, mustCreateCustomHooksArchive(t, ctx), customHooksPath)
-
- return repo, repoChecksum
- },
- expectedPaths: []string{
- "custom_hooks/pre-commit.sample",
- "custom_hooks/prepare-commit-msg.sample",
- "custom_hooks/pre-push.sample",
- },
- expectExists: true,
- },
- {
- desc: "missing bundle",
- locators: []string{"legacy", "pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- repo, _ := gittest.CreateRepository(t, ctx, cfg)
- return repo, nil
- },
- expectedErrAs: backup.ErrSkipped,
- },
- {
- desc: "missing bundle, always create",
- locators: []string{"legacy", "pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- repo, _ := gittest.CreateRepository(t, ctx, cfg)
- return repo, new(git.Checksum)
+ return backup.NewManager(sink, locator, pool, backupID)
},
- alwaysCreate: true,
- expectExists: true,
},
{
- desc: "nonexistent repo",
- locators: []string{"legacy", "pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- repo := &gitalypb.Repository{
- StorageName: "default",
- RelativePath: gittest.NewRepositoryName(tb),
+ desc: "Local manager",
+ setup: func(tb testing.TB, sink backup.Sink, locator backup.Locator) *backup.Manager {
+ if testhelper.IsPraefectEnabled() {
+ tb.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect")
}
- relativePath := stripRelativePath(tb, repo)
- require.NoError(tb, os.MkdirAll(filepath.Dir(filepath.Join(backupRoot, relativePath)), perm.PublicDir))
- bundlePath := filepath.Join(backupRoot, relativePath+".bundle")
- gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
+ storageLocator := config.NewLocator(cfg)
+ gitCmdFactory := gittest.NewCommandFactory(tb, cfg)
+ catfileCache := catfile.NewCache(cfg)
+ tb.Cleanup(catfileCache.Stop)
+ txManager := transaction.NewTrackingManager()
- return repo, repoChecksum
+ return backup.NewManagerLocal(sink, locator, storageLocator, gitCmdFactory, catfileCache, txManager, backupID)
},
- expectExists: true,
- },
- {
- desc: "single incremental",
- locators: []string{"pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- const backupID = "abc123"
- repo, _ := gittest.CreateRepository(t, ctx, cfg)
- repoBackupPath := joinBackupPath(tb, backupRoot, repo)
- backupPath := filepath.Join(repoBackupPath, backupID)
- require.NoError(tb, os.MkdirAll(backupPath, perm.PublicDir))
- require.NoError(tb, os.WriteFile(filepath.Join(repoBackupPath, "LATEST"), []byte(backupID), perm.PublicFile))
- require.NoError(tb, os.WriteFile(filepath.Join(backupPath, "LATEST"), []byte("001"), perm.PublicFile))
- bundlePath := filepath.Join(backupPath, "001.bundle")
- gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
-
- return repo, repoChecksum
- },
- expectExists: true,
- },
- {
- desc: "many incrementals",
- locators: []string{"pointer"},
- setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
- const backupID = "abc123"
-
- _, expectedRepoPath := gittest.CreateRepository(t, ctx, cfg)
-
- repo, _ := gittest.CreateRepository(t, ctx, cfg)
- repoBackupPath := joinBackupPath(tb, backupRoot, repo)
- backupPath := filepath.Join(repoBackupPath, backupID)
- require.NoError(tb, os.MkdirAll(backupPath, perm.PublicDir))
- require.NoError(tb, os.WriteFile(filepath.Join(repoBackupPath, "LATEST"), []byte(backupID), perm.PublicFile))
- require.NoError(tb, os.WriteFile(filepath.Join(backupPath, "LATEST"), []byte("002"), perm.PublicFile))
-
- root := gittest.WriteCommit(tb, cfg, expectedRepoPath,
- gittest.WithBranch("master"),
- )
- master1 := gittest.WriteCommit(tb, cfg, expectedRepoPath,
- gittest.WithBranch("master"),
- gittest.WithParents(root),
- )
- other := gittest.WriteCommit(tb, cfg, expectedRepoPath,
- gittest.WithBranch("other"),
- gittest.WithParents(root),
- )
- gittest.Exec(tb, cfg, "-C", expectedRepoPath, "symbolic-ref", "HEAD", "refs/heads/master")
- bundlePath1 := filepath.Join(backupPath, "001.bundle")
- gittest.Exec(tb, cfg, "-C", expectedRepoPath, "bundle", "create", bundlePath1,
- "HEAD",
- "refs/heads/master",
- "refs/heads/other",
- )
-
- master2 := gittest.WriteCommit(tb, cfg, expectedRepoPath,
- gittest.WithBranch("master"),
- gittest.WithParents(master1),
- )
- bundlePath2 := filepath.Join(backupPath, "002.bundle")
- gittest.Exec(tb, cfg, "-C", expectedRepoPath, "bundle", "create", bundlePath2,
- "HEAD",
- "^"+master1.String(),
- "^"+other.String(),
- "refs/heads/master",
- "refs/heads/other",
- )
-
- checksum := new(git.Checksum)
- checksum.Add(git.NewReference("HEAD", master2.String()))
- checksum.Add(git.NewReference("refs/heads/master", master2.String()))
- checksum.Add(git.NewReference("refs/heads/other", other.String()))
-
- return repo, checksum
- },
- expectExists: true,
},
} {
- t.Run(tc.desc, func(t *testing.T) {
- require.GreaterOrEqual(t, len(tc.locators), 1, "each test case must specify a locator")
-
- for _, locatorName := range tc.locators {
- t.Run(locatorName, func(t *testing.T) {
- repo, expectedChecksum := tc.setup(t)
-
- pool := client.NewPool()
- defer testhelper.MustClose(t, pool)
-
- sink := backup.NewFilesystemSink(backupRoot)
- locator, err := backup.ResolveLocator(locatorName, sink)
- require.NoError(t, err)
+ managerTC := managerTC
+
+ t.Run(managerTC.desc, func(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ cc, err := client.Dial(cfg.SocketPath, nil)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, cc)
+
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ _, repoPath := gittest.CreateRepository(t, ctx, cfg)
+ commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"))
+ gittest.WriteTag(t, cfg, repoPath, "v1.0.0", commitID.Revision())
+ repoChecksum := gittest.ChecksumRepo(t, cfg, repoPath)
+
+ backupRoot := testhelper.TempDir(t)
+
+ for _, tc := range []struct {
+ desc string
+ locators []string
+ setup func(tb testing.TB) (*gitalypb.Repository, *git.Checksum)
+ alwaysCreate bool
+ expectExists bool
+ expectedPaths []string
+ expectedErrAs error
+ }{
+ {
+ desc: "existing repo, without hooks",
+ locators: []string{"legacy", "pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+
+ relativePath := stripRelativePath(tb, repo)
+ require.NoError(tb, os.MkdirAll(filepath.Join(backupRoot, relativePath), perm.PublicDir))
+ bundlePath := filepath.Join(backupRoot, relativePath+".bundle")
+ gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
+
+ return repo, repoChecksum
+ },
+ expectExists: true,
+ },
+ {
+ desc: "existing repo, with hooks",
+ locators: []string{"legacy", "pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+
+ relativePath := stripRelativePath(tb, repo)
+ bundlePath := filepath.Join(backupRoot, relativePath+".bundle")
+ customHooksPath := filepath.Join(backupRoot, relativePath, "custom_hooks.tar")
+ require.NoError(tb, os.MkdirAll(filepath.Join(backupRoot, relativePath), perm.PublicDir))
+ gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
+ testhelper.CopyFile(tb, mustCreateCustomHooksArchive(t, ctx), customHooksPath)
+
+ return repo, repoChecksum
+ },
+ expectedPaths: []string{
+ "custom_hooks/pre-commit.sample",
+ "custom_hooks/prepare-commit-msg.sample",
+ "custom_hooks/pre-push.sample",
+ },
+ expectExists: true,
+ },
+ {
+ desc: "missing bundle",
+ locators: []string{"legacy", "pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+ return repo, nil
+ },
+ expectedErrAs: backup.ErrSkipped,
+ },
+ {
+ desc: "missing bundle, always create",
+ locators: []string{"legacy", "pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+ return repo, new(git.Checksum)
+ },
+ alwaysCreate: true,
+ expectExists: true,
+ },
+ {
+ desc: "nonexistent repo",
+ locators: []string{"legacy", "pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ repo := &gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: gittest.NewRepositoryName(tb),
+ }
- fsBackup := backup.NewManager(sink, locator, pool, "unused-backup-id")
- err = fsBackup.Restore(ctx, &backup.RestoreRequest{
- Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
- Repository: repo,
- AlwaysCreate: tc.alwaysCreate,
- })
- if tc.expectedErrAs != nil {
- require.ErrorAs(t, err, &tc.expectedErrAs)
- } else {
- require.NoError(t, err)
- }
+ relativePath := stripRelativePath(tb, repo)
+ require.NoError(tb, os.MkdirAll(filepath.Dir(filepath.Join(backupRoot, relativePath)), perm.PublicDir))
+ bundlePath := filepath.Join(backupRoot, relativePath+".bundle")
+ gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
- exists, err := repoClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
- Repository: repo,
- })
- require.NoError(t, err)
- require.Equal(t, tc.expectExists, exists.Exists, "repository exists")
-
- if expectedChecksum != nil {
- checksum, err := repoClient.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
- Repository: repo,
+ return repo, repoChecksum
+ },
+ expectExists: true,
+ },
+ {
+ desc: "single incremental",
+ locators: []string{"pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ const backupID = "abc123"
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+ repoBackupPath := joinBackupPath(tb, backupRoot, repo)
+ backupPath := filepath.Join(repoBackupPath, backupID)
+ require.NoError(tb, os.MkdirAll(backupPath, perm.PublicDir))
+ require.NoError(tb, os.WriteFile(filepath.Join(repoBackupPath, "LATEST"), []byte(backupID), perm.PublicFile))
+ require.NoError(tb, os.WriteFile(filepath.Join(backupPath, "LATEST"), []byte("001"), perm.PublicFile))
+ bundlePath := filepath.Join(backupPath, "001.bundle")
+ gittest.BundleRepo(tb, cfg, repoPath, bundlePath)
+
+ return repo, repoChecksum
+ },
+ expectExists: true,
+ },
+ {
+ desc: "many incrementals",
+ locators: []string{"pointer"},
+ setup: func(tb testing.TB) (*gitalypb.Repository, *git.Checksum) {
+ const backupID = "abc123"
+
+ _, expectedRepoPath := gittest.CreateRepository(t, ctx, cfg)
+
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+ repoBackupPath := joinBackupPath(tb, backupRoot, repo)
+ backupPath := filepath.Join(repoBackupPath, backupID)
+ require.NoError(tb, os.MkdirAll(backupPath, perm.PublicDir))
+ require.NoError(tb, os.WriteFile(filepath.Join(repoBackupPath, "LATEST"), []byte(backupID), perm.PublicFile))
+ require.NoError(tb, os.WriteFile(filepath.Join(backupPath, "LATEST"), []byte("002"), perm.PublicFile))
+
+ root := gittest.WriteCommit(tb, cfg, expectedRepoPath,
+ gittest.WithBranch("master"),
+ )
+ master1 := gittest.WriteCommit(tb, cfg, expectedRepoPath,
+ gittest.WithBranch("master"),
+ gittest.WithParents(root),
+ )
+ other := gittest.WriteCommit(tb, cfg, expectedRepoPath,
+ gittest.WithBranch("other"),
+ gittest.WithParents(root),
+ )
+ gittest.Exec(tb, cfg, "-C", expectedRepoPath, "symbolic-ref", "HEAD", "refs/heads/master")
+ bundlePath1 := filepath.Join(backupPath, "001.bundle")
+ gittest.Exec(tb, cfg, "-C", expectedRepoPath, "bundle", "create", bundlePath1,
+ "HEAD",
+ "refs/heads/master",
+ "refs/heads/other",
+ )
+
+ master2 := gittest.WriteCommit(tb, cfg, expectedRepoPath,
+ gittest.WithBranch("master"),
+ gittest.WithParents(master1),
+ )
+ bundlePath2 := filepath.Join(backupPath, "002.bundle")
+ gittest.Exec(tb, cfg, "-C", expectedRepoPath, "bundle", "create", bundlePath2,
+ "HEAD",
+ "^"+master1.String(),
+ "^"+other.String(),
+ "refs/heads/master",
+ "refs/heads/other",
+ )
+
+ checksum := new(git.Checksum)
+ checksum.Add(git.NewReference("HEAD", master2.String()))
+ checksum.Add(git.NewReference("refs/heads/master", master2.String()))
+ checksum.Add(git.NewReference("refs/heads/other", other.String()))
+
+ return repo, checksum
+ },
+ expectExists: true,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ require.GreaterOrEqual(t, len(tc.locators), 1, "each test case must specify a locator")
+
+ for _, locatorName := range tc.locators {
+ t.Run(locatorName, func(t *testing.T) {
+ repo, expectedChecksum := tc.setup(t)
+
+ sink := backup.NewFilesystemSink(backupRoot)
+ locator, err := backup.ResolveLocator(locatorName, sink)
+ require.NoError(t, err)
+
+ fsBackup := managerTC.setup(t, sink, locator)
+ err = fsBackup.Restore(ctx, &backup.RestoreRequest{
+ Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token},
+ Repository: repo,
+ AlwaysCreate: tc.alwaysCreate,
+ })
+ if tc.expectedErrAs != nil {
+ require.ErrorAs(t, err, &tc.expectedErrAs)
+ } else {
+ require.NoError(t, err)
+ }
+
+ exists, err := repoClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: repo,
+ })
+ require.NoError(t, err)
+ require.Equal(t, tc.expectExists, exists.Exists, "repository exists")
+
+ if expectedChecksum != nil {
+ checksum, err := repoClient.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
+ Repository: repo,
+ })
+ require.NoError(t, err)
+
+ require.Equal(t, expectedChecksum.String(), checksum.GetChecksum())
+ }
+
+ if len(tc.expectedPaths) > 0 {
+ // Restore has to use the rewritten path as the relative path due to the test creating
+ // the repository through Praefect. In order to get to the correct disk paths, we need
+ // to get the replica path of the rewritten repository.
+ repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo))
+ for _, p := range tc.expectedPaths {
+ require.FileExists(t, filepath.Join(repoPath, p))
+ }
+ }
})
- require.NoError(t, err)
-
- require.Equal(t, expectedChecksum.String(), checksum.GetChecksum())
- }
-
- if len(tc.expectedPaths) > 0 {
- // Restore has to use the rewritten path as the relative path due to the test creating
- // the repository through Praefect. In order to get to the correct disk paths, we need
- // to get the replica path of the rewritten repository.
- repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo))
- for _, p := range tc.expectedPaths {
- require.FileExists(t, filepath.Join(repoPath, p))
- }
}
})
}
diff --git a/internal/backup/repository.go b/internal/backup/repository.go
index 6c2523908..957c4b64d 100644
--- a/internal/backup/repository.go
+++ b/internal/backup/repository.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -148,6 +149,89 @@ func (rr *remoteRepository) CreateBundle(ctx context.Context, out io.Writer, pat
return nil
}
+// Remove removes the repository. Does not return an error if the repository
+// cannot be found.
+func (rr *remoteRepository) Remove(ctx context.Context) error {
+ repoClient := rr.newRepoClient()
+ _, err := repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: rr.repo,
+ })
+ switch {
+ case status.Code(err) == codes.NotFound:
+ return nil
+ case err != nil:
+ return fmt.Errorf("remote repository: remove: %w", err)
+ }
+ return nil
+}
+
+// Create creates the repository.
+func (rr *remoteRepository) Create(ctx context.Context) error {
+ repoClient := rr.newRepoClient()
+ if _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: rr.repo}); err != nil {
+ return fmt.Errorf("remote repository: create: %w", err)
+ }
+ return nil
+}
+
+// FetchBundle fetches references from a bundle. Refs will be mirrored to the
+// repository.
+func (rr *remoteRepository) FetchBundle(ctx context.Context, reader io.Reader) error {
+ repoClient := rr.newRepoClient()
+ stream, err := repoClient.FetchBundle(ctx)
+ if err != nil {
+ return fmt.Errorf("remote repository: fetch bundle: %w", err)
+ }
+ request := &gitalypb.FetchBundleRequest{Repository: rr.repo, UpdateHead: true}
+ bundle := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ // Only set `Repository` on the first `Send` of the stream
+ request = &gitalypb.FetchBundleRequest{}
+
+ return nil
+ })
+ if _, err := io.Copy(bundle, reader); err != nil {
+ return fmt.Errorf("remote repository: fetch bundle: %w", err)
+ }
+ if _, err = stream.CloseAndRecv(); err != nil {
+ return fmt.Errorf("remote repository: fetch bundle: %w", err)
+ }
+ return nil
+}
+
+// SetCustomHooks updates the custom hooks for the repository.
+func (rr *remoteRepository) SetCustomHooks(ctx context.Context, reader io.Reader) error {
+ repoClient := rr.newRepoClient()
+ stream, err := repoClient.SetCustomHooks(ctx)
+ if err != nil {
+ return fmt.Errorf("remote repository: set custom hooks: %w", err)
+ }
+
+ request := &gitalypb.SetCustomHooksRequest{Repository: rr.repo}
+ bundle := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ // Only set `Repository` on the first `Send` of the stream
+ request = &gitalypb.SetCustomHooksRequest{}
+
+ return nil
+ })
+ if _, err := io.Copy(bundle, reader); err != nil {
+ return fmt.Errorf("remote repository: set custom hooks: %w", err)
+ }
+ if _, err = stream.CloseAndRecv(); err != nil {
+ return fmt.Errorf("remote repository: set custom hooks: %w", err)
+ }
+ return nil
+}
+
func (rr *remoteRepository) newRepoClient() gitalypb.RepositoryServiceClient {
return gitalypb.NewRepositoryServiceClient(rr.conn)
}
@@ -157,14 +241,23 @@ func (rr *remoteRepository) newRefClient() gitalypb.RefServiceClient {
}
type localRepository struct {
- locator storage.Locator
- repo *localrepo.Repo
+ locator storage.Locator
+ gitCmdFactory git.CommandFactory
+ txManager transaction.Manager
+ repo *localrepo.Repo
}
-func newLocalRepository(locator storage.Locator, repo *localrepo.Repo) *localRepository {
+func newLocalRepository(
+ locator storage.Locator,
+ gitCmdFactory git.CommandFactory,
+ txManager transaction.Manager,
+ repo *localrepo.Repo,
+) *localRepository {
return &localRepository{
- locator: locator,
- repo: repo,
+ locator: locator,
+ gitCmdFactory: gitCmdFactory,
+ txManager: txManager,
+ repo: repo,
}
}
@@ -229,3 +322,57 @@ func (r *localRepository) CreateBundle(ctx context.Context, out io.Writer, patte
return nil
}
+
+// Remove removes the repository. Does not return an error if the repository
+// cannot be found.
+func (r *localRepository) Remove(ctx context.Context) error {
+ err := repoutil.Remove(ctx, r.locator, r.txManager, r.repo)
+ switch {
+ case status.Code(err) == codes.NotFound:
+ return nil
+ case err != nil:
+ return fmt.Errorf("local repository: remove: %w", err)
+ }
+ return nil
+}
+
+// Create creates the repository.
+func (r *localRepository) Create(ctx context.Context) error {
+ if err := repoutil.Create(
+ ctx,
+ r.locator,
+ r.gitCmdFactory,
+ r.txManager,
+ r.repo,
+ func(repository *gitalypb.Repository) error { return nil },
+ ); err != nil {
+ return fmt.Errorf("local repository: create: %w", err)
+ }
+ return nil
+}
+
+// FetchBundle fetches references from a bundle. Refs will be mirrored to the
+// repository.
+func (r *localRepository) FetchBundle(ctx context.Context, reader io.Reader) error {
+ err := r.repo.FetchBundle(ctx, r.txManager, reader, &localrepo.FetchBundleOpts{
+ UpdateHead: true,
+ })
+ if err != nil {
+ return fmt.Errorf("local repository: fetch bundle: %w", err)
+ }
+ return nil
+}
+
+// SetCustomHooks updates the custom hooks for the repository.
+func (r *localRepository) SetCustomHooks(ctx context.Context, reader io.Reader) error {
+ if err := repoutil.SetCustomHooks(
+ ctx,
+ r.locator,
+ r.txManager,
+ reader,
+ r.repo,
+ ); err != nil {
+ return fmt.Errorf("local repository: set custom hooks: %w", err)
+ }
+ return nil
+}
diff --git a/internal/gitaly/service/internalgitaly/backup_repos.go b/internal/gitaly/service/internalgitaly/backup_repos.go
index ff43b5b5e..7fa56d583 100644
--- a/internal/gitaly/service/internalgitaly/backup_repos.go
+++ b/internal/gitaly/service/internalgitaly/backup_repos.go
@@ -37,17 +37,14 @@ func (s server) BackupRepos(stream gitalypb.InternalGitaly_BackupReposServer) er
return structerr.NewInvalidArgument("backup repos: resolve locator: %w", err)
}
- manager := backup.NewManagerLocal(sink, locator, s.locator, s.gitCmdFactory, s.catfileCache, backupID)
+ manager := backup.NewManagerLocal(sink, locator, s.locator, s.gitCmdFactory, s.catfileCache, s.txManager, backupID)
pipeline := backup.NewLoggingPipeline(ctxlogrus.Extract(ctx))
for {
for _, repo := range request.GetRepositories() {
pipeline.Handle(ctx, backup.NewCreateCommand(
manager,
- // ServerInfo will be removed once restore methods are added to
- // backup.Repository. Even though it is unused it must be
- // non-zero so that storage.ExtractGitalyServer is not called.
- storage.ServerInfo{Address: "unused"},
+ storage.ServerInfo{},
repo,
false,
))
diff --git a/internal/gitaly/service/internalgitaly/backup_repos_test.go b/internal/gitaly/service/internalgitaly/backup_repos_test.go
index 6b00d4da0..8adb11524 100644
--- a/internal/gitaly/service/internalgitaly/backup_repos_test.go
+++ b/internal/gitaly/service/internalgitaly/backup_repos_test.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -120,11 +121,14 @@ func TestServerBackupRepos(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
+ txManager := transaction.NewTrackingManager()
+
srv := NewServer(
cfg.Storages,
config.NewLocator(cfg),
gittest.NewCommandFactory(t, cfg),
catfileCache,
+ txManager,
)
client := setupInternalGitalyService(t, cfg, srv)
diff --git a/internal/gitaly/service/internalgitaly/server.go b/internal/gitaly/service/internalgitaly/server.go
index d0ce88352..41231fcb1 100644
--- a/internal/gitaly/service/internalgitaly/server.go
+++ b/internal/gitaly/service/internalgitaly/server.go
@@ -5,6 +5,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
@@ -14,6 +15,7 @@ type server struct {
locator storage.Locator
gitCmdFactory git.CommandFactory
catfileCache catfile.Cache
+ txManager transaction.Manager
}
// NewServer return an instance of the Gitaly service.
@@ -22,11 +24,13 @@ func NewServer(
locator storage.Locator,
gitCmdFactory git.CommandFactory,
catfileCache catfile.Cache,
+ txManager transaction.Manager,
) gitalypb.InternalGitalyServer {
return &server{
storages: storages,
locator: locator,
gitCmdFactory: gitCmdFactory,
catfileCache: catfileCache,
+ txManager: txManager,
}
}
diff --git a/internal/gitaly/service/internalgitaly/walkrepos_test.go b/internal/gitaly/service/internalgitaly/walkrepos_test.go
index de4e5b23d..a1bfa769a 100644
--- a/internal/gitaly/service/internalgitaly/walkrepos_test.go
+++ b/internal/gitaly/service/internalgitaly/walkrepos_test.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -74,6 +75,8 @@ func TestWalkRepos(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
+ txManager := transaction.NewTrackingManager()
+
// to test a directory being deleted during a walk, we must delete a directory after
// the file walk has started. To achieve that, we wrap the server to pass down a wrapped
// stream that allows us to hook in to stream responses. We then delete 'b' when
@@ -84,6 +87,7 @@ func TestWalkRepos(t *testing.T) {
config.NewLocator(cfg),
gittest.NewCommandFactory(t, cfg),
catfileCache,
+ txManager,
)
wsrv := &serverWrapper{
srv,
diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go
index 50f0482c1..0a0177cfe 100644
--- a/internal/gitaly/service/setup/register.go
+++ b/internal/gitaly/service/setup/register.go
@@ -149,6 +149,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) {
deps.GetLocator(),
deps.GetGitCmdFactory(),
deps.GetCatfileCache(),
+ deps.GetTxManager(),
))
healthpb.RegisterHealthServer(srv, health.NewServer())