Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/praefect/main.go8
-rw-r--r--internal/middleware/proxytime/proxy.go15
-rw-r--r--internal/praefect/auth_test.go6
-rw-r--r--internal/praefect/coordinator.go32
-rw-r--r--internal/praefect/coordinator_test.go3
-rw-r--r--internal/praefect/helper_test.go9
-rw-r--r--internal/praefect/metrics/prometheus.go8
-rw-r--r--internal/praefect/server.go5
-rw-r--r--internal/server/server.go3
9 files changed, 60 insertions, 29 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index a0868f36d..28c7d4c13 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -39,6 +39,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/internal/config/sentry"
"gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
@@ -113,6 +114,8 @@ func configure() (config.Config, error) {
logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener")
conf.Prometheus.Configure()
+ metrics.RegisterProxyTime(conf)
+
go func() {
if err := monitoring.Serve(
monitoring.WithListenerAddress(conf.PrometheusListenAddr),
@@ -154,9 +157,10 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
var (
+ tt = proxytime.NewTrailerTracker()
// top level server dependencies
ds = datastore.NewInMemory(conf)
- coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
+ coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, tt, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr(
"default",
logger,
@@ -164,7 +168,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
clientConnections,
praefect.WithLatencyMetric(latencyMetric),
praefect.WithQueueMetric(queueMetric))
- srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
+ srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf, tt)
serverErrors = make(chan error, 1)
)
diff --git a/internal/middleware/proxytime/proxy.go b/internal/middleware/proxytime/proxy.go
index 18f1776f4..0f4f86702 100644
--- a/internal/middleware/proxytime/proxy.go
+++ b/internal/middleware/proxytime/proxy.go
@@ -5,18 +5,17 @@ import (
"strconv"
"time"
- grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
-
"github.com/google/uuid"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
const (
- // requestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect
- requestIDKey = "proxy-request-id"
+ // RequestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect
+ RequestIDKey = "proxy-request-id"
)
// Unary is a gRPC server-side interceptor that provides a prometheus metric for the latency praefect adds to every gitaly request.
@@ -26,7 +25,7 @@ func Unary(tracker *TrailerTracker) grpc.UnaryServerInterceptor {
requestID := uuid.New().String()
- resp, err := handler(appendToIncomingContext(ctx, metadata.Pairs(requestIDKey, requestID)), req)
+ resp, err := handler(appendToIncomingContext(ctx, metadata.Pairs(RequestIDKey, requestID)), req)
trailer, trailerErr := tracker.RemoveTrailer(requestID)
if trailerErr != nil {
@@ -39,7 +38,7 @@ func Unary(tracker *TrailerTracker) grpc.UnaryServerInterceptor {
gitalyTime, gitalyTimeErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64)
if gitalyTimeErr == nil {
praefectTime := time.Since(startTime)
- metrics.ProxyTime.Observe(float64(praefectTime) - gitalyTime)
+ metrics.ProxyTime.Observe((float64(praefectTime) - gitalyTime) / float64(time.Second))
}
}
@@ -55,7 +54,7 @@ func Stream(tracker *TrailerTracker) grpc.StreamServerInterceptor {
requestID := uuid.New().String()
wrapped := grpc_middleware.WrapServerStream(ss)
- wrapped.WrappedContext = appendToIncomingContext(ss.Context(), metadata.Pairs(requestIDKey, requestID))
+ wrapped.WrappedContext = appendToIncomingContext(ss.Context(), metadata.Pairs(RequestIDKey, requestID))
err := handler(srv, wrapped)
@@ -70,7 +69,7 @@ func Stream(tracker *TrailerTracker) grpc.StreamServerInterceptor {
gitalyTime, gitalyTimeErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64)
if gitalyTimeErr == nil {
praefectTime := time.Since(startTime)
- metrics.ProxyTime.Observe(float64(praefectTime) - gitalyTime)
+ metrics.ProxyTime.Observe((float64(praefectTime) - gitalyTime) / float64(time.Second))
}
}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index f9130bfd1..6e78e2a80 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -11,6 +11,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -189,15 +190,16 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
logEntry := log.Default()
ds := datastore.NewInMemory(conf)
+ tt := proxytime.NewTrailerTracker()
clientConnections := conn.NewClientConnections()
clientConnections.RegisterNode("praefect-internal-0", backend, backendToken)
- coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd)
+ coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, tt, fd)
replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections)
- srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf)
+ srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf, tt)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index f1b1d5492..95edc167d 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -12,6 +12,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -19,6 +21,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -37,21 +40,23 @@ type Coordinator struct {
datastore datastore.Datastore
- registry *protoregistry.Registry
- conf config.Config
+ registry *protoregistry.Registry
+ conf config.Config
+ trailerTracker *proxytime.TrailerTracker
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, trailerTracker *proxytime.TrailerTracker, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
return &Coordinator{
- log: l,
- datastore: ds,
- registry: registry,
- connections: clientConnections,
- conf: conf,
+ log: l,
+ datastore: ds,
+ registry: registry,
+ connections: clientConnections,
+ conf: conf,
+ trailerTracker: trailerTracker,
}
}
@@ -110,7 +115,16 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
return nil, fmt.Errorf("unable to find existing client connection for %s", storage)
}
- return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil
+ var opts []grpc.CallOption
+ proxyRequestID := metadata.GetValue(ctx, proxytime.RequestIDKey)
+ if proxyRequestID != "" {
+ trailer, err := c.trailerTracker.Trailer(proxyRequestID)
+ if err == nil {
+ opts = append(opts, trailer)
+ }
+ }
+
+ return proxy.NewStreamParameters(ctx, cc, requestFinalizer, opts), nil
}
var noopRequestFinalizer = func() {}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 4283779a1..15e3fe9fe 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -60,7 +61,7 @@ func TestStreamDirector(t *testing.T) {
clientConnections := conn.NewClientConnections()
clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token")
- coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf)
+ coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf, proxytime.NewTrailerTracker())
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 6e8791afb..fc8b38e52 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -72,8 +73,9 @@ func testConfig(backends int) config.Config {
// injection
func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) {
var (
+ tt = proxytime.NewTrailerTracker()
ds = datastore.NewInMemory(conf)
- coordinator = NewCoordinator(l, ds, clientCC, conf, fds...)
+ coordinator = NewCoordinator(l, ds, clientCC, conf, tt, fds...)
)
var defaultNode *models.Node
@@ -97,6 +99,7 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
l,
clientCC,
conf,
+ tt,
)
return ds, server
@@ -177,8 +180,9 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
ds := datastore.NewInMemory(conf)
logEntry := log.Default()
+ tt := proxytime.NewTrailerTracker()
- coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...)
+ coordinator := NewCoordinator(logEntry, ds, clientCC, conf, tt, protoregistry.GitalyProtoFileDescriptors...)
replmgr := NewReplMgr(
"",
@@ -195,6 +199,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
logEntry,
clientCC,
conf,
+ tt,
)
listener, port := listenAvailPort(t)
diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go
index 4fcec26c8..61aa26ed9 100644
--- a/internal/praefect/metrics/prometheus.go
+++ b/internal/praefect/metrics/prometheus.go
@@ -54,12 +54,12 @@ var (
ProxyTime prometheus.Histogram
)
-// Register registers praefect prometheus metrics
-func Register(c config.Config) {
- once.Do(func() { register(c) })
+// RegisterProxyTime registers praefect prometheus metrics
+func RegisterProxyTime(c config.Config) {
+ once.Do(func() { registerProxyTime(c) })
}
-func register(c config.Config) {
+func registerProxyTime(c config.Config) {
ProxyTime = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "praefect_proxy_time",
Help: "Latency added by praefect",
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ceb1b8a76..141be7150 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
@@ -60,7 +61,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server {
+func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config, tt *proxytime.TrailerTracker) *Server {
ctxTagOpts := []grpc_ctxtags.Option{
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
@@ -68,6 +69,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+ proxytime.Stream(tt),
grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
metadatahandler.StreamInterceptor,
@@ -82,6 +84,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
panichandler.StreamPanicHandler,
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ proxytime.Unary(tt),
grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
metadatahandler.UnaryInterceptor,
diff --git a/internal/server/server.go b/internal/server/server.go
index 250749248..b0cfcc5ec 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -20,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
@@ -78,6 +79,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server {
opts := []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+ proxytime.StreamGitalyTime,
grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
metadatahandler.StreamInterceptor,
@@ -94,6 +96,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server {
panichandler.StreamPanicHandler,
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ proxytime.UnaryGitalyTime,
grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
metadatahandler.UnaryInterceptor,