Welcome to mirror list, hosted at ThFree Co, Russian Federation.

chunker_test.go « chunk « helper « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 79f236c867ad3e14a97901ce96d8062c3db97038 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package chunk

import (
	"io"
	"net"
	"os"
	"testing"

	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes/wrappers"
	"github.com/stretchr/testify/require"
	test "gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk/pb"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
	"google.golang.org/grpc"
)

func TestMain(m *testing.M) {
	os.Exit(testMain(m))
}

func testMain(m *testing.M) int {
	defer testhelper.MustHaveNoChildProcess()
	cleanup := testhelper.Configure()
	defer cleanup()
	return m.Run()
}

type testSender struct {
	stream test.Test_StreamOutputServer
	output [][]byte
}

func (ts *testSender) Reset() { ts.output = nil }
func (ts *testSender) Append(m proto.Message) {
	ts.output = append(ts.output, m.(*wrappers.BytesValue).Value)
}

func (ts *testSender) Send() error {
	return ts.stream.Send(&test.StreamOutputResponse{
		Msg: ts.output,
	})
}

func TestChunker(t *testing.T) {
	s := &server{}
	srv, serverSocketPath := runServer(t, s)
	defer srv.Stop()

	client, conn := newClient(t, serverSocketPath)
	defer conn.Close()

	ctx, cancel := testhelper.Context()
	defer cancel()

	stream, err := client.StreamOutput(ctx, &test.StreamOutputRequest{BytesToReturn: 3.5 * maxMessageSize})
	require.NoError(t, err)

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		require.Less(t, proto.Size(resp), maxMessageSize)
	}
}

type server struct{}

func (s *server) StreamOutput(req *test.StreamOutputRequest, srv test.Test_StreamOutputServer) error {
	const kilobyte = 1024

	c := New(&testSender{stream: srv})
	for numBytes := 0; numBytes < int(req.GetBytesToReturn()); numBytes += kilobyte {
		if err := c.Send(&wrappers.BytesValue{Value: make([]byte, kilobyte)}); err != nil {
			return err
		}
	}

	if err := c.Flush(); err != nil {
		return err
	}
	return nil
}

func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) {
	serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
	grpcServer := grpc.NewServer(opt...)
	test.RegisterTestServer(grpcServer, s)

	lis, err := net.Listen("unix", serverSocketPath)
	require.NoError(t, err)

	go grpcServer.Serve(lis)

	return grpcServer, "unix://" + serverSocketPath
}

func newClient(t *testing.T, serverSocketPath string) (test.TestClient, *grpc.ClientConn) {
	connOpts := []grpc.DialOption{
		grpc.WithInsecure(),
	}
	conn, err := grpc.Dial(serverSocketPath, connOpts...)
	if err != nil {
		t.Fatal(err)
	}

	return test.NewTestClient(conn), conn
}