Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-03-24 15:37:39 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-03-30 18:02:37 +0300
commit1a2690a8cb42f304a501441ba0c217c265b6583a (patch)
treec23bab9479a1bc502c1af4248000d4844e74d563 /client
parent42f1073089f7f4f6513657c607776b033d3edbc5 (diff)
add DialWithMux to client package
This commit adds DialWithMux to client package which dials to Gitaly using a multiplexed connection. This is an experimental function that will be removed in the future and is not considered for backwards compatibility. This function is only going to be used for performance testing the multiplexed connection.
Diffstat (limited to 'client')
-rw-r--r--client/dial.go43
-rw-r--r--client/dial_test.go47
2 files changed, 82 insertions, 8 deletions
diff --git a/client/dial.go b/client/dial.go
index 68570d159..613535a5f 100644
--- a/client/dial.go
+++ b/client/dial.go
@@ -8,6 +8,8 @@ import (
"net/url"
"time"
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/backchannel"
gitaly_x509 "gitlab.com/gitlab-org/gitaly/internal/x509"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
@@ -30,8 +32,23 @@ const (
)
func DialContext(ctx context.Context, rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
+ return dialContext(ctx, rawAddress, connOpts, false, nil)
+}
+
+func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
+ return dialContext(context.Background(), rawAddress, connOpts, false, nil)
+}
+
+// DialWithMux dials with a multiplexed connection to Gitaly. Experimental, this is going to be removed and
+// should not be depended upon.
+func DialWithMux(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, logger *logrus.Entry) (*grpc.ClientConn, error) {
+ return dialContext(ctx, rawAddress, connOpts, true, logger)
+}
+
+func dialContext(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, muxed bool, logger *logrus.Entry) (*grpc.ClientConn, error) {
var canonicalAddress string
var err error
+ var transportCredentials credentials.TransportCredentials
switch getConnectionType(rawAddress) {
case invalidConnection:
@@ -48,23 +65,21 @@ func DialContext(ctx context.Context, rawAddress string, connOpts []grpc.DialOpt
return nil, fmt.Errorf("failed to get system certificat pool for 'tls' connection: %w", err)
}
- connOpts = append(connOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
+ transportCredentials = credentials.NewTLS(&tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
- })))
+ })
case tcpConnection:
canonicalAddress, err = extractHostFromRemoteURL(rawAddress) // Ensure the form: "host:port" ...
if err != nil {
return nil, fmt.Errorf("failed to extract host for 'tcp' connection: %w", err)
}
- connOpts = append(connOpts, grpc.WithInsecure())
case unixConnection:
canonicalAddress = rawAddress // This will be overridden by the custom dialer...
connOpts = append(
connOpts,
- grpc.WithInsecure(),
// Use a custom dialer to ensure that we don't experience
// issues in environments that have proxy configurations
// https://gitlab.com/gitlab-org/gitaly/merge_requests/1072#note_140408512
@@ -80,6 +95,22 @@ func DialContext(ctx context.Context, rawAddress string, connOpts []grpc.DialOpt
)
}
+ if muxed {
+ if transportCredentials == nil {
+ transportCredentials = backchannel.Insecure()
+ }
+
+ transportCredentials = backchannel.ServerFactory(
+ func() backchannel.Server { return grpc.NewServer() },
+ ).ClientHandshaker(logger, transportCredentials)
+ }
+
+ if transportCredentials == nil {
+ connOpts = append(connOpts, grpc.WithInsecure())
+ } else {
+ connOpts = append(connOpts, grpc.WithTransportCredentials(transportCredentials))
+ }
+
connOpts = append(connOpts,
// grpc.KeepaliveParams must be specified at least as large as what is allowed by the
// server-side grpc.KeepaliveEnforcementPolicy
@@ -105,10 +136,6 @@ func DialContext(ctx context.Context, rawAddress string, connOpts []grpc.DialOpt
return conn, nil
}
-func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
- return DialContext(context.Background(), rawAddress, connOpts)
-}
-
func getConnectionType(rawAddress string) connectionType {
u, err := url.Parse(rawAddress)
if err != nil {
diff --git a/client/dial_test.go b/client/dial_test.go
index b9a1dabe2..027aa6056 100644
--- a/client/dial_test.go
+++ b/client/dial_test.go
@@ -16,9 +16,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/backchannel"
proxytestdata "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
gitaly_x509 "gitlab.com/gitlab-org/gitaly/internal/x509"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
@@ -533,3 +535,48 @@ func TestHealthCheckDialer(t *testing.T) {
require.NoError(t, err)
cc.Close()
}
+
+func TestDialWithMux(t *testing.T) {
+ errNonMuxed := status.Error(codes.Internal, "non-muxed connection")
+ errMuxed := status.Error(codes.Internal, "muxed connection")
+
+ logger := testhelper.DiscardTestEntry(t)
+
+ srv := grpc.NewServer(
+ grpc.Creds(backchannel.NewServerHandshaker(logger, backchannel.Insecure(), backchannel.NewRegistry())),
+ grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
+ _, err := backchannel.GetPeerID(stream.Context())
+ if err == backchannel.ErrNonMultiplexedConnection {
+ return errNonMuxed
+ }
+
+ assert.NoError(t, err)
+ return errMuxed
+ }),
+ )
+ defer srv.Stop()
+
+ ln, err := net.Listen("tcp", "localhost:0")
+ require.NoError(t, err)
+
+ go srv.Serve(ln)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ t.Run("non-muxed conn", func(t *testing.T) {
+ nonMuxedConn, err := DialContext(ctx, "tcp://"+ln.Addr().String(), nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, nonMuxedConn.Close()) }()
+
+ require.Equal(t, errNonMuxed, nonMuxedConn.Invoke(ctx, "/Service/Method", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{}))
+ })
+
+ t.Run("muxed conn", func(t *testing.T) {
+ nonMuxedConn, err := DialWithMux(ctx, "tcp://"+ln.Addr().String(), nil, logger)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, nonMuxedConn.Close()) }()
+
+ require.Equal(t, errMuxed, nonMuxedConn.Invoke(ctx, "/Service/Method", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{}))
+ })
+}