Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-02-04 13:48:53 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-02-04 13:55:55 +0300
commit9deb1c29c60ae29164238a8fb9cd7bf3e85a63bf (patch)
tree2e5789e37518b09ca9256cb3da4f498425a6c3d9
parentb3039284a69e472a3272d2b60c9f4eb28489f438 (diff)
Resolve additional repository's replica path on creation callssmh-resolve-additional-repo-create
The object pool service RPCs carry an additional repository along with the target repository. This repository lies on the same storage as the target repository. The additional repository's relative path needs to also be rewritten by Praefect with the replica path as it would have been when the additional repository was originally created. Praefect currently does so for other mutator RPCs than 'create' type as there are no 'create' type RPCs yet that contain an additional repository. CreateObjectPool is incorrectly not marked as 'create' type call though. In preparation for fixing that, this commit already introduces relative path rewriting for additional repositories in 'create' type RPCs.
-rw-r--r--internal/praefect/coordinator.go16
-rw-r--r--internal/praefect/router.go2
-rw-r--r--internal/praefect/router_node_manager.go11
-rw-r--r--internal/praefect/router_per_repository.go45
-rw-r--r--internal/praefect/router_per_repository_test.go63
5 files changed, 85 insertions, 52 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 0e691ccca..52a7826e6 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -475,21 +475,21 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, fmt.Errorf("mutator call: replication details: %w", err)
}
+ var additionalRepoRelativePath string
+ if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil {
+ return nil, helper.ErrInvalidArgument(err)
+ } else if ok {
+ additionalRepoRelativePath = additionalRepo.GetRelativePath()
+ }
+
var route RepositoryMutatorRoute
switch change {
case datastore.CreateRepo:
- route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath)
+ route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath)
if err != nil {
return nil, fmt.Errorf("route repository creation: %w", err)
}
default:
- var additionalRepoRelativePath string
- if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil {
- return nil, helper.ErrInvalidArgument(err)
- } else if ok {
- additionalRepoRelativePath = additionalRepo.GetRelativePath()
- }
-
route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath)
if err != nil {
if errors.Is(err, ErrRepositoryReadOnly) {
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index 8c3ec9a5c..1b6f19387 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -65,5 +65,5 @@ type Router interface {
RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
// RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation
// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
- RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
+ RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
}
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 7793c4d6d..9cceaea0a 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -125,7 +125,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
// RouteRepositoryCreation includes healthy secondaries in the transaction and sets the unhealthy secondaries as
// replication targets. The virtual storage's primary acts as the primary for every repository.
-func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
shard, err := r.mgr.GetShard(ctx, virtualStorage)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
@@ -144,9 +144,10 @@ func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtual
}
return RepositoryMutatorRoute{
- Primary: toRouterNode(shard.Primary),
- ReplicaPath: relativePath,
- Secondaries: secondaries,
- ReplicationTargets: replicationTargets,
+ Primary: toRouterNode(shard.Primary),
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: additionalRepoRelativePath,
+ Secondaries: secondaries,
+ ReplicationTargets: replicationTargets,
}, nil
}
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index f6d3f89ed..bd3a37f66 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -190,6 +190,19 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu
}, nil
}
+func (r *PerRepositoryRouter) resolveAdditionalReplicaPath(ctx context.Context, virtualStorage, additionalRelativePath string) (string, error) {
+ if additionalRelativePath == "" {
+ return "", nil
+ }
+
+ additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath)
+ if err != nil {
+ return "", fmt.Errorf("get additional repository id: %w", err)
+ }
+
+ return r.rs.GetReplicaPath(ctx, additionalRepositoryID)
+}
+
//nolint: revive,stylecheck // This is unintentionally missing documentation.
func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
healthyNodes, err := r.healthyNodes(virtualStorage)
@@ -202,17 +215,9 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err)
}
- var additionalReplicaPath string
- if additionalRelativePath != "" {
- additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath)
- if err != nil {
- return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository id: %w", err)
- }
-
- additionalReplicaPath, err = r.rs.GetReplicaPath(ctx, additionalRepositoryID)
- if err != nil {
- return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository replica path: %w", err)
- }
+ additionalReplicaPath, err := r.resolveAdditionalReplicaPath(ctx, virtualStorage, additionalRelativePath)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("resolve additional replica path: %w", err)
}
primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID)
@@ -280,7 +285,12 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
// RouteRepositoryCreation picks a random healthy node to act as the primary node and selects the secondary nodes
// if assignments are enabled. Healthy secondaries take part in the transaction, unhealthy secondaries are set as
// replication targets.
-func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
+ additionalReplicaPath, err := r.resolveAdditionalReplicaPath(ctx, virtualStorage, additionalRelativePath)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("resolve additional replica path: %w", err)
+ }
+
healthyNodes, err := r.healthyNodes(virtualStorage)
if err != nil {
return RepositoryMutatorRoute{}, err
@@ -348,10 +358,11 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
}
return RepositoryMutatorRoute{
- RepositoryID: id,
- ReplicaPath: relativePath,
- Primary: primary,
- Secondaries: secondaries,
- ReplicationTargets: replicationTargets,
+ RepositoryID: id,
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: additionalReplicaPath,
+ Primary: primary,
+ Secondaries: secondaries,
+ ReplicationTargets: replicationTargets,
}, nil
}
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index b98d94100..dfc68552d 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -473,19 +473,24 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
db := testdb.New(t)
- const relativePath = "relative-path"
+ const (
+ relativePath = "relative-path"
+ additionalRelativePath = "additional-relative-path"
+ additionalReplicaPath = "additional-replica-path"
+ )
for _, tc := range []struct {
- desc string
- virtualStorage string
- healthyNodes StaticHealthChecker
- replicationFactor int
- primaryCandidates int
- primaryPick int
- secondaryCandidates int
- repositoryExists bool
- matchRoute matcher
- error error
+ desc string
+ virtualStorage string
+ healthyNodes StaticHealthChecker
+ replicationFactor int
+ primaryCandidates int
+ primaryPick int
+ secondaryCandidates int
+ repositoryExists bool
+ additionalRelativePath string
+ matchRoute matcher
+ error error
}{
{
desc: "no healthy nodes",
@@ -499,17 +504,19 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
error: nodes.ErrVirtualStorageNotExist,
},
{
- desc: "no healthy secondaries",
- virtualStorage: "virtual-storage-1",
- healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary"}},
- primaryCandidates: 1,
- primaryPick: 0,
+ desc: "no healthy secondaries",
+ virtualStorage: "virtual-storage-1",
+ healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary"}},
+ primaryCandidates: 1,
+ primaryPick: 0,
+ additionalRelativePath: additionalRelativePath,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
- RepositoryID: 1,
- ReplicaPath: relativePath,
- Primary: RouterNode{Storage: "primary", Connection: primaryConn},
- ReplicationTargets: []string{"secondary-1", "secondary-2"},
+ RepositoryID: 1,
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: additionalReplicaPath,
+ Primary: RouterNode{Storage: "primary", Connection: primaryConn},
+ ReplicationTargets: []string{"secondary-1", "secondary-2"},
},
),
},
@@ -614,6 +621,18 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
repositoryExists: true,
error: fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists),
},
+ {
+ desc: "additional repository doesn't exist",
+ virtualStorage: "virtual-storage-1",
+ additionalRelativePath: "non-existent",
+ error: fmt.Errorf(
+ "resolve additional replica path: %w",
+ fmt.Errorf(
+ "get additional repository id: %w",
+ commonerr.NewRepositoryNotFoundError("virtual-storage-1", "non-existent"),
+ ),
+ ),
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
ctx := testhelper.Context(t)
@@ -627,6 +646,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
)
}
+ require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", additionalRelativePath, additionalReplicaPath, "primary", nil, nil, true, true))
+
route, err := NewPerRepositoryRouter(
Connections{
"virtual-storage-1": {
@@ -650,7 +671,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
nil,
rs,
map[string]int{"virtual-storage-1": tc.replicationFactor},
- ).RouteRepositoryCreation(ctx, tc.virtualStorage, relativePath)
+ ).RouteRepositoryCreation(ctx, tc.virtualStorage, relativePath, tc.additionalRelativePath)
if tc.error != nil {
require.Equal(t, tc.error, err)
return