Welcome to mirror list, hosted at ThFree Co, Russian Federation.

peeker_test.go « proxy « grpc-proxy « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 80716b156221f24d42773483d304f6ab2a6bcf52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
//go:build !gitaly_test_sha256

package proxy_test

import (
	"context"
	"io"
	"testing"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/v16/internal/metadata"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/grpc-proxy/proxy"
	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
	"google.golang.org/grpc/test/grpc_testing"
	"google.golang.org/protobuf/proto"
)

// TestStreamPeeking demonstrates that a director function is able to peek into a stream. Further
// more, it demonstrates that peeking into a stream will not disturb the stream sent from the proxy
// client to the backend.
func TestStreamPeeking(t *testing.T) {
	t.Parallel()

	ctx := testhelper.Context(t)

	backendCC, backendSrvr := newBackendPinger(t, ctx)

	requestSent := &grpc_testing.StreamingOutputCallRequest{
		Payload: &grpc_testing.Payload{
			Body: []byte("hi"),
		},
	}
	responseSent := &grpc_testing.StreamingOutputCallResponse{
		Payload: &grpc_testing.Payload{
			Body: []byte("bye"),
		},
	}

	// We create a director that's peeking into the message in order to assert that the peeked
	// message will still be seen by the client.
	director := func(ctx context.Context, _ string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
		peekedMessage, err := peeker.Peek()
		require.NoError(t, err)

		var peekedRequest grpc_testing.StreamingOutputCallRequest
		require.NoError(t, proto.Unmarshal(peekedMessage, &peekedRequest))
		testhelper.ProtoEqual(t, requestSent, &peekedRequest)

		return proxy.NewStreamParameters(proxy.Destination{
			Ctx:  metadata.IncomingToOutgoing(ctx),
			Conn: backendCC,
			Msg:  peekedMessage,
		}, nil, nil, nil), nil
	}

	// The backend is supposed to still receive the message as expected without any modification
	// to it.
	backendSrvr.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error {
		requestReceived, err := stream.Recv()
		require.NoError(t, err)
		testhelper.ProtoEqual(t, requestSent, requestReceived)

		return stream.Send(responseSent)
	}

	proxyConn := newProxy(t, ctx, director, "grpc_testing.TestService", "FullDuplexCall")
	proxyClient := grpc_testing.NewTestServiceClient(proxyConn)

	// Send the request on the stream and close the writing side.
	proxyStream, err := proxyClient.FullDuplexCall(ctx)
	require.NoError(t, err)
	require.NoError(t, proxyStream.Send(requestSent))
	require.NoError(t, proxyStream.CloseSend())

	// And now verify that the response we've got in fact matches our expected response.
	responseReceived, err := proxyStream.Recv()
	require.NoError(t, err)
	testhelper.ProtoEqual(t, responseReceived, responseSent)

	_, err = proxyStream.Recv()
	require.Equal(t, io.EOF, err)
}

func TestStreamInjecting(t *testing.T) {
	t.Parallel()

	ctx := testhelper.Context(t)

	backendCC, backendSrvr := newBackendPinger(t, ctx)

	requestSent := &grpc_testing.StreamingOutputCallRequest{
		Payload: &grpc_testing.Payload{
			Body: []byte("hi"),
		},
	}
	requestReplaced := &grpc_testing.StreamingOutputCallRequest{
		Payload: &grpc_testing.Payload{
			Body: []byte("replaced"),
		},
	}
	responseSent := &grpc_testing.StreamingOutputCallResponse{
		Payload: &grpc_testing.Payload{
			Body: []byte("bye"),
		},
	}

	// We create a director that peeks the incoming request and in fact changes its values. This
	// is to assert that the client receives the changed requests.
	director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
		peekedMessage, err := peeker.Peek()
		require.NoError(t, err)

		// Assert that we get the expected original ping request.
		var peekedRequest grpc_testing.StreamingOutputCallRequest
		require.NoError(t, proto.Unmarshal(peekedMessage, &peekedRequest))
		testhelper.ProtoEqual(t, requestSent, &peekedRequest)

		// Replace the value of the peeked request and send along the changed request.
		replacedMessage, err := proto.Marshal(requestReplaced)
		require.NoError(t, err)

		return proxy.NewStreamParameters(proxy.Destination{
			Ctx:  metadata.IncomingToOutgoing(ctx),
			Conn: backendCC,
			Msg:  replacedMessage,
		}, nil, nil, nil), nil
	}

	// Upon receiving the request the backend server should only ever see the changed request.
	backendSrvr.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error {
		requestReceived, err := stream.Recv()
		require.NoError(t, err)
		testhelper.ProtoEqual(t, requestReplaced, requestReceived)

		return stream.Send(responseSent)
	}

	proxyConn := newProxy(t, ctx, director, "grpc_testing.TestService", "FullDuplexCall")
	proxyClient := grpc_testing.NewTestServiceClient(proxyConn)

	proxyStream, err := proxyClient.FullDuplexCall(ctx)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, proxyStream.CloseSend())
	}()
	require.NoError(t, proxyStream.Send(requestSent))

	responseReceived, err := proxyStream.Recv()
	require.NoError(t, err)
	testhelper.ProtoEqual(t, responseSent, responseReceived)

	_, err = proxyStream.Recv()
	require.Equal(t, io.EOF, err)
}