Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-05-21 21:25:43 +0300
committerJohn Cai <jcai@gitlab.com>2020-05-21 21:25:43 +0300
commit9407b045beb1ac7dd528cca2ce6b832d0dd9ad92 (patch)
treeca8cb5bb30c56f08e626038df8a77284108dc8bb
parent489aa21c3fca811f873fef0380265cfdb02338ed (diff)
-rw-r--r--internal/praefect/nodes/manager.go10
-rw-r--r--internal/praefect/server.go6
2 files changed, 12 insertions, 4 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index f7e94f28f..b13bcacaf 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -8,6 +8,8 @@ import (
"math/rand"
"time"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/errorhandler"
+
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
@@ -84,7 +86,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, errTracker *errorhandler.Errors, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -114,7 +116,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Dat
if err != nil {
return nil, err
}
- cs := newConnectionStatus(*node, conn, log, latencyHistogram)
+ cs := newConnectionStatus(*node, conn, log, errTracker, latencyHistogram)
if node.DefaultPrimary {
ns[0] = cs
} else {
@@ -216,11 +218,12 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return shard.Primary, nil // there is no matched secondaries, maybe because of re-configuration
}
-func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
+func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, errTracker *errorhandler.Errors, latencyHist prommetrics.HistogramVec) *nodeStatus {
return &nodeStatus{
Node: node,
ClientConn: cc,
log: l,
+ errTracker: errTracker,
latencyHist: latencyHist,
}
}
@@ -228,6 +231,7 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry,
type nodeStatus struct {
models.Node
*grpc.ClientConn
+ errTracker *errorhandler.Errors
log *logrus.Entry
latencyHist prommetrics.HistogramVec
}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c56eda44c..141a1ed0b 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -6,6 +6,8 @@ import (
"context"
"net"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/errorhandler"
+
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
@@ -63,7 +65,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.Registry, conf config.Config, grpcOpts ...grpc.ServerOption) *Server {
+func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.Registry, conf config.Config, errTracker *errorhandler.Errors, grpcOpts ...grpc.ServerOption) *Server {
ctxTagOpts := []grpc_ctxtags.Option{
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
@@ -81,6 +83,7 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.
cancelhandler.Stream, // Should be below LogHandler
grpctracing.StreamServerTracingInterceptor(),
auth.StreamServerInterceptor(conf.Auth),
+ errorhandler.StreamErrorHandler(errTracker, r),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
panichandler.StreamPanicHandler,
@@ -96,6 +99,7 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.
cancelhandler.Unary, // Should be below LogHandler
grpctracing.UnaryServerTracingInterceptor(),
auth.UnaryServerInterceptor(conf.Auth),
+ errorhandler.UnaryErrorHandler(errTracker, r),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
panichandler.UnaryPanicHandler,