diff options
Diffstat (limited to 'workhorse/internal/gitaly/gitaly.go')
-rw-r--r-- | workhorse/internal/gitaly/gitaly.go | 46 |
1 files changed, 38 insertions, 8 deletions
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go index 6ea99962056..362f380dc4d 100644 --- a/workhorse/internal/gitaly/gitaly.go +++ b/workhorse/internal/gitaly/gitaly.go @@ -11,6 +11,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -23,15 +24,19 @@ import ( ) type Server struct { - Address string `json:"address"` - Token string `json:"token"` - Features map[string]string `json:"features"` + Address string `json:"address"` + Token string `json:"token"` + Features map[string]string `json:"features"` + Sidechannel bool `json:"sidechannel"` } -type cacheKey struct{ address, token string } +type cacheKey struct { + address, token string + sidechannel bool +} func (server Server) cacheKey() cacheKey { - return cacheKey{address: server.Address, token: server.Token} + return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel} } type connectionsCache struct { @@ -41,9 +46,17 @@ type connectionsCache struct { var ( jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true} - cache = connectionsCache{ + // This connection cache map contains two types of connections: + // - Normal gRPC connections + // - Sidechannel connections. When client dials to the Gitaly server, the + // server multiplexes the connection using Yamux. In the future, the server + // can open another stream to transfer data without gRPC. Besides, we apply + // a framing protocol to add the half-close capability to Yamux streams. + // Hence, we cannot use those connections interchangeably. + cache = connectionsCache{ connections: make(map[cacheKey]*grpc.ClientConn), } + sidechannelRegistry *gitalyclient.SidechannelRegistry connectionsTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -54,6 +67,12 @@ var ( ) ) +func InitializeSidechannelRegistry(logger *logrus.Logger) { + if sidechannelRegistry == nil { + sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger)) + } +} + func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { md := metadata.New(nil) for k, v := range features { @@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S return nil, nil, err } grpcClient := gitalypb.NewSmartHTTPServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil + smartHTTPClient := &SmartHTTPClient{ + SmartHTTPServiceClient: grpcClient, + sidechannelRegistry: sidechannelRegistry, + useSidechannel: server.Sidechannel, + } + return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil } func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { @@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) { ), ) - conn, connErr := gitalyclient.Dial(server.Address, connOpts) + var conn *grpc.ClientConn + var connErr error + if server.Sidechannel { + conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background + } else { + conn, connErr = gitalyclient.Dial(server.Address, connOpts) + } label := "ok" if connErr != nil { |