gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>  2021-09-28 14:39:51 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>  2021-10-05 15:53:04 +0300
commit     064d5be95393430a820b7dad5683ca5467f97ab5 (patch)
tree       46ce1559967b56b2fbb91410164dac0d38e4c648
parent     60a4ae5ca695db7f3e7325d2bf6286d854c5af75 (diff)
replication: Process replication events with configurable number of workers
Now that the number of workers used to process replication events is configurable, this change wires that capability into the ReplMgr. The manager starts the configured number of workers per virtual storage to process replication events. If the configured number of workers is higher than the minimal number of Gitaly storages across the virtual storages, it is reduced to that minimum and a log message is issued. If the number is not set, the default value of 1 is used (the old behaviour). Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2915
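To make the change easier to follow, here is a minimal, self-contained sketch (not the Gitaly code itself) of the worker pattern this commit introduces: the storages of one virtual storage are pushed onto a buffered channel, a configurable number of workers drains it, and an idle storage is only re-queued after its backoff period elapses. Helper names, backoff durations, and storage names below are illustrative only.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// storageProcessing stands in for the per-storage state (name plus backoff)
// that the real code keeps in its StorageProcessing struct.
type storageProcessing struct {
	name    string
	backoff time.Duration
}

func processBacklog(ctx context.Context, storages []string, workers uint, process func(string) int) {
	// Cap the number of workers at the number of storages, mirroring the
	// adjustment done in NewReplMgr.
	if int(workers) > len(storages) {
		workers = uint(len(storages))
	}

	// Seed the queue with every storage of the virtual storage.
	queue := make(chan storageProcessing, len(storages))
	for _, name := range storages {
		queue <- storageProcessing{name: name, backoff: 10 * time.Millisecond}
	}

	var wg sync.WaitGroup
	for i := uint(0); i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				var sp storageProcessing
				select {
				case <-ctx.Done():
					return
				case sp = <-queue:
				}

				if process(sp.name) == 0 {
					// Nothing was processed: re-queue the storage only after a
					// delay so an idle storage does not monopolise a worker.
					go func(sp storageProcessing) {
						select {
						case <-time.After(sp.backoff):
							queue <- sp
						case <-ctx.Done():
						}
					}(sp)
					continue
				}
				// Work was done: put the storage straight back into the queue.
				queue <- sp
			}
		}()
	}
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	processBacklog(ctx, []string{"gitaly-1", "gitaly-2", "gitaly-3"}, 2, func(storage string) int {
		fmt.Println("processing replication events for", storage)
		return 0
	})
}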
-rw-r--r--  cmd/praefect/main.go                    1
-rw-r--r--  internal/praefect/replicator.go       104
-rw-r--r--  internal/praefect/replicator_test.go   24
3 files changed, 85 insertions(+), 44 deletions(-)
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index c92c801cd..65328d49e 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -364,6 +364,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
praefect.WithDequeueBatchSize(conf.Replication.BatchSize),
+ praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers),
)
srvFactory = praefect.NewServerFactory(
conf,
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 50ac357c5..0436cf431 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -407,17 +407,18 @@ func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.Repl
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
- log *logrus.Entry
- queue datastore.ReplicationEventQueue
- hc HealthChecker
- nodes NodeSet
- storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for
- replicator Replicator // does the actual replication logic
- replInFlightMetric *prometheus.GaugeVec
- replLatencyMetric prommetrics.HistogramVec
- replDelayMetric prommetrics.HistogramVec
- replJobTimeout time.Duration
- dequeueBatchSize uint
+ log *logrus.Entry
+ queue datastore.ReplicationEventQueue
+ hc HealthChecker
+ nodes NodeSet
+ storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for
+ replicator Replicator // does the actual replication logic
+ replInFlightMetric *prometheus.GaugeVec
+ replLatencyMetric prommetrics.HistogramVec
+ replDelayMetric prommetrics.HistogramVec
+ replJobTimeout time.Duration
+ dequeueBatchSize uint
+ parallelStorageProcessingWorkers uint
}
// ReplMgrOpt allows a replicator to be configured with additional options
@@ -444,6 +445,14 @@ func WithDequeueBatchSize(size uint) func(*ReplMgr) {
}
}
+// WithParallelStorageProcessingWorkers configures the number of workers used to process replication
+// events per virtual storage.
+func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr) {
+ return func(m *ReplMgr) {
+ m.parallelStorageProcessingWorkers = n
+ }
+}
+
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
@@ -460,15 +469,26 @@ func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datas
Help: "Number of replication jobs in flight.",
}, []string{"virtual_storage", "gitaly_storage", "change_type"},
),
- replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
- replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
- dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
+ replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
+ replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
+ dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
+ parallelStorageProcessingWorkers: 1,
}
for _, opt := range opts {
opt(&r)
}
+ for virtual, sn := range storageNames {
+ if len(sn) < int(r.parallelStorageProcessingWorkers) {
+ r.log.Infof("parallel processing workers decreased from %d "+
+ "configured with config to %d according to minumal amount of "+
+ "storages in the virtual storage %q",
+ r.parallelStorageProcessingWorkers, len(storageNames), virtual,
+ )
+ r.parallelStorageProcessingWorkers = uint(len(storageNames))
+ }
+ }
return r
}
@@ -583,24 +603,36 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualSt
appCtx := ctx
ctx = helper.SuppressCancellation(ctx)
- for _, storageName := range r.storageNamesByVirtualStorage[virtualStorage] {
+ storageNames := r.storageNamesByVirtualStorage[virtualStorage]
+ type StorageProcessing struct {
+ StorageName string
+ Backoff
+ BackoffReset
+ }
+ storagesQueue := make(chan StorageProcessing, len(storageNames))
+ for _, storageName := range storageNames {
+ backoff, reset := b.Create()
+ storagesQueue <- StorageProcessing{StorageName: storageName, Backoff: backoff, BackoffReset: reset}
+ }
+
+ for i := uint(0); i < r.parallelStorageProcessingWorkers; i++ {
wg.Add(1)
- go func(storageName string) {
+ go func() {
defer wg.Done()
- for backoff, reset := b.Create(); ; {
+ for {
+ var storageProcessing StorageProcessing
select {
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
- return // processing must be stopped
- default:
- // proceed with processing
+ return
+ case storageProcessing = <-storagesQueue:
}
healthyStorages := r.hc.HealthyNodes()[virtualStorage]
healthy := false
for _, healthyStorageName := range healthyStorages {
- if healthyStorageName != storageName {
+ if healthyStorageName != storageProcessing.StorageName {
continue
}
healthy = true
@@ -609,27 +641,33 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualSt
var processedEvents int
if healthy {
- target, ok := r.nodes[virtualStorage][storageName]
+ target, ok := r.nodes[virtualStorage][storageProcessing.StorageName]
if !ok {
- logger.WithField("storage", storageName).Error("no connection to target storage")
+ logger.WithField("storage", storageProcessing.StorageName).Error("no connection to target storage")
} else {
processedEvents = r.handleNode(ctx, virtualStorage, target)
}
}
if processedEvents == 0 {
- select {
- case <-time.After(backoff()):
- continue
- case <-appCtx.Done():
- logger.WithError(appCtx.Err()).Info("processing stopped")
- return
- }
+ // If the storage is not healthy or there are no events to
+ // process, we don't put it back into the queue immediately but
+ // wait for a certain time period first.
+ go func() {
+ select {
+ case <-time.After(storageProcessing.Backoff()):
+ storagesQueue <- storageProcessing
+ case <-appCtx.Done():
+ logger.WithError(appCtx.Err()).Info("processing stopped")
+ return
+ }
+ }()
+ } else {
+ storageProcessing.BackoffReset()
+ storagesQueue <- storageProcessing
}
-
- reset()
}
- }(storageName)
+ }()
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 3d81fc7ac..043838b49 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -176,29 +176,35 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
NodeSetFromNodeManager(nodeMgr),
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
+ WithParallelStorageProcessingWorkers(100),
)
- replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Hour, Max: 0})
+ replMgr.ProcessBacklog(ctx, noopBackoffFactory{})
logEntries := loggerHook.AllEntries()
- require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present")
+ require.True(t, len(logEntries) > 4, "expected at least 5 log entries to be present")
+ require.Equal(t,
+ []interface{}{`parallel processing workers decreased from 100 configured with config to 1 according to minimal amount of storages in the virtual storage "virtual"`},
+ []interface{}{logEntries[0].Message},
+ )
+
require.Equal(t,
[]interface{}{"processing started", "virtual"},
- []interface{}{logEntries[0].Message, logEntries[0].Data["virtual_storage"]},
+ []interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"]},
)
require.Equal(t,
[]interface{}{"replication job processing started", "virtual", "correlation-id"},
- []interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"], logEntries[1].Data[logWithCorrID]},
+ []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]},
)
- dequeuedEvent := logEntries[1].Data["event"].(datastore.ReplicationEvent)
+ dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent)
require.Equal(t, datastore.JobStateInProgress, dequeuedEvent.State)
require.Equal(t, []string{"backup", "primary"}, []string{dequeuedEvent.Job.TargetNodeStorage, dequeuedEvent.Job.SourceNodeStorage})
require.Equal(t,
[]interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"},
- []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data["new_state"], logEntries[2].Data[logWithCorrID]},
+ []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]},
)
replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepo.GetRelativePath())
@@ -935,11 +941,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
},
},
)
- replMgrDone := make(chan struct{})
- go func() {
- defer close(replMgrDone)
- replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Minute, Max: time.Minute})
- }()
+ replMgrDone := startProcessBacklog(ctx, replMgr)
select {
case <-ctx.Done():