diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-21 11:10:44 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-05 15:47:28 +0300 |
commit | 1e5f3b25ee1a260f513bacb1a012072e7f405cd1 (patch) | |
tree | 86472266d8ba4d37430818b9ef3ee26161f215ce | |
parent | 7589ebf9bedf761f43f72194c07010ebf8fc661e (diff) |
replication: Process replication events for storages in parallel
Current implementation iterates over gitaly storages and executes
replication events on each sequentially. As some replication events
could take significant amount of time to complete this results into
a long replication lag for other storages as their replication is
blocked. With this change we start to process replication events for
each gitaly storage in parallel, so there is no replication lag between.
If one storage is blocked on the processing of the replication event
all others are allowed to run their replication events in a mean time.
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2915
Changelog: changed
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 117 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 37 |
4 files changed, 89 insertions, 69 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index abed88189..c92c801cd 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -356,7 +356,7 @@ func run(cfgs []starter.Config, conf config.Config) error { repl = praefect.NewReplMgr( logger, - conf.VirtualStorageNames(), + conf.StorageNames(), queue, rs, healthChecker, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 625a29697..70d320eaa 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -176,7 +176,7 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op // TODO: run a replmgr for EVERY virtual storage replmgr := NewReplMgr( opt.withLogger, - conf.VirtualStorageNames(), + conf.StorageNames(), opt.withQueue, opt.withRepoStore, opt.withNodeMgr, diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 71fa2a8f1..50ac357c5 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -407,17 +407,17 @@ func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.Repl // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { - log *logrus.Entry - queue datastore.ReplicationEventQueue - hc HealthChecker - nodes NodeSet - virtualStorages []string // replicas this replicator is responsible for - replicator Replicator // does the actual replication logic - replInFlightMetric *prometheus.GaugeVec - replLatencyMetric prommetrics.HistogramVec - replDelayMetric prommetrics.HistogramVec - replJobTimeout time.Duration - dequeueBatchSize uint + log *logrus.Entry + queue datastore.ReplicationEventQueue + hc HealthChecker + nodes NodeSet + storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for + replicator Replicator // does the actual replication logic + replInFlightMetric *prometheus.GaugeVec + replLatencyMetric prommetrics.HistogramVec + replDelayMetric prommetrics.HistogramVec + replJobTimeout time.Duration + dequeueBatchSize uint } // ReplMgrOpt allows a replicator to be configured with additional options @@ -446,14 +446,14 @@ func WithDequeueBatchSize(size uint) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ - log: log.WithField("component", "replication_manager"), - queue: queue, - replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")}, - virtualStorages: virtualStorages, - hc: hc, - nodes: nodes, + log: log.WithField("component", "replication_manager"), + queue: queue, + replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")}, + storageNamesByVirtualStorage: storageNames, + hc: hc, + nodes: nodes, replInFlightMetric: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "gitaly_praefect_replication_jobs", @@ -535,16 +535,15 @@ type BackoffFactory interface { // blocks until all backlog processing goroutines have returned func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory) { var wg sync.WaitGroup + defer wg.Wait() - for _, virtualStorage := range r.virtualStorages { + for virtualStorage := range r.storageNamesByVirtualStorage { wg.Add(1) go func(virtualStorage string) { defer wg.Done() r.processBacklog(ctx, b, virtualStorage) }(virtualStorage) } - - wg.Wait() } // ProcessStale starts a background process to acknowledge stale replication jobs. @@ -573,46 +572,64 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time. } func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualStorage string) { - logger := r.log.WithField(logWithVirtualStorage, virtualStorage) - backoff, reset := b.Create() + var wg sync.WaitGroup + defer wg.Wait() + logger := r.log.WithField(logWithVirtualStorage, virtualStorage) logger.Info("processing started") // We should make a graceful shutdown of the processing loop and don't want to interrupt // in-flight operations. That is why we suppress cancellation on the provided context. appCtx := ctx ctx = helper.SuppressCancellation(ctx) - for { - select { - case <-appCtx.Done(): - logger.WithError(appCtx.Err()).Info("processing stopped") - return // processing must be stopped - default: - // proceed with processing - } - var totalEvents int - for _, storage := range r.hc.HealthyNodes()[virtualStorage] { - target, ok := r.nodes[virtualStorage][storage] - if !ok { - logger.WithField("storage", storage).Error("no connection to target storage") - continue - } + for _, storageName := range r.storageNamesByVirtualStorage[virtualStorage] { + wg.Add(1) + go func(storageName string) { + defer wg.Done() - totalEvents += r.handleNode(ctx, virtualStorage, target) - } + for backoff, reset := b.Create(); ; { + select { + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") + return // processing must be stopped + default: + // proceed with processing + } - if totalEvents == 0 { - select { - case <-time.After(backoff()): - continue - case <-appCtx.Done(): - logger.WithError(appCtx.Err()).Info("processing stopped") - return - } - } + healthyStorages := r.hc.HealthyNodes()[virtualStorage] + healthy := false + for _, healthyStorageName := range healthyStorages { + if healthyStorageName != storageName { + continue + } + healthy = true + break + } - reset() + var processedEvents int + if healthy { + target, ok := r.nodes[virtualStorage][storageName] + if !ok { + logger.WithField("storage", storageName).Error("no connection to target storage") + } else { + processedEvents = r.handleNode(ctx, virtualStorage, target) + } + } + + if processedEvents == 0 { + select { + case <-time.After(backoff()): + continue + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") + return + } + } + + reset() + } + }(storageName) } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index d52e86f92..3d81fc7ac 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -169,7 +169,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { replMgr := NewReplMgr( loggerEntry, - conf.VirtualStorageNames(), + conf.StorageNames(), queue, datastore.MockRepositoryStore{}, nodeMgr, @@ -339,7 +339,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { protoregistry.GitalyProtoPreregistered, ) - replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) + replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil) @@ -707,7 +707,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { replMgr := NewReplMgr( logEntry, - conf.VirtualStorageNames(), + conf.StorageNames(), queueInterceptor, datastore.MockRepositoryStore{}, nodeMgr, @@ -857,7 +857,7 @@ func TestProcessBacklog_Success(t *testing.T) { replMgr := NewReplMgr( logEntry, - conf.VirtualStorageNames(), + conf.StorageNames(), queueInterceptor, datastore.MockRepositoryStore{}, nodeMgr, @@ -896,23 +896,22 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { ctx, cancel := testhelper.Context() - first := true + var mtx sync.Mutex + expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true} queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { select { case <-ctx.Done(): return nil, nil default: - if first { - first = false - require.Equal(t, conf.VirtualStorages[0].Name, virtualStorageName) - require.Equal(t, conf.VirtualStorages[0].Nodes[0].Storage, storageName) - return nil, nil - } - + mtx.Lock() + defer mtx.Unlock() assert.Equal(t, conf.VirtualStorages[0].Name, virtualStorageName) - assert.Equal(t, conf.VirtualStorages[0].Nodes[2].Storage, storageName) - cancel() + assert.True(t, expStorages[storageName], storageName, storageName) + delete(expStorages, storageName) + if len(expStorages) == 0 { + cancel() + } return nil, nil } }) @@ -924,7 +923,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { replMgr := NewReplMgr( testhelper.DiscardTestEntry(t), - conf.VirtualStorageNames(), + conf.StorageNames(), queueInterceptor, nil, StaticHealthChecker{virtualStorage: {node1.Storage, node3.Storage}}, @@ -936,7 +935,11 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { }, }, ) - replMgrDone := startProcessBacklog(ctx, replMgr) + replMgrDone := make(chan struct{}) + go func() { + defer close(replMgrDone) + replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Minute, Max: time.Minute}) + }() select { case <-ctx.Done(): @@ -995,7 +998,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { replMgr := NewReplMgr( testhelper.DiscardTestEntry(t), - conf.VirtualStorageNames(), + conf.StorageNames(), queue, datastore.MockRepositoryStore{}, StaticHealthChecker{virtualStorage: {primaryStorage, secondaryStorage}}, |