diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-25 18:21:22 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-25 18:21:22 +0300 |
commit | 6768fe2d8a6f413a537d03cdc0daf2ca5dc32dc3 (patch) | |
tree | 358f025467bbbf0785e67aba835fc99cccce418d | |
parent | d0083f4c828772537e6891cae4fe0df1f6b255f4 (diff) |
Query the health consensus directly in PerRepositoryElector
PerRepositoryElector currently gets the health consensus from the
HealthManager. This is an unnecessary indirection, as the primary elector
can query the consensus directly from the database. It was originally
implemented this way to ease testing and to avoid duplicating the query
logic. Now that the query logic has been extracted into a view, let's
query the view directly.
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/nodes/per_repository.go | 31 | ||||
-rw-r--r-- | internal/praefect/nodes/per_repository_test.go | 42 |
3 files changed, 39 insertions, 36 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 3f8ee04d1..0f014b76c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -321,7 +321,7 @@ func run(cfgs []starter.Config, conf config.Config) error { }() healthChecker = hm - elector := nodes.NewPerRepositoryElector(logger, db, hm) + elector := nodes.NewPerRepositoryElector(logger, db) if conf.Failover.Enabled { go func() { diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go index 62c4d5777..a5ea2c908 100644 --- a/internal/praefect/nodes/per_repository.go +++ b/internal/praefect/nodes/per_repository.go @@ -7,7 +7,6 @@ import ( "fmt" "time" - "github.com/lib/pq" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" @@ -22,26 +21,16 @@ var ErrNoPrimary = errors.New("no primary") type PerRepositoryElector struct { log logrus.FieldLogger db glsql.Querier - hc HealthConsensus handleError func(error) error retryWait time.Duration } -// HealthConsensus returns the cluster's consensus of healthy nodes. -type HealthConsensus interface { - // HealthConsensus returns a list of healthy nodes by cluster consensus. Returned - // set may contains nodes not present in the local configuration if the cluster has - // deemed them healthy. - HealthConsensus() map[string][]string -} - // NewPerRepositoryElector returns a new per repository primary elector. 
-func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthConsensus) *PerRepositoryElector { +func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier) *PerRepositoryElector { log = log.WithField("component", "PerRepositoryElector") return &PerRepositoryElector{ log: log, db: db, - hc: hc, handleError: func(err error) error { log.WithError(err).Error("failed performing failovers") return nil @@ -97,22 +86,8 @@ func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{} } func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error { - healthyNodes := pr.hc.HealthConsensus() - - var virtualStorages, physicalStorages []string - for virtualStorage, nodes := range healthyNodes { - for _, node := range nodes { - virtualStorages = append(virtualStorages, virtualStorage) - physicalStorages = append(physicalStorages, node) - } - } - rows, err := pr.db.QueryContext(ctx, ` -WITH healthy_storages AS ( - SELECT unnest($1::text[]) AS virtual_storage, unnest($2::text[]) AS storage -), - -updated AS ( +WITH updated AS ( UPDATE repositories SET "primary" = ( SELECT storage @@ -169,7 +144,7 @@ promoted AS ( SELECT virtual_storage, storage, COALESCE(demoted, 0), COALESCE(promoted, 0) FROM demoted FULL JOIN promoted USING (virtual_storage, storage) -`, pq.StringArray(virtualStorages), pq.StringArray(physicalStorages)) + `) if err != nil { return fmt.Errorf("query: %w", err) } diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 067b132e4..634738226 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/lib/pq" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" @@ -19,10 +20,38 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) -// HealthConsensusFunc is an adapter to turn 
a conforming function in to a HealthConsensus. -type HealthConsensusFunc func() map[string][]string +func setHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) { + var praefects, virtualStorages, storages []string + for praefect, virtualStors := range healthyNodes { + for virtualStorage, stors := range virtualStors { + for _, storage := range stors { + praefects = append(praefects, praefect) + virtualStorages = append(virtualStorages, virtualStorage) + storages = append(storages, storage) + } + } + } -func (fn HealthConsensusFunc) HealthConsensus() map[string][]string { return fn() } + _, err := db.ExecContext(ctx, ` +WITH clear_previous_checks AS ( DELETE FROM node_status ) + +INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) +SELECT + unnest($1::text[]) AS praefect_name, + unnest($2::text[]) AS shard_name, + unnest($3::text[]) AS node_name, + NOW() AS last_contact_attempt_at, + NOW() AS last_seen_active_at +ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET + last_contact_attempt_at = NOW(), + last_seen_active_at = NOW() + `, + pq.StringArray(praefects), + pq.StringArray(virtualStorages), + pq.StringArray(storages), + ) + require.NoError(t, err) +} func TestPerRepositoryElector(t *testing.T) { ctx, cancel := testhelper.Context() @@ -503,11 +532,11 @@ func TestPerRepositoryElector(t *testing.T) { for _, step := range tc.steps { runElection := func(tx *sql.Tx, matchLogs logMatcher) { + setHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes}) + // The first transaction runs first logger, hook := test.NewNullLogger() - elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx, - HealthConsensusFunc(func() map[string][]string { return step.healthyNodes }), - ) + elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx) elector.handleError = func(err error) error { return err } trigger := 
make(chan struct{}, 1) @@ -568,7 +597,6 @@ func TestPerRepositoryElector_Retry(t *testing.T) { return nil, assert.AnError }, }, - HealthConsensusFunc(func() map[string][]string { return map[string][]string{} }), ) elector.retryWait = time.Nanosecond elector.handleError = func(err error) error { |