Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: John Cai <jcai@gitlab.com> 2020-06-25 20:40:09 +0300
committer: John Cai <jcai@gitlab.com> 2020-07-23 00:58:55 +0300
commit: 2b8c681bc41227440de29470e21b7ec3f0c9d708 (patch)
tree: 9a5ad0758fe886549a9d61985a04cfc9badba071
parent: e3bedb3507c01fbe8395dd76589e095d7da14e66 (diff)
Use error tracker to determine if node is healthy [jc-test-test]
Hooks up the error tracker in the node manager so it checks if a certain backend node has reached a threshold of errors. If it has, then it will be deemed unhealthy.
-rw-r--r-- changelogs/unreleased/jc-use-error-tracker.yml | 5
-rw-r--r-- cmd/praefect/main.go | 20
-rw-r--r-- internal/praefect/auth_test.go | 2
-rw-r--r-- internal/praefect/config/config.go | 4
-rw-r--r-- internal/praefect/coordinator_test.go | 12
-rw-r--r-- internal/praefect/datastore/queue_test.go | 2
-rw-r--r-- internal/praefect/helper_test.go | 5
-rw-r--r-- internal/praefect/middleware/errorhandler.go | 8
-rw-r--r-- internal/praefect/middleware/errorhandler_test.go | 4
-rw-r--r-- internal/praefect/nodes/local_elector_test.go | 4
-rw-r--r-- internal/praefect/nodes/manager.go | 70
-rw-r--r-- internal/praefect/nodes/manager_test.go | 25
-rw-r--r-- internal/praefect/nodes/sql_elector_test.go | 6
-rw-r--r-- internal/praefect/nodes/tracker/errors.go (renamed from internal/praefect/nodes/errors.go) | 2
-rw-r--r-- internal/praefect/nodes/tracker/errors_test.go (renamed from internal/praefect/nodes/errors_test.go) | 2
-rw-r--r-- internal/praefect/replicator_test.go | 8
-rw-r--r-- internal/praefect/server_factory_test.go | 2
-rw-r--r-- internal/praefect/server_test.go | 164
18 files changed, 275 insertions, 70 deletions
diff --git a/changelogs/unreleased/jc-use-error-tracker.yml b/changelogs/unreleased/jc-use-error-tracker.yml
new file mode 100644
index 000000000..a6537c112
--- /dev/null
+++ b/changelogs/unreleased/jc-use-error-tracker.yml
@@ -0,0 +1,5 @@
+---
+title: Use error tracker to determine if node is healthy
+merge_request: 2341
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 1553c9c33..d9bc7d888 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -101,6 +101,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/internal/version"
@@ -232,7 +233,21 @@ func run(cfgs []starter.Config, conf config.Config) error {
queue = datastore.NewPostgresReplicationEventQueue(db)
}
- nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var errTracker tracker.ErrorTracker
+
+ if conf.Failover.Enabled && conf.Failover.ValidErrorThresholds() {
+ errTracker, err = tracker.NewErrors(ctx, conf.Failover.ErrorThresholdWindow.Duration(), conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount)
+ if err != nil {
+ return err
+ }
+ } else {
+ logger.Warn("skipping read/write error threshold failover. To enable read/write error threshold failover, provide non-zero values for error_threshold_window, write_error_threshold_count, read_error_threshold_count")
+ }
+
+ nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker)
if err != nil {
return err
}
@@ -283,9 +298,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
prometheus.MustRegister(repl)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
b, err := bootstrap.New()
if err != nil {
return fmt.Errorf("unable to create a bootstrap: %v", err)
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 48f786108..40126b275 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -168,7 +168,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
logEntry := testhelper.DiscardTestEntry(t)
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
txMgr := transactions.NewManager()
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index f3f4a3f87..61014b6a4 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -23,6 +23,10 @@ type Failover struct {
ReadErrorThresholdCount uint32 `toml:"read_error_threshold_count"`
}
+func (f Failover) ValidErrorThresholds() bool {
+ return f.ErrorThresholdWindow > 0 && f.WriteErrorThresholdCount > 0 && f.ReadErrorThresholdCount > 0
+}
+
const sqlFailoverValue = "sql"
// Config is a container for everything found in the TOML config file
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 3336c21de..b3faf5067 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -158,7 +158,7 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -290,7 +290,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
defer cancel()
ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.ReferenceTransactions)
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
@@ -407,7 +407,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -484,7 +484,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -676,7 +676,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -878,7 +878,7 @@ func TestStreamDirectorStorageScope(t *testing.T) {
Nodes: []*config.Node{primaryGitaly, secondaryGitaly},
}}}
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
coordinator := NewCoordinator(nil, nodeMgr, nil, conf, protoregistry.GitalyProtoPreregistered)
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 3ea4de70d..4cfcf2200 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -4,8 +4,8 @@ package datastore
import (
"context"
- "testing"
"sync"
+ "testing"
"time"
"github.com/stretchr/testify/assert"
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index dd0bd4acc..25ee2b3e6 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -189,7 +189,7 @@ func defaultTxMgr() *transactions.Manager {
}
func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
return nodeMgr
@@ -360,6 +360,9 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) {
srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(internalauth.Config{Token: token})))
mock.RegisterSimpleServiceServer(srv, m)
+ healthSrv := health.NewServer()
+ healthSrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+ healthpb.RegisterHealthServer(srv, healthSrv)
// client to backend service
lis, port := listenAvailPort(tb)
diff --git a/internal/praefect/middleware/errorhandler.go b/internal/praefect/middleware/errorhandler.go
index c6d0e36ca..5339419a5 100644
--- a/internal/praefect/middleware/errorhandler.go
+++ b/internal/praefect/middleware/errorhandler.go
@@ -5,13 +5,13 @@ import (
"fmt"
"io"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"google.golang.org/grpc"
)
// StreamErrorHandler returns a client interceptor that will track accessor/mutator errors from internal gitaly nodes
-func StreamErrorHandler(registry *protoregistry.Registry, errorTracker nodes.ErrorTracker, nodeStorage string) grpc.StreamClientInterceptor {
+func StreamErrorHandler(registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, nodeStorage string) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
stream, err := streamer(ctx, desc, cc, method, opts...)
@@ -27,12 +27,12 @@ func StreamErrorHandler(registry *protoregistry.Registry, errorTracker nodes.Err
// catchErrorSteamer is a custom ClientStream that adheres to grpc.ClientStream but keeps track of accessor/mutator errors
type catchErrorStreamer struct {
grpc.ClientStream
- errors nodes.ErrorTracker
+ errors tracker.ErrorTracker
operation protoregistry.OpType
nodeStorage string
}
-func newCatchErrorStreamer(streamer grpc.ClientStream, errors nodes.ErrorTracker, operation protoregistry.OpType, nodeStorage string) *catchErrorStreamer {
+func newCatchErrorStreamer(streamer grpc.ClientStream, errors tracker.ErrorTracker, operation protoregistry.OpType, nodeStorage string) *catchErrorStreamer {
return &catchErrorStreamer{
ClientStream: streamer,
errors: errors,
diff --git a/internal/praefect/middleware/errorhandler_test.go b/internal/praefect/middleware/errorhandler_test.go
index 348d5c074..490c4a99d 100644
--- a/internal/praefect/middleware/errorhandler_test.go
+++ b/internal/praefect/middleware/errorhandler_test.go
@@ -13,7 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"google.golang.org/grpc"
@@ -47,7 +47,7 @@ func TestStreamInterceptor(t *testing.T) {
window := 1 * time.Second
threshold := 5
- errTracker, err := nodes.NewErrors(ctx, window, uint32(threshold), uint32(threshold))
+ errTracker, err := tracker.NewErrors(ctx, window, uint32(threshold), uint32(threshold))
require.NoError(t, err)
nodeName := "node-1"
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
index 6689936a1..c882df686 100644
--- a/internal/praefect/nodes/local_elector_test.go
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -26,8 +26,8 @@ func setupElector(t *testing.T) (*localElector, []*nodeStatus, *grpc.ClientConn,
storageName := "default"
mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
- cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0)
- secondary := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+ cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0, nil)
+ secondary := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1, nil)
ns := []*nodeStatus{cs, secondary}
logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
strategy := newLocalElector(storageName, true, true, logger, ns)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 96c5a61ec..4925e8568 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -20,6 +20,9 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
@@ -127,7 +130,15 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(
+ log *logrus.Entry,
+ c config.Config,
+ db *sql.DB,
+ queue datastore.ReplicationEventQueue,
+ latencyHistogram prommetrics.HistogramVec,
+ registry *protoregistry.Registry,
+ errorTracker tracker.ErrorTracker,
+ dialOpts ...grpc.DialOption) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -138,28 +149,28 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
ns := make([]*nodeStatus, 0, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
+ streamInterceptors := []grpc.StreamClientInterceptor{
+ grpc_prometheus.StreamClientInterceptor,
+ grpctracing.StreamClientTracingInterceptor(),
+ correlation.StreamClientCorrelationInterceptor(),
+ }
+
+ if c.Failover.Enabled && errorTracker != nil {
+ streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage))
+ }
+
conn, err := client.DialContext(ctx, node.Address,
- append(
- []grpc.DialOption{
- grpc.WithBlock(),
- grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
- grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
- grpc_prometheus.StreamClientInterceptor,
- grpctracing.StreamClientTracingInterceptor(),
- correlation.StreamClientCorrelationInterceptor(),
- )),
- grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
- grpc_prometheus.UnaryClientInterceptor,
- grpctracing.UnaryClientTracingInterceptor(),
- correlation.UnaryClientCorrelationInterceptor(),
- )),
- }, dialOpts...),
+ append([]grpc.DialOption{
+ grpc.WithBlock(),
+ grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
+ grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamInterceptors...)),
+ }, dialOpts...),
)
if err != nil {
return nil, err
}
- cs := newConnectionStatus(*node, conn, log, latencyHistogram)
+ cs := newConnectionStatus(*node, conn, log, latencyHistogram, errorTracker)
ns = append(ns, cs)
}
@@ -213,7 +224,12 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
return Shard{}, fmt.Errorf("virtual storage %q: %w", virtualStorageName, ErrVirtualStorageNotExist)
}
- return strategy.GetShard()
+ shard, err := strategy.GetShard()
+ if err != nil {
+ return shard, err
+ }
+
+ return shard, nil
}
func (n *Mgr) EnableWrites(ctx context.Context, virtualStorageName string) error {
@@ -272,12 +288,13 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return nil, errors.New("could not select random storage")
}
-func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l logrus.FieldLogger, latencyHist prommetrics.HistogramVec) *nodeStatus {
+func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l logrus.FieldLogger, latencyHist prommetrics.HistogramVec, errorTracker tracker.ErrorTracker) *nodeStatus {
return &nodeStatus{
node: node,
clientConn: cc,
log: l,
latencyHist: latencyHist,
+ errTracker: errorTracker,
}
}
@@ -288,6 +305,7 @@ type nodeStatus struct {
latencyHist prommetrics.HistogramVec
mtx sync.RWMutex
statuses []bool
+ errTracker tracker.ErrorTracker
}
// GetStorage gets the storage name of a node
@@ -341,6 +359,18 @@ func (n *nodeStatus) updateStatus(status bool) {
}
func (n *nodeStatus) CheckHealth(ctx context.Context) (bool, error) {
+ if n.errTracker != nil {
+ if n.errTracker.ReadThresholdReached(n.GetStorage()) {
+ n.updateStatus(false)
+ return false, fmt.Errorf("read error threshold reached for storage %q", n.GetStorage())
+ }
+
+ if n.errTracker.WriteThresholdReached(n.GetStorage()) {
+ n.updateStatus(false)
+ return false, fmt.Errorf("write error threshold reached for storage %q", n.GetStorage())
+ }
+ }
+
health := healthpb.NewHealthClient(n.clientConn)
ctx, cancel := context.WithTimeout(ctx, healthcheckTimeout)
defer cancel()
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index d958dc06d..228ad177d 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"google.golang.org/grpc"
@@ -73,7 +74,7 @@ func TestNodeStatus(t *testing.T) {
mockHistogramVec := promtest.NewMockHistogramVec()
storageName := "default"
- cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
+ cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec, nil)
var expectedLabels [][]string
for i := 0; i < healthcheckThreshold; i++ {
@@ -124,7 +125,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -175,7 +176,7 @@ func TestBlockingDial(t *testing.T) {
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
}()
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -221,10 +222,10 @@ func TestNodeManager(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
@@ -401,7 +402,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nm.Start(time.Duration(0), time.Hour)
@@ -567,12 +568,12 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
latencyHistMock := &promtest.MockHistogramVec{}
t.Run("unchecked node is unhealthy", func(t *testing.T) {
- ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
require.False(t, ns.IsHealthy())
})
t.Run("not enough check to consider it healthy", func(t *testing.T) {
- ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
checkNTimes(ctx, t, ns, healthcheckThreshold-1)
@@ -580,7 +581,7 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
})
t.Run("healthy", func(t *testing.T) {
- ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
checkNTimes(ctx, t, ns, healthcheckThreshold)
@@ -588,7 +589,7 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
})
t.Run("healthy turns into unhealthy after single failed check", func(t *testing.T) {
- ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
checkNTimes(ctx, t, ns, healthcheckThreshold)
@@ -601,7 +602,7 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
})
t.Run("unhealthy turns into healthy after pre-define threshold of checks", func(t *testing.T) {
- ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
checkNTimes(ctx, t, ns, healthcheckThreshold)
@@ -623,7 +624,7 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
})
t.Run("concurrent access has no races", func(t *testing.T) {
- ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
t.Run("continuously does health checks - 1", func(t *testing.T) {
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index a3bb5db96..1f1899ab4 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -42,7 +42,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) {
storageName := "default"
mockHistogramVec0 := promtest.NewMockHistogramVec()
- cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
+ cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0, nil)
ns := []*nodeStatus{cs0}
elector := newSQLElector(shardName, conf, db.DB, logger, ns)
@@ -99,8 +99,8 @@ func TestBasicFailover(t *testing.T) {
storageName := "default"
- cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0", Address: addr0}, cc0, logger, promtest.NewMockHistogramVec())
- cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1", Address: addr1}, cc1, logger, promtest.NewMockHistogramVec())
+ cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0", Address: addr0}, cc0, logger, promtest.NewMockHistogramVec(), nil)
+ cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1", Address: addr1}, cc1, logger, promtest.NewMockHistogramVec(), nil)
ns := []*nodeStatus{cs0, cs1}
elector := newSQLElector(shardName, conf, db.DB, logger, ns)
diff --git a/internal/praefect/nodes/errors.go b/internal/praefect/nodes/tracker/errors.go
index 2faaeb3e0..6997f9ec7 100644
--- a/internal/praefect/nodes/errors.go
+++ b/internal/praefect/nodes/tracker/errors.go
@@ -1,4 +1,4 @@
-package nodes
+package tracker
import (
"context"
diff --git a/internal/praefect/nodes/errors_test.go b/internal/praefect/nodes/tracker/errors_test.go
index f5b42e06a..95b4a3ab9 100644
--- a/internal/praefect/nodes/errors_test.go
+++ b/internal/praefect/nodes/tracker/errors_test.go
@@ -1,4 +1,4 @@
-package nodes
+package tracker
import (
"sync"
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2ca47ee13..f07c94af0 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -123,7 +123,7 @@ func TestProcessReplicationJob(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -217,7 +217,7 @@ func TestPropagateReplicationJob(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -501,7 +501,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
@@ -640,7 +640,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index c0f44ca9f..77410cd1f 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -77,7 +77,7 @@ func TestServerFactory(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{})
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
txMgr := transactions.NewManager()
registry := protoregistry.GitalyProtoPreregistered
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 04f1843e6..8e7b8d8b3 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -16,6 +16,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
@@ -31,6 +32,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -40,7 +42,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
- "google.golang.org/grpc/reflection"
+ grpc_metadata "google.golang.org/grpc/metadata"
)
func TestServerRouteServerAccessor(t *testing.T) {
@@ -731,7 +733,7 @@ func (m *mockSmartHTTP) Called(method string) int {
return m.methodsCalled[method]
}
-func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) {
+func newSmartHTTPGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) {
socketPath := testhelper.GetTemporaryGitalySocketFileName()
listener, err := net.Listen("unix", socketPath)
require.NoError(t, err)
@@ -742,7 +744,6 @@ func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *
grpc_health_v1.RegisterHealthServer(grpcServer, healthSrvr)
healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
gitalypb.RegisterSmartHTTPServiceServer(grpcServer, srv)
- reflection.Register(grpcServer)
go grpcServer.Serve(listener)
@@ -754,11 +755,11 @@ func TestProxyWrites(t *testing.T) {
smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}
- socket0, srv0 := newGrpcServer(t, smartHTTP0)
+ socket0, srv0 := newSmartHTTPGrpcServer(t, smartHTTP0)
defer srv0.Stop()
- socket1, srv1 := newGrpcServer(t, smartHTTP1)
+ socket1, srv1 := newSmartHTTPGrpcServer(t, smartHTTP1)
defer srv1.Stop()
- socket2, srv2 := newGrpcServer(t, smartHTTP2)
+ socket2, srv2 := newSmartHTTPGrpcServer(t, smartHTTP2)
defer srv2.Stop()
conf := config.Config{
@@ -786,7 +787,7 @@ func TestProxyWrites(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
@@ -855,6 +856,155 @@ func TestProxyWrites(t *testing.T) {
assert.Equal(t, bytes.Repeat([]byte(payload), 10), receivedData.Bytes())
}
+type errorSimpleService struct {
+}
+
+// ServerAccessor unconditionally fails with the error built by
+// helper.ErrInternalf; neither the context nor the request is inspected.
+func (m *errorSimpleService) ServerAccessor(ctx context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
+	err := helper.ErrInternalf("something went wrong")
+	return nil, err
+}
+
+// RepoAccessorUnary fails with the error built by helper.ErrInternalf when the
+// incoming gRPC metadata carries "bad-header" with a first value of "true".
+// Requests without that header, or with any other value, succeed with an
+// empty response. If the context carries no incoming metadata at all, a
+// "couldn't read metadata" error is returned instead.
+func (m *errorSimpleService) RepoAccessorUnary(ctx context.Context, req *mock.RepoRequest) (*empty.Empty, error) {
+	md, ok := grpc_metadata.FromIncomingContext(ctx)
+	if !ok {
+		return nil, errors.New("couldn't read metadata")
+	}
+
+	// md.Get yields an empty slice when the key is absent, so the length is
+	// checked before indexing; otherwise a header-less request would panic.
+	if vals := md.Get("bad-header"); len(vals) > 0 && vals[0] == "true" {
+		return nil, helper.ErrInternalf("something went wrong")
+	}
+
+	return &empty.Empty{}, nil
+}
+
+// RepoMutatorUnary fails with the error built by helper.ErrInternalf when the
+// incoming gRPC metadata carries "bad-header" with a first value of "true".
+// Requests without that header, or with any other value, succeed with an
+// empty response. If the context carries no incoming metadata at all, a
+// "couldn't read metadata" error is returned instead.
+func (m *errorSimpleService) RepoMutatorUnary(ctx context.Context, req *mock.RepoRequest) (*empty.Empty, error) {
+	md, ok := grpc_metadata.FromIncomingContext(ctx)
+	if !ok {
+		return nil, errors.New("couldn't read metadata")
+	}
+
+	// md.Get yields an empty slice when the key is absent, so the length is
+	// checked before indexing; otherwise a header-less request would panic.
+	if vals := md.Get("bad-header"); len(vals) > 0 && vals[0] == "true" {
+		return nil, helper.ErrInternalf("something went wrong")
+	}
+
+	return &empty.Empty{}, nil
+}
+
+// TestErrorTreshold checks that proxied requests start failing once the error
+// tracker's read or write threshold has been reached.
+//
+// Each case sends six failing accessor calls and six failing mutator calls
+// through a proxy server backed by a single node, then issues one more call of
+// each kind without the failure-inducing header and expects an error for
+// whichever threshold the six calls met or exceeded.
+func TestErrorTreshold(t *testing.T) {
+	simple := &errorSimpleService{}
+
+	backendToken := ""
+	backend, cleanup := newMockDownstream(t, backendToken, simple)
+	defer cleanup()
+
+	conf := config.Config{
+		VirtualStorages: []*config.VirtualStorage{
+			{
+				Name: "default",
+				Nodes: []*config.Node{
+					{
+						Storage: "praefect-internal-0",
+						Address: backend,
+					},
+				},
+			},
+		},
+		Failover: config.Failover{
+			Enabled: true,
+		},
+	}
+
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	queue := datastore.NewMemoryReplicationEventQueue(conf)
+	entry := testhelper.DiscardTestEntry(t)
+
+	testCases := []struct {
+		desc           string
+		readThreshold  uint32
+		writeThreshold uint32
+	}{
+		{
+			desc:           "read threshold reached",
+			readThreshold:  5,
+			writeThreshold: 10000,
+		},
+		{
+			desc:           "write threshold reached",
+			readThreshold:  10000,
+			writeThreshold: 5,
+		},
+	}
+
+	gz := proto.FileDescriptor("mock.proto")
+	fd, err := protoregistry.ExtractFileDescriptor(gz)
+	require.NoError(t, err)
+
+	registry, err := protoregistry.New(fd)
+	require.NoError(t, err)
+
+	// The failure-inducing context is identical for every call, so build it
+	// once instead of re-deriving (and shadowing) ctx on each loop iteration.
+	badCtx := grpc_metadata.AppendToOutgoingContext(ctx, "bad-header", "true")
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			errorTracker, err := tracker.NewErrors(ctx, 10*time.Second, tc.readThreshold, tc.writeThreshold)
+			require.NoError(t, err)
+
+			nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec(), registry, errorTracker)
+			require.NoError(t, err)
+
+			coordinator := NewCoordinator(queue, nodeMgr, nil, conf, registry)
+
+			server := grpc.NewServer(
+				grpc.CustomCodec(proxy.NewCodec()),
+				grpc.UnknownServiceHandler(proxy.TransparentHandler(coordinator.StreamDirector)),
+			)
+
+			socket := testhelper.GetTemporaryGitalySocketFileName()
+			listener, err := net.Listen("unix", socket)
+			require.NoError(t, err)
+
+			go server.Serve(listener)
+			defer server.Stop()
+
+			conn, err := dial("unix://"+socket, []grpc.DialOption{grpc.WithInsecure()})
+			require.NoError(t, err)
+			// Release the client connection when the subtest ends; deferred
+			// after server.Stop so it runs before the server is stopped.
+			defer conn.Close()
+			cli := mock.NewSimpleServiceClient(conn)
+
+			nodeMgr.Start(0, 5*time.Millisecond)
+			repo, _, cleanupRepo := testhelper.NewTestRepo(t)
+			defer cleanupRepo()
+
+			// One more than the lower threshold used by the test cases.
+			calls := 6
+
+			for i := 0; i < calls; i++ {
+				_, err = cli.RepoAccessorUnary(badCtx, &mock.RepoRequest{Repo: repo})
+				require.Error(t, err)
+
+				_, err = cli.RepoMutatorUnary(badCtx, &mock.RepoRequest{Repo: repo})
+				require.Error(t, err)
+			}
+
+			// NOTE(review): presumably waits for one monitoring interval so the
+			// node manager can pick up the tracked errors — confirm this is
+			// long enough to avoid flakiness.
+			time.Sleep(5 * time.Millisecond)
+
+			// These calls carry no "bad-header", so any error observed here is
+			// not produced by the header check in the backend handlers.
+			_, accessorErr := cli.RepoAccessorUnary(ctx, &mock.RepoRequest{Repo: repo})
+			_, mutatorErr := cli.RepoMutatorUnary(ctx, &mock.RepoRequest{Repo: repo})
+
+			if calls >= int(tc.readThreshold) {
+				require.Error(t, accessorErr)
+			}
+			if calls >= int(tc.writeThreshold) {
+				require.Error(t, mutatorErr)
+			}
+		})
+	}
+}
+
func newSmartHTTPClient(t *testing.T, serverSocketPath string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) {
t.Helper()