1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
package praefect
import (
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
func TestDialNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
tmp, err := ioutil.TempDir("", "praefect")
require.NoError(t, err)
defer os.RemoveAll(tmp)
type nodeAssertion struct {
storage string
token string
status grpc_health_v1.HealthCheckResponse_ServingStatus
error error
}
expectedNodes := []nodeAssertion{
{
storage: "healthy",
token: "healthy-token",
status: grpc_health_v1.HealthCheckResponse_SERVING,
},
{
storage: "unhealthy",
token: "unhealthy-token",
status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
},
}
var cfgNodes []*config.Node
for _, n := range expectedNodes {
socket := filepath.Join(tmp, n.storage)
ln, err := net.Listen("unix", socket)
require.NoError(t, err)
healthSrv := health.NewServer()
healthSrv.SetServingStatus("", n.status)
srv := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(srv, healthSrv)
defer srv.Stop()
go srv.Serve(ln)
cfgNodes = append(cfgNodes, &config.Node{
Storage: n.storage,
Token: n.token,
Address: fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()),
})
}
expectedNodes = append(expectedNodes, nodeAssertion{
storage: "invalid",
error: status.Error(codes.Unavailable, `connection error: desc = "transport: Error while dialing dial unix non-existent-socket: connect: no such file or directory"`),
})
nodeSet, err := DialNodes(ctx,
[]*config.VirtualStorage{{
Name: "virtual-storage",
Nodes: append(cfgNodes, &config.Node{
Storage: "invalid",
Address: "unix:non-existent-socket",
}),
}}, nil, nil, nil,
)
require.NoError(t, err)
defer nodeSet.Close()
conns := nodeSet.Connections()
healthClients := nodeSet.HealthClients()
var actualNodes []nodeAssertion
for virtualStorage, nodes := range nodeSet {
for _, node := range nodes {
require.NotNil(t, conns[virtualStorage][node.Storage], "connection not found for storage %q", node.Storage)
resp, err := healthClients[virtualStorage][node.Storage].Check(ctx, &grpc_health_v1.HealthCheckRequest{})
assertion := nodeAssertion{
storage: node.Storage,
token: node.Token,
error: err,
}
if resp != nil {
assertion.status = resp.Status
}
actualNodes = append(actualNodes, assertion)
delete(conns[virtualStorage], node.Storage)
delete(healthClients[virtualStorage], node.Storage)
}
}
require.ElementsMatch(t, expectedNodes, actualNodes)
require.Equal(t, Connections{"virtual-storage": {}}, conns, "unexpected connections")
require.Equal(t, nodes.HealthClients{"virtual-storage": {}}, healthClients, "unexpected health clients")
}
|