Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-10-25 15:59:11 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-05-16 16:31:02 +0300
commit3e92c2cf29a757444e11b39f61e58a8820751e2a (patch)
tree962f1b9987d0c4deaa163a1648eedfb58d0ba002
parent6ef60ea1d66fed382059e2dd8b66c12d41bc5ca1 (diff)
Intercept RenameRepository calls in Praefect
Praefect currently handles RenameRepository like every other mutator. This is problematic as Praefect ends up renaming repositories first on the disks of the Gitalys and then in the database. If only the first operation succeeds, Praefect has effectively lost track of the repositories. With Praefect now generating unique relative paths for each repository, there's no need to actually move the repositories on the disks anymore. It is sufficient to rename the repository only in the database and leave the replicas in their existing locations on disk. This commit intercepts RenameRepository in Praefect and renames repositories only in the database. The atomic rename handling will be rolled out progressively with Praefect generated replica paths as the change does not make sense on its own. To do so, the RenameRepositoryFeatureFlagger checks the replica path of the repository and only applies the new logic if the repository has a Praefect generated replica path. Changelog: fixed
-rw-r--r--internal/gitaly/service/repository/rename_test.go15
-rw-r--r--internal/praefect/rename_repository.go160
-rw-r--r--internal/praefect/server.go7
-rw-r--r--internal/praefect/server_test.go126
4 files changed, 233 insertions, 75 deletions
diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go
index 9983eb894..6c4f2b520 100644
--- a/internal/gitaly/service/repository/rename_test.go
+++ b/internal/gitaly/service/repository/rename_test.go
@@ -70,9 +70,6 @@ func TestRenameRepository_DestinationExists(t *testing.T) {
}
func TestRenameRepository_invalidRequest(t *testing.T) {
- // Prafect applies renames to metadata even on failed requests, which fails this test.
- testhelper.SkipWithPraefect(t, "https://gitlab.com/gitlab-org/gitaly/-/issues/4003")
-
t.Parallel()
ctx := testhelper.Context(t)
@@ -87,7 +84,7 @@ func TestRenameRepository_invalidRequest(t *testing.T) {
{
desc: "empty repository",
req: &gitalypb.RenameRepositoryRequest{Repository: nil, RelativePath: "/tmp/abc"},
- exp: status.Error(codes.InvalidArgument, gitalyOrPraefect("empty Repository", "repo scoped: empty Repository")),
+ exp: status.Error(codes.InvalidArgument, "empty Repository"),
},
{
desc: "empty destination relative path",
@@ -101,20 +98,20 @@ func TestRenameRepository_invalidRequest(t *testing.T) {
},
{
desc: "repository storage doesn't exist",
- req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "../usr/bin"},
- exp: status.Error(codes.InvalidArgument, gitalyOrPraefect(`GetStorageByName: no such storage: "stub"`, "repo scoped: invalid Repository")),
+ req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "usr/bin"},
+ exp: status.Error(codes.InvalidArgument, `GetStorageByName: no such storage: "stub"`),
},
{
desc: "repository relative path doesn't exist",
- req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "../usr/bin"},
- exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, storagePath)),
+ req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "non-existent/directory"},
+ exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, gitalyOrPraefect(storagePath, repo.GetStorageName()))),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, err := client.RenameRepository(ctx, tc.req)
- testhelper.RequireGrpcError(t, err, tc.exp)
+ testhelper.RequireGrpcError(t, tc.exp, err)
})
}
}
diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go
new file mode 100644
index 000000000..495b18e4b
--- /dev/null
+++ b/internal/praefect/rename_repository.go
@@ -0,0 +1,160 @@
+package praefect
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type renamePeeker struct {
+ grpc.ServerStream
+ peeked *gitalypb.RenameRepositoryRequest
+}
+
+func (peeker *renamePeeker) RecvMsg(msg interface{}) error {
+ // On the first read, we'll return the peeked first message of the stream.
+ if peeker.peeked != nil {
+ peeked := peeker.peeked
+ peeker.peeked = nil
+
+ codec := proxy.NewCodec()
+ payload, err := codec.Marshal(peeked)
+ if err != nil {
+ return fmt.Errorf("marshaling peeked rename request: %w", err)
+ }
+
+ return codec.Unmarshal(payload, msg)
+ }
+
+ return peeker.ServerStream.RecvMsg(msg)
+}
+
+func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error {
+ // These checks are not strictly necessary but they exist to keep retain compatibility with
+ // Gitaly's tested behavior.
+ if req.GetRepository() == nil {
+ return helper.ErrInvalidArgumentf("empty Repository")
+ } else if req.GetRelativePath() == "" {
+ return helper.ErrInvalidArgumentf("destination relative path is empty")
+ } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok {
+ return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName())
+ } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil {
+ // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function
+ // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we
+ // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by
+ // seeing whether the relative path escapes the root directory. It's not possible to traverse up
+ // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work,
+ // we use the /fake-root directory simply to notice if there were traversals in the path.
+ return helper.ErrInvalidArgumentf("GetRepoPath: %s", err)
+ }
+
+ return nil
+}
+
+// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should
+// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the
+// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either.
+// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect
+// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys.
+// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled
+// later.
+//
+// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires
+// peeking into the internals of the proxying so we can set restore the frame correctly.
+func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor {
+ virtualStorages := make(map[string]struct{}, len(virtualStorageNames))
+ for _, virtualStorage := range virtualStorageNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" {
+ return handler(srv, stream)
+ }
+
+ // Peek the message
+ var request gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&request); err != nil {
+ return fmt.Errorf("peek rename repository request: %w", err)
+ }
+
+ // In order for the handlers to work after the message is peeked, the stream is restored
+ // with an alternative implementation that returns the first message correctly.
+ stream = &renamePeeker{ServerStream: stream, peeked: &request}
+
+ if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil {
+ return err
+ }
+
+ repo := request.GetRepository()
+ repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath())
+ if err != nil {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath())
+ }
+
+ return fmt.Errorf("get repository id: %w", err)
+ }
+
+ replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID)
+ if err != nil {
+ return fmt.Errorf("get replica path: %w", err)
+ }
+
+ // Repositories that do not have a Praefect generated replica path are always handled in the old manner.
+ // Once the feature flag is removed, all of the repositories will be handled in the atomic manner.
+ if !strings.HasPrefix(replicaPath, "@cluster") {
+ return handler(srv, stream)
+ }
+
+ return handleRenameRepository(srv, stream)
+ }
+}
+
+// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming
+// the repository in the lookup table stored in the database.
+func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler {
+ virtualStorages := make(map[string]struct{}, len(virtualStoragesNames))
+ for _, virtualStorage := range virtualStoragesNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream) error {
+ var req gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil {
+ return err
+ }
+
+ if err := rs.RenameRepositoryInPlace(stream.Context(),
+ req.GetRepository().GetStorageName(),
+ req.GetRepository().GetRelativePath(),
+ req.GetRelativePath(),
+ ); err != nil {
+ if errors.Is(err, commonerr.ErrRepositoryNotFound) {
+ return helper.ErrNotFoundf(
+ `GetRepoPath: not a git repository: "%s/%s"`,
+ req.GetRepository().GetStorageName(),
+ req.GetRepository().GetRelativePath(),
+ )
+ } else if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
+ return helper.ErrAlreadyExistsf("target repo exists already")
+ }
+
+ return helper.ErrInternal(err)
+ }
+
+ return stream.SendMsg(&gitalypb.RenameRepositoryResponse{})
+ }
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index d75810246..d894bb3f3 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -111,6 +111,13 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(
+ streamInterceptors,
+ RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)),
+ )
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c3681a34c..a47e82529 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -7,8 +7,6 @@ import (
"io"
"math/rand"
"net"
- "os"
- "path/filepath"
"sort"
"strings"
"sync"
@@ -26,8 +24,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
@@ -568,43 +566,20 @@ func TestRemoveRepository(t *testing.T) {
verifyReposExistence(t, codes.NotFound)
}
-func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) {
- for {
- select {
- case <-deadline:
- require.Failf(t, "unable to detect path removal for %s", path)
- default:
- _, err := os.Stat(path)
- if os.IsNotExist(err) {
- return
- }
- require.NoError(t, err, "unexpected error while checking path %s", path)
- }
- time.Sleep(time.Millisecond)
- }
-}
-
func TestRenameRepository(t *testing.T) {
- t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository)
+}
+func testRenameRepository(t *testing.T, ctx context.Context) {
gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"}
- repoPaths := make([]string, len(gitalyStorages))
praefectCfg := config.Config{
VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}},
Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository},
}
- var repo *gitalypb.Repository
- for i, storageName := range gitalyStorages {
- const relativePath = "test-repository"
-
+ for _, storageName := range gitalyStorages {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName))
- gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath)
- if repo == nil {
- repo = repos[0]
- }
-
+ gitalyCfg := cfgBuilder.Build(t)
gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{
@@ -612,76 +587,95 @@ func TestRenameRepository(t *testing.T) {
Address: gitalyAddr,
Token: gitalyCfg.Auth.Token,
})
-
- repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
- tx := testdb.New(t).Begin(t)
- defer tx.Rollback(t)
+ db := testdb.New(t)
+
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
- rs := datastore.NewPostgresRepositoryStore(tx, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
+ txManager := transactions.NewManager(praefectCfg)
+ logger := testhelper.NewDiscardingLogEntry(t)
+ clientHandshaker := backchannel.NewClientHandshaker(
+ logger,
+ NewBackchannelServerFactory(
+ logger,
+ transaction.NewServer(txManager),
+ nil,
+ ),
+ )
- nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
+ nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
-
cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
withQueue: evq,
withRepoStore: rs,
withRouter: NewPerRepositoryRouter(
nodeSet.Connections(),
- nodes.NewPerRepositoryElector(tx),
+ nodes.NewPerRepositoryElector(db),
StaticHealthChecker(praefectCfg.StorageNames()),
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()),
+ datastore.NewAssignmentStore(db, praefectCfg.StorageNames()),
rs,
nil,
),
+ withTxMgr: txManager,
})
- defer cleanup()
+ t.Cleanup(cleanup)
- // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
- virtualRepo := proto.Clone(repo).(*gitalypb.Repository)
- virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
+ virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
+
+ virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
+
+ const newRelativePath = "unused-relative-path"
repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
- newName, err := text.RandomHex(20)
- require.NoError(t, err)
+ _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualRepo1.StorageName,
+ RelativePath: "not-found",
+ },
+ RelativePath: virtualRepo2.RelativePath,
+ })
+ testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err)
+
+ _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ Repository: virtualRepo1,
+ RelativePath: virtualRepo2.RelativePath,
+ })
+
+ expectedErr := helper.ErrAlreadyExistsf("target repo exists already")
+ testhelper.RequireGrpcError(t, expectedErr, err)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
- Repository: virtualRepo,
- RelativePath: newName,
+ Repository: virtualRepo1,
+ RelativePath: newRelativePath,
})
require.NoError(t, err)
resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
- Repository: virtualRepo,
+ Repository: virtualRepo1,
})
require.NoError(t, err)
require.False(t, resp.GetExists(), "repo with old name must gone")
- // as we renamed the repo we need to update RelativePath before we could check if it exists
- renamedVirtualRepo := virtualRepo
- renamedVirtualRepo.RelativePath = newName
-
- // wait until replication jobs propagate changes to other storages
- // as we don't know which one will be used to check because of reads distribution
- require.NoError(t, evq.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
- return len(i.GetAcknowledge()) == 2
- }))
-
- for _, oldLocation := range repoPaths {
- pollUntilRemoved(t, oldLocation, time.After(10*time.Second))
- newLocation := filepath.Join(filepath.Dir(oldLocation), newName)
- require.DirExists(t, newLocation, "must be renamed on secondary from %q to %q", oldLocation, newLocation)
- }
+ resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualRepo1.StorageName,
+ RelativePath: newRelativePath,
+ },
+ })
+ require.NoError(t, err)
+ require.True(t, resp.GetExists(), "repo with new name must exist")
}
type mockSmartHTTP struct {