Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-04-20 12:41:44 +0300
committerToon Claes <toon@gitlab.com>2021-04-20 12:41:44 +0300
commit1b1a7c98f9e5207a4ad5dd2303add69732f15ed2 (patch)
treecdfb4bcd8c20b49dd47e1bcaba6d6db730ceed05
parente505e98295e765f945719f289125a98e671a7e19 (diff)
Separate local health status from consensus in HealthManager
HealthManager is the component used with repository-specific primaries to health check the Gitaly nodes. Currently, it returns the health consensus as determined by the Praefect nodes. Most of Praefect's components care about whether the local Praefect node is able to contact the Gitaly node. The request router doesn't want to route to a Gitaly which is not healthy to the Praefect even if the consensus is that the Gitaly is healthy. Likewise, Praefect should not dequeue replication jobs for Gitalys which it can't access. Some other components should operate on the consensus though, such as the primary elector. The primary elector should only elect nodes that are deemed healthy by the majority and should not demote nodes which are only locally unhealthy. The reconciler probably should schedule jobs only for nodes that are deemed healthy by the consensus but currently uses the Praefect's local connection health. To support both cases, HealthManager's HealthyNodes is changed to actually return the result of the local health check. This is what the majority of the components want. The HealthConsensus method is added on the side to pipe the consensus to the locations that need it right now, namely the primary elector. In the future, we should not even load the consensus from the database and should instead query it directly where needed. It's implemented this way as we previously also supported the local elector, which did not have access to the database. As soon as the local elector is removed, we should drop the HealthConsensus method and instead query directly in the database for the consensus.
-rw-r--r--internal/praefect/health_checker.go2
-rw-r--r--internal/praefect/nodes/health_manager.go48
-rw-r--r--internal/praefect/nodes/health_manager_test.go108
-rw-r--r--internal/praefect/nodes/per_repository.go16
-rw-r--r--internal/praefect/nodes/per_repository_test.go10
5 files changed, 114 insertions, 70 deletions
diff --git a/internal/praefect/health_checker.go b/internal/praefect/health_checker.go
index fadbfde00..2bd4721ea 100644
--- a/internal/praefect/health_checker.go
+++ b/internal/praefect/health_checker.go
@@ -1,6 +1,6 @@
package praefect
-// HealthChecker manages information of healthy nodes.
+// HealthChecker manages information of locally healthy nodes.
type HealthChecker interface {
// HealthyNodes gets a list of healthy storages by their virtual storage.
HealthyNodes() map[string][]string
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 1e37beb81..2a3592694 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -51,9 +51,11 @@ type HealthManager struct {
// to be part of the quorum if it has not performed a health check.
quorumParticipantTimeout time.Duration
- firstUpdate bool
- updated chan struct{}
- healthyNodes atomic.Value
+ firstUpdate bool
+ updated chan struct{}
+
+ locallyHealthy atomic.Value
+ healthConsensus atomic.Value
}
// NewHealthManager returns a new health manager that monitors which nodes in the cluster
@@ -81,7 +83,8 @@ func NewHealthManager(
updated: make(chan struct{}, 1),
}
- hm.healthyNodes.Store(make(map[string][]string, len(clients)))
+ hm.locallyHealthy.Store(make(map[string][]string, len(clients)))
+ hm.healthConsensus.Store(make(map[string][]string, len(clients)))
return &hm
}
@@ -118,16 +121,33 @@ func (hm *HealthManager) Updated() <-chan struct{} {
return hm.updated
}
-// HealthyNodes returns a map of healthy nodes in each virtual storage. The set of
-// healthy nodes might include nodes which are not present in the local configuration
-// if the cluster's consensus has deemed them healthy.
+// HealthyNodes returns a map of healthy nodes in each virtual storage as seen by the latest
+// local health check.
func (hm *HealthManager) HealthyNodes() map[string][]string {
- return hm.healthyNodes.Load().(map[string][]string)
+ return hm.locallyHealthy.Load().(map[string][]string)
+}
+
+// HealthConsensus returns a map of healthy nodes in each virtual storage as determined by
+// the consensus of Praefect nodes. The returned set might include nodes which are not present
+// in the local configuration if the cluster's consensus has deemed them healthy.
+func (hm *HealthManager) HealthConsensus() map[string][]string {
+ return hm.healthConsensus.Load().(map[string][]string)
}
func (hm *HealthManager) updateHealthChecks(ctx context.Context) error {
virtualStorages, physicalStorages, healthy := hm.performHealthChecks(ctx)
+ locallyHealthy := map[string][]string{}
+ for i := range virtualStorages {
+ if !healthy[i] {
+ continue
+ }
+
+ locallyHealthy[virtualStorages[i]] = append(locallyHealthy[virtualStorages[i]], physicalStorages[i])
+ }
+
+ hm.locallyHealthy.Store(locallyHealthy)
+
rows, err := hm.db.QueryContext(ctx, `
WITH updated_checks AS (
INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
@@ -191,23 +211,23 @@ ORDER BY shard_name, node_name
}
}()
- currentlyHealthy := make(map[string][]string, len(physicalStorages))
+ healthConsensus := make(map[string][]string, len(physicalStorages))
for rows.Next() {
var virtualStorage, storage string
if err := rows.Scan(&virtualStorage, &storage); err != nil {
return fmt.Errorf("scan: %w", err)
}
- currentlyHealthy[virtualStorage] = append(currentlyHealthy[virtualStorage], storage)
+ healthConsensus[virtualStorage] = append(healthConsensus[virtualStorage], storage)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("rows: %w", err)
}
- if hm.firstUpdate || hm.hasHealthySetChanged(currentlyHealthy) {
+ if hm.firstUpdate || hm.hasHealthConsensusChanged(healthConsensus) {
hm.firstUpdate = false
- hm.healthyNodes.Store(currentlyHealthy)
+ hm.healthConsensus.Store(healthConsensus)
select {
case hm.updated <- struct{}{}:
default:
@@ -261,8 +281,8 @@ func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []s
return virtualStorages, physicalStorages, healthy
}
-func (hm *HealthManager) hasHealthySetChanged(currentlyHealthy map[string][]string) bool {
- previouslyHealthy := hm.HealthyNodes()
+func (hm *HealthManager) hasHealthConsensusChanged(currentlyHealthy map[string][]string) bool {
+ previouslyHealthy := hm.HealthConsensus()
if len(previouslyHealthy) != len(currentlyHealthy) {
return true
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index 4e4a94b64..d2fccc62b 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -4,6 +4,7 @@ package nodes
import (
"context"
+ "sort"
"testing"
"time"
@@ -31,11 +32,11 @@ func TestHealthManager(t *testing.T) {
type LocalStatus map[string]map[string]bool
type HealthChecks []struct {
- After time.Duration
- PraefectName string
- LocalStatus LocalStatus
- Updated bool
- HealthyNodes map[string][]string
+ After time.Duration
+ PraefectName string
+ LocalStatus LocalStatus
+ Updated bool
+ HealthConsensus map[string][]string
}
for _, tc := range []struct {
@@ -62,7 +63,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2"},
"virtual-storage-2": {"gitaly-1"},
},
@@ -79,8 +80,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
{
PraefectName: "praefect-1",
@@ -90,7 +91,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -107,7 +108,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -118,7 +119,7 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -135,7 +136,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -147,8 +148,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
},
},
@@ -162,8 +163,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
{
PraefectName: "praefect-2",
@@ -172,8 +173,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
{
After: activePraefectTimeout,
@@ -184,7 +185,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -201,7 +202,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"configured"},
},
},
@@ -213,7 +214,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"configured"},
},
},
@@ -226,7 +227,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"configured"},
},
},
@@ -244,7 +245,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"configured", "unconfigured"},
},
},
@@ -257,7 +258,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"configured", "unconfigured"},
},
},
@@ -269,7 +270,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"configured", "unconfigured"},
},
},
@@ -286,8 +287,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
{
PraefectName: "praefect-2",
@@ -297,7 +298,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -309,7 +310,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -326,7 +327,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -338,7 +339,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -349,8 +350,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
},
},
@@ -364,8 +365,8 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- Updated: true,
- HealthyNodes: map[string][]string{},
+ Updated: true,
+ HealthConsensus: map[string][]string{},
},
{
PraefectName: "praefect-1",
@@ -374,7 +375,7 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": false,
},
},
- HealthyNodes: map[string][]string{},
+ HealthConsensus: map[string][]string{},
},
},
},
@@ -390,7 +391,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -403,7 +404,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1", "gitaly-2"},
},
},
@@ -420,7 +421,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -431,7 +432,7 @@ func TestHealthManager(t *testing.T) {
"gitaly-1": true,
},
},
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -449,7 +450,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-1"},
},
},
@@ -463,7 +464,7 @@ func TestHealthManager(t *testing.T) {
},
},
Updated: true,
- HealthyNodes: map[string][]string{
+ HealthConsensus: map[string][]string{
"virtual-storage-1": {"gitaly-2"},
},
},
@@ -518,9 +519,30 @@ func TestHealthManager(t *testing.T) {
predateHealthChecks(t, db, hc.After)
}
+ expectedHealthyNodes := map[string][]string{}
+ for virtualStorage, storages := range hc.LocalStatus {
+ for storage, healthy := range storages {
+ if !healthy {
+ continue
+ }
+
+ expectedHealthyNodes[virtualStorage] = append(expectedHealthyNodes[virtualStorage], storage)
+ }
+
+ sort.Strings(expectedHealthyNodes[virtualStorage])
+ }
+
runCtx, cancelRun := context.WithCancel(ctx)
require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun)))
- require.Equal(t, hc.HealthyNodes, hm.HealthyNodes(), "health check %d", i+1)
+
+ // we need to sort the storages so the require.Equal matches, ElementsMatch does not work with a map.
+ actualHealthyNodes := hm.HealthyNodes()
+ for _, storages := range actualHealthyNodes {
+ sort.Strings(storages)
+ }
+
+ require.Equal(t, expectedHealthyNodes, actualHealthyNodes, "health check %d", i+1)
+ require.Equal(t, hc.HealthConsensus, hm.HealthConsensus(), "health check %d", i+1)
updated := false
select {
diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go
index 637a18f1a..a65ec67a5 100644
--- a/internal/praefect/nodes/per_repository.go
+++ b/internal/praefect/nodes/per_repository.go
@@ -22,19 +22,21 @@ var ErrNoPrimary = errors.New("no primary")
type PerRepositoryElector struct {
log logrus.FieldLogger
db glsql.Querier
- hc HealthChecker
+ hc HealthConsensus
handleError func(error) error
retryWait time.Duration
}
-// HealthChecker maintains node health statuses.
-type HealthChecker interface {
- // HealthyNodes returns lists of healthy nodes by virtual storages.
- HealthyNodes() map[string][]string
+// HealthConsensus returns the cluster's consensus of healthy nodes.
+type HealthConsensus interface {
+ // HealthConsensus returns a list of healthy nodes by cluster consensus. Returned
+// set may contain nodes not present in the local configuration if the cluster has
+ // deemed them healthy.
+ HealthConsensus() map[string][]string
}
// NewPerRepositoryElector returns a new per repository primary elector.
-func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthChecker) *PerRepositoryElector {
+func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthConsensus) *PerRepositoryElector {
log = log.WithField("component", "PerRepositoryElector")
return &PerRepositoryElector{
log: log,
@@ -91,7 +93,7 @@ func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}
}
func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error {
- healthyNodes := pr.hc.HealthyNodes()
+ healthyNodes := pr.hc.HealthConsensus()
var virtualStorages, physicalStorages []string
for virtualStorage, nodes := range healthyNodes {
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index d416f8998..e5c9f317e 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -17,10 +17,10 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
-// HealthCheckerFunc is an adapter to turn a conforming function in to a HealthChecker.
-type HealthCheckerFunc func() map[string][]string
+// HealthConsensusFunc is an adapter to turn a conforming function into a HealthConsensus.
+type HealthConsensusFunc func() map[string][]string
-func (fn HealthCheckerFunc) HealthyNodes() map[string][]string { return fn() }
+func (fn HealthConsensusFunc) HealthConsensus() map[string][]string { return fn() }
func TestPerRepositoryElector(t *testing.T) {
ctx, cancel := testhelper.Context()
@@ -448,7 +448,7 @@ func TestPerRepositoryElector(t *testing.T) {
for _, step := range tc.steps {
elector := NewPerRepositoryElector(testhelper.DiscardTestLogger(t), db,
- HealthCheckerFunc(func() map[string][]string { return step.healthyNodes }),
+ HealthConsensusFunc(func() map[string][]string { return step.healthyNodes }),
)
elector.handleError = func(err error) error { return err }
@@ -480,7 +480,7 @@ func TestPerRepositoryElector_Retry(t *testing.T) {
return nil, assert.AnError
},
},
- HealthCheckerFunc(func() map[string][]string { return map[string][]string{} }),
+ HealthConsensusFunc(func() map[string][]string { return map[string][]string{} }),
)
elector.retryWait = time.Nanosecond
elector.handleError = func(err error) error {