Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-10-19 18:44:30 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-10-19 18:44:30 +0300
commitc7c7764c3d542ae23943d119294718ba1e905cab (patch)
treec5a0743f5999d24b117a13b5408a79277cb6288e
parent40dbb36fb46b294fb0e8443dda37307e3cd21b52 (diff)
parent14bd927d6d4973c1516d1c5c7568131c90606ac4 (diff)
Merge branch 'smh-transaction-middleware' into 'master'
Introduce transaction managing middleware See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6458 Merged-by: Sami Hiltunen <shiltunen@gitlab.com> Approved-by: Will Chandler <wchandler@gitlab.com> Reviewed-by: Will Chandler <wchandler@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--internal/cli/gitaly/serve.go1
-rw-r--r--internal/cli/gitaly/subcmd_hooks_test.go12
-rw-r--r--internal/git/localrepo/refs.go5
-rw-r--r--internal/gitaly/server/auth_test.go5
-rw-r--r--internal/gitaly/server/server.go15
-rw-r--r--internal/gitaly/server/server_factory.go16
-rw-r--r--internal/gitaly/server/server_factory_test.go5
-rw-r--r--internal/gitaly/service/hook/pack_objects.go16
-rw-r--r--internal/gitaly/service/operations/merge_branch_test.go82
-rw-r--r--internal/gitaly/service/operations/rebase_confirmable_test.go6
-rw-r--r--internal/gitaly/service/operations/squash.go8
-rw-r--r--internal/gitaly/service/operations/squash_test.go6
-rw-r--r--internal/gitaly/service/repository/fetch_remote_test.go10
-rw-r--r--internal/gitaly/service/repository/fsck_test.go12
-rw-r--r--internal/gitaly/service/repository/get_custom_hooks_test.go5
-rw-r--r--internal/gitaly/service/repository/objects_size_test.go4
-rw-r--r--internal/gitaly/service/repository/remove.go5
-rw-r--r--internal/gitaly/service/repository/remove_test.go4
-rw-r--r--internal/gitaly/service/repository/replicate_test.go5
-rw-r--r--internal/gitaly/service/repository/restore_repository_test.go12
-rw-r--r--internal/gitaly/service/repository/set_custom_hooks.go13
-rw-r--r--internal/gitaly/service/repository/set_custom_hooks_test.go2
-rw-r--r--internal/gitaly/service/repository/size_test.go3
-rw-r--r--internal/gitaly/service/repository/snapshot_test.go5
-rw-r--r--internal/gitaly/service/smarthttp/cache.go12
-rw-r--r--internal/gitaly/service/smarthttp/inforefs_test.go3
-rw-r--r--internal/gitaly/service/smarthttp/receive_pack_test.go23
-rw-r--r--internal/gitaly/service/ssh/receive_pack_test.go35
-rw-r--r--internal/gitaly/storage/storagemgr/middleware.go313
-rw-r--r--internal/gitaly/storage/storagemgr/middleware_test.go554
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go9
-rw-r--r--internal/testhelper/testserver/gitaly.go32
32 files changed, 1182 insertions, 56 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index c9d067fcc..b19091b58 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -372,6 +372,7 @@ func run(cfg config.Cfg, logger log.Logger) error {
registry,
diskCache,
[]*limithandler.LimiterMiddleware{perRPCLimitHandler, rateLimitHandler},
+ server.TransactionMiddleware{},
)
defer gitalyServerFactory.Stop()
diff --git a/internal/cli/gitaly/subcmd_hooks_test.go b/internal/cli/gitaly/subcmd_hooks_test.go
index 338a72c0e..3e7ee64ec 100644
--- a/internal/cli/gitaly/subcmd_hooks_test.go
+++ b/internal/cli/gitaly/subcmd_hooks_test.go
@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
@@ -45,6 +46,11 @@ func TestSetHooksSubcommand(t *testing.T) {
configPath := testcfg.WriteTemporaryGitalyConfigFile(t, cfg)
+ expectedMode := umask.Mask(fs.ModePerm)
+ if testhelper.IsWALEnabled() {
+ expectedMode = perm.PrivateDir
+ }
+
for _, tc := range []struct {
desc string
setup func() ([]string, *gitalypb.Repository)
@@ -126,7 +132,7 @@ func TestSetHooksSubcommand(t *testing.T) {
},
hooks: &bytes.Buffer{},
expectedState: testhelper.DirectoryState{
- "custom_hooks/": {Mode: umask.Mask(fs.ModePerm)},
+ "custom_hooks/": {Mode: expectedMode},
},
},
{
@@ -141,7 +147,7 @@ func TestSetHooksSubcommand(t *testing.T) {
},
hooks: testhelper.MustCreateCustomHooksTar(t),
expectedState: testhelper.DirectoryState{
- "custom_hooks/": {Mode: umask.Mask(fs.ModePerm)},
+ "custom_hooks/": {Mode: expectedMode},
"custom_hooks/pre-commit": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-commit content")},
"custom_hooks/pre-push": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-push content")},
"custom_hooks/pre-receive": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-receive content")},
@@ -165,7 +171,7 @@ func TestSetHooksSubcommand(t *testing.T) {
},
hooks: testhelper.MustCreateCustomHooksTar(t),
expectedState: testhelper.DirectoryState{
- "custom_hooks/": {Mode: umask.Mask(fs.ModePerm)},
+ "custom_hooks/": {Mode: expectedMode},
"custom_hooks/pre-commit": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-commit content")},
"custom_hooks/pre-push": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-push content")},
"custom_hooks/pre-receive": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-receive content")},
diff --git a/internal/git/localrepo/refs.go b/internal/git/localrepo/refs.go
index c0be1ebdf..e7f219ee9 100644
--- a/internal/git/localrepo/refs.go
+++ b/internal/git/localrepo/refs.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
)
@@ -175,6 +176,10 @@ func (repo *Repo) setDefaultBranchWithTransaction(ctx context.Context, txManager
return fmt.Errorf("committing temporary HEAD: %w", err)
}
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ tx.SetDefaultBranch(reference)
+ })
+
return nil
}
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 7300a6ef8..befe375e4 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -203,7 +203,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, logger, locator, hookManager, gitCmdFactory, catfileCache)
- srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false)
+ srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}, TransactionMiddleware{}).New(true, false)
require.NoError(t, err)
setup.RegisterAll(srv, &service.Dependencies{
@@ -246,7 +246,8 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
[]*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)},
- ).New(true)
+ TransactionMiddleware{},
+ ).New(true, true)
require.NoError(t, err)
healthpb.RegisterHealthServer(srv, health.NewServer())
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index d8c1ab9e7..2123a9cc5 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -53,7 +53,7 @@ func WithStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
}
// New returns a GRPC server instance with a set of interceptors configured.
-func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, error) {
+func (s *GitalyServerFactory) New(external, secure bool, opts ...Option) (*grpc.Server, error) {
var cfg serverConfig
for _, opt := range opts {
opt(&cfg)
@@ -154,6 +154,19 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
streamServerInterceptors = append(streamServerInterceptors, cfg.streamInterceptors...)
unaryServerInterceptors = append(unaryServerInterceptors, cfg.unaryInterceptors...)
+ // Only requests coming through the external API need to be ran transactionalized. Only the HookService calls
+ // should arrive through the internal socket. Requests coming from there would already be running in a
+ // transaction as the external request that led to the internal socket call would have been transactionalized
+ // already.
+ if external {
+ if s.txMiddleware.UnaryInterceptor != nil {
+ unaryServerInterceptors = append(unaryServerInterceptors, s.txMiddleware.UnaryInterceptor)
+ }
+ if s.txMiddleware.StreamInterceptor != nil {
+ streamServerInterceptors = append(streamServerInterceptors, s.txMiddleware.StreamInterceptor)
+ }
+ }
+
serverOptions := []grpc.ServerOption{
grpc.StatsHandler(gitalylog.PerRPCLogHandler{
Underlying: &grpcstats.PayloadBytes{},
diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go
index 075adc9a3..b23ae840a 100644
--- a/internal/gitaly/server/server_factory.go
+++ b/internal/gitaly/server/server_factory.go
@@ -20,6 +20,16 @@ type GitalyServerFactory struct {
logger log.Logger
externalServers []*grpc.Server
internalServers []*grpc.Server
+ txMiddleware TransactionMiddleware
+}
+
+// TransactionMiddleware collects transaction middleware into a single struct that can be
+// provided to enable transactions.
+type TransactionMiddleware struct {
+ // UnaryInterceptor is the unary RPC interceptor that handles transaction logic.
+ UnaryInterceptor grpc.UnaryServerInterceptor
+ // StreamInterceptor is the stream RPC interceptor that handles transaction logic.
+ StreamInterceptor grpc.StreamServerInterceptor
}
// NewGitalyServerFactory allows to create and start secure/insecure 'grpc.Server's.
@@ -29,6 +39,7 @@ func NewGitalyServerFactory(
registry *backchannel.Registry,
cacheInvalidator cache.Invalidator,
limitHandlers []*limithandler.LimiterMiddleware,
+ txMiddleware TransactionMiddleware,
) *GitalyServerFactory {
return &GitalyServerFactory{
cfg: cfg,
@@ -36,6 +47,7 @@ func NewGitalyServerFactory(
registry: registry,
cacheInvalidator: cacheInvalidator,
limitHandlers: limitHandlers,
+ txMiddleware: txMiddleware,
}
}
@@ -77,7 +89,7 @@ func (s *GitalyServerFactory) GracefulStop() {
// CreateExternal creates a new external gRPC server. The external servers are closed
// before the internal servers when gracefully shutting down.
func (s *GitalyServerFactory) CreateExternal(secure bool, opts ...Option) (*grpc.Server, error) {
- server, err := s.New(secure, opts...)
+ server, err := s.New(true, secure, opts...)
if err != nil {
return nil, err
}
@@ -89,7 +101,7 @@ func (s *GitalyServerFactory) CreateExternal(secure bool, opts ...Option) (*grpc
// CreateInternal creates a new internal gRPC server. Internal servers are closed
// after the external ones when gracefully shutting down.
func (s *GitalyServerFactory) CreateInternal(opts ...Option) (*grpc.Server, error) {
- server, err := s.New(false, opts...)
+ server, err := s.New(false, false, opts...)
if err != nil {
return nil, err
}
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 6975d4863..896bf7223 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -84,6 +84,7 @@ func TestGitalyServerFactory(t *testing.T) {
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
nil,
+ TransactionMiddleware{},
)
t.Cleanup(sf.Stop)
@@ -104,6 +105,7 @@ func TestGitalyServerFactory(t *testing.T) {
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
nil,
+ TransactionMiddleware{},
)
t.Cleanup(sf.Stop)
@@ -118,6 +120,7 @@ func TestGitalyServerFactory(t *testing.T) {
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
nil,
+ TransactionMiddleware{},
)
t.Cleanup(sf.Stop)
@@ -149,6 +152,7 @@ func TestGitalyServerFactory(t *testing.T) {
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), logger),
nil,
+ TransactionMiddleware{},
)
t.Cleanup(sf.Stop)
@@ -184,6 +188,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) {
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
nil,
+ TransactionMiddleware{},
)
defer sf.Stop()
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 9b513c0a3..14ff7b946 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -20,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/stream"
@@ -44,7 +45,7 @@ var (
)
func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error {
- cacheKey, stdin, err := s.computeCacheKey(req, stdinReader)
+ cacheKey, stdin, err := s.computeCacheKey(ctx, req, stdinReader)
if err != nil {
return err
}
@@ -113,10 +114,19 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
// computeCacheKey returns the cache key used for caching pack-objects. A cache key is made up of
// both the requested objects and essential parameters that could impact the content of the
// generated packfile. Including any insignificant information could result in a lower cache hit rate.
-func (s *server) computeCacheKey(req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) {
+func (s *server) computeCacheKey(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) {
cacheHash := sha256.New()
+
+ repository := req.Repository
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten
+ // to point to the transaction's repository, the handler sees each request as different even if they point to
+ // the same repository. Restore the original request to ensure identical requests get the same key.
+ repository = tx.OriginalRepository(req.Repository)
+ })
+
cacheKeyPrefix, err := protojson.Marshal(&gitalypb.PackObjectsHookWithSidechannelRequest{
- Repository: req.Repository,
+ Repository: repository,
Args: req.Args,
GitProtocol: req.GitProtocol,
})
diff --git a/internal/gitaly/service/operations/merge_branch_test.go b/internal/gitaly/service/operations/merge_branch_test.go
index 049c8fee6..aaf450194 100644
--- a/internal/gitaly/service/operations/merge_branch_test.go
+++ b/internal/gitaly/service/operations/merge_branch_test.go
@@ -762,33 +762,61 @@ func testUserMergeBranchConcurrentUpdate(t *testing.T, ctx context.Context) {
require.NoError(t, stream.Send(&gitalypb.UserMergeBranchRequest{Apply: true}), "apply merge")
require.NoError(t, stream.CloseSend(), "close send")
- secondResponse, err := stream.Recv()
- testhelper.RequireGrpcError(t,
- structerr.NewFailedPrecondition("reference update: reference does not point to expected object").
- WithDetail(&testproto.ErrorMetadata{
- Key: []byte("actual_object_id"),
- Value: []byte(concurrentCommitID),
- }).
- WithDetail(&testproto.ErrorMetadata{
- Key: []byte("expected_object_id"),
- Value: []byte(commits.left),
- }).
- WithDetail(&testproto.ErrorMetadata{
- Key: []byte("reference"),
- Value: []byte("refs/heads/branch"),
- }).
- WithDetail(&gitalypb.UserMergeBranchError{
- Error: &gitalypb.UserMergeBranchError_ReferenceUpdate{
- ReferenceUpdate: &gitalypb.ReferenceUpdateError{
- ReferenceName: []byte("refs/heads/branch"),
- OldOid: commits.left.String(),
- NewOid: firstResponse.CommitId,
+ if testhelper.IsWALEnabled() {
+ // With transaction management enabled, the RPC won't observe concurrent updates in its snapshot.
+ // Only once it commits will the updates be verified against other concurrent updates. We thus have
+ // to receive once more after the successful merge to end the stream and trigger a transaction commit.
+ //
+ // This test case is also expecting a UserMergeBranch specific error with additional detail. Given the
+ // commit errors are universal and happen after the handler, we won't be able to return the same error
+ // here.
+ secondResponse, err := stream.Recv()
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.UserMergeBranchResponse{
+ BranchUpdate: &gitalypb.OperationBranchUpdate{
+ CommitId: gittest.ObjectHashDependent(t, map[string]string{
+ "sha1": "5e96169507b47c71ffb719dc3ad93d014f57617b",
+ "sha256": "a1165ec1ffbc14e1ad40dbae4c9ccb6afe6a3a4db1b7a7c5f6aa0c52999c45b1",
+ }),
+ },
+ }, secondResponse)
+
+ thirdResponse, err := stream.Recv()
+ testhelper.RequireGrpcError(t, structerr.NewInternal("%w", fmt.Errorf("commit: %w", fmt.Errorf("verify references: %w", storagemgr.ReferenceVerificationError{
+ ReferenceName: "refs/heads/branch",
+ ExpectedOID: commits.left,
+ ActualOID: concurrentCommitID,
+ }))), err)
+ require.Nil(t, thirdResponse)
+ } else {
+ secondResponse, err := stream.Recv()
+ testhelper.RequireGrpcError(t,
+ structerr.NewFailedPrecondition("reference update: reference does not point to expected object").
+ WithDetail(&testproto.ErrorMetadata{
+ Key: []byte("actual_object_id"),
+ Value: []byte(concurrentCommitID),
+ }).
+ WithDetail(&testproto.ErrorMetadata{
+ Key: []byte("expected_object_id"),
+ Value: []byte(commits.left),
+ }).
+ WithDetail(&testproto.ErrorMetadata{
+ Key: []byte("reference"),
+ Value: []byte("refs/heads/branch"),
+ }).
+ WithDetail(&gitalypb.UserMergeBranchError{
+ Error: &gitalypb.UserMergeBranchError_ReferenceUpdate{
+ ReferenceUpdate: &gitalypb.ReferenceUpdateError{
+ ReferenceName: []byte("refs/heads/branch"),
+ OldOid: commits.left.String(),
+ NewOid: firstResponse.CommitId,
+ },
},
- },
- }),
- err,
- )
- require.Nil(t, secondResponse)
+ }),
+ err,
+ )
+ require.Nil(t, secondResponse)
+ }
commit, err := repo.ReadCommit(ctx, "refs/heads/branch")
require.NoError(t, err, "get commit after RPC finished")
@@ -1033,6 +1061,8 @@ func TestUserMergeBranch_allowed(t *testing.T) {
}
func testUserMergeBranchAllowed(t *testing.T, ctx context.Context) {
+ testhelper.SkipWithWAL(t, "PartitionManager not injected in test setup")
+
t.Parallel()
type expectedData struct {
diff --git a/internal/gitaly/service/operations/rebase_confirmable_test.go b/internal/gitaly/service/operations/rebase_confirmable_test.go
index 71f94db26..5ecb63859 100644
--- a/internal/gitaly/service/operations/rebase_confirmable_test.go
+++ b/internal/gitaly/service/operations/rebase_confirmable_test.go
@@ -610,7 +610,11 @@ func testUserRebaseConfirmablePreReceiveError(t *testing.T, ctx context.Context)
), err)
_, err = localRepo.ReadCommit(ctx, git.Revision(firstResponse.GetRebaseSha()))
- if hookName == "pre-receive" {
+ if testhelper.IsWALEnabled() || hookName == "pre-receive" {
+ // Previously objects were migrated to the repository if 'pre-receive' succeeded
+ // so 'update' hook failing would leave the objects in the repo. With transactions enabled,
+ // no objects end up in the repository unless the transaction is committed. We thus assert
+ // this to be the case always with transactions.
require.Equal(t, localrepo.ErrObjectNotFound, err, "commit should have been discarded")
} else {
require.NoError(t, err)
diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go
index 82b26d5ac..0b06c5030 100644
--- a/internal/gitaly/service/operations/squash.go
+++ b/internal/gitaly/service/operations/squash.go
@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting"
@@ -199,5 +200,12 @@ func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest
return "", structerr.NewAborted("committing vote on squashed commit: %w", err)
}
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ // Only referenced objects get committed as part of a transaction. Since this
+ // RPC doesn't reference the object, we have to manually mark it to be included
+ // in the final committed pack.
+ tx.IncludeObject(git.ObjectID(commitID))
+ })
+
return commitID, nil
}
diff --git a/internal/gitaly/service/operations/squash_test.go b/internal/gitaly/service/operations/squash_test.go
index 3f1267102..1225c1a05 100644
--- a/internal/gitaly/service/operations/squash_test.go
+++ b/internal/gitaly/service/operations/squash_test.go
@@ -177,7 +177,11 @@ func testUserSquashTransactional(t *testing.T, ctx context.Context) {
// Even though the committing vote has failed, we expect objects to have
// been migrated after the preparatory vote. The commit should thus exist in
// the repository.
- expectedExists: true,
+ //
+ // With transactions, the object is written to the repository if the transaction successfully
+ // commits. Vote failure raises an error which causes the transaction to be rolled back. We don't
+ // thus expect to have the object in the repository.
+ expectedExists: !testhelper.IsWALEnabled(),
},
} {
t.Run(tc.desc, func(t *testing.T) {
diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go
index 56f6c1f19..c62faf395 100644
--- a/internal/gitaly/service/repository/fetch_remote_test.go
+++ b/internal/gitaly/service/repository/fetch_remote_test.go
@@ -41,6 +41,13 @@ func gitRequestValidation(w http.ResponseWriter, r *http.Request, next http.Hand
}
func TestFetchRemote(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+Reference transaction hook is disabled in this RPC due to the reason commented in the
+RPC handler body. For now, we'll wait for the RPC to be updated to do the reference updates
+manually using update-ref with the fetch being just a dry-run.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
+
t.Parallel()
ctx := testhelper.Context(t)
@@ -1047,6 +1054,9 @@ func TestFetchRemote_transaction(t *testing.T) {
}
func TestFetchRemote_pooledRepository(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+Object pools are not yet supported with transaction management.`)
+
t.Parallel()
ctx := testhelper.Context(t)
diff --git a/internal/gitaly/service/repository/fsck_test.go b/internal/gitaly/service/repository/fsck_test.go
index 0d78d1327..93156fbaf 100644
--- a/internal/gitaly/service/repository/fsck_test.go
+++ b/internal/gitaly/service/repository/fsck_test.go
@@ -1,6 +1,7 @@
package repository
import (
+ "fmt"
"os"
"path/filepath"
"strings"
@@ -13,6 +14,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
func TestFsck(t *testing.T) {
@@ -77,6 +80,15 @@ func TestFsck(t *testing.T) {
require.NoError(t, os.RemoveAll(filepath.Join(repoPath, "objects")))
require.NoError(t, os.WriteFile(filepath.Join(repoPath, "objects"), nil, perm.SharedFile))
+ if testhelper.IsWALEnabled() {
+ return setupData{
+ repo: repo,
+ expectedErr: status.Error(codes.Internal,
+ fmt.Sprintf("begin transaction: get partition: assign partition ID: get alternate partition ID: read alternates file: open: open %s/objects/info/alternates: not a directory", repoPath),
+ ),
+ }
+ }
+
return setupData{
repo: repo,
requireResponse: func(actual *gitalypb.FsckResponse) {
diff --git a/internal/gitaly/service/repository/get_custom_hooks_test.go b/internal/gitaly/service/repository/get_custom_hooks_test.go
index 6e329b001..e9157c597 100644
--- a/internal/gitaly/service/repository/get_custom_hooks_test.go
+++ b/internal/gitaly/service/repository/get_custom_hooks_test.go
@@ -87,6 +87,11 @@ func TestGetCustomHooks_successful(t *testing.T) {
}
func TestGetCustomHooks_symlink(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+The repositories generally shouldn't have symlinks in them and the TransactionManager never writes any
+symlinks. Symlinks are not supported when creating a snapshot of the repository. Disable the test as it
+doesn't seem to test a realistic scenario.`)
+
t.Parallel()
for _, tc := range []struct {
diff --git a/internal/gitaly/service/repository/objects_size_test.go b/internal/gitaly/service/repository/objects_size_test.go
index fd4ec9cc3..6a441985e 100644
--- a/internal/gitaly/service/repository/objects_size_test.go
+++ b/internal/gitaly/service/repository/objects_size_test.go
@@ -335,6 +335,8 @@ func testObjectsSize(t *testing.T, ctx context.Context) {
{
desc: "unique objects in an object deduplication network",
setup: func(t *testing.T) setupData {
+ testhelper.SkipWithWAL(t, `
+Object pools are not yet supported with transaction management.`)
repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
dedupBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("1234"))
dedupTree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
@@ -379,6 +381,8 @@ func testObjectsSize(t *testing.T, ctx context.Context) {
{
desc: "deduplicated objects in an object deduplication network",
setup: func(t *testing.T) setupData {
+ testhelper.SkipWithWAL(t, `
+Object pools are not yet supported with transaction management.`)
repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
dedupBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("1234"))
dedupTree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
diff --git a/internal/gitaly/service/repository/remove.go b/internal/gitaly/service/repository/remove.go
index e1281a2f6..dc9318668 100644
--- a/internal/gitaly/service/repository/remove.go
+++ b/internal/gitaly/service/repository/remove.go
@@ -5,6 +5,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
@@ -19,5 +20,9 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi
return nil, err
}
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ tx.DeleteRepository()
+ })
+
return &gitalypb.RemoveRepositoryResponse{}, nil
}
diff --git a/internal/gitaly/service/repository/remove_test.go b/internal/gitaly/service/repository/remove_test.go
index b49ad7c4a..60deba09c 100644
--- a/internal/gitaly/service/repository/remove_test.go
+++ b/internal/gitaly/service/repository/remove_test.go
@@ -55,6 +55,10 @@ func TestRemoveRepository_validate(t *testing.T) {
}
func TestRemoveRepository_locking(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+Repository locks are not acquired with transaction management enabled. The test and the locking
+logic will be removed once transaction managements is always enabled.`)
+
t.Parallel()
ctx := testhelper.Context(t)
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index 7d17196be..fde02ce8e 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -42,6 +42,11 @@ import (
)
func TestReplicateRepository(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+ReplicateRepository is replicating git attributes as a separate file. WAL doesn't
+support this as the separate attributes file is going to be replaced with reading the
+attributes from HEAD.`)
+
t.Parallel()
testhelper.NewFeatureSets(
featureflag.ReplicateRepositoryObjectPool,
diff --git a/internal/gitaly/service/repository/restore_repository_test.go b/internal/gitaly/service/repository/restore_repository_test.go
index a8b50c92d..ce56ec877 100644
--- a/internal/gitaly/service/repository/restore_repository_test.go
+++ b/internal/gitaly/service/repository/restore_repository_test.go
@@ -159,10 +159,14 @@ func TestRestoreRepository(t *testing.T) {
backupID: "abc123",
}
},
- expectedErr: structerr.NewInvalidArgument(testhelper.GitalyOrPraefect(
- "restore repository: repository: repository not set",
- "repository not set",
- )),
+ expectedErr: func() error {
+ errorMessage := "restore repository: repository: repository not set"
+ if testhelper.IsPraefectEnabled() || testhelper.IsWALEnabled() {
+ errorMessage = "repository not set"
+ }
+
+ return structerr.NewInvalidArgument(errorMessage)
+ }(),
},
{
desc: "missing backup sink",
diff --git a/internal/gitaly/service/repository/set_custom_hooks.go b/internal/gitaly/service/repository/set_custom_hooks.go
index 482c72ac4..5368d00be 100644
--- a/internal/gitaly/service/repository/set_custom_hooks.go
+++ b/internal/gitaly/service/repository/set_custom_hooks.go
@@ -1,7 +1,11 @@
package repository
import (
+ "bytes"
+ "io"
+
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v16/streamio"
@@ -23,7 +27,8 @@ func (s *server) SetCustomHooks(stream gitalypb.RepositoryService_SetCustomHooks
return structerr.NewInvalidArgument("%w", err)
}
- reader := streamio.NewReader(func() ([]byte, error) {
+ var customHooksTAR bytes.Buffer
+ reader := io.TeeReader(streamio.NewReader(func() ([]byte, error) {
if firstRequest != nil {
data := firstRequest.GetData()
firstRequest = nil
@@ -32,12 +37,16 @@ func (s *server) SetCustomHooks(stream gitalypb.RepositoryService_SetCustomHooks
request, err := stream.Recv()
return request.GetData(), err
- })
+ }), &customHooksTAR)
if err := repoutil.SetCustomHooks(ctx, s.logger, s.locator, s.txManager, reader, repo); err != nil {
return structerr.NewInternal("setting custom hooks: %w", err)
}
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ tx.SetCustomHooks(customHooksTAR.Bytes())
+ })
+
return stream.SendAndClose(&gitalypb.SetCustomHooksResponse{})
}
diff --git a/internal/gitaly/service/repository/set_custom_hooks_test.go b/internal/gitaly/service/repository/set_custom_hooks_test.go
index 310591062..823762046 100644
--- a/internal/gitaly/service/repository/set_custom_hooks_test.go
+++ b/internal/gitaly/service/repository/set_custom_hooks_test.go
@@ -64,6 +64,8 @@ func TestSetCustomHooksRequest_success(t *testing.T) {
{
desc: "RestoreCustomHooks",
streamWriter: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, client gitalypb.RepositoryServiceClient) (io.Writer, func()) {
+ testhelper.SkipWithWAL(t, "This RPC is deprecated and pending removal")
+
//nolint:staticcheck
stream, err := client.RestoreCustomHooks(ctx)
require.NoError(t, err)
diff --git a/internal/gitaly/service/repository/size_test.go b/internal/gitaly/service/repository/size_test.go
index f41cb2186..0ed9b7516 100644
--- a/internal/gitaly/service/repository/size_test.go
+++ b/internal/gitaly/service/repository/size_test.go
@@ -22,6 +22,9 @@ import (
)
func TestRepositorySize_poolMember(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+Object pools are not yet supported with transaction management.`)
+
t.Parallel()
testhelper.NewFeatureSets(featureflag.TransactionalLinkRepository).Run(t, testRepositorySizePoolMember)
}
diff --git a/internal/gitaly/service/repository/snapshot_test.go b/internal/gitaly/service/repository/snapshot_test.go
index a051a9a77..35d969c26 100644
--- a/internal/gitaly/service/repository/snapshot_test.go
+++ b/internal/gitaly/service/repository/snapshot_test.go
@@ -100,6 +100,11 @@ func TestGetSnapshot(t *testing.T) {
{
desc: "repository contains symlink",
setup: func(t *testing.T) setupData {
+ testhelper.SkipWithWAL(t, `
+The repositories generally shouldn't have symlinks in them and the TransactionManager never writes any
+symlinks. Symlinks are not supported when creating a snapshot of the repository. Disable the test as it
+doesn't seem to test a realistic scenario.`)
+
repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg)
// Make packed-refs into a symlink so the RPC returns an error.
diff --git a/internal/gitaly/service/smarthttp/cache.go b/internal/gitaly/service/smarthttp/cache.go
index 6d8113bfc..df51f9a17 100644
--- a/internal/gitaly/service/smarthttp/cache.go
+++ b/internal/gitaly/service/smarthttp/cache.go
@@ -8,9 +8,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/protobuf/proto"
)
type infoRefCache struct {
@@ -57,7 +59,15 @@ func (c infoRefCache) tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest
c.logger.DebugContext(ctx, "Attempting to fetch cached response")
countAttempt()
- stream, err := c.streamer.GetStream(ctx, in.GetRepository(), in)
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten
+ // to point to the transaction's repository, the handler sees each request as different even if they point to
+ // the same repository. Restore the original request to ensure identical requests get the same key.
+ in = proto.Clone(in).(*gitalypb.InfoRefsRequest)
+ in.Repository = tx.OriginalRepository(in.Repository)
+ })
+
+ stream, err := c.streamer.GetStream(ctx, in.Repository, in)
switch err {
case nil:
defer stream.Close()
diff --git a/internal/gitaly/service/smarthttp/inforefs_test.go b/internal/gitaly/service/smarthttp/inforefs_test.go
index 259cc0f91..a7a096247 100644
--- a/internal/gitaly/service/smarthttp/inforefs_test.go
+++ b/internal/gitaly/service/smarthttp/inforefs_test.go
@@ -318,6 +318,9 @@ func testInfoRefsReceivePackSuccessful(t *testing.T, ctx context.Context) {
}
func TestInfoRefsReceivePack_hiddenRefs(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+Object pools are not yet support with WAL. This test is testing with a pooled repository.`)
+
t.Parallel()
testhelper.NewFeatureSets(
featureflag.TransactionalLinkRepository,
diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go
index 8724d3f97..2e503ae70 100644
--- a/internal/gitaly/service/smarthttp/receive_pack_test.go
+++ b/internal/gitaly/service/smarthttp/receive_pack_test.go
@@ -94,7 +94,22 @@ func TestPostReceivePack_successful(t *testing.T) {
// Compare the repository up front so that we can use require.Equal for
// the remaining values.
- testhelper.ProtoEqual(t, gittest.RewrittenRepository(t, ctx, cfg, repo), payload.Repo)
+ expectedRepo := gittest.RewrittenRepository(t, ctx, cfg, repo)
+ if testhelper.IsWALEnabled() {
+ // The repository should be quarantined.
+ require.NotEmpty(t, payload.Repo.GitObjectDirectory)
+ payload.Repo.GitObjectDirectory = "OVERRIDDEN"
+ expectedRepo.GitObjectDirectory = "OVERRIDDEN"
+ require.NotEmpty(t, payload.Repo.GitAlternateObjectDirectories)
+ payload.Repo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"}
+ expectedRepo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"}
+ // The following values may change so we don't want to assert them for equality.
+ require.NotEmpty(t, payload.Repo.RelativePath)
+ payload.Repo.RelativePath = "OVERRIDDEN"
+ expectedRepo.RelativePath = "OVERRIDDEN"
+ }
+
+ testhelper.ProtoEqual(t, expectedRepo, payload.Repo)
payload.Repo = nil
// If running tests with Praefect, then the transaction would be set, but we have no way of
@@ -113,6 +128,11 @@ func TestPostReceivePack_successful(t *testing.T) {
require.ElementsMatch(t, expectedFeatureFlags, payload.FeatureFlagsWithValue)
payload.FeatureFlagsWithValue = nil
+ var transactionID storage.TransactionID
+ if testhelper.IsWALEnabled() {
+ transactionID = 1
+ }
+
require.Equal(t, git.HooksPayload{
ObjectFormat: gittest.DefaultObjectHash.Format,
RuntimeDir: cfg.RuntimeDir,
@@ -124,6 +144,7 @@ func TestPostReceivePack_successful(t *testing.T) {
Protocol: "http",
},
RequestedHooks: git.ReceivePackHooks,
+ TransactionID: transactionID,
}, payload)
}
diff --git a/internal/gitaly/service/ssh/receive_pack_test.go b/internal/gitaly/service/ssh/receive_pack_test.go
index 3e461b0ad..83ddb0bb4 100644
--- a/internal/gitaly/service/ssh/receive_pack_test.go
+++ b/internal/gitaly/service/ssh/receive_pack_test.go
@@ -159,12 +159,26 @@ func TestReceivePack_success(t *testing.T) {
// Compare the repository up front so that we can use require.Equal for
// the remaining values.
- testhelper.ProtoEqual(t, &gitalypb.Repository{
- StorageName: cfg.Storages[0].Name,
- RelativePath: gittest.GetReplicaPath(t, ctx, cfg, remoteRepo),
- GlProjectPath: remoteRepo.GlProjectPath,
- GlRepository: remoteRepo.GlRepository,
- }, payload.Repo)
+ //
+ expectedRepo := gittest.RewrittenRepository(t, ctx, cfg, remoteRepo)
+ // The repository should have a relative path.
+ if testhelper.IsWALEnabled() {
+ // The repository should be quarantined.
+ require.NotEmpty(t, payload.Repo.GitObjectDirectory)
+ payload.Repo.GitObjectDirectory = "OVERRIDDEN"
+ expectedRepo.GitObjectDirectory = "OVERRIDDEN"
+ require.NotEmpty(t, payload.Repo.GitAlternateObjectDirectories)
+ payload.Repo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"}
+ expectedRepo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"}
+ // The following values may change so we don't want to assert them for equality.
+ require.NotEmpty(t, payload.Repo.RelativePath)
+ payload.Repo.RelativePath = "OVERRIDDEN"
+ expectedRepo.RelativePath = "OVERRIDDEN"
+ }
+
+ // Compare the repository up front so that we can use require.Equal for
+ // the remaining values.
+ testhelper.ProtoEqual(t, expectedRepo, payload.Repo)
payload.Repo = nil
// If running tests with Praefect, then the transaction would be set, but we have no way of
@@ -183,6 +197,11 @@ func TestReceivePack_success(t *testing.T) {
require.ElementsMatch(t, expectedFeatureFlags, payload.FeatureFlagsWithValue)
payload.FeatureFlagsWithValue = nil
+ var transactionID storage.TransactionID
+ if testhelper.IsWALEnabled() {
+ transactionID = 2
+ }
+
require.Equal(t, git.HooksPayload{
ObjectFormat: gittest.DefaultObjectHash.Format,
RuntimeDir: cfg.RuntimeDir,
@@ -194,6 +213,7 @@ func TestReceivePack_success(t *testing.T) {
Protocol: "ssh",
},
RequestedHooks: git.ReceivePackHooks,
+ TransactionID: transactionID,
}, payload)
}
@@ -401,6 +421,9 @@ func TestReceivePack_customHookFailure(t *testing.T) {
}
func TestReceivePack_hidesObjectPoolReferences(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+Object pools are not yet support with WAL. This test is testing with a pooled repository.`)
+
t.Parallel()
testhelper.NewFeatureSets(featureflag.TransactionalLinkRepository).Run(t, testReceivePackHidesObjectPoolReferences)
}
diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go
new file mode 100644
index 000000000..1ed793387
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/middleware.go
@@ -0,0 +1,313 @@
+package storagemgr
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
+)
+
+// ErrQuarantineConfiguredOnMutator is returned when a mutator request is received with a quarantine configured.
+var ErrQuarantineConfiguredOnMutator = errors.New("quarantine configured on a mutator request")
+
+var nonTransactionalRPCs = map[string]struct{}{
+ // This isn't registered in protoregistry so mark it here as non-transactional.
+ "/grpc.health.v1.Health/Check": {},
+
+ // These are missing annotations. We don't have a suitable scope for them
+ // so mark these as non-transactional here.
+ "/gitaly.ServerService/DiskStatistics": {},
+ "/gitaly.ServerService/ServerInfo": {},
+ "/gitaly.ServerService/ClockSynced": {},
+ "/gitaly.ServerService/ReadinessCheck": {},
+
+ // Object pools are not yet supported with WAL.
+ "/gitaly.ObjectPoolService/CreateObjectPool": {},
+ "/gitaly.ObjectPoolService/DeleteObjectPool": {},
+ "/gitaly.ObjectPoolService/LinkRepositoryToObjectPool": {},
+ "/gitaly.ObjectPoolService/DisconnectGitAlternates": {},
+ "/gitaly.ObjectPoolService/FetchIntoObjectPool": {},
+ "/gitaly.ObjectPoolService/GetObjectPool": {},
+ // GetSnapshot is testing logic with object pools as well.
+ "/gitaly.RepositoryService/GetSnapshot": {},
+
+ // Repository creations are not yet handled through the WAL.
+ "/gitaly.RepositoryService/CreateRepository": {},
+ "/gitaly.RepositoryService/CreateRepositoryFromURL": {},
+ "/gitaly.RepositoryService/CreateRepositoryFromBundle": {},
+ "/gitaly.RepositoryService/CreateFork": {},
+ "/gitaly.RepositoryService/CreateRepositoryFromSnapshot": {},
+
+ // ReplicateRepository is replicating the attributes and config which the
+ // WAL won't support. This is pending removal of their replication.
+ //
+ // ReplicateRepository may also create a repository which is not yet supported
+ // through the WAL.
+ "/gitaly.RepositoryService/ReplicateRepository": {},
+
+ // Below RPCs implement functionality which isn't going to be supported by WAL.
+ // Handle these as non-transactional. Their usage must be removed prior to enabling WAL.
+ //
+ // Attributes are going to be read from HEAD. Writing out a separate attributes file
+ // won't be supported.
+ "/gitaly.RepositoryService/ApplyGitattributes": {},
+ // SetFullPath writes the full path into git config and is the last RPC that writes into the
+ // git config. Writing into the config won't be supported.
+ "/gitaly.RepositoryService/SetFullPath": {},
+ // RenameRepository is pending removal as we need stable IDs for repositories. Renaming won't
+ // be supported with WAL.
+ "/gitaly.RepositoryService/RenameRepository": {},
+}
+
+// NewUnaryInterceptor returns an unary interceptor that manages a unary RPC's transaction. It starts a transaction
+// on the target repository of the request and rewrites the request to point to the transaction's snapshot repository.
+// The transaction is committed if the handler doesn't return an error and rolled back otherwise.
+func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator) grpc.UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, returnedErr error) {
+ if _, ok := nonTransactionalRPCs[info.FullMethod]; ok {
+ return handler(ctx, req)
+ }
+
+ methodInfo, err := registry.LookupMethod(info.FullMethod)
+ if err != nil {
+ return nil, fmt.Errorf("lookup method: %w", err)
+ }
+
+ txReq, err := transactionalizeRequest(ctx, logger, txRegistry, mgr, locator, methodInfo, req.(proto.Message))
+ if err != nil {
+ return nil, err
+ }
+ defer func() { returnedErr = txReq.finishTransaction(returnedErr) }()
+
+ return handler(txReq.ctx, txReq.firstMessage)
+ }
+}
+
+// peekedStream allows for peeking the first message of ServerStream. Reading the first message would leave
+// handler unable to read the first message as it was already consumed. peekedStream allows for restoring the
+// stream so the RPC handler can read the first message as usual. It additionally supports overriding the
+// context of the stream.
+type peekedStream struct {
+ context context.Context
+ firstMessage proto.Message
+ firstError error
+ grpc.ServerStream
+}
+
+func (ps *peekedStream) Context() context.Context {
+ return ps.context
+}
+
+func (ps *peekedStream) RecvMsg(dst interface{}) error {
+ if ps.firstError != nil {
+ firstError := ps.firstError
+ ps.firstError = nil
+ return firstError
+ }
+
+ if ps.firstMessage != nil {
+ marshaled, err := proto.Marshal(ps.firstMessage)
+ if err != nil {
+ return fmt.Errorf("marshal: %w", err)
+ }
+
+ if err := proto.Unmarshal(marshaled, dst.(proto.Message)); err != nil {
+ return fmt.Errorf("unmarshal: %w", err)
+ }
+
+ ps.firstMessage = nil
+ return nil
+ }
+
+ return ps.ServerStream.RecvMsg(dst)
+}
+
+// NewStreamInterceptor returns a stream interceptor that manages a streaming RPC's transaction. It starts a transaction
+// on the target repository of the first request and rewrites the request to point to the transaction's snapshot repository.
+// The transaction is committed if the handler doesn't return an error and rolled back otherwise.
+func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator) grpc.StreamServerInterceptor {
+ return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) {
+ if _, ok := nonTransactionalRPCs[info.FullMethod]; ok {
+ return handler(srv, ss)
+ }
+
+ methodInfo, err := registry.LookupMethod(info.FullMethod)
+ if err != nil {
+ return fmt.Errorf("lookup method: %w", err)
+ }
+
+ req := methodInfo.NewRequest()
+ if err := ss.RecvMsg(req); err != nil {
+ // All of the repository scoped streaming RPCs send the repository in the first message.
+ // Generally it should be fine to error out in all cases if there is no message sent.
+ // To maintain compatibility with tests, we instead invoke the handler to let them return
+ // the asserted error messages. Once the transaction management is on by default, we should
+ // error out here directly and amend the failing test cases.
+ return handler(srv, &peekedStream{
+ context: ss.Context(),
+ firstError: err,
+ ServerStream: ss,
+ })
+ }
+
+ txReq, err := transactionalizeRequest(ss.Context(), logger, txRegistry, mgr, locator, methodInfo, req)
+ if err != nil {
+ return err
+ }
+ defer func() { returnedErr = txReq.finishTransaction(returnedErr) }()
+
+ return handler(srv, &peekedStream{
+ context: txReq.ctx,
+ firstMessage: txReq.firstMessage,
+ ServerStream: ss,
+ })
+ }
+}
+
+// transactionalizedRequest contains the context and the first request to pass into the RPC handler to
+// run it correctly against the transaction.
+type transactionalizedRequest struct {
+ // ctx is the request's context with the transaction added into it.
+ ctx context.Context
+ // firstMessage is the message to pass to the RPC as the first message. The target repository
+ // in it has been rewritten to point to the snapshot repository.
+ firstMessage proto.Message
+ // finishTransaction takes in the error returned from the handler and returns the error
+ // that should be returned to the client. If the handler error is nil, the transaction is committed.
+ // If the handler error is not nil, the transaction is rolled back.
+ finishTransaction func(error) error
+}
+
+// nonTransactionalRequest returns a no-op transactionalizedRequest that configures the RPC handler to be
+// run as normal without a transaction.
+func nonTransactionalRequest(ctx context.Context, firstMessage proto.Message) transactionalizedRequest {
+ return transactionalizedRequest{
+ ctx: ctx,
+ firstMessage: firstMessage,
+ finishTransaction: func(err error) error { return err },
+ }
+}
+
+// transactionalizeRequest begins a transaction for the repository targeted in the request. It returns the context and the request that the handler should
+// be invoked with. In addition, it returns a function that must be called with the error returned from the handler to either commit or rollback the
+// transaction. The returned values are valid even if the request should not run transactionally.
+func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator, methodInfo protoregistry.MethodInfo, req proto.Message) (_ transactionalizedRequest, returnedErr error) {
+ if methodInfo.Scope != protoregistry.ScopeRepository {
+ return nonTransactionalRequest(ctx, req), nil
+ }
+
+ repo, err := methodInfo.TargetRepo(req)
+ if err != nil {
+ if errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
+ // The above error is returned when the repository field is not set in the request.
+ // Return instead the error many tests are asserting to be returned from the handlers.
+ return transactionalizedRequest{}, structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet)
+ }
+
+ return transactionalizedRequest{}, fmt.Errorf("extract target repository: %w", err)
+ }
+
+ if methodInfo.Operation != protoregistry.OpAccessor && methodInfo.Operation != protoregistry.OpMutator {
+ // Transactions support only accessors and mutators.
+ return nonTransactionalRequest(ctx, req), nil
+ }
+
+ if repo.GitObjectDirectory != "" || len(repo.GitAlternateObjectDirectories) > 0 {
+ // The object directories should only be configured on a repository coming from a request that
+ // was already configured with a quarantine directory and is being looped back to Gitaly from Rails'
+ // authorization checks. If that's the case, the request should already be running in scope of a
+ // transaction and the repository rewritten to point to the snapshot repository. We thus don't start
+ // a new transaction if we encounter this.
+ //
+ // This property is violated in tests which manually configure the object directory or the alternate
+ // object directory. This allows for circumventing the transaction management by configuring the either
+ // of the object directories. We'll leave this unaddressed for now and later address this by removing
+ // the options to configure object directories and alternates in a request.
+ //
+ // The relative path in quarantined requests is currently still pointing to the original repository.
+ // https://gitlab.com/gitlab-org/gitaly/-/issues/5483 tracks having Rails send the snapshot's relative
+ // path instead.
+
+ if methodInfo.Operation == protoregistry.OpMutator {
+ // Accessor requests may come with quarantine configured from Rails' access checks. Since the
+ // RPC that triggered these access checks would already run in a transaction and target a
+ // snapshot, we won't start another one. Mutators however are rejected to prevent writes
+ // unintentionally targeting the main repository.
+ return transactionalizedRequest{}, ErrQuarantineConfiguredOnMutator
+ }
+
+ return nonTransactionalRequest(ctx, req), nil
+ }
+
+ // While the PartitionManager already verifies the repository's storage and relative path, it does not
+ // return the exact same error messages as some RPCs are testing for at the moment. In order to maintain
+ // compatibility with said tests, validate the repository here ahead of time and return the possible error
+ // as is.
+ if err := locator.ValidateRepository(repo, storage.WithSkipRepositoryExistenceCheck()); err != nil {
+ return transactionalizedRequest{}, err
+ }
+
+ tx, err := mgr.Begin(ctx, repo, TransactionOptions{ReadOnly: methodInfo.Operation == protoregistry.OpAccessor})
+ if err != nil {
+ return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err)
+ }
+ ctx = storagectx.ContextWithTransaction(ctx, tx)
+
+ txID := txRegistry.register(tx.Transaction)
+ ctx = storage.ContextWithTransactionID(ctx, txID)
+
+ finishTX := func(handlerErr error) error {
+ defer txRegistry.unregister(txID)
+
+ if handlerErr != nil {
+ if err := tx.Rollback(); err != nil {
+ logger.WithError(err).ErrorContext(ctx, "failed rolling back transaction")
+ }
+
+ return handlerErr
+ }
+
+ if err := tx.Commit(ctx); err != nil {
+ return fmt.Errorf("commit: %w", err)
+ }
+
+ return nil
+ }
+
+ defer func() {
+ if returnedErr != nil {
+ returnedErr = finishTX(returnedErr)
+ }
+ }()
+
+ rewrittenReq, err := rewriteRequest(tx, methodInfo, req)
+ if err != nil {
+ return transactionalizedRequest{}, fmt.Errorf("rewrite request: %w", err)
+ }
+
+ return transactionalizedRequest{
+ ctx: ctx,
+ firstMessage: rewrittenReq,
+ finishTransaction: finishTX,
+ }, nil
+}
+
+func rewriteRequest(tx *finalizableTransaction, methodInfo protoregistry.MethodInfo, req proto.Message) (proto.Message, error) {
+ // Clone the request in order to not rewrite the request in the earlier interceptors.
+ rewrittenReq := proto.Clone(req)
+ targetRepo, err := methodInfo.TargetRepo(rewrittenReq)
+ if err != nil {
+ return nil, fmt.Errorf("extract target repository: %w", err)
+ }
+
+ *targetRepo = *tx.RewriteRepository(targetRepo)
+
+ return rewrittenReq, nil
+}
diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go
new file mode 100644
index 000000000..6c2764369
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/middleware_test.go
@@ -0,0 +1,554 @@
+package storagemgr_test
+
+import (
+ "context"
+ "errors"
+ "io"
+ "testing"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/protobuf/proto"
+)
+
+type mockRepositoryService struct {
+ objectFormatFunc func(context.Context, *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error)
+ optimizeRepositoryFunc func(context.Context, *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error)
+ removeRepositoryFunc func(context.Context, *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error)
+ setCustomHooksFunc func(gitalypb.RepositoryService_SetCustomHooksServer) error
+ getCustomHooksFunc func(*gitalypb.GetCustomHooksRequest, gitalypb.RepositoryService_GetCustomHooksServer) error
+ gitalypb.UnimplementedRepositoryServiceServer
+}
+
+func (m mockRepositoryService) ObjectFormat(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) {
+ return m.objectFormatFunc(ctx, req)
+}
+
+func (m mockRepositoryService) OptimizeRepository(ctx context.Context, req *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) {
+ return m.optimizeRepositoryFunc(ctx, req)
+}
+
+func (m mockRepositoryService) RemoveRepository(ctx context.Context, req *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) {
+ return m.removeRepositoryFunc(ctx, req)
+}
+
+func (m mockRepositoryService) SetCustomHooks(stream gitalypb.RepositoryService_SetCustomHooksServer) error {
+ return m.setCustomHooksFunc(stream)
+}
+
+func (m mockRepositoryService) GetCustomHooks(req *gitalypb.GetCustomHooksRequest, stream gitalypb.RepositoryService_GetCustomHooksServer) error {
+ return m.getCustomHooksFunc(req, stream)
+}
+
+type mockHealthService struct {
+ checkFunc func(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
+ grpc_health_v1.UnimplementedHealthServer
+}
+
+func (m mockHealthService) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
+ return m.checkFunc(ctx, req)
+}
+
+func TestMiddleware_transactional(t *testing.T) {
+ if !testhelper.IsWALEnabled() {
+ t.Skip(`
+The test relies on the interceptor being configured in the test server.
+Only run the test with WAL enabled as other wise the interceptor won't
+be configured.`)
+ }
+
+ testhelper.SkipWithPraefect(t, `
+This interceptor is for use with Gitaly. Praefect running in front of it may change error
+messages and behavior by erroring out the requests before they even hit this interceptor.`)
+
+ t.Parallel()
+
+ validRepository := func() *gitalypb.Repository {
+ return &gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: "relative-path",
+ GlRepository: "gl-repository",
+ GlProjectPath: "project-path",
+ }
+ }
+
+ for _, tc := range []struct {
+ desc string
+ repository *gitalypb.Repository
+ performRequest func(*testing.T, context.Context, gitalypb.RepositoryServiceClient)
+ handlerError error
+ rollbackTransaction bool
+ expectHandlerInvoked bool
+ expectedRollbackError error
+ expectedResponse proto.Message
+ expectedError error
+ }{
+ {
+ desc: "missing repository",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{})
+ require.Nil(t, resp)
+ testhelper.RequireGrpcError(t,
+ structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet),
+ err,
+ )
+ },
+ },
+ {
+ desc: "storage not set",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{
+ Repository: &gitalypb.Repository{
+ RelativePath: "non-existent-relative-path",
+ },
+ })
+ require.Nil(t, resp)
+ testhelper.RequireGrpcError(t,
+ structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet),
+ err,
+ )
+ },
+ },
+ {
+ desc: "relative path not set",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "default",
+ },
+ })
+ require.Nil(t, resp)
+ testhelper.RequireGrpcError(t,
+ structerr.NewInvalidArgument("%w", storage.ErrRepositoryPathNotSet),
+ err,
+ )
+ },
+ },
+ {
+ desc: "invalid storage",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "non-existent-storage",
+ RelativePath: "non-existent-relative-path",
+ },
+ })
+ require.Nil(t, resp)
+ testhelper.RequireGrpcError(t,
+ testhelper.ToInterceptedMetadata(structerr.NewInvalidArgument("%w", storage.NewStorageNotFoundError("non-existent-storage"))),
+ err,
+ )
+ },
+ },
+ {
+ desc: "unary rollback error is logged",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()})
+ require.Nil(t, resp)
+ testhelper.RequireGrpcError(t,
+ structerr.NewInternal("handler error"),
+ err,
+ )
+ },
+ rollbackTransaction: true,
+ expectHandlerInvoked: true,
+ handlerError: errors.New("handler error"),
+ expectedRollbackError: storagemgr.ErrTransactionAlreadyRollbacked,
+ },
+ {
+ desc: "streaming rollback error is logged",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ stream, err := client.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: validRepository()})
+ require.NoError(t, err)
+
+ resp, err := stream.Recv()
+ testhelper.RequireGrpcError(t,
+ structerr.NewInternal("handler error"),
+ err,
+ )
+ require.Nil(t, resp)
+ },
+ rollbackTransaction: true,
+ expectHandlerInvoked: true,
+ handlerError: errors.New("handler error"),
+ expectedRollbackError: storagemgr.ErrTransactionAlreadyRollbacked,
+ },
+ {
+ desc: "unary commit error is returned",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()})
+ require.Nil(t, resp)
+ testhelper.RequireGrpcError(t,
+ structerr.NewInternal("commit: transaction already rollbacked"),
+ err,
+ )
+ },
+ rollbackTransaction: true,
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "streaming commit error is returned",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ stream, err := client.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: validRepository()})
+ require.NoError(t, err)
+
+ resp, err := stream.Recv()
+ testhelper.RequireGrpcError(t,
+ structerr.NewInternal("commit: transaction already rollbacked"),
+ err,
+ )
+ require.Nil(t, resp)
+ },
+ rollbackTransaction: true,
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "failed unary request",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()})
+ testhelper.RequireGrpcError(t,
+ structerr.NewInternal("handler error"),
+ err,
+ )
+ require.Nil(t, resp)
+ },
+
+ handlerError: errors.New("handler error"),
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "successful unary accessor",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()})
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.ObjectFormatResponse{}, resp)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "successful streaming accessor",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ stream, err := client.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: validRepository()})
+ require.NoError(t, err)
+
+ resp, err := stream.Recv()
+ require.Equal(t, io.EOF, err)
+ require.Nil(t, resp)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "successful unary mutator",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ resp, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: validRepository()})
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.RemoveRepositoryResponse{}, resp)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "successful streaming mutator",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ stream, err := client.SetCustomHooks(ctx)
+ require.NoError(t, err)
+
+ require.NoError(t, stream.Send(&gitalypb.SetCustomHooksRequest{Repository: validRepository()}))
+
+ resp, err := stream.CloseAndRecv()
+ require.Equal(t, io.EOF, err)
+ require.Nil(t, resp)
+ },
+ expectHandlerInvoked: true,
+ },
+ {
+ desc: "mutator with object directory configured",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ repo := validRepository()
+ repo.GitObjectDirectory = "non-default"
+
+ resp, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo})
+ testhelper.RequireGrpcError(t, structerr.NewInternal("%w", storagemgr.ErrQuarantineConfiguredOnMutator), err)
+ require.Nil(t, resp)
+ },
+ },
+ {
+ desc: "mutator with alternate object directory configured",
+ performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) {
+ repo := validRepository()
+ repo.GitAlternateObjectDirectories = []string{"non-default"}
+
+ resp, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo})
+ testhelper.RequireGrpcError(t, structerr.NewInternal("%w", storagemgr.ErrQuarantineConfiguredOnMutator), err)
+ require.Nil(t, resp)
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ ctx := testhelper.Context(t)
+ gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: validRepository().RelativePath,
+ })
+
+ txRegistry := storagemgr.NewTransactionRegistry()
+
+ logger, loggerHook := test.NewNullLogger()
+ entry := logrus.NewEntry(logger)
+
+ handlerInvoked := false
+ var transactionID storage.TransactionID
+
+ assertHandler := func(ctx context.Context, isMutator bool, repo *gitalypb.Repository) {
+ handlerInvoked = true
+
+ // The repositories should be equal except for the relative path which
+ // has been pointed to the snapshot.
+ expectedRepo := validRepository()
+ actualRepo := repo
+
+ // When run in a transaction, the relative path will be pointed to the snapshot.
+ assert.NotEqual(t, expectedRepo.RelativePath, repo.RelativePath)
+ expectedRepo.RelativePath = ""
+ actualRepo.RelativePath = ""
+
+ if isMutator {
+ // Mutators should have quarantine directory configured.
+ assert.NotEmpty(t, actualRepo.GitObjectDirectory)
+ actualRepo.GitObjectDirectory = ""
+ assert.NotEmpty(t, actualRepo.GitAlternateObjectDirectories)
+ actualRepo.GitAlternateObjectDirectories = nil
+ } else {
+ // Accessors should not have a quarantine directory configured.
+ assert.Empty(t, actualRepo.GitObjectDirectory)
+ assert.Empty(t, actualRepo.GitAlternateObjectDirectories)
+ }
+
+ testhelper.ProtoEqual(t, expectedRepo, actualRepo)
+
+ // The transaction ID should be included in the context.
+ transactionID = storage.ExtractTransactionID(ctx)
+ assert.Equal(t, storage.TransactionID(1), transactionID)
+
+ // The transaction itself should be included in the context.
+ transactionInContext := false
+ storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+ transactionInContext = true
+ })
+ assert.True(t, transactionInContext)
+
+ // The transaction should be registered into the registry and retrievable
+ // with its ID.
+ tx, err := txRegistry.Get(transactionID)
+ assert.NoError(t, err)
+
+ if tc.rollbackTransaction {
+ assert.NoError(t, tx.Rollback())
+ }
+ }
+
+ serverAddress := testserver.RunGitalyServer(t, cfg, func(server *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{
+ objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) {
+ assertHandler(ctx, false, req.GetRepository())
+ return &gitalypb.ObjectFormatResponse{}, tc.handlerError
+ },
+ setCustomHooksFunc: func(stream gitalypb.RepositoryService_SetCustomHooksServer) error {
+ req, err := stream.Recv()
+ assert.NoError(t, err)
+
+ assertHandler(stream.Context(), true, req.GetRepository())
+
+ resp, err := stream.Recv()
+ assert.Nil(t, resp)
+ assert.Equal(t, io.EOF, err)
+ return nil
+ },
+ getCustomHooksFunc: func(req *gitalypb.GetCustomHooksRequest, stream gitalypb.RepositoryService_GetCustomHooksServer) error {
+ assertHandler(stream.Context(), false, req.GetRepository())
+ return tc.handlerError
+ },
+ removeRepositoryFunc: func(ctx context.Context, req *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) {
+ assertHandler(ctx, true, req.GetRepository())
+ return &gitalypb.RemoveRepositoryResponse{}, tc.handlerError
+ },
+ })
+ },
+ testserver.WithTransactionRegistry(txRegistry),
+ testserver.WithLogger(log.FromLogrusEntry(entry)),
+ )
+
+ clientConn, err := client.Dial(ctx, serverAddress)
+ require.NoError(t, err)
+ defer clientConn.Close()
+
+ tc.performRequest(t, testhelper.Context(t), gitalypb.NewRepositoryServiceClient(clientConn))
+ require.Equal(t, tc.expectHandlerInvoked, handlerInvoked)
+
+ var rollbackFailureError error
+ for _, entry := range loggerHook.AllEntries() {
+ if entry.Message == "failed rolling back transaction" {
+ rollbackFailureError = entry.Data[logrus.ErrorKey].(error)
+ }
+ }
+
+ require.Equal(t, tc.expectedRollbackError, rollbackFailureError)
+
+ // The transaction should be unregistered at the end of the RPC to clean up.
+ _, err = txRegistry.Get(transactionID)
+ require.Equal(t, errors.New("transaction not found"), err)
+ })
+ }
+}
+
+func TestMiddleware_non_transactional(t *testing.T) {
+ if !testhelper.IsWALEnabled() {
+ t.Skip(`
+The test relies on the interceptor being configured in the test server.
+Only run the test with WAL enabled as other wise the interceptor won't
+be configured.`)
+ }
+
+ testhelper.SkipWithPraefect(t, `
+This interceptor is for use with Gitaly. Praefect running in front of it may change error
+messages and behavior by erroring out the requests before they even hit this interceptor.`)
+
+ validRepository := func() *gitalypb.Repository {
+ return &gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: "non-existent",
+ GlRepository: "gl-repository",
+ GlProjectPath: "project-path",
+ }
+ }
+
+ for _, tc := range []struct {
+ desc string
+ performRequest func(*testing.T, context.Context, *grpc.ClientConn)
+ }{
+ {
+ desc: "health service",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := grpc_health_v1.NewHealthClient(cc).Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, resp)
+ },
+ },
+ {
+ desc: "repository with object directory does not start a transaction",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "default",
+ GitObjectDirectory: "non-default",
+ },
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.ObjectFormatResponse{}, resp)
+ },
+ },
+ {
+ desc: "repository with alternate object directory does not start a transaction",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "default",
+ GitAlternateObjectDirectories: []string{"non-default"},
+ },
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.ObjectFormatResponse{}, resp)
+ },
+ },
+ {
+ desc: "streaming rpc without first message",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ stream, err := gitalypb.NewRepositoryServiceClient(cc).SetCustomHooks(ctx)
+ require.NoError(t, err)
+
+ resp, err := stream.CloseAndRecv()
+ require.Equal(t, io.EOF, err)
+ require.Nil(t, resp)
+ },
+ },
+ {
+ desc: "maintenance rpc",
+ performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) {
+ resp, err := gitalypb.NewRepositoryServiceClient(cc).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{
+ Repository: validRepository(),
+ })
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &gitalypb.OptimizeRepositoryResponse{}, resp)
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ cfg := testcfg.Build(t)
+
+ handlerInvoked := false
+ assertHandler := func(ctx context.Context) {
+ handlerInvoked = true
+ assert.Equal(t, storage.TransactionID(0), storage.ExtractTransactionID(ctx))
+ }
+
+ serverAddress := testserver.RunGitalyServer(t, cfg, func(server *grpc.Server, deps *service.Dependencies) {
+ grpc_health_v1.RegisterHealthServer(server, mockHealthService{
+ checkFunc: func(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
+ assertHandler(ctx)
+ return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
+ },
+ })
+ gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{
+ objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) {
+ assertHandler(ctx)
+ return &gitalypb.ObjectFormatResponse{}, nil
+ },
+ setCustomHooksFunc: func(stream gitalypb.RepositoryService_SetCustomHooksServer) error {
+ assertHandler(stream.Context())
+
+ resp, err := stream.Recv()
+ assert.Nil(t, resp)
+ assert.Equal(t, io.EOF, err)
+ return nil
+ },
+ optimizeRepositoryFunc: func(ctx context.Context, req *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) {
+ assertHandler(ctx)
+ testhelper.ProtoEqual(t, validRepository(), req.GetRepository())
+ return &gitalypb.OptimizeRepositoryResponse{}, nil
+ },
+ removeRepositoryFunc: func(ctx context.Context, req *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) {
+ assertHandler(ctx)
+ testhelper.ProtoEqual(t, validRepository(), req.GetRepository())
+ return &gitalypb.RemoveRepositoryResponse{}, nil
+ },
+ })
+ })
+
+ clientConn, err := client.Dial(ctx, serverAddress)
+ require.NoError(t, err)
+ defer clientConn.Close()
+
+ tc.performRequest(t, testhelper.Context(t), clientConn)
+ require.True(t, handlerInvoked)
+ })
+ }
+}
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 9068ce635..f0a87dbc1 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -327,6 +327,15 @@ func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.R
return rewritten
}
+// OriginalRepository returns the repository as it was before rewriting it to point to the snapshot.
+func (txn *Transaction) OriginalRepository(repo *gitalypb.Repository) *gitalypb.Repository {
+ original := proto.Clone(repo).(*gitalypb.Repository)
+ original.RelativePath = strings.TrimPrefix(repo.RelativePath, txn.snapshotBaseRelativePath+string(os.PathSeparator))
+ original.GitObjectDirectory = ""
+ original.GitAlternateObjectDirectories = nil
+ return original
+}
+
// createRepositorySnapshot snapshots the repository's current state at snapshotPath. This is done by
// recreating the repository's directory structure and hard linking the repository's files in their
// correct locations there. This effectively does a copy-free clone of the repository. Since the files
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 1c7b48bd3..ecb83999a 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -30,6 +30,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -173,12 +174,25 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
deps := gsd.createDependencies(tb, cfg)
tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) })
+ var txMiddleware server.TransactionMiddleware
+ if deps.GetPartitionManager() != nil {
+ txMiddleware = server.TransactionMiddleware{
+ UnaryInterceptor: storagemgr.NewUnaryInterceptor(
+ deps.Logger, protoregistry.GitalyProtoPreregistered, deps.GetTransactionRegistry(), deps.GetPartitionManager(), deps.GetLocator(),
+ ),
+ StreamInterceptor: storagemgr.NewStreamInterceptor(
+ deps.Logger, protoregistry.GitalyProtoPreregistered, deps.GetTransactionRegistry(), deps.GetPartitionManager(), deps.GetLocator(),
+ ),
+ }
+ }
+
serverFactory := server.NewGitalyServerFactory(
cfg,
gsd.logger.WithField("test", tb.Name()),
deps.GetBackchannelRegistry(),
deps.GetDiskCache(),
[]*limithandler.LimiterMiddleware{deps.GetLimitHandler()},
+ txMiddleware,
)
if cfg.RuntimeDir != "" {
@@ -270,6 +284,7 @@ type gitalyServerDeps struct {
backupSink backup.Sink
backupLocator backup.Locator
signingKey string
+ transactionRegistry *storagemgr.TransactionRegistry
}
func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies {
@@ -303,9 +318,12 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
gsd.gitCmdFactory = gittest.NewCommandFactory(tb, cfg)
}
- transactionRegistry := storagemgr.NewTransactionRegistry()
+ if gsd.transactionRegistry == nil {
+ gsd.transactionRegistry = storagemgr.NewTransactionRegistry()
+ }
+
if gsd.hookMgr == nil {
- gsd.hookMgr = hook.NewManager(cfg, gsd.locator, gsd.logger, gsd.gitCmdFactory, gsd.txMgr, gsd.gitlabClient, hook.NewTransactionRegistry(transactionRegistry))
+ gsd.hookMgr = hook.NewManager(cfg, gsd.locator, gsd.logger, gsd.gitCmdFactory, gsd.txMgr, gsd.gitlabClient, hook.NewTransactionRegistry(gsd.transactionRegistry))
}
if gsd.catfileCache == nil {
@@ -385,7 +403,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
RepositoryCounter: gsd.repositoryCounter,
UpdaterWithHooks: gsd.updaterWithHooks,
HousekeepingManager: gsd.housekeepingManager,
- TransactionRegistry: transactionRegistry,
+ TransactionRegistry: gsd.transactionRegistry,
PartitionManager: partitionManager,
BackupSink: gsd.backupSink,
BackupLocator: gsd.backupLocator,
@@ -518,3 +536,11 @@ func WithSigningKey(signingKey string) GitalyServerOpt {
return deps
}
}
+
+// WithTransactionRegistry sets the transaction registry that will be used for Gitaly services.
+func WithTransactionRegistry(registry *storagemgr.TransactionRegistry) GitalyServerOpt {
+ return func(deps gitalyServerDeps) gitalyServerDeps {
+ deps.transactionRegistry = registry
+ return deps
+ }
+}