diff options
-rw-r--r-- | cmd/praefect/main.go | 8 | ||||
-rw-r--r-- | internal/middleware/proxytime/proxy.go | 15 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 32 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 9 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 8 | ||||
-rw-r--r-- | internal/praefect/server.go | 5 | ||||
-rw-r--r-- | internal/server/server.go | 3 |
9 files changed, 60 insertions, 29 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a0868f36d..28c7d4c13 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -39,6 +39,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/internal/config/sentry" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" @@ -113,6 +114,8 @@ func configure() (config.Config, error) { logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener") conf.Prometheus.Configure() + metrics.RegisterProxyTime(conf) + go func() { if err := monitoring.Serve( monitoring.WithListenerAddress(conf.PrometheusListenAddr), @@ -154,9 +157,10 @@ func run(cfgs []starter.Config, conf config.Config) error { } var ( + tt = proxytime.NewTrailerTracker() // top level server dependencies ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, tt, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr( "default", logger, @@ -164,7 +168,7 @@ func run(cfgs []starter.Config, conf config.Config) error { clientConnections, praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) + srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf, tt) serverErrors = make(chan error, 1) ) diff --git a/internal/middleware/proxytime/proxy.go b/internal/middleware/proxytime/proxy.go index 18f1776f4..0f4f86702 100644 --- a/internal/middleware/proxytime/proxy.go +++ b/internal/middleware/proxytime/proxy.go @@ -5,18 +5,17 @@ import ( "strconv" "time" - grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" - "github.com/google/uuid" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) const ( - // requestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect - requestIDKey = "proxy-request-id" + // RequestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect + RequestIDKey = "proxy-request-id" ) // Unary is a gRPC server-side interceptor that provides a prometheus metric for the latency praefect adds to every gitaly request. @@ -26,7 +25,7 @@ func Unary(tracker *TrailerTracker) grpc.UnaryServerInterceptor { requestID := uuid.New().String() - resp, err := handler(appendToIncomingContext(ctx, metadata.Pairs(requestIDKey, requestID)), req) + resp, err := handler(appendToIncomingContext(ctx, metadata.Pairs(RequestIDKey, requestID)), req) trailer, trailerErr := tracker.RemoveTrailer(requestID) if trailerErr != nil { @@ -39,7 +38,7 @@ func Unary(tracker *TrailerTracker) grpc.UnaryServerInterceptor { gitalyTime, gitalyTimeErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64) if gitalyTimeErr == nil { praefectTime := time.Since(startTime) - metrics.ProxyTime.Observe(float64(praefectTime) - gitalyTime) + metrics.ProxyTime.Observe((float64(praefectTime) - gitalyTime) / float64(time.Second)) } } @@ -55,7 +54,7 @@ func Stream(tracker *TrailerTracker) grpc.StreamServerInterceptor { requestID := uuid.New().String() wrapped := grpc_middleware.WrapServerStream(ss) - wrapped.WrappedContext = appendToIncomingContext(ss.Context(), metadata.Pairs(requestIDKey, requestID)) + wrapped.WrappedContext = appendToIncomingContext(ss.Context(), metadata.Pairs(RequestIDKey, requestID)) err := handler(srv, wrapped) @@ -70,7 +69,7 @@ func Stream(tracker *TrailerTracker) grpc.StreamServerInterceptor { gitalyTime, gitalyTimeErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64) if gitalyTimeErr == nil { praefectTime := time.Since(startTime) - metrics.ProxyTime.Observe(float64(praefectTime) - gitalyTime) + metrics.ProxyTime.Observe((float64(praefectTime) - gitalyTime) / float64(time.Second)) } } diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f9130bfd1..6e78e2a80 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -11,6 +11,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -189,15 +190,16 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func logEntry := log.Default() ds := datastore.NewInMemory(conf) + tt := proxytime.NewTrailerTracker() clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-0", backend, backendToken) - coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd) + coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, tt, fd) replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections) - srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf) + srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf, tt) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index f1b1d5492..95edc167d 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -12,6 +12,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/metadata" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -19,6 +21,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -37,21 +40,23 @@ type Coordinator struct { datastore datastore.Datastore - registry *protoregistry.Registry - conf config.Config + registry *protoregistry.Registry + conf config.Config + trailerTracker *proxytime.TrailerTracker } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, trailerTracker *proxytime.TrailerTracker, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - datastore: ds, - registry: registry, - connections: clientConnections, - conf: conf, + log: l, + datastore: ds, + registry: registry, + connections: clientConnections, + conf: conf, + trailerTracker: trailerTracker, } } @@ -110,7 +115,16 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, return nil, fmt.Errorf("unable to find existing client connection for %s", storage) } - return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil + var opts []grpc.CallOption + proxyRequestID := metadata.GetValue(ctx, proxytime.RequestIDKey) + if proxyRequestID != "" { + trailer, err := c.trailerTracker.Trailer(proxyRequestID) + if err == nil { + opts = append(opts, trailer) + } + } + + return proxy.NewStreamParameters(ctx, cc, requestFinalizer, opts), nil } var noopRequestFinalizer = func() {} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4283779a1..15e3fe9fe 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -60,7 +61,7 @@ func TestStreamDirector(t *testing.T) { clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token") - coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf) + coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf, proxytime.NewTrailerTracker()) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 6e8791afb..fc8b38e52 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/client" internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -72,8 +73,9 @@ func testConfig(backends int) config.Config { // injection func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { var ( + tt = proxytime.NewTrailerTracker() ds = datastore.NewInMemory(conf) - coordinator = NewCoordinator(l, ds, clientCC, conf, fds...) + coordinator = NewCoordinator(l, ds, clientCC, conf, tt, fds...) ) var defaultNode *models.Node @@ -97,6 +99,7 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti l, clientCC, conf, + tt, ) return ds, server @@ -177,8 +180,9 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ds := datastore.NewInMemory(conf) logEntry := log.Default() + tt := proxytime.NewTrailerTracker() - coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator := NewCoordinator(logEntry, ds, clientCC, conf, tt, protoregistry.GitalyProtoFileDescriptors...) replmgr := NewReplMgr( "", @@ -195,6 +199,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client logEntry, clientCC, conf, + tt, ) listener, port := listenAvailPort(t) diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index 4fcec26c8..61aa26ed9 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -54,12 +54,12 @@ var ( ProxyTime prometheus.Histogram ) -// Register registers praefect prometheus metrics -func Register(c config.Config) { - once.Do(func() { register(c) }) +// RegisterProxyTime registers praefect prometheus metrics +func RegisterProxyTime(c config.Config) { + once.Do(func() { registerProxyTime(c) }) } -func register(c config.Config) { +func registerProxyTime(c config.Config) { ProxyTime = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "praefect_proxy_time", Help: "Latency added by praefect", diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ceb1b8a76..141be7150 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" @@ -60,7 +61,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server { +func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config, tt *proxytime.TrailerTracker) *Server { ctxTagOpts := []grpc_ctxtags.Option{ grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } @@ -68,6 +69,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + proxytime.Stream(tt), grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.StreamInterceptor, @@ -82,6 +84,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo panichandler.StreamPanicHandler, )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + proxytime.Unary(tt), grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.UnaryInterceptor, diff --git a/internal/server/server.go b/internal/server/server.go index 250749248..b0cfcc5ec 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -20,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -78,6 +79,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { opts := []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + proxytime.StreamGitalyTime, grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.StreamInterceptor, @@ -94,6 +96,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { panichandler.StreamPanicHandler, )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + proxytime.UnaryGitalyTime, grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.UnaryInterceptor, |