diff options
author | James Fargher <jfargher@gitlab.com> | 2023-05-26 01:35:09 +0300 |
---|---|---|
committer | James Fargher <jfargher@gitlab.com> | 2023-06-01 03:52:26 +0300 |
commit | 39e8de2e186786070c33b8c2b38091262b4e135e (patch) | |
tree | 9242db22d820e64c892f9d8f62355cf98816370d | |
parent | b256361ad89292246032543fb712b8ff503be5f9 (diff) |
backup: Implement fetch bundle on backup.Repository
In preparation for server-side backups here we extract the RPC
implementation of fetch bundle and additionally implement a local
repository fetch bundle.
-rw-r--r-- | internal/backup/backup.go | 34 | ||||
-rw-r--r-- | internal/backup/repository.go | 41 |
2 files changed, 48 insertions, 27 deletions
diff --git a/internal/backup/backup.go b/internal/backup/backup.go index 545f50efe..7dc52ed64 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -87,6 +87,9 @@ type Repository interface { Remove(ctx context.Context) error // Create creates the repository. Create(ctx context.Context) error + // FetchBundle fetches references from a bundle. Refs will be mirrored to + // the repository. + FetchBundle(ctx context.Context, reader io.Reader) error } // ResolveLocator returns a locator implementation based on a locator identifier. @@ -276,7 +279,7 @@ func (mgr *Manager) Restore(ctx context.Context, req *RestoreRequest) error { } for _, step := range backup.Steps { - if err := mgr.restoreBundle(ctx, step.BundlePath, req.Server, req.Repository); err != nil { + if err := mgr.restoreBundle(ctx, repo, step.BundlePath); err != nil { if step.SkippableOnNotFound && errors.Is(err, ErrDoesntExist) { // For compatibility with existing backups we need to make sure the // repository exists even if there's no bundle for project @@ -425,37 +428,14 @@ func (s *createBundleFromRefListSender) Send() error { return s.stream.Send(&s.chunk) } -func (mgr *Manager) restoreBundle(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error { +func (mgr *Manager) restoreBundle(ctx context.Context, repo Repository, path string) error { reader, err := mgr.sink.GetReader(ctx, path) if err != nil { - return fmt.Errorf("restore bundle: %w", err) - } - defer reader.Close() - - repoClient, err := mgr.newRepoClient(ctx, server) - if err != nil { return fmt.Errorf("restore bundle: %q: %w", path, err) } - stream, err := repoClient.FetchBundle(ctx) - if err != nil { - return fmt.Errorf("restore bundle: %q: %w", path, err) - } - request := &gitalypb.FetchBundleRequest{Repository: repo, UpdateHead: true} - bundle := streamio.NewWriter(func(p []byte) error { - request.Data = p - if err := stream.Send(request); err != nil { - return err - } - - // Only set `Repository` on the first `Send` of the stream - request = &gitalypb.FetchBundleRequest{} + defer reader.Close() - return nil - }) - if _, err := io.Copy(bundle, reader); err != nil { - return fmt.Errorf("restore bundle: %q: %w", path, err) - } - if _, err = stream.CloseAndRecv(); err != nil { + if err := repo.FetchBundle(ctx, reader); err != nil { return fmt.Errorf("restore bundle: %q: %w", path, err) } return nil diff --git a/internal/backup/repository.go b/internal/backup/repository.go index 204ee1e8c..d000baee9 100644 --- a/internal/backup/repository.go +++ b/internal/backup/repository.go @@ -174,6 +174,35 @@ func (rr *remoteRepository) Create(ctx context.Context) error { return nil } +// FetchBundle fetches references from a bundle. Refs will be mirrored to the +// repository. +func (rr *remoteRepository) FetchBundle(ctx context.Context, reader io.Reader) error { + repoClient := rr.newRepoClient() + stream, err := repoClient.FetchBundle(ctx) + if err != nil { + return fmt.Errorf("remote repository: fetch bundle: %w", err) + } + request := &gitalypb.FetchBundleRequest{Repository: rr.repo, UpdateHead: true} + bundle := streamio.NewWriter(func(p []byte) error { + request.Data = p + if err := stream.Send(request); err != nil { + return err + } + + // Only set `Repository` on the first `Send` of the stream + request = &gitalypb.FetchBundleRequest{} + + return nil + }) + if _, err := io.Copy(bundle, reader); err != nil { + return fmt.Errorf("remote repository: fetch bundle: %w", err) + } + if _, err = stream.CloseAndRecv(); err != nil { + return fmt.Errorf("remote repository: fetch bundle: %w", err) + } + return nil +} + func (rr *remoteRepository) newRepoClient() gitalypb.RepositoryServiceClient { return gitalypb.NewRepositoryServiceClient(rr.conn) } @@ -292,3 +321,15 @@ func (r *localRepository) Create(ctx context.Context) error { } return nil } + +// FetchBundle fetches references from a bundle. Refs will be mirrored to the +// repository. +func (r *localRepository) FetchBundle(ctx context.Context, reader io.Reader) error { + err := r.repo.FetchBundle(ctx, r.txManager, reader, &localrepo.FetchBundleOpts{ + UpdateHead: true, + }) + if err != nil { + return fmt.Errorf("local repository: fetch bundle: %w", err) + } + return nil +} |