Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2024-01-18 00:58:00 +0300
committerGitLab <noreply@gitlab.com>2024-01-18 00:58:00 +0300
commit8c7ef2ac66421fbdb70025a3ab1ed237e8affa7a (patch)
tree732aa7dcea56261e5023c351fd8d814d84fad17a
parent72faf14d9551b706f46103d665d75099ea8b1b0c (diff)
parent3395de7c96c6770ad34bbd14bf7908d5e34bbe2f (diff)
Merge branch 'smh-create-fork-partitioning' into 'master'
Partition fork with source repository in CreateFork Closes #5762 See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6612 Merged-by: Justin Tobler <jtobler@gitlab.com> Approved-by: Will Chandler <wchandler@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--internal/gitaly/service/objectpool/link_test.go67
-rw-r--r--internal/gitaly/service/objectpool/testhelper_test.go2
-rw-r--r--internal/gitaly/storage/storagemgr/middleware.go33
-rw-r--r--internal/gitaly/storage/storagemgr/middleware_test.go43
4 files changed, 135 insertions, 10 deletions
diff --git a/internal/gitaly/service/objectpool/link_test.go b/internal/gitaly/service/objectpool/link_test.go
index cf38812da..a5e705546 100644
--- a/internal/gitaly/service/objectpool/link_test.go
+++ b/internal/gitaly/service/objectpool/link_test.go
@@ -13,10 +13,77 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
+func TestCompleteForkCreationFlow(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ cfg, sourceRepository, _, _, objectPoolClient := setup(t, ctx, testserver.WithDisablePraefect())
+
+ repositoryClient := gitalypb.NewRepositoryServiceClient(
+ objectPoolClient.(clientWithConn).conn,
+ )
+
+ forkRepository := &gitalypb.Repository{
+ StorageName: sourceRepository.StorageName,
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ // Inject the Gitaly's address information in the context. CreateFork uses this to
+ // fetch from the source repository.
+ ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
+ // Build GitalySSH as CreateFork uses to perform the fetch.
+ testcfg.BuildGitalySSH(t, cfg)
+ createForkResponse, err := repositoryClient.CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: forkRepository,
+ SourceRepository: sourceRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.CreateForkResponse{}, createForkResponse)
+
+ // Create an object pool from the source repository.
+ objectPool, _, _ := createObjectPool(t, ctx, cfg, sourceRepository)
+
+ // Link the source repository itself to the object pool.
+ linkSourceToObjectPoolResponse, err := objectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
+ ObjectPool: objectPool,
+ Repository: sourceRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, linkSourceToObjectPoolResponse)
+
+ // Link the fork to the object pool.
+ linkForkToObjectPoolResponse, err := objectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
+ ObjectPool: objectPool,
+ Repository: forkRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, linkForkToObjectPoolResponse)
+
+ // Ensure the source is linked to the pool now.
+ getSourceObjectPoolResponse, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: sourceRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.GetObjectPoolResponse{
+ ObjectPool: objectPool,
+ }, getSourceObjectPoolResponse)
+
+ // Ensure the fork is linked to the pool now.
+ getForkObjectPoolResponse, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: forkRepository,
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.GetObjectPoolResponse{
+ ObjectPool: objectPool,
+ }, getForkObjectPoolResponse)
+}
+
func TestLink(t *testing.T) {
t.Parallel()
diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go
index d1a083f45..498199953 100644
--- a/internal/gitaly/service/objectpool/testhelper_test.go
+++ b/internal/gitaly/service/objectpool/testhelper_test.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
hookservice "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ssh"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -66,6 +67,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator,
gitalypb.RegisterObjectPoolServiceServer(srv, NewServer(deps))
gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps))
+ gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer(deps))
}, append(opts, testserver.WithLocator(locator), testserver.WithLogger(logger))...)
}
diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go
index 290545def..a72bbe88a 100644
--- a/internal/gitaly/storage/storagemgr/middleware.go
+++ b/internal/gitaly/storage/storagemgr/middleware.go
@@ -26,6 +26,9 @@ var (
// ErrQuarantineWithoutSnapshotRelativePath is returned when a request is configured with a quarantine but the snapshot's
// relative path was not sent in a header.
ErrQuarantineWithoutSnapshotRelativePath = errors.New("quarantined request did not contain snapshot relative path")
+ // ErrRepositoriesInDifferentStorages is returned when trying to access two repositories in different storages in the
+ // same transaction.
+ ErrRepositoriesInDifferentStorages = structerr.NewInvalidArgument("additional and target repositories are in different storages")
)
// NonTransactionalRPCs are the RPCs that do not support transactions.
@@ -247,22 +250,36 @@ func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry
return transactionalizedRequest{}, err
}
- var alternateRelativePath string
- if additionalRepo, err := methodInfo.AdditionalRepo(req); err != nil {
+ // Object pools need to be placed in the same partition as their members. Below we figure out which repository,
+ // if any, the target repository of the RPC must be partitioned with. We figure this out using two strategies:
+ //
+ // The general case is handled by extracting the additional repository from the RPC, and paritioning the target
+ // repository of the RPC with the additional repository. Many of the ObjectPoolService's RPCs operate on two
+ // repositories. Depending on the RPC, the additional repository is either the object pool itself or a member
+ // of the pool.
+ //
+ // CreateFork is special cased. The fork must partitioned with the source repository in order to successfully
+ // link it with the object pool later. The source repository is not tagged as additional repository in the
+ // CreateForkRequest. If the request is CreateForkRequest, we extract the source repository and partition the
+ // fork with it.
+ var additionalRepo *gitalypb.Repository
+ if additionalRepo, err = methodInfo.AdditionalRepo(req); err != nil {
if !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
return transactionalizedRequest{}, fmt.Errorf("extract additional repository: %w", err)
}
// There was no additional repository.
- } else {
- if additionalRepo.StorageName != targetRepo.StorageName {
- return transactionalizedRequest{}, structerr.NewInvalidArgument("additional and target repositories are in different storages")
- }
+ }
+
+ if req, ok := req.(*gitalypb.CreateForkRequest); ok {
+ additionalRepo = req.GetSourceRepository()
+ }
- alternateRelativePath = additionalRepo.RelativePath
+ if additionalRepo != nil && additionalRepo.StorageName != targetRepo.StorageName {
+ return transactionalizedRequest{}, ErrRepositoriesInDifferentStorages
}
- tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, alternateRelativePath, methodInfo.Operation == protoregistry.OpAccessor)
+ tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, additionalRepo.GetRelativePath(), methodInfo.Operation == protoregistry.OpAccessor)
if err != nil {
return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err)
}
diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go
index 794222874..9196d10b0 100644
--- a/internal/gitaly/storage/storagemgr/middleware_test.go
+++ b/internal/gitaly/storage/storagemgr/middleware_test.go
@@ -36,6 +36,7 @@ type mockRepositoryService struct {
removeRepositoryFunc func(context.Context, *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error)
setCustomHooksFunc func(gitalypb.RepositoryService_SetCustomHooksServer) error
getCustomHooksFunc func(*gitalypb.GetCustomHooksRequest, gitalypb.RepositoryService_GetCustomHooksServer) error
+ createForkFunc func(context.Context, *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error)
gitalypb.UnimplementedRepositoryServiceServer
}
@@ -59,6 +60,10 @@ func (m mockRepositoryService) GetCustomHooks(req *gitalypb.GetCustomHooksReques
return m.getCustomHooksFunc(req, stream)
}
+func (m mockRepositoryService) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
+ return m.createForkFunc(ctx, req)
+}
+
type mockHealthService struct {
checkFunc func(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
grpc_health_v1.UnimplementedHealthServer
@@ -396,6 +401,35 @@ messages and behavior by erroring out the requests before they even hit this int
},
expectHandlerInvoked: true,
},
+ {
+ desc: "successful CreateFork request",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: validAdditionalRepository(),
+ SourceRepository: validRepository(),
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.CreateForkResponse{}, resp)
+ },
+ assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) {
+ testhelper.ProtoEqual(t, validRepository(), actual)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "CreateFork fails due to repositories in different storages",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ sourceRepository := validRepository()
+ sourceRepository.StorageName = "different_storage"
+
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: validAdditionalRepository(),
+ SourceRepository: sourceRepository,
+ })
+ require.Equal(t, status.Error(codes.InvalidArgument, storagemgr.ErrRepositoriesInDifferentStorages.Error()), err)
+ require.Nil(t, resp)
+ },
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
cfg := testcfg.Build(t)
@@ -414,7 +448,7 @@ messages and behavior by erroring out the requests before they even hit this int
handlerInvoked := false
var transactionID storage.TransactionID
- assertHandler := func(ctx context.Context, isMutator bool, repo *gitalypb.Repository) {
+ assertHandler := func(ctx context.Context, shouldBeQuarantined bool, repo *gitalypb.Repository) {
handlerInvoked = true
// The repositories should be equal except for the relative path which
@@ -427,7 +461,7 @@ messages and behavior by erroring out the requests before they even hit this int
expectedRepo.RelativePath = ""
actualRepo.RelativePath = ""
- if isMutator {
+ if shouldBeQuarantined {
// Mutators should have quarantine directory configured.
assert.NotEmpty(t, actualRepo.GitObjectDirectory)
actualRepo.GitObjectDirectory = ""
@@ -476,6 +510,11 @@ messages and behavior by erroring out the requests before they even hit this int
},
})
gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{
+ createForkFunc: func(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
+ assertHandler(ctx, false, req.GetRepository())
+ tc.assertAdditionalRepository(t, ctx, req.GetSourceRepository())
+ return &gitalypb.CreateForkResponse{}, tc.handlerError
+ },
objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) {
assertHandler(ctx, false, req.GetRepository())
return &gitalypb.ObjectFormatResponse{}, tc.handlerError