Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-04 15:14:52 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-04 15:14:52 +0300
commit0da32707243b9c339f75cd5a1c03b00c88502f86 (patch)
treea3c55fbb7bb64401541fbda6aac1f17a470d9efe
parent0460209dab07b05be09be6fc3d050211fdf4ded5 (diff)
parent1102b0b67283c229f75cf10fec4caa2fb7bfa3ad (diff)
Merge branch 'pks-praefect-forced-primary-routing' into 'master'
praefect: Implement ability to force-route repo-scoped accessors to the primary See merge request gitlab-org/gitaly!3093
-rw-r--r--internal/praefect/coordinator_test.go78
-rw-r--r--internal/praefect/router_node_manager.go9
-rw-r--r--internal/praefect/router_per_repository.go35
-rw-r--r--internal/praefect/router_per_repository_test.go26
4 files changed, 147 insertions, 1 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 43fec982e..cd2e8f449 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -418,7 +418,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
- srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
+ srv1, primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0)
defer srv1.Stop()
srv2, healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1)
defer srv2.Stop()
@@ -526,6 +526,55 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
require.NotZero(t, secondaryChosen, "secondary should have been chosen at least once")
})
+ t.Run("forwards accessor to primary if force-routing", func(t *testing.T) {
+ var primaryChosen int
+ var secondaryChosen int
+
+ for i := 0; i < 16; i++ {
+ frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.RefService/FindAllBranches"
+
+ peeker := &mockPeeker{frame: frame}
+
+ ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id")
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly))
+
+ streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker)
+ require.NoError(t, err)
+ require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary")
+
+ var nodeConf config.Node
+ switch streamParams.Primary().Conn.Target() {
+ case primaryNodeConf.Address:
+ nodeConf = primaryNodeConf
+ primaryChosen++
+ case secondaryNodeConf.Address:
+ nodeConf = secondaryNodeConf
+ secondaryChosen++
+ }
+
+ md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
+ require.True(t, ok)
+ require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
+
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+ require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")
+
+ m, err := protoMessage(mi, streamParams.Primary().Msg)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
+ }
+
+ require.Equal(t, 16, primaryChosen, "primary should have always been chosen")
+ require.Zero(t, secondaryChosen, "secondary should never have been chosen")
+ })
+
t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) {
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
@@ -566,6 +615,33 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
})
+ t.Run("fails if force-routing to unhealthy primary", func(t *testing.T) {
+ primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+
+ shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ primaryGitaly, err := shard.GetNode(primaryNodeConf.Storage)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, false)
+ defer func() {
+ primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, true)
+ }()
+
+ frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.RefService/FindAllBranches"
+
+ ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id")
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly))
+
+ peeker := &mockPeeker{frame: frame}
+ _, err = coordinator.StreamDirector(ctx, fullMethod, peeker)
+ require.True(t, errors.Is(err, nodes.ErrPrimaryNotHealthy))
+ })
+
t.Run("doesn't forward mutator operations", func(t *testing.T) {
frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo})
require.NoError(t, err)
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 8cb9ddfd1..ca45fed88 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -34,6 +34,15 @@ func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Route
}
func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) {
+ if shouldRouteRepositoryAccessorToPrimary(ctx) {
+ shard, err := r.mgr.GetShard(ctx, virtualStorage)
+ if err != nil {
+ return RouterNode{}, fmt.Errorf("get shard: %w", err)
+ }
+
+ return toRouterNode(shard.Primary), nil
+ }
+
node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath)
if err != nil {
return RouterNode{}, fmt.Errorf("get synced node: %w", err)
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 5e95f006d..317e0e21a 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -8,6 +8,12 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ routeRepositoryAccessorPolicy = "gitaly-route-repository-accessor-policy"
+ routeRepositoryAccessorPolicyPrimaryOnly = "primary-only"
)
// errRepositoryNotFound is retuned when trying to operate on a non-existent repository.
@@ -125,12 +131,41 @@ func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualSt
return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter")
}
+func shouldRouteRepositoryAccessorToPrimary(ctx context.Context) bool {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return false
+ }
+
+ header := md.Get(routeRepositoryAccessorPolicy)
+ if len(header) == 0 {
+ return false
+ }
+
+ return header[0] == routeRepositoryAccessorPolicyPrimaryOnly
+}
+
func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) {
healthyNodes, err := r.healthyNodes(virtualStorage)
if err != nil {
return RouterNode{}, err
}
+ if shouldRouteRepositoryAccessorToPrimary(ctx) {
+ primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath)
+ if err != nil {
+ return RouterNode{}, fmt.Errorf("get primary: %w", err)
+ }
+
+ for _, node := range healthyNodes {
+ if node.Storage == primary {
+ return node, nil
+ }
+ }
+
+ return RouterNode{}, nodes.ErrPrimaryNotHealthy
+ }
+
consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath)
if err != nil {
return RouterNode{}, fmt.Errorf("consistent storages: %w", err)
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index aaa0a1171..0c09f2d9b 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -10,6 +10,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
)
// StaticRepositoryAssignments is a static assignment of storages for each individual repository.
@@ -109,6 +110,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
desc string
virtualStorage string
healthyNodes StaticHealthChecker
+ metadata map[string]string
numCandidates int
pickCandidate int
error error
@@ -162,11 +164,35 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
},
error: ErrNoSuitableNode,
},
+ {
+ desc: "primary force-picked",
+ virtualStorage: "virtual-storage-1",
+ healthyNodes: map[string][]string{
+ "virtual-storage-1": {"primary", "consistent-secondary"},
+ },
+ metadata: map[string]string{
+ routeRepositoryAccessorPolicy: routeRepositoryAccessorPolicyPrimaryOnly,
+ },
+ node: "primary",
+ },
+ {
+ desc: "secondary not picked if force-picking unhealthy primary",
+ virtualStorage: "virtual-storage-1",
+ healthyNodes: map[string][]string{
+ "virtual-storage-1": {"consistent-secondary"},
+ },
+ metadata: map[string]string{
+ routeRepositoryAccessorPolicy: routeRepositoryAccessorPolicyPrimaryOnly,
+ },
+ error: nodes.ErrPrimaryNotHealthy,
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.New(tc.metadata))
+
conns := Connections{
"virtual-storage-1": {
"primary": &grpc.ClientConn{},