router_node_manager.go « praefect « internal - gitlab.com/gitlab-org/gitaly.git
blob: f1f3c474aeaab63d3e8fd2e091e6b0d0d2e370e7

package praefect

import (
	"context"
	"errors"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
)

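// nodeManagerRouter implements Router by delegating routing decisions to a
// nodes.Manager and consulting the RepositoryStore for which storages are
// consistent with each other.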
type nodeManagerRouter struct {
	mgr nodes.Manager
	rs  datastore.RepositoryStore
}

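// toRouterNode converts a nodes.Node into a RouterNode.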
func toRouterNode(node nodes.Node) RouterNode {
	return RouterNode{
		Storage:    node.GetStorage(),
		Connection: node.GetConnection(),
	}
}

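// toRouterNodes converts a slice of nodes.Node into a slice of RouterNode.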
func toRouterNodes(nodes []nodes.Node) []RouterNode {
	out := make([]RouterNode, len(nodes))
	for i := range nodes {
		out[i] = toRouterNode(nodes[i])
	}
	return out
}

// NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions.
func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router {
	return &nodeManagerRouter{mgr: mgr, rs: rs}
}

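// RouteRepositoryAccessor routes the accessor call to a node that is up to date with the
// repository. If forcePrimary is set, the shard's primary is always chosen.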
func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
	if forcePrimary {
		shard, err := r.mgr.GetShard(ctx, virtualStorage)
		if err != nil {
			return RepositoryAccessorRoute{}, fmt.Errorf("get shard: %w", err)
		}

		return RepositoryAccessorRoute{ReplicaPath: relativePath, Node: toRouterNode(shard.Primary)}, nil
	}

	node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath)
	if err != nil {
		return RepositoryAccessorRoute{}, fmt.Errorf("get synced node: %w", err)
	}

	return RepositoryAccessorRoute{ReplicaPath: relativePath, Node: toRouterNode(node)}, nil
}

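// RouteStorageAccessor routes storage-scoped accessor calls to the virtual storage's primary.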
func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) {
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return RouterNode{}, err
	}

	return toRouterNode(shard.Primary), nil
}

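// RouteStorageMutator routes storage-scoped mutator calls to the virtual storage's primary,
// with the healthy secondaries participating in the transaction.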
func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return StorageMutatorRoute{}, err
	}

	return StorageMutatorRoute{
		Primary:     toRouterNode(shard.Primary),
		Secondaries: toRouterNodes(shard.GetHealthySecondaries()),
	}, nil
}

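// RouteRepositoryMutator routes the mutator call to the shard's primary and to the healthy
// secondaries that are consistent with the primary. Inconsistent or unhealthy secondaries
// are returned as replication targets instead. If the primary itself is not among the
// consistent storages, ErrRepositoryReadOnly is returned.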
func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
	}

	// The replica path returned by the store is ignored because Rails' tests are the only user of
	// NodeManagerRouter. The tests don't set up a database, so the RepositoryStore here is always
	// a mock. The mock doesn't know about the replica paths of repositories and thus returns an
	// empty string, which would break the tests. Instead, NodeManagerRouter keeps using the
	// relative path.
	_, consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath)
	if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) {
		return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err)
	}

	if len(consistentStorages) == 0 {
		// If there are no up-to-date storages, we consider the primary up to date,
		// as that is the case on repository creation.
		consistentStorages = map[string]struct{}{shard.Primary.GetStorage(): {}}
	}

	if _, ok := consistentStorages[shard.Primary.GetStorage()]; !ok {
		return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
	}

	// Inconsistent nodes will need repair anyway, so there is no point in including them. They
	// might also vote to abort, which could unnecessarily fail the transaction.
	var replicationTargets []string
	// Only healthy secondaries that are consistent with the primary are allowed to take
	// part in the transaction. Unhealthy nodes would block the transaction until they recover.
	participatingSecondaries := make([]nodes.Node, 0, len(consistentStorages))
	for _, secondary := range shard.Secondaries {
		if _, ok := consistentStorages[secondary.GetStorage()]; ok && secondary.IsHealthy() {
			participatingSecondaries = append(participatingSecondaries, secondary)
			continue
		}

		replicationTargets = append(replicationTargets, secondary.GetStorage())
	}

	return RepositoryMutatorRoute{
		ReplicaPath:           relativePath,
		AdditionalReplicaPath: additionalRelativePath,
		Primary:               toRouterNode(shard.Primary),
		Secondaries:           toRouterNodes(participatingSecondaries),
		ReplicationTargets:    replicationTargets,
	}, nil
}

// RouteRepositoryCreation includes healthy secondaries in the transaction and sets the unhealthy secondaries as
// replication targets. The virtual storage's primary acts as the primary for every repository.
func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
	}

	var secondaries []RouterNode
	var replicationTargets []string

	for _, secondary := range shard.Secondaries {
		if secondary.IsHealthy() {
			secondaries = append(secondaries, toRouterNode(secondary))
			continue
		}

		replicationTargets = append(replicationTargets, secondary.GetStorage())
	}

	return RepositoryMutatorRoute{
		Primary:               toRouterNode(shard.Primary),
		ReplicaPath:           relativePath,
		AdditionalReplicaPath: additionalRepoRelativePath,
		Secondaries:           secondaries,
		ReplicationTargets:    replicationTargets,
	}, nil
}

// RouteRepositoryMaintenance includes all healthy nodes in the route, regardless of whether
// they are consistent with each other.
func (r *nodeManagerRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) {
	shard, err := r.mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return RepositoryMaintenanceRoute{}, fmt.Errorf("get shard: %w", err)
	}

	nodes := make([]RouterNode, 0, 1+len(shard.Secondaries))

	if shard.Primary.IsHealthy() {
		nodes = append(nodes, toRouterNode(shard.Primary))
	}

	for _, secondary := range shard.Secondaries {
		if secondary.IsHealthy() {
			nodes = append(nodes, toRouterNode(secondary))
		}
	}

	if len(nodes) == 0 {
		return RepositoryMaintenanceRoute{}, ErrNoHealthyNodes
	}

	return RepositoryMaintenanceRoute{
		ReplicaPath: relativePath,
		Nodes:       nodes,
	}, nil
}