diff options
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 109acab65..d6955e28a 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -303,6 +303,7 @@ type ReplMgr struct { replLatencyMetric prommetrics.HistogramVec replDelayMetric prommetrics.HistogramVec replJobTimeout time.Duration + dequeueBatchSize int // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} } @@ -324,6 +325,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { } } +// WithDequeueBatchSize configures the number of events to dequeue in a single batch. +func WithDequeueBatchSize(size int) func(*ReplMgr) { + return func(m *ReplMgr) { + m.dequeueBatchSize = size + } +} + // NewReplMgr initializes a replication manager with the provided dependencies // and options func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { @@ -342,6 +350,7 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep ), replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), + dequeueBatchSize: 10, } for _, opt := range opts { @@ -488,7 +497,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int { - events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), r.dequeueBatchSize) if err != nil { logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") return 0 |