Welcome to mirror list, hosted at ThFree Co, Russian Federation.

backchannel.go « backchannel « grpc « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 627a492d25151597323f2a90e7d269bdc2344d8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Package backchannel implements connection multiplexing that allows for invoking
// gRPC methods from the server to the client.
//
// gRPC allows only for invoking RPCs from client to the server. Invoking
// RPCs from the server to the client can be useful in some cases such as
// tunneling through firewalls. While implementing such a use case would be
// possible with plain bidirectional streams, the approach has various limitations
// that force additional work on the user. All messages in a single stream are ordered
// and processed sequentially. If concurrency is desired, this would require the user
// to implement their own concurrency handling. Request routing and cancellations would also
// have to be implemented separately on top of the bidirectional stream.
//
// To do away with these problems, this package provides a multiplexed transport for running two
// independent gRPC sessions on a single connection. This allows for dialing back to the client from
// the server to establish another gRPC session where the server and client roles are switched.
//
// The server side uses listenmux to support clients that are unaware of the multiplexing.
//
// Usage:
//  1. Implement a ServerFactory, which is simply a function that returns a Server that can serve on the backchannel
//     connection. Plug in the ClientHandshake to the Clientconn via grpc.WithTransportCredentials when dialing.
//     This ensures all connections established by gRPC work with a multiplexing session and have a backchannel Server serving.
//  2. Create a *listenmux.Mux and register a *ServerHandshaker with it.
//  3. Pass the *listenmux.Mux into the grpc Server using grpc.Creds.
//     The Handshake method is called on each newly established connection that presents the backchannel magic bytes. It dials back to the client's backchannel server. Server
//     makes the backchannel connection's available later via the Registry's Backchannel method. The ID of the
//     peer associated with the current RPC handler can be fetched via GetPeerID. The returned ID can be used
//     to access the correct backchannel connection from the Registry.
package backchannel

import (
	"net"
	"sync"

	"github.com/hashicorp/yamux"
	"github.com/sirupsen/logrus"
)

// magicBytes are sent by the client to server to identify as a multiplexing aware client.
var magicBytes = []byte("backchannel")

// muxConfig returns a new config to use with the multiplexing session.
func muxConfig(logger logrus.FieldLogger, cfg Configuration) *yamux.Config {
	yamuxCfg := yamux.DefaultConfig()
	yamuxCfg.Logger = logger
	yamuxCfg.LogOutput = nil
	// gRPC is already configured to send keep alives so we don't need yamux to do this for us.
	// gRPC is a better choice as it sends the keep alives also to non-multiplexed connections.
	yamuxCfg.EnableKeepAlive = false
	yamuxCfg.AcceptBacklog = cfg.AcceptBacklog
	yamuxCfg.MaxStreamWindowSize = cfg.MaximumStreamWindowSizeBytes
	yamuxCfg.StreamCloseTimeout = cfg.StreamCloseTimeout

	return yamuxCfg
}

// connCloser wraps a net.Conn and calls the provided close function instead when Close
// is called.
type connCloser struct {
	net.Conn
	// once ensures the close function is called only once. gRPC may invoke Close() on connCloser
	// multiple times.
	once  sync.Once
	close func() error
	// closeErr records the error from the first call to close.
	closeErr error
}

// Close calls the provided close function. The close function is only executed once and
// further calls to Close return the error from the first invocation.
func (cc *connCloser) Close() error {
	cc.once.Do(func() { cc.closeErr = cc.close() })
	return cc.closeErr
}