diff options
author | John Cai <jcai@gitlab.com> | 2020-05-21 21:25:43 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-05-21 21:25:43 +0300 |
commit | 9407b045beb1ac7dd528cca2ce6b832d0dd9ad92 (patch) | |
tree | ca8cb5bb30c56f08e626038df8a77284108dc8bb | |
parent | 489aa21c3fca811f873fef0380265cfdb02338ed (diff) |
-rw-r--r-- | internal/praefect/nodes/manager.go | 10 | ||||
-rw-r--r-- | internal/praefect/server.go | 6 |
2 files changed, 12 insertions, 4 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index f7e94f28f..b13bcacaf 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -8,6 +8,8 @@ import ( "math/rand" "time" + "gitlab.com/gitlab-org/gitaly/internal/middleware/errorhandler" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -84,7 +86,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy") const dialTimeout = 10 * time.Second // NewManager creates a new NodeMgr based on virtual storage configs -func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) { +func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, errTracker *errorhandler.Errors, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) { strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages)) ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) @@ -114,7 +116,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Dat if err != nil { return nil, err } - cs := newConnectionStatus(*node, conn, log, latencyHistogram) + cs := newConnectionStatus(*node, conn, log, errTracker, latencyHistogram) if node.DefaultPrimary { ns[0] = cs } else { @@ -216,11 +218,12 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st return shard.Primary, nil // there is no matched secondaries, maybe because of re-configuration } -func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus { +func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, errTracker *errorhandler.Errors, latencyHist prommetrics.HistogramVec) *nodeStatus { return &nodeStatus{ Node: node, ClientConn: cc, log: l, + errTracker: errTracker, latencyHist: latencyHist, } } @@ -228,6 +231,7 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, type nodeStatus struct { models.Node *grpc.ClientConn + errTracker *errorhandler.Errors log *logrus.Entry latencyHist prommetrics.HistogramVec } diff --git a/internal/praefect/server.go b/internal/praefect/server.go index c56eda44c..141a1ed0b 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -6,6 +6,8 @@ import ( "context" "net" + "gitlab.com/gitlab-org/gitaly/internal/middleware/errorhandler" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -63,7 +65,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.Registry, conf config.Config, grpcOpts ...grpc.ServerOption) *Server { +func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.Registry, conf config.Config, errTracker *errorhandler.Errors, grpcOpts ...grpc.ServerOption) *Server { ctxTagOpts := []grpc_ctxtags.Option{ grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } @@ -81,6 +83,7 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry. cancelhandler.Stream, // Should be below LogHandler grpctracing.StreamServerTracingInterceptor(), auth.StreamServerInterceptor(conf.Auth), + errorhandler.StreamErrorHandler(errTracker, r), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.StreamPanicHandler, @@ -96,6 +99,7 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry. cancelhandler.Unary, // Should be below LogHandler grpctracing.UnaryServerTracingInterceptor(), auth.UnaryServerInterceptor(conf.Auth), + errorhandler.UnaryErrorHandler(errTracker, r), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.UnaryPanicHandler, |