Welcome to mirror list, hosted at ThFree Co, Russian Federation.

backchannel_example_test.go « backchannel « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 63f4e5c5d1af6cbe8c53bba70de3abf56af6c31b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package backchannel_test

import (
	"context"
	"fmt"
	"net"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
	"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func Example() {
	// Open the server's listener.
	ln, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		fmt.Printf("failed to start listener: %v", err)
		return
	}

	// Registry is for storing the open backchannels. It should be passed into the ServerHandshaker
	// which creates the backchannel connections and adds them to the registry. The RPC handlers
	// can use the registry to access available backchannels by their peer ID.
	registry := backchannel.NewRegistry()

	logger := logrus.NewEntry(logrus.New())

	// ServerHandshaker initiates the multiplexing session on the server side. Once that is done,
	// it creates the backchannel connection and stores it into the registry. For each connection,
	// the ServerHandshaker passes down the peer ID via the context. The peer ID identifies a
	// backchannel connection.
	lm := listenmux.New(insecure.NewCredentials())
	lm.Register(backchannel.NewServerHandshaker(logger, registry, nil))

	// Create the server
	srv := grpc.NewServer(
		grpc.Creds(lm),
		grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
			fmt.Println("Gitaly received a transactional mutator")

			backchannelID, err := backchannel.GetPeerID(stream.Context())
			if err == backchannel.ErrNonMultiplexedConnection {
				// This call is from a client that is not multiplexing aware. Client is not
				// Praefect, so no need to perform voting. The client could be for example
				// GitLab calling Gitaly directly.
				fmt.Println("Gitaly responding to a non-multiplexed client")
				return stream.SendMsg(&gitalypb.CreateBranchResponse{})
			} else if err != nil {
				return fmt.Errorf("get peer id: %w", err)
			}

			backchannelConn, err := registry.Backchannel(backchannelID)
			if err != nil {
				return fmt.Errorf("get backchannel: %w", err)
			}

			fmt.Println("Gitaly sending vote to Praefect via backchannel")
			if err := backchannelConn.Invoke(
				stream.Context(), "/Praefect/VoteTransaction",
				&gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{},
			); err != nil {
				return fmt.Errorf("invoke backchannel: %w", err)
			}
			fmt.Println("Gitaly received vote response via backchannel")

			fmt.Println("Gitaly responding to the transactional mutator")
			return stream.SendMsg(&gitalypb.CreateBranchResponse{})
		}),
	)
	defer srv.Stop()

	// Start the server
	go func() {
		if err := srv.Serve(ln); err != nil {
			fmt.Printf("failed to serve: %v", err)
		}
	}()

	fmt.Printf("Invoke with a multiplexed client:\n\n")
	if err := invokeWithMuxedClient(logger, ln.Addr().String()); err != nil {
		fmt.Printf("failed to invoke with muxed client: %v", err)
		return
	}

	fmt.Printf("\nInvoke with a non-multiplexed client:\n\n")
	if err := invokeWithNormalClient(ln.Addr().String()); err != nil {
		fmt.Printf("failed to invoke with non-muxed client: %v", err)
		return
	}
	// Output:
	// Invoke with a multiplexed client:
	//
	// Gitaly received a transactional mutator
	// Gitaly sending vote to Praefect via backchannel
	// Praefect received vote via backchannel
	// Praefect responding via backchannel
	// Gitaly received vote response via backchannel
	// Gitaly responding to the transactional mutator
	//
	// Invoke with a non-multiplexed client:
	//
	// Gitaly received a transactional mutator
	// Gitaly responding to a non-multiplexed client
}

func invokeWithMuxedClient(logger *logrus.Entry, address string) error {
	// clientHandshaker's ClientHandshake gets called on each established connection. The Server returned by the
	// ServerFactory is started on Praefect's end of the connection, which Gitaly can call.
	clientHandshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server {
		return grpc.NewServer(grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
			fmt.Println("Praefect received vote via backchannel")
			fmt.Println("Praefect responding via backchannel")
			return stream.SendMsg(&gitalypb.VoteTransactionResponse{})
		}))
	})

	return invokeWithOpts(address, grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())))
}

func invokeWithNormalClient(address string) error {
	return invokeWithOpts(address, grpc.WithInsecure())
}

func invokeWithOpts(address string, opts ...grpc.DialOption) error {
	clientConn, err := grpc.Dial(address, opts...)
	if err != nil {
		return fmt.Errorf("dial server: %w", err)
	}

	if err := clientConn.Invoke(context.Background(), "/Gitaly/Mutator", &gitalypb.CreateBranchRequest{}, &gitalypb.CreateBranchResponse{}); err != nil {
		return fmt.Errorf("call server: %w", err)
	}

	if err := clientConn.Close(); err != nil {
		return fmt.Errorf("close clientConn: %w", err)
	}

	return nil
}