Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/praefect/nodes/health_manager.go21
-rw-r--r--internal/praefect/nodes/health_manager_test.go49
2 files changed, 65 insertions, 5 deletions
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 32f174533..b8e34698f 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -44,9 +44,14 @@ type HealthManager struct {
praefectName string
// healthCheckTimeout is the duration after a health check attempt times out.
healthCheckTimeout time.Duration
-
- firstUpdate bool
- updated chan struct{}
+ // databaseTimeout applies the timeout for the database update. It returns a context with
+ // the timeout applied and a cancellation function. This should be shorter than the failover
+ // timeout, otherwise it is possible that the updated health checks are immediately considered
+ // outdated after the update has finished. This can be difficult to debug as Gitaly nodes are
+ // seemingly responding to the health checks but are considered outdated by Praefect.
+ databaseTimeout func(context.Context) (context.Context, func())
+ firstUpdate bool
+ updated chan struct{}
locallyHealthy atomic.Value
}
@@ -70,8 +75,11 @@ func NewHealthManager(
},
praefectName: praefectName,
healthCheckTimeout: healthcheckTimeout,
- firstUpdate: true,
- updated: make(chan struct{}, 1),
+ databaseTimeout: func(ctx context.Context) (context.Context, func()) {
+ return context.WithTimeout(ctx, 5*time.Second)
+ },
+ firstUpdate: true,
+ updated: make(chan struct{}, 1),
}
hm.locallyHealthy.Store(make(map[string][]string, len(clients)))
@@ -130,6 +138,9 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages
hm.locallyHealthy.Store(locallyHealthy)
+ ctx, cancel := hm.databaseTimeout(ctx)
+ defer cancel()
+
if _, err := hm.db.ExecContext(ctx, `
INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index 24d149a63..22a1219f8 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -554,6 +554,55 @@ func TestHealthManager(t *testing.T) {
}
}
+func TestHealthManager_databaseTimeout(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := glsql.NewDB(t)
+
+ blockingTx := db.Begin(t)
+ defer blockingTx.Rollback(t)
+
+ newHealthManager := func(db glsql.Querier) *HealthManager {
+ return NewHealthManager(testhelper.DiscardTestLogger(t), db, "praefect", HealthClients{
+ "virtual-storage": {
+ "gitaly": mockHealthClient{
+ CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+ return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
+ },
+ },
+ },
+ })
+ }
+
+ // Run an update and leave the transaction open to block the other client.
+ blockingMgr := newHealthManager(blockingTx)
+ runCtx, cancelRun := context.WithCancel(ctx)
+ require.Equal(t, context.Canceled, blockingMgr.Run(runCtx, helper.NewCountTicker(1, cancelRun)))
+
+ blockedMgr := newHealthManager(db)
+
+ var timeoutQuery func()
+ blockedMgr.databaseTimeout = func(ctx context.Context) (context.Context, func()) {
+ ctx, timeoutQuery = context.WithCancel(ctx)
+ return ctx, timeoutQuery
+ }
+ blockedMgr.handleError = func(err error) error { return err }
+
+ blockedErr := make(chan error)
+ // This will block in database waiting for a lock.
+ go func() {
+ blockedErr <- blockedMgr.Run(ctx, helper.NewCountTicker(1, func() {}))
+ }()
+
+ // Wait until the blocked query is waiting.
+ glsql.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status")
+ // Simulate a timeout.
+ timeoutQuery()
+ // Query should have been canceled.
+ require.EqualError(t, <-blockedErr, "update checks: pq: canceling statement due to user request")
+}
+
func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) {
t.Helper()