Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/praefect/rename_repository.go125
-rw-r--r--internal/praefect/server.go8
-rw-r--r--internal/praefect/server_test.go76
3 files changed, 156 insertions, 53 deletions
diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go
index be8196607..495b18e4b 100644
--- a/internal/praefect/rename_repository.go
+++ b/internal/praefect/rename_repository.go
@@ -3,15 +3,122 @@ package praefect
import (
"errors"
"fmt"
+ "strings"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
)
+type renamePeeker struct {
+ grpc.ServerStream
+ peeked *gitalypb.RenameRepositoryRequest
+}
+
+func (peeker *renamePeeker) RecvMsg(msg interface{}) error {
+ // On the first read, we'll return the peeked first message of the stream.
+ if peeker.peeked != nil {
+ peeked := peeker.peeked
+ peeker.peeked = nil
+
+ codec := proxy.NewCodec()
+ payload, err := codec.Marshal(peeked)
+ if err != nil {
+ return fmt.Errorf("marshaling peeked rename request: %w", err)
+ }
+
+ return codec.Unmarshal(payload, msg)
+ }
+
+ return peeker.ServerStream.RecvMsg(msg)
+}
+
+func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error {
+ // These checks are not strictly necessary but they exist to keep retain compatibility with
+ // Gitaly's tested behavior.
+ if req.GetRepository() == nil {
+ return helper.ErrInvalidArgumentf("empty Repository")
+ } else if req.GetRelativePath() == "" {
+ return helper.ErrInvalidArgumentf("destination relative path is empty")
+ } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok {
+ return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName())
+ } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil {
+ // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function
+ // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we
+ // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by
+ // seeing whether the relative path escapes the root directory. It's not possible to traverse up
+ // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work,
+ // we use the /fake-root directory simply to notice if there were traversals in the path.
+ return helper.ErrInvalidArgumentf("GetRepoPath: %s", err)
+ }
+
+ return nil
+}
+
+// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should
+// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the
+// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either.
+// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect
+// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys.
+// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled
+// later.
+//
+// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires
+// peeking into the internals of the proxying so we can set restore the frame correctly.
+func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor {
+ virtualStorages := make(map[string]struct{}, len(virtualStorageNames))
+ for _, virtualStorage := range virtualStorageNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" {
+ return handler(srv, stream)
+ }
+
+ // Peek the message
+ var request gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&request); err != nil {
+ return fmt.Errorf("peek rename repository request: %w", err)
+ }
+
+ // In order for the handlers to work after the message is peeked, the stream is restored
+ // with an alternative implementation that returns the first message correctly.
+ stream = &renamePeeker{ServerStream: stream, peeked: &request}
+
+ if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil {
+ return err
+ }
+
+ repo := request.GetRepository()
+ repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath())
+ if err != nil {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath())
+ }
+
+ return fmt.Errorf("get repository id: %w", err)
+ }
+
+ replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID)
+ if err != nil {
+ return fmt.Errorf("get replica path: %w", err)
+ }
+
+ // Repositories that do not have a Praefect generated replica path are always handled in the old manner.
+ // Once the feature flag is removed, all of the repositories will be handled in the atomic manner.
+ if !strings.HasPrefix(replicaPath, "@cluster") {
+ return handler(srv, stream)
+ }
+
+ return handleRenameRepository(srv, stream)
+ }
+}
+
// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming
// the repository in the lookup table stored in the database.
func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler {
@@ -26,22 +133,8 @@ func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.Reposit
return fmt.Errorf("receive request: %w", err)
}
- // These checks are not strictly necessary but they exist to keep retain compatibility with
- // Gitaly's tested behavior.
- if req.GetRepository() == nil {
- return helper.ErrInvalidArgumentf("empty Repository")
- } else if req.GetRelativePath() == "" {
- return helper.ErrInvalidArgumentf("destination relative path is empty")
- } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok {
- return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName())
- } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil {
- // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function
- // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we
- // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by
- // seeing whether the relative path escapes the root directory. It's not possible to traverse up
- // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work,
- // we use the /fake-root directory simply to notice if there were traversals in the path.
- return helper.ErrInvalidArgumentf("GetRepoPath: %s", err)
+ if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil {
+ return err
}
if err := rs.RenameRepositoryInPlace(stream.Context(),
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 07d0d4640..d894bb3f3 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -111,6 +111,13 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(
+ streamInterceptors,
+ RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)),
+ )
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
@@ -148,7 +155,6 @@ func NewGRPCServer(
proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
"RepositoryExists": RepositoryExistsHandler(rs),
"RemoveRepository": RemoveRepositoryHandler(rs, conns),
- "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs),
})
proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
"DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 0020c953b..a47e82529 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -7,7 +7,6 @@ import (
"io"
"math/rand"
"net"
- "path/filepath"
"sort"
"strings"
"sync"
@@ -25,8 +24,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
@@ -568,26 +567,19 @@ func TestRemoveRepository(t *testing.T) {
}
func TestRenameRepository(t *testing.T) {
- t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository)
+}
+func testRenameRepository(t *testing.T, ctx context.Context) {
gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"}
- repoPaths := make([]string, len(gitalyStorages))
praefectCfg := config.Config{
VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}},
Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository},
}
- var repo *gitalypb.Repository
- for i, storageName := range gitalyStorages {
- const relativePath = "test-repository"
-
+ for _, storageName := range gitalyStorages {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName))
- gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath)
- if repo == nil {
- repo = repos[0]
- }
-
+ gitalyCfg := cfgBuilder.Build(t)
gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{
@@ -595,8 +587,6 @@ func TestRenameRepository(t *testing.T) {
Address: gitalyAddr,
Token: gitalyCfg.Auth.Token,
})
-
- repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
@@ -604,10 +594,19 @@ func TestRenameRepository(t *testing.T) {
db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, "praefect", "relative-path-2", "replica-path-2", "gitaly-1", nil, nil, true, false))
- nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
+ txManager := transactions.NewManager(praefectCfg)
+ logger := testhelper.NewDiscardingLogEntry(t)
+ clientHandshaker := backchannel.NewClientHandshaker(
+ logger,
+ NewBackchannelServerFactory(
+ logger,
+ transaction.NewServer(txManager),
+ nil,
+ ),
+ )
+
+ nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -624,50 +623,55 @@ func TestRenameRepository(t *testing.T) {
rs,
nil,
),
+ withTxMgr: txManager,
})
- defer cleanup()
+ t.Cleanup(cleanup)
- // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
- virtualRepo := proto.Clone(repo).(*gitalypb.Repository)
- virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
+ virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
- repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
+ virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
- newName, err := text.RandomHex(20)
- require.NoError(t, err)
+ const newRelativePath = "unused-relative-path"
+
+ repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
Repository: &gitalypb.Repository{
- StorageName: virtualRepo.StorageName,
+ StorageName: virtualRepo1.StorageName,
RelativePath: "not-found",
},
- RelativePath: newName,
+ RelativePath: virtualRepo2.RelativePath,
})
testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
- Repository: virtualRepo,
- RelativePath: "relative-path-2",
+ Repository: virtualRepo1,
+ RelativePath: virtualRepo2.RelativePath,
})
- expectedErr := helper.ErrAlreadyExistsf("destination already exists")
+
+ expectedErr := helper.ErrAlreadyExistsf("target repo exists already")
testhelper.RequireGrpcError(t, expectedErr, err)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
- Repository: virtualRepo,
- RelativePath: newName,
+ Repository: virtualRepo1,
+ RelativePath: newRelativePath,
})
require.NoError(t, err)
resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
- Repository: virtualRepo,
+ Repository: virtualRepo1,
})
require.NoError(t, err)
require.False(t, resp.GetExists(), "repo with old name must gone")
resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
Repository: &gitalypb.Repository{
- StorageName: virtualRepo.StorageName,
- RelativePath: newName,
+ StorageName: virtualRepo1.StorageName,
+ RelativePath: newRelativePath,
},
})
require.NoError(t, err)