diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-06 15:10:58 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-06 15:10:58 +0300 |
commit | 521022451881d47378f49cbcf8e3389a6d7d9da1 (patch) | |
tree | 8b3b6647542f0f6a259a73505bdd87d2512ed7a6 | |
parent | 9e5dfd987388c905b584f58b48afede245721ec7 (diff) |
Revert "dequeue one event per worker at a time"
This reverts commit 20d6187901167c96a618e7b94ec71e5f3de89f76.
-rw-r--r-- | cmd/praefect/main.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 11 |
2 files changed, 1 insertions, 11 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 1b3e136ec..efac1d21b 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -287,7 +287,6 @@ func run(cfgs []starter.Config, conf config.Config) error { nodeManager, praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), - praefect.WithDequeueBatchSize(1), ) srvFactory = praefect.NewServerFactory( conf, diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index d6955e28a..109acab65 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -303,7 +303,6 @@ type ReplMgr struct { replLatencyMetric prommetrics.HistogramVec replDelayMetric prommetrics.HistogramVec replJobTimeout time.Duration - dequeueBatchSize int // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} } @@ -325,13 +324,6 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { } } -// WithDequeueBatchSize configures the number of events to dequeue in a single batch. -func WithDequeueBatchSize(size int) func(*ReplMgr) { - return func(m *ReplMgr) { - m.dequeueBatchSize = size - } -} - // NewReplMgr initializes a replication manager with the provided dependencies // and options func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { @@ -350,7 +342,6 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep ), replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), - dequeueBatchSize: 10, } for _, opt := range opts { @@ -497,7 +488,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int { - events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), r.dequeueBatchSize) + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) if err != nil { logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") return 0 |