Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-10-31 20:09:54 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-12-04 19:26:54 +0300
commiteb631ac01a58d2f404151eb295572a9578cb6b0e (patch)
treebda514032cfebcbd0c511490b58f148caf9036d6
parent07768960e8fc98cdacd581a3dc769623adbfba1a (diff)
Include alternate repo in the snapshot
Some object pool related RPCs carry an alternate repository in the request they access. This repository needs to be included in the transaction's snapshot in order to ensure the RPC can access it as expected. This commit extracts the alternate repository from the request and includes it also in the transaction.
-rw-r--r--internal/gitaly/storage/storagemgr/middleware.go40
-rw-r--r--internal/gitaly/storage/storagemgr/middleware_test.go142
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go13
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager_test.go109
4 files changed, 282 insertions, 22 deletions
diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go
index ac36e972c..2dc9eed3b 100644
--- a/internal/gitaly/storage/storagemgr/middleware.go
+++ b/internal/gitaly/storage/storagemgr/middleware.go
@@ -17,7 +17,8 @@ import (
// ErrQuarantineConfiguredOnMutator is returned when a mutator request is received with a quarantine configured.
var ErrQuarantineConfiguredOnMutator = errors.New("quarantine configured on a mutator request")
-var nonTransactionalRPCs = map[string]struct{}{
+// NonTransactionalRPCs are the RPCs that do not support transactions.
+var NonTransactionalRPCs = map[string]struct{}{
// This isn't registered in protoregistry so mark it here as non-transactional.
"/grpc.health.v1.Health/Check": {},
@@ -63,7 +64,7 @@ var nonTransactionalRPCs = map[string]struct{}{
// The transaction is committed if the handler doesn't return an error and rolled back otherwise.
func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, returnedErr error) {
- if _, ok := nonTransactionalRPCs[info.FullMethod]; ok {
+ if _, ok := NonTransactionalRPCs[info.FullMethod]; ok {
return handler(ctx, req)
}
@@ -126,7 +127,7 @@ func (ps *peekedStream) RecvMsg(dst interface{}) error {
// The transaction is committed if the handler doesn't return an error and rolled back otherwise.
func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) {
- if _, ok := nonTransactionalRPCs[info.FullMethod]; ok {
+ if _, ok := NonTransactionalRPCs[info.FullMethod]; ok {
return handler(srv, ss)
}
@@ -195,7 +196,7 @@ func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry
return nonTransactionalRequest(ctx, req), nil
}
- repo, err := methodInfo.TargetRepo(req)
+ targetRepo, err := methodInfo.TargetRepo(req)
if err != nil {
if errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
// The above error is returned when the repository field is not set in the request.
@@ -211,7 +212,7 @@ func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry
return nonTransactionalRequest(ctx, req), nil
}
- if repo.GitObjectDirectory != "" || len(repo.GitAlternateObjectDirectories) > 0 {
+ if targetRepo.GitObjectDirectory != "" || len(targetRepo.GitAlternateObjectDirectories) > 0 {
// The object directories should only be configured on a repository coming from a request that
// was already configured with a quarantine directory and is being looped back to Gitaly from Rails'
// authorization checks. If that's the case, the request should already be running in scope of a
@@ -242,11 +243,26 @@ func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry
// return the exact same error messages as some RPCs are testing for at the moment. In order to maintain
// compatibility with said tests, validate the repository here ahead of time and return the possible error
// as is.
- if err := locator.ValidateRepository(repo, storage.WithSkipRepositoryExistenceCheck()); err != nil {
+ if err := locator.ValidateRepository(targetRepo, storage.WithSkipRepositoryExistenceCheck()); err != nil {
return transactionalizedRequest{}, err
}
- tx, err := mgr.Begin(ctx, repo.StorageName, repo.RelativePath, methodInfo.Operation == protoregistry.OpAccessor)
+ var alternateRelativePath string
+ if additionalRepo, err := methodInfo.AdditionalRepo(req); err != nil {
+ if !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
+ return transactionalizedRequest{}, fmt.Errorf("extract additional repository: %w", err)
+ }
+
+ // There was no additional repository.
+ } else {
+ if additionalRepo.StorageName != targetRepo.StorageName {
+ return transactionalizedRequest{}, structerr.NewInvalidArgument("additional and target repositories are in different storages")
+ }
+
+ alternateRelativePath = additionalRepo.RelativePath
+ }
+
+ tx, err := mgr.Begin(ctx, targetRepo.StorageName, targetRepo.RelativePath, alternateRelativePath, methodInfo.Operation == protoregistry.OpAccessor)
if err != nil {
return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err)
}
@@ -301,5 +317,15 @@ func rewriteRequest(tx *finalizableTransaction, methodInfo protoregistry.MethodI
*targetRepo = *tx.RewriteRepository(targetRepo)
+ if additionalRepo, err := methodInfo.AdditionalRepo(rewrittenReq); err != nil {
+ if !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
+ return nil, fmt.Errorf("extract additional repository: %w", err)
+ }
+
+ // There was no additional repository.
+ } else {
+ *additionalRepo = *tx.RewriteRepository(additionalRepo)
+ }
+
return rewrittenReq, nil
}
diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go
index b125b59bd..c7203d437 100644
--- a/internal/gitaly/storage/storagemgr/middleware_test.go
+++ b/internal/gitaly/storage/storagemgr/middleware_test.go
@@ -65,6 +65,20 @@ func (m mockHealthService) Check(ctx context.Context, req *grpc_health_v1.Health
return m.checkFunc(ctx, req)
}
+type mockObjectPoolService struct {
+ createObjectPoolFunc func(context.Context, *gitalypb.CreateObjectPoolRequest) (*gitalypb.CreateObjectPoolResponse, error)
+ linkRepositoryToObjectPoolFunc func(context.Context, *gitalypb.LinkRepositoryToObjectPoolRequest) (*gitalypb.LinkRepositoryToObjectPoolResponse, error)
+ gitalypb.UnimplementedObjectPoolServiceServer
+}
+
+func (m mockObjectPoolService) CreateObjectPool(ctx context.Context, req *gitalypb.CreateObjectPoolRequest) (*gitalypb.CreateObjectPoolResponse, error) {
+ return m.createObjectPoolFunc(ctx, req)
+}
+
+func (m mockObjectPoolService) LinkRepositoryToObjectPool(ctx context.Context, req *gitalypb.LinkRepositoryToObjectPoolRequest) (*gitalypb.LinkRepositoryToObjectPoolResponse, error) {
+ return m.linkRepositoryToObjectPoolFunc(ctx, req)
+}
+
func TestMiddleware_transactional(t *testing.T) {
if !testhelper.IsWALEnabled() {
t.Skip(`
@@ -77,7 +91,20 @@ be configured.`)
This interceptor is for use with Gitaly. Praefect running in front of it may change error
messages and behavior by erroring out the requests before they even hit this interceptor.`)
- t.Parallel()
+ // This is a temporary workaround until transaction support is enabled the below RPCs. We're
+ // using them to test that the additional repository is properly handled by the middleware
+ // so we need the middleware to not short circuit when it encounters the RPC.
+ //
+ // As the RPCs themselves haven't yet been adapted for transactions, their tests will fail if we handled
+ // them transactionally. For now, make these RPCs transactional within this test only. There's no point
+ // switching to dependency injection with the list of non transactional RPCs as we're soon enabling
+ // transactions for the remaining non-transactional RPCs and remove the list entirely.
+ delete(storagemgr.NonTransactionalRPCs, "/gitaly.ObjectPoolService/CreateObjectPool")
+ delete(storagemgr.NonTransactionalRPCs, "/gitaly.ObjectPoolService/LinkRepositoryToObjectPool")
+ defer func() {
+ storagemgr.NonTransactionalRPCs["/gitaly.ObjectPoolService/CreateObjectPool"] = struct{}{}
+ storagemgr.NonTransactionalRPCs["/gitaly.ObjectPoolService/LinkRepositoryToObjectPool"] = struct{}{}
+ }()
validRepository := func() *gitalypb.Repository {
return &gitalypb.Repository{
@@ -88,16 +115,23 @@ messages and behavior by erroring out the requests before they even hit this int
}
}
+ validAdditionalRepository := func() *gitalypb.Repository {
+ repo := validRepository()
+ repo.RelativePath = "additional-relative-path"
+ return repo
+ }
+
for _, tc := range []struct {
- desc string
- repository *gitalypb.Repository
- performRequest func(*testing.T, context.Context, *grpc.ClientConn)
- handlerError error
- rollbackTransaction bool
- expectHandlerInvoked bool
- expectedRollbackError error
- expectedResponse proto.Message
- expectedError error
+ desc string
+ repository *gitalypb.Repository
+ performRequest func(*testing.T, context.Context, *grpc.ClientConn)
+ assertAdditionalRepository func(*testing.T, context.Context, *gitalypb.Repository)
+ handlerError error
+ rollbackTransaction bool
+ expectHandlerInvoked bool
+ expectedRollbackError error
+ expectedResponse proto.Message
+ expectedError error
}{
{
desc: "missing repository",
@@ -298,6 +332,82 @@ messages and behavior by erroring out the requests before they even hit this int
require.Nil(t, resp)
},
},
+ {
+ desc: "mutator with repository as additional repository",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewObjectPoolServiceClient(cc).CreateObjectPool(ctx, &gitalypb.CreateObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{Repository: validRepository()},
+ Origin: validAdditionalRepository(),
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.CreateObjectPoolResponse{}, resp)
+ },
+ assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) {
+ var originalRepo *gitalypb.Repository
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ originalRepo = tx.OriginalRepository(actual)
+ })
+
+ expected := validAdditionalRepository()
+ // The additional repository's relative path should have been rewritten.
+ require.NotEqual(t, expected.RelativePath, actual.RelativePath)
+ // But the restored non-snapshotted repository should match the original.
+ testhelper.ProtoEqual(t, expected, originalRepo)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "mutator without repository as additional repository",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewObjectPoolServiceClient(cc).CreateObjectPool(ctx, &gitalypb.CreateObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{Repository: validRepository()},
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.CreateObjectPoolResponse{}, resp)
+ },
+ assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) {
+ assert.Nil(t, actual)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "mutator with object pool as additional repository",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewObjectPoolServiceClient(cc).LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
+ Repository: validRepository(),
+ ObjectPool: &gitalypb.ObjectPool{Repository: validAdditionalRepository()},
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, resp)
+ },
+ assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) {
+ var originalRepo *gitalypb.Repository
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ originalRepo = tx.OriginalRepository(actual)
+ })
+
+ expected := validAdditionalRepository()
+ // The additional repository's relative path should have been rewritten.
+ require.NotEqual(t, expected.RelativePath, actual.RelativePath)
+ // But the restored non-snapshotted repository should match the original.
+ testhelper.ProtoEqual(t, expected, originalRepo)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "mutator without object pool as additional repository",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewObjectPoolServiceClient(cc).LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
+ Repository: validRepository(),
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.LinkRepositoryToObjectPoolResponse{}, resp)
+ },
+ assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) {
+ assert.Nil(t, actual)
+ },
+ expectHandlerInvoked: true,
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
cfg := testcfg.Build(t)
@@ -365,6 +475,18 @@ messages and behavior by erroring out the requests before they even hit this int
}
serverAddress := testserver.RunGitalyServer(t, cfg, func(server *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterObjectPoolServiceServer(server, mockObjectPoolService{
+ createObjectPoolFunc: func(ctx context.Context, req *gitalypb.CreateObjectPoolRequest) (*gitalypb.CreateObjectPoolResponse, error) {
+ assertHandler(ctx, true, req.GetObjectPool().GetRepository())
+ tc.assertAdditionalRepository(t, ctx, req.GetOrigin())
+ return &gitalypb.CreateObjectPoolResponse{}, tc.handlerError
+ },
+ linkRepositoryToObjectPoolFunc: func(ctx context.Context, req *gitalypb.LinkRepositoryToObjectPoolRequest) (*gitalypb.LinkRepositoryToObjectPoolResponse, error) {
+ assertHandler(ctx, true, req.GetRepository())
+ tc.assertAdditionalRepository(t, ctx, req.GetObjectPool().GetRepository())
+ return &gitalypb.LinkRepositoryToObjectPoolResponse{}, tc.handlerError
+ },
+ })
gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{
objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) {
assertHandler(ctx, false, req.GetRepository())
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index 7a46d44ea..045da1d0a 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -362,9 +362,11 @@ func stagingDirectoryPath(storagePath string) string {
//
// storageName and relativePath specify the target repository to begin a transaction against.
//
+// alternateRelativePath specifies a repository to include in the transaction's snapshot as well.
+//
// readOnly indicates whether this is a read-only transaction. Read-only transactions are not
// configured with a quarantine directory and do not commit a log entry.
-func (pm *PartitionManager) Begin(ctx context.Context, storageName, relativePath string, readOnly bool) (*finalizableTransaction, error) {
+func (pm *PartitionManager) Begin(ctx context.Context, storageName, relativePath, alternateRelativePath string, readOnly bool) (*finalizableTransaction, error) {
storageMgr, ok := pm.storages[storageName]
if !ok {
return nil, structerr.NewNotFound("unknown storage: %q", storageName)
@@ -375,7 +377,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, storageName, relativePath
return nil, structerr.NewInvalidArgument("validate relative path: %w", err)
}
- partitionID, err := storageMgr.partitionAssigner.getPartitionID(ctx, relativePath, "")
+ partitionID, err := storageMgr.partitionAssigner.getPartitionID(ctx, relativePath, alternateRelativePath)
if err != nil {
if errors.Is(err, badger.ErrDBClosed) {
// The database is closed when PartitionManager is closing. Return a more
@@ -474,7 +476,12 @@ func (pm *PartitionManager) Begin(ctx context.Context, storageName, relativePath
ptn.pendingTransactionCount++
storageMgr.mu.Unlock()
- transaction, err := ptn.transactionManager.Begin(ctx, relativePath, nil, readOnly)
+ var snapshottedRelativePaths []string
+ if alternateRelativePath != "" {
+ snapshottedRelativePaths = []string{alternateRelativePath}
+ }
+
+ transaction, err := ptn.transactionManager.Begin(ctx, relativePath, snapshottedRelativePaths, readOnly)
if err != nil {
// The pending transaction count needs to be decremented since the transaction is no longer
// inflight. A transaction failing does not necessarily mean the transaction manager has
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index 4b951d6b8..4df6cd563 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -3,6 +3,7 @@ package storagemgr
import (
"context"
"errors"
+ "fmt"
"io/fs"
"os"
"path/filepath"
@@ -58,6 +59,8 @@ func TestPartitionManager(t *testing.T) {
ctx context.Context
// repo is the repository that the transaction belongs to.
repo storage.Repository
+ // alternateRelativePath is the relative path of the alternate repository.
+ alternateRelativePath string
// expectedState contains the partitions by their storages and their pending transaction count at
// the end of the step.
expectedState map[string]map[partitionID]uint
@@ -652,6 +655,108 @@ func TestPartitionManager(t *testing.T) {
}
},
},
+ {
+ desc: "repository and alternate target the same partition",
+ setup: func(t *testing.T, cfg config.Cfg) setupData {
+ repo := setupRepository(t, cfg, cfg.Storages[0])
+ alternateRepo := setupRepository(t, cfg, cfg.Storages[0])
+
+ return setupData{
+ steps: steps{
+ begin{
+ transactionID: 1,
+ repo: repo,
+ alternateRelativePath: alternateRepo.GetRelativePath(),
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 1,
+ },
+ },
+ },
+ begin{
+ transactionID: 2,
+ repo: repo,
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 2,
+ },
+ },
+ },
+ begin{
+ transactionID: 3,
+ repo: alternateRepo,
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 3,
+ },
+ },
+ },
+ rollback{
+ transactionID: 1,
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 2,
+ },
+ },
+ },
+ rollback{
+ transactionID: 2,
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 1,
+ },
+ },
+ },
+ rollback{
+ transactionID: 3,
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "beginning transaction on repositories in different partitions fails",
+ setup: func(t *testing.T, cfg config.Cfg) setupData {
+ repo1 := setupRepository(t, cfg, cfg.Storages[0])
+ repo2 := setupRepository(t, cfg, cfg.Storages[0])
+
+ return setupData{
+ steps: steps{
+ begin{
+ transactionID: 1,
+ repo: repo1,
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 1,
+ },
+ },
+ },
+ begin{
+ transactionID: 2,
+ repo: repo2,
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 1,
+ 2: 1,
+ },
+ },
+ },
+ begin{
+ transactionID: 3,
+ repo: repo1,
+ alternateRelativePath: repo2.GetRelativePath(),
+ expectedState: map[string]map[partitionID]uint{
+ "default": {
+ 1: 1,
+ 2: 1,
+ },
+ },
+ expectedError: fmt.Errorf("get partition: %w", errRepositoriesAreInDifferentPartitions),
+ },
+ },
+ }
+ },
+ },
} {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
@@ -721,7 +826,7 @@ func TestPartitionManager(t *testing.T) {
beginCtx = step.ctx
}
- txn, err := partitionManager.Begin(beginCtx, step.repo.GetStorageName(), step.repo.GetRelativePath(), false)
+ txn, err := partitionManager.Begin(beginCtx, step.repo.GetStorageName(), step.repo.GetRelativePath(), step.alternateRelativePath, false)
require.Equal(t, step.expectedError, err)
blockOnPartitionClosing(t, partitionManager)
@@ -916,7 +1021,7 @@ func TestPartitionManager_concurrentClose(t *testing.T) {
require.NoError(t, err)
defer partitionManager.Close()
- tx, err := partitionManager.Begin(ctx, cfg.Storages[0].Name, "relative-path", false)
+ tx, err := partitionManager.Begin(ctx, cfg.Storages[0].Name, "relative-path", "", false)
require.NoError(t, err)
start := make(chan struct{})