Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/gitaly/gitaly.go')
-rw-r--r--workhorse/internal/gitaly/gitaly.go46
1 files changed, 38 insertions, 8 deletions
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go
index 6ea99962056..362f380dc4d 100644
--- a/workhorse/internal/gitaly/gitaly.go
+++ b/workhorse/internal/gitaly/gitaly.go
@@ -11,6 +11,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -23,15 +24,19 @@ import (
)
type Server struct {
- Address string `json:"address"`
- Token string `json:"token"`
- Features map[string]string `json:"features"`
+ Address string `json:"address"`
+ Token string `json:"token"`
+ Features map[string]string `json:"features"`
+ Sidechannel bool `json:"sidechannel"`
}
-type cacheKey struct{ address, token string }
+type cacheKey struct {
+ address, token string
+ sidechannel bool
+}
func (server Server) cacheKey() cacheKey {
- return cacheKey{address: server.Address, token: server.Token}
+ return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel}
}
type connectionsCache struct {
@@ -41,9 +46,17 @@ type connectionsCache struct {
var (
jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true}
- cache = connectionsCache{
+ // This connection cache map contains two types of connections:
+ // - Normal gRPC connections
+ // - Sidechannel connections. When client dials to the Gitaly server, the
+ // server multiplexes the connection using Yamux. In the future, the server
+ // can open another stream to transfer data without gRPC. Besides, we apply
+ // a framing protocol to add the half-close capability to Yamux streams.
+ // Hence, we cannot use those connections interchangeably.
+ cache = connectionsCache{
connections: make(map[cacheKey]*grpc.ClientConn),
}
+ sidechannelRegistry *gitalyclient.SidechannelRegistry
connectionsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@@ -54,6 +67,12 @@ var (
)
)
+func InitializeSidechannelRegistry(logger *logrus.Logger) {
+ if sidechannelRegistry == nil {
+ sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger))
+ }
+}
+
func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
md := metadata.New(nil)
for k, v := range features {
@@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S
return nil, nil, err
}
grpcClient := gitalypb.NewSmartHTTPServiceClient(conn)
- return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil
+ smartHTTPClient := &SmartHTTPClient{
+ SmartHTTPServiceClient: grpcClient,
+ sidechannelRegistry: sidechannelRegistry,
+ useSidechannel: server.Sidechannel,
+ }
+ return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil
}
func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
@@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
),
)
- conn, connErr := gitalyclient.Dial(server.Address, connOpts)
+ var conn *grpc.ClientConn
+ var connErr error
+ if server.Sidechannel {
+ conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
+ } else {
+ conn, connErr = gitalyclient.Dial(server.Address, connOpts)
+ }
label := "ok"
if connErr != nil {