Welcome to mirror list, hosted at ThFree Co, Russian Federation.

per_repository.go « nodes « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 0b1682f7315f98b2c0414830a5d73c3ba82755e8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package nodes

import (
	"context"
	"database/sql"
	"errors"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
)

var (
	// ErrNoPrimary signals that a repository has no primary storage, i.e. the
	// per-repository election query yielded a NULL primary for it.
	ErrNoPrimary = errors.New("no primary")
)

// PerRepositoryElector implements an elector that selects a primary for each repository.
// The current primary of a repository is read from the repositories table; when it is not
// listed in the valid_primaries view for that repository, a replacement is picked at random
// among the repository's valid_primaries entries and written back.
// NOTE(review): candidates are described as healthy nodes on the most recent generation,
// with random choice balancing repositories when several share that generation; that
// filtering lives in the valid_primaries view, which is defined outside this file — confirm.
type PerRepositoryElector struct {
	logger log.Logger    // used to report primary changes observed by GetPrimary
	db     glsql.Querier // executes the election query
}

// NewPerRepositoryElector builds a per-repository primary elector that runs its
// queries through db and reports primary changes through logger.
func NewPerRepositoryElector(logger log.Logger, db glsql.Querier) *PerRepositoryElector {
	elector := new(PerRepositoryElector)
	elector.db = db
	elector.logger = logger
	return elector
}

// GetPrimary reports the primary storage of the repository identified by repositoryID. When the
// recorded primary is no longer valid and at least one valid candidate exists, a new primary is
// elected within the same statement.
//
// It returns datastore.ErrRepositoryNotFound when no repository row matches repositoryID, and
// ErrNoPrimary when the repository is left without a primary. virtualStorage is accepted but not
// consulted: the repository is located by repositoryID alone.
func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage string, repositoryID int64) (string, error) {
	// The statement is built from three row sources so that it stays correct under read-committed
	// isolation while other transactions may be updating the same repository concurrently.
	//
	//   - snapshot: the repositories row as visible to this statement. When its primary is valid, or
	//     when there is no valid candidate to promote, the CTEs below produce no rows, no lock is
	//     taken, and the snapshot's primary is returned unchanged.
	//   - reread: when the primary is invalid, this CTE locks the row with FOR NO KEY UPDATE. After
	//     the lock is granted, Postgres re-reads the row, so reread reflects anything a concurrent
	//     transaction committed in the meantime.
	//   - election: updates the row only if the re-read primary is still not a valid primary,
	//     choosing a random storage from valid_primaries. If a concurrent transaction already
	//     installed a valid primary, no new election happens.
	//
	// The first selected column (the resulting primary) prefers election, then reread, then
	// snapshot: a row in election means this transaction just changed the primary, and a row in
	// reread means a concurrent transaction may have. The second column (the prior primary)
	// prefers reread over snapshot and is only used below to detect and log a change.
	const query = `
WITH reread AS (
	SELECT true AS valid, repository_id, "primary"
	FROM repositories
	WHERE repository_id = $1
	AND NOT EXISTS (
		SELECT FROM valid_primaries
		WHERE valid_primaries.repository_id = repositories.repository_id
		AND storage = "primary"
	) AND EXISTS (
		SELECT FROM valid_primaries
		WHERE valid_primaries.repository_id = repositories.repository_id
	)
	FOR NO KEY UPDATE
),

election AS (
	UPDATE repositories
	SET "primary" = (
		SELECT storage
		FROM valid_primaries
		WHERE valid_primaries.repository_id = repositories.repository_id
		ORDER BY random()
		LIMIT 1
	)
	FROM reread
	WHERE repositories.repository_id = reread.repository_id
	AND NOT EXISTS (
		SELECT FROM valid_primaries
		WHERE valid_primaries.repository_id = $1
		AND storage = reread.primary
	)
	RETURNING true AS valid, repositories.primary
)

SELECT
	CASE WHEN election.valid
		THEN election.primary
		ELSE
			CASE WHEN reread.valid
				THEN reread.primary
				ELSE snapshot.primary
			END
	END,
	CASE WHEN reread.valid
		THEN reread.primary
		ELSE snapshot.primary
	END
FROM repositories AS snapshot
LEFT JOIN reread ON reread.valid
LEFT JOIN election ON election.valid
WHERE snapshot.repository_id = $1
`

	var elected, prior sql.NullString
	err := pr.db.QueryRowContext(ctx, query, repositoryID).Scan(&elected, &prior)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No row in repositories matched repositoryID.
		return "", datastore.ErrRepositoryNotFound
	case err != nil:
		return "", fmt.Errorf("scan: %w", err)
	}

	// sql.NullString is a comparable struct, so this also catches a primary
	// appearing or disappearing, not just a change of storage name.
	if elected != prior {
		pr.logger.WithFields(log.Fields{
			"repository_id":    repositoryID,
			"current_primary":  elected.String,
			"previous_primary": prior.String,
		}).InfoContext(ctx, "primary node changed")
	}

	if elected.Valid {
		return elected.String, nil
	}

	return "", ErrNoPrimary
}