diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-04-07 11:41:43 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-04-07 14:31:22 +0300 |
commit | e27e64d968b7415277dd1b7feabf84d95f8ca5f9 (patch) | |
tree | dd9b8c80e9f61b759a24cadcf32fd8dadeec48f4 | |
parent | b6667cd49a3c9b804d419531fc65180d8d15514b (diff) |
Take in backchannel dial options to ServerHandshaker
The backchannel connection might need some custom dial options to
be configured, such as the correlation id and tracing interceptors
required by Prafect and Gitaly. This commit allows for passing custom
dial options that are used when dialing the backchannel connection.
-rw-r--r-- | internal/backchannel/backchannel_example_test.go | 2 | ||||
-rw-r--r-- | internal/backchannel/backchannel_test.go | 17 | ||||
-rw-r--r-- | internal/backchannel/server.go | 16 | ||||
-rw-r--r-- | internal/gitaly/client/dial_test.go | 2 | ||||
-rw-r--r-- | internal/gitaly/server/server.go | 3 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector_test.go | 2 |
6 files changed, 32 insertions, 10 deletions
diff --git a/internal/backchannel/backchannel_example_test.go b/internal/backchannel/backchannel_example_test.go index 2ec35f116..1def0c98c 100644 --- a/internal/backchannel/backchannel_example_test.go +++ b/internal/backchannel/backchannel_example_test.go @@ -30,7 +30,7 @@ func Example() { // it creates the backchannel connection and stores it into the registry. For each connection, // the ServerHandshaker passes down the peer ID via the context. The peer ID identifies a // backchannel connection. - handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry) + handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry, nil) // Create the server srv := grpc.NewServer( diff --git a/internal/backchannel/backchannel_test.go b/internal/backchannel/backchannel_test.go index a6238c552..2c820ec44 100644 --- a/internal/backchannel/backchannel_test.go +++ b/internal/backchannel/backchannel_test.go @@ -6,6 +6,7 @@ import ( "io" "net" "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -27,8 +28,19 @@ func (m mockTransactionServer) VoteTransaction(ctx context.Context, req *gitalyp } func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) { + var interceptorInvoked int32 registry := NewRegistry() - handshaker := NewServerHandshaker(testhelper.DiscardTestEntry(t), Insecure(), registry) + handshaker := NewServerHandshaker( + testhelper.DiscardTestEntry(t), + Insecure(), + registry, + []grpc.DialOption{ + grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + atomic.AddInt32(&interceptorInvoked, 1) + return invoker(ctx, method, req, reply, cc, opts...) + }), + }, + ) ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) @@ -132,6 +144,7 @@ func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) { // Wait for the clients to finish their calls and close their connections. wg.Wait() + require.Equal(t, interceptorInvoked, int32(50)) } type mockSSHService struct { @@ -161,7 +174,7 @@ func Benchmark(b *testing.B) { var serverOpts []grpc.ServerOption if tc.multiplexed { serverOpts = []grpc.ServerOption{ - grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry())), + grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry(), nil)), } } diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go index 5a82d68b9..c22b46e37 100644 --- a/internal/backchannel/server.go +++ b/internal/backchannel/server.go @@ -54,14 +54,19 @@ type ServerHandshaker struct { registry *Registry logger *logrus.Entry credentials.TransportCredentials + dialOpts []grpc.DialOption } -// NewServerHandshaker returns a new server side implementation of the backchannel. -func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry) credentials.TransportCredentials { +// NewServerHandshaker returns a new server side implementation of the backchannel. The provided TransportCredentials +// are handshaked prior to initializing the multiplexing session. The Registry is used to store the backchannel connections. +// DialOptions can be used to set custom dial options for the backchannel connections. They must not contain a dialer or +// transport credentials as those set by the handshaker. +func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry, dialOpts []grpc.DialOption) credentials.TransportCredentials { return ServerHandshaker{ TransportCredentials: tc, registry: reg, logger: logger, + dialOpts: dialOpts, } } @@ -124,8 +129,11 @@ func (s ServerHandshaker) ServerHandshake(conn net.Conn) (net.Conn, credentials. // WithInsecure is used as the multiplexer operates within a TLS session already if one is configured. backchannelConn, err := grpc.Dial( "multiplexed/"+conn.RemoteAddr().String(), - grpc.WithInsecure(), - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }), + append( + s.dialOpts, + grpc.WithInsecure(), + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }), + )..., ) if err != nil { logger.Close() diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go index 666db52c7..d74bffe73 100644 --- a/internal/gitaly/client/dial_test.go +++ b/internal/gitaly/client/dial_test.go @@ -21,7 +21,7 @@ func TestDial(t *testing.T) { logger := testhelper.DiscardTestEntry(t) srv := grpc.NewServer( - grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())), + grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)), grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { _, err := backchannel.GetPeerID(stream.Context()) if err == backchannel.ErrNonMultiplexedConnection { diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index a872fab68..2864fde85 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -17,6 +17,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/backchannel" diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" @@ -114,7 +115,7 @@ func New(secure bool, cfg config.Cfg, logrusEntry *log.Entry, registry *backchan } opts := []grpc.ServerOption{ - grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry)), + grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry, []grpc.DialOption{client.UnaryInterceptor()})), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index 68398a8c2..5ddc74183 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -438,7 +438,7 @@ func TestConnectionMultiplexing(t *testing.T) { logger := testhelper.DiscardTestEntry(t) srv := grpc.NewServer( - grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())), + grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)), grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { _, err := backchannel.GetPeerID(stream.Context()) if err == backchannel.ErrNonMultiplexedConnection { |