Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-10-22 14:34:23 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-11-10 12:20:48 +0300
commitc90ed8b02c660e16a5cf4137d6183fee5418ae03 (patch)
tree32ccac2e8c30a0ab9d2662fbcd999a33303cc090
parente4199889b4b0cb28d6ac0de374aafdd85faaaabb (diff)
praefect: Return replica path for additional repository from Router
ObjectPoolService operates on multiple repositories in its mutators. One of these repositories is the repository itself and the second one is always the object pool. Depending on the operation, either one is the target repo and the other one is called the additional repository. As ObjectPool's are not handled in a special manner by Praefect, they will also get unique relative paths generated for them by Praefect. The proxied requests need to be then rewritten to use this generated path instead of the client provided relative path. Prafect is already rewriting the relative paths of the target repository but not the relative path of the additional repository. This commit extends the Router to returns the replica path of the additional repository if one is included in the request.
-rw-r--r--internal/praefect/coordinator.go9
-rw-r--r--internal/praefect/router.go5
-rw-r--r--internal/praefect/router_node_manager.go11
-rw-r--r--internal/praefect/router_per_repository.go21
-rw-r--r--internal/praefect/router_per_repository_test.go56
5 files changed, 79 insertions, 23 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index a22fad536..0eb1f690f 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -475,7 +475,14 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, fmt.Errorf("route repository creation: %w", err)
}
default:
- route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath)
+ var additionalRepoRelativePath string
+ if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil {
+ return nil, helper.ErrInvalidArgument(err)
+ } else if ok {
+ additionalRepoRelativePath = additionalRepo.GetRelativePath()
+ }
+
+ route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath)
if err != nil {
if errors.Is(err, ErrRepositoryReadOnly) {
return nil, err
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index f4af466a4..8c3ec9a5c 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -37,6 +37,9 @@ type RepositoryMutatorRoute struct {
RepositoryID int64
// ReplicaPath is the disk path where the replicas are stored.
ReplicaPath string
+ // AdditionalReplicaPath is the disk path where the possible additional repository in the request
+ // is stored. This is only used for object pools.
+ AdditionalReplicaPath string
// Primary is the primary node of the transaction.
Primary RouterNode
// Secondaries are the secondary participating in a transaction.
@@ -59,7 +62,7 @@ type Router interface {
// RouteRepositoryMutatorTransaction returns the primary and secondaries that should handle the repository mutator request.
// Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator should only be used
// with existing repositories.
- RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
+ RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
// RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation
// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 10a14c43f..7793c4d6d 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -74,7 +74,7 @@ func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStor
}, nil
}
-func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
shard, err := r.mgr.GetShard(ctx, virtualStorage)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
@@ -115,10 +115,11 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
}
return RepositoryMutatorRoute{
- ReplicaPath: relativePath,
- Primary: toRouterNode(shard.Primary),
- Secondaries: toRouterNodes(participatingSecondaries),
- ReplicationTargets: replicationTargets,
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: additionalRelativePath,
+ Primary: toRouterNode(shard.Primary),
+ Secondaries: toRouterNodes(participatingSecondaries),
+ ReplicationTargets: replicationTargets,
}, nil
}
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 84c0097a2..a41c0938e 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -189,7 +189,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu
}, nil
}
-func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
healthyNodes, err := r.healthyNodes(virtualStorage)
if err != nil {
return RepositoryMutatorRoute{}, err
@@ -200,6 +200,19 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err)
}
+ var additionalReplicaPath string
+ if additionalRelativePath != "" {
+ additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository id: %w", err)
+ }
+
+ additionalReplicaPath, err = r.rs.GetReplicaPath(ctx, additionalRepositoryID)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository replica path: %w", err)
+ }
+ }
+
primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("get primary: %w", err)
@@ -228,7 +241,11 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err)
}
- route := RepositoryMutatorRoute{RepositoryID: repositoryID, ReplicaPath: replicaPath}
+ route := RepositoryMutatorRoute{
+ RepositoryID: repositoryID,
+ ReplicaPath: replicaPath,
+ AdditionalReplicaPath: additionalReplicaPath,
+ }
for _, assigned := range assignedStorages {
node, healthy := healthySet[assigned]
if assigned == primary {
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index 8f50a1ac5..8372422d4 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -276,14 +276,15 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
}
for _, tc := range []struct {
- desc string
- virtualStorage string
- healthyNodes StaticHealthChecker
- consistentStorages []string
- secondaries []string
- replicationTargets []string
- error error
- assignedNodes StaticRepositoryAssignments
+ desc string
+ virtualStorage string
+ healthyNodes StaticHealthChecker
+ consistentStorages []string
+ secondaries []string
+ replicationTargets []string
+ error error
+ assignedNodes StaticRepositoryAssignments
+ noAdditionalRepository bool
}{
{
desc: "unknown virtual storage",
@@ -312,6 +313,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
secondaries: []string{"secondary-1", "secondary-2"},
},
{
+ desc: "no additional repository",
+ virtualStorage: "virtual-storage-1",
+ healthyNodes: StaticHealthChecker(configuredNodes),
+ consistentStorages: []string{"primary", "secondary-1", "secondary-2"},
+ secondaries: []string{"secondary-1", "secondary-2"},
+ noAdditionalRepository: true,
+ },
+ {
desc: "inconsistent secondary",
virtualStorage: "virtual-storage-1",
healthyNodes: StaticHealthChecker(configuredNodes),
@@ -368,14 +377,25 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredNodes})
- const relativePath = "repository"
+ const (
+ virtualStorage = "virtual-storage-1"
+ relativePath = "repository"
+ additionalRelativePath = "additional-repository"
+ additionalReplicaPath = "additional-replica-path"
+ )
rs := datastore.NewPostgresRepositoryStore(tx, nil)
- repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath)
+ repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
require.NoError(t, err)
require.NoError(t,
- rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false),
+ rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false),
+ )
+
+ additionalRepositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, additionalRelativePath)
+ require.NoError(t, err)
+ require.NoError(t,
+ rs.CreateRepository(ctx, additionalRepositoryID, virtualStorage, additionalRelativePath, additionalReplicaPath, "primary", nil, nil, true, false),
)
if len(tc.consistentStorages) > 0 {
@@ -405,7 +425,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
nil,
)
- route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath)
+ requestAdditionalRelativePath := additionalRelativePath
+ expectedAdditionalReplicaPath := additionalReplicaPath
+ if tc.noAdditionalRepository {
+ expectedAdditionalReplicaPath = ""
+ requestAdditionalRelativePath = ""
+ }
+
+ route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath, requestAdditionalRelativePath)
require.Equal(t, tc.error, err)
if err == nil {
var secondaries []RouterNode
@@ -417,8 +444,9 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
}
require.Equal(t, RepositoryMutatorRoute{
- RepositoryID: repositoryID,
- ReplicaPath: relativePath,
+ RepositoryID: repositoryID,
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: expectedAdditionalReplicaPath,
Primary: RouterNode{
Storage: "primary",
Connection: conns[tc.virtualStorage]["primary"],