Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Sami Hiltunen <shiltunen@gitlab.com> 2020-08-05 12:50:47 +0300
committer: Sami Hiltunen <shiltunen@gitlab.com> 2020-08-05 13:10:56 +0300
commit: 20d6187901167c96a618e7b94ec71e5f3de89f76 (patch)
tree: 3946278f64ca3e85baf5db5601841c940483b1da
parent: 4ad407c769b607e5112a861e5581347e9a16a63b (diff)
dequeue one event per worker at a time (ref: smh-finer-replication-locking)
Each replication queue worker dequeues up to 10 events at a time and executes them sequentially. If the first job in the batch is slow, the remaining jobs in that batch end up blocked behind it. This commit changes a queue worker to dequeue only one event at a time, leading to finer-grained locking of events. This should allow other free workers to pick up events that are not yet being handled.
-rw-r--r-- cmd/praefect/main.go 1
-rw-r--r-- internal/praefect/replicator.go 11
2 files changed, 11 insertions, 1 deletion
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 2cba3aac7..555576017 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -281,6 +281,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
nodeManager,
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
+ praefect.WithDequeueBatchSize(1),
)
srvFactory = praefect.NewServerFactory(
conf,
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 109acab65..d6955e28a 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -303,6 +303,7 @@ type ReplMgr struct {
replLatencyMetric prommetrics.HistogramVec
replDelayMetric prommetrics.HistogramVec
replJobTimeout time.Duration
+ dequeueBatchSize int
// whitelist contains the project names of the repos we wish to replicate
whitelist map[string]struct{}
}
@@ -324,6 +325,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
}
}
+// WithDequeueBatchSize returns a ReplMgr option that sets dequeueBatchSize, the event count passed to queue.Dequeue in handleNode; the value is stored as given (no validation here), and NewReplMgr initializes the field to 10 before applying options.
+func WithDequeueBatchSize(size int) func(*ReplMgr) {
+ return func(m *ReplMgr) {
+ m.dequeueBatchSize = size
+ }
+}
+
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
@@ -342,6 +350,7 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep
),
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
+ dequeueBatchSize: 10,
}
for _, opt := range opts {
@@ -488,7 +497,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
- events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+ events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), r.dequeueBatchSize)
if err != nil {
logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
return 0