diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-10-22 14:34:23 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-11-10 12:20:48 +0300 |
commit | c90ed8b02c660e16a5cf4137d6183fee5418ae03 (patch) | |
tree | 32ccac2e8c30a0ab9d2662fbcd999a33303cc090 | |
parent | e4199889b4b0cb28d6ac0de374aafdd85faaaabb (diff) |
praefect: Return replica path for additional repository from Router
ObjectPoolService operates on multiple repositories in its mutators.
One of these repositories is the repository itself and the second one
is always the object pool. Depending on the operation, either one is
the target repo and the other one is called the additional repository.
As ObjectPool's are not handled in a special manner by Praefect, they
will also get unique relative paths generated for them by Praefect. The
proxied requests need to be then rewritten to use this generated path
instead of the client provided relative path. Prafect is already rewriting
the relative paths of the target repository but not the relative path of
the additional repository. This commit extends the Router to returns the
replica path of the additional repository if one is included in the request.
-rw-r--r-- | internal/praefect/coordinator.go | 9 | ||||
-rw-r--r-- | internal/praefect/router.go | 5 | ||||
-rw-r--r-- | internal/praefect/router_node_manager.go | 11 | ||||
-rw-r--r-- | internal/praefect/router_per_repository.go | 21 | ||||
-rw-r--r-- | internal/praefect/router_per_repository_test.go | 56 |
5 files changed, 79 insertions, 23 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index a22fad536..0eb1f690f 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -475,7 +475,14 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, fmt.Errorf("route repository creation: %w", err) } default: - route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath) + var additionalRepoRelativePath string + if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil { + return nil, helper.ErrInvalidArgument(err) + } else if ok { + additionalRepoRelativePath = additionalRepo.GetRelativePath() + } + + route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) if err != nil { if errors.Is(err, ErrRepositoryReadOnly) { return nil, err diff --git a/internal/praefect/router.go b/internal/praefect/router.go index f4af466a4..8c3ec9a5c 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -37,6 +37,9 @@ type RepositoryMutatorRoute struct { RepositoryID int64 // ReplicaPath is the disk path where the replicas are stored. ReplicaPath string + // AdditionalReplicaPath is the disk path where the possible additional repository in the request + // is stored. This is only used for object pools. + AdditionalReplicaPath string // Primary is the primary node of the transaction. Primary RouterNode // Secondaries are the secondary participating in a transaction. @@ -59,7 +62,7 @@ type Router interface { // RouteRepositoryMutatorTransaction returns the primary and secondaries that should handle the repository mutator request. // Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator should only be used // with existing repositories. - RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) + RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) // RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation // request. It is up to the caller to store the assignments and primary information after finishing the RPC. RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 10a14c43f..7793c4d6d 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -74,7 +74,7 @@ func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStor }, nil } -func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { +func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) { shard, err := r.mgr.GetShard(ctx, virtualStorage) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) @@ -115,10 +115,11 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS } return RepositoryMutatorRoute{ - ReplicaPath: relativePath, - Primary: toRouterNode(shard.Primary), - Secondaries: toRouterNodes(participatingSecondaries), - ReplicationTargets: replicationTargets, + ReplicaPath: relativePath, + AdditionalReplicaPath: additionalRelativePath, + Primary: toRouterNode(shard.Primary), + Secondaries: toRouterNodes(participatingSecondaries), + ReplicationTargets: replicationTargets, }, nil } diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 84c0097a2..a41c0938e 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -189,7 +189,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu }, nil } -func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { +func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) { healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { return RepositoryMutatorRoute{}, err @@ -200,6 +200,19 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err) } + var additionalReplicaPath string + if additionalRelativePath != "" { + additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository id: %w", err) + } + + additionalReplicaPath, err = r.rs.GetReplicaPath(ctx, additionalRepositoryID) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository replica path: %w", err) + } + } + primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("get primary: %w", err) @@ -228,7 +241,11 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err) } - route := RepositoryMutatorRoute{RepositoryID: repositoryID, ReplicaPath: replicaPath} + route := RepositoryMutatorRoute{ + RepositoryID: repositoryID, + ReplicaPath: replicaPath, + AdditionalReplicaPath: additionalReplicaPath, + } for _, assigned := range assignedStorages { node, healthy := healthySet[assigned] if assigned == primary { diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 8f50a1ac5..8372422d4 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -276,14 +276,15 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { } for _, tc := range []struct { - desc string - virtualStorage string - healthyNodes StaticHealthChecker - consistentStorages []string - secondaries []string - replicationTargets []string - error error - assignedNodes StaticRepositoryAssignments + desc string + virtualStorage string + healthyNodes StaticHealthChecker + consistentStorages []string + secondaries []string + replicationTargets []string + error error + assignedNodes StaticRepositoryAssignments + noAdditionalRepository bool }{ { desc: "unknown virtual storage", @@ -312,6 +313,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { secondaries: []string{"secondary-1", "secondary-2"}, }, { + desc: "no additional repository", + virtualStorage: "virtual-storage-1", + healthyNodes: StaticHealthChecker(configuredNodes), + consistentStorages: []string{"primary", "secondary-1", "secondary-2"}, + secondaries: []string{"secondary-1", "secondary-2"}, + noAdditionalRepository: true, + }, + { desc: "inconsistent secondary", virtualStorage: "virtual-storage-1", healthyNodes: StaticHealthChecker(configuredNodes), @@ -368,14 +377,25 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredNodes}) - const relativePath = "repository" + const ( + virtualStorage = "virtual-storage-1" + relativePath = "repository" + additionalRelativePath = "additional-repository" + additionalReplicaPath = "additional-replica-path" + ) rs := datastore.NewPostgresRepositoryStore(tx, nil) - repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath) + repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) require.NoError(t, err) require.NoError(t, - rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false), + rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false), + ) + + additionalRepositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, additionalRelativePath) + require.NoError(t, err) + require.NoError(t, + rs.CreateRepository(ctx, additionalRepositoryID, virtualStorage, additionalRelativePath, additionalReplicaPath, "primary", nil, nil, true, false), ) if len(tc.consistentStorages) > 0 { @@ -405,7 +425,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { nil, ) - route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath) + requestAdditionalRelativePath := additionalRelativePath + expectedAdditionalReplicaPath := additionalReplicaPath + if tc.noAdditionalRepository { + expectedAdditionalReplicaPath = "" + requestAdditionalRelativePath = "" + } + + route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath, requestAdditionalRelativePath) require.Equal(t, tc.error, err) if err == nil { var secondaries []RouterNode @@ -417,8 +444,9 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { } require.Equal(t, RepositoryMutatorRoute{ - RepositoryID: repositoryID, - ReplicaPath: relativePath, + RepositoryID: repositoryID, + ReplicaPath: relativePath, + AdditionalReplicaPath: expectedAdditionalReplicaPath, Primary: RouterNode{ Storage: "primary", Connection: conns[tc.virtualStorage]["primary"], |