Welcome to mirror list, hosted at ThFree Co, Russian Federation.

sidechannel.go « client - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
blob: 26729c1381924b228c0866b0247929249994cc18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package client

import (
	"context"
	"io"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
	"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
	"gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// SidechannelRegistry associates sidechannel callbacks with outbound
// gRPC calls.
//
// It wraps a *sidechannel.Registry, which Register passes on to
// sidechannel.RegisterSidechannel, and carries the logger that was given
// to NewSidechannelRegistry. Create instances with NewSidechannelRegistry
// so that the wrapped registry is initialized.
type SidechannelRegistry struct {
	registry *sidechannel.Registry
	logger   *logrus.Entry
}

// NewSidechannelRegistry builds a SidechannelRegistry backed by a freshly
// created sidechannel.Registry. The supplied logger is stored on the
// returned value.
func NewSidechannelRegistry(logger *logrus.Entry) *SidechannelRegistry {
	sr := new(SidechannelRegistry)
	sr.logger = logger
	sr.registry = sidechannel.NewRegistry()
	return sr
}

// Register associates callback with a future sidechannel connection. The
// returned context is derived from ctx and carries the metadata needed to
// match the sidechannel to the call, so the gRPC call must be issued with
// the returned context rather than the one passed in.
//
// The returned SidechannelWaiter must be closed by the caller; failing to
// do so leaks resources.
func (sr *SidechannelRegistry) Register(
	ctx context.Context,
	callback func(SidechannelConn) error,
) (context.Context, *SidechannelWaiter) {
	// Adapt the public callback signature to the one the internal
	// sidechannel package expects: the concrete client connection is handed
	// to the callback through the SidechannelConn interface.
	adapter := func(cc *sidechannel.ClientConn) error {
		return callback(cc)
	}

	callCtx, inner := sidechannel.RegisterSidechannel(ctx, sr.registry, adapter)

	return callCtx, &SidechannelWaiter{waiter: inner}
}

// SidechannelWaiter is the handle for a pending sidechannel and the
// callback that was registered for it. It is obtained from
// SidechannelRegistry.Register and released with Close.
type SidechannelWaiter struct {
	// waiter is the underlying waiter that Close delegates to.
	waiter *sidechannel.Waiter
}

// Close removes the sidechannel callback registration by delegating to
// the underlying waiter.
//
// When the callback is still executing, Close waits for it to finish and
// returns whatever error the callback returned. When the callback was
// never invoked, Close does not wait and returns an error right away.
func (w *SidechannelWaiter) Close() error {
	err := w.waiter.Close()
	return err
}

// SidechannelConn allows a client to read and write bytes with less
// overhead than doing so via gRPC messages. It is the connection type
// handed to callbacks registered with SidechannelRegistry.Register.
type SidechannelConn interface {
	// Read and Write are provided by the embedded io.ReadWriter.
	io.ReadWriter

	// CloseWrite tells the server we won't write any more data. We can still
	// read data from the server after CloseWrite(). A typical use case is in
	// an RPC where the byte stream has a request/response pattern: the
	// client then uses CloseWrite() to signal the end of the request body.
	// When the client calls CloseWrite(), the server receives EOF.
	CloseWrite() error
}

// TestSidechannelServer returns the gRPC server options needed to stand
// up a mock sidechannel server, for use by downstream consumers of this
// package.
//
// The returned options install transport credentials that multiplex a
// backchannel handshaker on top of creds, plus an unknown-service handler
// that opens a sidechannel for every incoming call and passes it to
// handler. The handler's return value becomes the result of the call.
func TestSidechannelServer(
	logger *logrus.Entry,
	creds credentials.TransportCredentials,
	handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error,
) []grpc.ServerOption {
	mux := listenmux.New(creds)
	mux.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))

	// serveUnknown runs for every call, since the mock server registers no
	// concrete services.
	serveUnknown := func(srv interface{}, stream grpc.ServerStream) error {
		sc, openErr := sidechannel.OpenSidechannel(stream.Context())
		if openErr != nil {
			return openErr
		}
		// Release the sidechannel once the handler is done with it.
		defer sc.Close()

		return handler(srv, stream, sc)
	}

	return []grpc.ServerOption{
		grpc.Creds(mux),
		grpc.UnknownServiceHandler(serveUnknown),
	}
}