Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go11
1 files changed, 10 insertions, 1 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 109acab65..d6955e28a 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -303,6 +303,7 @@ type ReplMgr struct {
replLatencyMetric prommetrics.HistogramVec
replDelayMetric prommetrics.HistogramVec
replJobTimeout time.Duration
+ dequeueBatchSize int
// whitelist contains the project names of the repos we wish to replicate
whitelist map[string]struct{}
}
@@ -324,6 +325,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
}
}
+// WithDequeueBatchSize configures the number of events to dequeue in a single batch.
+func WithDequeueBatchSize(size int) func(*ReplMgr) {
+ return func(m *ReplMgr) {
+ m.dequeueBatchSize = size
+ }
+}
+
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
@@ -342,6 +350,7 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep
),
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
+ dequeueBatchSize: 10,
}
for _, opt := range opts {
@@ -488,7 +497,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
- events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+ events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), r.dequeueBatchSize)
if err != nil {
logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
return 0