diff options
author | Justin Tobler <jtobler@gitlab.com> | 2024-01-18 00:58:00 +0300 |
---|---|---|
committer | GitLab <noreply@gitlab.com> | 2024-01-18 00:58:00 +0300 |
commit | 8c7ef2ac66421fbdb70025a3ab1ed237e8affa7a (patch) | |
tree | 732aa7dcea56261e5023c351fd8d814d84fad17a | |
parent | 72faf14d9551b706f46103d665d75099ea8b1b0c (diff) | |
parent | 3395de7c96c6770ad34bbd14bf7908d5e34bbe2f (diff) |
Merge branch 'smh-create-fork-partitioning' into 'master'
Partition fork with source repository in CreateFork
Closes #5762
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6612
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
4 files changed, 135 insertions, 10 deletions
diff --git a/internal/gitaly/service/objectpool/link_test.go b/internal/gitaly/service/objectpool/link_test.go index cf38812da..a5e705546 100644 --- a/internal/gitaly/service/objectpool/link_test.go +++ b/internal/gitaly/service/objectpool/link_test.go @@ -13,10 +13,77 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +func TestCompleteForkCreationFlow(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + cfg, sourceRepository, _, _, objectPoolClient := setup(t, ctx, testserver.WithDisablePraefect()) + + repositoryClient := gitalypb.NewRepositoryServiceClient( + objectPoolClient.(clientWithConn).conn, + ) + + forkRepository := &gitalypb.Repository{ + StorageName: sourceRepository.StorageName, + RelativePath: gittest.NewRepositoryName(t), + } + + // Inject the Gitaly's address information in the context. CreateFork uses this to + // fetch from the source repository. + ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) + // Build GitalySSH as CreateFork uses to perform the fetch. + testcfg.BuildGitalySSH(t, cfg) + createForkResponse, err := repositoryClient.CreateFork(ctx, &gitalypb.CreateForkRequest{ + Repository: forkRepository, + SourceRepository: sourceRepository, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.CreateForkResponse{}, createForkResponse) + + // Create an object pool from the source repository. + objectPool, _, _ := createObjectPool(t, ctx, cfg, sourceRepository) + + // Link the source repository itself to the object pool. + linkSourceToObjectPoolResponse, err := objectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{ + ObjectPool: objectPool, + Repository: sourceRepository, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, linkSourceToObjectPoolResponse) + + // Link the fork to the object pool. + linkForkToObjectPoolResponse, err := objectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{ + ObjectPool: objectPool, + Repository: forkRepository, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, linkForkToObjectPoolResponse) + + // Ensure the source is linked to the pool now. + getSourceObjectPoolResponse, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{ + Repository: sourceRepository, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.GetObjectPoolResponse{ + ObjectPool: objectPool, + }, getSourceObjectPoolResponse) + + // Ensure the fork is linked to the pool now. + getForkObjectPoolResponse, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{ + Repository: forkRepository, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.GetObjectPoolResponse{ + ObjectPool: objectPool, + }, getForkObjectPoolResponse) +} + func TestLink(t *testing.T) { t.Parallel() diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index d1a083f45..498199953 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" hookservice "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ssh" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -66,6 +67,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, gitalypb.RegisterObjectPoolServiceServer(srv, NewServer(deps)) gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps)) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) + gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer(deps)) }, append(opts, testserver.WithLocator(locator), testserver.WithLogger(logger))...) } diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go index 290545def..a72bbe88a 100644 --- a/internal/gitaly/storage/storagemgr/middleware.go +++ b/internal/gitaly/storage/storagemgr/middleware.go @@ -26,6 +26,9 @@ var ( // ErrQuarantineWithoutSnapshotRelativePath is returned when a request is configured with a quarantine but the snapshot's // relative path was not sent in a header. ErrQuarantineWithoutSnapshotRelativePath = errors.New("quarantined request did not contain snapshot relative path") + // ErrRepositoriesInDifferentStorages is returned when trying to access two repositories in different storages in the + // same transaction. + ErrRepositoriesInDifferentStorages = structerr.NewInvalidArgument("additional and target repositories are in different storages") ) // NonTransactionalRPCs are the RPCs that do not support transactions. @@ -247,22 +250,36 @@ func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry return transactionalizedRequest{}, err } - var alternateRelativePath string - if additionalRepo, err := methodInfo.AdditionalRepo(req); err != nil { + // Object pools need to be placed in the same partition as their members. Below we figure out which repository, + // if any, the target repository of the RPC must be partitioned with. We figure this out using two strategies: + // + // The general case is handled by extracting the additional repository from the RPC, and paritioning the target + // repository of the RPC with the additional repository. Many of the ObjectPoolService's RPCs operate on two + // repositories. Depending on the RPC, the additional repository is either the object pool itself or a member + // of the pool. + // + // CreateFork is special cased. The fork must partitioned with the source repository in order to successfully + // link it with the object pool later. The source repository is not tagged as additional repository in the + // CreateForkRequest. If the request is CreateForkRequest, we extract the source repository and partition the + // fork with it. + var additionalRepo *gitalypb.Repository + if additionalRepo, err = methodInfo.AdditionalRepo(req); err != nil { if !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) { return transactionalizedRequest{}, fmt.Errorf("extract additional repository: %w", err) } // There was no additional repository. - } else { - if additionalRepo.StorageName != targetRepo.StorageName { - return transactionalizedRequest{}, structerr.NewInvalidArgument("additional and target repositories are in different storages") - } + } + + if req, ok := req.(*gitalypb.CreateForkRequest); ok { + additionalRepo = req.GetSourceRepository() + } - alternateRelativePath = additionalRepo.RelativePath + if additionalRepo != nil && additionalRepo.StorageName != targetRepo.StorageName { + return transactionalizedRequest{}, ErrRepositoriesInDifferentStorages } - tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, alternateRelativePath, methodInfo.Operation == protoregistry.OpAccessor) + tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, additionalRepo.GetRelativePath(), methodInfo.Operation == protoregistry.OpAccessor) if err != nil { return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go index 794222874..9196d10b0 100644 --- a/internal/gitaly/storage/storagemgr/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/middleware_test.go @@ -36,6 +36,7 @@ type mockRepositoryService struct { removeRepositoryFunc func(context.Context, *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) setCustomHooksFunc func(gitalypb.RepositoryService_SetCustomHooksServer) error getCustomHooksFunc func(*gitalypb.GetCustomHooksRequest, gitalypb.RepositoryService_GetCustomHooksServer) error + createForkFunc func(context.Context, *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) gitalypb.UnimplementedRepositoryServiceServer } @@ -59,6 +60,10 @@ func (m mockRepositoryService) GetCustomHooks(req *gitalypb.GetCustomHooksReques return m.getCustomHooksFunc(req, stream) } +func (m mockRepositoryService) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) { + return m.createForkFunc(ctx, req) +} + type mockHealthService struct { checkFunc func(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) grpc_health_v1.UnimplementedHealthServer @@ -396,6 +401,35 @@ messages and behavior by erroring out the requests before they even hit this int }, expectHandlerInvoked: true, }, + { + desc: "successful CreateFork request", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + resp, err := gitalypb.NewRepositoryServiceClient(cc).CreateFork(ctx, &gitalypb.CreateForkRequest{ + Repository: validAdditionalRepository(), + SourceRepository: validRepository(), + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.CreateForkResponse{}, resp) + }, + assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { + testhelper.ProtoEqual(t, validRepository(), actual) + }, + expectHandlerInvoked: true, + }, + { + desc: "CreateFork fails due to repositories in different storages", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + sourceRepository := validRepository() + sourceRepository.StorageName = "different_storage" + + resp, err := gitalypb.NewRepositoryServiceClient(cc).CreateFork(ctx, &gitalypb.CreateForkRequest{ + Repository: validAdditionalRepository(), + SourceRepository: sourceRepository, + }) + require.Equal(t, status.Error(codes.InvalidArgument, storagemgr.ErrRepositoriesInDifferentStorages.Error()), err) + require.Nil(t, resp) + }, + }, } { t.Run(tc.desc, func(t *testing.T) { cfg := testcfg.Build(t) @@ -414,7 +448,7 @@ messages and behavior by erroring out the requests before they even hit this int handlerInvoked := false var transactionID storage.TransactionID - assertHandler := func(ctx context.Context, isMutator bool, repo *gitalypb.Repository) { + assertHandler := func(ctx context.Context, shouldBeQuarantined bool, repo *gitalypb.Repository) { handlerInvoked = true // The repositories should be equal except for the relative path which @@ -427,7 +461,7 @@ messages and behavior by erroring out the requests before they even hit this int expectedRepo.RelativePath = "" actualRepo.RelativePath = "" - if isMutator { + if shouldBeQuarantined { // Mutators should have quarantine directory configured. assert.NotEmpty(t, actualRepo.GitObjectDirectory) actualRepo.GitObjectDirectory = "" @@ -476,6 +510,11 @@ messages and behavior by erroring out the requests before they even hit this int }, }) gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{ + createForkFunc: func(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) { + assertHandler(ctx, false, req.GetRepository()) + tc.assertAdditionalRepository(t, ctx, req.GetSourceRepository()) + return &gitalypb.CreateForkResponse{}, tc.handlerError + }, objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) { assertHandler(ctx, false, req.GetRepository()) return &gitalypb.ObjectFormatResponse{}, tc.handlerError |