Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-11-10 12:07:07 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-11-17 14:12:25 +0300
commit89ab01bfa9f9e669d439b360d5df29528d7cf9c4 (patch)
tree61248f1b149ce432e356009069348202f33504c2
parent960817d20333a9cacf9d6e7040386fca6bc48393 (diff)
extract dialing from node manager to a function
Praefect's dialing logic is currently embedded inside node manager. This extracts the dialing functionality in to a separate function so it can be used in isolation from other features in node manager and reused for per repository elector stack.
-rw-r--r--internal/praefect/node.go43
-rw-r--r--internal/praefect/node_test.go74
-rw-r--r--internal/praefect/nodes/manager.go38
3 files changed, 140 insertions, 15 deletions
diff --git a/internal/praefect/node.go b/internal/praefect/node.go
index 5de8d8eb9..e285700ab 100644
--- a/internal/praefect/node.go
+++ b/internal/praefect/node.go
@@ -1,7 +1,13 @@
package praefect
import (
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"google.golang.org/grpc"
)
@@ -20,6 +26,15 @@ type Node struct {
// NodeSet contains nodes by their virtual storage and storage names.
type NodeSet map[string]map[string]Node
+// Close closes the connections in the NodeSet. Errors on closing are ignored.
+func (set NodeSet) Close() {
+ for _, nodes := range set {
+ for _, node := range nodes {
+ node.Connection.Close()
+ }
+ }
+}
+
// NodeSetFromNodeManager converts connections set up by the node manager
// in to a NodeSet. This is a temporary adapter required due to cyclic
// imports between the praefect and nodes packages.
@@ -45,3 +60,31 @@ func toNode(node nodes.Node) Node {
Connection: node.GetConnection(),
}
}
+
+// DialNodes dials the configured storage nodes.
+func DialNodes(
+ ctx context.Context,
+ virtualStorages []*config.VirtualStorage,
+ registry *protoregistry.Registry,
+ errorTracker tracker.ErrorTracker,
+) (NodeSet, error) {
+ set := make(NodeSet, len(virtualStorages))
+ for _, virtualStorage := range virtualStorages {
+ set[virtualStorage.Name] = make(map[string]Node, len(virtualStorage.Nodes))
+ for _, node := range virtualStorage.Nodes {
+ conn, err := nodes.Dial(ctx, node, registry, errorTracker)
+ if err != nil {
+ return nil, fmt.Errorf("dial %q/%q: %w", virtualStorage.Name, node.Storage, err)
+ }
+
+ set[virtualStorage.Name][node.Storage] = Node{
+ Storage: node.Storage,
+ Address: node.Address,
+ Token: node.Token,
+ Connection: conn,
+ }
+ }
+ }
+
+ return set, nil
+}
diff --git a/internal/praefect/node_test.go b/internal/praefect/node_test.go
new file mode 100644
index 000000000..8df700c50
--- /dev/null
+++ b/internal/praefect/node_test.go
@@ -0,0 +1,74 @@
+package praefect
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc"
+)
+
+func TestDialNodes(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ tmp, err := ioutil.TempDir("", "praefect")
+ require.NoError(t, err)
+ defer os.RemoveAll(tmp)
+
+ type nodeAssertion struct {
+ storage string
+ token string
+ }
+
+ expectedNodes := []nodeAssertion{
+ {
+ storage: "healthy",
+ token: "healthy-token",
+ },
+ {
+ storage: "unhealthy",
+ token: "unhealthy-token",
+ },
+ }
+
+ var cfgNodes []*config.Node
+ for _, n := range expectedNodes {
+ socket := filepath.Join(tmp, n.storage)
+ ln, err := net.Listen("unix", socket)
+ require.NoError(t, err)
+ srv := grpc.NewServer()
+ defer srv.Stop()
+ go srv.Serve(ln)
+
+ cfgNodes = append(cfgNodes, &config.Node{
+ Storage: n.storage,
+ Token: n.token,
+ Address: fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()),
+ })
+ }
+
+ nodeSet, err := DialNodes(ctx,
+ []*config.VirtualStorage{{Name: "virtual-storage", Nodes: cfgNodes}}, nil, nil,
+ )
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ var actualNodes []nodeAssertion
+ for _, nodes := range nodeSet {
+ for _, node := range nodes {
+ actualNodes = append(actualNodes, nodeAssertion{
+ storage: node.Storage,
+ token: node.Token,
+ })
+ }
+ }
+
+ require.ElementsMatch(t, expectedNodes, actualNodes)
+}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 9013a22d5..d20355ff9 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -118,6 +118,24 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
+// Dial dials a node with the necessary interceptors configured.
+func Dial(ctx context.Context, node *config.Node, registry *protoregistry.Registry, errorTracker tracker.ErrorTracker) (*grpc.ClientConn, error) {
+ streamInterceptors := []grpc.StreamClientInterceptor{
+ grpc_prometheus.StreamClientInterceptor,
+ }
+
+ if errorTracker != nil {
+ streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage))
+ }
+
+ return client.DialContext(ctx, node.Address, []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
+ grpc.WithChainStreamInterceptor(streamInterceptors...),
+ grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
+ })
+}
+
// NewManager creates a new NodeMgr based on virtual storage configs
func NewManager(
log *logrus.Entry,
@@ -128,31 +146,21 @@ func NewManager(
registry *protoregistry.Registry,
errorTracker tracker.ErrorTracker,
) (*Mgr, error) {
- strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
+ if !c.Failover.Enabled {
+ errorTracker = nil
+ }
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
nodes := make(map[string][]Node, len(c.VirtualStorages))
+ strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
for _, virtualStorage := range c.VirtualStorages {
log = log.WithField("virtual_storage", virtualStorage.Name)
ns := make([]*nodeStatus, 0, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
- streamInterceptors := []grpc.StreamClientInterceptor{
- grpc_prometheus.StreamClientInterceptor,
- }
-
- if c.Failover.Enabled && errorTracker != nil {
- streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage))
- }
-
- conn, err := client.DialContext(ctx, node.Address, []grpc.DialOption{
- grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
- grpc.WithChainStreamInterceptor(streamInterceptors...),
- grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
- })
+ conn, err := Dial(ctx, node, registry, errorTracker)
if err != nil {
return nil, err
}