diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-12-01 00:54:11 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-12-01 00:54:11 +0300 |
commit | 5be688856e12a29ac5560f73352cd1490d65a04e (patch) | |
tree | 6d34e3e4b39316f06e5cef446a64d866d05cd822 | |
parent | dfded1eb6bfccdabc0ae23ede2cdbc8b9bf1acd0 (diff) | |
parent | 274e79982667b63601e0bf22be2dda84f820bbc7 (diff) |
Merge branch 'smh-fix-test-threshold' into 'master'
Fix TestErrorThreshold flaking
Closes #3134
See merge request gitlab-org/gitaly!2835
-rw-r--r-- | internal/praefect/nodes/manager.go | 28 | ||||
-rw-r--r-- | internal/praefect/nodes/tracker/health_client.go | 40 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 39 |
3 files changed, 70 insertions, 37 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 2e4e17a64..7b7dab675 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -357,41 +357,27 @@ func (n *nodeStatus) updateStatus(status bool) { } func (n *nodeStatus) CheckHealth(ctx context.Context) (bool, error) { + health := healthpb.NewHealthClient(n.clientConn) if n.errTracker != nil { - if n.errTracker.ReadThresholdReached(n.GetStorage()) { - n.updateStatus(false) - return false, fmt.Errorf("read error threshold reached for storage %q", n.GetStorage()) - } - - if n.errTracker.WriteThresholdReached(n.GetStorage()) { - n.updateStatus(false) - return false, fmt.Errorf("write error threshold reached for storage %q", n.GetStorage()) - } + health = tracker.NewHealthClient(health, n.GetStorage(), n.errTracker) } - health := healthpb.NewHealthClient(n.clientConn) ctx, cancel := context.WithTimeout(ctx, healthcheckTimeout) defer cancel() - status := false start := time.Now() - resp, err := health.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) + resp, err := health.Check(ctx, &healthpb.HealthCheckRequest{}) n.latencyHist.WithLabelValues(n.node.Storage).Observe(time.Since(start).Seconds()) - - if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING { - status = true - } else { + if err != nil { n.log.WithError(err).WithFields(logrus.Fields{ "storage": n.node.Storage, "address": n.node.Address, }).Warn("error when pinging healthcheck") } - var gaugeValue float64 - if status { - gaugeValue = 1 - } - metrics.NodeLastHealthcheckGauge.WithLabelValues(n.GetStorage()).Set(gaugeValue) + status := resp.GetStatus() == healthpb.HealthCheckResponse_SERVING + + metrics.NodeLastHealthcheckGauge.WithLabelValues(n.GetStorage()).Set(metrics.BoolAsFloat(status)) n.updateStatus(status) diff --git a/internal/praefect/nodes/tracker/health_client.go b/internal/praefect/nodes/tracker/health_client.go new file mode 100644 index 000000000..3c79d035c --- /dev/null +++ b/internal/praefect/nodes/tracker/health_client.go @@ -0,0 +1,40 @@ +package tracker + +import ( + "context" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" +) + +// HealthClient wraps a gRPC HealthClient and circuit breaks its health checks if +// the error threshold has been reached. +type HealthClient struct { + storage string + tracker ErrorTracker + grpc_health_v1.HealthClient +} + +// NewHealthClient returns the HealthClient wrapped with error threshold circuit breaker. +func NewHealthClient(client grpc_health_v1.HealthClient, storage string, tracker ErrorTracker) HealthClient { + return HealthClient{ + tracker: tracker, + HealthClient: client, + storage: storage, + } +} + +// Check circuit breaks the health check if write or read error thresholds have been reached. If not, it performs +// the health check. +func (hc HealthClient) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + if hc.tracker.ReadThresholdReached(hc.storage) { + return nil, fmt.Errorf("read error threshold reached for storage %q", hc.storage) + } + + if hc.tracker.WriteThresholdReached(hc.storage) { + return nil, fmt.Errorf("write error threshold reached for storage %q", hc.storage) + } + + return hc.HealthClient.Check(ctx, req, opts...) +} diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index bdf217ef5..29321e696 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -891,13 +891,16 @@ func TestErrorThreshold(t *testing.T) { testCases := []struct { desc string accessor bool + error error }{ { desc: "read threshold reached", accessor: true, + error: errors.New(`read error threshold reached for storage "praefect-internal-0"`), }, { - desc: "write threshold reached", + desc: "write threshold reached", + error: errors.New(`write error threshold reached for storage "praefect-internal-0"`), }, } @@ -917,7 +920,7 @@ func TestErrorThreshold(t *testing.T) { writeThreshold = 5000 } - errorTracker, err := tracker.NewErrors(ctx, 10*time.Second, readThreshold, writeThreshold) + errorTracker, err := tracker.NewErrors(ctx, 10*time.Hour, readThreshold, writeThreshold) require.NoError(t, err) rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) @@ -950,15 +953,19 @@ func TestErrorThreshold(t *testing.T) { require.NoError(t, err) cli := mock.NewSimpleServiceClient(conn) - nodeMgr.Start(0, time.Nanosecond) repo, _, cleanup := testhelper.NewTestRepo(t) defer cleanup() - shard, err := nodeMgr.GetShard(ctx, "default") - require.NoError(t, err) - require.Equal(t, "praefect-internal-0", shard.Primary.GetStorage()) + node := nodeMgr.Nodes()["default"][0] + require.Equal(t, "praefect-internal-0", node.GetStorage()) - expectedErr := status.Error(codes.Internal, "something went wrong") + // to get the node in a healthy starting state, three consecutive successful checks + // are needed + for i := 0; i < 3; i++ { + healthy, err := node.CheckHealth(ctx) + require.NoError(t, err) + require.True(t, healthy) + } for i := 0; i < 5; i++ { ctx := grpc_metadata.AppendToOutgoingContext(ctx, "bad-header", "true") @@ -968,17 +975,17 @@ func TestErrorThreshold(t *testing.T) { handler = cli.RepoAccessorUnary } - _, err := handler(ctx, &mock.RepoRequest{Repo: repo}) - require.Equal(t, expectedErr, err) - } + healthy, err := node.CheckHealth(ctx) + require.NoError(t, err) + require.True(t, healthy) - for i := 0; i < 50; i++ { - if _, err = nodeMgr.GetShard(ctx, "default"); err != nil { - break - } - time.Sleep(1 * time.Millisecond) + _, err = handler(ctx, &mock.RepoRequest{Repo: repo}) + require.Equal(t, status.Error(codes.Internal, "something went wrong"), err) } - require.Equal(t, nodes.ErrPrimaryNotHealthy, err) + + healthy, err := node.CheckHealth(ctx) + require.Equal(t, tc.error, err) + require.False(t, healthy) }) } } |