diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-04-20 12:41:44 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2021-04-20 12:41:44 +0300 |
commit | 1b1a7c98f9e5207a4ad5dd2303add69732f15ed2 (patch) | |
tree | cdfb4bcd8c20b49dd47e1bcaba6d6db730ceed05 | |
parent | e505e98295e765f945719f289125a98e671a7e19 (diff) |
Separate local health status from consensus in HealthManager
HealthManager is the component used with repository specific primaries
to health check the Gitaly nodes. Currently, it returns the health
consensus as determined by the Praefect nodes. Most of Praefect's
components care about whether the local Praefect node is able to contact
the Gitaly node. The request router doesn't want to route to a Gitaly
which is not healthy from the Praefect's perspective even if the consensus is that the
Gitaly is healthy. Likewise, Praefect should not dequeue replication jobs
for Gitaly nodes which it can't access.
Some other components should operate on the consensus though, such as the
primary elector. Primary elector should only elect nodes that are deemed
healthy by the majority and should not demote nodes which are only locally
unhealthy. The reconciler should probably schedule jobs only for nodes that are
deemed healthy by the consensus but currently uses the Praefect's local
connection health.
To support both cases, HealthManager's HealthyNodes is changed to actually
return the result of the local health check. This is what the majority of the
components want. HealthConsensus method is added on the side to pipe the
consensus to the locations that need it right now, namely the primary
elector.
In the future, we should not even load the consensus from the database and
should instead query it directly where needed. It's implemented this way
as we previously also supported the local elector, which did not have access
to the database. As soon as local elector is removed, we should drop the
HealthConsensus method and instead query directly in the database for the
consensus.
-rw-r--r-- | internal/praefect/health_checker.go | 2 | ||||
-rw-r--r-- | internal/praefect/nodes/health_manager.go | 48 | ||||
-rw-r--r-- | internal/praefect/nodes/health_manager_test.go | 108 | ||||
-rw-r--r-- | internal/praefect/nodes/per_repository.go | 16 | ||||
-rw-r--r-- | internal/praefect/nodes/per_repository_test.go | 10 |
5 files changed, 114 insertions, 70 deletions
diff --git a/internal/praefect/health_checker.go b/internal/praefect/health_checker.go index fadbfde00..2bd4721ea 100644 --- a/internal/praefect/health_checker.go +++ b/internal/praefect/health_checker.go @@ -1,6 +1,6 @@ package praefect -// HealthChecker manages information of healthy nodes. +// HealthChecker manages information of locally healthy nodes. type HealthChecker interface { // HealthyNodes gets a list of healthy storages by their virtual storage. HealthyNodes() map[string][]string diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go index 1e37beb81..2a3592694 100644 --- a/internal/praefect/nodes/health_manager.go +++ b/internal/praefect/nodes/health_manager.go @@ -51,9 +51,11 @@ type HealthManager struct { // to be part of the quorum if it has not performed a health check. quorumParticipantTimeout time.Duration - firstUpdate bool - updated chan struct{} - healthyNodes atomic.Value + firstUpdate bool + updated chan struct{} + + locallyHealthy atomic.Value + healthConsensus atomic.Value } // NewHealthManager returns a new health manager that monitors which nodes in the cluster @@ -81,7 +83,8 @@ func NewHealthManager( updated: make(chan struct{}, 1), } - hm.healthyNodes.Store(make(map[string][]string, len(clients))) + hm.locallyHealthy.Store(make(map[string][]string, len(clients))) + hm.healthConsensus.Store(make(map[string][]string, len(clients))) return &hm } @@ -118,16 +121,33 @@ func (hm *HealthManager) Updated() <-chan struct{} { return hm.updated } -// HealthyNodes returns a map of healthy nodes in each virtual storage. The set of -// healthy nodes might include nodes which are not present in the local configuration -// if the cluster's consensus has deemed them healthy. +// HealthyNodes returns a map of healthy nodes in each virtual storage as seen by the latest +// local health check. 
func (hm *HealthManager) HealthyNodes() map[string][]string { - return hm.healthyNodes.Load().(map[string][]string) + return hm.locallyHealthy.Load().(map[string][]string) +} + +// HealthConsensus returns a map of healthy nodes in each virtual storage as determined by +// the consensus of Praefect nodes. The returned set might include nodes which are not present +// in the local configuration if the cluster's consensus has deemed them healthy. +func (hm *HealthManager) HealthConsensus() map[string][]string { + return hm.healthConsensus.Load().(map[string][]string) } func (hm *HealthManager) updateHealthChecks(ctx context.Context) error { virtualStorages, physicalStorages, healthy := hm.performHealthChecks(ctx) + locallyHealthy := map[string][]string{} + for i := range virtualStorages { + if !healthy[i] { + continue + } + + locallyHealthy[virtualStorages[i]] = append(locallyHealthy[virtualStorages[i]], physicalStorages[i]) + } + + hm.locallyHealthy.Store(locallyHealthy) + rows, err := hm.db.QueryContext(ctx, ` WITH updated_checks AS ( INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) @@ -191,23 +211,23 @@ ORDER BY shard_name, node_name } }() - currentlyHealthy := make(map[string][]string, len(physicalStorages)) + healthConsensus := make(map[string][]string, len(physicalStorages)) for rows.Next() { var virtualStorage, storage string if err := rows.Scan(&virtualStorage, &storage); err != nil { return fmt.Errorf("scan: %w", err) } - currentlyHealthy[virtualStorage] = append(currentlyHealthy[virtualStorage], storage) + healthConsensus[virtualStorage] = append(healthConsensus[virtualStorage], storage) } if err := rows.Err(); err != nil { return fmt.Errorf("rows: %w", err) } - if hm.firstUpdate || hm.hasHealthySetChanged(currentlyHealthy) { + if hm.firstUpdate || hm.hasHealthConsensusChanged(healthConsensus) { hm.firstUpdate = false - hm.healthyNodes.Store(currentlyHealthy) + 
hm.healthConsensus.Store(healthConsensus) select { case hm.updated <- struct{}{}: default: @@ -261,8 +281,8 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s return virtualStorages, physicalStorages, healthy } -func (hm *HealthManager) hasHealthySetChanged(currentlyHealthy map[string][]string) bool { - previouslyHealthy := hm.HealthyNodes() +func (hm *HealthManager) hasHealthConsensusChanged(currentlyHealthy map[string][]string) bool { + previouslyHealthy := hm.HealthConsensus() if len(previouslyHealthy) != len(currentlyHealthy) { return true diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index 4e4a94b64..d2fccc62b 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -4,6 +4,7 @@ package nodes import ( "context" + "sort" "testing" "time" @@ -31,11 +32,11 @@ func TestHealthManager(t *testing.T) { type LocalStatus map[string]map[string]bool type HealthChecks []struct { - After time.Duration - PraefectName string - LocalStatus LocalStatus - Updated bool - HealthyNodes map[string][]string + After time.Duration + PraefectName string + LocalStatus LocalStatus + Updated bool + HealthConsensus map[string][]string } for _, tc := range []struct { @@ -62,7 +63,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2"}, "virtual-storage-2": {"gitaly-1"}, }, @@ -79,8 +80,8 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, { PraefectName: "praefect-1", @@ -90,7 +91,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -107,7 +108,7 @@ 
func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -118,7 +119,7 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -135,7 +136,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -147,8 +148,8 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, }, }, @@ -162,8 +163,8 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, { PraefectName: "praefect-2", @@ -172,8 +173,8 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, { After: activePraefectTimeout, @@ -184,7 +185,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -201,7 +202,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"configured"}, }, }, @@ -213,7 +214,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"configured"}, }, }, @@ -226,7 +227,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: 
map[string][]string{ "virtual-storage-1": {"configured"}, }, }, @@ -244,7 +245,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"configured", "unconfigured"}, }, }, @@ -257,7 +258,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"configured", "unconfigured"}, }, }, @@ -269,7 +270,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"configured", "unconfigured"}, }, }, @@ -286,8 +287,8 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, { PraefectName: "praefect-2", @@ -297,7 +298,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -309,7 +310,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -326,7 +327,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -338,7 +339,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -349,8 +350,8 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, }, }, @@ -364,8 +365,8 @@ func TestHealthManager(t 
*testing.T) { "gitaly-1": false, }, }, - Updated: true, - HealthyNodes: map[string][]string{}, + Updated: true, + HealthConsensus: map[string][]string{}, }, { PraefectName: "praefect-1", @@ -374,7 +375,7 @@ func TestHealthManager(t *testing.T) { "gitaly-1": false, }, }, - HealthyNodes: map[string][]string{}, + HealthConsensus: map[string][]string{}, }, }, }, @@ -390,7 +391,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -403,7 +404,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1", "gitaly-2"}, }, }, @@ -420,7 +421,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -431,7 +432,7 @@ func TestHealthManager(t *testing.T) { "gitaly-1": true, }, }, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -449,7 +450,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-1"}, }, }, @@ -463,7 +464,7 @@ func TestHealthManager(t *testing.T) { }, }, Updated: true, - HealthyNodes: map[string][]string{ + HealthConsensus: map[string][]string{ "virtual-storage-1": {"gitaly-2"}, }, }, @@ -518,9 +519,30 @@ func TestHealthManager(t *testing.T) { predateHealthChecks(t, db, hc.After) } + expectedHealthyNodes := map[string][]string{} + for virtualStorage, storages := range hc.LocalStatus { + for storage, healthy := range storages { + if !healthy { + continue + } + + expectedHealthyNodes[virtualStorage] = append(expectedHealthyNodes[virtualStorage], storage) + } + + sort.Strings(expectedHealthyNodes[virtualStorage]) + 
} + runCtx, cancelRun := context.WithCancel(ctx) require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun))) - require.Equal(t, hc.HealthyNodes, hm.HealthyNodes(), "health check %d", i+1) + + // we need to sort the storages so the require.Equal matches, ElementsMatch does not work with a map. + actualHealthyNodes := hm.HealthyNodes() + for _, storages := range actualHealthyNodes { + sort.Strings(storages) + } + + require.Equal(t, expectedHealthyNodes, actualHealthyNodes, "health check %d", i+1) + require.Equal(t, hc.HealthConsensus, hm.HealthConsensus(), "health check %d", i+1) updated := false select { diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go index 637a18f1a..a65ec67a5 100644 --- a/internal/praefect/nodes/per_repository.go +++ b/internal/praefect/nodes/per_repository.go @@ -22,19 +22,21 @@ var ErrNoPrimary = errors.New("no primary") type PerRepositoryElector struct { log logrus.FieldLogger db glsql.Querier - hc HealthChecker + hc HealthConsensus handleError func(error) error retryWait time.Duration } -// HealthChecker maintains node health statuses. -type HealthChecker interface { - // HealthyNodes returns lists of healthy nodes by virtual storages. - HealthyNodes() map[string][]string +// HealthConsensus returns the cluster's consensus of healthy nodes. +type HealthConsensus interface { + // HealthConsensus returns a list of healthy nodes by cluster consensus. Returned + // set may contains nodes not present in the local configuration if the cluster has + // deemed them healthy. + HealthConsensus() map[string][]string } // NewPerRepositoryElector returns a new per repository primary elector. 
-func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthChecker) *PerRepositoryElector { +func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthConsensus) *PerRepositoryElector { log = log.WithField("component", "PerRepositoryElector") return &PerRepositoryElector{ log: log, @@ -91,7 +93,7 @@ func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{} } func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error { - healthyNodes := pr.hc.HealthyNodes() + healthyNodes := pr.hc.HealthConsensus() var virtualStorages, physicalStorages []string for virtualStorage, nodes := range healthyNodes { diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index d416f8998..e5c9f317e 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -17,10 +17,10 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) -// HealthCheckerFunc is an adapter to turn a conforming function in to a HealthChecker. -type HealthCheckerFunc func() map[string][]string +// HealthConsensusFunc is an adapter to turn a conforming function in to a HealthConsensus. 
+type HealthConsensusFunc func() map[string][]string -func (fn HealthCheckerFunc) HealthyNodes() map[string][]string { return fn() } +func (fn HealthConsensusFunc) HealthConsensus() map[string][]string { return fn() } func TestPerRepositoryElector(t *testing.T) { ctx, cancel := testhelper.Context() @@ -448,7 +448,7 @@ func TestPerRepositoryElector(t *testing.T) { for _, step := range tc.steps { elector := NewPerRepositoryElector(testhelper.DiscardTestLogger(t), db, - HealthCheckerFunc(func() map[string][]string { return step.healthyNodes }), + HealthConsensusFunc(func() map[string][]string { return step.healthyNodes }), ) elector.handleError = func(err error) error { return err } @@ -480,7 +480,7 @@ func TestPerRepositoryElector_Retry(t *testing.T) { return nil, assert.AnError }, }, - HealthCheckerFunc(func() map[string][]string { return map[string][]string{} }), + HealthConsensusFunc(func() map[string][]string { return map[string][]string{} }), ) elector.retryWait = time.Nanosecond elector.handleError = func(err error) error { |