gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>  2021-09-21 11:10:44 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>  2021-10-05 15:47:28 +0300
commit     1e5f3b25ee1a260f513bacb1a012072e7f405cd1 (patch)
tree       86472266d8ba4d37430818b9ef3ee26161f215ce
parent     7589ebf9bedf761f43f72194c07010ebf8fc661e (diff)
replication: Process replication events for storages in parallel
The current implementation iterates over the Gitaly storages and executes replication events on each of them sequentially. Because some replication events can take a significant amount of time to complete, this results in a long replication lag for the other storages, whose replication is blocked. With this change we start to process replication events for each Gitaly storage in parallel, so there is no replication lag between storages. If one storage is blocked on processing a replication event, all the others can still run their replication events in the meantime.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2915

Changelog: changed
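As a rough illustration only (not the actual Praefect code), the fan-out pattern this change introduces looks like the sketch below: one goroutine per storage of each virtual storage, joined with a sync.WaitGroup and stopped through context cancellation. The storage names and the processStorage helper are hypothetical placeholders for the real dequeue-and-replicate loop.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// processStorage stands in for the per-storage replication loop; the real
// implementation dequeues and executes replication events for one storage.
func processStorage(ctx context.Context, virtualStorage, storage string) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(100 * time.Millisecond): // placeholder for backoff/dequeue
			fmt.Printf("processed events for %s/%s\n", virtualStorage, storage)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Hypothetical mapping of virtual storages to their physical storages.
	storageNamesByVirtualStorage := map[string][]string{
		"praefect": {"gitaly-1", "gitaly-2", "gitaly-3"},
	}

	var wg sync.WaitGroup
	for virtualStorage, storages := range storageNamesByVirtualStorage {
		for _, storage := range storages {
			wg.Add(1)
			// One goroutine per storage: a slow event on one storage no
			// longer blocks dequeueing for the others.
			go func(virtualStorage, storage string) {
				defer wg.Done()
				processStorage(ctx, virtualStorage, storage)
			}(virtualStorage, storage)
		}
	}
	wg.Wait()
}

Because each storage runs its own loop, replication for the remaining storages proceeds independently even when one storage is stuck on a long-running event.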
-rw-r--r--  cmd/praefect/main.go                  |   2
-rw-r--r--  internal/praefect/helper_test.go      |   2
-rw-r--r--  internal/praefect/replicator.go       | 117
-rw-r--r--  internal/praefect/replicator_test.go  |  37
4 files changed, 89 insertions(+), 69 deletions(-)
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index abed88189..c92c801cd 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -356,7 +356,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
repl = praefect.NewReplMgr(
logger,
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
queue,
rs,
healthChecker,
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 625a29697..70d320eaa 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -176,7 +176,7 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op
// TODO: run a replmgr for EVERY virtual storage
replmgr := NewReplMgr(
opt.withLogger,
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
opt.withQueue,
opt.withRepoStore,
opt.withNodeMgr,
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 71fa2a8f1..50ac357c5 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -407,17 +407,17 @@ func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.Repl
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
- log *logrus.Entry
- queue datastore.ReplicationEventQueue
- hc HealthChecker
- nodes NodeSet
- virtualStorages []string // replicas this replicator is responsible for
- replicator Replicator // does the actual replication logic
- replInFlightMetric *prometheus.GaugeVec
- replLatencyMetric prommetrics.HistogramVec
- replDelayMetric prommetrics.HistogramVec
- replJobTimeout time.Duration
- dequeueBatchSize uint
+ log *logrus.Entry
+ queue datastore.ReplicationEventQueue
+ hc HealthChecker
+ nodes NodeSet
+ storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for
+ replicator Replicator // does the actual replication logic
+ replInFlightMetric *prometheus.GaugeVec
+ replLatencyMetric prommetrics.HistogramVec
+ replDelayMetric prommetrics.HistogramVec
+ replJobTimeout time.Duration
+ dequeueBatchSize uint
}
// ReplMgrOpt allows a replicator to be configured with additional options
@@ -446,14 +446,14 @@ func WithDequeueBatchSize(size uint) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
- log: log.WithField("component", "replication_manager"),
- queue: queue,
- replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")},
- virtualStorages: virtualStorages,
- hc: hc,
- nodes: nodes,
+ log: log.WithField("component", "replication_manager"),
+ queue: queue,
+ replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")},
+ storageNamesByVirtualStorage: storageNames,
+ hc: hc,
+ nodes: nodes,
replInFlightMetric: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gitaly_praefect_replication_jobs",
@@ -535,16 +535,15 @@ type BackoffFactory interface {
// blocks until all backlog processing goroutines have returned
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory) {
var wg sync.WaitGroup
+ defer wg.Wait()
- for _, virtualStorage := range r.virtualStorages {
+ for virtualStorage := range r.storageNamesByVirtualStorage {
wg.Add(1)
go func(virtualStorage string) {
defer wg.Done()
r.processBacklog(ctx, b, virtualStorage)
}(virtualStorage)
}
-
- wg.Wait()
}
// ProcessStale starts a background process to acknowledge stale replication jobs.
@@ -573,46 +572,64 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.
}
func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualStorage string) {
- logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
- backoff, reset := b.Create()
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
logger.Info("processing started")
// We should make a graceful shutdown of the processing loop and don't want to interrupt
// in-flight operations. That is why we suppress cancellation on the provided context.
appCtx := ctx
ctx = helper.SuppressCancellation(ctx)
- for {
- select {
- case <-appCtx.Done():
- logger.WithError(appCtx.Err()).Info("processing stopped")
- return // processing must be stopped
- default:
- // proceed with processing
- }
- var totalEvents int
- for _, storage := range r.hc.HealthyNodes()[virtualStorage] {
- target, ok := r.nodes[virtualStorage][storage]
- if !ok {
- logger.WithField("storage", storage).Error("no connection to target storage")
- continue
- }
+ for _, storageName := range r.storageNamesByVirtualStorage[virtualStorage] {
+ wg.Add(1)
+ go func(storageName string) {
+ defer wg.Done()
- totalEvents += r.handleNode(ctx, virtualStorage, target)
- }
+ for backoff, reset := b.Create(); ; {
+ select {
+ case <-appCtx.Done():
+ logger.WithError(appCtx.Err()).Info("processing stopped")
+ return // processing must be stopped
+ default:
+ // proceed with processing
+ }
- if totalEvents == 0 {
- select {
- case <-time.After(backoff()):
- continue
- case <-appCtx.Done():
- logger.WithError(appCtx.Err()).Info("processing stopped")
- return
- }
- }
+ healthyStorages := r.hc.HealthyNodes()[virtualStorage]
+ healthy := false
+ for _, healthyStorageName := range healthyStorages {
+ if healthyStorageName != storageName {
+ continue
+ }
+ healthy = true
+ break
+ }
- reset()
+ var processedEvents int
+ if healthy {
+ target, ok := r.nodes[virtualStorage][storageName]
+ if !ok {
+ logger.WithField("storage", storageName).Error("no connection to target storage")
+ } else {
+ processedEvents = r.handleNode(ctx, virtualStorage, target)
+ }
+ }
+
+ if processedEvents == 0 {
+ select {
+ case <-time.After(backoff()):
+ continue
+ case <-appCtx.Done():
+ logger.WithError(appCtx.Err()).Info("processing stopped")
+ return
+ }
+ }
+
+ reset()
+ }
+ }(storageName)
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index d52e86f92..3d81fc7ac 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -169,7 +169,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
replMgr := NewReplMgr(
loggerEntry,
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
queue,
datastore.MockRepositoryStore{},
nodeMgr,
@@ -339,7 +339,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
protoregistry.GitalyProtoPreregistered,
)
- replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
+ replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil)
@@ -707,7 +707,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
replMgr := NewReplMgr(
logEntry,
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
queueInterceptor,
datastore.MockRepositoryStore{},
nodeMgr,
@@ -857,7 +857,7 @@ func TestProcessBacklog_Success(t *testing.T) {
replMgr := NewReplMgr(
logEntry,
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
queueInterceptor,
datastore.MockRepositoryStore{},
nodeMgr,
@@ -896,23 +896,22 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
- first := true
+ var mtx sync.Mutex
+ expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true}
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
select {
case <-ctx.Done():
return nil, nil
default:
- if first {
- first = false
- require.Equal(t, conf.VirtualStorages[0].Name, virtualStorageName)
- require.Equal(t, conf.VirtualStorages[0].Nodes[0].Storage, storageName)
- return nil, nil
- }
-
+ mtx.Lock()
+ defer mtx.Unlock()
assert.Equal(t, conf.VirtualStorages[0].Name, virtualStorageName)
- assert.Equal(t, conf.VirtualStorages[0].Nodes[2].Storage, storageName)
- cancel()
+ assert.True(t, expStorages[storageName], storageName, storageName)
+ delete(expStorages, storageName)
+ if len(expStorages) == 0 {
+ cancel()
+ }
return nil, nil
}
})
@@ -924,7 +923,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
queueInterceptor,
nil,
StaticHealthChecker{virtualStorage: {node1.Storage, node3.Storage}},
@@ -936,7 +935,11 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
},
},
)
- replMgrDone := startProcessBacklog(ctx, replMgr)
+ replMgrDone := make(chan struct{})
+ go func() {
+ defer close(replMgrDone)
+ replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Minute, Max: time.Minute})
+ }()
select {
case <-ctx.Done():
@@ -995,7 +998,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
- conf.VirtualStorageNames(),
+ conf.StorageNames(),
queue,
datastore.MockRepositoryStore{},
StaticHealthChecker{virtualStorage: {primaryStorage, secondaryStorage}},