1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
package nodes
import (
"context"
"database/sql"
"errors"
"fmt"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
)
var (
	// ErrNoPrimary is the error GetPrimary reports when, after any election attempt, the repository
	// still has no primary storage assigned.
	ErrNoPrimary = errors.New("no primary")
)
// PerRepositoryElector chooses a primary storage separately for every repository. The elected primary is
// a healthy node holding the most recent generation of the repository; when several candidates qualify,
// one of them is picked at random, which spreads primaries across storages in a simple way.
type PerRepositoryElector struct {
	// logger receives a record whenever a repository's primary changes.
	logger log.Logger
	// db is used to read and update the repository records during elections.
	db glsql.Querier
}
// NewPerRepositoryElector constructs a PerRepositoryElector that logs through logger and runs its
// election queries against db.
func NewPerRepositoryElector(logger log.Logger, db glsql.Querier) *PerRepositoryElector {
	elector := new(PerRepositoryElector)
	elector.logger = logger
	elector.db = db
	return elector
}
// GetPrimary returns the primary storage of a repository. If the current primary is invalid, a new primary
// is elected if there are valid candidates for promotion.
func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage string, repositoryID int64) (string, error) {
// The query below contains three parts to account for visibility with read-committed isolation mode and
// concurrent updates.
//
// If the repository already has a valid primary, the `reread` and `election` CTEs don't return results and
// the query simply returns the primary from the `repositories` table (aliased as `snapshot`). No locks are
// acquired. Same happens if the repository has an invalid primary but there are no valid candidates for
// promotion.
//
// If the primary is invalid, the `reread` CTE locks the record. Upon acquiring the lock, Postgres rereads
// the record. `reread` then contains an up to date record which has potentially been updated by a concurrent
// transaction. If the reread record still contains an invalid primary, the `election` CTE performs an election.
// If the repository has a valid primary after rereading the record, the `election` CTE doesn't re-elect a primary.
//
// The query then returns the primary from the correct CTE. The priority is:
// 1. `election`, as this indicates this transaction re-elected the primary and the CTE now contains the most
// recent change
// 2. `reread`, as this indicates a concurrent transaction had potentially changed the primary.
// 3. `snapshot`, if the current primary was valid in the transaction's database snapshot.
var current, previous sql.NullString
if err := pr.db.QueryRowContext(ctx, `
WITH reread AS (
SELECT true AS valid, repository_id, "primary"
FROM repositories
WHERE repository_id = $1
AND NOT EXISTS (
SELECT FROM valid_primaries
WHERE valid_primaries.repository_id = repositories.repository_id
AND storage = "primary"
) AND EXISTS (
SELECT FROM valid_primaries
WHERE valid_primaries.repository_id = repositories.repository_id
)
FOR NO KEY UPDATE
),
election AS (
UPDATE repositories
SET "primary" = (
SELECT storage
FROM valid_primaries
WHERE valid_primaries.repository_id = repositories.repository_id
ORDER BY random()
LIMIT 1
)
FROM reread
WHERE repositories.repository_id = reread.repository_id
AND NOT EXISTS (
SELECT FROM valid_primaries
WHERE valid_primaries.repository_id = $1
AND storage = reread.primary
)
RETURNING true AS valid, repositories.primary
)
SELECT
CASE WHEN election.valid
THEN election.primary
ELSE
CASE WHEN reread.valid
THEN reread.primary
ELSE snapshot.primary
END
END,
CASE WHEN reread.valid
THEN reread.primary
ELSE snapshot.primary
END
FROM repositories AS snapshot
LEFT JOIN reread ON reread.valid
LEFT JOIN election ON election.valid
WHERE snapshot.repository_id = $1
`,
repositoryID,
).Scan(¤t, &previous); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", datastore.ErrRepositoryNotFound
}
return "", fmt.Errorf("scan: %w", err)
}
if current != previous {
pr.logger.WithFields(log.Fields{
"repository_id": repositoryID,
"current_primary": current.String,
"previous_primary": previous.String,
}).InfoContext(ctx, "primary node changed")
}
if !current.Valid {
return "", ErrNoPrimary
}
return current.String, nil
}
|