Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go25
1 files changed, 19 insertions, 6 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 5e5f0f559..2facb1579 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -5,10 +5,15 @@ import (
"fmt"
"time"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -148,7 +153,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
type ReplMgr struct {
log *logrus.Entry
datastore datastore.Datastore
- clientConnections *conn.ClientConnections
+ nodeManager nodes.Manager
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
replQueueMetric metrics.Gauge
@@ -177,14 +182,14 @@ func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
datastore: datastore,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
targetNode: targetNode,
- clientConnections: c,
+ nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}),
replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}),
}
@@ -278,6 +283,14 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
reset()
}
}
+func dialNode(addr, token string) (*grpc.ClientConn, error) {
+ return client.Dial(addr,
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)),
+ },
+ )
+}
// TODO: errors that occur during replication should be handled better. Logging
// is a crutch in this situation. Ideally, we need to update state somewhere
@@ -294,13 +307,13 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
return
}
- targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
+ targetCC, err := dialNode(job.TargetNode.Address, job.TargetNode.Token)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for secondary node in replication job")
return
}
- sourceCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
+ sourceCC, err := dialNode(job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
return