Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-04-07 11:41:43 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-04-07 14:31:22 +0300
commite27e64d968b7415277dd1b7feabf84d95f8ca5f9 (patch)
treedd9b8c80e9f61b759a24cadcf32fd8dadeec48f4
parentb6667cd49a3c9b804d419531fc65180d8d15514b (diff)
Take in backchannel dial options to ServerHandshaker
The backchannel connection might need some custom dial options to be configured, such as the correlation id and tracing interceptors required by Prafect and Gitaly. This commit allows for passing custom dial options that are used when dialing the backchannel connection.
-rw-r--r--internal/backchannel/backchannel_example_test.go2
-rw-r--r--internal/backchannel/backchannel_test.go17
-rw-r--r--internal/backchannel/server.go16
-rw-r--r--internal/gitaly/client/dial_test.go2
-rw-r--r--internal/gitaly/server/server.go3
-rw-r--r--internal/praefect/nodes/sql_elector_test.go2
6 files changed, 32 insertions, 10 deletions
diff --git a/internal/backchannel/backchannel_example_test.go b/internal/backchannel/backchannel_example_test.go
index 2ec35f116..1def0c98c 100644
--- a/internal/backchannel/backchannel_example_test.go
+++ b/internal/backchannel/backchannel_example_test.go
@@ -30,7 +30,7 @@ func Example() {
// it creates the backchannel connection and stores it into the registry. For each connection,
// the ServerHandshaker passes down the peer ID via the context. The peer ID identifies a
// backchannel connection.
- handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry)
+ handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry, nil)
// Create the server
srv := grpc.NewServer(
diff --git a/internal/backchannel/backchannel_test.go b/internal/backchannel/backchannel_test.go
index a6238c552..2c820ec44 100644
--- a/internal/backchannel/backchannel_test.go
+++ b/internal/backchannel/backchannel_test.go
@@ -6,6 +6,7 @@ import (
"io"
"net"
"sync"
+ "sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
@@ -27,8 +28,19 @@ func (m mockTransactionServer) VoteTransaction(ctx context.Context, req *gitalyp
}
func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) {
+ var interceptorInvoked int32
registry := NewRegistry()
- handshaker := NewServerHandshaker(testhelper.DiscardTestEntry(t), Insecure(), registry)
+ handshaker := NewServerHandshaker(
+ testhelper.DiscardTestEntry(t),
+ Insecure(),
+ registry,
+ []grpc.DialOption{
+ grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ atomic.AddInt32(&interceptorInvoked, 1)
+ return invoker(ctx, method, req, reply, cc, opts...)
+ }),
+ },
+ )
ln, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
@@ -132,6 +144,7 @@ func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) {
// Wait for the clients to finish their calls and close their connections.
wg.Wait()
+ require.Equal(t, interceptorInvoked, int32(50))
}
type mockSSHService struct {
@@ -161,7 +174,7 @@ func Benchmark(b *testing.B) {
var serverOpts []grpc.ServerOption
if tc.multiplexed {
serverOpts = []grpc.ServerOption{
- grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry())),
+ grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry(), nil)),
}
}
diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go
index 5a82d68b9..c22b46e37 100644
--- a/internal/backchannel/server.go
+++ b/internal/backchannel/server.go
@@ -54,14 +54,19 @@ type ServerHandshaker struct {
registry *Registry
logger *logrus.Entry
credentials.TransportCredentials
+ dialOpts []grpc.DialOption
}
-// NewServerHandshaker returns a new server side implementation of the backchannel.
-func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry) credentials.TransportCredentials {
+// NewServerHandshaker returns a new server side implementation of the backchannel. The provided TransportCredentials
+// are handshaked prior to initializing the multiplexing session. The Registry is used to store the backchannel connections.
+// DialOptions can be used to set custom dial options for the backchannel connections. They must not contain a dialer or
+// transport credentials as those set by the handshaker.
+func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry, dialOpts []grpc.DialOption) credentials.TransportCredentials {
return ServerHandshaker{
TransportCredentials: tc,
registry: reg,
logger: logger,
+ dialOpts: dialOpts,
}
}
@@ -124,8 +129,11 @@ func (s ServerHandshaker) ServerHandshake(conn net.Conn) (net.Conn, credentials.
// WithInsecure is used as the multiplexer operates within a TLS session already if one is configured.
backchannelConn, err := grpc.Dial(
"multiplexed/"+conn.RemoteAddr().String(),
- grpc.WithInsecure(),
- grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }),
+ append(
+ s.dialOpts,
+ grpc.WithInsecure(),
+ grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }),
+ )...,
)
if err != nil {
logger.Close()
diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go
index 666db52c7..d74bffe73 100644
--- a/internal/gitaly/client/dial_test.go
+++ b/internal/gitaly/client/dial_test.go
@@ -21,7 +21,7 @@ func TestDial(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
srv := grpc.NewServer(
- grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())),
+ grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)),
grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
_, err := backchannel.GetPeerID(stream.Context())
if err == backchannel.ErrNonMultiplexedConnection {
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index a872fab68..2864fde85 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/backchannel"
diskcache "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
@@ -114,7 +115,7 @@ func New(secure bool, cfg config.Cfg, logrusEntry *log.Entry, registry *backchan
}
opts := []grpc.ServerOption{
- grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry)),
+ grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry, []grpc.DialOption{client.UnaryInterceptor()})),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index 68398a8c2..5ddc74183 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -438,7 +438,7 @@ func TestConnectionMultiplexing(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
srv := grpc.NewServer(
- grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())),
+ grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)),
grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
_, err := backchannel.GetPeerID(stream.Context())
if err == backchannel.ErrNonMultiplexedConnection {