blob: 627a492d25151597323f2a90e7d269bdc2344d8f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
// Package backchannel implements connection multiplexing that allows for invoking
// gRPC methods from the server to the client.
//
// gRPC allows only for invoking RPCs from client to the server. Invoking
// RPCs from the server to the client can be useful in some cases such as
// tunneling through firewalls. While implementing such a use case would be
// possible with plain bidirectional streams, the approach has various limitations
// that force additional work on the user. All messages in a single stream are ordered
// and processed sequentially. If concurrency is desired, this would require the user
// to implement their own concurrency handling. Request routing and cancellations would also
// have to be implemented separately on top of the bidirectional stream.
//
// To do away with these problems, this package provides a multiplexed transport for running two
// independent gRPC sessions on a single connection. This allows for dialing back to the client from
// the server to establish another gRPC session where the server and client roles are switched.
//
// The server side uses listenmux to support clients that are unaware of the multiplexing.
//
// Usage:
// 1. Implement a ServerFactory, which is simply a function that returns a Server that can serve on the backchannel
// connection. Plug in the ClientHandshake to the Clientconn via grpc.WithTransportCredentials when dialing.
// This ensures all connections established by gRPC work with a multiplexing session and have a backchannel Server serving.
// 2. Create a *listenmux.Mux and register a *ServerHandshaker with it.
// 3. Pass the *listenmux.Mux into the grpc Server using grpc.Creds.
// The Handshake method is called on each newly established connection that presents the backchannel magic bytes. It dials back to the client's backchannel server. Server
// makes the backchannel connection's available later via the Registry's Backchannel method. The ID of the
// peer associated with the current RPC handler can be fetched via GetPeerID. The returned ID can be used
// to access the correct backchannel connection from the Registry.
package backchannel
import (
"net"
"sync"
"github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
)
// magicBytes are sent by the client to server to identify as a multiplexing aware client.
var magicBytes = []byte("backchannel")
// muxConfig returns a new config to use with the multiplexing session.
func muxConfig(logger logrus.FieldLogger, cfg Configuration) *yamux.Config {
yamuxCfg := yamux.DefaultConfig()
yamuxCfg.Logger = logger
yamuxCfg.LogOutput = nil
// gRPC is already configured to send keep alives so we don't need yamux to do this for us.
// gRPC is a better choice as it sends the keep alives also to non-multiplexed connections.
yamuxCfg.EnableKeepAlive = false
yamuxCfg.AcceptBacklog = cfg.AcceptBacklog
yamuxCfg.MaxStreamWindowSize = cfg.MaximumStreamWindowSizeBytes
yamuxCfg.StreamCloseTimeout = cfg.StreamCloseTimeout
return yamuxCfg
}
// connCloser wraps a net.Conn and calls the provided close function instead when Close
// is called.
type connCloser struct {
net.Conn
// once ensures the close function is called only once. gRPC may invoke Close() on connCloser
// multiple times.
once sync.Once
close func() error
// closeErr records the error from the first call to close.
closeErr error
}
// Close calls the provided close function. The close function is only executed once and
// further calls to Close return the error from the first invocation.
func (cc *connCloser) Close() error {
cc.once.Do(func() { cc.closeErr = cc.close() })
return cc.closeErr
}
|