1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package client
import (
"context"
"io"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/listenmux"
"gitlab.com/gitlab-org/gitaly/v16/internal/sidechannel"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// SidechannelRegistry associates sidechannel callbacks with outbound
// gRPC calls. Values are created with NewSidechannelRegistry, and callbacks
// are attached to individual calls with Register.
type SidechannelRegistry struct {
// registry is the underlying sidechannel registry; Register hands it to
// sidechannel.RegisterSidechannel.
registry *sidechannel.Registry
// logger is the log entry that was passed to NewSidechannelRegistry.
logger *logrus.Entry
}
// NewSidechannelRegistry builds a SidechannelRegistry that is backed by a
// freshly created sidechannel.Registry and keeps hold of logger.
func NewSidechannelRegistry(logger *logrus.Entry) *SidechannelRegistry {
	sr := new(SidechannelRegistry)
	sr.registry = sidechannel.NewRegistry()
	sr.logger = logger
	return sr
}
// Register attaches callback to an outbound gRPC call. It delegates to
// sidechannel.RegisterSidechannel, which adds metadata to ctx, and returns
// the resulting context together with a SidechannelWaiter.
//
// The gRPC call must be made with the returned context rather than the one
// passed in. The caller must Close() the returned SidechannelWaiter to
// prevent resource leaks.
func (sr *SidechannelRegistry) Register(
	ctx context.Context,
	callback func(SidechannelConn) error,
) (context.Context, *SidechannelWaiter) {
	// The sidechannel package calls back with its concrete connection type;
	// forward it to the caller's callback as a SidechannelConn.
	adapter := func(conn *sidechannel.ClientConn) error {
		return callback(conn)
	}

	callCtx, inner := sidechannel.RegisterSidechannel(ctx, sr.registry, adapter)

	return callCtx, &SidechannelWaiter{waiter: inner}
}
// SidechannelWaiter is the handle for a pending sidechannel and the callback
// registered for it.
type SidechannelWaiter struct {
	// waiter is the handle from the sidechannel package that Close
	// delegates to.
	waiter *sidechannel.Waiter
}
// Close removes the registration of the sidechannel callback by closing the
// underlying waiter and returns whatever that call returns.
//
// When the callback is still running, Close blocks until it finishes and
// returns the callback's error. When the callback has not been invoked yet,
// Close returns an error immediately.
func (w *SidechannelWaiter) Close() error {
	return w.waiter.Close()
}
// SidechannelConn lets a client exchange raw bytes with the server with less
// overhead than sending them as gRPC messages.
type SidechannelConn interface {
	// Read and Write carry the byte stream from and to the server.
	io.Reader
	io.Writer

	// CloseWrite signals that the client will not write any more data; the
	// server then receives EOF. Reading from the server remains possible
	// after CloseWrite(). The typical use is an RPC whose byte stream
	// follows a request/response pattern, where the client calls
	// CloseWrite() to mark the end of the request body.
	CloseWrite() error
}
// TestSidechannelServer lets downstream consumers of this package build mock
// sidechannel gRPC servers.
//
// It returns two server options: the option produced by
// SidechannelServer(logger, creds), and an unknown-service handler. That
// handler opens a server-side sidechannel from the stream's context and
// passes the service value, the stream and the sidechannel connection to
// handler. If the sidechannel cannot be opened, the error is returned and
// handler is not called.
func TestSidechannelServer(
	logger *logrus.Entry,
	creds credentials.TransportCredentials,
	handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error,
) []grpc.ServerOption {
	serveUnknown := func(srv interface{}, stream grpc.ServerStream) error {
		sidechannelConn, openErr := OpenServerSidechannel(stream.Context())
		if openErr != nil {
			return openErr
		}
		// The connection is closed once handler returns; the result of
		// Close is discarded and handler's return value is what is returned.
		defer sidechannelConn.Close()

		return handler(srv, stream, sidechannelConn)
	}

	return []grpc.ServerOption{
		SidechannelServer(logger, creds),
		grpc.UnknownServiceHandler(serveUnknown),
	}
}
// SidechannelServer adds sidechannel support to a gRPC server.
//
// It creates a listenmux around creds, registers a backchannel server
// handshaker (built with logger and a new backchannel registry) on it, and
// returns the mux as the server's transport credentials option.
func SidechannelServer(logger *logrus.Entry, creds credentials.TransportCredentials) grpc.ServerOption {
	mux := listenmux.New(creds)

	// NOTE(review): the trailing nil is presumably "no extra options" for
	// the handshaker; confirm against backchannel.NewServerHandshaker.
	backchannelRegistry := backchannel.NewRegistry()
	handshaker := backchannel.NewServerHandshaker(logger, backchannelRegistry, nil)
	mux.Register(handshaker)

	return grpc.Creds(mux)
}
// OpenServerSidechannel opens the server side of a sidechannel for the call
// identified by ctx by delegating to sidechannel.OpenSidechannel. It only
// works when the server was created using SidechannelServer().
func OpenServerSidechannel(ctx context.Context) (io.ReadWriteCloser, error) {
	// Both results are passed through unchanged.
	conn, err := sidechannel.OpenSidechannel(ctx)
	return conn, err
}
|