1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
package praefect
import (
"context"
"errors"
"fmt"
"gitlab.com/gitlab-org/gitaly/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
)
// nodeManagerRouter makes routing decisions based on the state reported by a
// nodes.Manager and the repository records held in a datastore.RepositoryStore.
// NewNodeManagerRouter hands it out as a Router.
type nodeManagerRouter struct {
// mgr is queried for the shard of a virtual storage and for synced nodes.
mgr nodes.Manager
// rs is queried for the set of storages holding a consistent copy of a repository.
rs datastore.RepositoryStore
}
// toRouterNode copies the storage name and the connection of the given node
// into a RouterNode.
func toRouterNode(node nodes.Node) RouterNode {
	var converted RouterNode
	converted.Storage = node.GetStorage()
	converted.Connection = node.GetConnection()
	return converted
}
// toRouterNodes converts every node of the slice with toRouterNode, keeping the
// input order. The result is always a non-nil slice, even for empty input.
func toRouterNodes(nodes []nodes.Node) []RouterNode {
	converted := make([]RouterNode, 0, len(nodes))
	for _, node := range nodes {
		converted = append(converted, toRouterNode(node))
	}
	return converted
}
// NewNodeManagerRouter builds a Router whose routing decisions are driven by the
// given node manager; the repository store is consulted for consistent storages.
func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router {
	router := new(nodeManagerRouter)
	router.mgr = mgr
	router.rs = rs
	return router
}
// RouteRepositoryAccessor picks the node that should serve a repository-scoped
// read. Unless forcePrimary is set, the node manager is asked for a synced node
// of the repository; otherwise the primary of the virtual storage's shard is
// returned.
func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) {
	if !forcePrimary {
		synced, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath)
		if err != nil {
			return RouterNode{}, fmt.Errorf("get synced node: %w", err)
		}

		return toRouterNode(synced), nil
	}

	// The caller insists on the primary, so the synced-node lookup is skipped.
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return RouterNode{}, fmt.Errorf("get shard: %w", err)
	}

	return toRouterNode(shard.Primary), nil
}
// RouteStorageAccessor returns the primary node of the shard backing the given
// virtual storage. An error from the shard lookup is passed through unchanged.
func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) {
	shard, shardErr := r.mgr.GetShard(ctx, virtualStorage)
	if shardErr != nil {
		var none RouterNode
		return none, shardErr
	}

	primary := toRouterNode(shard.Primary)
	return primary, nil
}
// RouteStorageMutator returns a route made of the shard's primary and its
// healthy secondaries for the given virtual storage. An error from the shard
// lookup is passed through unchanged.
func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
	shard, shardErr := r.mgr.GetShard(ctx, virtualStorage)
	if shardErr != nil {
		return StorageMutatorRoute{}, shardErr
	}

	var route StorageMutatorRoute
	route.Primary = toRouterNode(shard.Primary)
	route.Secondaries = toRouterNodes(shard.GetHealthySecondaries())
	return route, nil
}
// RouteRepositoryMutator computes the route for a repository-scoped write.
//
// The route consists of the shard's primary, the secondaries that take part in
// the transaction, and the storages of the remaining secondaries as replication
// targets. ErrRepositoryReadOnly is returned when the primary is not among the
// storages holding a consistent copy of the repository.
func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
	}

	upToDate, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath)
	if err != nil {
		// A repository-not-found error is tolerated: it is handled below like
		// any other case where no consistent storage is known.
		if !errors.As(err, new(commonerr.RepositoryNotFoundError)) {
			return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err)
		}
	}

	if len(upToDate) == 0 {
		// With no up to date storage on record, the primary is treated as up
		// to date; this is the situation on repository creation.
		upToDate = map[string]struct{}{shard.Primary.GetStorage(): {}}
	}

	if _, primaryUpToDate := upToDate[shard.Primary.GetStorage()]; !primaryUpToDate {
		return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
	}

	var (
		// Secondaries allowed to take part in the transaction: they must be
		// healthy and consistent with the primary. Unhealthy nodes would block
		// the transaction until they come back.
		participants = make([]nodes.Node, 0, len(upToDate))
		// Storages of all other secondaries. Inconsistent nodes need repair
		// anyway, and might vote to abort and unnecessarily fail the
		// transaction, so they only receive replication.
		outdated []string
	)

	for _, secondary := range shard.Secondaries {
		_, consistent := upToDate[secondary.GetStorage()]
		if !consistent || !secondary.IsHealthy() {
			outdated = append(outdated, secondary.GetStorage())
			continue
		}

		participants = append(participants, secondary)
	}

	route := RepositoryMutatorRoute{
		Primary:            toRouterNode(shard.Primary),
		Secondaries:        toRouterNodes(participants),
		ReplicationTargets: outdated,
	}

	return route, nil
}
// RouteRepositoryCreation routes a repository creation exactly like a regular
// repository mutator, using an empty relative path.
func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) {
	// nodeManagerRouter supports neither repository assignments nor repository
	// specific primaries, so the plain mutator routing is sufficient here.
	const noRelativePath = ""

	return r.RouteRepositoryMutator(ctx, virtualStorage, noRelativePath)
}
|