1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
package chunk
import (
"io"
"net"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/grpc_testing"
"google.golang.org/protobuf/proto"
)
func TestMain(m *testing.M) {
testhelper.Run(m)
}
type testSender struct {
stream grpc_testing.TestService_StreamingOutputCallServer
body []byte
}
func (ts *testSender) Reset() { ts.body = ts.body[:0] }
func (ts *testSender) Append(m proto.Message) {
ts.body = append(ts.body, m.(*grpc_testing.Payload).Body...)
}
func (ts *testSender) Send() error {
return ts.stream.Send(&grpc_testing.StreamingOutputCallResponse{
Payload: &grpc_testing.Payload{
Body: ts.body,
},
})
}
func TestChunker(t *testing.T) {
s := &server{}
srv, serverSocketPath := runServer(t, s)
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
ctx := testhelper.Context(t)
stream, err := client.StreamingOutputCall(ctx, &grpc_testing.StreamingOutputCallRequest{
Payload: &grpc_testing.Payload{
Body: []byte(strconv.FormatInt(3.5*maxMessageSize, 10)),
},
})
require.NoError(t, err)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
require.Less(t, proto.Size(resp), maxMessageSize)
}
}
type server struct {
grpc_testing.UnimplementedTestServiceServer
}
func (s *server) StreamingOutputCall(req *grpc_testing.StreamingOutputCallRequest, stream grpc_testing.TestService_StreamingOutputCallServer) error {
const kilobyte = 1024
bytesToSend, err := strconv.ParseInt(string(req.GetPayload().GetBody()), 10, 64)
if err != nil {
return err
}
c := New(&testSender{stream: stream})
for numBytes := int64(0); numBytes < bytesToSend; numBytes += kilobyte {
if err := c.Send(&grpc_testing.Payload{Body: make([]byte, kilobyte)}); err != nil {
return err
}
}
if err := c.Flush(); err != nil {
return err
}
return nil
}
func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) {
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
grpcServer := grpc.NewServer(opt...)
grpc_testing.RegisterTestServiceServer(grpcServer, s)
lis, err := net.Listen("unix", serverSocketPath)
require.NoError(t, err)
go testhelper.MustServe(t, grpcServer, lis)
return grpcServer, "unix://" + serverSocketPath
}
func newClient(t *testing.T, serverSocketPath string) (grpc_testing.TestServiceClient, *grpc.ClientConn) {
connOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.Dial(serverSocketPath, connOpts...)
if err != nil {
t.Fatal(err)
}
return grpc_testing.NewTestServiceClient(conn), conn
}
|