diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-03 16:57:36 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-04 14:49:06 +0300 |
commit | 1102b0b67283c229f75cf10fec4caa2fb7bfa3ad (patch) | |
tree | 2c4f94fc652d0db3c94de3f273fc7032c35e6328 | |
parent | 4d877d7d556449ae8bda17c399de1b0b300b30a0 (diff) |
praefect: Implement force-routing to primary for node-manager router
With reads distribution being enabled, requests would typically be
routed to a random up-to-date node. There are situations though where not
even up-to-date nodes would be able to serve a request, e.g. when object
quarantine is in use.
This commit implements force-routing to the primary for the node-manager
router. It's implemented via a new gRPC metadata header
"gitaly-route-repository-accessor-to-primary": if set, a
repository-scoped accessor will always get routed to the primary.
-rw-r--r-- | internal/praefect/coordinator_test.go | 78 | ||||
-rw-r--r-- | internal/praefect/router_node_manager.go | 9 |
2 files changed, 86 insertions, 1 deletion
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 43fec982e..cd2e8f449 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -418,7 +418,7 @@ func TestStreamDirectorAccessor(t *testing.T) { func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) - srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0) + srv1, primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0) defer srv1.Stop() srv2, healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1) defer srv2.Stop() @@ -526,6 +526,55 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { require.NotZero(t, secondaryChosen, "secondary should have been chosen at least once") }) + t.Run("forwards accessor to primary if force-routing", func(t *testing.T) { + var primaryChosen int + var secondaryChosen int + + for i := 0; i < 16; i++ { + frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) + require.NoError(t, err) + + fullMethod := "/gitaly.RefService/FindAllBranches" + + peeker := &mockPeeker{frame: frame} + + ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id") + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly)) + + streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker) + require.NoError(t, err) + require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary") + + var nodeConf config.Node + switch streamParams.Primary().Conn.Target() { + case primaryNodeConf.Address: + nodeConf = primaryNodeConf + primaryChosen++ + case secondaryNodeConf.Address: + nodeConf = secondaryNodeConf + secondaryChosen++ + } + + md, ok 
:= metadata.FromOutgoingContext(streamParams.Primary().Ctx) + require.True(t, ok) + require.Contains(t, md, praefect_metadata.PraefectMetadataKey) + + mi, err := coordinator.registry.LookupMethod(fullMethod) + require.NoError(t, err) + require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") + + m, err := protoMessage(mi, streamParams.Primary().Msg) + require.NoError(t, err) + + rewrittenTargetRepo, err := mi.TargetRepo(m) + require.NoError(t, err) + require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") + } + + require.Equal(t, 16, primaryChosen, "primary should have always been chosen") + require.Zero(t, secondaryChosen, "secondary should never have been chosen") + }) + t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) { healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) @@ -566,6 +615,33 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") }) + t.Run("fails if force-routing to unhealthy primary", func(t *testing.T) { + primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + + shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name) + require.NoError(t, err) + + primaryGitaly, err := shard.GetNode(primaryNodeConf.Storage) + require.NoError(t, err) + waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, false) + defer func() { + primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, true) + }() + + frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) + require.NoError(t, err) + + fullMethod := "/gitaly.RefService/FindAllBranches" + + ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id") + ctx = 
testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly)) + + peeker := &mockPeeker{frame: frame} + _, err = coordinator.StreamDirector(ctx, fullMethod, peeker) + require.True(t, errors.Is(err, nodes.ErrPrimaryNotHealthy)) + }) + t.Run("doesn't forward mutator operations", func(t *testing.T) { frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo}) require.NoError(t, err) diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 8cb9ddfd1..ca45fed88 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -34,6 +34,15 @@ func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Route } func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { + if shouldRouteRepositoryAccessorToPrimary(ctx) { + shard, err := r.mgr.GetShard(ctx, virtualStorage) + if err != nil { + return RouterNode{}, fmt.Errorf("get shard: %w", err) + } + + return toRouterNode(shard.Primary), nil + } + node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) if err != nil { return RouterNode{}, fmt.Errorf("get synced node: %w", err) |