Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-07-11 18:21:41 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-07-21 11:18:07 +0300
commit9ef1221ea1f104c708dc735bc31470552c6c0d68 (patch)
tree4722af783c65476f541cee81d5f252cd070e6bc4
parent3c713dfedbc90e788a2174037682a7a92ef8aa16 (diff)
limiter: Implement adaptive calculator for concurrency limiter
This commit implements a calculator to calculate the adaptive limits based on Additive increase/multiplicative decrease (AIMD) algorithm. This method involves gradually increasing the limit during normal process functioning but quickly reducing it when an issue (backoff event) occurs. The calculator receives a list of AdaptiveLimiter and a list of ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they all move as a whole. The caller accesses the current limits via AdaptiveLimiter.Current method. When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the backoff events from the watchers. The current value of each limit is re-calibrated as follows: * limit = limit + 1 if there is no backoff event since the last calibration. The new limit cannot exceed max limit. * limit = limit * BackoffFactor otherwise. The new limit cannot be lower than min limit.
-rw-r--r--internal/limiter/adaptive_calculator.go194
-rw-r--r--internal/limiter/adaptive_calculator_test.go378
-rw-r--r--internal/limiter/adaptive_limit.go48
3 files changed, 620 insertions, 0 deletions
diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go
new file mode 100644
index 000000000..b350fbcbb
--- /dev/null
+++ b/internal/limiter/adaptive_calculator.go
@@ -0,0 +1,194 @@
+package limiter
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+)
+
+// BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the
+// management of the AdaptiveCalculator at calibration points.
+type BackoffEvent struct {
+ WatcherName string
+ ShouldBackoff bool
+ Reason string
+}
+
+// ResourceWatcher is an interface of the watchers that monitor the system resources.
+type ResourceWatcher interface {
+ // Name returns the name of the resource watcher
+ Name() string
+ // Poll returns a backoff event when a watcher determine something goes wrong with the resource it is
+ // monitoring. If everything is fine, it returns `nil`. Watchers are expected to respect the cancellation of
+ // the input context.
+ Poll(context.Context) (*BackoffEvent, error)
+}
+
+// AdaptiveCalculator is responsible for calculating the adaptive limits based on additive increase/multiplicative
+// decrease (AIMD) algorithm. This method involves gradually increasing the limit during normal process functioning
+// but quickly reducing it when an issue (backoff event) occurs. It receives a list of AdaptiveLimiter and a list of
+// ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they all move
+// as a whole. The caller accesses the current limits via AdaptiveLimiter.Current method.
+//
+// When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the
+// backoff events from the watchers. The current value of each limit is re-calibrated as follows:
+// * limit = limit + 1 if there is no backoff event since the last calibration. The new limit cannot exceed max limit.
+// * limit = limit * BackoffFactor otherwise. The new limit cannot be lower than min limit.
+//
+// A watcher returning an error is treated as a no backoff event.
+type AdaptiveCalculator struct {
+ sync.Mutex
+
+ logger *logrus.Entry
+ // started tells whether the calculator already starts. One calculator is allowed to be used once.
+ started bool
+ // calibration is the time duration until the next calibration event.
+ calibration time.Duration
+ // limits are the list of adaptive limits managed by this calculator.
+ limits []AdaptiveLimiter
+ // watchers stores a list of resource watchers that return the backoff events when queried.
+ watchers []ResourceWatcher
+ // lastBackoffEvent stores the last backoff event collected from the watchers.
+ lastBackoffEvent *BackoffEvent
+ // tickerCreator is a custom function that returns a Ticker. It's mostly used in test the manual ticker
+ tickerCreator func(duration time.Duration) helper.Ticker
+}
+
+// NewAdaptiveCalculator constructs a AdaptiveCalculator object. It's the responsibility of the caller to validate
+// the correctness of input AdaptiveLimiter and ResourceWatcher.
+func NewAdaptiveCalculator(calibration time.Duration, logger *logrus.Entry, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator {
+ return &AdaptiveCalculator{
+ logger: logger,
+ calibration: calibration,
+ limits: limits,
+ watchers: watchers,
+ lastBackoffEvent: nil,
+ }
+}
+
+// Start resets the current limit values and start a goroutine to poll the backoff events. This method exits after the
+// mentioned goroutine starts.
+func (c *AdaptiveCalculator) Start(ctx context.Context) (func(), error) {
+ c.Lock()
+ defer c.Unlock()
+
+ if c.started {
+ return nil, fmt.Errorf("adaptive calculator: already started")
+ }
+ c.started = true
+
+ // Reset all limits to their initial limits
+ for _, limit := range c.limits {
+ limit.Update(limit.Setting().Initial)
+ }
+
+ done := make(chan struct{})
+ completed := make(chan struct{})
+
+ go func(ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ defer close(completed)
+
+ tickerCreator := c.tickerCreator
+ if tickerCreator == nil {
+ tickerCreator = helper.NewTimerTicker
+ }
+ timer := tickerCreator(c.calibration)
+ for {
+ // Reset the timer to the next calibration point. It accounts for the resource polling latencies.
+ timer.Reset()
+ select {
+ case <-timer.C():
+ // If multiple watchers fire multiple backoff events, the calculator decreases once.
+ // Usually, resources are highly correlated. When the memory level raises too high,
+ // the CPU usage also increases due to page faulting, memory reclaim, GC activities, etc.
+ // We might also have multiple watchers for the same resources, for example, memory
+ // usage watcher and page fault counter. Hence, re-calibrating after each event will
+ // cut the limits too aggressively.
+ c.pollBackoffEvent(ctx)
+ c.calibrateLimits()
+
+ // Reset backoff event
+ c.setLastBackoffEvent(nil)
+ case <-done:
+ timer.Stop()
+ return
+ }
+ }
+ }(ctx)
+
+ return func() {
+ close(done)
+ <-completed
+ }, nil
+}
+
+func (c *AdaptiveCalculator) pollBackoffEvent(ctx context.Context) {
+ // Set a timeout to prevent resource watcher runs forever. The deadline
+ // is the next calibration event.
+ ctx, cancel := context.WithTimeout(ctx, c.calibration)
+ defer cancel()
+
+ for _, w := range c.watchers {
+ logger := c.logger.WithField("watcher", w.Name())
+
+ event, err := w.Poll(ctx)
+ if err != nil {
+ logger.Errorf("poll from resource watcher: %s", err)
+ continue
+ }
+ if event.ShouldBackoff {
+ c.setLastBackoffEvent(event)
+ }
+ }
+}
+
+func (c *AdaptiveCalculator) calibrateLimits() {
+ c.Lock()
+ defer c.Unlock()
+
+ for _, limit := range c.limits {
+ setting := limit.Setting()
+
+ var newLimit int
+ logger := c.logger.WithField("limit", limit.Name())
+
+ if c.lastBackoffEvent == nil {
+ // Additive increase, one unit at a time
+ newLimit = limit.Current() + 1
+ if newLimit > setting.Max {
+ newLimit = setting.Max
+ }
+ logger.WithFields(map[string]interface{}{
+ "previous_limit": limit.Current(),
+ "new_limit": newLimit,
+ }).Debugf("Additive increase")
+ } else {
+ // Multiplicative decrease
+ newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffBackoff))
+ if newLimit < setting.Min {
+ newLimit = setting.Min
+ }
+ logger.WithFields(map[string]interface{}{
+ "previous_limit": limit.Current(),
+ "new_limit": newLimit,
+ "watcher": c.lastBackoffEvent.WatcherName,
+ "reason": c.lastBackoffEvent.Reason,
+ }).Infof("Multiplicative decrease")
+ }
+ limit.Update(newLimit)
+ }
+}
+
+func (c *AdaptiveCalculator) setLastBackoffEvent(event *BackoffEvent) {
+ c.Lock()
+ defer c.Unlock()
+
+ c.lastBackoffEvent = event
+}
diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go
new file mode 100644
index 000000000..6eb1886c4
--- /dev/null
+++ b/internal/limiter/adaptive_calculator_test.go
@@ -0,0 +1,378 @@
+package limiter
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestAdaptiveCalculator_alreadyStarted(t *testing.T) {
+ t.Parallel()
+
+ calculator := NewAdaptiveCalculator(10*time.Millisecond, testhelper.NewDiscardingLogEntry(t), nil, nil)
+
+ stop, err := calculator.Start(testhelper.Context(t))
+ require.NoError(t, err)
+
+ stop2, err := calculator.Start(testhelper.Context(t))
+ require.Errorf(t, err, "adaptive calculator: already started")
+ require.Nil(t, stop2)
+
+ stop()
+}
+
+func TestAdaptiveCalculator_realTimerTicker(t *testing.T) {
+ t.Parallel()
+
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.InfoLevel)
+
+ limit := newTestLimit("testLimit", 25, 100, 10, 0.5)
+ watcher := newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil)
+
+ calibration := 10 * time.Millisecond
+ calculator := NewAdaptiveCalculator(calibration, logger.WithContext(testhelper.Context(t)), []AdaptiveLimiter{limit}, []ResourceWatcher{watcher})
+
+ stop, err := calculator.Start(testhelper.Context(t))
+ require.NoError(t, err)
+ time.Sleep(10 * calibration)
+ stop()
+
+ require.Equal(t, []int{25, 26, 27, 28, 29, 30}, limit.currents[:6])
+ assertLogs(t, []string{}, hook.AllEntries())
+}
+
+func TestAdaptiveCalculator(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ desc string
+ limits []AdaptiveLimiter
+ watchers []ResourceWatcher
+ // The first captured limit is the initial limit
+ expectedLimits map[string][]int
+ expectedLogs []string
+ waitEvents int
+ }{
+ {
+ desc: "Empty watchers",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{},
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 28, 29, 30},
+ },
+ },
+ {
+ desc: "Empty limits and watchers",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{},
+ watchers: []ResourceWatcher{},
+ expectedLimits: map[string][]int{},
+ },
+ {
+ desc: "Additive increase",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 28, 29, 30},
+ },
+ },
+ {
+ desc: "Additive increase until reaching the max limit",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 27, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 27, 27, 27},
+ },
+ },
+ {
+ desc: "Additive increase until a backoff event",
+ waitEvents: 7,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", "cgroup exceeds limit", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 28, 29, 14, 15, 16},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=14 previous_limit=29 reason="cgroup exceeds limit" watcher=testWatcher`,
+ },
+ },
+ {
+ desc: "Multiplicative decrease until reaching min limit",
+ waitEvents: 6,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "reason 1", "reason 2", "reason 3", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 13, 10, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=13 previous_limit=27 reason="reason 1" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=10 previous_limit=13 reason="reason 2" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=10 previous_limit=10 reason="reason 3" watcher=testWatcher`,
+ },
+ },
+ {
+ desc: "Additive increase multiple limits",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30},
+ "testLimit2": {15, 16, 17, 18, 19, 20},
+ },
+ },
+ {
+ desc: "Additive increase multiple limits until a backoff event",
+ waitEvents: 7,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", "", "cgroup exceeds limit", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30, 15, 16},
+ "testLimit2": {15, 16, 17, 18, 19, 20, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=15 previous_limit=30 reason="cgroup exceeds limit" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=20 reason="cgroup exceeds limit" watcher=testWatcher`,
+ },
+ },
+ {
+ desc: "Additive increase multiple limits until a backoff event with multiple watchers",
+ waitEvents: 10,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "", "", "", "", "", "cgroup exceeds limit 1", ""}, nil),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", "", "", "", "", "", ""}, nil),
+ newTestWatcher("testWatcher3", []string{"", "", "cgroup exceeds limit 2", "", "", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 13, 14, 15, 16, 17, 18, 10, 11},
+ "testLimit2": {15, 16, 17, 10, 11, 12, 13, 14, 15, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=13 previous_limit=27 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=17 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=10 previous_limit=18 reason="cgroup exceeds limit 1" watcher=testWatcher1`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=15 reason="cgroup exceeds limit 1" watcher=testWatcher1`,
+ },
+ },
+ {
+ desc: "Additive increase multiple limits until multiple watchers return multiple errors",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "cgroup exceeds limit 1", "", ""}, nil),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, nil),
+ newTestWatcher("testWatcher3", []string{"", "", "cgroup exceeds limit 2", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 13, 14, 15},
+ "testLimit2": {15, 16, 17, 10, 11, 12},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=13 previous_limit=27 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=17 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ },
+ },
+ {
+ desc: "a watcher returns an error",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, nil}),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, fmt.Errorf("unexpected"), nil}),
+ newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, fmt.Errorf("unexpected"), nil, nil, nil}),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30},
+ "testLimit2": {15, 16, 17, 18, 19, 20},
+ },
+ expectedLogs: []string{
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher3`,
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher2`,
+ },
+ },
+ {
+ desc: "a watcher returns an error at the same time another watcher returns backoff event",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "backoff please", ""}, []error{nil, nil, nil, nil, nil}),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, fmt.Errorf("unexpected"), nil}),
+ newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, fmt.Errorf("unexpected"), nil, nil, nil}),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 14, 15},
+ "testLimit2": {15, 16, 17, 18, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher3`,
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher2`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=28 reason="backoff please" watcher=testWatcher1`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=18 reason="backoff please" watcher=testWatcher1`,
+ },
+ },
+ }
+ for _, tc := range tests {
+ t.Run(tc.desc, func(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.InfoLevel)
+
+ ticker := helper.NewManualTicker()
+
+ calibration := 10 * time.Millisecond
+ calculator := NewAdaptiveCalculator(calibration, logger.WithContext(testhelper.Context(t)), tc.limits, tc.watchers)
+ calculator.tickerCreator = func(duration time.Duration) helper.Ticker { return ticker }
+
+ stop, err := calculator.Start(testhelper.Context(t))
+ require.NoError(t, err)
+ for i := 0; i <= tc.waitEvents; i++ {
+ ticker.Tick()
+ }
+ stop()
+
+ for name, expectedLimits := range tc.expectedLimits {
+ limit := findLimitWithName(tc.limits, name)
+ require.NotNil(t, limit, "not found limit with name %q", name)
+ require.Equal(t, expectedLimits, limit.currents[:tc.waitEvents+1])
+ }
+
+ assertLogs(t, tc.expectedLogs, hook.AllEntries())
+ })
+ }
+}
+
+func assertLogs(t *testing.T, expectedLogs []string, entries []*logrus.Entry) {
+ require.Equal(t, len(expectedLogs), len(entries))
+ for index, expectedLog := range expectedLogs {
+ msg, err := entries[index].String()
+ require.NoError(t, err)
+ require.Contains(t, msg, expectedLog)
+ }
+}
+
+func findLimitWithName(limits []AdaptiveLimiter, name string) *testLimit {
+ for _, l := range limits {
+ limit := l.(*testLimit)
+ if limit.name == name {
+ return limit
+ }
+ }
+ return nil
+}
+
+type testLimit struct {
+ currents []int
+ name string
+ initial int
+ max int
+ min int
+ backoffBackoff float64
+}
+
+func newTestLimit(name string, initial int, max int, min int, backoff float64) *testLimit {
+ return &testLimit{name: name, initial: initial, max: max, min: min, backoffBackoff: backoff}
+}
+
+func (l *testLimit) Name() string { return l.name }
+func (l *testLimit) Current() int {
+ if len(l.currents) == 0 {
+ return 0
+ }
+ return l.currents[len(l.currents)-1]
+}
+func (l *testLimit) Update(val int) { l.currents = append(l.currents, val) }
+func (l *testLimit) Setting() AdaptiveSetting {
+ return AdaptiveSetting{
+ Initial: l.initial,
+ Max: l.max,
+ Min: l.min,
+ BackoffBackoff: l.backoffBackoff,
+ }
+}
+
+type testWatcher struct {
+ name string
+ events []*BackoffEvent
+ errors []error
+ index int
+}
+
+func (w *testWatcher) Name() string {
+ return w.name
+}
+
+func (w *testWatcher) Poll(context.Context) (*BackoffEvent, error) {
+ index := w.index
+ if index >= len(w.events) {
+ index = len(w.events) - 1
+ }
+ var err error
+ if w.errors != nil {
+ err = w.errors[index]
+ }
+ w.index++
+ return w.events[index], err
+}
+
+func newTestWatcher(name string, reasons []string, errors []error) *testWatcher {
+ var events []*BackoffEvent
+ for _, reason := range reasons {
+ event := &BackoffEvent{
+ ShouldBackoff: reason != "",
+ Reason: reason,
+ WatcherName: name,
+ }
+ events = append(events, event)
+ }
+ return &testWatcher{name: name, events: events, errors: errors}
+}
diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go
new file mode 100644
index 000000000..2aefe5eb0
--- /dev/null
+++ b/internal/limiter/adaptive_limit.go
@@ -0,0 +1,48 @@
+package limiter
+
+import "sync/atomic"
+
+// AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
+type AdaptiveSetting struct {
+ Initial int
+ Max int
+ Min int
+ BackoffBackoff float64
+}
+
+// AdaptiveLimiter is an interface for managing and updating adaptive limits.
+// It exposes methods to get the name, current limit value, update the limit value, and access its settings.
+type AdaptiveLimiter interface {
+ Name() string
+ Current() int
+ Update(val int)
+ Setting() AdaptiveSetting
+}
+
+// AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current
+// limit value, ensuring thread-safe updates.
+type AdaptiveLimit struct {
+ name string
+ current atomic.Int32
+ setting AdaptiveSetting
+}
+
+// Name returns the name of the adaptive limit
+func (l *AdaptiveLimit) Name() string {
+ return l.name
+}
+
+// Current returns the current limit. This function can be called without the need for synchronization.
+func (l *AdaptiveLimit) Current() int {
+ return int(l.current.Load())
+}
+
+// Update adjusts current limit value.
+func (l *AdaptiveLimit) Update(val int) {
+ l.current.Store(int32(val))
+}
+
+// Setting returns the configuration parameters for an adaptive limiter.
+func (l *AdaptiveLimit) Setting() AdaptiveSetting {
+ return l.setting
+}