diff options
-rw-r--r-- | internal/praefect/nodes/health_manager.go | 21 | ||||
-rw-r--r-- | internal/praefect/nodes/health_manager_test.go | 49 |
2 files changed, 65 insertions, 5 deletions
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go index 32f174533..b8e34698f 100644 --- a/internal/praefect/nodes/health_manager.go +++ b/internal/praefect/nodes/health_manager.go @@ -44,9 +44,14 @@ type HealthManager struct { praefectName string // healthCheckTimeout is the duration after a health check attempt times out. healthCheckTimeout time.Duration - - firstUpdate bool - updated chan struct{} + // databaseTimeout applies the timeout for the database update. It returns a context with + // the timeout applied and a cancellation function. This should be shorter than the failover + // timeout, otherwise it is possible that the updated health checks are immediately considered + // outdated after the update has finished. This can be difficult to debug as Gitaly nodes are + // seemingly responding to the health checks but are considered outdated by Praefect. + databaseTimeout func(context.Context) (context.Context, func()) + firstUpdate bool + updated chan struct{} locallyHealthy atomic.Value } @@ -70,8 +75,11 @@ func NewHealthManager( }, praefectName: praefectName, healthCheckTimeout: healthcheckTimeout, - firstUpdate: true, - updated: make(chan struct{}, 1), + databaseTimeout: func(ctx context.Context) (context.Context, func()) { + return context.WithTimeout(ctx, 5*time.Second) + }, + firstUpdate: true, + updated: make(chan struct{}, 1), } hm.locallyHealthy.Store(make(map[string][]string, len(clients))) @@ -130,6 +138,9 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages hm.locallyHealthy.Store(locallyHealthy) + ctx, cancel := hm.databaseTimeout(ctx) + defer cancel() + if _, err := hm.db.ExecContext(ctx, ` INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index 24d149a63..22a1219f8 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -554,6 +554,55 @@ func TestHealthManager(t *testing.T) { } } +func TestHealthManager_databaseTimeout(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + db := glsql.NewDB(t) + + blockingTx := db.Begin(t) + defer blockingTx.Rollback(t) + + newHealthManager := func(db glsql.Querier) *HealthManager { + return NewHealthManager(testhelper.DiscardTestLogger(t), db, "praefect", HealthClients{ + "virtual-storage": { + "gitaly": mockHealthClient{ + CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil + }, + }, + }, + }) + } + + // Run an update and leave the transaction open to block the other client. + blockingMgr := newHealthManager(blockingTx) + runCtx, cancelRun := context.WithCancel(ctx) + require.Equal(t, context.Canceled, blockingMgr.Run(runCtx, helper.NewCountTicker(1, cancelRun))) + + blockedMgr := newHealthManager(db) + + var timeoutQuery func() + blockedMgr.databaseTimeout = func(ctx context.Context) (context.Context, func()) { + ctx, timeoutQuery = context.WithCancel(ctx) + return ctx, timeoutQuery + } + blockedMgr.handleError = func(err error) error { return err } + + blockedErr := make(chan error) + // This will block in database waiting for a lock. + go func() { + blockedErr <- blockedMgr.Run(ctx, helper.NewCountTicker(1, func() {})) + }() + + // Wait until the blocked query is waiting. + glsql.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status") + // Simulate a timeout. + timeoutQuery() + // Query should have been canceled. + require.EqualError(t, <-blockedErr, "update checks: pq: canceling statement due to user request") +} + func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) { t.Helper() |