diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-28 14:39:51 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-05 15:53:04 +0300 |
commit | 064d5be95393430a820b7dad5683ca5467f97ab5 (patch) | |
tree | 46ce1559967b56b2fbb91410164dac0d38e4c648 | |
parent | 60a4ae5ca695db7f3e7325d2bf6286d854c5af75 (diff) |
replication: Process replication events with configurable number of workers
Given that we are able to configure the number of workers used to process
replication events, we add that capability to the ReplMgr.
It now starts the configured number of workers per virtual storage to
process replication events. If the number of workers is higher than
the minimum number of gitaly storages, it will be reduced to match the
latter and a log message about it will be issued. If the number is not
set, the default value of 1 will be used (the old behaviour).
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2915
-rw-r--r-- | cmd/praefect/main.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 104 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 24 |
3 files changed, 85 insertions, 44 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index c92c801cd..65328d49e 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -364,6 +364,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), praefect.WithDequeueBatchSize(conf.Replication.BatchSize), + praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers), ) srvFactory = praefect.NewServerFactory( conf, diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 50ac357c5..0436cf431 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -407,17 +407,18 @@ func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.Repl // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { - log *logrus.Entry - queue datastore.ReplicationEventQueue - hc HealthChecker - nodes NodeSet - storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for - replicator Replicator // does the actual replication logic - replInFlightMetric *prometheus.GaugeVec - replLatencyMetric prommetrics.HistogramVec - replDelayMetric prommetrics.HistogramVec - replJobTimeout time.Duration - dequeueBatchSize uint + log *logrus.Entry + queue datastore.ReplicationEventQueue + hc HealthChecker + nodes NodeSet + storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for + replicator Replicator // does the actual replication logic + replInFlightMetric *prometheus.GaugeVec + replLatencyMetric prommetrics.HistogramVec + replDelayMetric prommetrics.HistogramVec + replJobTimeout time.Duration + dequeueBatchSize uint + parallelStorageProcessingWorkers uint } // ReplMgrOpt allows a replicator to be configured with additional options @@ -444,6 +445,14 @@ func WithDequeueBatchSize(size uint) func(*ReplMgr) { } } +// 
WithParallelStorageProcessingWorkers configures the number of workers used to process replication +// events per virtual storage. +func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr) { + return func(m *ReplMgr) { + m.parallelStorageProcessingWorkers = n + } +} + // NewReplMgr initializes a replication manager with the provided dependencies // and options func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr { @@ -460,15 +469,26 @@ func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datas Help: "Number of replication jobs in flight.", }, []string{"virtual_storage", "gitaly_storage", "change_type"}, ), - replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), - replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), - dequeueBatchSize: config.DefaultReplicationConfig().BatchSize, + replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), + replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), + dequeueBatchSize: config.DefaultReplicationConfig().BatchSize, + parallelStorageProcessingWorkers: 1, } for _, opt := range opts { opt(&r) } + for virtual, sn := range storageNames { + if len(sn) < int(r.parallelStorageProcessingWorkers) { + r.log.Infof("parallel processing workers decreased from %d "+ + "configured with config to %d according to minumal amount of "+ + "storages in the virtual storage %q", + r.parallelStorageProcessingWorkers, len(storageNames), virtual, + ) + r.parallelStorageProcessingWorkers = uint(len(storageNames)) + } + } return r } @@ -583,24 +603,36 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualSt appCtx := ctx ctx = helper.SuppressCancellation(ctx) - for _, storageName := range 
r.storageNamesByVirtualStorage[virtualStorage] { + storageNames := r.storageNamesByVirtualStorage[virtualStorage] + type StorageProcessing struct { + StorageName string + Backoff + BackoffReset + } + storagesQueue := make(chan StorageProcessing, len(storageNames)) + for _, storageName := range storageNames { + backoff, reset := b.Create() + storagesQueue <- StorageProcessing{StorageName: storageName, Backoff: backoff, BackoffReset: reset} + } + + for i := uint(0); i < r.parallelStorageProcessingWorkers; i++ { wg.Add(1) - go func(storageName string) { + go func() { defer wg.Done() - for backoff, reset := b.Create(); ; { + for { + var storageProcessing StorageProcessing select { case <-appCtx.Done(): logger.WithError(appCtx.Err()).Info("processing stopped") - return // processing must be stopped - default: - // proceed with processing + return + case storageProcessing = <-storagesQueue: } healthyStorages := r.hc.HealthyNodes()[virtualStorage] healthy := false for _, healthyStorageName := range healthyStorages { - if healthyStorageName != storageName { + if healthyStorageName != storageProcessing.StorageName { continue } healthy = true @@ -609,27 +641,33 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualSt var processedEvents int if healthy { - target, ok := r.nodes[virtualStorage][storageName] + target, ok := r.nodes[virtualStorage][storageProcessing.StorageName] if !ok { - logger.WithField("storage", storageName).Error("no connection to target storage") + logger.WithField("storage", storageProcessing.StorageName).Error("no connection to target storage") } else { processedEvents = r.handleNode(ctx, virtualStorage, target) } } if processedEvents == 0 { - select { - case <-time.After(backoff()): - continue - case <-appCtx.Done(): - logger.WithError(appCtx.Err()).Info("processing stopped") - return - } + // if the storage is not healthy or if there is no events to + // process we don't put it back to the queue immediately but + // wait for 
certain time period first. + go func() { + select { + case <-time.After(storageProcessing.Backoff()): + storagesQueue <- storageProcessing + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") + return + } + }() + } else { + storageProcessing.BackoffReset() + storagesQueue <- storageProcessing } - - reset() } - }(storageName) + }() } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 3d81fc7ac..043838b49 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -176,29 +176,35 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { NodeSetFromNodeManager(nodeMgr), WithLatencyMetric(&mockReplicationLatencyHistogramVec), WithDelayMetric(&mockReplicationDelayHistogramVec), + WithParallelStorageProcessingWorkers(100), ) - replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Hour, Max: 0}) + replMgr.ProcessBacklog(ctx, noopBackoffFactory{}) logEntries := loggerHook.AllEntries() - require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present") + require.True(t, len(logEntries) > 4, "expected at least 5 log entries to be present") + require.Equal(t, + []interface{}{`parallel processing workers decreased from 100 configured with config to 1 according to minumal amount of storages in the virtual storage "virtual"`}, + []interface{}{logEntries[0].Message}, + ) + require.Equal(t, []interface{}{"processing started", "virtual"}, - []interface{}{logEntries[0].Message, logEntries[0].Data["virtual_storage"]}, + []interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"]}, ) require.Equal(t, []interface{}{"replication job processing started", "virtual", "correlation-id"}, - []interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"], logEntries[1].Data[logWithCorrID]}, + []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]}, ) - dequeuedEvent := 
logEntries[1].Data["event"].(datastore.ReplicationEvent) + dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent) require.Equal(t, datastore.JobStateInProgress, dequeuedEvent.State) require.Equal(t, []string{"backup", "primary"}, []string{dequeuedEvent.Job.TargetNodeStorage, dequeuedEvent.Job.SourceNodeStorage}) require.Equal(t, []interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"}, - []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data["new_state"], logEntries[2].Data[logWithCorrID]}, + []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]}, ) replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepo.GetRelativePath()) @@ -935,11 +941,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { }, }, ) - replMgrDone := make(chan struct{}) - go func() { - defer close(replMgrDone) - replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Minute, Max: time.Minute}) - }() + replMgrDone := startProcessBacklog(ctx, replMgr) select { case <-ctx.Done(): |