Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-05-05 16:48:54 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-05-13 14:20:33 +0300
commit9c0c55cfed6977cdca5f217ad9f5fc2349f5ebe9 (patch)
tree6b7769b43b85b040db87b446ad8a708b9ab35f9c
parent110584aa627c9185ddb2c1b2d5d328873adf7430 (diff)
Feature flag atomic RenameRepository handlingsmh-repository-id-rename-repository
Feature flagging Praefect's atomic rename handling is a bit of a mess. The atomic handling relies on the unique replica paths generated by Praefect. Given that, it's not really possible to have a feature flag for testing the atomic renames without the unique replica paths. Generating the replica paths in Praefect breaks some assumptions made over a long period of time and can thus be dangerous. If we rolled out the flags and later had to disable the unique replica paths, the renames may not really work properly anymore as the repositories have not actually been moved on the disk by the atomic handling. There can be conflicts moving a repository to a relative path that is no longer used as the original repository was never moved from there due to the atomic handling not moving repositories on the disk. In order to make for a safer rollout, PraefectGenerateReplicaPaths flag has been added to control whether new repositories get Praefect generated paths or not. To make the rename handling safely feature flaggable as well, we should only apply the atomic rename handling to the repositories that have one of these Praefect generated paths. This ensures that once the flag is disabled, no other repository will face errors except the ones that were created while the flag was active. The rename functionality will thus work differently for repositories with Praefect generated paths and other repositories for as long as the feature flag is in place. In order to implement this, an interceptor is added that peeks the first message of the RenameRepository calls to determine whether this repository has a Praefect generated replica path or not. If not, the RenameRepository call is proxied as usual through Praefect. If yes, Praefect does not proxy the call but handles it in the atomic manner. After peeking, the first message is restored in the stream so the rest of the code can function without any knowledge of this decision.
-rw-r--r--internal/praefect/rename_repository.go125
-rw-r--r--internal/praefect/server.go8
-rw-r--r--internal/praefect/server_test.go76
3 files changed, 156 insertions, 53 deletions
diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go
index be8196607..495b18e4b 100644
--- a/internal/praefect/rename_repository.go
+++ b/internal/praefect/rename_repository.go
@@ -3,15 +3,122 @@ package praefect
import (
"errors"
"fmt"
+ "strings"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
)
+type renamePeeker struct {
+ grpc.ServerStream
+ peeked *gitalypb.RenameRepositoryRequest
+}
+
+func (peeker *renamePeeker) RecvMsg(msg interface{}) error {
+ // On the first read, we'll return the peeked first message of the stream.
+ if peeker.peeked != nil {
+ peeked := peeker.peeked
+ peeker.peeked = nil
+
+ codec := proxy.NewCodec()
+ payload, err := codec.Marshal(peeked)
+ if err != nil {
+ return fmt.Errorf("marshaling peeked rename request: %w", err)
+ }
+
+ return codec.Unmarshal(payload, msg)
+ }
+
+ return peeker.ServerStream.RecvMsg(msg)
+}
+
+func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error {
+ // These checks are not strictly necessary but they exist to keep retain compatibility with
+ // Gitaly's tested behavior.
+ if req.GetRepository() == nil {
+ return helper.ErrInvalidArgumentf("empty Repository")
+ } else if req.GetRelativePath() == "" {
+ return helper.ErrInvalidArgumentf("destination relative path is empty")
+ } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok {
+ return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName())
+ } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil {
+ // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function
+ // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we
+ // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by
+ // seeing whether the relative path escapes the root directory. It's not possible to traverse up
+ // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work,
+ // we use the /fake-root directory simply to notice if there were traversals in the path.
+ return helper.ErrInvalidArgumentf("GetRepoPath: %s", err)
+ }
+
+ return nil
+}
+
+// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should
+// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the
+// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either.
+// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect
+// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys.
+// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled
+// later.
+//
+// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires
+// peeking into the internals of the proxying so we can set restore the frame correctly.
+func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor {
+ virtualStorages := make(map[string]struct{}, len(virtualStorageNames))
+ for _, virtualStorage := range virtualStorageNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" {
+ return handler(srv, stream)
+ }
+
+ // Peek the message
+ var request gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&request); err != nil {
+ return fmt.Errorf("peek rename repository request: %w", err)
+ }
+
+ // In order for the handlers to work after the message is peeked, the stream is restored
+ // with an alternative implementation that returns the first message correctly.
+ stream = &renamePeeker{ServerStream: stream, peeked: &request}
+
+ if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil {
+ return err
+ }
+
+ repo := request.GetRepository()
+ repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath())
+ if err != nil {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath())
+ }
+
+ return fmt.Errorf("get repository id: %w", err)
+ }
+
+ replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID)
+ if err != nil {
+ return fmt.Errorf("get replica path: %w", err)
+ }
+
+ // Repositories that do not have a Praefect generated replica path are always handled in the old manner.
+ // Once the feature flag is removed, all of the repositories will be handled in the atomic manner.
+ if !strings.HasPrefix(replicaPath, "@cluster") {
+ return handler(srv, stream)
+ }
+
+ return handleRenameRepository(srv, stream)
+ }
+}
+
// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming
// the repository in the lookup table stored in the database.
func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler {
@@ -26,22 +133,8 @@ func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.Reposit
return fmt.Errorf("receive request: %w", err)
}
- // These checks are not strictly necessary but they exist to keep retain compatibility with
- // Gitaly's tested behavior.
- if req.GetRepository() == nil {
- return helper.ErrInvalidArgumentf("empty Repository")
- } else if req.GetRelativePath() == "" {
- return helper.ErrInvalidArgumentf("destination relative path is empty")
- } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok {
- return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName())
- } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil {
- // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function
- // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we
- // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by
- // seeing whether the relative path escapes the root directory. It's not possible to traverse up
- // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work,
- // we use the /fake-root directory simply to notice if there were traversals in the path.
- return helper.ErrInvalidArgumentf("GetRepoPath: %s", err)
+ if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil {
+ return err
}
if err := rs.RenameRepositoryInPlace(stream.Context(),
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 07d0d4640..d894bb3f3 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -111,6 +111,13 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(
+ streamInterceptors,
+ RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)),
+ )
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
@@ -148,7 +155,6 @@ func NewGRPCServer(
proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
"RepositoryExists": RepositoryExistsHandler(rs),
"RemoveRepository": RemoveRepositoryHandler(rs, conns),
- "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs),
})
proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
"DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 0020c953b..a47e82529 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -7,7 +7,6 @@ import (
"io"
"math/rand"
"net"
- "path/filepath"
"sort"
"strings"
"sync"
@@ -25,8 +24,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
@@ -568,26 +567,19 @@ func TestRemoveRepository(t *testing.T) {
}
func TestRenameRepository(t *testing.T) {
- t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository)
+}
+func testRenameRepository(t *testing.T, ctx context.Context) {
gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"}
- repoPaths := make([]string, len(gitalyStorages))
praefectCfg := config.Config{
VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}},
Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository},
}
- var repo *gitalypb.Repository
- for i, storageName := range gitalyStorages {
- const relativePath = "test-repository"
-
+ for _, storageName := range gitalyStorages {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName))
- gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath)
- if repo == nil {
- repo = repos[0]
- }
-
+ gitalyCfg := cfgBuilder.Build(t)
gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{
@@ -595,8 +587,6 @@ func TestRenameRepository(t *testing.T) {
Address: gitalyAddr,
Token: gitalyCfg.Auth.Token,
})
-
- repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
@@ -604,10 +594,19 @@ func TestRenameRepository(t *testing.T) {
db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, "praefect", "relative-path-2", "replica-path-2", "gitaly-1", nil, nil, true, false))
- nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
+ txManager := transactions.NewManager(praefectCfg)
+ logger := testhelper.NewDiscardingLogEntry(t)
+ clientHandshaker := backchannel.NewClientHandshaker(
+ logger,
+ NewBackchannelServerFactory(
+ logger,
+ transaction.NewServer(txManager),
+ nil,
+ ),
+ )
+
+ nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -624,50 +623,55 @@ func TestRenameRepository(t *testing.T) {
rs,
nil,
),
+ withTxMgr: txManager,
})
- defer cleanup()
+ t.Cleanup(cleanup)
- // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
- virtualRepo := proto.Clone(repo).(*gitalypb.Repository)
- virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
+ virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
- repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
+ virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
- newName, err := text.RandomHex(20)
- require.NoError(t, err)
+ const newRelativePath = "unused-relative-path"
+
+ repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
Repository: &gitalypb.Repository{
- StorageName: virtualRepo.StorageName,
+ StorageName: virtualRepo1.StorageName,
RelativePath: "not-found",
},
- RelativePath: newName,
+ RelativePath: virtualRepo2.RelativePath,
})
testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
- Repository: virtualRepo,
- RelativePath: "relative-path-2",
+ Repository: virtualRepo1,
+ RelativePath: virtualRepo2.RelativePath,
})
- expectedErr := helper.ErrAlreadyExistsf("destination already exists")
+
+ expectedErr := helper.ErrAlreadyExistsf("target repo exists already")
testhelper.RequireGrpcError(t, expectedErr, err)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
- Repository: virtualRepo,
- RelativePath: newName,
+ Repository: virtualRepo1,
+ RelativePath: newRelativePath,
})
require.NoError(t, err)
resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
- Repository: virtualRepo,
+ Repository: virtualRepo1,
})
require.NoError(t, err)
require.False(t, resp.GetExists(), "repo with old name must gone")
resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
Repository: &gitalypb.Repository{
- StorageName: virtualRepo.StorageName,
- RelativePath: newName,
+ StorageName: virtualRepo1.StorageName,
+ RelativePath: newRelativePath,
},
})
require.NoError(t, err)