package nodes

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"github.com/lib/pq"
	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/commonerr"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)

// ErrNoPrimary is returned if the repository does not have a primary.
var ErrNoPrimary = errors.New("no primary")

// PerRepositoryElector implements an elector that selects a primary for each repository.
// It elects a healthy node with the most recent generation as the primary. If all nodes are
// on the same generation, it picks one randomly to balance repositories in a simple fashion.
type PerRepositoryElector struct {
	log logrus.FieldLogger
	db  glsql.Querier
	hc  HealthConsensus
	// handleError is called when a failover attempt fails. The default handler logs the
	// error and returns nil so the elector keeps running.
	handleError func(error) error
	// retryWait is how long to wait before reattempting failovers after a failed attempt.
	retryWait time.Duration
}

// HealthConsensus returns the cluster's consensus of healthy nodes.
type HealthConsensus interface {
	// HealthConsensus returns a list of healthy nodes by cluster consensus. The returned
	// set may contain nodes not present in the local configuration if the cluster has
	// deemed them healthy.
	HealthConsensus() map[string][]string
}

// NewPerRepositoryElector returns a new per repository primary elector.
func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthConsensus) *PerRepositoryElector {
	log = log.WithField("component", "PerRepositoryElector")
	return &PerRepositoryElector{
		log: log,
		db:  db,
		hc:  hc,
		handleError: func(err error) error {
			log.WithError(err).Error("failed performing failovers")
			return nil
		},
		retryWait: time.Second,
	}
}
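
// The sketch below is illustrative only: it shows one way the elector could be wired up,
// assuming a hypothetical healthManager that exposes an Updated() <-chan struct{} channel
// which ticks whenever node health changes. Neither healthManager nor Updated() is defined
// in this file.
//
//	elector := NewPerRepositoryElector(logger, db, healthManager)
//	go func() {
//		// Run blocks until the context is canceled or the trigger channel is closed.
//		if err := elector.Run(ctx, healthManager.Updated()); err != nil {
//			logger.WithError(err).Error("per repository elector failed")
//		}
//	}()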

// primaryChanges is a type for collecting promotion and demotion counts. It's keyed by
// virtual storage -> storage -> (promoted | demoted).
type primaryChanges map[string]map[string]map[string]int

// Run listens on the trigger channel for updates. On each update, it tries to elect new primaries for
// repositories which have an unhealthy primary. Blocks until the context is canceled or the trigger
// channel is closed. Returns the error from the context.
func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}) error {
	pr.log.Info("per repository elector started")
	defer pr.log.Info("per repository elector stopped")

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case _, ok := <-trigger:
			if !ok {
				return nil
			}

			for {
				if err := pr.performFailovers(ctx); err != nil {
					if err := pr.handleError(err); err != nil {
						return err
					}

					// Reattempt the failovers after one second if it failed. The trigger channel only ticks
					// when a health change has occurred. If we fail to perform failovers, we would
					// only try again when the health of a node has changed. This would leave some
					// repositories without a healthy primary. Ideally we'd fix this by getting rid of
					// the virtual storage wide failovers and perform failovers lazily for repositories
					// when necessary: https://gitlab.com/gitlab-org/gitaly/-/issues/3207
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-time.After(pr.retryWait):
						continue
					}
				}

				break
			}
		}
	}
}

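// performFailovers elects new primaries for repositories whose current primary is not in the
// cluster's consensus of healthy nodes. A candidate must be healthy, allowed by the repository's
// assignments (if any exist) and free of pending 'delete_replica' jobs; the candidate on the most
// recent generation wins, with ties broken randomly. Promotion and demotion counts are logged.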
func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error {
	healthyNodes := pr.hc.HealthConsensus()

	// Flatten the consensus map into parallel arrays so it can be passed as two query
	// parameters and unnested back into (virtual_storage, storage) rows.
	var virtualStorages, physicalStorages []string
	for virtualStorage, nodes := range healthyNodes {
		for _, node := range nodes {
			virtualStorages = append(virtualStorages, virtualStorage)
			physicalStorages = append(physicalStorages, node)
		}
	}

	rows, err := pr.db.QueryContext(ctx, `
WITH healthy_storages AS (
	SELECT unnest($1::text[]) AS virtual_storage, unnest($2::text[]) AS storage
),

updated AS (
	UPDATE repositories
	SET "primary" = (
		SELECT storage
		FROM healthy_storages
		LEFT JOIN storage_repositories USING (virtual_storage, storage)
		WHERE virtual_storage = repositories.virtual_storage
		AND storage_repositories.relative_path = repositories.relative_path
		AND (
			-- If assignments exist for the repository, only the assigned storages can be elected as primary.
			-- If no assignments exist, any healthy node can be elected as the primary.
			SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = storage_repositories.storage) = 1
			FROM repository_assignments
			WHERE repository_assignments.virtual_storage = storage_repositories.virtual_storage
			AND repository_assignments.relative_path = storage_repositories.relative_path
		)
		AND NOT EXISTS (
			-- This check exists to prevent us from electing a primary that is pending deletion. The primary
			-- could accept a write and lose it when the deletion is carried out.
			SELECT true
			FROM replication_queue
			WHERE state NOT IN ('completed', 'dead', 'cancelled')
			AND job->>'change' = 'delete_replica'
			AND job->>'virtual_storage' = virtual_storage
			AND job->>'relative_path' = relative_path
			AND job->>'target_node_storage' = storage
		)
		-- Prefer the candidate on the most recent generation; break ties randomly.
		ORDER BY generation DESC NULLS LAST, random()
		LIMIT 1
	)
	WHERE NOT EXISTS (
		SELECT 1
		FROM healthy_storages
		WHERE virtual_storage = repositories.virtual_storage
		AND storage = repositories."primary"
	)
	RETURNING virtual_storage, relative_path, "primary"
),

demoted AS (
	-- Data-modifying CTEs all see the same pre-UPDATE snapshot, so repositories."primary"
	-- here is still the old, now demoted primary.
	SELECT virtual_storage, repositories."primary" AS storage, COUNT(*) AS demoted
	FROM repositories
	JOIN updated USING (virtual_storage, relative_path)
	WHERE repositories."primary" IS NOT NULL
	GROUP BY virtual_storage, repositories."primary"
),

promoted AS (
	SELECT virtual_storage, "primary" AS storage, COUNT(*) AS promoted
	FROM updated
	WHERE updated."primary" IS NOT NULL
	GROUP BY virtual_storage, "primary"
)

SELECT virtual_storage, storage, COALESCE(demoted, 0), COALESCE(promoted, 0)
FROM demoted
FULL JOIN promoted USING (virtual_storage, storage)
	`, pq.StringArray(virtualStorages), pq.StringArray(physicalStorages))
	if err != nil {
		return fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	changes := primaryChanges{}
	for rows.Next() {
		var virtualStorage, storage string
		var demoted, promoted int
		if err := rows.Scan(&virtualStorage, &storage, &demoted, &promoted); err != nil {
			return fmt.Errorf("scan: %w", err)
		}

		storageChanges, ok := changes[virtualStorage]
		if !ok {
			storageChanges = map[string]map[string]int{}
		}

		storageChanges[storage] = map[string]int{"demoted": demoted, "promoted": promoted}
		changes[virtualStorage] = storageChanges
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("rows: %w", err)
	}

	if len(changes) > 0 {
		pr.log.WithField("changes", changes).Info("performed failovers")
	} else {
		pr.log.Info("attempting failovers resulted in no changes")
	}

	return nil
}

// GetPrimary returns the primary storage of a repository.
func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) {
	var primary sql.NullString
	if err := pr.db.QueryRowContext(ctx, `
		SELECT "primary"
		FROM repositories
		WHERE virtual_storage = $1
		AND relative_path = $2
		`,
		virtualStorage,
		relativePath,
	).Scan(&primary); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return "", commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
		}

		return "", fmt.Errorf("scan: %w", err)
	}

	if !primary.Valid {
		return "", ErrNoPrimary
	}

	return primary.String, nil
}