Welcome to mirror list, hosted at ThFree Co, Russian Federation.

node_test.go « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
blob: eebb9ba3289eb7d0af376b4bdc4867a3e1c4c8a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package praefect

import (
	"fmt"
	"net"
	"path/filepath"
	"testing"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
)

// TestDialNodes verifies that DialNodes returns a connection and a health
// client for every configured node of a virtual storage, including a node whose
// address cannot be reached.
//
// Two in-process gRPC servers are started on unix sockets, one reporting
// SERVING and one reporting NOT_SERVING, and a third node is configured with a
// socket path that does not exist. Every dialed node is then health-checked and
// the observed storage, token, status and error are compared against the
// expectations.
func TestDialNodes(t *testing.T) {
	ctx := testhelper.Context(t)

	tmp := testhelper.TempDir(t)

	// nodeAssertion captures everything that is compared per node: its identity
	// and the outcome of a single health check issued through the dialed client.
	type nodeAssertion struct {
		storage string
		token   string
		status  grpc_health_v1.HealthCheckResponse_ServingStatus
		error   string
	}

	expectedNodes := []nodeAssertion{
		{
			storage: "healthy",
			token:   "healthy-token",
			status:  grpc_health_v1.HealthCheckResponse_SERVING,
		},
		{
			storage: "unhealthy",
			token:   "unhealthy-token",
			status:  grpc_health_v1.HealthCheckResponse_NOT_SERVING,
		},
	}

	// Start one health-only gRPC server per reachable node and record the
	// matching node configuration.
	var cfgNodes []*config.Node
	for _, n := range expectedNodes {
		socket := filepath.Join(tmp, n.storage)
		ln, err := net.Listen("unix", socket)
		require.NoError(t, err)

		healthSrv := health.NewServer()
		healthSrv.SetServingStatus("", n.status)

		srv := grpc.NewServer()
		grpc_health_v1.RegisterHealthServer(srv, healthSrv)
		// Deliberately deferred to the end of the test rather than the end of
		// the iteration: every server has to stay up until the assertions
		// below have run.
		defer srv.Stop()
		go srv.Serve(ln)

		cfgNodes = append(cfgNodes, &config.Node{
			Storage: n.storage,
			Token:   n.token,
			Address: fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()),
		})
	}

	// The third node points at a socket that does not exist. No token is
	// configured for it, so the expected token is the empty string.
	expectedNodes = append(expectedNodes, nodeAssertion{
		storage: "invalid",
		error:   status.Error(codes.Unavailable, `connection error: desc = "transport: Error while dialing dial unix non-existent-socket: connect: no such file or directory"`).Error(),
	})

	nodeSet, err := DialNodes(ctx,
		[]*config.VirtualStorage{{
			Name: "virtual-storage",
			Nodes: append(cfgNodes, &config.Node{
				Storage: "invalid",
				Address: "unix:non-existent-socket",
			}),
		}}, nil, nil, nil, nil,
	)
	require.NoError(t, err)
	defer nodeSet.Close()

	conns := nodeSet.Connections()
	healthClients := nodeSet.HealthClients()

	var actualNodes []nodeAssertion
	// The loop variable is not called "nodes" so that it does not shadow the
	// imported nodes package used in the final assertion.
	for virtualStorage, storageNodes := range nodeSet {
		for _, node := range storageNodes {
			require.NotNil(t, conns[virtualStorage][node.Storage], "connection not found for storage %q", node.Storage)

			// Guard the lookup: calling Check on a missing (nil) client would
			// panic instead of producing a readable test failure.
			healthClient := healthClients[virtualStorage][node.Storage]
			require.NotNil(t, healthClient, "health client not found for storage %q", node.Storage)

			resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})

			var errStr string
			if err != nil {
				errStr = err.Error()
			}

			assertion := nodeAssertion{
				storage: node.Storage,
				token:   node.Token,
				error:   errStr,
			}

			if resp != nil {
				assertion.status = resp.Status
			}

			actualNodes = append(actualNodes, assertion)

			// Remove every entry that has been accounted for; whatever is
			// left afterwards does not belong to any node in the set.
			delete(conns[virtualStorage], node.Storage)
			delete(healthClients[virtualStorage], node.Storage)
		}
	}

	require.ElementsMatch(t, expectedNodes, actualNodes)
	require.Equal(t, Connections{"virtual-storage": {}}, conns, "unexpected connections")
	require.Equal(t, nodes.HealthClients{"virtual-storage": {}}, healthClients, "unexpected health clients")
}