Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-05-20 01:01:01 +0300
committerJohn Cai <jcai@gitlab.com>2020-05-30 04:46:34 +0300
commit372ede9fdb5fd4f7e4f5b6ced9170457cb27d2ce (patch)
treed5a37bb55925b9e7288a4331cb87e0544c04447a
parenta23e2d9c4a922bead1d45a24a96f43930abc48e4 (diff)
Keep track of read/write errors in praefectjc-node-manager-error-threshold-client
-rw-r--r--changelogs/unreleased/jc-node-manager-error-threshold-client.yml5
-rw-r--r--cmd/praefect/main.go11
-rw-r--r--internal/praefect/auth_test.go3
-rw-r--r--internal/praefect/config/config.go9
-rw-r--r--internal/praefect/coordinator_test.go7
-rw-r--r--internal/praefect/helper_test.go3
-rw-r--r--internal/praefect/middleware/errorhandler.go179
-rw-r--r--internal/praefect/middleware/errorhandler_test.go180
-rw-r--r--internal/praefect/nodes/local_elector_test.go6
-rw-r--r--internal/praefect/nodes/manager.go27
-rw-r--r--internal/praefect/nodes/manager_test.go15
-rw-r--r--internal/praefect/nodes/sql_elector_test.go7
-rw-r--r--internal/praefect/replicator_test.go9
-rw-r--r--internal/service/repository/optimize.go4
14 files changed, 435 insertions, 30 deletions
diff --git a/changelogs/unreleased/jc-node-manager-error-threshold-client.yml b/changelogs/unreleased/jc-node-manager-error-threshold-client.yml
new file mode 100644
index 000000000..d4228c004
--- /dev/null
+++ b/changelogs/unreleased/jc-node-manager-error-threshold-client.yml
@@ -0,0 +1,5 @@
+---
+title: Node manager error threshold client
+merge_request: 2230
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index c41ed9820..75e0dbe15 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -101,6 +101,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
@@ -238,7 +239,15 @@ func run(cfgs []starter.Config, conf config.Config) error {
queue = datastore.NewPostgresReplicationEventQueue(db)
}
- nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
+ nodeManager, err := nodes.NewManager(
+ logger,
+ conf,
+ db,
+ queue,
+ middleware.NewErrors(conf.Failover.ErrorThresholdWindowSeconds, conf.Failover.ReadErrorThreshold, conf.Failover.WriteErrorThreshold),
+ nodeLatencyHistogram,
+ )
+
if err != nil {
return err
}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index cc5f35bbb..3144665d1 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -171,7 +172,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
logEntry := testhelper.DiscardTestEntry(t)
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 1057b9c1d..60f944b46 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -16,9 +16,12 @@ import (
)
type Failover struct {
- Enabled bool `toml:"enabled"`
- ElectionStrategy string `toml:"election_strategy"`
- ReadOnlyAfterFailover bool `toml:"read_only_after_failover"`
+ Enabled bool `toml:"enabled"`
+ ElectionStrategy string `toml:"election_strategy"`
+ ReadOnlyAfterFailover bool `toml:"read_only_after_failover"`
+ ErrorThresholdWindowSeconds int `toml:"error_threshold_time_seconds"`
+ WriteErrorThreshold int `toml:"write_error_threshold"`
+ ReadErrorThreshold int `toml:"read_error_threshold"`
}
// Config is a container for everything found in the TOML config file
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index f104e55b1..faef2b09c 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -182,7 +183,7 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -284,7 +285,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -377,7 +378,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index a8db569ce..49f97be8d 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -17,6 +17,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -206,7 +207,7 @@ func defaultTxMgr() *transactions.Manager {
}
func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
return nodeMgr
diff --git a/internal/praefect/middleware/errorhandler.go b/internal/praefect/middleware/errorhandler.go
new file mode 100644
index 000000000..2ecd447b5
--- /dev/null
+++ b/internal/praefect/middleware/errorhandler.go
@@ -0,0 +1,179 @@
+package middleware
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "google.golang.org/grpc"
+)
+
+func StreamErrorHandler(registry *protoregistry.Registry, errorTracker *Errors, node string) grpc.StreamClientInterceptor {
+ return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ stream, err := streamer(ctx, desc, cc, method, opts...)
+
+ mi, lookupErr := registry.LookupMethod(method)
+ if err != nil {
+ return nil, fmt.Errorf("error when looking up method: %w %v", err, lookupErr)
+ }
+
+ return newCatchErrorStreamer(stream, errorTracker, mi.Operation, node), err
+ }
+}
+
+type CatchErrorStreamer struct {
+ grpc.ClientStream
+ errors *Errors
+ operation protoregistry.OpType
+ node string
+}
+
+func (c *CatchErrorStreamer) SendMsg(m interface{}) error {
+ err := c.ClientStream.SendMsg(m)
+ if err != nil {
+ switch c.operation {
+ case protoregistry.OpAccessor:
+ c.errors.IncrReadErr(c.node)
+ case protoregistry.OpMutator:
+ c.errors.IncrWriteErr(c.node)
+ }
+ }
+
+ return err
+}
+
+func (c *CatchErrorStreamer) RecvMsg(m interface{}) error {
+ err := c.ClientStream.RecvMsg(m)
+ if err != nil && err != io.EOF {
+ switch c.operation {
+ case protoregistry.OpAccessor:
+ c.errors.IncrReadErr(c.node)
+ case protoregistry.OpMutator:
+ c.errors.IncrWriteErr(c.node)
+ }
+ }
+
+ return err
+}
+
+func newCatchErrorStreamer(streamer grpc.ClientStream, errors *Errors, operation protoregistry.OpType, node string) *CatchErrorStreamer {
+ return &CatchErrorStreamer{
+ ClientStream: streamer,
+ errors: errors,
+ operation: operation,
+ node: node,
+ }
+}
+
+type Errors struct {
+ duration time.Duration
+ m sync.RWMutex
+ writeThreshold, readThreshold int
+ readErrors, writeErrors map[string][]int64
+}
+
+const (
+ defaultWindowSeconds = 10
+)
+
+func NewErrors(windowSeconds, readThreshold, writeThreshold int) *Errors {
+ if windowSeconds <= 0 {
+ windowSeconds = 10
+ }
+
+ window := time.Duration(windowSeconds) * time.Second
+
+ if readThreshold == 0 {
+ readThreshold = 100 * defaultWindowSeconds
+ }
+
+ if writeThreshold == 0 {
+ writeThreshold = 100 * defaultWindowSeconds
+ }
+
+ e := &Errors{
+ duration: window,
+ readErrors: make(map[string][]int64),
+ writeErrors: make(map[string][]int64),
+ readThreshold: readThreshold,
+ writeThreshold: writeThreshold,
+ }
+ go e.PeriodicallyClear()
+
+ return e
+}
+
+func (e *Errors) IncrReadErr(node string) {
+ e.m.Lock()
+ defer e.m.Unlock()
+
+ if len(e.readErrors[node]) < e.readThreshold {
+ e.readErrors[node] = append(e.readErrors[node], time.Now().UnixNano())
+ }
+}
+
+func (e *Errors) IncrWriteErr(node string) {
+ e.m.Lock()
+ defer e.m.Unlock()
+
+ if len(e.writeErrors[node]) < e.writeThreshold {
+ e.writeErrors[node] = append(e.writeErrors[node], time.Now().UnixNano())
+ }
+}
+
+func (e *Errors) ReadThresholdReached(node string) bool {
+ e.m.RLock()
+ defer e.m.RUnlock()
+
+ return len(e.readErrors[node]) >= e.readThreshold
+}
+
+func (e *Errors) WriteThresholdReached(node string) bool {
+ e.m.RLock()
+ defer e.m.RUnlock()
+
+ return len(e.writeErrors[node]) >= e.writeThreshold
+}
+
+func (e *Errors) PeriodicallyClear() {
+ ticker := time.NewTicker(e.duration)
+ for {
+ start := time.Now()
+ <-ticker.C
+ e.clear(start)
+ }
+}
+
+func (e *Errors) clear(olderThan time.Time) {
+ e.m.Lock()
+ defer e.m.Unlock()
+
+ for node, nodeWriteErrors := range e.writeErrors {
+ for i, writeErrorTime := range nodeWriteErrors {
+ if time.Unix(0, writeErrorTime).Before(olderThan) {
+ if i+1 == len(nodeWriteErrors) {
+ e.writeErrors[node] = nil
+ } else {
+ e.writeErrors[node] = nodeWriteErrors[i+1:]
+ }
+ break
+ }
+ }
+ }
+
+ for node, nodeReadErrors := range e.readErrors {
+ for i, readErrorTime := range nodeReadErrors {
+ if time.Unix(0, readErrorTime).Before(olderThan) {
+ if i+1 == len(nodeReadErrors) {
+ e.readErrors[node] = nil
+ } else {
+ e.readErrors[node] = nodeReadErrors[i+1:]
+ }
+ break
+ }
+ }
+ }
+}
diff --git a/internal/praefect/middleware/errorhandler_test.go b/internal/praefect/middleware/errorhandler_test.go
new file mode 100644
index 000000000..b158102f1
--- /dev/null
+++ b/internal/praefect/middleware/errorhandler_test.go
@@ -0,0 +1,180 @@
+package middleware
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+)
+
+func TestErrorTracker_IncrErrors(t *testing.T) {
+ writeThreshold, readThreshold := 10, 10
+ errors := NewErrors(100000, readThreshold, writeThreshold)
+
+ node := "backend-node-1"
+
+ assert.False(t, errors.WriteThresholdReached(node))
+ assert.False(t, errors.ReadThresholdReached(node))
+
+ for i := 0; i < writeThreshold; i++ {
+ errors.IncrWriteErr(node)
+ }
+
+ assert.True(t, errors.WriteThresholdReached(node))
+
+ for i := 0; i < readThreshold; i++ {
+ errors.IncrReadErr(node)
+ }
+
+ assert.True(t, errors.ReadThresholdReached(node))
+
+ errors.clear(time.Now())
+
+ assert.False(t, errors.WriteThresholdReached(node))
+ assert.False(t, errors.ReadThresholdReached(node))
+}
+
+func TestErrorTracker_ClearErrors(t *testing.T) {
+ writeThreshold, readThreshold := 10, 10
+ errors := NewErrors(100000, readThreshold, writeThreshold)
+
+ node := "backend-node-1"
+
+ errors.IncrWriteErr(node)
+ errors.IncrReadErr(node)
+
+ time.Sleep(10 * time.Millisecond)
+ clearErrorsOlderThan := time.Now()
+ time.Sleep(10 * time.Millisecond)
+
+ errors.IncrWriteErr(node)
+ errors.IncrReadErr(node)
+
+ errors.clear(clearErrorsOlderThan)
+ assert.Len(t, errors.readErrors[node], 1, "clear should only have cleared the read error older than the time specifiied")
+ assert.Len(t, errors.writeErrors[node], 1, "clear should only have cleared the write error older than the time specifiied")
+}
+
+type simpleService struct{}
+
+func (s *simpleService) RepoAccessorUnary(ctx context.Context, in *mock.RepoRequest) (*empty.Empty, error) {
+ if in.GetRepo() == nil {
+ return nil, helper.ErrInternalf("error")
+ }
+
+ return &empty.Empty{}, nil
+}
+
+func (s *simpleService) RepoMutatorUnary(ctx context.Context, in *mock.RepoRequest) (*empty.Empty, error) {
+ if in.GetRepo() == nil {
+ return nil, helper.ErrInternalf("error")
+ }
+
+ return &empty.Empty{}, nil
+}
+
+func (s *simpleService) ServerAccessor(ctx context.Context, in *mock.SimpleRequest) (*mock.SimpleResponse, error) {
+ return &mock.SimpleResponse{}, nil
+}
+
+func TestStreamInterceptor(t *testing.T) {
+ threshold := 5
+ errTracker := NewErrors(10000, threshold, threshold)
+ nodeName := "node-1"
+
+ internalSrv := grpc.NewServer()
+
+ internalServerSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+ lis, err := net.Listen("unix", internalServerSocketPath)
+
+ gz := proto.FileDescriptor("mock.proto")
+ fmt.Printf("\nGZ SIZE: %d\n", len(gz))
+ fd, err := protoregistry.ExtractFileDescriptor(gz)
+ require.NoError(t, err)
+
+ registry, err := protoregistry.New(fd)
+ require.NoError(t, err)
+
+ require.NoError(t, err)
+ mock.RegisterSimpleServiceServer(internalSrv, &simpleService{})
+ reflection.Register(internalSrv)
+
+ go internalSrv.Serve(lis)
+ defer internalSrv.Stop()
+
+ srvOptions := []grpc.ServerOption{
+ grpc.CustomCodec(proxy.NewCodec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context,
+ fullMethodName string,
+ peeker proxy.StreamModifier,
+ ) (*proxy.StreamParameters, error) {
+ cc, err := grpc.Dial("unix://"+internalServerSocketPath,
+ grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
+ grpc.WithInsecure(),
+ grpc.WithStreamInterceptor(StreamErrorHandler(registry, errTracker, nodeName)),
+ )
+ require.NoError(t, err)
+ return proxy.NewStreamParameters(ctx, cc, func() {}, nil), nil
+ })),
+ }
+
+ praefectSocket := testhelper.GetTemporaryGitalySocketFileName()
+ praefectLis, err := net.Listen("unix", praefectSocket)
+ require.NoError(t, err)
+
+ praefectSrv := grpc.NewServer(srvOptions...)
+ defer praefectSrv.Stop()
+ go praefectSrv.Serve(praefectLis)
+
+ praefectCC, err := grpc.Dial("unix://"+praefectSocket, grpc.WithInsecure())
+ require.NoError(t, err)
+
+ simpleClient := mock.NewSimpleServiceClient(praefectCC)
+
+ testRepo, _, cleanup := testhelper.NewTestRepo(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for i := 0; i < threshold; i++ {
+ _, err = simpleClient.RepoAccessorUnary(ctx, &mock.RepoRequest{
+ Repo: testRepo,
+ })
+ require.NoError(t, err)
+ _, err = simpleClient.RepoMutatorUnary(ctx, &mock.RepoRequest{
+ Repo: testRepo,
+ })
+ require.NoError(t, err)
+ }
+
+ assert.False(t, errTracker.WriteThresholdReached(nodeName))
+ assert.False(t, errTracker.ReadThresholdReached(nodeName))
+
+ for i := 0; i < threshold; i++ {
+ _, err = simpleClient.RepoAccessorUnary(ctx, &mock.RepoRequest{
+ Repo: nil,
+ })
+ require.Error(t, err)
+ _, err = simpleClient.RepoMutatorUnary(ctx, &mock.RepoRequest{
+ Repo: nil,
+ })
+ require.Error(t, err)
+ }
+
+ assert.True(t, errTracker.WriteThresholdReached(nodeName))
+ assert.True(t, errTracker.ReadThresholdReached(nodeName))
+}
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
index 3b8e40f46..721956d1a 100644
--- a/internal/praefect/nodes/local_elector_test.go
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
@@ -26,8 +27,9 @@ func setupElector(t *testing.T) (*localElector, []*nodeStatus, *grpc.ClientConn,
storageName := "default"
mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
- cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0)
- secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+ errTracker := middleware.NewErrors(0, 0, 0)
+ cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), errTracker, mockHistogramVec0)
+ secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), errTracker, mockHistogramVec1)
ns := []*nodeStatus{cs, secondary}
logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
strategy := newLocalElector(storageName, true, true, logger, ns)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 91f2c1d97..4fb285653 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -19,7 +19,9 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
@@ -98,7 +100,15 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(
+ log *logrus.Entry,
+ c config.Config,
+ db *sql.DB,
+ queue datastore.ReplicationEventQueue,
+ errTracker *middleware.Errors,
+ latencyHistogram prommetrics.HistogramVec,
+ dialOpts ...grpc.DialOption,
+) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -117,6 +127,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
grpc_prometheus.StreamClientInterceptor,
grpctracing.StreamClientTracingInterceptor(),
correlation.StreamClientCorrelationInterceptor(),
+ middleware.StreamErrorHandler(protoregistry.GitalyProtoPreregistered, errTracker, node.Storage),
)),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
grpc_prometheus.UnaryClientInterceptor,
@@ -128,7 +139,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
if err != nil {
return nil, err
}
- cs := newConnectionStatus(*node, conn, log, latencyHistogram)
+ cs := newConnectionStatus(*node, conn, log, errTracker, latencyHistogram)
if node.DefaultPrimary {
ns[0] = cs
} else {
@@ -229,11 +240,12 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return shard.Primary, nil // there is no matched secondaries, maybe because of re-configuration
}
-func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
+func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, errTracker *middleware.Errors, latencyHist prommetrics.HistogramVec) *nodeStatus {
return &nodeStatus{
Node: node,
ClientConn: cc,
log: l,
+ errTracker: errTracker,
latencyHist: latencyHist,
}
}
@@ -241,6 +253,7 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry,
type nodeStatus struct {
models.Node
*grpc.ClientConn
+ errTracker *middleware.Errors
log *logrus.Entry
latencyHist prommetrics.HistogramVec
}
@@ -268,6 +281,14 @@ func (n *nodeStatus) GetConnection() *grpc.ClientConn {
const checkTimeout = 1 * time.Second
func (n *nodeStatus) check(ctx context.Context) (bool, error) {
+ if n.errTracker.WriteThresholdReached(n.Storage) {
+ return false, errors.New("too many write errors")
+ }
+
+ if n.errTracker.ReadThresholdReached(n.Storage) {
+ return false, errors.New("too many read errors")
+ }
+
client := healthpb.NewHealthClient(n.ClientConn)
ctx, cancel := context.WithTimeout(ctx, checkTimeout)
defer cancel()
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index e2b318a59..0448b8401 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
@@ -34,7 +35,7 @@ func TestNodeStatus(t *testing.T) {
mockHistogramVec := promtest.NewMockHistogramVec()
storageName := "default"
- cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
+ cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec)
var expectedLabels [][]string
for i := 0; i < healthcheckThreshold; i++ {
@@ -86,7 +87,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -134,7 +135,7 @@ func TestPrimaryIsSecond(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, middleware.NewErrors(0, 0, 0), mockHistogram)
require.NoError(t, err)
shard, err := nm.GetShard("virtual-storage-0")
@@ -184,7 +185,7 @@ func TestBlockingDial(t *testing.T) {
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
}()
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -230,10 +231,10 @@ func TestNodeManager(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, middleware.NewErrors(0, 0, 0), mockHistogram)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, middleware.NewErrors(0, 0, 0), mockHistogram)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
@@ -394,7 +395,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, middleware.NewErrors(0, 0, 0), mockHistogram)
require.NoError(t, err)
nm.Start(time.Duration(0), time.Hour)
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index adc602952..42765fe59 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
@@ -43,7 +44,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) {
storageName := "default"
mockHistogramVec0 := promtest.NewMockHistogramVec()
- cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
+ cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec0)
ns := []*nodeStatus{cs0}
elector := newSQLElector(shardName, conf, 1, defaultActivePraefectSeconds, db.DB, logger, ns)
@@ -98,8 +99,8 @@ func TestBasicFailover(t *testing.T) {
storageName := "default"
mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
- cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
- cs1 := newConnectionStatus(models.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+ cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec0)
+ cs1 := newConnectionStatus(models.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), middleware.NewErrors(0, 0, 0), mockHistogramVec1)
ns := []*nodeStatus{cs0, cs1}
failoverTimeSeconds := 1
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index fbfcb96ab..7937e40e4 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -122,7 +123,7 @@ func TestProcessReplicationJob(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -215,7 +216,7 @@ func TestPropagateReplicationJob(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -507,7 +508,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
@@ -640,7 +641,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, middleware.NewErrors(0, 0, 0), promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
diff --git a/internal/service/repository/optimize.go b/internal/service/repository/optimize.go
index 5c366e881..1b831c1ef 100644
--- a/internal/service/repository/optimize.go
+++ b/internal/service/repository/optimize.go
@@ -31,8 +31,8 @@ func init() {
func removeEmptyDirs(ctx context.Context, target string) error {
if err := ctx.Err(); err != nil {
- return err
- }
+ return err
+ }
entries, err := ioutil.ReadDir(target)
switch {