Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2021-07-12 16:04:39 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2021-07-12 16:04:39 +0300
commit871046170dc4a74fd6c8c011c36ea0f0ccb0e785 (patch)
tree9bf2a6441569b18d7b26bec4e7c19962be362768
parentc8a29dc9fd507cab8835b2e1152b94a6ac96de35 (diff)
parentfe6d52572ed4619191dbe73ade0b7baa4fceaafe (diff)
Merge branch 'smh-perform-lazy-failovers' into 'master'
Perform failovers lazily Closes #3207 See merge request gitlab-org/gitaly!3543
-rw-r--r--cmd/praefect/main.go10
-rw-r--r--internal/praefect/nodes/per_repository.go168
-rw-r--r--internal/praefect/nodes/per_repository_test.go243
-rw-r--r--internal/testhelper/db.go51
4 files changed, 147 insertions, 325 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index fde28f0ef..2418b8a90 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -325,15 +325,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}()
healthChecker = hm
- elector := nodes.NewPerRepositoryElector(logger, db)
-
- if conf.Failover.Enabled {
- go func() {
- if err := elector.Run(ctx, hm.Updated()); err != nil {
- logger.WithError(err).Error("primary elector exited")
- }
- }()
- }
+ elector := nodes.NewPerRepositoryElector(db)
primaryGetter = elector
assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go
index 92342b66f..93a96b8e8 100644
--- a/internal/praefect/nodes/per_repository.go
+++ b/internal/praefect/nodes/per_repository.go
@@ -5,8 +5,8 @@ import (
"database/sql"
"errors"
"fmt"
- "time"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
@@ -18,76 +18,19 @@ var ErrNoPrimary = errors.New("no primary")
// PerRepositoryElector implements an elector that selects a primary for each repository.
// It elects a healthy node with most recent generation as the primary. If all nodes are
// on the same generation, it picks one randomly to balance repositories in simple fashion.
-type PerRepositoryElector struct {
- log logrus.FieldLogger
- db glsql.Querier
- handleError func(error) error
- retryWait time.Duration
-}
+type PerRepositoryElector struct{ db glsql.Querier }
// NewPerRepositoryElector returns a new per repository primary elector.
-func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier) *PerRepositoryElector {
- log = log.WithField("component", "PerRepositoryElector")
- return &PerRepositoryElector{
- log: log,
- db: db,
- handleError: func(err error) error {
- log.WithError(err).Error("failed performing failovers")
- return nil
- },
- retryWait: time.Second,
- }
-}
-
-// primaryChanges is a type for collecting promotion and demotion counts. It's keyed by
-// virtual storage -> storage -> (promoted | demoted).
-type primaryChanges map[string]map[string]map[string]int
-
-// Run listens on the trigger channel for updates. On each update, it tries to elect new primaries for
-// repositories which have an unhealthy primary. Blocks until the context is canceled or the trigger
-// channel is closed. Returns the error from the context.
-func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}) error {
- pr.log.Info("per repository elector started")
- defer pr.log.Info("per repository elector stopped")
-
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case _, ok := <-trigger:
- if !ok {
- return nil
- }
-
- for {
- if err := pr.performFailovers(ctx); err != nil {
- if err := pr.handleError(err); err != nil {
- return err
- }
-
- // Reattempt the failovers after one second if it failed. The trigger channel only ticks
- // when a health change has occurred. If we fail to perform failovers, we would
- // only try again when the health of a node has changed. This would leave some
- // repositories without a healthy primary. Ideally we'd fix this by getting rid of
- // the virtual storage wide failovers and perform failovers lazily for repositories
- // when necessary: https://gitlab.com/gitlab-org/gitaly/-/issues/3207
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(pr.retryWait):
- continue
- }
- }
-
- break
- }
- }
- }
+func NewPerRepositoryElector(db glsql.Querier) *PerRepositoryElector {
+ return &PerRepositoryElector{db: db}
}
-func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error {
- rows, err := pr.db.QueryContext(ctx, `
-WITH updated AS (
+// GetPrimary returns the primary storage of a repository. If the primary is not a valid primary anymore, an election
+// is attempted. If there are no valid primaries, the current primary is simply demoted.
+func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) {
+ var current, previous sql.NullString
+ if err := pr.db.QueryRowContext(ctx, `
+WITH new AS (
UPDATE repositories
SET "primary" = (
SELECT storage
@@ -97,82 +40,32 @@ WITH updated AS (
ORDER BY random()
LIMIT 1
)
- WHERE NOT EXISTS (
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ AND NOT EXISTS (
SELECT FROM valid_primaries
WHERE virtual_storage = repositories.virtual_storage
AND relative_path = repositories.relative_path
AND storage = repositories."primary"
)
- RETURNING virtual_storage, relative_path, "primary"
-),
-
-demoted AS (
- SELECT virtual_storage, repositories."primary" AS storage, COUNT(*) AS demoted
- FROM repositories
- JOIN updated USING (virtual_storage, relative_path)
- WHERE repositories."primary" IS NOT NULL
- GROUP BY virtual_storage, repositories."primary"
-),
-
-promoted AS (
- SELECT virtual_storage, "primary" AS storage, COUNT(*) AS promoted
- FROM updated
- WHERE updated."primary" IS NOT NULL
- GROUP BY virtual_storage, "primary"
+ RETURNING true AS elected, "primary"
)
-SELECT virtual_storage, storage, COALESCE(demoted, 0), COALESCE(promoted, 0)
-FROM demoted
-FULL JOIN promoted USING (virtual_storage, storage)
- `)
- if err != nil {
- return fmt.Errorf("query: %w", err)
- }
- defer rows.Close()
-
- changes := primaryChanges{}
- for rows.Next() {
- var virtualStorage, storage string
- var demoted, promoted int
-
- if err := rows.Scan(&virtualStorage, &storage, &demoted, &promoted); err != nil {
- return fmt.Errorf("scan: %w", err)
- }
-
- storageChanges, ok := changes[virtualStorage]
- if !ok {
- storageChanges = map[string]map[string]int{}
- }
-
- storageChanges[storage] = map[string]int{"demoted": demoted, "promoted": promoted}
- changes[virtualStorage] = storageChanges
- }
-
- if err := rows.Err(); err != nil {
- return fmt.Errorf("rows: %w", err)
- }
-
- if len(changes) > 0 {
- pr.log.WithField("changes", changes).Info("performed failovers")
- } else {
- pr.log.Info("attempting failovers resulted no changes")
- }
-
- return nil
-}
-
-// GetPrimary returns the primary storage of a repository.
-func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) {
- var primary sql.NullString
- if err := pr.db.QueryRowContext(ctx, `
-SELECT "primary"
-FROM repositories
+SELECT
+ CASE WHEN new.elected
+ THEN new.primary
+ ELSE old.primary
+ END,
+ old.primary
+FROM repositories AS old
+FULL JOIN new ON true
WHERE virtual_storage = $1
AND relative_path = $2
+
`,
virtualStorage,
relativePath,
- ).Scan(&primary); err != nil {
+ ).Scan(&current, &previous); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
}
@@ -180,9 +73,18 @@ AND relative_path = $2
return "", fmt.Errorf("scan: %w", err)
}
- if !primary.Valid {
+ if current != previous {
+ ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
+ "virtual_storage": virtualStorage,
+ "relative_path": relativePath,
+ "current_primary": current.String,
+ "previous_primary": previous.String,
+ }).Info("primary node changed")
+ }
+
+ if !current.Valid {
return "", ErrNoPrimary
}
- return primary.String, nil
+ return current.String, nil
}
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index 021a6465e..5fb825eb3 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -3,56 +3,18 @@
package nodes
import (
- "context"
"database/sql"
- "errors"
"testing"
- "time"
- "github.com/lib/pq"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
-func setHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) {
- var praefects, virtualStorages, storages []string
- for praefect, virtualStors := range healthyNodes {
- for virtualStorage, stors := range virtualStors {
- for _, storage := range stors {
- praefects = append(praefects, praefect)
- virtualStorages = append(virtualStorages, virtualStorage)
- storages = append(storages, storage)
- }
- }
- }
-
- _, err := db.ExecContext(ctx, `
-WITH clear_previous_checks AS ( DELETE FROM node_status )
-
-INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
-SELECT
- unnest($1::text[]) AS praefect_name,
- unnest($2::text[]) AS shard_name,
- unnest($3::text[]) AS node_name,
- NOW() AS last_contact_attempt_at,
- NOW() AS last_seen_active_at
-ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET
- last_contact_attempt_at = NOW(),
- last_seen_active_at = NOW()
- `,
- pq.StringArray(praefects),
- pq.StringArray(virtualStorages),
- pq.StringArray(storages),
- )
- require.NoError(t, err)
-}
-
func TestPerRepositoryElector(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
@@ -79,34 +41,10 @@ func TestPerRepositoryElector(t *testing.T) {
}
}
- type logMatcher func(t testing.TB, entry logrus.Entry)
-
- anyChange := func(expected ...primaryChanges) logMatcher {
- return func(t testing.TB, entry logrus.Entry) {
- require.Equal(t, "performed failovers", entry.Message)
-
- var fields []logrus.Fields
- for _, changes := range expected {
- fields = append(fields, logrus.Fields{
- "component": "PerRepositoryElector",
- "changes": changes,
- })
- }
-
- require.Contains(t, fields, entry.Data)
- }
- }
-
- noChanges := func(t testing.TB, entry logrus.Entry) {
- t.Helper()
- require.Equal(t, "attempting failovers resulted no changes", entry.Message)
- }
-
type steps []struct {
healthyNodes map[string][]string
error error
primary matcher
- matchLogs logMatcher
}
for _, tc := range []struct {
@@ -130,8 +68,7 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
},
},
@@ -150,9 +87,8 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: noChanges,
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -170,9 +106,8 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: noChanges,
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -192,10 +127,6 @@ func TestPerRepositoryElector(t *testing.T) {
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
primary: any("gitaly-1", "gitaly-2"),
- matchLogs: anyChange(
- primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}},
- primaryChanges{"virtual-storage-1": {"gitaly-2": {"demoted": 0, "promoted": 1}}},
- ),
},
},
},
@@ -215,18 +146,13 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-3"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
primary: any("gitaly-2"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {
- "gitaly-1": {"demoted": 1, "promoted": 0},
- "gitaly-2": {"demoted": 0, "promoted": 1},
- }}),
},
},
},
@@ -245,16 +171,14 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 1, "promoted": 0}}}),
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -274,18 +198,13 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-3"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
primary: any("gitaly-2"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {
- "gitaly-1": {"demoted": 1, "promoted": 0},
- "gitaly-2": {"demoted": 0, "promoted": 1},
- }}),
},
},
},
@@ -304,18 +223,13 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
primary: any("gitaly-2"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {
- "gitaly-1": {"demoted": 1, "promoted": 0},
- "gitaly-2": {"demoted": 0, "promoted": 1},
- }}),
},
},
},
@@ -334,18 +248,13 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-2"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-2": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-2"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {
- "gitaly-1": {"demoted": 0, "promoted": 1},
- "gitaly-2": {"demoted": 1, "promoted": 0},
- }}),
},
},
},
@@ -364,15 +273,13 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-2"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-2": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-2"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-2"),
- matchLogs: noChanges,
+ primary: any("gitaly-2"),
},
},
},
@@ -391,16 +298,14 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
{
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-2", "gitaly-3"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 1, "promoted": 0}}}),
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -429,9 +334,8 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: noChanges,
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -460,9 +364,8 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: noChanges,
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -491,9 +394,8 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
- error: ErrNoPrimary,
- primary: noPrimary(),
- matchLogs: noChanges,
+ error: ErrNoPrimary,
+ primary: noPrimary(),
},
},
},
@@ -567,8 +469,7 @@ func TestPerRepositoryElector(t *testing.T) {
healthyNodes: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
- primary: any("gitaly-1"),
- matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}),
+ primary: any("gitaly-1"),
},
},
},
@@ -576,9 +477,8 @@ func TestPerRepositoryElector(t *testing.T) {
desc: "repository does not exist",
steps: steps{
{
- error: commonerr.NewRepositoryNotFoundError("virtual-storage-1", "relative-path-1"),
- primary: noPrimary(),
- matchLogs: noChanges,
+ error: commonerr.NewRepositoryNotFoundError("virtual-storage-1", "relative-path-1"),
+ primary: noPrimary(),
},
},
},
@@ -616,27 +516,27 @@ func TestPerRepositoryElector(t *testing.T) {
require.NoError(t, err)
}
+ previousPrimary := ""
for _, step := range tc.steps {
- runElection := func(tx *sql.Tx, matchLogs logMatcher) {
- setHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes})
+ runElection := func(tx *sql.Tx) (string, *logrus.Entry) {
+ testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes})
- // The first transaction runs first
logger, hook := test.NewNullLogger()
- elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx)
- elector.handleError = func(err error) error { return err }
+ elector := NewPerRepositoryElector(tx)
- trigger := make(chan struct{}, 1)
- trigger <- struct{}{}
- close(trigger)
+ primary, err := elector.GetPrimary(
+ ctxlogrus.ToContext(ctx, logrus.NewEntry(logger)), "virtual-storage-1", "relative-path-1")
+ require.Equal(t, step.error, err)
+ require.Less(t, len(hook.Entries), 2)
- require.NoError(t, elector.Run(ctx, trigger))
+ var entry *logrus.Entry
+ if len(hook.Entries) == 1 {
+ entry = &hook.Entries[0]
+ }
- primary, err := elector.GetPrimary(ctx, "virtual-storage-1", "relative-path-1")
- assert.Equal(t, step.error, err)
- step.primary(t, primary)
+ require.NoError(t, tx.Commit())
- require.Len(t, hook.Entries, 3)
- matchLogs(t, hook.Entries[1])
+ return primary, entry
}
// Run every step with two concurrent transactions to ensure two Praefect's running
@@ -654,53 +554,30 @@ func TestPerRepositoryElector(t *testing.T) {
require.NoError(t, err)
defer txSecond.Rollback()
- runElection(txFirst, step.matchLogs)
-
- require.NoError(t, txFirst.Commit())
+ primary, logEntry := runElection(txFirst)
+ step.primary(t, primary)
+
+ if previousPrimary != primary {
+ require.NotNil(t, logEntry)
+ require.Equal(t, "primary node changed", logEntry.Message)
+ require.Equal(t, logrus.Fields{
+ "virtual_storage": "virtual-storage-1",
+ "relative_path": "relative-path-1",
+ "current_primary": primary,
+ "previous_primary": previousPrimary,
+ }, logEntry.Data)
+ } else {
+ require.Nil(t, logEntry)
+ }
// Run the second election on the same database snapshot. This should result in no changes.
// Running this prior to the first transaction committing would block.
- runElection(txSecond, noChanges)
+ secondPrimary, secondLogEntry := runElection(txSecond)
+ require.Equal(t, primary, secondPrimary)
+ require.Nil(t, secondLogEntry)
- require.NoError(t, txSecond.Commit())
+ previousPrimary = primary
}
})
}
}
-
-func TestPerRepositoryElector_Retry(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- dbCalls := 0
- handleErrorCalls := 0
- elector := NewPerRepositoryElector(
- testhelper.DiscardTestLogger(t),
- &glsql.MockQuerier{
- QueryContextFunc: func(context.Context, string, ...interface{}) (*sql.Rows, error) {
- dbCalls++
-
- return nil, assert.AnError
- },
- },
- )
- elector.retryWait = time.Nanosecond
- elector.handleError = func(err error) error {
- handleErrorCalls++
- require.True(t, errors.Is(err, assert.AnError))
-
- if handleErrorCalls == 2 {
- return context.Canceled
- }
-
- return nil
- }
-
- // we are only sending one trigger, second attempt must come from the retry logic
- trigger := make(chan struct{}, 1)
- trigger <- struct{}{}
-
- require.Equal(t, context.Canceled, elector.Run(ctx, trigger))
- require.Equal(t, 2, dbCalls)
- require.Equal(t, 2, handleErrorCalls)
-}
diff --git a/internal/testhelper/db.go b/internal/testhelper/db.go
new file mode 100644
index 000000000..be5cb1c1e
--- /dev/null
+++ b/internal/testhelper/db.go
@@ -0,0 +1,51 @@
+package testhelper
+
+import (
+ "context"
+ "testing"
+
+ "github.com/lib/pq"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+)
+
+// SetHealthyNodes sets the healthy nodes in the database as determined by the passed in map. The healthyNodes map is keyed by
+// praefect name -> virtual storage -> storage. On each run, it clears all previous health checks from the table, so the
+// passed in nodes are the only ones considered healthy after the function. As the healthy nodes are determined by the time of
+// the last successful health check, this should be run in the same transastion as the tested query to prevent flakiness.
+//
+//nolint:golint
+func SetHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) {
+ t.Helper()
+
+ var praefects, virtualStorages, storages []string
+ for praefect, virtualStors := range healthyNodes {
+ for virtualStorage, stors := range virtualStors {
+ for _, storage := range stors {
+ praefects = append(praefects, praefect)
+ virtualStorages = append(virtualStorages, virtualStorage)
+ storages = append(storages, storage)
+ }
+ }
+ }
+
+ _, err := db.ExecContext(ctx, `
+WITH clear_previous_checks AS ( DELETE FROM node_status )
+
+INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
+SELECT
+ unnest($1::text[]) AS praefect_name,
+ unnest($2::text[]) AS shard_name,
+ unnest($3::text[]) AS node_name,
+ NOW() AS last_contact_attempt_at,
+ NOW() AS last_seen_active_at
+ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET
+ last_contact_attempt_at = NOW(),
+ last_seen_active_at = NOW()
+ `,
+ pq.StringArray(praefects),
+ pq.StringArray(virtualStorages),
+ pq.StringArray(storages),
+ )
+ require.NoError(t, err)
+}