Welcome to mirror list, hosted at ThFree Co, Russian Federation.

sidechannel.go « client - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 0741d07ccf60acdc16714d77628d186fd3f69dfb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package client

import (
	"context"
	"io"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/backchannel"
	"gitlab.com/gitlab-org/gitaly/v16/internal/listenmux"
	"gitlab.com/gitlab-org/gitaly/v16/internal/sidechannel"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// SidechannelRegistry associates sidechannel callbacks with outbound
// gRPC calls. Construct one with NewSidechannelRegistry and attach a
// callback to a call's context with Register.
type SidechannelRegistry struct {
	registry *sidechannel.Registry // handed to sidechannel.RegisterSidechannel by Register
	logger   *logrus.Entry         // supplied by the caller of NewSidechannelRegistry
}

// NewSidechannelRegistry builds a SidechannelRegistry backed by a fresh
// sidechannel registry. The given logger is stored on the returned value.
func NewSidechannelRegistry(logger *logrus.Entry) *SidechannelRegistry {
	sr := new(SidechannelRegistry)
	sr.registry = sidechannel.NewRegistry()
	sr.logger = logger

	return sr
}

// Register attaches callback to an outbound gRPC call. It returns a
// context derived from ctx that carries the sidechannel metadata; the
// caller must issue the gRPC call with that returned context, not the
// original one. The returned SidechannelWaiter must be Close()d by the
// caller, otherwise resources leak.
func (sr *SidechannelRegistry) Register(
	ctx context.Context,
	callback func(SidechannelConn) error,
) (context.Context, *SidechannelWaiter) {
	// Adapt the public callback, which takes the SidechannelConn
	// interface, to the concrete connection type the sidechannel package
	// passes to its callbacks.
	adapter := func(clientConn *sidechannel.ClientConn) error {
		return callback(clientConn)
	}

	callCtx, inner := sidechannel.RegisterSidechannel(ctx, sr.registry, adapter)

	return callCtx, &SidechannelWaiter{waiter: inner}
}

// SidechannelWaiter is the handle for a pending sidechannel and the
// callback registered for it.
type SidechannelWaiter struct {
	// waiter is the underlying sidechannel waiter being wrapped.
	waiter *sidechannel.Waiter
}

// Close de-registers the sidechannel callback by delegating to the
// wrapped waiter.
//
//   - If the callback is still running, Close blocks until it finishes
//     and returns the callback's error.
//   - If the callback has not been called yet, Close returns an error
//     immediately.
func (w *SidechannelWaiter) Close() error {
	return w.waiter.Close()
}

// SidechannelConn is the client's view of a sidechannel: a raw byte
// stream that can be read from and written to with less overhead than
// exchanging gRPC messages.
type SidechannelConn interface {
	io.Reader
	io.Writer

	// CloseWrite signals to the server that the client will not write any
	// more data; the server observes this as EOF. Reading from the server
	// remains possible afterwards. This suits request/response style byte
	// streams, where the client calls CloseWrite() to mark the end of the
	// request body and then reads the response.
	CloseWrite() error
}

// TestSidechannelServer lets downstream consumers of this package build
// mock sidechannel gRPC servers. It returns the server options to pass
// when creating the gRPC server: sidechannel support (see
// SidechannelServer) plus an unknown-service handler that opens the
// server side of the sidechannel and passes it to handler.
//
// The sidechannel connection is closed once handler returns. If the
// sidechannel cannot be opened, that error is returned and handler is
// not called.
func TestSidechannelServer(
	logger *logrus.Entry,
	creds credentials.TransportCredentials,
	handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error,
) []grpc.ServerOption {
	catchAll := func(srv interface{}, stream grpc.ServerStream) error {
		sidechannelConn, err := OpenServerSidechannel(stream.Context())
		if err != nil {
			return err
		}
		defer sidechannelConn.Close()

		return handler(srv, stream, sidechannelConn)
	}

	return []grpc.ServerOption{
		SidechannelServer(logger, creds),
		grpc.UnknownServiceHandler(catchAll),
	}
}

// SidechannelServer adds sidechannel support to a gRPC server. It wraps
// creds in a listenmux, registers a backchannel server handshaker on it,
// and returns the result as the server's transport credentials option.
func SidechannelServer(logger *logrus.Entry, creds credentials.TransportCredentials) grpc.ServerOption {
	mux := listenmux.New(creds)

	handshaker := backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil)
	mux.Register(handshaker)

	return grpc.Creds(mux)
}

// OpenServerSidechannel opens a sidechannel on the server side. This
// only works if the server was created using SidechannelServer().
//
// On success it returns the opened connection and a nil error. On
// failure it returns a nil io.ReadWriteCloser together with the error
// from sidechannel.OpenSidechannel.
func OpenServerSidechannel(ctx context.Context) (io.ReadWriteCloser, error) {
	conn, err := sidechannel.OpenSidechannel(ctx)
	if err != nil {
		// Return a literal nil rather than conn. The declared return type
		// of sidechannel.OpenSidechannel is not visible here; if it is a
		// concrete pointer, passing a nil pointer through would produce a
		// non-nil interface value that callers cannot detect with a
		// `!= nil` check.
		return nil, err
	}

	return conn, nil
}