Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-12-01 00:54:11 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-12-01 00:54:11 +0300
commit5be688856e12a29ac5560f73352cd1490d65a04e (patch)
tree6d34e3e4b39316f06e5cef446a64d866d05cd822
parentdfded1eb6bfccdabc0ae23ede2cdbc8b9bf1acd0 (diff)
parent274e79982667b63601e0bf22be2dda84f820bbc7 (diff)
Merge branch 'smh-fix-test-threshold' into 'master'
Fix TestErrorThreshold flaking Closes #3134 See merge request gitlab-org/gitaly!2835
-rw-r--r--internal/praefect/nodes/manager.go28
-rw-r--r--internal/praefect/nodes/tracker/health_client.go40
-rw-r--r--internal/praefect/server_test.go39
3 files changed, 70 insertions, 37 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 2e4e17a64..7b7dab675 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -357,41 +357,27 @@ func (n *nodeStatus) updateStatus(status bool) {
}
func (n *nodeStatus) CheckHealth(ctx context.Context) (bool, error) {
+ health := healthpb.NewHealthClient(n.clientConn)
if n.errTracker != nil {
- if n.errTracker.ReadThresholdReached(n.GetStorage()) {
- n.updateStatus(false)
- return false, fmt.Errorf("read error threshold reached for storage %q", n.GetStorage())
- }
-
- if n.errTracker.WriteThresholdReached(n.GetStorage()) {
- n.updateStatus(false)
- return false, fmt.Errorf("write error threshold reached for storage %q", n.GetStorage())
- }
+ health = tracker.NewHealthClient(health, n.GetStorage(), n.errTracker)
}
- health := healthpb.NewHealthClient(n.clientConn)
ctx, cancel := context.WithTimeout(ctx, healthcheckTimeout)
defer cancel()
- status := false
start := time.Now()
- resp, err := health.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
+ resp, err := health.Check(ctx, &healthpb.HealthCheckRequest{})
n.latencyHist.WithLabelValues(n.node.Storage).Observe(time.Since(start).Seconds())
-
- if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
- status = true
- } else {
+ if err != nil {
n.log.WithError(err).WithFields(logrus.Fields{
"storage": n.node.Storage,
"address": n.node.Address,
}).Warn("error when pinging healthcheck")
}
- var gaugeValue float64
- if status {
- gaugeValue = 1
- }
- metrics.NodeLastHealthcheckGauge.WithLabelValues(n.GetStorage()).Set(gaugeValue)
+ status := resp.GetStatus() == healthpb.HealthCheckResponse_SERVING
+
+ metrics.NodeLastHealthcheckGauge.WithLabelValues(n.GetStorage()).Set(metrics.BoolAsFloat(status))
n.updateStatus(status)
diff --git a/internal/praefect/nodes/tracker/health_client.go b/internal/praefect/nodes/tracker/health_client.go
new file mode 100644
index 000000000..3c79d035c
--- /dev/null
+++ b/internal/praefect/nodes/tracker/health_client.go
@@ -0,0 +1,40 @@
+package tracker
+
+import (
+ "context"
+ "fmt"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// HealthClient wraps a gRPC HealthClient and circuit breaks its health checks if
+// the error threshold has been reached.
+type HealthClient struct {
+ storage string
+ tracker ErrorTracker
+ grpc_health_v1.HealthClient
+}
+
+// NewHealthClient returns the HealthClient wrapped with error threshold circuit breaker.
+func NewHealthClient(client grpc_health_v1.HealthClient, storage string, tracker ErrorTracker) HealthClient {
+ return HealthClient{
+ tracker: tracker,
+ HealthClient: client,
+ storage: storage,
+ }
+}
+
+// Check circuit breaks the health check if write or read error thresholds have been reached. If not, it performs
+// the health check.
+func (hc HealthClient) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+ if hc.tracker.ReadThresholdReached(hc.storage) {
+ return nil, fmt.Errorf("read error threshold reached for storage %q", hc.storage)
+ }
+
+ if hc.tracker.WriteThresholdReached(hc.storage) {
+ return nil, fmt.Errorf("write error threshold reached for storage %q", hc.storage)
+ }
+
+ return hc.HealthClient.Check(ctx, req, opts...)
+}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index bdf217ef5..29321e696 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -891,13 +891,16 @@ func TestErrorThreshold(t *testing.T) {
testCases := []struct {
desc string
accessor bool
+ error error
}{
{
desc: "read threshold reached",
accessor: true,
+ error: errors.New(`read error threshold reached for storage "praefect-internal-0"`),
},
{
- desc: "write threshold reached",
+ desc: "write threshold reached",
+ error: errors.New(`write error threshold reached for storage "praefect-internal-0"`),
},
}
@@ -917,7 +920,7 @@ func TestErrorThreshold(t *testing.T) {
writeThreshold = 5000
}
- errorTracker, err := tracker.NewErrors(ctx, 10*time.Second, readThreshold, writeThreshold)
+ errorTracker, err := tracker.NewErrors(ctx, 10*time.Hour, readThreshold, writeThreshold)
require.NoError(t, err)
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
@@ -950,15 +953,19 @@ func TestErrorThreshold(t *testing.T) {
require.NoError(t, err)
cli := mock.NewSimpleServiceClient(conn)
- nodeMgr.Start(0, time.Nanosecond)
repo, _, cleanup := testhelper.NewTestRepo(t)
defer cleanup()
- shard, err := nodeMgr.GetShard(ctx, "default")
- require.NoError(t, err)
- require.Equal(t, "praefect-internal-0", shard.Primary.GetStorage())
+ node := nodeMgr.Nodes()["default"][0]
+ require.Equal(t, "praefect-internal-0", node.GetStorage())
- expectedErr := status.Error(codes.Internal, "something went wrong")
+ // to get the node in a healthy starting state, three consecutive successful checks
+ // are needed
+ for i := 0; i < 3; i++ {
+ healthy, err := node.CheckHealth(ctx)
+ require.NoError(t, err)
+ require.True(t, healthy)
+ }
for i := 0; i < 5; i++ {
ctx := grpc_metadata.AppendToOutgoingContext(ctx, "bad-header", "true")
@@ -968,17 +975,17 @@ func TestErrorThreshold(t *testing.T) {
handler = cli.RepoAccessorUnary
}
- _, err := handler(ctx, &mock.RepoRequest{Repo: repo})
- require.Equal(t, expectedErr, err)
- }
+ healthy, err := node.CheckHealth(ctx)
+ require.NoError(t, err)
+ require.True(t, healthy)
- for i := 0; i < 50; i++ {
- if _, err = nodeMgr.GetShard(ctx, "default"); err != nil {
- break
- }
- time.Sleep(1 * time.Millisecond)
+ _, err = handler(ctx, &mock.RepoRequest{Repo: repo})
+ require.Equal(t, status.Error(codes.Internal, "something went wrong"), err)
}
- require.Equal(t, nodes.ErrPrimaryNotHealthy, err)
+
+ healthy, err := node.CheckHealth(ctx)
+ require.Equal(t, tc.error, err)
+ require.False(t, healthy)
})
}
}