diff options
-rw-r--r-- | internal/praefect/rename_repository.go | 125 | ||||
-rw-r--r-- | internal/praefect/server.go | 8 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 76 |
3 files changed, 156 insertions, 53 deletions
diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go index be8196607..495b18e4b 100644 --- a/internal/praefect/rename_repository.go +++ b/internal/praefect/rename_repository.go @@ -3,15 +3,122 @@ package praefect import ( "errors" "fmt" + "strings" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) +type renamePeeker struct { + grpc.ServerStream + peeked *gitalypb.RenameRepositoryRequest +} + +func (peeker *renamePeeker) RecvMsg(msg interface{}) error { + // On the first read, we'll return the peeked first message of the stream. + if peeker.peeked != nil { + peeked := peeker.peeked + peeker.peeked = nil + + codec := proxy.NewCodec() + payload, err := codec.Marshal(peeked) + if err != nil { + return fmt.Errorf("marshaling peeked rename request: %w", err) + } + + return codec.Unmarshal(payload, msg) + } + + return peeker.ServerStream.RecvMsg(msg) +} + +func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error { + // These checks are not strictly necessary but they exist to keep retain compatibility with + // Gitaly's tested behavior. + if req.GetRepository() == nil { + return helper.ErrInvalidArgumentf("empty Repository") + } else if req.GetRelativePath() == "" { + return helper.ErrInvalidArgumentf("destination relative path is empty") + } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok { + return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName()) + } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil { + // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function + // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we + // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by + // seeing whether the relative path escapes the root directory. It's not possible to traverse up + // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work, + // we use the /fake-root directory simply to notice if there were traversals in the path. + return helper.ErrInvalidArgumentf("GetRepoPath: %s", err) + } + + return nil +} + +// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should +// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the +// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either. +// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect +// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys. +// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled +// later. +// +// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires +// peeking into the internals of the proxying so we can set restore the frame correctly. +func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor { + virtualStorages := make(map[string]struct{}, len(virtualStorageNames)) + for _, virtualStorage := range virtualStorageNames { + virtualStorages[virtualStorage] = struct{}{} + } + + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" { + return handler(srv, stream) + } + + // Peek the message + var request gitalypb.RenameRepositoryRequest + if err := stream.RecvMsg(&request); err != nil { + return fmt.Errorf("peek rename repository request: %w", err) + } + + // In order for the handlers to work after the message is peeked, the stream is restored + // with an alternative implementation that returns the first message correctly. + stream = &renamePeeker{ServerStream: stream, peeked: &request} + + if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil { + return err + } + + repo := request.GetRepository() + repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath()) + if err != nil { + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath()) + } + + return fmt.Errorf("get repository id: %w", err) + } + + replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID) + if err != nil { + return fmt.Errorf("get replica path: %w", err) + } + + // Repositories that do not have a Praefect generated replica path are always handled in the old manner. + // Once the feature flag is removed, all of the repositories will be handled in the atomic manner. + if !strings.HasPrefix(replicaPath, "@cluster") { + return handler(srv, stream) + } + + return handleRenameRepository(srv, stream) + } +} + // RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming // the repository in the lookup table stored in the database. func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler { @@ -26,22 +133,8 @@ func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.Reposit return fmt.Errorf("receive request: %w", err) } - // These checks are not strictly necessary but they exist to keep retain compatibility with - // Gitaly's tested behavior. - if req.GetRepository() == nil { - return helper.ErrInvalidArgumentf("empty Repository") - } else if req.GetRelativePath() == "" { - return helper.ErrInvalidArgumentf("destination relative path is empty") - } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok { - return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName()) - } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil { - // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function - // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we - // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by - // seeing whether the relative path escapes the root directory. It's not possible to traverse up - // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work, - // we use the /fake-root directory simply to notice if there were traversals in the path. - return helper.ErrInvalidArgumentf("GetRepoPath: %s", err) + if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil { + return err } if err := rs.RenameRepositoryInPlace(stream.Context(), diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 07d0d4640..d894bb3f3 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -111,6 +111,13 @@ func NewGRPCServer( panichandler.StreamPanicHandler, } + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append( + streamInterceptors, + RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)), + ) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)), @@ -148,7 +155,6 @@ func NewGRPCServer( proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ "RepositoryExists": RepositoryExistsHandler(rs), "RemoveRepository": RemoveRepositoryHandler(rs, conns), - "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs), }) proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{ "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 0020c953b..a47e82529 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -7,7 +7,6 @@ import ( "io" "math/rand" "net" - "path/filepath" "sort" "strings" "sync" @@ -25,8 +24,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" @@ -568,26 +567,19 @@ func TestRemoveRepository(t *testing.T) { } func TestRenameRepository(t *testing.T) { - t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository) +} +func testRenameRepository(t *testing.T, ctx context.Context) { gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"} - repoPaths := make([]string, len(gitalyStorages)) praefectCfg := config.Config{ VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}, Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository}, } - var repo *gitalypb.Repository - for i, storageName := range gitalyStorages { - const relativePath = "test-repository" - + for _, storageName := range gitalyStorages { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName)) - gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath) - if repo == nil { - repo = repos[0] - } - + gitalyCfg := cfgBuilder.Build(t) gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{ @@ -595,8 +587,6 @@ func TestRenameRepository(t *testing.T) { Address: gitalyAddr, Token: gitalyCfg.Auth.Token, }) - - repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) @@ -604,10 +594,19 @@ func TestRenameRepository(t *testing.T) { db := testdb.New(t) rs := datastore.NewPostgresRepositoryStore(db, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) - require.NoError(t, rs.CreateRepository(ctx, 2, "praefect", "relative-path-2", "replica-path-2", "gitaly-1", nil, nil, true, false)) - nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) + txManager := transactions.NewManager(praefectCfg) + logger := testhelper.NewDiscardingLogEntry(t) + clientHandshaker := backchannel.NewClientHandshaker( + logger, + NewBackchannelServerFactory( + logger, + transaction.NewServer(txManager), + nil, + ), + ) + + nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil) require.NoError(t, err) defer nodeSet.Close() @@ -624,50 +623,55 @@ func TestRenameRepository(t *testing.T) { rs, nil, ), + withTxMgr: txManager, }) - defer cleanup() + t.Cleanup(cleanup) - // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it - virtualRepo := proto.Clone(repo).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name + virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{ + Storages: []gconfig.Storage{{Name: "praefect"}}, + }, gittest.CreateRepositoryConfig{ClientConn: cc}) - repoServiceClient := gitalypb.NewRepositoryServiceClient(cc) + virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{ + Storages: []gconfig.Storage{{Name: "praefect"}}, + }, gittest.CreateRepositoryConfig{ClientConn: cc}) - newName, err := text.RandomHex(20) - require.NoError(t, err) + const newRelativePath = "unused-relative-path" + + repoServiceClient := gitalypb.NewRepositoryServiceClient(cc) _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ Repository: &gitalypb.Repository{ - StorageName: virtualRepo.StorageName, + StorageName: virtualRepo1.StorageName, RelativePath: "not-found", }, - RelativePath: newName, + RelativePath: virtualRepo2.RelativePath, }) testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err) _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ - Repository: virtualRepo, - RelativePath: "relative-path-2", + Repository: virtualRepo1, + RelativePath: virtualRepo2.RelativePath, }) - expectedErr := helper.ErrAlreadyExistsf("destination already exists") + + expectedErr := helper.ErrAlreadyExistsf("target repo exists already") testhelper.RequireGrpcError(t, expectedErr, err) _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ - Repository: virtualRepo, - RelativePath: newName, + Repository: virtualRepo1, + RelativePath: newRelativePath, }) require.NoError(t, err) resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ - Repository: virtualRepo, + Repository: virtualRepo1, }) require.NoError(t, err) require.False(t, resp.GetExists(), "repo with old name must gone") resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ Repository: &gitalypb.Repository{ - StorageName: virtualRepo.StorageName, - RelativePath: newName, + StorageName: virtualRepo1.StorageName, + RelativePath: newRelativePath, }, }) require.NoError(t, err) |