diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-05 12:50:47 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-05 13:10:56 +0300 |
commit | 20d6187901167c96a618e7b94ec71e5f3de89f76 (patch) | |
tree | 3946278f64ca3e85baf5db5601841c940483b1da | |
parent | 4ad407c769b607e5112a861e5581347e9a16a63b (diff) |
dequeue one event per worker at a timesmh-finer-replication-locking
Each replication queue worker dequeues up to 10 events at a time
and executes them sequentially. If the first job in the batch is
slow, the remaining jobs end up blocked. This commit changes a
queue worker to dequeue only one event at a time, leading to finer
grained locking of events. This should allow for other free workers
to pick up events that are not yet being handled.
-rw-r--r-- | cmd/praefect/main.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 11 |
2 files changed, 11 insertions, 1 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 2cba3aac7..555576017 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -281,6 +281,7 @@ func run(cfgs []starter.Config, conf config.Config) error { nodeManager, praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), + praefect.WithDequeueBatchSize(1), ) srvFactory = praefect.NewServerFactory( conf, diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 109acab65..d6955e28a 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -303,6 +303,7 @@ type ReplMgr struct { replLatencyMetric prommetrics.HistogramVec replDelayMetric prommetrics.HistogramVec replJobTimeout time.Duration + dequeueBatchSize int // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} } @@ -324,6 +325,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { } } +// WithDequeueBatchSize configures the number of events to dequeue in a single batch. +func WithDequeueBatchSize(size int) func(*ReplMgr) { + return func(m *ReplMgr) { + m.dequeueBatchSize = size + } +} + // NewReplMgr initializes a replication manager with the provided dependencies // and options func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { @@ -342,6 +350,7 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep ), replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), + dequeueBatchSize: 10, } for _, opt := range opts { @@ -488,7 +497,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int { - events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), r.dequeueBatchSize) if err != nil { logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") return 0 |