diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-04 15:14:52 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-04 15:14:52 +0300 |
commit | 0da32707243b9c339f75cd5a1c03b00c88502f86 (patch) | |
tree | a3c55fbb7bb64401541fbda6aac1f17a470d9efe | |
parent | 0460209dab07b05be09be6fc3d050211fdf4ded5 (diff) | |
parent | 1102b0b67283c229f75cf10fec4caa2fb7bfa3ad (diff) |
Merge branch 'pks-praefect-forced-primary-routing' into 'master'
praefect: Implement ability to force-route repo-scoped accessors to the primary
See merge request gitlab-org/gitaly!3093
-rw-r--r-- | internal/praefect/coordinator_test.go | 78 | ||||
-rw-r--r-- | internal/praefect/router_node_manager.go | 9 | ||||
-rw-r--r-- | internal/praefect/router_per_repository.go | 35 | ||||
-rw-r--r-- | internal/praefect/router_per_repository_test.go | 26 |
4 files changed, 147 insertions, 1 deletion
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 43fec982e..cd2e8f449 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -418,7 +418,7 @@ func TestStreamDirectorAccessor(t *testing.T) { func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) - srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0) + srv1, primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0) defer srv1.Stop() srv2, healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1) defer srv2.Stop() @@ -526,6 +526,55 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { require.NotZero(t, secondaryChosen, "secondary should have been chosen at least once") }) + t.Run("forwards accessor to primary if force-routing", func(t *testing.T) { + var primaryChosen int + var secondaryChosen int + + for i := 0; i < 16; i++ { + frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) + require.NoError(t, err) + + fullMethod := "/gitaly.RefService/FindAllBranches" + + peeker := &mockPeeker{frame: frame} + + ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id") + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly)) + + streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker) + require.NoError(t, err) + require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary") + + var nodeConf config.Node + switch streamParams.Primary().Conn.Target() { + case primaryNodeConf.Address: + nodeConf = primaryNodeConf + primaryChosen++ + case secondaryNodeConf.Address: + nodeConf = secondaryNodeConf + secondaryChosen++ + } + + md, ok 
:= metadata.FromOutgoingContext(streamParams.Primary().Ctx) + require.True(t, ok) + require.Contains(t, md, praefect_metadata.PraefectMetadataKey) + + mi, err := coordinator.registry.LookupMethod(fullMethod) + require.NoError(t, err) + require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") + + m, err := protoMessage(mi, streamParams.Primary().Msg) + require.NoError(t, err) + + rewrittenTargetRepo, err := mi.TargetRepo(m) + require.NoError(t, err) + require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") + } + + require.Equal(t, 16, primaryChosen, "primary should have always been chosen") + require.Zero(t, secondaryChosen, "secondary should never have been chosen") + }) + t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) { healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) @@ -566,6 +615,33 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") }) + t.Run("fails if force-routing to unhealthy primary", func(t *testing.T) { + primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + + shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name) + require.NoError(t, err) + + primaryGitaly, err := shard.GetNode(primaryNodeConf.Storage) + require.NoError(t, err) + waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, false) + defer func() { + primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, true) + }() + + frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) + require.NoError(t, err) + + fullMethod := "/gitaly.RefService/FindAllBranches" + + ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id") + ctx = 
testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly)) + + peeker := &mockPeeker{frame: frame} + _, err = coordinator.StreamDirector(ctx, fullMethod, peeker) + require.True(t, errors.Is(err, nodes.ErrPrimaryNotHealthy)) + }) + t.Run("doesn't forward mutator operations", func(t *testing.T) { frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo}) require.NoError(t, err) diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 8cb9ddfd1..ca45fed88 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -34,6 +34,15 @@ func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Route } func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { + if shouldRouteRepositoryAccessorToPrimary(ctx) { + shard, err := r.mgr.GetShard(ctx, virtualStorage) + if err != nil { + return RouterNode{}, fmt.Errorf("get shard: %w", err) + } + + return toRouterNode(shard.Primary), nil + } + node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) if err != nil { return RouterNode{}, fmt.Errorf("get synced node: %w", err) diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 5e95f006d..317e0e21a 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -8,6 +8,12 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + routeRepositoryAccessorPolicy = "gitaly-route-repository-accessor-policy" + routeRepositoryAccessorPolicyPrimaryOnly = "primary-only" ) // errRepositoryNotFound is retuned when trying to operate on a non-existent repository. 
@@ -125,12 +131,41 @@ func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualSt return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter") } +func shouldRouteRepositoryAccessorToPrimary(ctx context.Context) bool { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return false + } + + header := md.Get(routeRepositoryAccessorPolicy) + if len(header) == 0 { + return false + } + + return header[0] == routeRepositoryAccessorPolicyPrimaryOnly +} + func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { return RouterNode{}, err } + if shouldRouteRepositoryAccessorToPrimary(ctx) { + primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath) + if err != nil { + return RouterNode{}, fmt.Errorf("get primary: %w", err) + } + + for _, node := range healthyNodes { + if node.Storage == primary { + return node, nil + } + } + + return RouterNode{}, nodes.ErrPrimaryNotHealthy + } + consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil { return RouterNode{}, fmt.Errorf("consistent storages: %w", err) diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index aaa0a1171..0c09f2d9b 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // StaticRepositoryAssignments is a static assignment of storages for each individual repository. 
@@ -109,6 +110,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { desc string virtualStorage string healthyNodes StaticHealthChecker + metadata map[string]string numCandidates int pickCandidate int error error @@ -162,11 +164,35 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { }, error: ErrNoSuitableNode, }, + { + desc: "primary force-picked", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"primary", "consistent-secondary"}, + }, + metadata: map[string]string{ + routeRepositoryAccessorPolicy: routeRepositoryAccessorPolicyPrimaryOnly, + }, + node: "primary", + }, + { + desc: "secondary not picked if force-picking unhealthy primary", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"consistent-secondary"}, + }, + metadata: map[string]string{ + routeRepositoryAccessorPolicy: routeRepositoryAccessorPolicyPrimaryOnly, + }, + error: nodes.ErrPrimaryNotHealthy, + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.New(tc.metadata)) + conns := Connections{ "virtual-storage-1": { "primary": &grpc.ClientConn{}, |