diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-10-19 18:44:30 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-10-19 18:44:30 +0300 |
commit | c7c7764c3d542ae23943d119294718ba1e905cab (patch) | |
tree | c5a0743f5999d24b117a13b5408a79277cb6288e | |
parent | 40dbb36fb46b294fb0e8443dda37307e3cd21b52 (diff) | |
parent | 14bd927d6d4973c1516d1c5c7568131c90606ac4 (diff) |
Merge branch 'smh-transaction-middleware' into 'master'
Introduce transaction managing middleware
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6458
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
32 files changed, 1182 insertions, 56 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index c9d067fcc..b19091b58 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -372,6 +372,7 @@ func run(cfg config.Cfg, logger log.Logger) error { registry, diskCache, []*limithandler.LimiterMiddleware{perRPCLimitHandler, rateLimitHandler}, + server.TransactionMiddleware{}, ) defer gitalyServerFactory.Stop() diff --git a/internal/cli/gitaly/subcmd_hooks_test.go b/internal/cli/gitaly/subcmd_hooks_test.go index 338a72c0e..3e7ee64ec 100644 --- a/internal/cli/gitaly/subcmd_hooks_test.go +++ b/internal/cli/gitaly/subcmd_hooks_test.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" @@ -45,6 +46,11 @@ func TestSetHooksSubcommand(t *testing.T) { configPath := testcfg.WriteTemporaryGitalyConfigFile(t, cfg) + expectedMode := umask.Mask(fs.ModePerm) + if testhelper.IsWALEnabled() { + expectedMode = perm.PrivateDir + } + for _, tc := range []struct { desc string setup func() ([]string, *gitalypb.Repository) @@ -126,7 +132,7 @@ func TestSetHooksSubcommand(t *testing.T) { }, hooks: &bytes.Buffer{}, expectedState: testhelper.DirectoryState{ - "custom_hooks/": {Mode: umask.Mask(fs.ModePerm)}, + "custom_hooks/": {Mode: expectedMode}, }, }, { @@ -141,7 +147,7 @@ func TestSetHooksSubcommand(t *testing.T) { }, hooks: testhelper.MustCreateCustomHooksTar(t), expectedState: testhelper.DirectoryState{ - "custom_hooks/": {Mode: umask.Mask(fs.ModePerm)}, + "custom_hooks/": {Mode: expectedMode}, "custom_hooks/pre-commit": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-commit content")}, "custom_hooks/pre-push": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-push content")}, "custom_hooks/pre-receive": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-receive content")}, @@ -165,7 +171,7 @@ func TestSetHooksSubcommand(t *testing.T) { }, hooks: testhelper.MustCreateCustomHooksTar(t), expectedState: testhelper.DirectoryState{ - "custom_hooks/": {Mode: umask.Mask(fs.ModePerm)}, + "custom_hooks/": {Mode: expectedMode}, "custom_hooks/pre-commit": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-commit content")}, "custom_hooks/pre-push": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-push content")}, "custom_hooks/pre-receive": {Mode: umask.Mask(fs.ModePerm), Content: []byte("pre-receive content")}, diff --git a/internal/git/localrepo/refs.go b/internal/git/localrepo/refs.go index c0be1ebdf..e7f219ee9 100644 --- a/internal/git/localrepo/refs.go +++ b/internal/git/localrepo/refs.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" ) @@ -175,6 +176,10 @@ func (repo *Repo) setDefaultBranchWithTransaction(ctx context.Context, txManager return fmt.Errorf("committing temporary HEAD: %w", err) } + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + tx.SetDefaultBranch(reference) + }) + return nil } diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 7300a6ef8..befe375e4 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -203,7 +203,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, logger, locator, hookManager, gitCmdFactory, catfileCache) - srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false) + srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}, TransactionMiddleware{}).New(true, false) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ @@ -246,7 +246,8 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)}, - ).New(true) + TransactionMiddleware{}, + ).New(true, true) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index d8c1ab9e7..2123a9cc5 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -53,7 +53,7 @@ func WithStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option { } // New returns a GRPC server instance with a set of interceptors configured. -func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, error) { +func (s *GitalyServerFactory) New(external, secure bool, opts ...Option) (*grpc.Server, error) { var cfg serverConfig for _, opt := range opts { opt(&cfg) @@ -154,6 +154,19 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er streamServerInterceptors = append(streamServerInterceptors, cfg.streamInterceptors...) unaryServerInterceptors = append(unaryServerInterceptors, cfg.unaryInterceptors...) + // Only requests coming through the external API need to be ran transactionalized. Only the HookService calls + // should arrive through the internal socket. Requests coming from there would already be running in a + // transaction as the external request that led to the internal socket call would have been transactionalized + // already. + if external { + if s.txMiddleware.UnaryInterceptor != nil { + unaryServerInterceptors = append(unaryServerInterceptors, s.txMiddleware.UnaryInterceptor) + } + if s.txMiddleware.StreamInterceptor != nil { + streamServerInterceptors = append(streamServerInterceptors, s.txMiddleware.StreamInterceptor) + } + } + serverOptions := []grpc.ServerOption{ grpc.StatsHandler(gitalylog.PerRPCLogHandler{ Underlying: &grpcstats.PayloadBytes{}, diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 075adc9a3..b23ae840a 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -20,6 +20,16 @@ type GitalyServerFactory struct { logger log.Logger externalServers []*grpc.Server internalServers []*grpc.Server + txMiddleware TransactionMiddleware +} + +// TransactionMiddleware collects transaction middleware into a single struct that can be +// provided to enable transactions. +type TransactionMiddleware struct { + // UnaryInterceptor is the unary RPC interceptor that handles transaction logic. + UnaryInterceptor grpc.UnaryServerInterceptor + // StreamInterceptor is the stream RPC interceptor that handles transaction logic. + StreamInterceptor grpc.StreamServerInterceptor } // NewGitalyServerFactory allows to create and start secure/insecure 'grpc.Server's. @@ -29,6 +39,7 @@ func NewGitalyServerFactory( registry *backchannel.Registry, cacheInvalidator cache.Invalidator, limitHandlers []*limithandler.LimiterMiddleware, + txMiddleware TransactionMiddleware, ) *GitalyServerFactory { return &GitalyServerFactory{ cfg: cfg, @@ -36,6 +47,7 @@ func NewGitalyServerFactory( registry: registry, cacheInvalidator: cacheInvalidator, limitHandlers: limitHandlers, + txMiddleware: txMiddleware, } } @@ -77,7 +89,7 @@ func (s *GitalyServerFactory) GracefulStop() { // CreateExternal creates a new external gRPC server. The external servers are closed // before the internal servers when gracefully shutting down. func (s *GitalyServerFactory) CreateExternal(secure bool, opts ...Option) (*grpc.Server, error) { - server, err := s.New(secure, opts...) + server, err := s.New(true, secure, opts...) if err != nil { return nil, err } @@ -89,7 +101,7 @@ func (s *GitalyServerFactory) CreateExternal(secure bool, opts ...Option) (*grpc // CreateInternal creates a new internal gRPC server. Internal servers are closed // after the external ones when gracefully shutting down. func (s *GitalyServerFactory) CreateInternal(opts ...Option) (*grpc.Server, error) { - server, err := s.New(false, opts...) + server, err := s.New(false, false, opts...) if err != nil { return nil, err } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 6975d4863..896bf7223 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -84,6 +84,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -104,6 +105,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -118,6 +120,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -149,6 +152,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), logger), nil, + TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -184,6 +188,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + TransactionMiddleware{}, ) defer sf.Stop() diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 9b513c0a3..14ff7b946 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -20,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/stream" @@ -44,7 +45,7 @@ var ( ) func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error { - cacheKey, stdin, err := s.computeCacheKey(req, stdinReader) + cacheKey, stdin, err := s.computeCacheKey(ctx, req, stdinReader) if err != nil { return err } @@ -113,10 +114,19 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH // computeCacheKey returns the cache key used for caching pack-objects. A cache key is made up of // both the requested objects and essential parameters that could impact the content of the // generated packfile. Including any insignificant information could result in a lower cache hit rate. -func (s *server) computeCacheKey(req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) { +func (s *server) computeCacheKey(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) { cacheHash := sha256.New() + + repository := req.Repository + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten + // to point to the transaction's repository, the handler sees each request as different even if they point to + // the same repository. Restore the original request to ensure identical requests get the same key. + repository = tx.OriginalRepository(req.Repository) + }) + cacheKeyPrefix, err := protojson.Marshal(&gitalypb.PackObjectsHookWithSidechannelRequest{ - Repository: req.Repository, + Repository: repository, Args: req.Args, GitProtocol: req.GitProtocol, }) diff --git a/internal/gitaly/service/operations/merge_branch_test.go b/internal/gitaly/service/operations/merge_branch_test.go index 049c8fee6..aaf450194 100644 --- a/internal/gitaly/service/operations/merge_branch_test.go +++ b/internal/gitaly/service/operations/merge_branch_test.go @@ -762,33 +762,61 @@ func testUserMergeBranchConcurrentUpdate(t *testing.T, ctx context.Context) { require.NoError(t, stream.Send(&gitalypb.UserMergeBranchRequest{Apply: true}), "apply merge") require.NoError(t, stream.CloseSend(), "close send") - secondResponse, err := stream.Recv() - testhelper.RequireGrpcError(t, - structerr.NewFailedPrecondition("reference update: reference does not point to expected object"). - WithDetail(&testproto.ErrorMetadata{ - Key: []byte("actual_object_id"), - Value: []byte(concurrentCommitID), - }). - WithDetail(&testproto.ErrorMetadata{ - Key: []byte("expected_object_id"), - Value: []byte(commits.left), - }). - WithDetail(&testproto.ErrorMetadata{ - Key: []byte("reference"), - Value: []byte("refs/heads/branch"), - }). - WithDetail(&gitalypb.UserMergeBranchError{ - Error: &gitalypb.UserMergeBranchError_ReferenceUpdate{ - ReferenceUpdate: &gitalypb.ReferenceUpdateError{ - ReferenceName: []byte("refs/heads/branch"), - OldOid: commits.left.String(), - NewOid: firstResponse.CommitId, + if testhelper.IsWALEnabled() { + // With transaction management enabled, the RPC won't observe concurrent updates in its snapshot. + // Only once it commits will the updates be verified against other concurrent updates. We thus have + // to receive once more after the successful merge to end the stream and trigger a transaction commit. + // + // This test case is also expecting a UserMergeBranch specific error with additional detail. Given the + // commit errors are universal and happen after the handler, we won't be able to return the same error + // here. + secondResponse, err := stream.Recv() + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.UserMergeBranchResponse{ + BranchUpdate: &gitalypb.OperationBranchUpdate{ + CommitId: gittest.ObjectHashDependent(t, map[string]string{ + "sha1": "5e96169507b47c71ffb719dc3ad93d014f57617b", + "sha256": "a1165ec1ffbc14e1ad40dbae4c9ccb6afe6a3a4db1b7a7c5f6aa0c52999c45b1", + }), + }, + }, secondResponse) + + thirdResponse, err := stream.Recv() + testhelper.RequireGrpcError(t, structerr.NewInternal("%w", fmt.Errorf("commit: %w", fmt.Errorf("verify references: %w", storagemgr.ReferenceVerificationError{ + ReferenceName: "refs/heads/branch", + ExpectedOID: commits.left, + ActualOID: concurrentCommitID, + }))), err) + require.Nil(t, thirdResponse) + } else { + secondResponse, err := stream.Recv() + testhelper.RequireGrpcError(t, + structerr.NewFailedPrecondition("reference update: reference does not point to expected object"). + WithDetail(&testproto.ErrorMetadata{ + Key: []byte("actual_object_id"), + Value: []byte(concurrentCommitID), + }). + WithDetail(&testproto.ErrorMetadata{ + Key: []byte("expected_object_id"), + Value: []byte(commits.left), + }). + WithDetail(&testproto.ErrorMetadata{ + Key: []byte("reference"), + Value: []byte("refs/heads/branch"), + }). + WithDetail(&gitalypb.UserMergeBranchError{ + Error: &gitalypb.UserMergeBranchError_ReferenceUpdate{ + ReferenceUpdate: &gitalypb.ReferenceUpdateError{ + ReferenceName: []byte("refs/heads/branch"), + OldOid: commits.left.String(), + NewOid: firstResponse.CommitId, + }, }, - }, - }), - err, - ) - require.Nil(t, secondResponse) + }), + err, + ) + require.Nil(t, secondResponse) + } commit, err := repo.ReadCommit(ctx, "refs/heads/branch") require.NoError(t, err, "get commit after RPC finished") @@ -1033,6 +1061,8 @@ func TestUserMergeBranch_allowed(t *testing.T) { } func testUserMergeBranchAllowed(t *testing.T, ctx context.Context) { + testhelper.SkipWithWAL(t, "PartitionManager not injected in test setup") + t.Parallel() type expectedData struct { diff --git a/internal/gitaly/service/operations/rebase_confirmable_test.go b/internal/gitaly/service/operations/rebase_confirmable_test.go index 71f94db26..5ecb63859 100644 --- a/internal/gitaly/service/operations/rebase_confirmable_test.go +++ b/internal/gitaly/service/operations/rebase_confirmable_test.go @@ -610,7 +610,11 @@ func testUserRebaseConfirmablePreReceiveError(t *testing.T, ctx context.Context) ), err) _, err = localRepo.ReadCommit(ctx, git.Revision(firstResponse.GetRebaseSha())) - if hookName == "pre-receive" { + if testhelper.IsWALEnabled() || hookName == "pre-receive" { + // Previously objects were migrated to the repository if 'pre-receive' succeeded + // so 'update' hook failing would leave the objects in the repo. With transactions enabled, + // no objects end up in the repository unless the transaction is committed. We thus assert + // this to be the case always with transactions. require.Equal(t, localrepo.ErrObjectNotFound, err, "commit should have been discarded") } else { require.NoError(t, err) diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go index 82b26d5ac..0b06c5030 100644 --- a/internal/gitaly/service/operations/squash.go +++ b/internal/gitaly/service/operations/squash.go @@ -9,6 +9,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting" @@ -199,5 +200,12 @@ func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest return "", structerr.NewAborted("committing vote on squashed commit: %w", err) } + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + // Only referenced objects get committed as part of a transaction. Since this + // RPC doesn't reference the object, we have to manually mark it to be included + // in the final committed pack. + tx.IncludeObject(git.ObjectID(commitID)) + }) + return commitID, nil } diff --git a/internal/gitaly/service/operations/squash_test.go b/internal/gitaly/service/operations/squash_test.go index 3f1267102..1225c1a05 100644 --- a/internal/gitaly/service/operations/squash_test.go +++ b/internal/gitaly/service/operations/squash_test.go @@ -177,7 +177,11 @@ func testUserSquashTransactional(t *testing.T, ctx context.Context) { // Even though the committing vote has failed, we expect objects to have // been migrated after the preparatory vote. The commit should thus exist in // the repository. - expectedExists: true, + // + // With transactions, the object is written to the repository if the transaction successfully + // commits. Vote failure raises an error which causes the transaction to be rolled back. We don't + // thus expect to have the object in the repository. + expectedExists: !testhelper.IsWALEnabled(), }, } { t.Run(tc.desc, func(t *testing.T) { diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go index 56f6c1f19..c62faf395 100644 --- a/internal/gitaly/service/repository/fetch_remote_test.go +++ b/internal/gitaly/service/repository/fetch_remote_test.go @@ -41,6 +41,13 @@ func gitRequestValidation(w http.ResponseWriter, r *http.Request, next http.Hand } func TestFetchRemote(t *testing.T) { + testhelper.SkipWithWAL(t, ` +Reference transaction hook is disabled in this RPC due to the reason commented in the +RPC handler body. For now, we'll wait for the RPC to be updated to do the reference updates +manually using update-ref with the fetch being just a dry-run. + +Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) + t.Parallel() ctx := testhelper.Context(t) @@ -1047,6 +1054,9 @@ func TestFetchRemote_transaction(t *testing.T) { } func TestFetchRemote_pooledRepository(t *testing.T) { + testhelper.SkipWithWAL(t, ` +Object pools are not yet supported with transaction management.`) + t.Parallel() ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/repository/fsck_test.go b/internal/gitaly/service/repository/fsck_test.go index 0d78d1327..93156fbaf 100644 --- a/internal/gitaly/service/repository/fsck_test.go +++ b/internal/gitaly/service/repository/fsck_test.go @@ -1,6 +1,7 @@ package repository import ( + "fmt" "os" "path/filepath" "strings" @@ -13,6 +14,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestFsck(t *testing.T) { @@ -77,6 +80,15 @@ func TestFsck(t *testing.T) { require.NoError(t, os.RemoveAll(filepath.Join(repoPath, "objects"))) require.NoError(t, os.WriteFile(filepath.Join(repoPath, "objects"), nil, perm.SharedFile)) + if testhelper.IsWALEnabled() { + return setupData{ + repo: repo, + expectedErr: status.Error(codes.Internal, + fmt.Sprintf("begin transaction: get partition: assign partition ID: get alternate partition ID: read alternates file: open: open %s/objects/info/alternates: not a directory", repoPath), + ), + } + } + return setupData{ repo: repo, requireResponse: func(actual *gitalypb.FsckResponse) { diff --git a/internal/gitaly/service/repository/get_custom_hooks_test.go b/internal/gitaly/service/repository/get_custom_hooks_test.go index 6e329b001..e9157c597 100644 --- a/internal/gitaly/service/repository/get_custom_hooks_test.go +++ b/internal/gitaly/service/repository/get_custom_hooks_test.go @@ -87,6 +87,11 @@ func TestGetCustomHooks_successful(t *testing.T) { } func TestGetCustomHooks_symlink(t *testing.T) { + testhelper.SkipWithWAL(t, ` +The repositories generally shouldn't have symlinks in them and the TransactionManager never writes any +symlinks. Symlinks are not supported when creating a snapshot of the repository. Disable the test as it +doesn't seem to test a realistic scenario.`) + t.Parallel() for _, tc := range []struct { diff --git a/internal/gitaly/service/repository/objects_size_test.go b/internal/gitaly/service/repository/objects_size_test.go index fd4ec9cc3..6a441985e 100644 --- a/internal/gitaly/service/repository/objects_size_test.go +++ b/internal/gitaly/service/repository/objects_size_test.go @@ -335,6 +335,8 @@ func testObjectsSize(t *testing.T, ctx context.Context) { { desc: "unique objects in an object deduplication network", setup: func(t *testing.T) setupData { + testhelper.SkipWithWAL(t, ` +Object pools are not yet supported with transaction management.`) repo, repoPath := gittest.CreateRepository(t, ctx, cfg) dedupBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("1234")) dedupTree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ @@ -379,6 +381,8 @@ func testObjectsSize(t *testing.T, ctx context.Context) { { desc: "deduplicated objects in an object deduplication network", setup: func(t *testing.T) setupData { + testhelper.SkipWithWAL(t, ` +Object pools are not yet supported with transaction management.`) repo, repoPath := gittest.CreateRepository(t, ctx, cfg) dedupBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("1234")) dedupTree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ diff --git a/internal/gitaly/service/repository/remove.go b/internal/gitaly/service/repository/remove.go index e1281a2f6..dc9318668 100644 --- a/internal/gitaly/service/repository/remove.go +++ b/internal/gitaly/service/repository/remove.go @@ -5,6 +5,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -19,5 +20,9 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi return nil, err } + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + tx.DeleteRepository() + }) + return &gitalypb.RemoveRepositoryResponse{}, nil } diff --git a/internal/gitaly/service/repository/remove_test.go b/internal/gitaly/service/repository/remove_test.go index b49ad7c4a..60deba09c 100644 --- a/internal/gitaly/service/repository/remove_test.go +++ b/internal/gitaly/service/repository/remove_test.go @@ -55,6 +55,10 @@ func TestRemoveRepository_validate(t *testing.T) { } func TestRemoveRepository_locking(t *testing.T) { + testhelper.SkipWithWAL(t, ` +Repository locks are not acquired with transaction management enabled. The test and the locking +logic will be removed once transaction managements is always enabled.`) + t.Parallel() ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 7d17196be..fde02ce8e 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -42,6 +42,11 @@ import ( ) func TestReplicateRepository(t *testing.T) { + testhelper.SkipWithWAL(t, ` +ReplicateRepository is replicating git attributes as a separate file. WAL doesn't +support this as the separate attributes file is going to be replaced with reading the +attributes from HEAD.`) + t.Parallel() testhelper.NewFeatureSets( featureflag.ReplicateRepositoryObjectPool, diff --git a/internal/gitaly/service/repository/restore_repository_test.go b/internal/gitaly/service/repository/restore_repository_test.go index a8b50c92d..ce56ec877 100644 --- a/internal/gitaly/service/repository/restore_repository_test.go +++ b/internal/gitaly/service/repository/restore_repository_test.go @@ -159,10 +159,14 @@ func TestRestoreRepository(t *testing.T) { backupID: "abc123", } }, - expectedErr: structerr.NewInvalidArgument(testhelper.GitalyOrPraefect( - "restore repository: repository: repository not set", - "repository not set", - )), + expectedErr: func() error { + errorMessage := "restore repository: repository: repository not set" + if testhelper.IsPraefectEnabled() || testhelper.IsWALEnabled() { + errorMessage = "repository not set" + } + + return structerr.NewInvalidArgument(errorMessage) + }(), }, { desc: "missing backup sink", diff --git a/internal/gitaly/service/repository/set_custom_hooks.go b/internal/gitaly/service/repository/set_custom_hooks.go index 482c72ac4..5368d00be 100644 --- a/internal/gitaly/service/repository/set_custom_hooks.go +++ b/internal/gitaly/service/repository/set_custom_hooks.go @@ -1,7 +1,11 @@ package repository import ( + "bytes" + "io" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" @@ -23,7 +27,8 @@ func (s *server) SetCustomHooks(stream gitalypb.RepositoryService_SetCustomHooks return structerr.NewInvalidArgument("%w", err) } - reader := streamio.NewReader(func() ([]byte, error) { + var customHooksTAR bytes.Buffer + reader := io.TeeReader(streamio.NewReader(func() ([]byte, error) { if firstRequest != nil { data := firstRequest.GetData() firstRequest = nil @@ -32,12 +37,16 @@ func (s *server) SetCustomHooks(stream gitalypb.RepositoryService_SetCustomHooks request, err := stream.Recv() return request.GetData(), err - }) + }), &customHooksTAR) if err := repoutil.SetCustomHooks(ctx, s.logger, s.locator, s.txManager, reader, repo); err != nil { return structerr.NewInternal("setting custom hooks: %w", err) } + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + tx.SetCustomHooks(customHooksTAR.Bytes()) + }) + return stream.SendAndClose(&gitalypb.SetCustomHooksResponse{}) } diff --git a/internal/gitaly/service/repository/set_custom_hooks_test.go b/internal/gitaly/service/repository/set_custom_hooks_test.go index 310591062..823762046 100644 --- a/internal/gitaly/service/repository/set_custom_hooks_test.go +++ b/internal/gitaly/service/repository/set_custom_hooks_test.go @@ -64,6 +64,8 @@ func TestSetCustomHooksRequest_success(t *testing.T) { { desc: "RestoreCustomHooks", streamWriter: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, client gitalypb.RepositoryServiceClient) (io.Writer, func()) { + testhelper.SkipWithWAL(t, "This RPC is deprecated and pending removal") + //nolint:staticcheck stream, err := client.RestoreCustomHooks(ctx) require.NoError(t, err) diff --git a/internal/gitaly/service/repository/size_test.go b/internal/gitaly/service/repository/size_test.go index f41cb2186..0ed9b7516 100644 --- a/internal/gitaly/service/repository/size_test.go +++ b/internal/gitaly/service/repository/size_test.go @@ -22,6 +22,9 @@ import ( ) func TestRepositorySize_poolMember(t *testing.T) { + testhelper.SkipWithWAL(t, ` +Object pools are not yet supported with transaction management.`) + t.Parallel() testhelper.NewFeatureSets(featureflag.TransactionalLinkRepository).Run(t, testRepositorySizePoolMember) } diff --git a/internal/gitaly/service/repository/snapshot_test.go b/internal/gitaly/service/repository/snapshot_test.go index a051a9a77..35d969c26 100644 --- a/internal/gitaly/service/repository/snapshot_test.go +++ b/internal/gitaly/service/repository/snapshot_test.go @@ -100,6 +100,11 @@ func TestGetSnapshot(t *testing.T) { { desc: "repository contains symlink", setup: func(t *testing.T) setupData { + testhelper.SkipWithWAL(t, ` +The repositories generally shouldn't have symlinks in them and the TransactionManager never writes any +symlinks. Symlinks are not supported when creating a snapshot of the repository. Disable the test as it +doesn't seem to test a realistic scenario.`) + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) // Make packed-refs into a symlink so the RPC returns an error. diff --git a/internal/gitaly/service/smarthttp/cache.go b/internal/gitaly/service/smarthttp/cache.go index 6d8113bfc..df51f9a17 100644 --- a/internal/gitaly/service/smarthttp/cache.go +++ b/internal/gitaly/service/smarthttp/cache.go @@ -8,9 +8,11 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/protobuf/proto" ) type infoRefCache struct { @@ -57,7 +59,15 @@ func (c infoRefCache) tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest c.logger.DebugContext(ctx, "Attempting to fetch cached response") countAttempt() - stream, err := c.streamer.GetStream(ctx, in.GetRepository(), in) + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten + // to point to the transaction's repository, the handler sees each request as different even if they point to + // the same repository. Restore the original request to ensure identical requests get the same key. + in = proto.Clone(in).(*gitalypb.InfoRefsRequest) + in.Repository = tx.OriginalRepository(in.Repository) + }) + + stream, err := c.streamer.GetStream(ctx, in.Repository, in) switch err { case nil: defer stream.Close() diff --git a/internal/gitaly/service/smarthttp/inforefs_test.go b/internal/gitaly/service/smarthttp/inforefs_test.go index 259cc0f91..a7a096247 100644 --- a/internal/gitaly/service/smarthttp/inforefs_test.go +++ b/internal/gitaly/service/smarthttp/inforefs_test.go @@ -318,6 +318,9 @@ func testInfoRefsReceivePackSuccessful(t *testing.T, ctx context.Context) { } func TestInfoRefsReceivePack_hiddenRefs(t *testing.T) { + testhelper.SkipWithWAL(t, ` +Object pools are not yet support with WAL. This test is testing with a pooled repository.`) + t.Parallel() testhelper.NewFeatureSets( featureflag.TransactionalLinkRepository, diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 8724d3f97..2e503ae70 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -94,7 +94,22 @@ func TestPostReceivePack_successful(t *testing.T) { // Compare the repository up front so that we can use require.Equal for // the remaining values. - testhelper.ProtoEqual(t, gittest.RewrittenRepository(t, ctx, cfg, repo), payload.Repo) + expectedRepo := gittest.RewrittenRepository(t, ctx, cfg, repo) + if testhelper.IsWALEnabled() { + // The repository should be quarantined. + require.NotEmpty(t, payload.Repo.GitObjectDirectory) + payload.Repo.GitObjectDirectory = "OVERRIDDEN" + expectedRepo.GitObjectDirectory = "OVERRIDDEN" + require.NotEmpty(t, payload.Repo.GitAlternateObjectDirectories) + payload.Repo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"} + expectedRepo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"} + // The following values may change so we don't want to assert them for equality. + require.NotEmpty(t, payload.Repo.RelativePath) + payload.Repo.RelativePath = "OVERRIDDEN" + expectedRepo.RelativePath = "OVERRIDDEN" + } + + testhelper.ProtoEqual(t, expectedRepo, payload.Repo) payload.Repo = nil // If running tests with Praefect, then the transaction would be set, but we have no way of @@ -113,6 +128,11 @@ func TestPostReceivePack_successful(t *testing.T) { require.ElementsMatch(t, expectedFeatureFlags, payload.FeatureFlagsWithValue) payload.FeatureFlagsWithValue = nil + var transactionID storage.TransactionID + if testhelper.IsWALEnabled() { + transactionID = 1 + } + require.Equal(t, git.HooksPayload{ ObjectFormat: gittest.DefaultObjectHash.Format, RuntimeDir: cfg.RuntimeDir, @@ -124,6 +144,7 @@ func TestPostReceivePack_successful(t *testing.T) { Protocol: "http", }, RequestedHooks: git.ReceivePackHooks, + TransactionID: transactionID, }, payload) } diff --git a/internal/gitaly/service/ssh/receive_pack_test.go b/internal/gitaly/service/ssh/receive_pack_test.go index 3e461b0ad..83ddb0bb4 100644 --- a/internal/gitaly/service/ssh/receive_pack_test.go +++ b/internal/gitaly/service/ssh/receive_pack_test.go @@ -159,12 +159,26 @@ func TestReceivePack_success(t *testing.T) { // Compare the repository up front so that we can use require.Equal for // the remaining values. - testhelper.ProtoEqual(t, &gitalypb.Repository{ - StorageName: cfg.Storages[0].Name, - RelativePath: gittest.GetReplicaPath(t, ctx, cfg, remoteRepo), - GlProjectPath: remoteRepo.GlProjectPath, - GlRepository: remoteRepo.GlRepository, - }, payload.Repo) + // + expectedRepo := gittest.RewrittenRepository(t, ctx, cfg, remoteRepo) + // The repository should have a relative path. + if testhelper.IsWALEnabled() { + // The repository should be quarantined. + require.NotEmpty(t, payload.Repo.GitObjectDirectory) + payload.Repo.GitObjectDirectory = "OVERRIDDEN" + expectedRepo.GitObjectDirectory = "OVERRIDDEN" + require.NotEmpty(t, payload.Repo.GitAlternateObjectDirectories) + payload.Repo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"} + expectedRepo.GitAlternateObjectDirectories = []string{"OVERRIDDEN"} + // The following values may change so we don't want to assert them for equality. + require.NotEmpty(t, payload.Repo.RelativePath) + payload.Repo.RelativePath = "OVERRIDDEN" + expectedRepo.RelativePath = "OVERRIDDEN" + } + + // Compare the repository up front so that we can use require.Equal for + // the remaining values. + testhelper.ProtoEqual(t, expectedRepo, payload.Repo) payload.Repo = nil // If running tests with Praefect, then the transaction would be set, but we have no way of @@ -183,6 +197,11 @@ func TestReceivePack_success(t *testing.T) { require.ElementsMatch(t, expectedFeatureFlags, payload.FeatureFlagsWithValue) payload.FeatureFlagsWithValue = nil + var transactionID storage.TransactionID + if testhelper.IsWALEnabled() { + transactionID = 2 + } + require.Equal(t, git.HooksPayload{ ObjectFormat: gittest.DefaultObjectHash.Format, RuntimeDir: cfg.RuntimeDir, @@ -194,6 +213,7 @@ func TestReceivePack_success(t *testing.T) { Protocol: "ssh", }, RequestedHooks: git.ReceivePackHooks, + TransactionID: transactionID, }, payload) } @@ -401,6 +421,9 @@ func TestReceivePack_customHookFailure(t *testing.T) { } func TestReceivePack_hidesObjectPoolReferences(t *testing.T) { + testhelper.SkipWithWAL(t, ` +Object pools are not yet support with WAL. This test is testing with a pooled repository.`) + t.Parallel() testhelper.NewFeatureSets(featureflag.TransactionalLinkRepository).Run(t, testReceivePackHidesObjectPoolReferences) } diff --git a/internal/gitaly/storage/storagemgr/middleware.go b/internal/gitaly/storage/storagemgr/middleware.go new file mode 100644 index 000000000..1ed793387 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/middleware.go @@ -0,0 +1,313 @@ +package storagemgr + +import ( + "context" + "errors" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +// ErrQuarantineConfiguredOnMutator is returned when a mutator request is received with a quarantine configured. +var ErrQuarantineConfiguredOnMutator = errors.New("quarantine configured on a mutator request") + +var nonTransactionalRPCs = map[string]struct{}{ + // This isn't registered in protoregistry so mark it here as non-transactional. + "/grpc.health.v1.Health/Check": {}, + + // These are missing annotations. We don't have a suitable scope for them + // so mark these as non-transactional here. + "/gitaly.ServerService/DiskStatistics": {}, + "/gitaly.ServerService/ServerInfo": {}, + "/gitaly.ServerService/ClockSynced": {}, + "/gitaly.ServerService/ReadinessCheck": {}, + + // Object pools are not yet supported with WAL. + "/gitaly.ObjectPoolService/CreateObjectPool": {}, + "/gitaly.ObjectPoolService/DeleteObjectPool": {}, + "/gitaly.ObjectPoolService/LinkRepositoryToObjectPool": {}, + "/gitaly.ObjectPoolService/DisconnectGitAlternates": {}, + "/gitaly.ObjectPoolService/FetchIntoObjectPool": {}, + "/gitaly.ObjectPoolService/GetObjectPool": {}, + // GetSnapshot is testing logic with object pools as well. + "/gitaly.RepositoryService/GetSnapshot": {}, + + // Repository creations are not yet handled through the WAL. + "/gitaly.RepositoryService/CreateRepository": {}, + "/gitaly.RepositoryService/CreateRepositoryFromURL": {}, + "/gitaly.RepositoryService/CreateRepositoryFromBundle": {}, + "/gitaly.RepositoryService/CreateFork": {}, + "/gitaly.RepositoryService/CreateRepositoryFromSnapshot": {}, + + // ReplicateRepository is replicating the attributes and config which the + // WAL won't support. This is pending removal of their replication. + // + // ReplicateRepository may also create a repository which is not yet supported + // through the WAL. + "/gitaly.RepositoryService/ReplicateRepository": {}, + + // Below RPCs implement functionality which isn't going to be supported by WAL. + // Handle these as non-transactional. Their usage must be removed prior to enabling WAL. + // + // Attributes are going to be read from HEAD. Writing out a separate attributes file + // won't be supported. + "/gitaly.RepositoryService/ApplyGitattributes": {}, + // SetFullPath writes the full path into git config and is the last RPC that writes into the + // git config. Writing into the config won't be supported. + "/gitaly.RepositoryService/SetFullPath": {}, + // RenameRepository is pending removal as we need stable IDs for repositories. Renaming won't + // be supported with WAL. + "/gitaly.RepositoryService/RenameRepository": {}, +} + +// NewUnaryInterceptor returns an unary interceptor that manages a unary RPC's transaction. It starts a transaction +// on the target repository of the request and rewrites the request to point to the transaction's snapshot repository. +// The transaction is committed if the handler doesn't return an error and rolled back otherwise. +func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, returnedErr error) { + if _, ok := nonTransactionalRPCs[info.FullMethod]; ok { + return handler(ctx, req) + } + + methodInfo, err := registry.LookupMethod(info.FullMethod) + if err != nil { + return nil, fmt.Errorf("lookup method: %w", err) + } + + txReq, err := transactionalizeRequest(ctx, logger, txRegistry, mgr, locator, methodInfo, req.(proto.Message)) + if err != nil { + return nil, err + } + defer func() { returnedErr = txReq.finishTransaction(returnedErr) }() + + return handler(txReq.ctx, txReq.firstMessage) + } +} + +// peekedStream allows for peeking the first message of ServerStream. Reading the first message would leave +// handler unable to read the first message as it was already consumed. peekedStream allows for restoring the +// stream so the RPC handler can read the first message as usual. It additionally supports overriding the +// context of the stream. +type peekedStream struct { + context context.Context + firstMessage proto.Message + firstError error + grpc.ServerStream +} + +func (ps *peekedStream) Context() context.Context { + return ps.context +} + +func (ps *peekedStream) RecvMsg(dst interface{}) error { + if ps.firstError != nil { + firstError := ps.firstError + ps.firstError = nil + return firstError + } + + if ps.firstMessage != nil { + marshaled, err := proto.Marshal(ps.firstMessage) + if err != nil { + return fmt.Errorf("marshal: %w", err) + } + + if err := proto.Unmarshal(marshaled, dst.(proto.Message)); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + + ps.firstMessage = nil + return nil + } + + return ps.ServerStream.RecvMsg(dst) +} + +// NewStreamInterceptor returns a stream interceptor that manages a streaming RPC's transaction. It starts a transaction +// on the target repository of the first request and rewrites the request to point to the transaction's snapshot repository. +// The transaction is committed if the handler doesn't return an error and rolled back otherwise. +func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) { + if _, ok := nonTransactionalRPCs[info.FullMethod]; ok { + return handler(srv, ss) + } + + methodInfo, err := registry.LookupMethod(info.FullMethod) + if err != nil { + return fmt.Errorf("lookup method: %w", err) + } + + req := methodInfo.NewRequest() + if err := ss.RecvMsg(req); err != nil { + // All of the repository scoped streaming RPCs send the repository in the first message. + // Generally it should be fine to error out in all cases if there is no message sent. + // To maintain compatibility with tests, we instead invoke the handler to let them return + // the asserted error messages. Once the transaction management is on by default, we should + // error out here directly and amend the failing test cases. + return handler(srv, &peekedStream{ + context: ss.Context(), + firstError: err, + ServerStream: ss, + }) + } + + txReq, err := transactionalizeRequest(ss.Context(), logger, txRegistry, mgr, locator, methodInfo, req) + if err != nil { + return err + } + defer func() { returnedErr = txReq.finishTransaction(returnedErr) }() + + return handler(srv, &peekedStream{ + context: txReq.ctx, + firstMessage: txReq.firstMessage, + ServerStream: ss, + }) + } +} + +// transactionalizedRequest contains the context and the first request to pass into the RPC handler to +// run it correctly against the transaction. +type transactionalizedRequest struct { + // ctx is the request's context with the transaction added into it. + ctx context.Context + // firstMessage is the message to pass to the RPC as the first message. The target repository + // in it has been rewritten to point to the snapshot repository. + firstMessage proto.Message + // finishTransaction takes in the error returned from the handler and returns the error + // that should be returned to the client. If the handler error is nil, the transaction is committed. + // If the handler error is not nil, the transaction is rolled back. + finishTransaction func(error) error +} + +// nonTransactionalRequest returns a no-op transactionalizedRequest that configures the RPC handler to be +// run as normal without a transaction. +func nonTransactionalRequest(ctx context.Context, firstMessage proto.Message) transactionalizedRequest { + return transactionalizedRequest{ + ctx: ctx, + firstMessage: firstMessage, + finishTransaction: func(err error) error { return err }, + } +} + +// transactionalizeRequest begins a transaction for the repository targeted in the request. It returns the context and the request that the handler should +// be invoked with. In addition, it returns a function that must be called with the error returned from the handler to either commit or rollback the +// transaction. The returned values are valid even if the request should not run transactionally. +func transactionalizeRequest(ctx context.Context, logger log.Logger, txRegistry *TransactionRegistry, mgr *PartitionManager, locator storage.Locator, methodInfo protoregistry.MethodInfo, req proto.Message) (_ transactionalizedRequest, returnedErr error) { + if methodInfo.Scope != protoregistry.ScopeRepository { + return nonTransactionalRequest(ctx, req), nil + } + + repo, err := methodInfo.TargetRepo(req) + if err != nil { + if errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) { + // The above error is returned when the repository field is not set in the request. + // Return instead the error many tests are asserting to be returned from the handlers. + return transactionalizedRequest{}, structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet) + } + + return transactionalizedRequest{}, fmt.Errorf("extract target repository: %w", err) + } + + if methodInfo.Operation != protoregistry.OpAccessor && methodInfo.Operation != protoregistry.OpMutator { + // Transactions support only accessors and mutators. + return nonTransactionalRequest(ctx, req), nil + } + + if repo.GitObjectDirectory != "" || len(repo.GitAlternateObjectDirectories) > 0 { + // The object directories should only be configured on a repository coming from a request that + // was already configured with a quarantine directory and is being looped back to Gitaly from Rails' + // authorization checks. If that's the case, the request should already be running in scope of a + // transaction and the repository rewritten to point to the snapshot repository. We thus don't start + // a new transaction if we encounter this. + // + // This property is violated in tests which manually configure the object directory or the alternate + // object directory. This allows for circumventing the transaction management by configuring the either + // of the object directories. We'll leave this unaddressed for now and later address this by removing + // the options to configure object directories and alternates in a request. + // + // The relative path in quarantined requests is currently still pointing to the original repository. + // https://gitlab.com/gitlab-org/gitaly/-/issues/5483 tracks having Rails send the snapshot's relative + // path instead. + + if methodInfo.Operation == protoregistry.OpMutator { + // Accessor requests may come with quarantine configured from Rails' access checks. Since the + // RPC that triggered these access checks would already run in a transaction and target a + // snapshot, we won't start another one. Mutators however are rejected to prevent writes + // unintentionally targeting the main repository. + return transactionalizedRequest{}, ErrQuarantineConfiguredOnMutator + } + + return nonTransactionalRequest(ctx, req), nil + } + + // While the PartitionManager already verifies the repository's storage and relative path, it does not + // return the exact same error messages as some RPCs are testing for at the moment. In order to maintain + // compatibility with said tests, validate the repository here ahead of time and return the possible error + // as is. + if err := locator.ValidateRepository(repo, storage.WithSkipRepositoryExistenceCheck()); err != nil { + return transactionalizedRequest{}, err + } + + tx, err := mgr.Begin(ctx, repo, TransactionOptions{ReadOnly: methodInfo.Operation == protoregistry.OpAccessor}) + if err != nil { + return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err) + } + ctx = storagectx.ContextWithTransaction(ctx, tx) + + txID := txRegistry.register(tx.Transaction) + ctx = storage.ContextWithTransactionID(ctx, txID) + + finishTX := func(handlerErr error) error { + defer txRegistry.unregister(txID) + + if handlerErr != nil { + if err := tx.Rollback(); err != nil { + logger.WithError(err).ErrorContext(ctx, "failed rolling back transaction") + } + + return handlerErr + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("commit: %w", err) + } + + return nil + } + + defer func() { + if returnedErr != nil { + returnedErr = finishTX(returnedErr) + } + }() + + rewrittenReq, err := rewriteRequest(tx, methodInfo, req) + if err != nil { + return transactionalizedRequest{}, fmt.Errorf("rewrite request: %w", err) + } + + return transactionalizedRequest{ + ctx: ctx, + firstMessage: rewrittenReq, + finishTransaction: finishTX, + }, nil +} + +func rewriteRequest(tx *finalizableTransaction, methodInfo protoregistry.MethodInfo, req proto.Message) (proto.Message, error) { + // Clone the request in order to not rewrite the request in the earlier interceptors. + rewrittenReq := proto.Clone(req) + targetRepo, err := methodInfo.TargetRepo(rewrittenReq) + if err != nil { + return nil, fmt.Errorf("extract target repository: %w", err) + } + + *targetRepo = *tx.RewriteRepository(targetRepo) + + return rewrittenReq, nil +} diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go new file mode 100644 index 000000000..6c2764369 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/middleware_test.go @@ -0,0 +1,554 @@ +package storagemgr_test + +import ( + "context" + "errors" + "io" + "testing" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/protobuf/proto" +) + +type mockRepositoryService struct { + objectFormatFunc func(context.Context, *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) + optimizeRepositoryFunc func(context.Context, *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) + removeRepositoryFunc func(context.Context, *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) + setCustomHooksFunc func(gitalypb.RepositoryService_SetCustomHooksServer) error + getCustomHooksFunc func(*gitalypb.GetCustomHooksRequest, gitalypb.RepositoryService_GetCustomHooksServer) error + gitalypb.UnimplementedRepositoryServiceServer +} + +func (m mockRepositoryService) ObjectFormat(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) { + return m.objectFormatFunc(ctx, req) +} + +func (m mockRepositoryService) OptimizeRepository(ctx context.Context, req *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { + return m.optimizeRepositoryFunc(ctx, req) +} + +func (m mockRepositoryService) RemoveRepository(ctx context.Context, req *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) { + return m.removeRepositoryFunc(ctx, req) +} + +func (m mockRepositoryService) SetCustomHooks(stream gitalypb.RepositoryService_SetCustomHooksServer) error { + return m.setCustomHooksFunc(stream) +} + +func (m mockRepositoryService) GetCustomHooks(req *gitalypb.GetCustomHooksRequest, stream gitalypb.RepositoryService_GetCustomHooksServer) error { + return m.getCustomHooksFunc(req, stream) +} + +type mockHealthService struct { + checkFunc func(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) + grpc_health_v1.UnimplementedHealthServer +} + +func (m mockHealthService) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + return m.checkFunc(ctx, req) +} + +func TestMiddleware_transactional(t *testing.T) { + if !testhelper.IsWALEnabled() { + t.Skip(` +The test relies on the interceptor being configured in the test server. +Only run the test with WAL enabled as other wise the interceptor won't +be configured.`) + } + + testhelper.SkipWithPraefect(t, ` +This interceptor is for use with Gitaly. Praefect running in front of it may change error +messages and behavior by erroring out the requests before they even hit this interceptor.`) + + t.Parallel() + + validRepository := func() *gitalypb.Repository { + return &gitalypb.Repository{ + StorageName: "default", + RelativePath: "relative-path", + GlRepository: "gl-repository", + GlProjectPath: "project-path", + } + } + + for _, tc := range []struct { + desc string + repository *gitalypb.Repository + performRequest func(*testing.T, context.Context, gitalypb.RepositoryServiceClient) + handlerError error + rollbackTransaction bool + expectHandlerInvoked bool + expectedRollbackError error + expectedResponse proto.Message + expectedError error + }{ + { + desc: "missing repository", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{}) + require.Nil(t, resp) + testhelper.RequireGrpcError(t, + structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet), + err, + ) + }, + }, + { + desc: "storage not set", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{ + Repository: &gitalypb.Repository{ + RelativePath: "non-existent-relative-path", + }, + }) + require.Nil(t, resp) + testhelper.RequireGrpcError(t, + structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet), + err, + ) + }, + }, + { + desc: "relative path not set", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{ + Repository: &gitalypb.Repository{ + StorageName: "default", + }, + }) + require.Nil(t, resp) + testhelper.RequireGrpcError(t, + structerr.NewInvalidArgument("%w", storage.ErrRepositoryPathNotSet), + err, + ) + }, + }, + { + desc: "invalid storage", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{ + Repository: &gitalypb.Repository{ + StorageName: "non-existent-storage", + RelativePath: "non-existent-relative-path", + }, + }) + require.Nil(t, resp) + testhelper.RequireGrpcError(t, + testhelper.ToInterceptedMetadata(structerr.NewInvalidArgument("%w", storage.NewStorageNotFoundError("non-existent-storage"))), + err, + ) + }, + }, + { + desc: "unary rollback error is logged", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()}) + require.Nil(t, resp) + testhelper.RequireGrpcError(t, + structerr.NewInternal("handler error"), + err, + ) + }, + rollbackTransaction: true, + expectHandlerInvoked: true, + handlerError: errors.New("handler error"), + expectedRollbackError: storagemgr.ErrTransactionAlreadyRollbacked, + }, + { + desc: "streaming rollback error is logged", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + stream, err := client.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: validRepository()}) + require.NoError(t, err) + + resp, err := stream.Recv() + testhelper.RequireGrpcError(t, + structerr.NewInternal("handler error"), + err, + ) + require.Nil(t, resp) + }, + rollbackTransaction: true, + expectHandlerInvoked: true, + handlerError: errors.New("handler error"), + expectedRollbackError: storagemgr.ErrTransactionAlreadyRollbacked, + }, + { + desc: "unary commit error is returned", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()}) + require.Nil(t, resp) + testhelper.RequireGrpcError(t, + structerr.NewInternal("commit: transaction already rollbacked"), + err, + ) + }, + rollbackTransaction: true, + expectHandlerInvoked: true, + }, + { + desc: "streaming commit error is returned", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + stream, err := client.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: validRepository()}) + require.NoError(t, err) + + resp, err := stream.Recv() + testhelper.RequireGrpcError(t, + structerr.NewInternal("commit: transaction already rollbacked"), + err, + ) + require.Nil(t, resp) + }, + rollbackTransaction: true, + expectHandlerInvoked: true, + }, + { + desc: "failed unary request", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()}) + testhelper.RequireGrpcError(t, + structerr.NewInternal("handler error"), + err, + ) + require.Nil(t, resp) + }, + + handlerError: errors.New("handler error"), + expectHandlerInvoked: true, + }, + { + desc: "successful unary accessor", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{Repository: validRepository()}) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.ObjectFormatResponse{}, resp) + }, + expectHandlerInvoked: true, + }, + { + desc: "successful streaming accessor", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + stream, err := client.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: validRepository()}) + require.NoError(t, err) + + resp, err := stream.Recv() + require.Equal(t, io.EOF, err) + require.Nil(t, resp) + }, + expectHandlerInvoked: true, + }, + { + desc: "successful unary mutator", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + resp, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: validRepository()}) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.RemoveRepositoryResponse{}, resp) + }, + expectHandlerInvoked: true, + }, + { + desc: "successful streaming mutator", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + stream, err := client.SetCustomHooks(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(&gitalypb.SetCustomHooksRequest{Repository: validRepository()})) + + resp, err := stream.CloseAndRecv() + require.Equal(t, io.EOF, err) + require.Nil(t, resp) + }, + expectHandlerInvoked: true, + }, + { + desc: "mutator with object directory configured", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + repo := validRepository() + repo.GitObjectDirectory = "non-default" + + resp, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}) + testhelper.RequireGrpcError(t, structerr.NewInternal("%w", storagemgr.ErrQuarantineConfiguredOnMutator), err) + require.Nil(t, resp) + }, + }, + { + desc: "mutator with alternate object directory configured", + performRequest: func(t *testing.T, ctx context.Context, client gitalypb.RepositoryServiceClient) { + repo := validRepository() + repo.GitAlternateObjectDirectories = []string{"non-default"} + + resp, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}) + testhelper.RequireGrpcError(t, structerr.NewInternal("%w", storagemgr.ErrQuarantineConfiguredOnMutator), err) + require.Nil(t, resp) + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cfg := testcfg.Build(t) + + ctx := testhelper.Context(t) + gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: validRepository().RelativePath, + }) + + txRegistry := storagemgr.NewTransactionRegistry() + + logger, loggerHook := test.NewNullLogger() + entry := logrus.NewEntry(logger) + + handlerInvoked := false + var transactionID storage.TransactionID + + assertHandler := func(ctx context.Context, isMutator bool, repo *gitalypb.Repository) { + handlerInvoked = true + + // The repositories should be equal except for the relative path which + // has been pointed to the snapshot. + expectedRepo := validRepository() + actualRepo := repo + + // When run in a transaction, the relative path will be pointed to the snapshot. + assert.NotEqual(t, expectedRepo.RelativePath, repo.RelativePath) + expectedRepo.RelativePath = "" + actualRepo.RelativePath = "" + + if isMutator { + // Mutators should have quarantine directory configured. + assert.NotEmpty(t, actualRepo.GitObjectDirectory) + actualRepo.GitObjectDirectory = "" + assert.NotEmpty(t, actualRepo.GitAlternateObjectDirectories) + actualRepo.GitAlternateObjectDirectories = nil + } else { + // Accessors should not have a quarantine directory configured. + assert.Empty(t, actualRepo.GitObjectDirectory) + assert.Empty(t, actualRepo.GitAlternateObjectDirectories) + } + + testhelper.ProtoEqual(t, expectedRepo, actualRepo) + + // The transaction ID should be included in the context. + transactionID = storage.ExtractTransactionID(ctx) + assert.Equal(t, storage.TransactionID(1), transactionID) + + // The transaction itself should be included in the context. + transactionInContext := false + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + transactionInContext = true + }) + assert.True(t, transactionInContext) + + // The transaction should be registered into the registry and retrievable + // with its ID. + tx, err := txRegistry.Get(transactionID) + assert.NoError(t, err) + + if tc.rollbackTransaction { + assert.NoError(t, tx.Rollback()) + } + } + + serverAddress := testserver.RunGitalyServer(t, cfg, func(server *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{ + objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) { + assertHandler(ctx, false, req.GetRepository()) + return &gitalypb.ObjectFormatResponse{}, tc.handlerError + }, + setCustomHooksFunc: func(stream gitalypb.RepositoryService_SetCustomHooksServer) error { + req, err := stream.Recv() + assert.NoError(t, err) + + assertHandler(stream.Context(), true, req.GetRepository()) + + resp, err := stream.Recv() + assert.Nil(t, resp) + assert.Equal(t, io.EOF, err) + return nil + }, + getCustomHooksFunc: func(req *gitalypb.GetCustomHooksRequest, stream gitalypb.RepositoryService_GetCustomHooksServer) error { + assertHandler(stream.Context(), false, req.GetRepository()) + return tc.handlerError + }, + removeRepositoryFunc: func(ctx context.Context, req *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) { + assertHandler(ctx, true, req.GetRepository()) + return &gitalypb.RemoveRepositoryResponse{}, tc.handlerError + }, + }) + }, + testserver.WithTransactionRegistry(txRegistry), + testserver.WithLogger(log.FromLogrusEntry(entry)), + ) + + clientConn, err := client.Dial(ctx, serverAddress) + require.NoError(t, err) + defer clientConn.Close() + + tc.performRequest(t, testhelper.Context(t), gitalypb.NewRepositoryServiceClient(clientConn)) + require.Equal(t, tc.expectHandlerInvoked, handlerInvoked) + + var rollbackFailureError error + for _, entry := range loggerHook.AllEntries() { + if entry.Message == "failed rolling back transaction" { + rollbackFailureError = entry.Data[logrus.ErrorKey].(error) + } + } + + require.Equal(t, tc.expectedRollbackError, rollbackFailureError) + + // The transaction should be unregistered at the end of the RPC to clean up. + _, err = txRegistry.Get(transactionID) + require.Equal(t, errors.New("transaction not found"), err) + }) + } +} + +func TestMiddleware_non_transactional(t *testing.T) { + if !testhelper.IsWALEnabled() { + t.Skip(` +The test relies on the interceptor being configured in the test server. +Only run the test with WAL enabled as other wise the interceptor won't +be configured.`) + } + + testhelper.SkipWithPraefect(t, ` +This interceptor is for use with Gitaly. Praefect running in front of it may change error +messages and behavior by erroring out the requests before they even hit this interceptor.`) + + validRepository := func() *gitalypb.Repository { + return &gitalypb.Repository{ + StorageName: "default", + RelativePath: "non-existent", + GlRepository: "gl-repository", + GlProjectPath: "project-path", + } + } + + for _, tc := range []struct { + desc string + performRequest func(*testing.T, context.Context, *grpc.ClientConn) + }{ + { + desc: "health service", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + resp, err := grpc_health_v1.NewHealthClient(cc).Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + require.NoError(t, err) + testhelper.ProtoEqual(t, &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, resp) + }, + }, + { + desc: "repository with object directory does not start a transaction", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + resp, err := gitalypb.NewRepositoryServiceClient(cc).ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{ + Repository: &gitalypb.Repository{ + StorageName: "default", + GitObjectDirectory: "non-default", + }, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.ObjectFormatResponse{}, resp) + }, + }, + { + desc: "repository with alternate object directory does not start a transaction", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + resp, err := gitalypb.NewRepositoryServiceClient(cc).ObjectFormat(ctx, &gitalypb.ObjectFormatRequest{ + Repository: &gitalypb.Repository{ + StorageName: "default", + GitAlternateObjectDirectories: []string{"non-default"}, + }, + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.ObjectFormatResponse{}, resp) + }, + }, + { + desc: "streaming rpc without first message", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + stream, err := gitalypb.NewRepositoryServiceClient(cc).SetCustomHooks(ctx) + require.NoError(t, err) + + resp, err := stream.CloseAndRecv() + require.Equal(t, io.EOF, err) + require.Nil(t, resp) + }, + }, + { + desc: "maintenance rpc", + performRequest: func(t *testing.T, ctx context.Context, cc *grpc.ClientConn) { + resp, err := gitalypb.NewRepositoryServiceClient(cc).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: validRepository(), + }) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.OptimizeRepositoryResponse{}, resp) + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) + + cfg := testcfg.Build(t) + + handlerInvoked := false + assertHandler := func(ctx context.Context) { + handlerInvoked = true + assert.Equal(t, storage.TransactionID(0), storage.ExtractTransactionID(ctx)) + } + + serverAddress := testserver.RunGitalyServer(t, cfg, func(server *grpc.Server, deps *service.Dependencies) { + grpc_health_v1.RegisterHealthServer(server, mockHealthService{ + checkFunc: func(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + assertHandler(ctx) + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil + }, + }) + gitalypb.RegisterRepositoryServiceServer(server, mockRepositoryService{ + objectFormatFunc: func(ctx context.Context, req *gitalypb.ObjectFormatRequest) (*gitalypb.ObjectFormatResponse, error) { + assertHandler(ctx) + return &gitalypb.ObjectFormatResponse{}, nil + }, + setCustomHooksFunc: func(stream gitalypb.RepositoryService_SetCustomHooksServer) error { + assertHandler(stream.Context()) + + resp, err := stream.Recv() + assert.Nil(t, resp) + assert.Equal(t, io.EOF, err) + return nil + }, + optimizeRepositoryFunc: func(ctx context.Context, req *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { + assertHandler(ctx) + testhelper.ProtoEqual(t, validRepository(), req.GetRepository()) + return &gitalypb.OptimizeRepositoryResponse{}, nil + }, + removeRepositoryFunc: func(ctx context.Context, req *gitalypb.RemoveRepositoryRequest) (*gitalypb.RemoveRepositoryResponse, error) { + assertHandler(ctx) + testhelper.ProtoEqual(t, validRepository(), req.GetRepository()) + return &gitalypb.RemoveRepositoryResponse{}, nil + }, + }) + }) + + clientConn, err := client.Dial(ctx, serverAddress) + require.NoError(t, err) + defer clientConn.Close() + + tc.performRequest(t, testhelper.Context(t), clientConn) + require.True(t, handlerInvoked) + }) + } +} diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 9068ce635..f0a87dbc1 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -327,6 +327,15 @@ func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.R return rewritten } +// OriginalRepository returns the repository as it was before rewriting it to point to the snapshot. +func (txn *Transaction) OriginalRepository(repo *gitalypb.Repository) *gitalypb.Repository { + original := proto.Clone(repo).(*gitalypb.Repository) + original.RelativePath = strings.TrimPrefix(repo.RelativePath, txn.snapshotBaseRelativePath+string(os.PathSeparator)) + original.GitObjectDirectory = "" + original.GitAlternateObjectDirectories = nil + return original +} + // createRepositorySnapshot snapshots the repository's current state at snapshotPath. This is done by // recreating the repository's directory structure and hard linking the repository's files in their // correct locations there. This effectively does a copy-free clone of the repository. Since the files diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 1c7b48bd3..ecb83999a 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -30,6 +30,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -173,12 +174,25 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d deps := gsd.createDependencies(tb, cfg) tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) }) + var txMiddleware server.TransactionMiddleware + if deps.GetPartitionManager() != nil { + txMiddleware = server.TransactionMiddleware{ + UnaryInterceptor: storagemgr.NewUnaryInterceptor( + deps.Logger, protoregistry.GitalyProtoPreregistered, deps.GetTransactionRegistry(), deps.GetPartitionManager(), deps.GetLocator(), + ), + StreamInterceptor: storagemgr.NewStreamInterceptor( + deps.Logger, protoregistry.GitalyProtoPreregistered, deps.GetTransactionRegistry(), deps.GetPartitionManager(), deps.GetLocator(), + ), + } + } + serverFactory := server.NewGitalyServerFactory( cfg, gsd.logger.WithField("test", tb.Name()), deps.GetBackchannelRegistry(), deps.GetDiskCache(), []*limithandler.LimiterMiddleware{deps.GetLimitHandler()}, + txMiddleware, ) if cfg.RuntimeDir != "" { @@ -270,6 +284,7 @@ type gitalyServerDeps struct { backupSink backup.Sink backupLocator backup.Locator signingKey string + transactionRegistry *storagemgr.TransactionRegistry } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { @@ -303,9 +318,12 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * gsd.gitCmdFactory = gittest.NewCommandFactory(tb, cfg) } - transactionRegistry := storagemgr.NewTransactionRegistry() + if gsd.transactionRegistry == nil { + gsd.transactionRegistry = storagemgr.NewTransactionRegistry() + } + if gsd.hookMgr == nil { - gsd.hookMgr = hook.NewManager(cfg, gsd.locator, gsd.logger, gsd.gitCmdFactory, gsd.txMgr, gsd.gitlabClient, hook.NewTransactionRegistry(transactionRegistry)) + gsd.hookMgr = hook.NewManager(cfg, gsd.locator, gsd.logger, gsd.gitCmdFactory, gsd.txMgr, gsd.gitlabClient, hook.NewTransactionRegistry(gsd.transactionRegistry)) } if gsd.catfileCache == nil { @@ -385,7 +403,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * RepositoryCounter: gsd.repositoryCounter, UpdaterWithHooks: gsd.updaterWithHooks, HousekeepingManager: gsd.housekeepingManager, - TransactionRegistry: transactionRegistry, + TransactionRegistry: gsd.transactionRegistry, PartitionManager: partitionManager, BackupSink: gsd.backupSink, BackupLocator: gsd.backupLocator, @@ -518,3 +536,11 @@ func WithSigningKey(signingKey string) GitalyServerOpt { return deps } } + +// WithTransactionRegistry sets the transaction registry that will be used for Gitaly services. +func WithTransactionRegistry(registry *storagemgr.TransactionRegistry) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.transactionRegistry = registry + return deps + } +} |