diff options
author | James Fargher <jfargher@gitlab.com> | 2022-11-21 00:27:15 +0300 |
---|---|---|
committer | James Fargher <jfargher@gitlab.com> | 2022-11-21 00:27:15 +0300 |
commit | 42bd25f0729b28c02dd0bf1f67f3472e6075c287 (patch) | |
tree | 8dc70d1d77d2555043292e298033528ed17e37bb | |
parent | d535557de9cbe5e32056b0af2bbf48e6d3e66baf (diff) |
Revert "Merge branch 'smh-remove-path-flag' into 'master'"revert_remove_path_flag
This reverts commit 0fac1b0f097f7eebbdc41ce8f20b4af0aa91710f, reversing
changes made to 9a0bd2944addccf8710c98f90144281a52278f50.
-rw-r--r-- | internal/gitaly/service/repository/rename_test.go | 26 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_praefect_generated_paths.go | 9 | ||||
-rw-r--r-- | internal/praefect/rename_repository.go | 85 | ||||
-rw-r--r-- | internal/praefect/router_per_repository.go | 16 | ||||
-rw-r--r-- | internal/praefect/router_per_repository_test.go | 14 | ||||
-rw-r--r-- | internal/praefect/server.go | 8 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 6 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 3 |
8 files changed, 146 insertions, 21 deletions
diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go index 12e8383d1..d252345c4 100644 --- a/internal/gitaly/service/repository/rename_test.go +++ b/internal/gitaly/service/repository/rename_test.go @@ -3,6 +3,7 @@ package repository import ( + "context" "fmt" "os" "path/filepath" @@ -13,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" @@ -20,10 +22,12 @@ import ( "google.golang.org/grpc/status" ) -func TestRenameRepositorySuccess(t *testing.T) { - t.Parallel() +func TestRenameRepository_success(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositorySuccess) +} - ctx := testhelper.Context(t) +func testRenameRepositorySuccess(t *testing.T, ctx context.Context) { + t.Parallel() // Praefect does not move repositories on the disk so this test case is not run with Praefect. cfg, repo, _, client := setupRepositoryService(t, ctx, testserver.WithDisablePraefect()) @@ -45,10 +49,12 @@ func TestRenameRepositorySuccess(t *testing.T) { gittest.RequireObjectExists(t, cfg, newDirectory, git.ObjectID("913c66a37b4a45b9769037c55c2d238bd0942d2e")) } -func TestRenameRepositoryDestinationExists(t *testing.T) { - t.Parallel() +func TestRenameRepository_DestinationExists(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryDestinationExists) +} - ctx := testhelper.Context(t) +func testRenameRepositoryDestinationExists(t *testing.T, ctx context.Context) { + t.Parallel() cfg, client := setupRepositoryServiceWithoutRepo(t) @@ -73,10 +79,12 @@ func TestRenameRepositoryDestinationExists(t *testing.T) { gittest.RequireObjectExists(t, cfg, destinationRepoPath, commitID) } -func TestRenameRepositoryInvalidRequest(t *testing.T) { - t.Parallel() +func TestRenameRepository_invalidRequest(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryInvalidRequest) +} - ctx := testhelper.Context(t) +func testRenameRepositoryInvalidRequest(t *testing.T, ctx context.Context) { + t.Parallel() _, repo, repoPath, client := setupRepositoryService(t, ctx) storagePath := strings.TrimSuffix(repoPath, "/"+repo.RelativePath) diff --git a/internal/metadata/featureflag/ff_praefect_generated_paths.go b/internal/metadata/featureflag/ff_praefect_generated_paths.go new file mode 100644 index 000000000..6e3103959 --- /dev/null +++ b/internal/metadata/featureflag/ff_praefect_generated_paths.go @@ -0,0 +1,9 @@ +package featureflag + +// PraefectGeneratedReplicaPaths will enable Praefect generated replica paths for new repositories. +var PraefectGeneratedReplicaPaths = NewFeatureFlag( + "praefect_generated_replica_paths", + "v15.0.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4218", + true, +) diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go index 1e6f88879..91f691fef 100644 --- a/internal/praefect/rename_repository.go +++ b/internal/praefect/rename_repository.go @@ -3,16 +3,41 @@ package praefect import ( "errors" "fmt" + "strings" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/grpc" ) +type renamePeeker struct { + grpc.ServerStream + peeked *gitalypb.RenameRepositoryRequest +} + +func (peeker *renamePeeker) RecvMsg(msg interface{}) error { + // On the first read, we'll return the peeked first message of the stream. + if peeker.peeked != nil { + peeked := peeker.peeked + peeker.peeked = nil + + codec := proxy.NewCodec() + payload, err := codec.Marshal(peeked) + if err != nil { + return fmt.Errorf("marshaling peeked rename request: %w", err) + } + + return codec.Unmarshal(payload, msg) + } + + return peeker.ServerStream.RecvMsg(msg) +} + func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error { // These checks are not strictly necessary but they exist to keep retain compatibility with // Gitaly's tested behavior. @@ -36,6 +61,66 @@ func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virt return nil } +// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should +// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the +// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either. +// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect +// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys. +// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled +// later. +// +// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires +// peeking into the internals of the proxying so we can set restore the frame correctly. +func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor { + virtualStorages := make(map[string]struct{}, len(virtualStorageNames)) + for _, virtualStorage := range virtualStorageNames { + virtualStorages[virtualStorage] = struct{}{} + } + + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" { + return handler(srv, stream) + } + + // Peek the message + var request gitalypb.RenameRepositoryRequest + if err := stream.RecvMsg(&request); err != nil { + return fmt.Errorf("peek rename repository request: %w", err) + } + + // In order for the handlers to work after the message is peeked, the stream is restored + // with an alternative implementation that returns the first message correctly. + stream = &renamePeeker{ServerStream: stream, peeked: &request} + + if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil { + return err + } + + repo := request.GetRepository() + repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath()) + if err != nil { + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath()) + } + + return fmt.Errorf("get repository id: %w", err) + } + + replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID) + if err != nil { + return fmt.Errorf("get replica path: %w", err) + } + + // Repositories that do not have a Praefect generated replica path are always handled in the old manner. + // Once the feature flag is removed, all of the repositories will be handled in the atomic manner. + if !strings.HasPrefix(replicaPath, "@cluster") { + return handler(srv, stream) + } + + return handleRenameRepository(srv, stream) + } +} + // RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming // the repository in the lookup table stored in the database. func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler { diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index d9cc071fa..e248f1c9b 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -6,6 +6,7 @@ import ( "fmt" "gitlab.com/gitlab-org/gitaly/v15/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/praefectutil" @@ -310,12 +311,15 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err) } - replicaPath := praefectutil.DeriveReplicaPath(id) - if housekeeping.IsRailsPoolRepository(&gitalypb.Repository{ - StorageName: virtualStorage, - RelativePath: relativePath, - }) { - replicaPath = praefectutil.DerivePoolPath(id) + replicaPath := relativePath + if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) { + replicaPath = praefectutil.DeriveReplicaPath(id) + if housekeeping.IsRailsPoolRepository(&gitalypb.Repository{ + StorageName: virtualStorage, + RelativePath: relativePath, + }) { + replicaPath = praefectutil.DerivePoolPath(id) + } } replicationFactor := r.defaultReplicationFactors[virtualStorage] diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index fa60d9e33..711ea9563 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" @@ -664,10 +665,12 @@ func TestPerRepositoryRouter_RouteRepositoryMaintenance(t *testing.T) { } } -func TestPerRepositoryRouterRouteRepositoryCreation(t *testing.T) { - t.Parallel() +func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testPerRepositoryRouterRouteRepositoryCreation) +} - ctx := testhelper.Context(t) +func testPerRepositoryRouterRouteRepositoryCreation(t *testing.T, ctx context.Context) { + t.Parallel() configuredNodes := map[string][]string{ "virtual-storage-1": {"primary", "secondary-1", "secondary-2"}, @@ -697,7 +700,10 @@ func TestPerRepositoryRouterRouteRepositoryCreation(t *testing.T) { additionalReplicaPath = "additional-replica-path" ) - replicaPath := praefectutil.DeriveReplicaPath(1) + replicaPath := relativePath + if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) { + replicaPath = praefectutil.DeriveReplicaPath(1) + } for _, tc := range []struct { desc string diff --git a/internal/praefect/server.go b/internal/praefect/server.go index f85c85bac..718053d6f 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -114,6 +114,13 @@ func NewGRPCServer( panichandler.StreamPanicHandler, } + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append( + streamInterceptors, + RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)), + ) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)), @@ -155,7 +162,6 @@ func NewGRPCServer( proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ "RepositoryExists": RepositoryExistsHandler(rs), "RemoveRepository": RemoveRepositoryHandler(rs, conns), - "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs), }) proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{ "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 6d5dc81a5..eca6ec0e1 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -27,6 +27,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/listenmux" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" @@ -566,6 +567,10 @@ func TestRemoveRepository(t *testing.T) { } func TestRenameRepository(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository) +} + +func testRenameRepository(t *testing.T, ctx context.Context) { gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"} praefectCfg := config.Config{ VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}, @@ -601,7 +606,6 @@ func TestRenameRepository(t *testing.T) { ), ) - ctx := testhelper.Context(t) nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil) require.NoError(t, err) defer nodeSet.Close() diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 1bee6fa47..0fc52dd9d 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -197,6 +197,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // context. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.RunCommandsInCGroup, true) ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConvertErrToStatus, true) + // PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic. + // Randomly enable the flag to exercise both paths to some extent. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0) // NodeErrorCancelsVoter affect many tests as it changes Praefect coordinator transaction logic. // Randomly enable the flag to exercise both paths to some extent. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0) |