Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-02-03 16:57:36 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-02-04 14:49:06 +0300
commit: 1102b0b67283c229f75cf10fec4caa2fb7bfa3ad (patch)
tree: 2c4f94fc652d0db3c94de3f273fc7032c35e6328
parent: 4d877d7d556449ae8bda17c399de1b0b300b30a0 (diff)
praefect: Implement force-routing to primary for node-manager router
With reads distribution enabled, requests would typically be routed to a random up-to-date node. There are situations, though, where not even up-to-date nodes would be able to serve a request, e.g. when object quarantine is in use. This commit implements force-routing to the primary for the node-manager router. It's implemented via a new gRPC metadata header "gitaly-route-repository-accessor-to-primary": if set, repository-scoped accessors will always get routed to the primary.
-rw-r--r-- internal/praefect/coordinator_test.go 78
-rw-r--r-- internal/praefect/router_node_manager.go 9
2 files changed, 86 insertions, 1 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 43fec982e..cd2e8f449 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -418,7 +418,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
- srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
+ srv1, primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0)
defer srv1.Stop()
srv2, healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1)
defer srv2.Stop()
@@ -526,6 +526,55 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
require.NotZero(t, secondaryChosen, "secondary should have been chosen at least once")
})
+ t.Run("forwards accessor to primary if force-routing", func(t *testing.T) {
+ var primaryChosen int
+ var secondaryChosen int
+
+ for i := 0; i < 16; i++ {
+ frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.RefService/FindAllBranches"
+
+ peeker := &mockPeeker{frame: frame}
+
+ ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id")
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly))
+
+ streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker)
+ require.NoError(t, err)
+ require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary")
+
+ var nodeConf config.Node
+ switch streamParams.Primary().Conn.Target() {
+ case primaryNodeConf.Address:
+ nodeConf = primaryNodeConf
+ primaryChosen++
+ case secondaryNodeConf.Address:
+ nodeConf = secondaryNodeConf
+ secondaryChosen++
+ }
+
+ md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
+ require.True(t, ok)
+ require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
+
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+ require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")
+
+ m, err := protoMessage(mi, streamParams.Primary().Msg)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
+ }
+
+ require.Equal(t, 16, primaryChosen, "primary should have always been chosen")
+ require.Zero(t, secondaryChosen, "secondary should never have been chosen")
+ })
+
t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) {
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
@@ -566,6 +615,33 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
})
+ t.Run("fails if force-routing to unhealthy primary", func(t *testing.T) {
+ primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+
+ shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ primaryGitaly, err := shard.GetNode(primaryNodeConf.Storage)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, false)
+ defer func() {
+ primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, true)
+ }()
+
+ frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.RefService/FindAllBranches"
+
+ ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id")
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly))
+
+ peeker := &mockPeeker{frame: frame}
+ _, err = coordinator.StreamDirector(ctx, fullMethod, peeker)
+ require.True(t, errors.Is(err, nodes.ErrPrimaryNotHealthy))
+ })
+
t.Run("doesn't forward mutator operations", func(t *testing.T) {
frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo})
require.NoError(t, err)
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 8cb9ddfd1..ca45fed88 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -34,6 +34,15 @@ func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Route
}
func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) {
+ if shouldRouteRepositoryAccessorToPrimary(ctx) {
+ shard, err := r.mgr.GetShard(ctx, virtualStorage)
+ if err != nil {
+ return RouterNode{}, fmt.Errorf("get shard: %w", err)
+ }
+
+ return toRouterNode(shard.Primary), nil
+ }
+
node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath)
if err != nil {
return RouterNode{}, fmt.Errorf("get synced node: %w", err)