gitlab.com/gitlab-org/gitaly.git

author    Justin Tobler <jtobler@gitlab.com>  2023-07-21 17:22:43 +0300
committer Justin Tobler <jtobler@gitlab.com>  2023-07-21 17:22:43 +0300
commit    429e9dc6845904f1af2263d420c6c9c64b8de915 (patch)
tree      416d452582a4c9c3c7f6f2735b2135d0fec04872
parent    24b83a6dedcad7e6e57ee6932fcf2636065a5be2 (diff)
parent    c059b0955c700d9307e955b6ae065d3355490c84 (diff)

Merge branch 'qmnguyen0711/implement-aimd-calculator' into 'master'

Implement adaptive calculator for concurrency limiter

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6058

Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--  internal/limiter/adaptive_calculator.go       289
-rw-r--r--  internal/limiter/adaptive_calculator_test.go   671
-rw-r--r--  internal/limiter/adaptive_limit.go              48
3 files changed, 1008 insertions(+), 0 deletions(-)
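
For reviewers skimming the diff: the new calculator applies the classic AIMD rule at every calibration tick — add one unit while the system is calm, multiply by BackoffFactor when a watcher signals pressure, and clamp the result to [Min, Max]. The following is a minimal sketch of that arithmetic, a distilled restatement of the rule documented in adaptive_calculator.go rather than code from this merge request; the calibrate helper is purely illustrative.

package main

import (
	"fmt"
	"math"
)

// calibrate restates the AIMD rule: additive increase by one unit per calm calibration,
// clamped to max; multiplicative decrease by backoffFactor on a backoff event, clamped to min.
func calibrate(current, min, max int, backoffFactor float64, backoff bool) int {
	if backoff {
		newLimit := int(math.Floor(float64(current) * backoffFactor))
		if newLimit < min {
			newLimit = min
		}
		return newLimit
	}
	newLimit := current + 1
	if newLimit > max {
		newLimit = max
	}
	return newLimit
}

func main() {
	// With Initial=25, Min=10, Max=100, BackoffFactor=0.5 (the values used throughout the tests),
	// five calm ticks followed by one backoff event yield: 26 27 28 29 30 15.
	limit := 25
	for i := 0; i < 6; i++ {
		limit = calibrate(limit, 10, 100, 0.5, i == 5)
		fmt.Print(limit, " ")
	}
	fmt.Println()
}
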
diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go
new file mode 100644
index 000000000..98c01161b
--- /dev/null
+++ b/internal/limiter/adaptive_calculator.go
@@ -0,0 +1,289 @@
+package limiter
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+)
+
+const (
+ // MaximumWatcherTimeout is the maximum number of consecutive timeouts allowed when polling backoff events
+ // from watchers. When this threshold is reached, the timed-out polling is treated as a backoff event.
+ MaximumWatcherTimeout = 5
+)
+
+// BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the
+// management of the AdaptiveCalculator at calibration points.
+type BackoffEvent struct {
+ WatcherName string
+ ShouldBackoff bool
+ Reason string
+}
+
+// ResourceWatcher is the interface implemented by watchers that monitor system resources.
+type ResourceWatcher interface {
+ // Name returns the name of the resource watcher
+ Name() string
+ // Poll returns a backoff event when a watcher determines that something is wrong with the resource it is
+ // monitoring. If everything is fine, it returns `nil`. Watchers are expected to respect the cancellation of
+ // the input context.
+ Poll(context.Context) (*BackoffEvent, error)
+}
+
+// AdaptiveCalculator is responsible for calculating adaptive limits based on the additive-increase/
+// multiplicative-decrease (AIMD) algorithm. This approach gradually increases the limit during normal operation
+// but quickly reduces it when an issue (backoff event) occurs. It receives a list of AdaptiveLimiter and a list of
+// ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they are all
+// re-calibrated together. The caller accesses the current limits via the AdaptiveLimiter.Current method.
+//
+// When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the
+// backoff events from the watchers. The current value of each limit is re-calibrated as follows:
+// * limit = limit + 1 if there is no backoff event since the last calibration. The new limit cannot exceed max limit.
+// * limit = limit * BackoffFactor otherwise. The new limit cannot be lower than min limit.
+//
+// A watcher returning an error is not treated as a backoff event.
+type AdaptiveCalculator struct {
+ sync.Mutex
+
+ logger *logrus.Entry
+ // started tells whether the calculator has already started. A calculator may only be started once.
+ started bool
+ // calibration is the time duration until the next calibration event.
+ calibration time.Duration
+ // limits are the list of adaptive limits managed by this calculator.
+ limits []AdaptiveLimiter
+ // watchers stores a list of resource watchers that return the backoff events when queried.
+ watchers []ResourceWatcher
+ // watcherTimeouts is a map of counters for consecutive timeouts. The counter is reset when the associated
+ // watcher returns a non-error event or exceeds MaximumWatcherTimeout.
+ watcherTimeouts map[ResourceWatcher]*atomic.Int32
+ // lastBackoffEvent stores the last backoff event collected from the watchers.
+ lastBackoffEvent *BackoffEvent
+ // tickerCreator is a custom function that returns a Ticker. It's mostly used in tests to inject a manual ticker.
+ tickerCreator func(duration time.Duration) helper.Ticker
+
+ // currentLimitVec is the gauge of current limit value of an adaptive concurrency limit
+ currentLimitVec *prometheus.GaugeVec
+ // watcherErrorsVec is the counter of the total number of watcher errors
+ watcherErrorsVec *prometheus.CounterVec
+ // backoffEventsVec is the counter of the total number of backoff events
+ backoffEventsVec *prometheus.CounterVec
+}
+
+// NewAdaptiveCalculator constructs an AdaptiveCalculator object. It's the responsibility of the caller to validate
+// the correctness of the input AdaptiveLimiters and ResourceWatchers.
+func NewAdaptiveCalculator(calibration time.Duration, logger *logrus.Entry, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator {
+ watcherTimeouts := map[ResourceWatcher]*atomic.Int32{}
+ for _, watcher := range watchers {
+ watcherTimeouts[watcher] = &atomic.Int32{}
+ }
+
+ return &AdaptiveCalculator{
+ logger: logger,
+ calibration: calibration,
+ limits: limits,
+ watchers: watchers,
+ watcherTimeouts: watcherTimeouts,
+ lastBackoffEvent: nil,
+ currentLimitVec: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "gitaly_concurrency_limiting_current_limit",
+ Help: "The current limit value of an adaptive concurrency limit",
+ },
+ []string{"limit"},
+ ),
+ watcherErrorsVec: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_concurrency_limiting_watcher_errors_total",
+ Help: "Counter of the total number of watcher errors",
+ },
+ []string{"watcher"},
+ ),
+ backoffEventsVec: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_concurrency_limiting_backoff_events_total",
+ Help: "Counter of the total number of backoff events",
+ },
+ []string{"watcher"},
+ ),
+ }
+}
+
+// Start resets the current limit values and starts a goroutine to poll the backoff events. This method returns
+// after that goroutine starts.
+func (c *AdaptiveCalculator) Start(ctx context.Context) (func(), error) {
+ c.Lock()
+ defer c.Unlock()
+
+ if c.started {
+ return nil, fmt.Errorf("adaptive calculator: already started")
+ }
+ c.started = true
+
+ // Reset all limits to their initial limits
+ for _, limit := range c.limits {
+ c.updateLimit(limit, limit.Setting().Initial)
+ }
+
+ done := make(chan struct{})
+ completed := make(chan struct{})
+
+ go func(ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ defer close(completed)
+
+ tickerCreator := c.tickerCreator
+ if tickerCreator == nil {
+ tickerCreator = helper.NewTimerTicker
+ }
+ timer := tickerCreator(c.calibration)
+ for {
+ // Reset the timer to the next calibration point. It accounts for the resource polling latencies.
+ timer.Reset()
+ select {
+ case <-timer.C():
+ // If multiple watchers fire multiple backoff events, the calculator decreases the limits only once.
+ // Resources are usually highly correlated: when memory usage rises too high, CPU usage also
+ // increases due to page faulting, memory reclaim, GC activity, etc. We might also have multiple
+ // watchers for the same resource, for example a memory usage watcher and a page fault counter.
+ // Hence, re-calibrating after each individual event would cut the limits too aggressively.
+ c.pollBackoffEvent(ctx)
+ c.calibrateLimits(ctx)
+
+ // Reset backoff event
+ c.setLastBackoffEvent(nil)
+ case <-done:
+ timer.Stop()
+ return
+ }
+ }
+ }(ctx)
+
+ return func() {
+ close(done)
+ <-completed
+ }, nil
+}
+
+// Describe is used to describe Prometheus metrics.
+func (c *AdaptiveCalculator) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(c, descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (c *AdaptiveCalculator) Collect(metrics chan<- prometheus.Metric) {
+ c.currentLimitVec.Collect(metrics)
+ c.watcherErrorsVec.Collect(metrics)
+ c.backoffEventsVec.Collect(metrics)
+}
+
+func (c *AdaptiveCalculator) pollBackoffEvent(ctx context.Context) {
+ // Set a timeout to prevent a resource watcher from running forever. The deadline
+ // is the next calibration event.
+ ctx, cancel := context.WithTimeout(ctx, c.calibration)
+ defer cancel()
+
+ for _, w := range c.watchers {
+ // If the context is cancelled, return early.
+ if ctx.Err() != nil {
+ return
+ }
+
+ logger := c.logger.WithField("watcher", w.Name())
+ event, err := w.Poll(ctx)
+ if err != nil {
+ if err == context.DeadlineExceeded {
+ c.watcherTimeouts[w].Add(1)
+ // If the watcher times out a number of consecutive times, treat it as a
+ // backoff event.
+ if timeoutCount := c.watcherTimeouts[w].Load(); timeoutCount >= MaximumWatcherTimeout {
+ c.setLastBackoffEvent(&BackoffEvent{
+ WatcherName: w.Name(),
+ ShouldBackoff: true,
+ Reason: fmt.Sprintf("%d consecutive polling timeout errors", timeoutCount),
+ })
+ // Reset the timeout counter. The next MaximumWatcherTimeout consecutive
+ // timeouts will trigger another backoff event.
+ c.watcherTimeouts[w].Store(0)
+ }
+ }
+
+ if err != context.Canceled {
+ c.watcherErrorsVec.WithLabelValues(w.Name()).Inc()
+ logger.Errorf("poll from resource watcher: %s", err)
+ }
+
+ continue
+ }
+ // Reset the timeout counter if the watcher polls successfully.
+ c.watcherTimeouts[w].Store(0)
+ if event.ShouldBackoff {
+ c.setLastBackoffEvent(event)
+ }
+ }
+}
+
+func (c *AdaptiveCalculator) calibrateLimits(ctx context.Context) {
+ c.Lock()
+ defer c.Unlock()
+
+ if ctx.Err() != nil {
+ return
+ }
+
+ for _, limit := range c.limits {
+ setting := limit.Setting()
+
+ var newLimit int
+ logger := c.logger.WithField("limit", limit.Name())
+
+ if c.lastBackoffEvent == nil {
+ // Additive increase, one unit at a time
+ newLimit = limit.Current() + 1
+ if newLimit > setting.Max {
+ newLimit = setting.Max
+ }
+ logger.WithFields(map[string]interface{}{
+ "previous_limit": limit.Current(),
+ "new_limit": newLimit,
+ }).Debugf("Additive increase")
+ } else {
+ // Multiplicative decrease
+ newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffFactor))
+ if newLimit < setting.Min {
+ newLimit = setting.Min
+ }
+ logger.WithFields(map[string]interface{}{
+ "previous_limit": limit.Current(),
+ "new_limit": newLimit,
+ "watcher": c.lastBackoffEvent.WatcherName,
+ "reason": c.lastBackoffEvent.Reason,
+ }).Infof("Multiplicative decrease")
+ }
+ c.updateLimit(limit, newLimit)
+ }
+}
+
+func (c *AdaptiveCalculator) setLastBackoffEvent(event *BackoffEvent) {
+ c.Lock()
+ defer c.Unlock()
+
+ c.lastBackoffEvent = event
+ if event != nil && event.ShouldBackoff {
+ c.backoffEventsVec.WithLabelValues(event.WatcherName).Inc()
+ }
+}
+
+func (c *AdaptiveCalculator) updateLimit(limit AdaptiveLimiter, newLimit int) {
+ limit.Update(newLimit)
+ c.currentLimitVec.WithLabelValues(limit.Name()).Set(float64(newLimit))
+}
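
As a quick orientation, here is a minimal sketch of how a caller might wire the calculator up using only the API added in this file. The staticLimit and noopWatcher types are illustrative stand-ins (the real AdaptiveLimit implementation lands in adaptive_limit.go further down, and concrete watchers are out of scope for this merge request); the calibration interval and settings are arbitrary example values, and note that internal/limiter is only importable from within the Gitaly module.

package main

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
)

// staticLimit is an illustrative AdaptiveLimiter; it is not concurrency-safe and only
// exists to show the calculator's contract.
type staticLimit struct {
	current int
	setting limiter.AdaptiveSetting
}

func (l *staticLimit) Name() string                     { return "rpc_concurrency" }
func (l *staticLimit) Current() int                     { return l.current }
func (l *staticLimit) Update(val int)                   { l.current = val }
func (l *staticLimit) Setting() limiter.AdaptiveSetting { return l.setting }

// noopWatcher is an illustrative ResourceWatcher that never requests a backoff.
type noopWatcher struct{}

func (noopWatcher) Name() string { return "noop" }
func (noopWatcher) Poll(context.Context) (*limiter.BackoffEvent, error) {
	return &limiter.BackoffEvent{WatcherName: "noop", ShouldBackoff: false}, nil
}

func main() {
	limit := &staticLimit{setting: limiter.AdaptiveSetting{Initial: 25, Min: 10, Max: 100, BackoffFactor: 0.5}}

	calculator := limiter.NewAdaptiveCalculator(
		30*time.Second, // calibration interval
		logrus.NewEntry(logrus.New()),
		[]limiter.AdaptiveLimiter{limit},
		[]limiter.ResourceWatcher{noopWatcher{}},
	)

	stop, err := calculator.Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer stop()

	// The calculator re-calibrates the limit periodically; callers read the value via Current().
	_ = limit.Current()
}
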
diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go
new file mode 100644
index 000000000..a89445211
--- /dev/null
+++ b/internal/limiter/adaptive_calculator_test.go
@@ -0,0 +1,671 @@
+package limiter
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestAdaptiveCalculator_alreadyStarted(t *testing.T) {
+ t.Parallel()
+
+ calculator := NewAdaptiveCalculator(10*time.Millisecond, testhelper.NewDiscardingLogEntry(t), nil, nil)
+
+ stop, err := calculator.Start(testhelper.Context(t))
+ require.NoError(t, err)
+
+ stop2, err := calculator.Start(testhelper.Context(t))
+ require.Errorf(t, err, "adaptive calculator: already started")
+ require.Nil(t, stop2)
+
+ stop()
+}
+
+func TestAdaptiveCalculator_realTimerTicker(t *testing.T) {
+ t.Parallel()
+
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.InfoLevel)
+
+ limit := newTestLimit("testLimit", 25, 100, 10, 0.5)
+ watcher := newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil)
+
+ calibration := 10 * time.Millisecond
+ calculator := NewAdaptiveCalculator(calibration, logger.WithContext(testhelper.Context(t)), []AdaptiveLimiter{limit}, []ResourceWatcher{watcher})
+
+ stop, err := calculator.Start(testhelper.Context(t))
+ require.NoError(t, err)
+ time.Sleep(10 * calibration)
+ stop()
+
+ require.Equal(t, []int{25, 26, 27, 28, 29, 30}, limit.currents[:6])
+ assertLogs(t, []string{}, hook.AllEntries())
+}
+
+func TestAdaptiveCalculator(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ desc string
+ limits []AdaptiveLimiter
+ watchers []ResourceWatcher
+ waitEvents int
+ // The first captured limit is the initial limit
+ expectedLimits map[string][]int
+ expectedLogs []string
+ expectedMetrics string
+ }{
+ {
+ desc: "Empty watchers",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{},
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 28, 29, 30},
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+ },
+ {
+ desc: "Empty limits and watchers",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{},
+ watchers: []ResourceWatcher{},
+ expectedLimits: map[string][]int{},
+ },
+ {
+ desc: "Additive increase",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 28, 29, 30},
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+ },
+ {
+ desc: "Additive increase until reaching the max limit",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 27, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 27, 27, 27},
+ },
+ // In this test, the current limit never exceeds the max value. No need to replace the value.
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} 27
+
+`,
+ },
+ {
+ desc: "Additive increase until a backoff event",
+ waitEvents: 7,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", "cgroup exceeds limit", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 28, 29, 14, 15, 16},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=14 previous_limit=29 reason="cgroup exceeds limit" watcher=testWatcher`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+ },
+ {
+ desc: "Multiplicative decrease until reaching min limit",
+ waitEvents: 6,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit", 25, 100, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "reason 1", "reason 2", "reason 3", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit": {25, 26, 27, 13, 10, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=13 previous_limit=27 reason="reason 1" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=10 previous_limit=13 reason="reason 2" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit new_limit=10 previous_limit=10 reason="reason 3" watcher=testWatcher`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 3
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+ },
+ {
+ desc: "Additive increase multiple limits",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30},
+ "testLimit2": {15, 16, 17, 18, 19, 20},
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+ },
+ {
+ desc: "Additive increase multiple limits until a backoff event",
+ waitEvents: 7,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", "", "cgroup exceeds limit", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30, 15, 16},
+ "testLimit2": {15, 16, 17, 18, 19, 20, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=15 previous_limit=30 reason="cgroup exceeds limit" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=20 reason="cgroup exceeds limit" watcher=testWatcher`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+ },
+ {
+ desc: "Additive increase multiple limits until a backoff event with multiple watchers",
+ waitEvents: 10,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "", "", "", "", "", "cgroup exceeds limit 1", ""}, nil),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", "", "", "", "", "", ""}, nil),
+ newTestWatcher("testWatcher3", []string{"", "", "cgroup exceeds limit 2", "", "", "", "", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 13, 14, 15, 16, 17, 18, 10, 11},
+ "testLimit2": {15, 16, 17, 10, 11, 12, 13, 14, 15, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=13 previous_limit=27 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=17 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=10 previous_limit=18 reason="cgroup exceeds limit 1" watcher=testWatcher1`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=15 reason="cgroup exceeds limit 1" watcher=testWatcher1`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher3"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+ },
+ {
+ desc: "Additive increase multiple limits until multiple watchers return multiple errors",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "cgroup exceeds limit 1", "", ""}, nil),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, nil),
+ newTestWatcher("testWatcher3", []string{"", "", "cgroup exceeds limit 2", "", ""}, nil),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 13, 14, 15},
+ "testLimit2": {15, 16, 17, 10, 11, 12},
+ },
+ expectedLogs: []string{
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=13 previous_limit=27 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=17 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher3"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+ },
+ {
+ desc: "a watcher returns an error",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, nil}),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, fmt.Errorf("unexpected"), nil}),
+ newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, fmt.Errorf("unexpected"), nil, nil, nil}),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30},
+ "testLimit2": {15, 16, 17, 18, 19, 20},
+ },
+ expectedLogs: []string{
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher3`,
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher2`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher2"} 1
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher3"} 1
+
+`,
+ },
+ {
+ desc: "a watcher returns an error at the same time another watcher returns backoff event",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "backoff please", ""}, []error{nil, nil, nil, nil, nil}),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, fmt.Errorf("unexpected"), nil}),
+ newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, fmt.Errorf("unexpected"), nil, nil, nil}),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 14, 15},
+ "testLimit2": {15, 16, 17, 18, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher3`,
+ `level=error msg="poll from resource watcher: unexpected" watcher=testWatcher2`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=28 reason="backoff please" watcher=testWatcher1`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=18 reason="backoff please" watcher=testWatcher1`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher2"} 1
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher3"} 1
+
+`,
+ },
+ {
+ desc: "a watcher returns context canceled error",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher1", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, context.Canceled}),
+ newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, context.Canceled}),
+ newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, context.Canceled}),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30},
+ "testLimit2": {15, 16, 17, 18, 19, 20},
+ },
+ expectedLogs: []string{},
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+ },
+ {
+ desc: "a watcher returns some timeout errors",
+ waitEvents: 5,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher("testWatcher", []string{"", "", "", "", ""}, []error{nil, context.DeadlineExceeded, context.DeadlineExceeded, nil, nil}),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 30},
+ "testLimit2": {15, 16, 17, 18, 19, 20},
+ },
+ expectedLogs: []string{
+ // Not enough timeout to trigger a backoff event
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher"} 2
+
+`,
+ },
+ {
+ desc: "a watcher returns 5 consecutive timeout errors",
+ waitEvents: 6,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher(
+ "testWatcher",
+ []string{"", "", "", "", "", ""},
+ []error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+ ),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 14, 15},
+ "testLimit2": {15, 16, 17, 18, 19, 10, 11},
+ },
+ expectedLogs: []string{
+ // The last timeout triggers a backoff event, then increases again
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=29 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=19 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher"} 5
+
+`,
+ },
+ {
+ desc: "a watcher returns 6 consecutive timeout errors",
+ waitEvents: 7,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher(
+ "testWatcher",
+ []string{"", "", "", "", "", "", ""},
+ []error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+ ),
+ },
+ expectedLimits: map[string][]int{
+ // The second-to-last timeout triggers an event, but the last timeout does not trigger one.
+ "testLimit1": {25, 26, 27, 28, 29, 14, 15, 16},
+ "testLimit2": {15, 16, 17, 18, 19, 10, 11, 12},
+ },
+ expectedLogs: []string{
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=29 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=19 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher"} 6
+
+`,
+ },
+ {
+ desc: "multiple watchers returns 5 consecutive timeout errors",
+ waitEvents: 6,
+ limits: []AdaptiveLimiter{
+ newTestLimit("testLimit1", 25, 100, 10, 0.5),
+ newTestLimit("testLimit2", 15, 30, 10, 0.5),
+ },
+ watchers: []ResourceWatcher{
+ newTestWatcher(
+ "testWatcher1",
+ []string{"", "", "", "", "", ""},
+ []error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+ ),
+ newTestWatcher(
+ "testWatcher2",
+ []string{"", "", "", "", "", ""},
+ []error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+ ),
+ },
+ expectedLimits: map[string][]int{
+ "testLimit1": {25, 26, 27, 28, 29, 14, 15},
+ "testLimit2": {15, 16, 17, 18, 19, 10, 11},
+ },
+ expectedLogs: []string{
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+ `level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+ `level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=29 reason="5 consecutive polling timeout errors" watcher=testWatcher2`,
+ `level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=19 reason="5 consecutive polling timeout errors" watcher=testWatcher2`,
+ },
+ expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher2"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher1"} 5
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher2"} 5
+
+`,
+ },
+ }
+ for _, tc := range tests {
+ t.Run(tc.desc, func(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ hook.Reset()
+ logger.SetLevel(logrus.InfoLevel)
+
+ ticker := helper.NewManualTicker()
+
+ calibration := 10 * time.Millisecond
+ calculator := NewAdaptiveCalculator(calibration, logger.WithContext(testhelper.Context(t)), tc.limits, tc.watchers)
+ calculator.tickerCreator = func(duration time.Duration) helper.Ticker { return ticker }
+
+ stop, err := calculator.Start(testhelper.Context(t))
+ require.NoError(t, err)
+ for i := 0; i <= tc.waitEvents; i++ {
+ ticker.Tick()
+ }
+ stop()
+
+ for name, expectedLimits := range tc.expectedLimits {
+ limit := findLimitWithName(tc.limits, name)
+ require.NotNil(t, limit, "not found limit with name %q", name)
+ require.Equal(t, expectedLimits, limit.currents[:tc.waitEvents+1])
+ }
+
+ // Replace the current limit placeholders in the expected metrics. The test setup above adds some buffer
+ // ticks, so there might be extra calibrations before the calculator stops.
+ metrics := tc.expectedMetrics
+ for _, l := range tc.limits {
+ metrics = strings.Replace(metrics, fmt.Sprintf("{%s}", l.Name()), fmt.Sprintf("%d", l.Current()), -1)
+ }
+ assertLogs(t, tc.expectedLogs, hook.AllEntries())
+ require.NoError(t, testutil.CollectAndCompare(calculator, strings.NewReader(metrics),
+ "gitaly_concurrency_limiting_current_limit",
+ "gitaly_concurrency_limiting_backoff_events_total",
+ "gitaly_concurrency_limiting_watcher_errors_total",
+ ))
+ })
+ }
+}
+
+func assertLogs(t *testing.T, expectedLogs []string, entries []*logrus.Entry) {
+ require.Equal(t, len(expectedLogs), len(entries))
+ for index, expectedLog := range expectedLogs {
+ msg, err := entries[index].String()
+ require.NoError(t, err)
+ require.Contains(t, msg, expectedLog)
+ }
+}
+
+func findLimitWithName(limits []AdaptiveLimiter, name string) *testLimit {
+ for _, l := range limits {
+ limit := l.(*testLimit)
+ if limit.name == name {
+ return limit
+ }
+ }
+ return nil
+}
+
+type testLimit struct {
+ currents []int
+ name string
+ initial int
+ max int
+ min int
+ backoffFactor float64
+}
+
+func newTestLimit(name string, initial int, max int, min int, backoff float64) *testLimit {
+ return &testLimit{name: name, initial: initial, max: max, min: min, backoffFactor: backoff}
+}
+
+func (l *testLimit) Name() string { return l.name }
+func (l *testLimit) Current() int {
+ if len(l.currents) == 0 {
+ return 0
+ }
+ return l.currents[len(l.currents)-1]
+}
+func (l *testLimit) Update(val int) { l.currents = append(l.currents, val) }
+func (l *testLimit) Setting() AdaptiveSetting {
+ return AdaptiveSetting{
+ Initial: l.initial,
+ Max: l.max,
+ Min: l.min,
+ BackoffFactor: l.backoffFactor,
+ }
+}
+
+type testWatcher struct {
+ name string
+ events []*BackoffEvent
+ errors []error
+ index int
+}
+
+func (w *testWatcher) Name() string {
+ return w.name
+}
+
+func (w *testWatcher) Poll(context.Context) (*BackoffEvent, error) {
+ index := w.index
+ if index >= len(w.events) {
+ index = len(w.events) - 1
+ }
+ var err error
+ if w.errors != nil {
+ err = w.errors[index]
+ }
+ w.index++
+ return w.events[index], err
+}
+
+func newTestWatcher(name string, reasons []string, errors []error) *testWatcher {
+ var events []*BackoffEvent
+ for _, reason := range reasons {
+ event := &BackoffEvent{
+ ShouldBackoff: reason != "",
+ Reason: reason,
+ WatcherName: name,
+ }
+ events = append(events, event)
+ }
+ return &testWatcher{name: name, events: events, errors: errors}
+}
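
The tests above stub watchers with canned events; a production watcher only needs to satisfy the two-method ResourceWatcher interface. As an illustration only — not part of this merge request — a hypothetical memory-threshold watcher could look like the following, with the usage source injected as a function so the sketch stays self-contained:

package limiter

import (
	"context"
	"fmt"
)

// memoryWatcher is a hypothetical ResourceWatcher that requests a backoff when observed
// memory usage crosses a configured threshold. How usage is measured is an assumption here.
type memoryWatcher struct {
	thresholdBytes uint64
	readUsage      func(context.Context) (uint64, error)
}

func (w *memoryWatcher) Name() string { return "memory" }

func (w *memoryWatcher) Poll(ctx context.Context) (*BackoffEvent, error) {
	usage, err := w.readUsage(ctx)
	if err != nil {
		// A plain error is not a backoff event by itself; only MaximumWatcherTimeout
		// consecutive polling timeouts are escalated by the calculator.
		return nil, fmt.Errorf("read memory usage: %w", err)
	}
	event := &BackoffEvent{WatcherName: w.Name()}
	if usage >= w.thresholdBytes {
		event.ShouldBackoff = true
		event.Reason = fmt.Sprintf("memory usage %d exceeds threshold %d", usage, w.thresholdBytes)
	}
	return event, nil
}
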
diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go
new file mode 100644
index 000000000..2aefe5eb0
--- /dev/null
+++ b/internal/limiter/adaptive_limit.go
@@ -0,0 +1,48 @@
+package limiter
+
+import "sync/atomic"
+
+// AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
+type AdaptiveSetting struct {
+ Initial int
+ Max int
+ Min int
+ BackoffFactor float64
+}
+
+// AdaptiveLimiter is an interface for managing and updating adaptive limits.
+// It exposes methods to get the name, current limit value, update the limit value, and access its settings.
+type AdaptiveLimiter interface {
+ Name() string
+ Current() int
+ Update(val int)
+ Setting() AdaptiveSetting
+}
+
+// AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current
+// limit value, ensuring thread-safe updates.
+type AdaptiveLimit struct {
+ name string
+ current atomic.Int32
+ setting AdaptiveSetting
+}
+
+// Name returns the name of the adaptive limit
+func (l *AdaptiveLimit) Name() string {
+ return l.name
+}
+
+// Current returns the current limit. This function can be called without the need for synchronization.
+func (l *AdaptiveLimit) Current() int {
+ return int(l.current.Load())
+}
+
+// Update adjusts the current limit value.
+func (l *AdaptiveLimit) Update(val int) {
+ l.current.Store(int32(val))
+}
+
+// Setting returns the configuration parameters for an adaptive limiter.
+func (l *AdaptiveLimit) Setting() AdaptiveSetting {
+ return l.setting
+}
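
How the re-calibrated value feeds back into request admission is outside the scope of this merge request. Purely as an illustration of the consumer side of the AdaptiveLimiter interface — the admissionGate type below is hypothetical, not Gitaly's actual concurrency limiter — a front-end could compare in-flight work against Current() before admitting a request:

package limiter

import (
	"fmt"
	"sync/atomic"
)

// admissionGate is a hypothetical consumer of an AdaptiveLimiter: it admits a request only
// while the number of in-flight requests stays below the current adaptive limit.
type admissionGate struct {
	limit    AdaptiveLimiter
	inFlight atomic.Int64
}

// Acquire reserves a slot and returns a release func, or an error when the limit is reached.
func (g *admissionGate) Acquire() (func(), error) {
	if int(g.inFlight.Add(1)) > g.limit.Current() {
		g.inFlight.Add(-1)
		return nil, fmt.Errorf("adaptive limit %q reached", g.limit.Name())
	}
	return func() { g.inFlight.Add(-1) }, nil
}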