Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2024-01-14 14:10:25 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2024-01-14 18:56:05 +0300
commit3395de7c96c6770ad34bbd14bf7908d5e34bbe2f (patch)
tree81e9b238a4e973619762b8e55ece814af784e365
parent96b75e53b61c476029d510270daa1d5dfc444a09 (diff)
Partition fork with source repository in CreateFork
Gitaly's TransactionManager requires all object pool members to be in the same partition. Currently we ensure this in the general case by extracting the additional repository from the request and ensuring the target repository of the RPC gets partitioned with it. The additional repository is used in the ObjectPoolService's RPCs to tag the other repository being accessed in the pool-related operations, and it may be either the object pool or one of the member repositories depending on the RPC. When a repository is first accessed, we also check whether it has an existing alternate link on the disk, and place the repository in the same partition with its alternate if so. These two methods are enough to ensure that, in general, the pools and their members get partitioned together. One exception to this is CreateFork. The created fork should be placed in the same partition as the origin repository as they'll both eventually be connected to the same pool. CreateFork does not tag the source repository as an additional repository, so the general handling does not apply to it. Tagging it as the additional repository won't work with Praefect as Praefect rewrites the paths of additional repositories. The source repository is fetched through the API, so it needs to have its original relative path intact. This commit introduces special handling for CreateFork. The transaction middleware checks whether the request is a CreateForkRequest. If so, the source repository is extracted and the newly created fork gets partitioned with it. Along with the behavior changes, we add a test that exercises the entire fork creation flow as typically done. It turns out Gitaly didn't have a test covering the scenario at all.
-rw-r--r--internal/gitaly/service/objectpool/link_test.go67
-rw-r--r--internal/gitaly/service/objectpool/testhelper_test.go2
-rw-r--r--internal/gitaly/storage/storagemgr/middleware.go33
-rw-r--r--internal/gitaly/storage/storagemgr/middleware_test.go43
4 files changed, 135 insertions, 10 deletions
diff --git a/internal/gitaly/service/objectpool/link_test.go b/internal/gitaly/service/objectpool/link_test.go
index cf38812da..a5e705546 100644
--- a/internal/gitaly/service/objectpool/link_test.go
+++ b/internal/gitaly/service/objectpool/link_test.go
@@ -13,10 +13,77 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
+func TestCompleteForkCreationFlow(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ cfg, sourceRepository, _, _, objectPoolClient := setup(t, ctx, testserver.WithDisablePraefect())
+
+ repositoryClient := gitalypb.NewRepositoryServiceClient(
+ objectPoolClient.(clientWithConn).conn,
+ )
+
+ forkRepository := &gitalypb.Repository{
+ StorageName: sourceRepository.StorageName,
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ // Inject the Gitaly's address information in the context. CreateFork uses this to
+ // fetch from the source repository.
+ ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
+ // Build GitalySSH as CreateFork uses it to perform the fetch.
+ testcfg.BuildGitalySSH(t, cfg)
+ createForkResponse, err := repositoryClient.CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: forkRepository,
+ SourceRepository: sourceRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.CreateForkResponse{}, createForkResponse)
+
+ // Create an object pool from the source repository.
+ objectPool, _, _ := createObjectPool(t, ctx, cfg, sourceRepository)
+
+ // Link the source repository itself to the object pool.
+ linkSourceToObjectPoolResponse, err := objectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
+ ObjectPool: objectPool,
+ Repository: sourceRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, linkSourceToObjectPoolResponse)
+
+ // Link the fork to the object pool.
+ linkForkToObjectPoolResponse, err := objectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
+ ObjectPool: objectPool,
+ Repository: forkRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, linkForkToObjectPoolResponse)
+
+ // Ensure the source is linked to the pool now.
+ getSourceObjectPoolResponse, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: sourceRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.GetObjectPoolResponse{
+ ObjectPool: objectPool,
+ }, getSourceObjectPoolResponse)
+
+ // Ensure the fork is linked to the pool now.
+ getForkObjectPoolResponse, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: forkRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.GetObjectPoolResponse{
+ ObjectPool: objectPool,
+ }, getForkObjectPoolResponse)
+}
+
func TestLink(t *testing.T) {
t.Parallel()
diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go
index d1a083f45..498199953 100644
--- a/internal/gitaly/service/objectpool/testhelper_test.go
+++ b/internal/gitaly/service/objectpool/testhelper_test.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
hookservice "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ssh"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -66,6 +67,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator,
gitalypb.RegisterObjectPoolServiceServer(srv, NewServer(deps))
gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps))
+ gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer(deps))
}, append(opts, testserver.WithLocator(locator), testserver.WithLogger(logger))...)
}
diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go
index 290545def..a72bbe88a 100644
--- a/internal/gitaly/storage/storagemgr/middleware.go
+++ b/internal/gitaly/storage/storagemgr/middleware.go
@@ -26,6 +26,9 @@ var (
// ErrQuarantineWithoutSnapshotRelativePath is returned when a request is configured with a quarantine but the snapshot's
// relative path was not sent in a header.
ErrQuarantineWithoutSnapshotRelativePath = errors.New("quarantined request did not contain snapshot relative path")
+ // ErrRepositoriesInDifferentStorages is returned when trying to access two repositories in different storages in the
+ // same transaction.
+ ErrRepositoriesInDifferentStorages = structerr.NewInvalidArgument("additional and target repositories are in different storages")
)
// NonTransactionalRPCs are the RPCs that do not support transactions.
@@ -247,22 +250,36 @@ func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry
return transactionalizedRequest{}, err
}
- var alternateRelativePath string
- if additionalRepo, err := methodInfo.AdditionalRepo(req); err != nil {
+ // Object pools need to be placed in the same partition as their members. Below we figure out which repository,
+ // if any, the target repository of the RPC must be partitioned with. We figure this out using two strategies:
+ //
+ // The general case is handled by extracting the additional repository from the RPC, and partitioning the target
+ // repository of the RPC with the additional repository. Many of the ObjectPoolService's RPCs operate on two
+ // repositories. Depending on the RPC, the additional repository is either the object pool itself or a member
+ // of the pool.
+ //
+ // CreateFork is special cased. The fork must be partitioned with the source repository in order to successfully
+ // link it with the object pool later. The source repository is not tagged as additional repository in the
+ // CreateForkRequest. If the request is CreateForkRequest, we extract the source repository and partition the
+ // fork with it.
+ var additionalRepo *gitalypb.Repository
+ if additionalRepo, err = methodInfo.AdditionalRepo(req); err != nil {
if !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
return transactionalizedRequest{}, fmt.Errorf("extract additional repository: %w", err)
}
// There was no additional repository.
- } else {
- if additionalRepo.StorageName != targetRepo.StorageName {
- return transactionalizedRequest{}, structerr.NewInvalidArgument("additional and target repositories are in different storages")
- }
+ }
+
+ if req, ok := req.(*gitalypb.CreateForkRequest); ok {
+ additionalRepo = req.GetSourceRepository()
+ }
- alternateRelativePath = additionalRepo.RelativePath
+ if additionalRepo != nil && additionalRepo.StorageName != targetRepo.StorageName {
+ return transactionalizedRequest{}, ErrRepositoriesInDifferentStorages
}
- tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, alternateRelativePath, methodInfo.Operation == protoregistry.OpAccessor)
+ tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, additionalRepo.GetRelativePath(), methodInfo.Operation == protoregistry.OpAccessor)
if err != nil {
return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err)
}
diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go
index 794222874..9196d10b0 100644
--- a/internal/gitaly/storage/storagemgr/middleware_test.go
+++ b/internal/gitaly/storage/storagemgr/middleware_test.go
@@ -36,6 +36,7 @@ type mockRepositoryService struct {
removeRepositoryFunc func(context.Context, *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error)
setCustomHooksFunc func(gitalypb.RepositoryService_SetCustomHooksServer) error
getCustomHooksFunc func(*gitalypb.GetCustomHooksRequest, gitalypb.RepositoryService_GetCustomHooksServer) error
+ createForkFunc func(context.Context, *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error)
gitalypb.UnimplementedRepositoryServiceServer
}
@@ -59,6 +60,10 @@ func (m mockRepositoryService) GetCustomHooks(req *gitalypb.GetCustomHooksReques
return m.getCustomHooksFunc(req, stream)
}
+func (m mockRepositoryService) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
+ return m.createForkFunc(ctx, req)
+}
+
type mockHealthService struct {
checkFunc func(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
grpc_health_v1.UnimplementedHealthServer
@@ -396,6 +401,35 @@ messages and behavior by erroring out the requests before they even hit this int
},
expectHandlerInvoked: true,
},
+ {
+ desc: "successful CreateFork request",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: validAdditionalRepository(),
+ SourceRepository: validRepository(),
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.CreateForkResponse{}, resp)
+ },
+ assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) {
+ testhelper.ProtoEqual(t, validRepository(), actual)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "CreateFork fails due to repositories in different storages",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ sourceRepository := validRepository()
+ sourceRepository.StorageName = "different_storage"
+
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: validAdditionalRepository(),
+ SourceRepository: sourceRepository,
+ })
+ require.Equal(t, status.Error(codes.InvalidArgument, storagemgr.ErrRepositoriesInDifferentStorages.Error()), err)
+ require.Nil(t, resp)
+ },
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
cfg := testcfg.Build(t)
@@ -414,7 +448,7 @@ messages and behavior by erroring out the requests before they even hit this int
handlerInvoked := false
var transactionID storage.TransactionID
- assertHandler := func(ctx context.Context, isMutator bool, repo *gitalypb.Repository) {
+ assertHandler := func(ctx context.Context, shouldBeQuarantined bool, repo *gitalypb.Repository) {
handlerInvoked = true
// The repositories should be equal except for the relative path which
@@ -427,7 +461,7 @@ messages and behavior by erroring out the requests before they even hit this int
expectedRepo.RelativePath = ""
actualRepo.RelativePath = ""
- if isMutator {
+ if shouldBeQuarantined {
// Mutators should have quarantine directory configured.
assert.NotEmpty(t, actualRepo.GitObjectDirectory)
actualRepo.GitObjectDirectory = ""
@@ -476,6 +510,11 @@ messages and behavior by erroring out the requests before they even hit this int
},
})
gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{
+ createForkFunc: func(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
+ assertHandler(ctx, false, req.GetRepository())
+ tc.assertAdditionalRepository(t, ctx, req.GetSourceRepository())
+ return &gitalypb.CreateForkResponse{}, tc.handlerError
+ },
objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) {
assertHandler(ctx, false, req.GetRepository())
return &gitalypb.ObjectFormatResponse{}, tc.handlerError