diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2021-07-12 16:04:39 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2021-07-12 16:04:39 +0300 |
commit | 871046170dc4a74fd6c8c011c36ea0f0ccb0e785 (patch) | |
tree | 9bf2a6441569b18d7b26bec4e7c19962be362768 | |
parent | c8a29dc9fd507cab8835b2e1152b94a6ac96de35 (diff) | |
parent | fe6d52572ed4619191dbe73ade0b7baa4fceaafe (diff) |
Merge branch 'smh-perform-lazy-failovers' into 'master'
Perform failovers lazily
Closes #3207
See merge request gitlab-org/gitaly!3543
-rw-r--r-- | cmd/praefect/main.go | 10 | ||||
-rw-r--r-- | internal/praefect/nodes/per_repository.go | 168 | ||||
-rw-r--r-- | internal/praefect/nodes/per_repository_test.go | 243 | ||||
-rw-r--r-- | internal/testhelper/db.go | 51 |
4 files changed, 147 insertions, 325 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index fde28f0ef..2418b8a90 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -325,15 +325,7 @@ func run(cfgs []starter.Config, conf config.Config) error { }() healthChecker = hm - elector := nodes.NewPerRepositoryElector(logger, db) - - if conf.Failover.Enabled { - go func() { - if err := elector.Run(ctx, hm.Updated()); err != nil { - logger.WithError(err).Error("primary elector exited") - } - }() - } + elector := nodes.NewPerRepositoryElector(db) primaryGetter = elector assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go index 92342b66f..93a96b8e8 100644 --- a/internal/praefect/nodes/per_repository.go +++ b/internal/praefect/nodes/per_repository.go @@ -5,8 +5,8 @@ import ( "database/sql" "errors" "fmt" - "time" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" @@ -18,76 +18,19 @@ var ErrNoPrimary = errors.New("no primary") // PerRepositoryElector implements an elector that selects a primary for each repository. // It elects a healthy node with most recent generation as the primary. If all nodes are // on the same generation, it picks one randomly to balance repositories in simple fashion. -type PerRepositoryElector struct { - log logrus.FieldLogger - db glsql.Querier - handleError func(error) error - retryWait time.Duration -} +type PerRepositoryElector struct{ db glsql.Querier } // NewPerRepositoryElector returns a new per repository primary elector. -func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier) *PerRepositoryElector { - log = log.WithField("component", "PerRepositoryElector") - return &PerRepositoryElector{ - log: log, - db: db, - handleError: func(err error) error { - log.WithError(err).Error("failed performing failovers") - return nil - }, - retryWait: time.Second, - } -} - -// primaryChanges is a type for collecting promotion and demotion counts. It's keyed by -// virtual storage -> storage -> (promoted | demoted). -type primaryChanges map[string]map[string]map[string]int - -// Run listens on the trigger channel for updates. On each update, it tries to elect new primaries for -// repositories which have an unhealthy primary. Blocks until the context is canceled or the trigger -// channel is closed. Returns the error from the context. -func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}) error { - pr.log.Info("per repository elector started") - defer pr.log.Info("per repository elector stopped") - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case _, ok := <-trigger: - if !ok { - return nil - } - - for { - if err := pr.performFailovers(ctx); err != nil { - if err := pr.handleError(err); err != nil { - return err - } - - // Reattempt the failovers after one second if it failed. The trigger channel only ticks - // when a health change has occurred. If we fail to perform failovers, we would - // only try again when the health of a node has changed. This would leave some - // repositories without a healthy primary. Ideally we'd fix this by getting rid of - // the virtual storage wide failovers and perform failovers lazily for repositories - // when necessary: https://gitlab.com/gitlab-org/gitaly/-/issues/3207 - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(pr.retryWait): - continue - } - } - - break - } - } - } +func NewPerRepositoryElector(db glsql.Querier) *PerRepositoryElector { + return &PerRepositoryElector{db: db} } -func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error { - rows, err := pr.db.QueryContext(ctx, ` -WITH updated AS ( +// GetPrimary returns the primary storage of a repository. If the primary is not a valid primary anymore, an election +// is attempted. If there are no valid primaries, the current primary is simply demoted. +func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) { + var current, previous sql.NullString + if err := pr.db.QueryRowContext(ctx, ` +WITH new AS ( UPDATE repositories SET "primary" = ( SELECT storage @@ -97,82 +40,32 @@ WITH updated AS ( ORDER BY random() LIMIT 1 ) - WHERE NOT EXISTS ( + WHERE virtual_storage = $1 + AND relative_path = $2 + AND NOT EXISTS ( SELECT FROM valid_primaries WHERE virtual_storage = repositories.virtual_storage AND relative_path = repositories.relative_path AND storage = repositories."primary" ) - RETURNING virtual_storage, relative_path, "primary" -), - -demoted AS ( - SELECT virtual_storage, repositories."primary" AS storage, COUNT(*) AS demoted - FROM repositories - JOIN updated USING (virtual_storage, relative_path) - WHERE repositories."primary" IS NOT NULL - GROUP BY virtual_storage, repositories."primary" -), - -promoted AS ( - SELECT virtual_storage, "primary" AS storage, COUNT(*) AS promoted - FROM updated - WHERE updated."primary" IS NOT NULL - GROUP BY virtual_storage, "primary" + RETURNING true AS elected, "primary" ) -SELECT virtual_storage, storage, COALESCE(demoted, 0), COALESCE(promoted, 0) -FROM demoted -FULL JOIN promoted USING (virtual_storage, storage) - `) - if err != nil { - return fmt.Errorf("query: %w", err) - } - defer rows.Close() - - changes := primaryChanges{} - for rows.Next() { - var virtualStorage, storage string - var demoted, promoted int - - if err := rows.Scan(&virtualStorage, &storage, &demoted, &promoted); err != nil { - return fmt.Errorf("scan: %w", err) - } - - storageChanges, ok := changes[virtualStorage] - if !ok { - storageChanges = map[string]map[string]int{} - } - - storageChanges[storage] = map[string]int{"demoted": demoted, "promoted": promoted} - changes[virtualStorage] = storageChanges - } - - if err := rows.Err(); err != nil { - return fmt.Errorf("rows: %w", err) - } - - if len(changes) > 0 { - pr.log.WithField("changes", changes).Info("performed failovers") - } else { - pr.log.Info("attempting failovers resulted no changes") - } - - return nil -} - -// GetPrimary returns the primary storage of a repository. -func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) { - var primary sql.NullString - if err := pr.db.QueryRowContext(ctx, ` -SELECT "primary" -FROM repositories +SELECT + CASE WHEN new.elected + THEN new.primary + ELSE old.primary + END, + old.primary +FROM repositories AS old +FULL JOIN new ON true WHERE virtual_storage = $1 AND relative_path = $2 + `, virtualStorage, relativePath, - ).Scan(&primary); err != nil { + ).Scan(¤t, &previous); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) } @@ -180,9 +73,18 @@ AND relative_path = $2 return "", fmt.Errorf("scan: %w", err) } - if !primary.Valid { + if current != previous { + ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ + "virtual_storage": virtualStorage, + "relative_path": relativePath, + "current_primary": current.String, + "previous_primary": previous.String, + }).Info("primary node changed") + } + + if !current.Valid { return "", ErrNoPrimary } - return primary.String, nil + return current.String, nil } diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 021a6465e..5fb825eb3 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -3,56 +3,18 @@ package nodes import ( - "context" "database/sql" - "errors" "testing" - "time" - "github.com/lib/pq" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" ) -func setHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) { - var praefects, virtualStorages, storages []string - for praefect, virtualStors := range healthyNodes { - for virtualStorage, stors := range virtualStors { - for _, storage := range stors { - praefects = append(praefects, praefect) - virtualStorages = append(virtualStorages, virtualStorage) - storages = append(storages, storage) - } - } - } - - _, err := db.ExecContext(ctx, ` -WITH clear_previous_checks AS ( DELETE FROM node_status ) - -INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) -SELECT - unnest($1::text[]) AS praefect_name, - unnest($2::text[]) AS shard_name, - unnest($3::text[]) AS node_name, - NOW() AS last_contact_attempt_at, - NOW() AS last_seen_active_at -ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET - last_contact_attempt_at = NOW(), - last_seen_active_at = NOW() - `, - pq.StringArray(praefects), - pq.StringArray(virtualStorages), - pq.StringArray(storages), - ) - require.NoError(t, err) -} - func TestPerRepositoryElector(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() @@ -79,34 +41,10 @@ func TestPerRepositoryElector(t *testing.T) { } } - type logMatcher func(t testing.TB, entry logrus.Entry) - - anyChange := func(expected ...primaryChanges) logMatcher { - return func(t testing.TB, entry logrus.Entry) { - require.Equal(t, "performed failovers", entry.Message) - - var fields []logrus.Fields - for _, changes := range expected { - fields = append(fields, logrus.Fields{ - "component": "PerRepositoryElector", - "changes": changes, - }) - } - - require.Contains(t, fields, entry.Data) - } - } - - noChanges := func(t testing.TB, entry logrus.Entry) { - t.Helper() - require.Equal(t, "attempting failovers resulted no changes", entry.Message) - } - type steps []struct { healthyNodes map[string][]string error error primary matcher - matchLogs logMatcher } for _, tc := range []struct { @@ -130,8 +68,7 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, }, }, @@ -150,9 +87,8 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: noChanges, + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -170,9 +106,8 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: noChanges, + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -192,10 +127,6 @@ func TestPerRepositoryElector(t *testing.T) { "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, primary: any("gitaly-1", "gitaly-2"), - matchLogs: anyChange( - primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}, - primaryChanges{"virtual-storage-1": {"gitaly-2": {"demoted": 0, "promoted": 1}}}, - ), }, }, }, @@ -215,18 +146,13 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-3"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, primary: any("gitaly-2"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": { - "gitaly-1": {"demoted": 1, "promoted": 0}, - "gitaly-2": {"demoted": 0, "promoted": 1}, - }}), }, }, }, @@ -245,16 +171,14 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 1, "promoted": 0}}}), + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -274,18 +198,13 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-3"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, primary: any("gitaly-2"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": { - "gitaly-1": {"demoted": 1, "promoted": 0}, - "gitaly-2": {"demoted": 0, "promoted": 1}, - }}), }, }, }, @@ -304,18 +223,13 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, primary: any("gitaly-2"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": { - "gitaly-1": {"demoted": 1, "promoted": 0}, - "gitaly-2": {"demoted": 0, "promoted": 1}, - }}), }, }, }, @@ -334,18 +248,13 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-2"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-2": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-2"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": { - "gitaly-1": {"demoted": 0, "promoted": 1}, - "gitaly-2": {"demoted": 1, "promoted": 0}, - }}), }, }, }, @@ -364,15 +273,13 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-2"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-2": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-2"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-2"), - matchLogs: noChanges, + primary: any("gitaly-2"), }, }, }, @@ -391,16 +298,14 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-2", "gitaly-3"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 1, "promoted": 0}}}), + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -429,9 +334,8 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: noChanges, + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -460,9 +364,8 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: noChanges, + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -491,9 +394,8 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, - error: ErrNoPrimary, - primary: noPrimary(), - matchLogs: noChanges, + error: ErrNoPrimary, + primary: noPrimary(), }, }, }, @@ -567,8 +469,7 @@ func TestPerRepositoryElector(t *testing.T) { healthyNodes: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, - primary: any("gitaly-1"), - matchLogs: anyChange(primaryChanges{"virtual-storage-1": {"gitaly-1": {"demoted": 0, "promoted": 1}}}), + primary: any("gitaly-1"), }, }, }, @@ -576,9 +477,8 @@ func TestPerRepositoryElector(t *testing.T) { desc: "repository does not exist", steps: steps{ { - error: commonerr.NewRepositoryNotFoundError("virtual-storage-1", "relative-path-1"), - primary: noPrimary(), - matchLogs: noChanges, + error: commonerr.NewRepositoryNotFoundError("virtual-storage-1", "relative-path-1"), + primary: noPrimary(), }, }, }, @@ -616,27 +516,27 @@ func TestPerRepositoryElector(t *testing.T) { require.NoError(t, err) } + previousPrimary := "" for _, step := range tc.steps { - runElection := func(tx *sql.Tx, matchLogs logMatcher) { - setHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes}) + runElection := func(tx *sql.Tx) (string, *logrus.Entry) { + testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes}) - // The first transaction runs first logger, hook := test.NewNullLogger() - elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx) - elector.handleError = func(err error) error { return err } + elector := NewPerRepositoryElector(tx) - trigger := make(chan struct{}, 1) - trigger <- struct{}{} - close(trigger) + primary, err := elector.GetPrimary( + ctxlogrus.ToContext(ctx, logrus.NewEntry(logger)), "virtual-storage-1", "relative-path-1") + require.Equal(t, step.error, err) + require.Less(t, len(hook.Entries), 2) - require.NoError(t, elector.Run(ctx, trigger)) + var entry *logrus.Entry + if len(hook.Entries) == 1 { + entry = &hook.Entries[0] + } - primary, err := elector.GetPrimary(ctx, "virtual-storage-1", "relative-path-1") - assert.Equal(t, step.error, err) - step.primary(t, primary) + require.NoError(t, tx.Commit()) - require.Len(t, hook.Entries, 3) - matchLogs(t, hook.Entries[1]) + return primary, entry } // Run every step with two concurrent transactions to ensure two Praefect's running @@ -654,53 +554,30 @@ func TestPerRepositoryElector(t *testing.T) { require.NoError(t, err) defer txSecond.Rollback() - runElection(txFirst, step.matchLogs) - - require.NoError(t, txFirst.Commit()) + primary, logEntry := runElection(txFirst) + step.primary(t, primary) + + if previousPrimary != primary { + require.NotNil(t, logEntry) + require.Equal(t, "primary node changed", logEntry.Message) + require.Equal(t, logrus.Fields{ + "virtual_storage": "virtual-storage-1", + "relative_path": "relative-path-1", + "current_primary": primary, + "previous_primary": previousPrimary, + }, logEntry.Data) + } else { + require.Nil(t, logEntry) + } // Run the second election on the same database snapshot. This should result in no changes. // Running this prior to the first transaction committing would block. - runElection(txSecond, noChanges) + secondPrimary, secondLogEntry := runElection(txSecond) + require.Equal(t, primary, secondPrimary) + require.Nil(t, secondLogEntry) - require.NoError(t, txSecond.Commit()) + previousPrimary = primary } }) } } - -func TestPerRepositoryElector_Retry(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - dbCalls := 0 - handleErrorCalls := 0 - elector := NewPerRepositoryElector( - testhelper.DiscardTestLogger(t), - &glsql.MockQuerier{ - QueryContextFunc: func(context.Context, string, ...interface{}) (*sql.Rows, error) { - dbCalls++ - - return nil, assert.AnError - }, - }, - ) - elector.retryWait = time.Nanosecond - elector.handleError = func(err error) error { - handleErrorCalls++ - require.True(t, errors.Is(err, assert.AnError)) - - if handleErrorCalls == 2 { - return context.Canceled - } - - return nil - } - - // we are only sending one trigger, second attempt must come from the retry logic - trigger := make(chan struct{}, 1) - trigger <- struct{}{} - - require.Equal(t, context.Canceled, elector.Run(ctx, trigger)) - require.Equal(t, 2, dbCalls) - require.Equal(t, 2, handleErrorCalls) -} diff --git a/internal/testhelper/db.go b/internal/testhelper/db.go new file mode 100644 index 000000000..be5cb1c1e --- /dev/null +++ b/internal/testhelper/db.go @@ -0,0 +1,51 @@ +package testhelper + +import ( + "context" + "testing" + + "github.com/lib/pq" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" +) + +// SetHealthyNodes sets the healthy nodes in the database as determined by the passed in map. The healthyNodes map is keyed by +// praefect name -> virtual storage -> storage. On each run, it clears all previous health checks from the table, so the +// passed in nodes are the only ones considered healthy after the function. As the healthy nodes are determined by the time of +// the last successful health check, this should be run in the same transastion as the tested query to prevent flakiness. +// +//nolint:golint +func SetHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) { + t.Helper() + + var praefects, virtualStorages, storages []string + for praefect, virtualStors := range healthyNodes { + for virtualStorage, stors := range virtualStors { + for _, storage := range stors { + praefects = append(praefects, praefect) + virtualStorages = append(virtualStorages, virtualStorage) + storages = append(storages, storage) + } + } + } + + _, err := db.ExecContext(ctx, ` +WITH clear_previous_checks AS ( DELETE FROM node_status ) + +INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) +SELECT + unnest($1::text[]) AS praefect_name, + unnest($2::text[]) AS shard_name, + unnest($3::text[]) AS node_name, + NOW() AS last_contact_attempt_at, + NOW() AS last_seen_active_at +ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET + last_contact_attempt_at = NOW(), + last_seen_active_at = NOW() + `, + pq.StringArray(praefects), + pq.StringArray(virtualStorages), + pq.StringArray(storages), + ) + require.NoError(t, err) +} |