Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <jfargher@gitlab.com>2023-05-26 01:35:09 +0300
committerJames Fargher <jfargher@gitlab.com>2023-06-01 03:52:26 +0300
commit39e8de2e186786070c33b8c2b38091262b4e135e (patch)
tree9242db22d820e64c892f9d8f62355cf98816370d
parentb256361ad89292246032543fb712b8ff503be5f9 (diff)
backup: Implement fetch bundle on backup.Repository
In preparation for server-side backups here we extract the RPC implementation of fetch bundle and additionally implement a local repository fetch bundle.
-rw-r--r--internal/backup/backup.go34
-rw-r--r--internal/backup/repository.go41
2 files changed, 48 insertions, 27 deletions
diff --git a/internal/backup/backup.go b/internal/backup/backup.go
index 545f50efe..7dc52ed64 100644
--- a/internal/backup/backup.go
+++ b/internal/backup/backup.go
@@ -87,6 +87,9 @@ type Repository interface {
Remove(ctx context.Context) error
// Create creates the repository.
Create(ctx context.Context) error
+ // FetchBundle fetches references from a bundle. Refs will be mirrored to
+ // the repository.
+ FetchBundle(ctx context.Context, reader io.Reader) error
}
// ResolveLocator returns a locator implementation based on a locator identifier.
@@ -276,7 +279,7 @@ func (mgr *Manager) Restore(ctx context.Context, req *RestoreRequest) error {
}
for _, step := range backup.Steps {
- if err := mgr.restoreBundle(ctx, step.BundlePath, req.Server, req.Repository); err != nil {
+ if err := mgr.restoreBundle(ctx, repo, step.BundlePath); err != nil {
if step.SkippableOnNotFound && errors.Is(err, ErrDoesntExist) {
// For compatibility with existing backups we need to make sure the
// repository exists even if there's no bundle for project
@@ -425,37 +428,14 @@ func (s *createBundleFromRefListSender) Send() error {
return s.stream.Send(&s.chunk)
}
-func (mgr *Manager) restoreBundle(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error {
+func (mgr *Manager) restoreBundle(ctx context.Context, repo Repository, path string) error {
reader, err := mgr.sink.GetReader(ctx, path)
if err != nil {
- return fmt.Errorf("restore bundle: %w", err)
- }
- defer reader.Close()
-
- repoClient, err := mgr.newRepoClient(ctx, server)
- if err != nil {
return fmt.Errorf("restore bundle: %q: %w", path, err)
}
- stream, err := repoClient.FetchBundle(ctx)
- if err != nil {
- return fmt.Errorf("restore bundle: %q: %w", path, err)
- }
- request := &gitalypb.FetchBundleRequest{Repository: repo, UpdateHead: true}
- bundle := streamio.NewWriter(func(p []byte) error {
- request.Data = p
- if err := stream.Send(request); err != nil {
- return err
- }
-
- // Only set `Repository` on the first `Send` of the stream
- request = &gitalypb.FetchBundleRequest{}
+ defer reader.Close()
- return nil
- })
- if _, err := io.Copy(bundle, reader); err != nil {
- return fmt.Errorf("restore bundle: %q: %w", path, err)
- }
- if _, err = stream.CloseAndRecv(); err != nil {
+ if err := repo.FetchBundle(ctx, reader); err != nil {
return fmt.Errorf("restore bundle: %q: %w", path, err)
}
return nil
diff --git a/internal/backup/repository.go b/internal/backup/repository.go
index 204ee1e8c..d000baee9 100644
--- a/internal/backup/repository.go
+++ b/internal/backup/repository.go
@@ -174,6 +174,35 @@ func (rr *remoteRepository) Create(ctx context.Context) error {
return nil
}
+// FetchBundle fetches references from a bundle. Refs will be mirrored to the
+// repository.
+func (rr *remoteRepository) FetchBundle(ctx context.Context, reader io.Reader) error {
+ repoClient := rr.newRepoClient()
+ stream, err := repoClient.FetchBundle(ctx)
+ if err != nil {
+ return fmt.Errorf("remote repository: fetch bundle: %w", err)
+ }
+ request := &gitalypb.FetchBundleRequest{Repository: rr.repo, UpdateHead: true}
+ bundle := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ // Only set `Repository` on the first `Send` of the stream
+ request = &gitalypb.FetchBundleRequest{}
+
+ return nil
+ })
+ if _, err := io.Copy(bundle, reader); err != nil {
+ return fmt.Errorf("remote repository: fetch bundle: %w", err)
+ }
+ if _, err = stream.CloseAndRecv(); err != nil {
+ return fmt.Errorf("remote repository: fetch bundle: %w", err)
+ }
+ return nil
+}
+
func (rr *remoteRepository) newRepoClient() gitalypb.RepositoryServiceClient {
return gitalypb.NewRepositoryServiceClient(rr.conn)
}
@@ -292,3 +321,15 @@ func (r *localRepository) Create(ctx context.Context) error {
}
return nil
}
+
+// FetchBundle fetches references from a bundle. Refs will be mirrored to the
+// repository.
+func (r *localRepository) FetchBundle(ctx context.Context, reader io.Reader) error {
+ err := r.repo.FetchBundle(ctx, r.txManager, reader, &localrepo.FetchBundleOpts{
+ UpdateHead: true,
+ })
+ if err != nil {
+ return fmt.Errorf("local repository: fetch bundle: %w", err)
+ }
+ return nil
+}