diff options
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 5e5f0f559..2facb1579 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -5,10 +5,15 @@ import ( "fmt" "time" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -148,7 +153,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, type ReplMgr struct { log *logrus.Entry datastore datastore.Datastore - clientConnections *conn.ClientConnections + nodeManager nodes.Manager targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic replQueueMetric metrics.Gauge @@ -177,14 +182,14 @@ func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, datastore: datastore, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, targetNode: targetNode, - clientConnections: c, + nodeManager: nodeMgr, replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}), } @@ -278,6 +283,14 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { reset() } } +func dialNode(addr, token string) (*grpc.ClientConn, error) { + return client.Dial(addr, + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)), + }, + ) +} // TODO: errors that occur during replication should be handled better. Logging // is a crutch in this situation. Ideally, we need to update state somewhere @@ -294,13 +307,13 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) { return } - targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage) + targetCC, err := dialNode(job.TargetNode.Address, job.TargetNode.Token) if err != nil { l.WithError(err).Error("unable to obtain client connection for secondary node in replication job") return } - sourceCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage) + sourceCC, err := dialNode(job.SourceNode.Address, job.SourceNode.Token) if err != nil { l.WithError(err).Error("unable to obtain client connection for primary node in replication job") return |