gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>    2020-09-08 20:20:20 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>    2020-09-11 12:11:53 +0300
commit    2ed77beb0db0961982eaee3f243972e31dc9ccf1 (patch)
tree      1a5558edde0677b5e0112faa0dd9237569855487 /internal/praefect
parent    1f2d9e1963ce8f0fc869336a98b4ab949fb1682b (diff)
decouple health checking logic from sql elector
NodeManager and the sqlElector have grown in responsibility to cover connection
management, health checking, leader election and request routing. This prevents
reuse of the logic and makes it difficult to test the independent pieces. As a
first step in decoupling the independent functionalities from the NodeManager
and the sqlElector, this commit splits the health checking logic out into its
own component.

The new HealthManager runs periodic health checks on the configured nodes. The
health check results are recorded in the database, and the cluster's health
status is determined from a consensus of the votes of all Praefect nodes in the
cluster. The only difference from sqlElector's health checking logic is that
the success threshold of three consecutive health checks is dropped. A node is
considered healthy if it has had a successful health check in the last 10
seconds, which the current implementation also checks for. This simplifies the
logic and ensures all Praefects have an accurate idea of how each node views
the health of a given Gitaly node.

ErrorTracker is not part of the HealthManager; it will be implemented in a
follow-up behind the HealthClient interface, as it does not have to be part of
the core logic. The HealthManager is not yet wired up for use; that will be
done in a follow-up commit.
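As a rough sketch of how the component might eventually be driven once it is
wired up (hypothetical usage only; this commit deliberately leaves the
HealthManager unconnected, and ctx, ticker, logger, db, praefectName and
healthClients below are placeholder values):

    hm := nodes.NewHealthManager(logger, db, praefectName, healthClients)

    // Run the manager in the background; the ticker controls the check frequency.
    go func() {
        if err := hm.Run(ctx, ticker); err != nil && err != context.Canceled {
            logger.WithError(err).Error("health manager exited")
        }
    }()

    // Each receive signals a change in the cluster's consensus of healthy nodes.
    for range hm.Updated() {
        logger.WithField("healthy_nodes", hm.HealthyNodes()).Info("cluster health changed")
    }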
Diffstat (limited to 'internal/praefect')
-rw-r--r--  internal/praefect/nodes/health_manager.go       290
-rw-r--r--  internal/praefect/nodes/health_manager_test.go  547
2 files changed, 837 insertions, 0 deletions
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
new file mode 100644
index 000000000..1e37beb81
--- /dev/null
+++ b/internal/praefect/nodes/health_manager.go
@@ -0,0 +1,290 @@
+package nodes
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/lib/pq"
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// HealthClients contains a health client for each physical storage, keyed by virtual storage and storage name.
+type HealthClients map[string]map[string]grpc_health_v1.HealthClient
+
+// HealthManager monitors the health status of the storage cluster. The monitoring frequency
+// is controlled by the Ticker passed to the Run method. On each tick, the HealthManager:
+//
+// 1. Runs health checks on configured physical storages by performing a gRPC call
+// to the health checking endpoint. If an error tracker is configured, it also considers
+// its view of the node's health.
+// 2. Stores its health check results in the `node_status` table.
+// 3. Checks if the cluster's consensus of healthy nodes has changed by querying the `node_status`
+// table for the results of the other Praefect instances. If so, it sends to the Updated channel
+// to signal a change in the cluster status.
+//
+// To determine the participants of the quorum, we use a lightweight service discovery protocol:
+// a Praefect instance is deemed to be a voting member if it has a recent health check in the
+// `node_status` table. Each Praefect node is identified by its host name and the provided
+// stable ID. The stable ID should uniquely identify a Praefect instance on the host.
+type HealthManager struct {
+ log logrus.FieldLogger
+ db glsql.Querier
+ handleError func(error) error
+ // clients contains connections to the configured physical storages within each
+ // virtual storage.
+ clients HealthClients
+ // praefectName is the identifier of the Praefect running the HealthManager. It should
+ // be stable across restarts, as it is used to identify quorum members.
+ praefectName string
+ // healthCheckTimeout is the duration after which a health check attempt times out.
+ healthCheckTimeout time.Duration
+ // healthinessTimeout is the duration after the last successful health check
+ // past which a node is considered unhealthy.
+ healthinessTimeout time.Duration
+ // quorumParticipantTimeout is the duration after which a Praefect is no longer
+ // considered part of the quorum if it has not performed a health check.
+ quorumParticipantTimeout time.Duration
+
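+ // firstUpdate tracks whether the first health check is still pending; the first
+ // check always signals Updated, even if no nodes are healthy.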
+ firstUpdate bool
+ updated chan struct{}
+ healthyNodes atomic.Value
+}
+
+// NewHealthManager returns a new health manager that monitors which nodes in the cluster
+// are healthy.
+func NewHealthManager(
+ log logrus.FieldLogger,
+ db glsql.Querier,
+ praefectName string,
+ clients HealthClients,
+) *HealthManager {
+ log = log.WithField("component", "HealthManager")
+ hm := HealthManager{
+ log: log,
+ db: db,
+ clients: clients,
+ handleError: func(err error) error {
+ log.WithError(err).Error("checking health failed")
+ return nil
+ },
+ praefectName: praefectName,
+ healthCheckTimeout: healthcheckTimeout,
+ healthinessTimeout: failoverTimeout,
+ quorumParticipantTimeout: activePraefectTimeout,
+ firstUpdate: true,
+ updated: make(chan struct{}, 1),
+ }
+
+ hm.healthyNodes.Store(make(map[string][]string, len(clients)))
+
+ return &hm
+}
+
+// Run performs a health check on every tick of the Ticker until the context is
+// canceled. It returns the error from the context.
+func (hm *HealthManager) Run(ctx context.Context, ticker helper.Ticker) error {
+ hm.log.Info("health manager started")
+ defer hm.log.Info("health manager stopped")
+
+ defer ticker.Stop()
+
+ for {
+ ticker.Reset()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C():
+ if err := hm.updateHealthChecks(ctx); err != nil {
+ if err := hm.handleError(err); err != nil {
+ return err
+ }
+ }
+ }
+ }
+}
+
+// Updated returns a channel that is sent to when the set of healthy nodes is updated.
+// The channel is also sent to after the first check, even if no nodes are healthy. The
+// channel is buffered so the HealthManager can proceed with cluster health monitoring
+// even when the channel's consumer is slow.
+func (hm *HealthManager) Updated() <-chan struct{} {
+ return hm.updated
+}
+
+// HealthyNodes returns a map of healthy nodes in each virtual storage. The set of
+// healthy nodes might include nodes which are not present in the local configuration
+// if the cluster's consensus has deemed them healthy.
+func (hm *HealthManager) HealthyNodes() map[string][]string {
+ return hm.healthyNodes.Load().(map[string][]string)
+}
+
+func (hm *HealthManager) updateHealthChecks(ctx context.Context) error {
+ virtualStorages, physicalStorages, healthy := hm.performHealthChecks(ctx)
+
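+ // Upsert this Praefect's check results and compute the cluster-wide consensus of
+ // healthy nodes in a single statement.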
+ rows, err := hm.db.QueryContext(ctx, `
+WITH updated_checks AS (
+ INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
+ SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
+ FROM (
+ SELECT unnest($2::text[]) AS shard_name,
+ unnest($3::text[]) AS node_name,
+ unnest($4::boolean[]) AS is_healthy
+ ) AS results
+ ON CONFLICT (praefect_name, shard_name, node_name)
+ DO UPDATE SET
+ last_contact_attempt_at = NOW(),
+ last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at)
+ RETURNING *
+),
+updated_node_status AS (
+ /*
+ Updates performed in a CTE are not visible to the rest of the statement except through
+ the temporary table created by it. Construct the updated view of node_status by taking
+ the rows updated during this statement from updated_checks and the rest of the rows
+ from node_status.
+ */
+ SELECT *
+ FROM node_status
+ WHERE NOT EXISTS (
+ SELECT 1 FROM updated_checks
+ WHERE praefect_name = node_status.praefect_name
+ AND shard_name = node_status.shard_name
+ AND node_name = node_status.node_name
+ )
+ UNION
+ SELECT *
+ FROM updated_checks
+)
+
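+/*
+ A node counts as healthy when at least half (rounded up) of the Praefects that have
+ recently attempted a health check have seen it healthy within the healthiness timeout.
+*/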
+SELECT shard_name, node_name
+FROM updated_node_status AS ns
+WHERE last_seen_active_at >= NOW() - INTERVAL '1 MICROSECOND' * $5
+GROUP BY shard_name, node_name
+HAVING COUNT(praefect_name) >= (
+ SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count
+ FROM updated_node_status
+ WHERE shard_name = ns.shard_name
+ AND last_contact_attempt_at >= NOW() - INTERVAL '1 MICROSECOND' * $6
+)
+ORDER BY shard_name, node_name
+ `,
+ hm.praefectName,
+ pq.StringArray(virtualStorages),
+ pq.StringArray(physicalStorages),
+ pq.BoolArray(healthy),
+ hm.healthinessTimeout.Microseconds(),
+ hm.quorumParticipantTimeout.Microseconds(),
+ )
+ if err != nil {
+ return fmt.Errorf("query: %w", err)
+ }
+
+ defer func() {
+ if err := rows.Close(); err != nil {
+ hm.log.WithError(err).Error("failed closing query rows")
+ }
+ }()
+
+ currentlyHealthy := make(map[string][]string, len(physicalStorages))
+ for rows.Next() {
+ var virtualStorage, storage string
+ if err := rows.Scan(&virtualStorage, &storage); err != nil {
+ return fmt.Errorf("scan: %w", err)
+ }
+
+ currentlyHealthy[virtualStorage] = append(currentlyHealthy[virtualStorage], storage)
+ }
+
+ if err := rows.Err(); err != nil {
+ return fmt.Errorf("rows: %w", err)
+ }
+
+ if hm.firstUpdate || hm.hasHealthySetChanged(currentlyHealthy) {
+ hm.firstUpdate = false
+ hm.healthyNodes.Store(currentlyHealthy)
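+ // Non-blocking send: if a notification is already pending, the consumer will
+ // still observe the latest state through HealthyNodes when it catches up.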
+ select {
+ case hm.updated <- struct{}{}:
+ default:
+ }
+ }
+
+ return nil
+}
+
+func (hm *HealthManager) performHealthChecks(ctx context.Context) ([]string, []string, []bool) {
+ nodeCount := 0
+ for _, physicalStorages := range hm.clients {
+ nodeCount += len(physicalStorages)
+ }
+
+ virtualStorages := make([]string, nodeCount)
+ physicalStorages := make([]string, nodeCount)
+ healthy := make([]bool, nodeCount)
+
+ var wg sync.WaitGroup
+ wg.Add(nodeCount)
+
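+ // All checks run concurrently and share a single deadline so that one slow node
+ // cannot stall the whole tick.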
+ ctx, cancel := context.WithTimeout(ctx, hm.healthCheckTimeout)
+ defer cancel()
+
+ i := 0
+ for virtualStorage, storages := range hm.clients {
+ for storage, client := range storages {
+ virtualStorages[i] = virtualStorage
+ physicalStorages[i] = storage
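+ // i and client are passed as arguments so each goroutine writes to its own slot
+ // instead of capturing the loop variables.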
+ go func(i int, client grpc_health_v1.HealthClient) {
+ defer wg.Done()
+
+ resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+ if err != nil {
+ hm.log.WithFields(logrus.Fields{
+ logrus.ErrorKey: err,
+ "virtual_storage": virtualStorages[i],
+ "storage": physicalStorages[i],
+ }).Error("failed checking node health")
+ }
+
+ healthy[i] = resp != nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
+ }(i, client)
+ i++
+ }
+ }
+
+ wg.Wait()
+
+ return virtualStorages, physicalStorages, healthy
+}
+
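+// hasHealthySetChanged reports whether the current set of healthy nodes differs from
+// the previously stored set.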
+func (hm *HealthManager) hasHealthySetChanged(currentlyHealthy map[string][]string) bool {
+ previouslyHealthy := hm.HealthyNodes()
+
+ if len(previouslyHealthy) != len(currentlyHealthy) {
+ return true
+ }
+
+ for virtualStorage, previousNodes := range previouslyHealthy {
+ currentNodes := currentlyHealthy[virtualStorage]
+ if len(currentNodes) != len(previousNodes) {
+ return true
+ }
+
+ previous := make(map[string]struct{}, len(previousNodes))
+ for _, node := range previousNodes {
+ previous[node] = struct{}{}
+ }
+
+ for _, node := range currentNodes {
+ if _, ok := previous[node]; !ok {
+ return true
+ }
+ }
+ }
+
+ return false
+}
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
new file mode 100644
index 000000000..4e4a94b64
--- /dev/null
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -0,0 +1,547 @@
+// +build postgres
+
+package nodes
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health/grpc_health_v1"
+)
+
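+// mockHealthClient stubs the gRPC health client so tests can control the status each
+// node reports.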
+type mockHealthClient struct {
+ grpc_health_v1.HealthClient
+ CheckFunc func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error)
+}
+
+func (m mockHealthClient) Check(ctx context.Context, r *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+ return m.CheckFunc(ctx, r, opts...)
+}
+
+func TestHealthManager(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ type LocalStatus map[string]map[string]bool
+
+ type HealthChecks []struct {
+ After time.Duration
+ PraefectName string
+ LocalStatus LocalStatus
+ Updated bool
+ HealthyNodes map[string][]string
+ }
+
+ for _, tc := range []struct {
+ desc string
+ healthChecks HealthChecks
+ }{
+ {
+ desc: "single voter basic scenarios",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ "gitaly-2": true,
+ },
+ "virtual-storage-2": {
+ "gitaly-1": true,
+ "gitaly-2": false,
+ },
+ "virtual-storage-3": {
+ "gitaly-1": false,
+ "gitlay-2": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1", "gitaly-2"},
+ "virtual-storage-2": {"gitaly-1"},
+ },
+ },
+ },
+ },
+ {
+ desc: "updates own vote to healthy",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ },
+ },
+ {
+ desc: "counts own healthy vote before timeout",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ },
+ },
+ {
+ desc: "discounts own healthy vote after timeout",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ After: failoverTimeout,
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ },
+ },
+ {
+ desc: "inactive praefects not part of quorum",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ {
+ PraefectName: "praefect-2",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ {
+ After: activePraefectTimeout,
+ PraefectName: "praefect-3",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ },
+ },
+ {
+ desc: "unconfigured node in minority is unhealthy",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "configured": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"configured"},
+ },
+ },
+ {
+ PraefectName: "praefect-2",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "configured": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"configured"},
+ },
+ },
+ {
+ PraefectName: "praefect-3",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "configured": true,
+ "unconfigured": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"configured"},
+ },
+ },
+ },
+ },
+ {
+ desc: "unconfigured node in majority is unhealthy",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "configured": true,
+ "unconfigured": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"configured", "unconfigured"},
+ },
+ },
+ {
+ PraefectName: "praefect-2",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "configured": true,
+ "unconfigured": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"configured", "unconfigured"},
+ },
+ },
+ {
+ PraefectName: "praefect-3",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "configured": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"configured", "unconfigured"},
+ },
+ },
+ },
+ },
+ {
+ desc: "majority consensus healthy",
+ healthChecks: HealthChecks{
+
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ {
+ PraefectName: "praefect-2",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ PraefectName: "praefect-3",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ },
+ },
+ {
+ desc: "majority consensus unhealthy",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ PraefectName: "praefect-2",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ PraefectName: "praefect-3",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ },
+ },
+ {
+ desc: "first check triggers update",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{},
+ },
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ },
+ },
+ HealthyNodes: map[string][]string{},
+ },
+ },
+ },
+ {
+ desc: "node becoming healthy triggers update",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ "gitaly-2": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ "gitaly-2": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1", "gitaly-2"},
+ },
+ },
+ },
+ },
+ {
+ desc: "same set of healthy nodes does not update",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ },
+ },
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ },
+ },
+ {
+ desc: "different node triggers update",
+ healthChecks: HealthChecks{
+ {
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": true,
+ "gitaly-2": false,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-1"},
+ },
+ },
+ {
+ After: failoverTimeout,
+ PraefectName: "praefect-1",
+ LocalStatus: LocalStatus{
+ "virtual-storage-1": {
+ "gitaly-1": false,
+ "gitaly-2": true,
+ },
+ },
+ Updated: true,
+ HealthyNodes: map[string][]string{
+ "virtual-storage-1": {"gitaly-2"},
+ },
+ },
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ db := getDB(t)
+
+ healthStatus := map[string]grpc_health_v1.HealthCheckResponse_ServingStatus{}
+ // healthManagers are cached in order to keep the internal state intact between different
+ // health checks during the test.
+ healthManagers := map[string]*HealthManager{}
+
+ for i, hc := range tc.healthChecks {
+ // Create or use existing health managers
+ hm, ok := healthManagers[hc.PraefectName]
+ if !ok {
+ clients := make(HealthClients, len(hc.LocalStatus))
+ for virtualStorage, nodeHealths := range hc.LocalStatus {
+ clients[virtualStorage] = make(map[string]grpc_health_v1.HealthClient, len(nodeHealths))
+ for node := range nodeHealths {
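+ // Capture the loop variables so each mock's closure reads its own node's status.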
+ virtualStorage, node := virtualStorage, node
+ clients[virtualStorage][node] = mockHealthClient{
+ CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+ return &grpc_health_v1.HealthCheckResponse{Status: healthStatus[virtualStorage+node]}, nil
+ },
+ }
+ }
+ }
+
+ hm = NewHealthManager(testhelper.DiscardTestLogger(t), db, hc.PraefectName, clients)
+ hm.handleError = func(err error) error { return err }
+ healthManagers[hc.PraefectName] = hm
+ }
+
+ // Set the health statuses to the values expected by this check.
+ for virtualStorage, nodeHealths := range hc.LocalStatus {
+ for node, healthy := range nodeHealths {
+ status := grpc_health_v1.HealthCheckResponse_UNKNOWN
+ if healthy {
+ status = grpc_health_v1.HealthCheckResponse_SERVING
+ }
+
+ healthStatus[virtualStorage+node] = status
+ }
+ }
+
+ // predate earlier health checks to simulate this health check being run after a certain
+ // time period
+ if hc.After > 0 {
+ predateHealthChecks(t, db, hc.After)
+ }
+
+ runCtx, cancelRun := context.WithCancel(ctx)
+ require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun)))
+ require.Equal(t, hc.HealthyNodes, hm.HealthyNodes(), "health check %d", i+1)
+
+ updated := false
+ select {
+ case <-hm.Updated():
+ updated = true
+ default:
+ }
+ require.Equal(t, hc.Updated, updated, "health check %d", i+1)
+ }
+ })
+ }
+}
+
+func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) {
+ t.Helper()
+
+ _, err := db.Exec(`
+ UPDATE node_status SET
+ last_contact_attempt_at = last_contact_attempt_at - INTERVAL '1 MICROSECOND' * $1,
+ last_seen_active_at = last_seen_active_at - INTERVAL '1 MICROSECOND' * $1
+ `, amount.Microseconds(),
+ )
+ require.NoError(t, err)
+}