Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <proglottis@gmail.com>2021-04-08 07:00:45 +0300
committerJames Fargher <proglottis@gmail.com>2021-04-08 07:00:45 +0300
commit7c16834a6dadf083438dd19faea1cce6df746e5d (patch)
tree1b300b6934669f5d345bdad0f053fa45865732af
parent98bc69b160d4376ae2932626bcc7c34aba207ff7 (diff)
parente27e64d968b7415277dd1b7feabf84d95f8ca5f9 (diff)
Merge branch 'smh-inject-dialer' into 'master'
Take in backchannel dial options to ServerHandshaker See merge request gitlab-org/gitaly!3342
-rw-r--r--internal/backchannel/backchannel_example_test.go2
-rw-r--r--internal/backchannel/backchannel_test.go17
-rw-r--r--internal/backchannel/server.go16
-rw-r--r--internal/gitaly/client/dial.go13
-rw-r--r--internal/gitaly/client/dial_test.go2
-rw-r--r--internal/gitaly/server/server.go3
-rw-r--r--internal/praefect/nodes/sql_elector_test.go2
7 files changed, 41 insertions, 14 deletions
diff --git a/internal/backchannel/backchannel_example_test.go b/internal/backchannel/backchannel_example_test.go
index 2ec35f116..1def0c98c 100644
--- a/internal/backchannel/backchannel_example_test.go
+++ b/internal/backchannel/backchannel_example_test.go
@@ -30,7 +30,7 @@ func Example() {
// it creates the backchannel connection and stores it into the registry. For each connection,
// the ServerHandshaker passes down the peer ID via the context. The peer ID identifies a
// backchannel connection.
- handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry)
+ handshaker := backchannel.NewServerHandshaker(logger, backchannel.Insecure(), registry, nil)
// Create the server
srv := grpc.NewServer(
diff --git a/internal/backchannel/backchannel_test.go b/internal/backchannel/backchannel_test.go
index a6238c552..2c820ec44 100644
--- a/internal/backchannel/backchannel_test.go
+++ b/internal/backchannel/backchannel_test.go
@@ -6,6 +6,7 @@ import (
"io"
"net"
"sync"
+ "sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
@@ -27,8 +28,19 @@ func (m mockTransactionServer) VoteTransaction(ctx context.Context, req *gitalyp
}
func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) {
+ var interceptorInvoked int32
registry := NewRegistry()
- handshaker := NewServerHandshaker(testhelper.DiscardTestEntry(t), Insecure(), registry)
+ handshaker := NewServerHandshaker(
+ testhelper.DiscardTestEntry(t),
+ Insecure(),
+ registry,
+ []grpc.DialOption{
+ grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ atomic.AddInt32(&interceptorInvoked, 1)
+ return invoker(ctx, method, req, reply, cc, opts...)
+ }),
+ },
+ )
ln, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
@@ -132,6 +144,7 @@ func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) {
// Wait for the clients to finish their calls and close their connections.
wg.Wait()
+ require.Equal(t, interceptorInvoked, int32(50))
}
type mockSSHService struct {
@@ -161,7 +174,7 @@ func Benchmark(b *testing.B) {
var serverOpts []grpc.ServerOption
if tc.multiplexed {
serverOpts = []grpc.ServerOption{
- grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry())),
+ grpc.Creds(NewServerHandshaker(testhelper.DiscardTestEntry(b), Insecure(), NewRegistry(), nil)),
}
}
diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go
index 5a82d68b9..c22b46e37 100644
--- a/internal/backchannel/server.go
+++ b/internal/backchannel/server.go
@@ -54,14 +54,19 @@ type ServerHandshaker struct {
registry *Registry
logger *logrus.Entry
credentials.TransportCredentials
+ dialOpts []grpc.DialOption
}
-// NewServerHandshaker returns a new server side implementation of the backchannel.
-func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry) credentials.TransportCredentials {
+// NewServerHandshaker returns a new server side implementation of the backchannel. The provided TransportCredentials
+// are handshaked prior to initializing the multiplexing session. The Registry is used to store the backchannel connections.
+// DialOptions can be used to set custom dial options for the backchannel connections. They must not contain a dialer or
+// transport credentials as those set by the handshaker.
+func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry, dialOpts []grpc.DialOption) credentials.TransportCredentials {
return ServerHandshaker{
TransportCredentials: tc,
registry: reg,
logger: logger,
+ dialOpts: dialOpts,
}
}
@@ -124,8 +129,11 @@ func (s ServerHandshaker) ServerHandshake(conn net.Conn) (net.Conn, credentials.
// WithInsecure is used as the multiplexer operates within a TLS session already if one is configured.
backchannelConn, err := grpc.Dial(
"multiplexed/"+conn.RemoteAddr().String(),
- grpc.WithInsecure(),
- grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }),
+ append(
+ s.dialOpts,
+ grpc.WithInsecure(),
+ grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return muxSession.Open() }),
+ )...,
)
if err != nil {
logger.Close()
diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go
index 5705d9d14..5b6fe226d 100644
--- a/internal/gitaly/client/dial.go
+++ b/internal/gitaly/client/dial.go
@@ -127,10 +127,7 @@ func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, ha
Time: 20 * time.Second,
PermitWithoutStream: true,
}),
- grpc.WithChainUnaryInterceptor(
- grpctracing.UnaryClientTracingInterceptor(),
- grpccorrelation.UnaryClientCorrelationInterceptor(),
- ),
+ UnaryInterceptor(),
grpc.WithChainStreamInterceptor(
grpctracing.StreamClientTracingInterceptor(),
grpccorrelation.StreamClientCorrelationInterceptor(),
@@ -144,3 +141,11 @@ func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, ha
return conn, nil
}
+
+// UnaryInterceptor returns the unary interceptors that should be configured for a client.
+func UnaryInterceptor() grpc.DialOption {
+ return grpc.WithChainUnaryInterceptor(
+ grpctracing.UnaryClientTracingInterceptor(),
+ grpccorrelation.UnaryClientCorrelationInterceptor(),
+ )
+}
diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go
index 666db52c7..d74bffe73 100644
--- a/internal/gitaly/client/dial_test.go
+++ b/internal/gitaly/client/dial_test.go
@@ -21,7 +21,7 @@ func TestDial(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
srv := grpc.NewServer(
- grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())),
+ grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)),
grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
_, err := backchannel.GetPeerID(stream.Context())
if err == backchannel.ErrNonMultiplexedConnection {
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index a872fab68..2864fde85 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/backchannel"
diskcache "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
@@ -114,7 +115,7 @@ func New(secure bool, cfg config.Cfg, logrusEntry *log.Entry, registry *backchan
}
opts := []grpc.ServerOption{
- grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry)),
+ grpc.Creds(backchannel.NewServerHandshaker(logrusEntry, transportCredentials, registry, []grpc.DialOption{client.UnaryInterceptor()})),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index 68398a8c2..5ddc74183 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -438,7 +438,7 @@ func TestConnectionMultiplexing(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
srv := grpc.NewServer(
- grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())),
+ grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry(), nil)),
grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
_, err := backchannel.GetPeerID(stream.Context())
if err == backchannel.ErrNonMultiplexedConnection {