diff options
author | John Cai <jcai@gitlab.com> | 2020-05-20 01:01:01 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-05-30 04:46:34 +0300 |
commit | 372ede9fdb5fd4f7e4f5b6ced9170457cb27d2ce (patch) | |
tree | d5a37bb55925b9e7288a4331cb87e0544c04447a | |
parent | a23e2d9c4a922bead1d45a24a96f43930abc48e4 (diff) |
Keep track of read/write errors in praefectjc-node-manager-error-threshold-client
-rw-r--r-- | changelogs/unreleased/jc-node-manager-error-threshold-client.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 11 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 9 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/middleware/errorhandler.go | 179 | ||||
-rw-r--r-- | internal/praefect/middleware/errorhandler_test.go | 180 | ||||
-rw-r--r-- | internal/praefect/nodes/local_elector_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 27 | ||||
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 15 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 9 | ||||
-rw-r--r-- | internal/service/repository/optimize.go | 4 |
14 files changed, 435 insertions, 30 deletions
diff --git a/changelogs/unreleased/jc-node-manager-error-threshold-client.yml b/changelogs/unreleased/jc-node-manager-error-threshold-client.yml new file mode 100644 index 000000000..d4228c004 --- /dev/null +++ b/changelogs/unreleased/jc-node-manager-error-threshold-client.yml @@ -0,0 +1,5 @@ +--- +title: Node manager error threshold client +merge_request: 2230 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index c41ed9820..75e0dbe15 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -101,6 +101,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" @@ -238,7 +239,15 @@ func run(cfgs []starter.Config, conf config.Config) error { queue = datastore.NewPostgresReplicationEventQueue(db) } - nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram) + nodeManager, err := nodes.NewManager( + logger, + conf, + db, + queue, + middleware.NewErrors(conf.Failover.ErrorThresholdWindowSeconds, conf.Failover.ReadErrorThreshold, conf.Failover.WriteErrorThreshold), + nodeLatencyHistogram, + ) + if err != nil { return err } diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index cc5f35bbb..3144665d1 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -171,7 +172,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func logEntry := testhelper.DiscardTestEntry(t) queue := datastore.NewMemoryReplicationEventQueue(conf) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) txMgr := transactions.NewManager() diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 1057b9c1d..60f944b46 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -16,9 +16,12 @@ import ( ) type Failover struct { - Enabled bool `toml:"enabled"` - ElectionStrategy string `toml:"election_strategy"` - ReadOnlyAfterFailover bool `toml:"read_only_after_failover"` + Enabled bool `toml:"enabled"` + ElectionStrategy string `toml:"election_strategy"` + ReadOnlyAfterFailover bool `toml:"read_only_after_failover"` + ErrorThresholdWindowSeconds int `toml:"error_threshold_time_seconds"` + WriteErrorThreshold int `toml:"write_error_threshold"` + ReadErrorThreshold int `toml:"read_error_threshold"` } // Config is a container for everything found in the TOML config file diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index f104e55b1..faef2b09c 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -182,7 +183,7 @@ func TestStreamDirectorMutator(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) txMgr := transactions.NewManager() @@ -284,7 +285,7 @@ func TestStreamDirectorAccessor(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) txMgr := transactions.NewManager() @@ -377,7 +378,7 @@ func TestAbsentCorrelationID(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) txMgr := transactions.NewManager() diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index a8db569ce..49f97be8d 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -17,6 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -206,7 +207,7 @@ func defaultTxMgr() *transactions.Manager { } func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager { - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) return nodeMgr diff --git a/internal/praefect/middleware/errorhandler.go b/internal/praefect/middleware/errorhandler.go new file mode 100644 index 000000000..2ecd447b5 --- /dev/null +++ b/internal/praefect/middleware/errorhandler.go @@ -0,0 +1,179 @@ +package middleware + +import ( + "context" + "fmt" + "io" + "sync" + "time" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "google.golang.org/grpc" +) + +func StreamErrorHandler(registry *protoregistry.Registry, errorTracker *Errors, node string) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + stream, err := streamer(ctx, desc, cc, method, opts...) + + mi, lookupErr := registry.LookupMethod(method) + if err != nil { + return nil, fmt.Errorf("error when looking up method: %w %v", err, lookupErr) + } + + return newCatchErrorStreamer(stream, errorTracker, mi.Operation, node), err + } +} + +type CatchErrorStreamer struct { + grpc.ClientStream + errors *Errors + operation protoregistry.OpType + node string +} + +func (c *CatchErrorStreamer) SendMsg(m interface{}) error { + err := c.ClientStream.SendMsg(m) + if err != nil { + switch c.operation { + case protoregistry.OpAccessor: + c.errors.IncrReadErr(c.node) + case protoregistry.OpMutator: + c.errors.IncrWriteErr(c.node) + } + } + + return err +} + +func (c *CatchErrorStreamer) RecvMsg(m interface{}) error { + err := c.ClientStream.RecvMsg(m) + if err != nil && err != io.EOF { + switch c.operation { + case protoregistry.OpAccessor: + c.errors.IncrReadErr(c.node) + case protoregistry.OpMutator: + c.errors.IncrWriteErr(c.node) + } + } + + return err +} + +func newCatchErrorStreamer(streamer grpc.ClientStream, errors *Errors, operation protoregistry.OpType, node string) *CatchErrorStreamer { + return &CatchErrorStreamer{ + ClientStream: streamer, + errors: errors, + operation: operation, + node: node, + } +} + +type Errors struct { + duration time.Duration + m sync.RWMutex + writeThreshold, readThreshold int + readErrors, writeErrors map[string][]int64 +} + +const ( + defaultWindowSeconds = 10 +) + +func NewErrors(windowSeconds, readThreshold, writeThreshold int) *Errors { + if windowSeconds <= 0 { + windowSeconds = 10 + } + + window := time.Duration(windowSeconds) * time.Second + + if readThreshold == 0 { + readThreshold = 100 * defaultWindowSeconds + } + + if writeThreshold == 0 { + writeThreshold = 100 * defaultWindowSeconds + } + + e := &Errors{ + duration: window, + readErrors: make(map[string][]int64), + writeErrors: make(map[string][]int64), + readThreshold: readThreshold, + writeThreshold: writeThreshold, + } + go e.PeriodicallyClear() + + return e +} + +func (e *Errors) IncrReadErr(node string) { + e.m.Lock() + defer e.m.Unlock() + + if len(e.readErrors[node]) < e.readThreshold { + e.readErrors[node] = append(e.readErrors[node], time.Now().UnixNano()) + } +} + +func (e *Errors) IncrWriteErr(node string) { + e.m.Lock() + defer e.m.Unlock() + + if len(e.writeErrors[node]) < e.writeThreshold { + e.writeErrors[node] = append(e.writeErrors[node], time.Now().UnixNano()) + } +} + +func (e *Errors) ReadThresholdReached(node string) bool { + e.m.RLock() + defer e.m.RUnlock() + + return len(e.readErrors[node]) >= e.readThreshold +} + +func (e *Errors) WriteThresholdReached(node string) bool { + e.m.RLock() + defer e.m.RUnlock() + + return len(e.writeErrors[node]) >= e.writeThreshold +} + +func (e *Errors) PeriodicallyClear() { + ticker := time.NewTicker(e.duration) + for { + start := time.Now() + <-ticker.C + e.clear(start) + } +} + +func (e *Errors) clear(olderThan time.Time) { + e.m.Lock() + defer e.m.Unlock() + + for node, nodeWriteErrors := range e.writeErrors { + for i, writeErrorTime := range nodeWriteErrors { + if time.Unix(0, writeErrorTime).Before(olderThan) { + if i+1 == len(nodeWriteErrors) { + e.writeErrors[node] = nil + } else { + e.writeErrors[node] = nodeWriteErrors[i+1:] + } + break + } + } + } + + for node, nodeReadErrors := range e.readErrors { + for i, readErrorTime := range nodeReadErrors { + if time.Unix(0, readErrorTime).Before(olderThan) { + if i+1 == len(nodeReadErrors) { + e.readErrors[node] = nil + } else { + e.readErrors[node] = nodeReadErrors[i+1:] + } + break + } + } + } +} diff --git a/internal/praefect/middleware/errorhandler_test.go b/internal/praefect/middleware/errorhandler_test.go new file mode 100644 index 000000000..b158102f1 --- /dev/null +++ b/internal/praefect/middleware/errorhandler_test.go @@ -0,0 +1,180 @@ +package middleware + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/empty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +func TestErrorTracker_IncrErrors(t *testing.T) { + writeThreshold, readThreshold := 10, 10 + errors := NewErrors(100000, readThreshold, writeThreshold) + + node := "backend-node-1" + + assert.False(t, errors.WriteThresholdReached(node)) + assert.False(t, errors.ReadThresholdReached(node)) + + for i := 0; i < writeThreshold; i++ { + errors.IncrWriteErr(node) + } + + assert.True(t, errors.WriteThresholdReached(node)) + + for i := 0; i < readThreshold; i++ { + errors.IncrReadErr(node) + } + + assert.True(t, errors.ReadThresholdReached(node)) + + errors.clear(time.Now()) + + assert.False(t, errors.WriteThresholdReached(node)) + assert.False(t, errors.ReadThresholdReached(node)) +} + +func TestErrorTracker_ClearErrors(t *testing.T) { + writeThreshold, readThreshold := 10, 10 + errors := NewErrors(100000, readThreshold, writeThreshold) + + node := "backend-node-1" + + errors.IncrWriteErr(node) + errors.IncrReadErr(node) + + time.Sleep(10 * time.Millisecond) + clearErrorsOlderThan := time.Now() + time.Sleep(10 * time.Millisecond) + + errors.IncrWriteErr(node) + errors.IncrReadErr(node) + + errors.clear(clearErrorsOlderThan) + assert.Len(t, errors.readErrors[node], 1, "clear should only have cleared the read error older than the time specifiied") + assert.Len(t, errors.writeErrors[node], 1, "clear should only have cleared the write error older than the time specifiied") +} + +type simpleService struct{} + +func (s *simpleService) RepoAccessorUnary(ctx context.Context, in *mock.RepoRequest) (*empty.Empty, error) { + if in.GetRepo() == nil { + return nil, helper.ErrInternalf("error") + } + + return &empty.Empty{}, nil +} + +func (s *simpleService) RepoMutatorUnary(ctx context.Context, in *mock.RepoRequest) (*empty.Empty, error) { + if in.GetRepo() == nil { + return nil, helper.ErrInternalf("error") + } + + return &empty.Empty{}, nil +} + +func (s *simpleService) ServerAccessor(ctx context.Context, in *mock.SimpleRequest) (*mock.SimpleResponse, error) { + return &mock.SimpleResponse{}, nil +} + +func TestStreamInterceptor(t *testing.T) { + threshold := 5 + errTracker := NewErrors(10000, threshold, threshold) + nodeName := "node-1" + + internalSrv := grpc.NewServer() + + internalServerSocketPath := testhelper.GetTemporaryGitalySocketFileName() + lis, err := net.Listen("unix", internalServerSocketPath) + + gz := proto.FileDescriptor("mock.proto") + fmt.Printf("\nGZ SIZE: %d\n", len(gz)) + fd, err := protoregistry.ExtractFileDescriptor(gz) + require.NoError(t, err) + + registry, err := protoregistry.New(fd) + require.NoError(t, err) + + require.NoError(t, err) + mock.RegisterSimpleServiceServer(internalSrv, &simpleService{}) + reflection.Register(internalSrv) + + go internalSrv.Serve(lis) + defer internalSrv.Stop() + + srvOptions := []grpc.ServerOption{ + grpc.CustomCodec(proxy.NewCodec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, + fullMethodName string, + peeker proxy.StreamModifier, + ) (*proxy.StreamParameters, error) { + cc, err := grpc.Dial("unix://"+internalServerSocketPath, + grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), + grpc.WithInsecure(), + grpc.WithStreamInterceptor(StreamErrorHandler(registry, errTracker, nodeName)), + ) + require.NoError(t, err) + return proxy.NewStreamParameters(ctx, cc, func() {}, nil), nil + })), + } + + praefectSocket := testhelper.GetTemporaryGitalySocketFileName() + praefectLis, err := net.Listen("unix", praefectSocket) + require.NoError(t, err) + + praefectSrv := grpc.NewServer(srvOptions...) + defer praefectSrv.Stop() + go praefectSrv.Serve(praefectLis) + + praefectCC, err := grpc.Dial("unix://"+praefectSocket, grpc.WithInsecure()) + require.NoError(t, err) + + simpleClient := mock.NewSimpleServiceClient(praefectCC) + + testRepo, _, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + for i := 0; i < threshold; i++ { + _, err = simpleClient.RepoAccessorUnary(ctx, &mock.RepoRequest{ + Repo: testRepo, + }) + require.NoError(t, err) + _, err = simpleClient.RepoMutatorUnary(ctx, &mock.RepoRequest{ + Repo: testRepo, + }) + require.NoError(t, err) + } + + assert.False(t, errTracker.WriteThresholdReached(nodeName)) + assert.False(t, errTracker.ReadThresholdReached(nodeName)) + + for i := 0; i < threshold; i++ { + _, err = simpleClient.RepoAccessorUnary(ctx, &mock.RepoRequest{ + Repo: nil, + }) + require.Error(t, err) + _, err = simpleClient.RepoMutatorUnary(ctx, &mock.RepoRequest{ + Repo: nil, + }) + require.Error(t, err) + } + + assert.True(t, errTracker.WriteThresholdReached(nodeName)) + assert.True(t, errTracker.ReadThresholdReached(nodeName)) +} diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go index 3b8e40f46..721956d1a 100644 --- a/internal/praefect/nodes/local_elector_test.go +++ b/internal/praefect/nodes/local_elector_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" @@ -26,8 +27,9 @@ func setupElector(t *testing.T) (*localElector, []*nodeStatus, *grpc.ClientConn, storageName := "default" mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec() - cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0) - secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1) + errTracker := middleware.NewErrors(0, 0, 0) + cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), errTracker, mockHistogramVec0) + secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), errTracker, mockHistogramVec1) ns := []*nodeStatus{cs, secondary} logger := testhelper.NewTestLogger(t).WithField("test", t.Name()) strategy := newLocalElector(storageName, true, true, logger, ns) diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 91f2c1d97..4fb285653 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -19,7 +19,9 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics" correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" @@ -98,7 +100,15 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy") const dialTimeout = 10 * time.Second // NewManager creates a new NodeMgr based on virtual storage configs -func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) { +func NewManager( + log *logrus.Entry, + c config.Config, + db *sql.DB, + queue datastore.ReplicationEventQueue, + errTracker *middleware.Errors, + latencyHistogram prommetrics.HistogramVec, + dialOpts ...grpc.DialOption, +) (*Mgr, error) { strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages)) ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) @@ -117,6 +127,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore. grpc_prometheus.StreamClientInterceptor, grpctracing.StreamClientTracingInterceptor(), correlation.StreamClientCorrelationInterceptor(), + middleware.StreamErrorHandler(protoregistry.GitalyProtoPreregistered, errTracker, node.Storage), )), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, @@ -128,7 +139,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore. if err != nil { return nil, err } - cs := newConnectionStatus(*node, conn, log, latencyHistogram) + cs := newConnectionStatus(*node, conn, log, errTracker, latencyHistogram) if node.DefaultPrimary { ns[0] = cs } else { @@ -229,11 +240,12 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st return shard.Primary, nil // there is no matched secondaries, maybe because of re-configuration } -func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus { +func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, errTracker *middleware.Errors, latencyHist prommetrics.HistogramVec) *nodeStatus { return &nodeStatus{ Node: node, ClientConn: cc, log: l, + errTracker: errTracker, latencyHist: latencyHist, } } @@ -241,6 +253,7 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, type nodeStatus struct { models.Node *grpc.ClientConn + errTracker *middleware.Errors log *logrus.Entry latencyHist prommetrics.HistogramVec } @@ -268,6 +281,14 @@ func (n *nodeStatus) GetConnection() *grpc.ClientConn { const checkTimeout = 1 * time.Second func (n *nodeStatus) check(ctx context.Context) (bool, error) { + if n.errTracker.WriteThresholdReached(n.Storage) { + return false, errors.New("too many write errors") + } + + if n.errTracker.ReadThresholdReached(n.Storage) { + return false, errors.New("too many read errors") + } + client := healthpb.NewHealthClient(n.ClientConn) ctx, cancel := context.WithTimeout(ctx, checkTimeout) defer cancel() diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index e2b318a59..0448b8401 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" @@ -34,7 +35,7 @@ func TestNodeStatus(t *testing.T) { mockHistogramVec := promtest.NewMockHistogramVec() storageName := "default" - cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec) + cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec) var expectedLabels [][]string for i := 0; i < healthcheckThreshold; i++ { @@ -86,7 +87,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) { Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"}, VirtualStorages: []*config.VirtualStorage{virtualStorage}, } - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec()) + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) nm.Start(time.Millisecond, time.Millisecond) @@ -134,7 +135,7 @@ func TestPrimaryIsSecond(t *testing.T) { } mockHistogram := promtest.NewMockHistogramVec() - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, mockHistogram) + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, middleware.NewErrors(0, 0, 0), mockHistogram) require.NoError(t, err) shard, err := nm.GetShard("virtual-storage-0") @@ -184,7 +185,7 @@ func TestBlockingDial(t *testing.T) { healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) }() - mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec()) + mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) mgr.Start(1*time.Millisecond, 1*time.Millisecond) @@ -230,10 +231,10 @@ func TestNodeManager(t *testing.T) { } mockHistogram := promtest.NewMockHistogramVec() - nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram) + nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, middleware.NewErrors(0, 0, 0), mockHistogram) require.NoError(t, err) - nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram) + nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, middleware.NewErrors(0, 0, 0), mockHistogram) require.NoError(t, err) nm.Start(1*time.Millisecond, 5*time.Second) @@ -394,7 +395,7 @@ func TestMgr_GetSyncedNode(t *testing.T) { verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) { queue := datastore.NewMemoryReplicationEventQueue(conf) - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram) + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, middleware.NewErrors(0, 0, 0), mockHistogram) require.NoError(t, err) nm.Start(time.Duration(0), time.Hour) diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index adc602952..42765fe59 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" @@ -43,7 +44,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) { storageName := "default" mockHistogramVec0 := promtest.NewMockHistogramVec() - cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0) + cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec0) ns := []*nodeStatus{cs0} elector := newSQLElector(shardName, conf, 1, defaultActivePraefectSeconds, db.DB, logger, ns) @@ -98,8 +99,8 @@ func TestBasicFailover(t *testing.T) { storageName := "default" mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec() - cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0) - cs1 := newConnectionStatus(models.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1) + cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec0) + cs1 := newConnectionStatus(models.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec1) ns := []*nodeStatus{cs0, cs1} failoverTimeSeconds := 1 diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index fbfcb96ab..7937e40e4 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -122,7 +123,7 @@ func TestProcessReplicationJob(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) @@ -215,7 +216,7 @@ func TestPropagateReplicationJob(t *testing.T) { queue := datastore.NewMemoryReplicationEventQueue(conf) logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) @@ -507,7 +508,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr) @@ -640,7 +641,7 @@ func TestProcessBacklog_Success(t *testing.T) { logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec()) require.NoError(t, err) replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr) diff --git a/internal/service/repository/optimize.go b/internal/service/repository/optimize.go index 5c366e881..1b831c1ef 100644 --- a/internal/service/repository/optimize.go +++ b/internal/service/repository/optimize.go @@ -31,8 +31,8 @@ func init() { func removeEmptyDirs(ctx context.Context, target string) error { if err := ctx.Err(); err != nil { - return err - } + return err + } entries, err := ioutil.ReadDir(target) switch { |