diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-11-10 12:07:07 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-11-17 14:12:25 +0300 |
commit | 89ab01bfa9f9e669d439b360d5df29528d7cf9c4 (patch) | |
tree | 61248f1b149ce432e356009069348202f33504c2 | |
parent | 960817d20333a9cacf9d6e7040386fca6bc48393 (diff) |
extract dialing from node manager to a function
Praefect's dialing logic is currently embedded inside node manager.
This extracts the dialing functionality in to a separate function
so it can be used in isolation from other features in node manager
and reused for per repository elector stack.
-rw-r--r-- | internal/praefect/node.go | 43 | ||||
-rw-r--r-- | internal/praefect/node_test.go | 74 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 38 |
3 files changed, 140 insertions, 15 deletions
diff --git a/internal/praefect/node.go b/internal/praefect/node.go index 5de8d8eb9..e285700ab 100644 --- a/internal/praefect/node.go +++ b/internal/praefect/node.go @@ -1,7 +1,13 @@ package praefect import ( + "context" + "fmt" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "google.golang.org/grpc" ) @@ -20,6 +26,15 @@ type Node struct { // NodeSet contains nodes by their virtual storage and storage names. type NodeSet map[string]map[string]Node +// Close closes the connections in the NodeSet. Errors on closing are ignored. +func (set NodeSet) Close() { + for _, nodes := range set { + for _, node := range nodes { + node.Connection.Close() + } + } +} + // NodeSetFromNodeManager converts connections set up by the node manager // in to a NodeSet. This is a temporary adapter required due to cyclic // imports between the praefect and nodes packages. @@ -45,3 +60,31 @@ func toNode(node nodes.Node) Node { Connection: node.GetConnection(), } } + +// DialNodes dials the configured storage nodes. +func DialNodes( + ctx context.Context, + virtualStorages []*config.VirtualStorage, + registry *protoregistry.Registry, + errorTracker tracker.ErrorTracker, +) (NodeSet, error) { + set := make(NodeSet, len(virtualStorages)) + for _, virtualStorage := range virtualStorages { + set[virtualStorage.Name] = make(map[string]Node, len(virtualStorage.Nodes)) + for _, node := range virtualStorage.Nodes { + conn, err := nodes.Dial(ctx, node, registry, errorTracker) + if err != nil { + return nil, fmt.Errorf("dial %q/%q: %w", virtualStorage.Name, node.Storage, err) + } + + set[virtualStorage.Name][node.Storage] = Node{ + Storage: node.Storage, + Address: node.Address, + Token: node.Token, + Connection: conn, + } + } + } + + return set, nil +} diff --git a/internal/praefect/node_test.go b/internal/praefect/node_test.go new file mode 100644 index 000000000..8df700c50 --- /dev/null +++ b/internal/praefect/node_test.go @@ -0,0 +1,74 @@ +package praefect + +import ( + "fmt" + "io/ioutil" + "net" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" +) + +func TestDialNodes(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + tmp, err := ioutil.TempDir("", "praefect") + require.NoError(t, err) + defer os.RemoveAll(tmp) + + type nodeAssertion struct { + storage string + token string + } + + expectedNodes := []nodeAssertion{ + { + storage: "healthy", + token: "healthy-token", + }, + { + storage: "unhealthy", + token: "unhealthy-token", + }, + } + + var cfgNodes []*config.Node + for _, n := range expectedNodes { + socket := filepath.Join(tmp, n.storage) + ln, err := net.Listen("unix", socket) + require.NoError(t, err) + srv := grpc.NewServer() + defer srv.Stop() + go srv.Serve(ln) + + cfgNodes = append(cfgNodes, &config.Node{ + Storage: n.storage, + Token: n.token, + Address: fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()), + }) + } + + nodeSet, err := DialNodes(ctx, + []*config.VirtualStorage{{Name: "virtual-storage", Nodes: cfgNodes}}, nil, nil, + ) + require.NoError(t, err) + defer nodeSet.Close() + + var actualNodes []nodeAssertion + for _, nodes := range nodeSet { + for _, node := range nodes { + actualNodes = append(actualNodes, nodeAssertion{ + storage: node.Storage, + token: node.Token, + }) + } + } + + require.ElementsMatch(t, expectedNodes, actualNodes) +} diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 9013a22d5..d20355ff9 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -118,6 +118,24 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy") const dialTimeout = 10 * time.Second +// Dial dials a node with the necessary interceptors configured. +func Dial(ctx context.Context, node *config.Node, registry *protoregistry.Registry, errorTracker tracker.ErrorTracker) (*grpc.ClientConn, error) { + streamInterceptors := []grpc.StreamClientInterceptor{ + grpc_prometheus.StreamClientInterceptor, + } + + if errorTracker != nil { + streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage)) + } + + return client.DialContext(ctx, node.Address, []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)), + grpc.WithChainStreamInterceptor(streamInterceptors...), + grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), + }) +} + // NewManager creates a new NodeMgr based on virtual storage configs func NewManager( log *logrus.Entry, @@ -128,31 +146,21 @@ func NewManager( registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, ) (*Mgr, error) { - strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages)) + if !c.Failover.Enabled { + errorTracker = nil + } ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) defer cancel() nodes := make(map[string][]Node, len(c.VirtualStorages)) + strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages)) for _, virtualStorage := range c.VirtualStorages { log = log.WithField("virtual_storage", virtualStorage.Name) ns := make([]*nodeStatus, 0, len(virtualStorage.Nodes)) for _, node := range virtualStorage.Nodes { - streamInterceptors := []grpc.StreamClientInterceptor{ - grpc_prometheus.StreamClientInterceptor, - } - - if c.Failover.Enabled && errorTracker != nil { - streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage)) - } - - conn, err := client.DialContext(ctx, node.Address, []grpc.DialOption{ - grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)), - grpc.WithChainStreamInterceptor(streamInterceptors...), - grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), - }) + conn, err := Dial(ctx, node, registry, errorTracker) if err != nil { return nil, err } |