diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-03 16:57:36 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-04 14:44:56 +0300 |
commit | 4d877d7d556449ae8bda17c399de1b0b300b30a0 (patch) | |
tree | 5793747e9a489aecc66447b0280d93790f5b185a | |
parent | 8ee19cc5ecc4692b3c0c62ea4ac85dc5c64988a5 (diff) |
praefect: Implement force-routing to primary for per-repo router
With reads distribution being enabled, requests would typically be
routed to a random up-to-date node. There is situations though where not
even up-to-date nodes would be able to serve a request, e.g. when object
quarantine is in use.
This commit implements force-routing to the primary for the
per-repository router. It's implemented a new GRPC metadata header
"gitaly-route-repository-accessor-to-primary": if set, a
repository-scoped accessors will always get routed to the primary.
-rw-r--r-- | internal/praefect/router_per_repository.go | 35 | ||||
-rw-r--r-- | internal/praefect/router_per_repository_test.go | 26 |
2 files changed, 61 insertions, 0 deletions
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 5e95f006d..317e0e21a 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -8,6 +8,12 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + routeRepositoryAccessorPolicy = "gitaly-route-repository-accessor-policy" + routeRepositoryAccessorPolicyPrimaryOnly = "primary-only" ) // errRepositoryNotFound is retuned when trying to operate on a non-existent repository. @@ -125,12 +131,41 @@ func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualSt return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter") } +func shouldRouteRepositoryAccessorToPrimary(ctx context.Context) bool { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return false + } + + header := md.Get(routeRepositoryAccessorPolicy) + if len(header) == 0 { + return false + } + + return header[0] == routeRepositoryAccessorPolicyPrimaryOnly +} + func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { return RouterNode{}, err } + if shouldRouteRepositoryAccessorToPrimary(ctx) { + primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath) + if err != nil { + return RouterNode{}, fmt.Errorf("get primary: %w", err) + } + + for _, node := range healthyNodes { + if node.Storage == primary { + return node, nil + } + } + + return RouterNode{}, nodes.ErrPrimaryNotHealthy + } + consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil { return RouterNode{}, fmt.Errorf("consistent storages: %w", err) diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index aaa0a1171..0c09f2d9b 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // StaticRepositoryAssignments is a static assignment of storages for each individual repository. @@ -109,6 +110,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { desc string virtualStorage string healthyNodes StaticHealthChecker + metadata map[string]string numCandidates int pickCandidate int error error @@ -162,11 +164,35 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { }, error: ErrNoSuitableNode, }, + { + desc: "primary force-picked", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"primary", "consistent-secondary"}, + }, + metadata: map[string]string{ + routeRepositoryAccessorPolicy: routeRepositoryAccessorPolicyPrimaryOnly, + }, + node: "primary", + }, + { + desc: "secondary not picked if force-picking unhealthy primary", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"consistent-secondary"}, + }, + metadata: map[string]string{ + routeRepositoryAccessorPolicy: routeRepositoryAccessorPolicyPrimaryOnly, + }, + error: nodes.ErrPrimaryNotHealthy, + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.New(tc.metadata)) + conns := Connections{ "virtual-storage-1": { "primary": &grpc.ClientConn{}, |