diff options
author | James Fargher <proglottis@gmail.com> | 2021-04-08 07:00:45 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2021-04-08 07:00:45 +0300 |
commit | 7c16834a6dadf083438dd19faea1cce6df746e5d (patch) | |
tree | 1b300b6934669f5d345bdad0f053fa45865732af | |
parent | 98bc69b160d4376ae2932626bcc7c34aba207ff7 (diff) | |
parent | e27e64d968b7415277dd1b7feabf84d95f8ca5f9 (diff) |
Merge branch 'smh-inject-dialer' into 'master'
Take in backchannel dial options to ServerHandshaker
See merge request gitlab-org/gitaly!3342
-rw-r--r-- | internal/backchannel/backchannel_example_test.go | 2 | ||||
-rw-r--r-- | internal/backchannel/backchannel_test.go | 17 | ||||
-rw-r--r-- | internal/backchannel/server.go | 16 | ||||
-rw-r--r-- | internal/gitaly/client/dial.go | 13 | ||||
-rw-r--r-- | internal/gitaly/client/dial_test.go | 2 | ||||
-rw-r--r-- | internal/gitaly/server/server.go | 3 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector_test.go | 2 |
7 files changed, 41 insertions, 14 deletions
diff --git a/internal/backchannel/backchannel_example_test.go b/internal/backchannel/backchannel_example_test.go index 2ec35f116..1def0c98c 100644 --- a/internal/backchannel/backchannel_example_test.go +++ b/internal/backchannel/backchannel_example_test.go @@ -30,7 +30,7 @@ func Example() { // it creates the backchannel connection and stores it into the registry. For each connection, // the ServerHandshaker passes down the peer ID via the context. The peer ID identifies a // backchannel connection. - handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry) + handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry, nil) // Create the server srv := grpc.NewServer( diff --git a/internal/backchannel/backchannel_test.go b/internal/backchannel/backchannel_test.go index a6238c552..2c820ec44 100644 --- a/internal/backchannel/backchannel_test.go +++ b/internal/backchannel/backchannel_test.go @@ -6,6 +6,7 @@ import ( "io" "net" "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -27,8 +28,19 @@ func (m mockTransactionServer) VoteTransaction(ctx context.Context, req *gitalyp } func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) { + var interceptorInvoked int32 registry := NewRegistry() - handshaker := NewServerHandshaker(testhelper.DiscardTestEntry(t), Insecure(), registry) + handshaker := NewServerHandshaker( + testhelper.DiscardTestEntry(t), + Insecure(), + registry, + []grpc.DialOption{ + grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + atomic.AddInt32(&interceptorInvoked, 1) + return invoker(ctx, method, req, reply, cc, opts...) + }), + }, + ) ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) @@ -132,6 +144,7 @@ func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) { // Wait for the clients to finish their calls and close their connections. wg.Wait() + require.Equal(t, interceptorInvoked, int32(50)) } type mockSSHService struct { @@ -161,7 +174,7 @@ func Benchmark(b *testing.B) { var serverOpts []grpc.ServerOption if tc.multiplexed { serverOpts = []grpc.ServerOption{ - grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry())), + grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry(), nil)), } } diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go index 5a82d68b9..c22b46e37 100644 --- a/internal/backchannel/server.go +++ b/internal/backchannel/server.go @@ -54,14 +54,19 @@ type ServerHandshaker struct { registry *Registry logger *logrus.Entry credentials.TransportCredentials + dialOpts []grpc.DialOption } -// NewServerHandshaker returns a new server side implementation of the backchannel. -func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry) credentials.TransportCredentials { +// NewServerHandshaker returns a new server side implementation of the backchannel. The provided TransportCredentials +// are handshaked prior to initializing the multiplexing session. The Registry is used to store the backchannel connections. +// DialOptions can be used to set custom dial options for the backchannel connections. They must not contain a dialer or +// transport credentials as those set by the handshaker. +func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry, dialOpts []grpc.DialOption) credentials.TransportCredentials { return ServerHandshaker{ TransportCredentials: tc, registry: reg, logger: logger, + dialOpts: dialOpts, } } @@ -124,8 +129,11 @@ func (s ServerHandshaker) ServerHandshake(conn net.Conn) (net.Conn, credentials. // WithInsecure is used as the multiplexer operates within a TLS session already if one is configured. backchannelConn, err := grpc.Dial( "multiplexed/"+conn.RemoteAddr().String(), - grpc.WithInsecure(), - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }), + append( + s.dialOpts, + grpc.WithInsecure(), + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }), + )..., ) if err != nil { logger.Close() diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go index 5705d9d14..5b6fe226d 100644 --- a/internal/gitaly/client/dial.go +++ b/internal/gitaly/client/dial.go @@ -127,10 +127,7 @@ func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, ha Time: 20 * time.Second, PermitWithoutStream: true, }), - grpc.WithChainUnaryInterceptor( - grpctracing.UnaryClientTracingInterceptor(), - grpccorrelation.UnaryClientCorrelationInterceptor(), - ), + UnaryInterceptor(), grpc.WithChainStreamInterceptor( grpctracing.StreamClientTracingInterceptor(), grpccorrelation.StreamClientCorrelationInterceptor(), @@ -144,3 +141,11 @@ func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, ha return conn, nil } + +// UnaryInterceptor returns the unary interceptors that should be configured for a client. +func UnaryInterceptor() grpc.DialOption { + return grpc.WithChainUnaryInterceptor( + grpctracing.UnaryClientTracingInterceptor(), + grpccorrelation.UnaryClientCorrelationInterceptor(), + ) +} diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go index 666db52c7..d74bffe73 100644 --- a/internal/gitaly/client/dial_test.go +++ b/internal/gitaly/client/dial_test.go @@ -21,7 +21,7 @@ func TestDial(t *testing.T) { logger := testhelper.DiscardTestEntry(t) srv := grpc.NewServer( - grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())), + grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)), grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { _, err := backchannel.GetPeerID(stream.Context()) if err == backchannel.ErrNonMultiplexedConnection { diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index a872fab68..2864fde85 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -17,6 +17,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/backchannel" diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" @@ -114,7 +115,7 @@ func New(secure bool, cfg config.Cfg, logrusEntry *log.Entry, registry *backchan } opts := []grpc.ServerOption{ - grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry)), + grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry, []grpc.DialOption{client.UnaryInterceptor()})), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index 68398a8c2..5ddc74183 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -438,7 +438,7 @@ func TestConnectionMultiplexing(t *testing.T) { logger := testhelper.DiscardTestEntry(t) srv := grpc.NewServer( - grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())), + grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)), grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { _, err := backchannel.GetPeerID(stream.Context()) if err == backchannel.ErrNonMultiplexedConnection { |