diff options
author | John Cai <jcai@gitlab.com> | 2020-06-25 20:40:09 +0300
---|---|---
committer | John Cai <jcai@gitlab.com> | 2020-07-23 00:58:55 +0300
commit | 2b8c681bc41227440de29470e21b7ec3f0c9d708 (patch) |
tree | 9a5ad0758fe886549a9d61985a04cfc9badba071 |
parent | e3bedb3507c01fbe8395dd76589e095d7da14e66 (diff) |
Use error tracker to determine if node is healthy
Hooks up the error tracker in the node manager so that it checks whether
a backend node has reached its error threshold. If it has, the node is
deemed unhealthy.
18 files changed, 275 insertions, 70 deletions
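The heart of the change is easiest to see in isolation before wading into the diff. The sketch below is a simplified stand-in, not Gitaly's implementation: the `ErrorTracker` interface mirrors the methods this diff actually calls (`ReadThresholdReached`, `WriteThresholdReached`; the `Incr*` methods are inferred from the middleware's role of recording accessor/mutator errors), and `windowTracker` is a hypothetical sliding-window counter.

```go
// Minimal sketch of the failover gate this commit adds; windowTracker is
// a hypothetical stand-in, not Gitaly's real tracker implementation.
package main

import (
	"fmt"
	"sync"
	"time"
)

// ErrorTracker mirrors the methods used in this diff; the Incr* methods
// are assumed from the middleware's role of recording per-node errors.
type ErrorTracker interface {
	IncrReadErr(nodeStorage string)
	IncrWriteErr(nodeStorage string)
	ReadThresholdReached(nodeStorage string) bool
	WriteThresholdReached(nodeStorage string) bool
}

// windowTracker counts read/write errors per storage within a sliding
// time window.
type windowTracker struct {
	mtx                           sync.Mutex
	window                        time.Duration
	readThreshold, writeThreshold uint32
	readErrs, writeErrs           map[string][]time.Time
}

func newWindowTracker(window time.Duration, readThreshold, writeThreshold uint32) *windowTracker {
	return &windowTracker{
		window:         window,
		readThreshold:  readThreshold,
		writeThreshold: writeThreshold,
		readErrs:       map[string][]time.Time{},
		writeErrs:      map[string][]time.Time{},
	}
}

// prune drops error timestamps that have fallen out of the window.
func (t *windowTracker) prune(errs []time.Time) []time.Time {
	cutoff := time.Now().Add(-t.window)
	for len(errs) > 0 && errs[0].Before(cutoff) {
		errs = errs[1:]
	}
	return errs
}

func (t *windowTracker) IncrReadErr(storage string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.readErrs[storage] = append(t.readErrs[storage], time.Now())
}

func (t *windowTracker) IncrWriteErr(storage string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.writeErrs[storage] = append(t.writeErrs[storage], time.Now())
}

func (t *windowTracker) ReadThresholdReached(storage string) bool {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.readErrs[storage] = t.prune(t.readErrs[storage])
	return uint32(len(t.readErrs[storage])) >= t.readThreshold
}

func (t *windowTracker) WriteThresholdReached(storage string) bool {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.writeErrs[storage] = t.prune(t.writeErrs[storage])
	return uint32(len(t.writeErrs[storage])) >= t.writeThreshold
}

// checkHealth mirrors the gate added to (*nodeStatus).CheckHealth: a node
// whose error count reached either threshold is reported unhealthy before
// any gRPC health check is attempted.
func checkHealth(errTracker ErrorTracker, storage string) (bool, error) {
	if errTracker.ReadThresholdReached(storage) {
		return false, fmt.Errorf("read error threshold reached for storage %q", storage)
	}
	if errTracker.WriteThresholdReached(storage) {
		return false, fmt.Errorf("write error threshold reached for storage %q", storage)
	}
	return true, nil // the real code would now probe the gRPC health service
}

func main() {
	errTracker := newWindowTracker(10*time.Second, 5, 5)
	for i := 0; i < 5; i++ {
		errTracker.IncrReadErr("praefect-internal-0")
	}
	fmt.Println(checkHealth(errTracker, "praefect-internal-0"))
	// false read error threshold reached for storage "praefect-internal-0"
}
```

In the commit itself the tracker lives in internal/praefect/nodes/tracker/errors.go (moved from internal/praefect/nodes/errors.go), is fed by the StreamErrorHandler client interceptor, and is consulted by (*nodeStatus).CheckHealth before any gRPC health probe, as the diff below shows.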
diff --git a/changelogs/unreleased/jc-use-error-tracker.yml b/changelogs/unreleased/jc-use-error-tracker.yml
new file mode 100644
index 000000000..a6537c112
--- /dev/null
+++ b/changelogs/unreleased/jc-use-error-tracker.yml
@@ -0,0 +1,5 @@
+---
+title: Use error tracker to determine if node is healthy
+merge_request: 2341
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 1553c9c33..d9bc7d888 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -101,6 +101,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
 	"gitlab.com/gitlab-org/gitaly/internal/version"
@@ -232,7 +233,21 @@ func run(cfgs []starter.Config, conf config.Config) error {
 		queue = datastore.NewPostgresReplicationEventQueue(db)
 	}
 
-	nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var errTracker tracker.ErrorTracker
+
+	if conf.Failover.Enabled && conf.Failover.ValidErrorThresholds() {
+		errTracker, err = tracker.NewErrors(ctx, conf.Failover.ErrorThresholdWindow.Duration(), conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount)
+		if err != nil {
+			return err
+		}
+	} else {
+		logger.Warn("skipping read/write error threshold failover. To enable read/write error threshold failover, provide non-zero values for error_threshold_window, write_error_threshold_count, read_error_threshold_count")
+	}
+
+	nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker)
 	if err != nil {
 		return err
 	}
@@ -283,9 +298,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
 
 	prometheus.MustRegister(repl)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	b, err := bootstrap.New()
 	if err != nil {
 		return fmt.Errorf("unable to create a bootstrap: %v", err)
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 48f786108..40126b275 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -168,7 +168,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
 	logEntry := testhelper.DiscardTestEntry(t)
 	queue := datastore.NewMemoryReplicationEventQueue(conf)
 
-	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	txMgr := transactions.NewManager()
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index f3f4a3f87..61014b6a4 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -23,6 +23,10 @@ type Failover struct {
 	ReadErrorThresholdCount uint32 `toml:"read_error_threshold_count"`
 }
 
+func (f Failover) ValidErrorThresholds() bool {
+	return f.ErrorThresholdWindow > 0 && f.WriteErrorThresholdCount > 0 && f.ReadErrorThresholdCount > 0
+}
+
 const sqlFailoverValue = "sql"
 
 // Config is a container for everything found in the TOML config file
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 3336c21de..b3faf5067 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -158,7 +158,7 @@ func TestStreamDirectorMutator(t *testing.T) {
 
 	entry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	txMgr := transactions.NewManager()
@@ -290,7 +290,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
 	defer cancel()
 	ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.ReferenceTransactions)
 
-	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
@@ -407,7 +407,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
 
 	entry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	nodeMgr.Start(0, time.Minute)
@@ -484,7 +484,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
 
 	entry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	nodeMgr.Start(0, time.Minute)
@@ -676,7 +676,7 @@ func TestAbsentCorrelationID(t *testing.T) {
 
 	entry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	txMgr := transactions.NewManager()
@@ -878,7 +878,7 @@ func TestStreamDirectorStorageScope(t *testing.T) {
 		Nodes: []*config.Node{primaryGitaly, secondaryGitaly},
 	}}}
 
-	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	nodeMgr.Start(0, time.Second)
 	coordinator := NewCoordinator(nil, nodeMgr, nil, conf, protoregistry.GitalyProtoPreregistered)
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 3ea4de70d..4cfcf2200 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -4,8 +4,8 @@ package datastore
 
 import (
 	"context"
-	"testing"
 	"sync"
+	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index dd0bd4acc..25ee2b3e6 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -189,7 +189,7 @@ func defaultTxMgr() *transactions.Manager {
 }
 
 func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
-	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
 	return nodeMgr
@@ -360,6 +360,9 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
 func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) {
 	srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(internalauth.Config{Token: token})))
 	mock.RegisterSimpleServiceServer(srv, m)
+	healthSrv := health.NewServer()
+	healthSrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+	healthpb.RegisterHealthServer(srv, healthSrv)
 
 	// client to backend service
 	lis, port := listenAvailPort(tb)
diff --git a/internal/praefect/middleware/errorhandler.go b/internal/praefect/middleware/errorhandler.go
index c6d0e36ca..5339419a5 100644
--- a/internal/praefect/middleware/errorhandler.go
+++ b/internal/praefect/middleware/errorhandler.go
@@ -5,13 +5,13 @@ import (
 	"fmt"
 	"io"
 
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	"google.golang.org/grpc"
 )
 
 // StreamErrorHandler returns a client interceptor that will track accessor/mutator errors from internal gitaly nodes
-func StreamErrorHandler(registry *protoregistry.Registry, errorTracker nodes.ErrorTracker, nodeStorage string) grpc.StreamClientInterceptor {
+func StreamErrorHandler(registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, nodeStorage string) grpc.StreamClientInterceptor {
 	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
 		stream, err := streamer(ctx, desc, cc, method, opts...)
@@ -27,12 +27,12 @@ func StreamErrorHandler(registry *protoregistry.Registry, errorTracker nodes.Err
 // catchErrorSteamer is a custom ClientStream that adheres to grpc.ClientStream but keeps track of accessor/mutator errors
 type catchErrorStreamer struct {
 	grpc.ClientStream
-	errors      nodes.ErrorTracker
+	errors      tracker.ErrorTracker
 	operation   protoregistry.OpType
 	nodeStorage string
 }
 
-func newCatchErrorStreamer(streamer grpc.ClientStream, errors nodes.ErrorTracker, operation protoregistry.OpType, nodeStorage string) *catchErrorStreamer {
+func newCatchErrorStreamer(streamer grpc.ClientStream, errors tracker.ErrorTracker, operation protoregistry.OpType, nodeStorage string) *catchErrorStreamer {
 	return &catchErrorStreamer{
 		ClientStream: streamer,
 		errors:       errors,
diff --git a/internal/praefect/middleware/errorhandler_test.go b/internal/praefect/middleware/errorhandler_test.go
index 348d5c074..490c4a99d 100644
--- a/internal/praefect/middleware/errorhandler_test.go
+++ b/internal/praefect/middleware/errorhandler_test.go
@@ -13,7 +13,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/helper"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
 	"google.golang.org/grpc"
@@ -47,7 +47,7 @@ func TestStreamInterceptor(t *testing.T) {
 	window := 1 * time.Second
 	threshold := 5
 
-	errTracker, err := nodes.NewErrors(ctx, window, uint32(threshold), uint32(threshold))
+	errTracker, err := tracker.NewErrors(ctx, window, uint32(threshold), uint32(threshold))
 	require.NoError(t, err)
 
 	nodeName := "node-1"
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
index 6689936a1..c882df686 100644
--- a/internal/praefect/nodes/local_elector_test.go
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -26,8 +26,8 @@ func setupElector(t *testing.T) (*localElector, []*nodeStatus, *grpc.ClientConn,
 	storageName := "default"
 	mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
 
-	cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0)
-	secondary := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+	cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0, nil)
+	secondary := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1, nil)
 	ns := []*nodeStatus{cs, secondary}
 	logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
 	strategy := newLocalElector(storageName, true, true, logger, ns)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 96c5a61ec..4925e8568 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -20,6 +20,9 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
 	correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
 	grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
@@ -127,7 +130,15 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
 const dialTimeout = 10 * time.Second
 
 // NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(
+	log *logrus.Entry,
+	c config.Config,
+	db *sql.DB,
+	queue datastore.ReplicationEventQueue,
+	latencyHistogram prommetrics.HistogramVec,
+	registry *protoregistry.Registry,
+	errorTracker tracker.ErrorTracker,
+	dialOpts ...grpc.DialOption) (*Mgr, error) {
 	strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
 
 	ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -138,28 +149,28 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
 		ns := make([]*nodeStatus, 0, len(virtualStorage.Nodes))
 		for _, node := range virtualStorage.Nodes {
+			streamInterceptors := []grpc.StreamClientInterceptor{
+				grpc_prometheus.StreamClientInterceptor,
+				grpctracing.StreamClientTracingInterceptor(),
+				correlation.StreamClientCorrelationInterceptor(),
+			}
+
+			if c.Failover.Enabled && errorTracker != nil {
+				streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage))
+			}
+
 			conn, err := client.DialContext(ctx, node.Address,
-				append(
-					[]grpc.DialOption{
-						grpc.WithBlock(),
-						grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
-						grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
-						grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
-							grpc_prometheus.StreamClientInterceptor,
-							grpctracing.StreamClientTracingInterceptor(),
-							correlation.StreamClientCorrelationInterceptor(),
-						)),
-						grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
-							grpc_prometheus.UnaryClientInterceptor,
-							grpctracing.UnaryClientTracingInterceptor(),
-							correlation.UnaryClientCorrelationInterceptor(),
-						)),
-					}, dialOpts...),
+				append([]grpc.DialOption{
+					grpc.WithBlock(),
+					grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
+					grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
+					grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamInterceptors...)),
+				}, dialOpts...),
 			)
 			if err != nil {
 				return nil, err
 			}
 
-			cs := newConnectionStatus(*node, conn, log, latencyHistogram)
+			cs := newConnectionStatus(*node, conn, log, latencyHistogram, errorTracker)
 			ns = append(ns, cs)
 		}
 
@@ -213,7 +224,12 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
 		return Shard{}, fmt.Errorf("virtual storage %q: %w", virtualStorageName, ErrVirtualStorageNotExist)
 	}
 
-	return strategy.GetShard()
+	shard, err := strategy.GetShard()
+	if err != nil {
+		return shard, err
+	}
+
+	return shard, nil
 }
 
 func (n *Mgr) EnableWrites(ctx context.Context, virtualStorageName string) error {
@@ -272,12 +288,13 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
 	return nil, errors.New("could not select random storage")
 }
 
-func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l logrus.FieldLogger, latencyHist prommetrics.HistogramVec) *nodeStatus {
+func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l logrus.FieldLogger, latencyHist prommetrics.HistogramVec, errorTracker tracker.ErrorTracker) *nodeStatus {
 	return &nodeStatus{
 		node:        node,
 		clientConn:  cc,
 		log:         l,
 		latencyHist: latencyHist,
+		errTracker:  errorTracker,
 	}
 }
 
@@ -288,6 +305,7 @@ type nodeStatus struct {
 	latencyHist prommetrics.HistogramVec
 	mtx         sync.RWMutex
 	statuses    []bool
+	errTracker  tracker.ErrorTracker
 }
 
 // GetStorage gets the storage name of a node
@@ -341,6 +359,18 @@ func (n *nodeStatus) updateStatus(status bool) {
 }
 
 func (n *nodeStatus) CheckHealth(ctx context.Context) (bool, error) {
+	if n.errTracker != nil {
+		if n.errTracker.ReadThresholdReached(n.GetStorage()) {
+			n.updateStatus(false)
+			return false, fmt.Errorf("read error threshold reached for storage %q", n.GetStorage())
+		}
+
+		if n.errTracker.WriteThresholdReached(n.GetStorage()) {
+			n.updateStatus(false)
+			return false, fmt.Errorf("write error threshold reached for storage %q", n.GetStorage())
+		}
+	}
+
 	health := healthpb.NewHealthClient(n.clientConn)
 	ctx, cancel := context.WithTimeout(ctx, healthcheckTimeout)
 	defer cancel()
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index d958dc06d..228ad177d 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -12,6 +12,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
 	"google.golang.org/grpc"
@@ -73,7 +74,7 @@ func TestNodeStatus(t *testing.T) {
 	mockHistogramVec := promtest.NewMockHistogramVec()
 
 	storageName := "default"
-	cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
+	cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec, nil)
 
 	var expectedLabels [][]string
 	for i := 0; i < healthcheckThreshold; i++ {
@@ -124,7 +125,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
 		Failover:        config.Failover{Enabled: false, ElectionStrategy: "sql"},
 		VirtualStorages: []*config.VirtualStorage{virtualStorage},
 	}
-	nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+	nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	nm.Start(time.Millisecond, time.Millisecond)
@@ -175,7 +176,7 @@ func TestBlockingDial(t *testing.T) {
 		healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 	}()
 
-	mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+	mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -221,10 +222,10 @@ func TestNodeManager(t *testing.T) {
 	}
 
 	mockHistogram := promtest.NewMockHistogramVec()
-	nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram)
+	nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
-	nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
+	nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	nm.Start(1*time.Millisecond, 5*time.Second)
@@ -401,7 +402,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
 	verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
 		queue := datastore.NewMemoryReplicationEventQueue(conf)
 
-		nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
+		nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
 		require.NoError(t, err)
 		nm.Start(time.Duration(0), time.Hour)
@@ -567,12 +568,12 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
 	latencyHistMock := &promtest.MockHistogramVec{}
 
 	t.Run("unchecked node is unhealthy", func(t *testing.T) {
-		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
 		require.False(t, ns.IsHealthy())
 	})
 
 	t.Run("not enough check to consider it healthy", func(t *testing.T) {
-		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
 		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 
 		checkNTimes(ctx, t, ns, healthcheckThreshold-1)
@@ -580,7 +581,7 @@
 	})
 
 	t.Run("healthy", func(t *testing.T) {
-		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
 		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 
 		checkNTimes(ctx, t, ns, healthcheckThreshold)
@@ -588,7 +589,7 @@
 	})
 
 	t.Run("healthy turns into unhealthy after single failed check", func(t *testing.T) {
-		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
 		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 
 		checkNTimes(ctx, t, ns, healthcheckThreshold)
@@ -601,7 +602,7 @@
 	})
 
 	t.Run("unhealthy turns into healthy after pre-define threshold of checks", func(t *testing.T) {
-		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
 		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 
 		checkNTimes(ctx, t, ns, healthcheckThreshold)
@@ -623,7 +624,7 @@
 	})
 
 	t.Run("concurrent access has no races", func(t *testing.T) {
-		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
 		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 
 		t.Run("continuously does health checks - 1", func(t *testing.T) {
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index a3bb5db96..1f1899ab4 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -42,7 +42,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) {
 	storageName := "default"
 	mockHistogramVec0 := promtest.NewMockHistogramVec()
 
-	cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
+	cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0, nil)
 	ns := []*nodeStatus{cs0}
 
 	elector := newSQLElector(shardName, conf, db.DB, logger, ns)
@@ -99,8 +99,8 @@ func TestBasicFailover(t *testing.T) {
 
 	storageName := "default"
 
-	cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0", Address: addr0}, cc0, logger, promtest.NewMockHistogramVec())
-	cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1", Address: addr1}, cc1, logger, promtest.NewMockHistogramVec())
+	cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0", Address: addr0}, cc0, logger, promtest.NewMockHistogramVec(), nil)
+	cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1", Address: addr1}, cc1, logger, promtest.NewMockHistogramVec(), nil)
 	ns := []*nodeStatus{cs0, cs1}
 
 	elector := newSQLElector(shardName, conf, db.DB, logger, ns)
diff --git a/internal/praefect/nodes/errors.go b/internal/praefect/nodes/tracker/errors.go
index 2faaeb3e0..6997f9ec7 100644
--- a/internal/praefect/nodes/errors.go
+++ b/internal/praefect/nodes/tracker/errors.go
@@ -1,4 +1,4 @@
-package nodes
+package tracker
 
 import (
 	"context"
diff --git a/internal/praefect/nodes/errors_test.go b/internal/praefect/nodes/tracker/errors_test.go
index f5b42e06a..95b4a3ab9 100644
--- a/internal/praefect/nodes/errors_test.go
+++ b/internal/praefect/nodes/tracker/errors_test.go
@@ -1,4 +1,4 @@
-package nodes
+package tracker
 
 import (
 	"sync"
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2ca47ee13..f07c94af0 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -123,7 +123,7 @@ func TestProcessReplicationJob(t *testing.T) {
 
 	entry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -217,7 +217,7 @@ func TestPropagateReplicationJob(t *testing.T) {
 	queue := datastore.NewMemoryReplicationEventQueue(conf)
 	logEntry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -501,7 +501,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
 
 	logEntry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
@@ -640,7 +640,7 @@ func TestProcessBacklog_Success(t *testing.T) {
 
 	logEntry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index c0f44ca9f..77410cd1f 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -77,7 +77,7 @@ func TestServerFactory(t *testing.T) {
 	logger := testhelper.DiscardTestEntry(t)
 	queue := datastore.NewMemoryReplicationEventQueue(conf)
 
-	nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{})
+	nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 	txMgr := transactions.NewManager()
 	registry := protoregistry.GitalyProtoPreregistered
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 04f1843e6..8e7b8d8b3 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus/hooks/test"
 	"github.com/stretchr/testify/assert"
@@ -31,6 +32,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -40,7 +42,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health"
 	"google.golang.org/grpc/health/grpc_health_v1"
-	"google.golang.org/grpc/reflection"
+	grpc_metadata "google.golang.org/grpc/metadata"
 )
 
 func TestServerRouteServerAccessor(t *testing.T) {
@@ -731,7 +733,7 @@ func (m *mockSmartHTTP) Called(method string) int {
 	return m.methodsCalled[method]
 }
 
-func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) {
+func newSmartHTTPGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) {
 	socketPath := testhelper.GetTemporaryGitalySocketFileName()
 	listener, err := net.Listen("unix", socketPath)
 	require.NoError(t, err)
@@ -742,7 +744,6 @@ func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *
 	grpc_health_v1.RegisterHealthServer(grpcServer, healthSrvr)
 	healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 	gitalypb.RegisterSmartHTTPServiceServer(grpcServer, srv)
-	reflection.Register(grpcServer)
 
 	go grpcServer.Serve(listener)
 
@@ -754,11 +755,11 @@ func TestProxyWrites(t *testing.T) {
 
 	smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}
 
-	socket0, srv0 := newGrpcServer(t, smartHTTP0)
+	socket0, srv0 := newSmartHTTPGrpcServer(t, smartHTTP0)
 	defer srv0.Stop()
-	socket1, srv1 := newGrpcServer(t, smartHTTP1)
+	socket1, srv1 := newSmartHTTPGrpcServer(t, smartHTTP1)
 	defer srv1.Stop()
-	socket2, srv2 := newGrpcServer(t, smartHTTP2)
+	socket2, srv2 := newSmartHTTPGrpcServer(t, smartHTTP2)
 	defer srv2.Stop()
 
 	conf := config.Config{
@@ -786,7 +787,7 @@ func TestProxyWrites(t *testing.T) {
 	queue := datastore.NewMemoryReplicationEventQueue(conf)
 	entry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
 	require.NoError(t, err)
 
 	coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
@@ -855,6 +856,155 @@ func TestProxyWrites(t *testing.T) {
 
 	assert.Equal(t, bytes.Repeat([]byte(payload), 10), receivedData.Bytes())
 }
 
+type errorSimpleService struct {
+}
+
+// ServerAccessor is implemented by a callback
+func (m *errorSimpleService) ServerAccessor(ctx context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
+	return nil, helper.ErrInternalf("something went wrong")
+}
+
+// RepoAccessorUnary is implemented by a callback
+func (m *errorSimpleService) RepoAccessorUnary(ctx context.Context, req *mock.RepoRequest) (*empty.Empty, error) {
+	md, ok := grpc_metadata.FromIncomingContext(ctx)
+	if !ok {
+		return &empty.Empty{}, errors.New("couldn't read metadata")
+	}
+
+	if md.Get("bad-header")[0] == "true" {
+		return &empty.Empty{}, helper.ErrInternalf("something went wrong")
+	}
+
+	return &empty.Empty{}, nil
+}
+
+// RepoMutatorUnary is implemented by a callback
+func (m *errorSimpleService) RepoMutatorUnary(ctx context.Context, req *mock.RepoRequest) (*empty.Empty, error) {
+	md, ok := grpc_metadata.FromIncomingContext(ctx)
+	if !ok {
+		return &empty.Empty{}, errors.New("couldn't read metadata")
+	}
+
+	if md.Get("bad-header")[0] == "true" {
+		return &empty.Empty{}, helper.ErrInternalf("something went wrong")
+	}
+
+	return &empty.Empty{}, nil
+}
+
+func TestErrorTreshold(t *testing.T) {
+	simple := &errorSimpleService{}
+
+	backendToken := ""
+	backend, cleanup := newMockDownstream(t, backendToken, simple)
+	defer cleanup()
+
+	conf := config.Config{
+		VirtualStorages: []*config.VirtualStorage{
+			{
+				Name: "default",
+				Nodes: []*config.Node{
+					{
+						Storage: "praefect-internal-0",
+						Address: backend,
+					},
+				},
+			},
+		},
+		Failover: config.Failover{
+			Enabled: true,
+		},
+	}
+
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	queue := datastore.NewMemoryReplicationEventQueue(conf)
+	entry := testhelper.DiscardTestEntry(t)
+
+	testCases := []struct {
+		desc           string
+		readThreshold  uint32
+		writeThreshold uint32
+	}{
+		{
+			desc:           "read threshold reached",
+			readThreshold:  5,
+			writeThreshold: 10000,
+		},
+		{
+			desc:           "write threshold reached",
+			readThreshold:  10000,
+			writeThreshold: 5,
+		},
+	}
+
+	gz := proto.FileDescriptor("mock.proto")
+	fd, err := protoregistry.ExtractFileDescriptor(gz)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	registry, err := protoregistry.New(fd)
+	require.NoError(t, err)
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			errorTracker, err := tracker.NewErrors(ctx, 10*time.Second, tc.readThreshold, tc.writeThreshold)
+			require.NoError(t, err)
+
+			nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), registry, errorTracker)
+			require.NoError(t, err)
+
+			coordinator := NewCoordinator(queue, nodeMgr, nil, conf, registry)
+
+			server := grpc.NewServer(
+				grpc.CustomCodec(proxy.NewCodec()),
+				grpc.UnknownServiceHandler(proxy.TransparentHandler(coordinator.StreamDirector)),
+			)
+
+			socket := testhelper.GetTemporaryGitalySocketFileName()
+			listener, err := net.Listen("unix", socket)
+			require.NoError(t, err)
+
+			go server.Serve(listener)
+			defer server.Stop()
+
+			conn, err := dial("unix://"+socket, []grpc.DialOption{grpc.WithInsecure()})
+			require.NoError(t, err)
+			cli := mock.NewSimpleServiceClient(conn)
+
+			nodeMgr.Start(0, 5*time.Millisecond)
+			repo, _, cleanup := testhelper.NewTestRepo(t)
+			defer cleanup()
+
+			calls := 6
+
+			for i := 0; i < calls; i++ {
+				ctx := grpc_metadata.AppendToOutgoingContext(ctx, "bad-header", "true")
+
+				_, err = cli.RepoAccessorUnary(ctx, &mock.RepoRequest{Repo: repo})
+				require.Error(t, err)
+
+				_, err = cli.RepoMutatorUnary(ctx, &mock.RepoRequest{Repo: repo})
+				require.Error(t, err)
+			}
+
+			time.Sleep(5 * time.Millisecond)
+
+			_, accessorErr := cli.RepoAccessorUnary(ctx, &mock.RepoRequest{Repo: repo})
+			_, mutatorErr := cli.RepoMutatorUnary(ctx, &mock.RepoRequest{Repo: repo})
+
+			if calls >= int(tc.readThreshold) {
+				require.Error(t, accessorErr)
+			}
+			if calls >= int(tc.writeThreshold) {
+				require.Error(t, mutatorErr)
+			}
+		})
+	}
+}
+
 func newSmartHTTPClient(t *testing.T, serverSocketPath string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) {
 	t.Helper()
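Hypothetically, a Praefect configuration that passes the new ValidErrorThresholds() check and enables this failover path would set all three values. The key names come from the toml tags in config.go and the warning message in main.go; the section name and the values are illustrative assumptions, not taken from this commit:

```toml
# Hypothetical config fragment; all three threshold settings must be
# non-zero, otherwise praefect logs the warning above and skips
# read/write error threshold failover.
[failover]
enabled = true
error_threshold_window = "10s"
write_error_threshold_count = 20
read_error_threshold_count = 100
```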