diff options
Diffstat (limited to 'cmd/praefect/main.go')
-rw-r--r-- | cmd/praefect/main.go | 27 |
1 files changed, 10 insertions, 17 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a0868f36d..831f196d6 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -33,6 +33,9 @@ import ( "fmt" "os" "strings" + "time" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/bootstrap" @@ -41,7 +44,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -128,20 +130,11 @@ func configure() (config.Config, error) { } func run(cfgs []starter.Config, conf config.Config) error { - clientConnections := conn.NewClientConnections() - - for _, virtualStorage := range conf.VirtualStorages { - for _, node := range virtualStorage.Nodes { - if _, err := clientConnections.GetConnection(node.Storage); err == nil { - continue - } - if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil { - return fmt.Errorf("failed to register %s: %s", node.Address, err) - } - - logger.WithField("node_address", node.Address).Info("registered gitaly node") - } + nodeManager, err := nodes.NewManager(logger, conf.VirtualStorages) + if err != nil { + return err } + nodeManager.Start(1*time.Second, 3*time.Second) latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) if err != nil { @@ -156,15 +149,15 @@ func run(cfgs []starter.Config, conf config.Config) error { var ( // top level server dependencies ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr( "default", logger, ds, - clientConnections, + nodeManager, praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) + srv = praefect.NewServer(coordinator, repl, nil, logger, nodeManager, conf) serverErrors = make(chan error, 1) ) |