Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <jfargher@gitlab.com>2022-11-21 00:27:15 +0300
committerJames Fargher <jfargher@gitlab.com>2022-11-21 00:27:15 +0300
commit42bd25f0729b28c02dd0bf1f67f3472e6075c287 (patch)
tree8dc70d1d77d2555043292e298033528ed17e37bb
parentd535557de9cbe5e32056b0af2bbf48e6d3e66baf (diff)
Revert "Merge branch 'smh-remove-path-flag' into 'master'"revert_remove_path_flag
This reverts commit 0fac1b0f097f7eebbdc41ce8f20b4af0aa91710f, reversing changes made to 9a0bd2944addccf8710c98f90144281a52278f50.
-rw-r--r--internal/gitaly/service/repository/rename_test.go26
-rw-r--r--internal/metadata/featureflag/ff_praefect_generated_paths.go9
-rw-r--r--internal/praefect/rename_repository.go85
-rw-r--r--internal/praefect/router_per_repository.go16
-rw-r--r--internal/praefect/router_per_repository_test.go14
-rw-r--r--internal/praefect/server.go8
-rw-r--r--internal/praefect/server_test.go6
-rw-r--r--internal/testhelper/testhelper.go3
8 files changed, 146 insertions, 21 deletions
diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go
index 12e8383d1..d252345c4 100644
--- a/internal/gitaly/service/repository/rename_test.go
+++ b/internal/gitaly/service/repository/rename_test.go
@@ -3,6 +3,7 @@
package repository
import (
+ "context"
"fmt"
"os"
"path/filepath"
@@ -13,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
@@ -20,10 +22,12 @@ import (
"google.golang.org/grpc/status"
)
-func TestRenameRepositorySuccess(t *testing.T) {
- t.Parallel()
+func TestRenameRepository_success(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositorySuccess)
+}
- ctx := testhelper.Context(t)
+func testRenameRepositorySuccess(t *testing.T, ctx context.Context) {
+ t.Parallel()
// Praefect does not move repositories on the disk so this test case is not run with Praefect.
cfg, repo, _, client := setupRepositoryService(t, ctx, testserver.WithDisablePraefect())
@@ -45,10 +49,12 @@ func TestRenameRepositorySuccess(t *testing.T) {
gittest.RequireObjectExists(t, cfg, newDirectory, git.ObjectID("913c66a37b4a45b9769037c55c2d238bd0942d2e"))
}
-func TestRenameRepositoryDestinationExists(t *testing.T) {
- t.Parallel()
+func TestRenameRepository_DestinationExists(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryDestinationExists)
+}
- ctx := testhelper.Context(t)
+func testRenameRepositoryDestinationExists(t *testing.T, ctx context.Context) {
+ t.Parallel()
cfg, client := setupRepositoryServiceWithoutRepo(t)
@@ -73,10 +79,12 @@ func TestRenameRepositoryDestinationExists(t *testing.T) {
gittest.RequireObjectExists(t, cfg, destinationRepoPath, commitID)
}
-func TestRenameRepositoryInvalidRequest(t *testing.T) {
- t.Parallel()
+func TestRenameRepository_invalidRequest(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryInvalidRequest)
+}
- ctx := testhelper.Context(t)
+func testRenameRepositoryInvalidRequest(t *testing.T, ctx context.Context) {
+ t.Parallel()
_, repo, repoPath, client := setupRepositoryService(t, ctx)
storagePath := strings.TrimSuffix(repoPath, "/"+repo.RelativePath)
diff --git a/internal/metadata/featureflag/ff_praefect_generated_paths.go b/internal/metadata/featureflag/ff_praefect_generated_paths.go
new file mode 100644
index 000000000..6e3103959
--- /dev/null
+++ b/internal/metadata/featureflag/ff_praefect_generated_paths.go
@@ -0,0 +1,9 @@
+package featureflag
+
+// PraefectGeneratedReplicaPaths will enable Praefect generated replica paths for new repositories.
+var PraefectGeneratedReplicaPaths = NewFeatureFlag(
+ "praefect_generated_replica_paths",
+ "v15.0.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4218",
+ true,
+)
diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go
index 1e6f88879..91f691fef 100644
--- a/internal/praefect/rename_repository.go
+++ b/internal/praefect/rename_repository.go
@@ -3,16 +3,41 @@ package praefect
import (
"errors"
"fmt"
+ "strings"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc"
)
+type renamePeeker struct {
+ grpc.ServerStream
+ peeked *gitalypb.RenameRepositoryRequest
+}
+
+func (peeker *renamePeeker) RecvMsg(msg interface{}) error {
+ // On the first read, we'll return the peeked first message of the stream.
+ if peeker.peeked != nil {
+ peeked := peeker.peeked
+ peeker.peeked = nil
+
+ codec := proxy.NewCodec()
+ payload, err := codec.Marshal(peeked)
+ if err != nil {
+ return fmt.Errorf("marshaling peeked rename request: %w", err)
+ }
+
+ return codec.Unmarshal(payload, msg)
+ }
+
+ return peeker.ServerStream.RecvMsg(msg)
+}
+
func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error {
// These checks are not strictly necessary but they exist to keep retain compatibility with
// Gitaly's tested behavior.
@@ -36,6 +61,66 @@ func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virt
return nil
}
+// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should
+// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the
+// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either.
+// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect
+// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys.
+// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled
+// later.
+//
+// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires
+// peeking into the internals of the proxying so we can set restore the frame correctly.
+func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor {
+ virtualStorages := make(map[string]struct{}, len(virtualStorageNames))
+ for _, virtualStorage := range virtualStorageNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" {
+ return handler(srv, stream)
+ }
+
+ // Peek the message
+ var request gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&request); err != nil {
+ return fmt.Errorf("peek rename repository request: %w", err)
+ }
+
+ // In order for the handlers to work after the message is peeked, the stream is restored
+ // with an alternative implementation that returns the first message correctly.
+ stream = &renamePeeker{ServerStream: stream, peeked: &request}
+
+ if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil {
+ return err
+ }
+
+ repo := request.GetRepository()
+ repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath())
+ if err != nil {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath())
+ }
+
+ return fmt.Errorf("get repository id: %w", err)
+ }
+
+ replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID)
+ if err != nil {
+ return fmt.Errorf("get replica path: %w", err)
+ }
+
+ // Repositories that do not have a Praefect generated replica path are always handled in the old manner.
+ // Once the feature flag is removed, all of the repositories will be handled in the atomic manner.
+ if !strings.HasPrefix(replicaPath, "@cluster") {
+ return handler(srv, stream)
+ }
+
+ return handleRenameRepository(srv, stream)
+ }
+}
+
// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming
// the repository in the lookup table stored in the database.
func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler {
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index d9cc071fa..e248f1c9b 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -6,6 +6,7 @@ import (
"fmt"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/housekeeping"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/praefectutil"
@@ -310,12 +311,15 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err)
}
- replicaPath := praefectutil.DeriveReplicaPath(id)
- if housekeeping.IsRailsPoolRepository(&gitalypb.Repository{
- StorageName: virtualStorage,
- RelativePath: relativePath,
- }) {
- replicaPath = praefectutil.DerivePoolPath(id)
+ replicaPath := relativePath
+ if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) {
+ replicaPath = praefectutil.DeriveReplicaPath(id)
+ if housekeeping.IsRailsPoolRepository(&gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: relativePath,
+ }) {
+ replicaPath = praefectutil.DerivePoolPath(id)
+ }
}
replicationFactor := r.defaultReplicationFactors[virtualStorage]
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index fa60d9e33..711ea9563 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
@@ -664,10 +665,12 @@ func TestPerRepositoryRouter_RouteRepositoryMaintenance(t *testing.T) {
}
}
-func TestPerRepositoryRouterRouteRepositoryCreation(t *testing.T) {
- t.Parallel()
+func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testPerRepositoryRouterRouteRepositoryCreation)
+}
- ctx := testhelper.Context(t)
+func testPerRepositoryRouterRouteRepositoryCreation(t *testing.T, ctx context.Context) {
+ t.Parallel()
configuredNodes := map[string][]string{
"virtual-storage-1": {"primary", "secondary-1", "secondary-2"},
@@ -697,7 +700,10 @@ func TestPerRepositoryRouterRouteRepositoryCreation(t *testing.T) {
additionalReplicaPath = "additional-replica-path"
)
- replicaPath := praefectutil.DeriveReplicaPath(1)
+ replicaPath := relativePath
+ if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) {
+ replicaPath = praefectutil.DeriveReplicaPath(1)
+ }
for _, tc := range []struct {
desc string
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index f85c85bac..718053d6f 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -114,6 +114,13 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(
+ streamInterceptors,
+ RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)),
+ )
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
@@ -155,7 +162,6 @@ func NewGRPCServer(
proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
"RepositoryExists": RepositoryExistsHandler(rs),
"RemoveRepository": RemoveRepositoryHandler(rs, conns),
- "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs),
})
proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
"DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 6d5dc81a5..eca6ec0e1 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -27,6 +27,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
@@ -566,6 +567,10 @@ func TestRemoveRepository(t *testing.T) {
}
func TestRenameRepository(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository)
+}
+
+func testRenameRepository(t *testing.T, ctx context.Context) {
gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"}
praefectCfg := config.Config{
VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}},
@@ -601,7 +606,6 @@ func TestRenameRepository(t *testing.T) {
),
)
- ctx := testhelper.Context(t)
nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 1bee6fa47..0fc52dd9d 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -197,6 +197,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// context.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.RunCommandsInCGroup, true)
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConvertErrToStatus, true)
+ // PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic.
+ // Randomly enable the flag to exercise both paths to some extent.
+ ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0)
// NodeErrorCancelsVoter affect many tests as it changes Praefect coordinator transaction logic.
// Randomly enable the flag to exercise both paths to some extent.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0)